"""Unit tests for the File and Image objects provided by OFS.Image.

The tests build a throw-away ZODB (DemoStorage) holding a single object
named ``file`` created from the ``test.gif`` fixture that lives next to
this module, and hook that object up to a dummy cache manager so that
cache interactions can be observed.
"""

import unittest

# Testing and Zope2 must be imported, and Zope2.startup() called, before
# the remaining Zope imports below; keep this order.
import Testing
import Zope2
Zope2.startup()

import os, sys
import time
from cStringIO import StringIO

from OFS.Application import Application
from OFS.SimpleItem import SimpleItem
from OFS.Cache import ZCM_MANAGERS
from OFS.Image import Pdata
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from App.Common import rfc1123_date
from Testing.makerequest import makerequest
from zExceptions import Redirect
import transaction

try:
    here = os.path.dirname(os.path.abspath(__file__))
except NameError:
    # __file__ is not defined (e.g. when the module source is exec'ed);
    # fall back to the location of the running script.
    here = os.path.dirname(os.path.abspath(sys.argv[0]))

# Both names point at the same GIF fixture.
imagedata = os.path.join(here, 'test.gif')
filedata = os.path.join(here, 'test.gif')


def _read_binary(path):
    """Return the complete contents of *path*, read in binary mode.

    The file handle is always closed, even if reading fails.
    """
    f = open(path, 'rb')
    try:
        return f.read()
    finally:
        f.close()


def makeConnection():
    """Open and return a connection to a fresh DemoStorage-backed ZODB."""
    import ZODB
    from ZODB.DemoStorage import DemoStorage

    s = DemoStorage(quota=(1<<20))
    return ZODB.DB( s ).open()


def aputrequest(file, content_type):
    """Build an HTTPRequest for a PUT of *file*.

    file -- file-like object used as the request's stdin (the body).
    content_type -- value placed in the CONTENT_TYPE environment key.

    The associated response writes to sys.stdout.
    """
    resp = HTTPResponse(stdout=sys.stdout)
    environ = {}
    environ['SERVER_NAME']='foo'
    environ['SERVER_PORT']='80'
    environ['REQUEST_METHOD'] = 'PUT'
    environ['CONTENT_TYPE'] = content_type
    req = HTTPRequest(stdin=file, environ=environ, response=resp)
    return req


class DummyCache:
    """Cache stand-in that records the calls made to it.

    Attributes (all reset to None by clear()):
      set         -- (ob, data) tuple from the last ZCache_set call
      get         -- object passed to the last ZCache_get call
      invalidated -- object passed to the last ZCache_invalidate call
      si          -- value ZCache_get hands back when it is truthy
    """

    def __init__(self):
        self.clear()

    def ZCache_set(self, ob, data, view_name='', keywords=None,
                   mtime_func=None):
        """Record that *data* was stored for *ob*."""
        self.set = (ob, data)

    def ZCache_get(self, ob, data, view_name='', keywords=None,
                   mtime_func=None):
        """Record the lookup for *ob*; return the stream iterator if set.

        Returns self.si when it is truthy, otherwise None (cache miss).
        """
        self.get = ob
        if self.si:
            # Was ``return si``: an undefined name that raised NameError
            # whenever a stream iterator had been installed.
            return self.si

    def ZCache_invalidate(self, ob):
        """Record that *ob* was invalidated."""
        self.invalidated = ob

    def clear(self):
        """Forget everything recorded so far."""
        self.set=None
        self.get=None
        self.invalidated = None
        self.si = None

    def setStreamIterator(self, si):
        """Install *si* as the value returned by later ZCache_get calls."""
        self.si = si


# Single shared cache instance; FileTests.tearDown clears it after each test.
ADummyCache=DummyCache()


# Cache manager whose cache is always the shared ADummyCache instance.
class DummyCacheManager(SimpleItem):
    def ZCacheManager_getCache(self):
        return ADummyCache


