from peak.api import * from peak.running.commands import AbstractCommand from peak.web.requests import TestRequest from StringIO import StringIO class PublishInteraction(web.Interaction): """ subclass interaction for testing """ def handleException(self, object, request, exc_info, retry_allowed=1): txt = 'ERROR %s: %s for request: %s\n' % (exc_info[0], exc_info[1], request['PATH_INFO']) txt += '------------------------\n' import traceback txt += '\n'.join(traceback.format_tb(exc_info[2])) txt += '------------------------\n' self.response.setBody(txt) class PublishTest(AbstractCommand): publish = binding.Obtain('import:zope.publisher.publish:publish') def run(self): if len(self.argv)<3: raise InvocationError("missing argument(s)") name = self.argv[1] path = self.argv[2] ob = self.lookupComponent(name, suggestParent=False, default=NOT_FOUND) if ob is NOT_FOUND: raise InvocationError("Name not found: %s" % name) app = ob(self) policy = adapt(app, web.IInteractionPolicy, None) request = self._request(path, self.stdin, self.stdout) interaction = PublishInteraction(app, None, policy=policy, request=request) request.setPublication(interaction) self.publish(request) self.stdout.write('\n') def _request(self, path='/', stdin='', stdout=None, basic=None, environment = None, form=None, request=TestRequest): """Create a request """ env = {} if stdout is None: stdout = StringIO() if type(stdin) is str: stdin = StringIO(stdin) p=path.split('?') if len(p)==1: env['PATH_INFO'] = p[0] elif len(p)==2: env['PATH_INFO'], env['QUERY_STRING'] = p else: raise ValueError("Too many ?s in path", path) if environment is not None: env.update(environment) if basic: env['HTTP_AUTHORIZATION']="Basic %s" % base64.encodestring(basic) request = request(stdin, stdout, env) return request