import unittest
import Testing
import Zope2
Zope2.startup()
import os, sys
import time
from cStringIO import StringIO
from OFS.Application import Application
from OFS.SimpleItem import SimpleItem
from OFS.Cache import ZCM_MANAGERS
from OFS.Image import Pdata
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from App.Common import rfc1123_date
from Testing.makerequest import makerequest
from zExceptions import Redirect
import transaction
try:
    here = os.path.dirname(os.path.abspath(__file__))
except:
    here = os.path.dirname(os.path.abspath(sys.argv[0]))
imagedata = os.path.join(here, 'test.gif')
filedata = os.path.join(here, 'test.gif')
def makeConnection():
    import ZODB
    from ZODB.DemoStorage import DemoStorage
    s = DemoStorage(quota=(1<<20))
    return ZODB.DB( s ).open()
def aputrequest(file, content_type):
    resp = HTTPResponse(stdout=sys.stdout)
    environ = {}
    environ['SERVER_NAME']='foo'
    environ['SERVER_PORT']='80'
    environ['REQUEST_METHOD'] = 'PUT'
    environ['CONTENT_TYPE'] = content_type
    req = HTTPRequest(stdin=file, environ=environ, response=resp)
    return req
class DummyCache:
    def __init__(self):
        self.clear()
    def ZCache_set(self, ob, data, view_name='', keywords=None,
                   mtime_func=None):
        self.set = (ob, data)
    def ZCache_get(self, ob, data, view_name='', keywords=None,
                   mtime_func=None):
        self.get = ob
        if self.si:
            return si
    def ZCache_invalidate(self, ob):
        self.invalidated = ob
    def clear(self):
        self.set=None
        self.get=None
        self.invalidated = None
        self.si = None
    def setStreamIterator(self, si):
        self.si = si
ADummyCache=DummyCache()
class DummyCacheManager(SimpleItem):
    def ZCacheManager_getCache(self):
        return ADummyCache
class FileTests(unittest.TestCase):
    data = open(filedata, 'rb').read()
    content_type = 'application/octet-stream'
    factory = 'manage_addFile'
    def setUp( self ):
        self.connection = makeConnection()
        try:
            r = self.connection.root()
            a = Application()
            r['Application'] = a
            self.root = a
            responseOut = self.responseOut = StringIO()
            self.app = makerequest( self.root, stdout=responseOut )
            self.app.dcm = DummyCacheManager()
            factory = getattr(self.app, self.factory)
            factory('file',
                    file=self.data, content_type=self.content_type)
            self.app.file.ZCacheable_setManagerId('dcm')
            self.app.file.ZCacheable_setEnabled(enabled=1)
            setattr(self.app, ZCM_MANAGERS, ('dcm',))
            # Hack, we need a _p_mtime for the file, so we make sure that it
            # has one.
            transaction.commit()
        except:
            self.connection.close()
            raise
        transaction.begin()
        self.file = getattr( self.app, 'file' )
    def tearDown( self ):
        del self.file
        transaction.abort()
        self.connection.close()
        del self.app
        del self.responseOut
        del self.root
        del self.connection
        ADummyCache.clear()
    def testViewImageOrFile(self):
        self.assertRaises(Redirect, self.file.view_image_or_file, 'foo')
    def testUpdateData(self):
        self.file.update_data('foo')
        self.assertEqual(self.file.size, 3)
        self.assertEqual(self.file.data, 'foo')
        self.failUnless(ADummyCache.invalidated)
        self.failUnless(ADummyCache.set)
    def testReadData(self):
        s = "a" * (2 << 16)
        f = StringIO(s)
        data, size = self.file._read_data(f)
        self.failUnless(isinstance(data, Pdata))
        self.assertEqual(str(data), s)
        self.assertEqual(len(s), len(str(data)))
        self.assertEqual(len(s), size)
    def testBigPdata(self):
        # Test that a big enough string is split into several Pdata
        # From a file
        s = "a" * (1 << 16) * 3
        data, size = self.file._read_data(StringIO(s))
        self.failIfEqual(data.next, None)
        # From a string
        data, size = self.file._read_data(s)
        self.failIfEqual(data.next, None)
    def testManageEditWithFileData(self):
        self.file.manage_edit('foobar', 'text/plain', filedata='ASD')
        self.assertEqual(self.file.title, 'foobar')
        self.assertEqual(self.file.content_type, 'text/plain')
        self.failUnless(ADummyCache.invalidated)
        self.failUnless(ADummyCache.set)
    def testManageEditWithoutFileData(self):
        self.file.manage_edit('foobar', 'text/plain')
        self.assertEqual(self.file.title, 'foobar')
        self.assertEqual(self.file.content_type, 'text/plain')
        self.failUnless(ADummyCache.invalidated)
    def testManageUpload(self):
        f = StringIO('jammyjohnson')
        self.file.manage_upload(f)
        self.assertEqual(self.file.data, 'jammyjohnson')
        self.assertEqual(self.file.content_type, 'application/octet-stream')
    def testIfModSince(self):
        now = time.time()
        e = {'SERVER_NAME':'foo', 'SERVER_PORT':'80', 'REQUEST_METHOD':'GET'}
        # not modified since
        t_notmod = rfc1123_date(now)
        e['HTTP_IF_MODIFIED_SINCE'] = t_notmod
        out = StringIO()
        resp = HTTPResponse(stdout=out)
        req = HTTPRequest(sys.stdin, e, resp)
        data = self.file.index_html(req,resp)
        self.assertEqual(resp.getStatus(), 304)
        self.assertEqual(data, '')
        # modified since
        t_mod = rfc1123_date(now - 100)
        e['HTTP_IF_MODIFIED_SINCE'] = t_mod
        out = StringIO()
        resp = HTTPResponse(stdout=out)
        req = HTTPRequest(sys.stdin, e, resp)
        data = self.file.index_html(req,resp)
        self.assertEqual(resp.getStatus(), 200)
        self.assertEqual(data, str(self.file.data))
    def testPUT(self):
        s = '# some python\n'
        # with content type
        data = StringIO(s)
        req = aputrequest(data, 'text/x-python')
        req.processInputs()
        self.file.PUT(req, req.RESPONSE)
        self.assertEqual(self.file.content_type, 'text/x-python')
        self.assertEqual(str(self.file.data), s)
        # without content type
        data.seek(0)
        req = aputrequest(data, '')
        req.processInputs()
        self.file.PUT(req, req.RESPONSE)
        self.assertEqual(self.file.content_type, 'text/x-python')
        self.assertEqual(str(self.file.data), s)
    def testIndexHtmlWithPdata(self):
        self.file.manage_upload('a' * (2 << 16)) # 128K
        self.file.index_html(self.app.REQUEST, self.app.REQUEST.RESPONSE)
        self.assert_(self.app.REQUEST.RESPONSE._wrote)
    def testIndexHtmlWithString(self):
        self.file.manage_upload('a' * 100) # 100 bytes
        self.file.index_html(self.app.REQUEST, self.app.REQUEST.RESPONSE)
        self.assert_(not self.app.REQUEST.RESPONSE._wrote)
    def testStr(self):
        self.assertEqual(str(self.file), self.data)
    def testFindSupport_not_text(self):
        self.file.manage_edit('foobar', 'application/octet-stream',
                              filedata=''.join([chr(x) for x in range(256)]))
        self.assertEqual(self.file.PrincipiaSearchSource(), '')
    def testFindSupport_text(self):
        self.file.manage_edit('foobar', 'text/plain',
                              filedata='Now is the time for all good men to '
                                       'come to the aid of the Party.')
        self.failUnless('Party' in self.file.PrincipiaSearchSource())
    def testFindFile(self):
        self.file.manage_edit('foobar', 'text/plain',
                              filedata='Now is the time for all good men to '
                                       'come to the aid of the Party.')
        results = self.app.ZopeFind(self.app, obj_searchterm='Party')
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0][1], self.file)
    def test_z2interfaces(self):
        from Interface.Verify import verifyClass
        from OFS.Image import File
        from webdav.WriteLockInterface import WriteLockInterface
        from ZPublisher.HTTPRangeSupport import HTTPRangeInterface
        verifyClass(HTTPRangeInterface, File)
        verifyClass(WriteLockInterface, File)
    def testUnicode(self):
        val = u'some unicode string here'
        
        self.assertRaises(TypeError, self.file.manage_edit,
                          'foobar', 'text/plain', filedata=val)
        