class FileTests(unittest.TestCase):
    """Tests run against an object created with ``manage_addFile``.

    Subclasses override ``data``, ``content_type`` and ``factory`` to run
    the same tests against a different kind of object.
    """

    data = _read_binary(filedata)
    content_type = 'application/octet-stream'
    factory = 'manage_addFile'

    def setUp( self ):
        # Build Application -> 'file' in a fresh database and attach the
        # dummy cache manager to the new object.
        self.connection = makeConnection()
        try:
            r = self.connection.root()
            a = Application()
            r['Application'] = a
            self.root = a
            responseOut = self.responseOut = StringIO()
            self.app = makerequest( self.root, stdout=responseOut )
            self.app.dcm = DummyCacheManager()
            factory = getattr(self.app, self.factory)
            factory('file',
                    file=self.data, content_type=self.content_type)
            self.app.file.ZCacheable_setManagerId('dcm')
            self.app.file.ZCacheable_setEnabled(enabled=1)
            setattr(self.app, ZCM_MANAGERS, ('dcm',))
            # Hack, we need a _p_mtime for the file, so we make sure that it
            # has one.
            transaction.commit()
        except:
            # Deliberately catches everything: the connection must be
            # released whatever went wrong, and the error is re-raised.
            self.connection.close()
            raise
        transaction.begin()
        self.file = getattr( self.app, 'file' )

    def tearDown( self ):
        del self.file
        transaction.abort()
        self.connection.close()
        del self.app
        del self.responseOut
        del self.root
        del self.connection
        ADummyCache.clear()

    def testViewImageOrFile(self):
        self.assertRaises(Redirect, self.file.view_image_or_file, 'foo')

    def testUpdateData(self):
        self.file.update_data('foo')
        self.assertEqual(self.file.size, 3)
        self.assertEqual(self.file.data, 'foo')
        self.failUnless(ADummyCache.invalidated)
        self.failUnless(ADummyCache.set)

    def testReadData(self):
        s = "a" * (2 << 16)
        f = StringIO(s)
        data, size = self.file._read_data(f)
        self.failUnless(isinstance(data, Pdata))
        self.assertEqual(str(data), s)
        self.assertEqual(len(s), len(str(data)))
        self.assertEqual(len(s), size)

    def testBigPdata(self):
        # Test that a big enough string is split into several Pdata
        # From a file
        s = "a" * (1 << 16) * 3
        data, size = self.file._read_data(StringIO(s))
        self.failIfEqual(data.next, None)
        # From a string
        data, size = self.file._read_data(s)
        self.failIfEqual(data.next, None)

    def testManageEditWithFileData(self):
        self.file.manage_edit('foobar', 'text/plain', filedata='ASD')
        self.assertEqual(self.file.title, 'foobar')
        self.assertEqual(self.file.content_type, 'text/plain')
        self.failUnless(ADummyCache.invalidated)
        self.failUnless(ADummyCache.set)

    def testManageEditWithoutFileData(self):
        self.file.manage_edit('foobar', 'text/plain')
        self.assertEqual(self.file.title, 'foobar')
        self.assertEqual(self.file.content_type, 'text/plain')
        self.failUnless(ADummyCache.invalidated)

    def testManageUpload(self):
        f = StringIO('jammyjohnson')
        self.file.manage_upload(f)
        self.assertEqual(self.file.data, 'jammyjohnson')
        self.assertEqual(self.file.content_type, 'application/octet-stream')

    def testIfModSince(self):
        now = time.time()
        e = {'SERVER_NAME':'foo', 'SERVER_PORT':'80', 'REQUEST_METHOD':'GET'}

        # not modified since
        t_notmod = rfc1123_date(now)
        e['HTTP_IF_MODIFIED_SINCE'] = t_notmod
        out = StringIO()
        resp = HTTPResponse(stdout=out)
        req = HTTPRequest(sys.stdin, e, resp)
        data = self.file.index_html(req,resp)
        self.assertEqual(resp.getStatus(), 304)
        self.assertEqual(data, '')

        # modified since
        t_mod = rfc1123_date(now - 100)
        e['HTTP_IF_MODIFIED_SINCE'] = t_mod
        out = StringIO()
        resp = HTTPResponse(stdout=out)
        req = HTTPRequest(sys.stdin, e, resp)
        data = self.file.index_html(req,resp)
        self.assertEqual(resp.getStatus(), 200)
        self.assertEqual(data, str(self.file.data))

    def testPUT(self):
        s = '# some python\n'

        # with content type
        data = StringIO(s)
        req = aputrequest(data, 'text/x-python')
        req.processInputs()
        self.file.PUT(req, req.RESPONSE)

        self.assertEqual(self.file.content_type, 'text/x-python')
        self.assertEqual(str(self.file.data), s)

        # without content type
        data.seek(0)
        req = aputrequest(data, '')
        req.processInputs()
        self.file.PUT(req, req.RESPONSE)

        self.assertEqual(self.file.content_type, 'text/x-python')
        self.assertEqual(str(self.file.data), s)

    def testIndexHtmlWithPdata(self):
        self.file.manage_upload('a' * (2 << 16)) # 128K
        self.file.index_html(self.app.REQUEST, self.app.REQUEST.RESPONSE)
        self.assert_(self.app.REQUEST.RESPONSE._wrote)

    def testIndexHtmlWithString(self):
        self.file.manage_upload('a' * 100) # 100 bytes
        self.file.index_html(self.app.REQUEST, self.app.REQUEST.RESPONSE)
        self.assert_(not self.app.REQUEST.RESPONSE._wrote)

    def testStr(self):
        self.assertEqual(str(self.file), self.data)

    def testFindSupport_not_text(self):
        self.file.manage_edit('foobar', 'application/octet-stream',
                              filedata=''.join([chr(x) for x in range(256)]))
        self.assertEqual(self.file.PrincipiaSearchSource(), '')

    def testFindSupport_text(self):
        self.file.manage_edit('foobar', 'text/plain',
                              filedata='Now is the time for all good men to '
                                       'come to the aid of the Party.')
        self.failUnless('Party' in self.file.PrincipiaSearchSource())

    def testFindFile(self):
        self.file.manage_edit('foobar', 'text/plain',
                              filedata='Now is the time for all good men to '
                                       'come to the aid of the Party.')
        results = self.app.ZopeFind(self.app, obj_searchterm='Party')
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0][1], self.file)

    def test_z2interfaces(self):
        from Interface.Verify import verifyClass
        from OFS.Image import File
        from webdav.WriteLockInterface import WriteLockInterface
        from ZPublisher.HTTPRangeSupport import HTTPRangeInterface

        verifyClass(HTTPRangeInterface, File)
        verifyClass(WriteLockInterface, File)

    def testUnicode(self):
        val = u'some unicode string here'
        self.assertRaises(TypeError, self.file.manage_edit,
                          'foobar', 'text/plain', filedata=val)


class ImageTests(FileTests):
    """Re-run the FileTests suite against ``manage_addImage`` objects."""

    data = _read_binary(filedata)
    content_type = 'image/gif'
    factory = 'manage_addImage'

    def testUpdateData(self):
        self.file.update_data(self.data)
        self.assertEqual(self.file.size, len(self.data))
        self.assertEqual(self.file.data, self.data)
        self.assertEqual(self.file.width, 16)
        self.assertEqual(self.file.height, 16)
        self.failUnless(ADummyCache.invalidated)
        self.failUnless(ADummyCache.set)

    # NOTE(review): the expected markup in testStr and testTag had been
    # lost from this file: testStr compared against an empty string and
    # testTag used tag_fmt = '%s', which raises TypeError when formatted
    # with two values.  The literals below are reconstructed on the
    # assumption that Image.tag() renders src, alt, title, height and
    # width in that order for the 16x16 fixture served at
    # http://foo/file -- confirm against the OFS.Image version under test.
    def testStr(self):
        self.assertEqual(
            str(self.file),
            ('<img src="http://foo/file" alt="" title="" '
             'height="16" width="16" />'))

    def testTag(self):
        tag_fmt = ('<img src="http://foo/file" alt="%s" title="%s" '
                   'height="16" width="16" />')
        self.assertEqual(self.file.tag(), (tag_fmt % ('','')))
        self.file.manage_changeProperties(title='foo')
        self.assertEqual(self.file.tag(), (tag_fmt % ('','foo')))
        self.file.manage_changeProperties(alt='bar')
        self.assertEqual(self.file.tag(), (tag_fmt % ('bar','foo')))

    def testViewImageOrFile(self):
        pass # dtml method,screw it

    def test_z2interfaces(self):
        from Interface.Verify import verifyClass
        from OFS.Image import Image
        from webdav.WriteLockInterface import WriteLockInterface

        verifyClass(WriteLockInterface, Image)


def test_suite():
    """Return a suite containing the File and Image test cases."""
    return unittest.TestSuite((
        unittest.makeSuite(FileTests),
        unittest.makeSuite(ImageTests),
        ))


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')