class ImageTests(FileTests):
    data = open(filedata, 'rb').read()
    content_type = 'image/gif'
    factory = 'manage_addImage'
    def testUpdateData(self):
        self.file.update_data(self.data)
        self.assertEqual(self.file.size, len(self.data))
        self.assertEqual(self.file.data, self.data)
        self.assertEqual(self.file.width, 16)
        self.assertEqual(self.file.height, 16)
        self.failUnless(ADummyCache.invalidated)
        self.failUnless(ADummyCache.set)
    def testStr(self):
        self.assertEqual(str(self.file),
          (' '))
    def testTag(self):
        tag_fmt = '
'))
    def testTag(self):
        tag_fmt = ' '
        self.assertEqual(self.file.tag(), (tag_fmt % ('','')))
        self.file.manage_changeProperties(title='foo')
        self.assertEqual(self.file.tag(), (tag_fmt % ('','foo')))
        self.file.manage_changeProperties(alt='bar')
        self.assertEqual(self.file.tag(), (tag_fmt % ('bar','foo')))
    def testViewImageOrFile(self):
        pass # dtml method,screw it
    def test_z2interfaces(self):
        from Interface.Verify import verifyClass
        from OFS.Image import Image
        from webdav.WriteLockInterface import WriteLockInterface
        verifyClass(WriteLockInterface, Image)
def test_suite():
    return unittest.TestSuite((
        unittest.makeSuite(FileTests),
        unittest.makeSuite(ImageTests),
        ))
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
'
        self.assertEqual(self.file.tag(), (tag_fmt % ('','')))
        self.file.manage_changeProperties(title='foo')
        self.assertEqual(self.file.tag(), (tag_fmt % ('','foo')))
        self.file.manage_changeProperties(alt='bar')
        self.assertEqual(self.file.tag(), (tag_fmt % ('bar','foo')))
    def testViewImageOrFile(self):
        pass # dtml method,screw it
    def test_z2interfaces(self):
        from Interface.Verify import verifyClass
        from OFS.Image import Image
        from webdav.WriteLockInterface import WriteLockInterface
        verifyClass(WriteLockInterface, Image)
def test_suite():
    return unittest.TestSuite((
        unittest.makeSuite(FileTests),
        unittest.makeSuite(ImageTests),
        ))
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')