##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test suite for ZEO based on ZODB.tests."""

# System imports
import asyncore
import logging
import os
import random
import signal
import socket
import tempfile
import time
import unittest

# ZODB test support
import ZODB
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle

# ZODB test mixin classes
from ZODB.tests import StorageTestBase, BasicStorage, VersionStorage, \
     TransactionalUndoStorage, TransactionalUndoVersionStorage, \
     PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
     MTStorage, ReadOnlyStorage

from ZODB.tests.testDemoStorage import DemoStorageWrappedBase

from ZEO.ClientStorage import ClientStorage

import ZEO.zrpc.connection
# Needed by CatastrophicClientLoopFailure, which catches this exception.
from ZEO.zrpc.error import DisconnectedError

from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
import ZEO.tests.ConnectionTests

logger = logging.getLogger('ZEO.tests.testZEO')


class DummyDB:
    """Minimal stand-in for a database registered with a ClientStorage."""

    def invalidate(self, *args):
        # Invalidations are irrelevant to these tests; ignore them.
        pass


class OneTimeTests(unittest.TestCase):
    """Tests that only need to run once, independent of storage type."""

    def checkZEOVersionNumber(self):
        import ZEO
        # Starting with ZODB 3.4, the ZODB and ZEO version numbers should
        # be identical.
        self.assertEqual(ZODB.__version__, ZEO.version)


class MiscZEOTests:
    """ZEO tests that don't fit in elsewhere."""

    def checkLargeUpdate(self):
        # Store a single object whose payload is 10 * 128KB of data.
        obj = MinPO("X" * (10 * 128 * 1024))
        self._dostore(data=obj)

    def checkZEOInvalidation(self):
        # A second client connected to the same server must eventually see
        # data committed through the first client.
        addr = self._storage._addr
        storage2 = ClientStorage(addr, wait=1, min_disconnect_poll=0.1)
        try:
            oid = self._storage.new_oid()
            ob = MinPO('first')
            revid1 = self._dostore(oid, data=ob)
            data, serial = storage2.load(oid, '')
            self.assertEqual(zodb_unpickle(data), MinPO('first'))
            self.assertEqual(serial, revid1)
            revid2 = self._dostore(oid, data=MinPO('second'), revid=revid1)

            # Now, storage 2 should eventually get the new data. It
            # will take some time, although hopefully not much.
            # We'll poll till we get it and whine if we time out:
            for n in range(30):
                time.sleep(.1)
                data, serial = storage2.load(oid, '')
                if (serial == revid2 and
                    zodb_unpickle(data) == MinPO('second')
                    ):
                    break
            else:
                raise AssertionError('Invalidation message was not sent!')
        finally:
            storage2.close()


def get_port():
    """Return a port that is not in use.

    Checks if a port is in use by trying to connect to it.  Assumes it
    is not in use if connect raises an exception.

    Raises RuntimeError after 10 tries.
    """
    for i in range(10):
        port = random.randrange(20000, 30000)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                s.connect(('localhost', port))
            except socket.error:
                # Perhaps we should check value of error too.
                return port
        finally:
            # Runs on both the "port is free" return and the "port is
            # busy" fall-through, so the probe socket never leaks.
            s.close()
    raise RuntimeError("Can't find port")


class GenericTests(
    # Base class for all ZODB tests
    StorageTestBase.StorageTestBase,
    # ZODB test mixin classes (in the same order as imported)
    BasicStorage.BasicStorage,
    PackableStorage.PackableStorage,
    Synchronization.SynchronizedStorage,
    MTStorage.MTStorage,
    ReadOnlyStorage.ReadOnlyStorage,
    # ZEO test mixin classes (in the same order as imported)
    CommitLockTests.CommitLockVoteTests,
    ThreadTests.ThreadTests,
    # Locally defined (see above)
    MiscZEOTests
    ):

    """Combine tests from various origins in one class."""

    def setUp(self):
        """Start a ZEO server on a free port and connect a client to it.

        Subclasses supply the server's storage configuration through
        getConfig().
        """
        logger.info("setUp() %s", self.id())
        port = get_port()
        zconf = forker.ZEOConfig(('', port))
        zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
                                                              zconf, port)
        # Remembered so that tearDown() can stop and reap the server and
        # remove its configuration file.
        self._pids = [pid]
        self._servers = [adminaddr]
        self._conf_path = path
        self._storage = ClientStorage(zport, '1', cache_size=20000000,
                                      min_disconnect_poll=0.5, wait=1,
                                      wait_timeout=60)
        self._storage.registerDB(DummyDB(), None)

    def tearDown(self):
        """Close the client, remove the config file and stop the server."""
        self._storage.close()
        os.remove(self._conf_path)
        for server in self._servers:
            forker.shutdown_zeo_server(server)
        if hasattr(os, 'waitpid'):
            # Not in Windows Python until 2.3
            for pid in self._pids:
                os.waitpid(pid, 0)

    def open(self, read_only=0):
        """Reopen the client storage, optionally read-only."""
        # Needed to support ReadOnlyStorage tests.  Ought to be a
        # cleaner way.
        addr = self._storage._addr
        self._storage.close()
        self._storage = ClientStorage(addr, read_only=read_only, wait=1)

    def checkWriteMethods(self):
        # ReadOnlyStorage defines checkWriteMethods.  The decision
        # about where to raise the read-only error was changed after
        # Zope 2.5 was released.  So this test needs to detect Zope
        # of the 2.5 vintage and skip the test.

        # The __version__ attribute was not present in Zope 2.5.
        if hasattr(ZODB, "__version__"):
            ReadOnlyStorage.ReadOnlyStorage.checkWriteMethods(self)

    def checkSortKey(self):
        # The sort key is expected to be "<storage name>:<server address>".
        key = '%s:%s' % (self._storage._storage,
                         self._storage._server_addr)
        self.assertEqual(self._storage.sortKey(), key)


class FullGenericTests(
    GenericTests,
    Cache.StorageWithCache,
    Cache.TransUndoStorageWithCache,
    CommitLockTests.CommitLockUndoTests,
    ConflictResolution.ConflictResolvingStorage,
    ConflictResolution.ConflictResolvingTransUndoStorage,
    PackableStorage.PackableUndoStorage,
    RevisionStorage.RevisionStorage,
    TransactionalUndoStorage.TransactionalUndoStorage,
    TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
    VersionStorage.VersionStorage,
    ):
    """Extend GenericTests with tests that MappingStorage can't pass."""


class FileStorageTests(FullGenericTests):
    """Test ZEO backed by a FileStorage."""
    level = 2

    def getConfig(self):
        """Return the server's storage configuration (a FileStorage)."""
        # mktemp() only picks a name; the file itself is not created here.
        filename = self.__fs_base = tempfile.mktemp()
        # The section is named "1" to match the storage name that the
        # ClientStorage created in setUp() asks the server for.  A bare
        # "path" key outside of a storage section is not a usable
        # configuration.
        # NOTE(review): section tags restored; confirm against upstream.
        return """\
        <filestorage 1>
        path %s
        </filestorage>
        """ % filename


class MappingStorageTests(GenericTests):
    """ZEO backed by a Mapping storage."""

    def getConfig(self):
        """Return the server's storage configuration (a MappingStorage)."""
        # Named "1" to match the storage name requested in setUp().
        # NOTE(review): section tag restored; confirm against upstream.
        return """<mappingstorage 1/>"""


class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
    """Make sure a heartbeat is being sent and that it does no harm

    This is really hard to test properly because we can't see the data
    flow between the client and server and we can't really tell what's
    going on in the server very well. :(

    """

    def setUp(self):
        # Crank down the select frequency
        self.__old_client_timeout = ZEO.zrpc.connection.client_timeout
        ZEO.zrpc.connection.client_timeout = 0.1
        ZEO.zrpc.connection.client_trigger.pull_trigger()
        ZEO.tests.ConnectionTests.CommonSetupTearDown.setUp(self)

    def tearDown(self):
        # Restore the select frequency changed in setUp().
        ZEO.zrpc.connection.client_timeout = self.__old_client_timeout
        ZEO.zrpc.connection.client_trigger.pull_trigger()
        ZEO.tests.ConnectionTests.CommonSetupTearDown.tearDown(self)

    def getConfig(self, path, create, read_only):
        """Return the server's storage configuration; arguments are unused."""
        # NOTE(review): section tag restored; confirm against upstream.
        return """<mappingstorage 1/>"""

    def checkHeartbeatWithServerClose(self):
        # This is a minimal test that mainly tests that the heartbeat
        # function does no harm.
        client_timeout_count = ZEO.zrpc.connection.client_timeout_count
        self._storage = self.openClientStorage()
        time.sleep(1) # allow some time for the select loop to fire a few times
        self.assert_(ZEO.zrpc.connection.client_timeout_count
                     > client_timeout_count)
        self._dostore()

        if hasattr(os, 'kill'):
            # Kill server violently, in hopes of provoking problem
            os.kill(self._pids[0], signal.SIGKILL)
            self._servers[0] = None
        else:
            self.shutdownServer()

        # Poll for up to 91 * 0.1 seconds (about 9 seconds).
        for i in range(91):
            # wait for disconnection
            if not self._storage.is_connected():
                break
            time.sleep(0.1)
        else:
            raise AssertionError("Didn't detect server shutdown in 9 seconds")

    def checkHeartbeatWithClientClose(self):
        # This is a minimal test that mainly tests that the heartbeat
        # function does no harm.
        client_timeout_count = ZEO.zrpc.connection.client_timeout_count
        self._storage = self.openClientStorage()
        self._storage.close()
        time.sleep(1) # allow some time for the select loop to fire a few times
        self.assert_(ZEO.zrpc.connection.client_timeout_count
                     > client_timeout_count)


class CatastrophicClientLoopFailure(
    ZEO.tests.ConnectionTests.CommonSetupTearDown):
    """Test what happens when the client loop falls over
    """

    def getConfig(self, path, create, read_only):
        """Return the server's storage configuration; arguments are unused."""
        # NOTE(review): section tag restored; confirm against upstream.
        return """<mappingstorage 1/>"""

    def checkCatastrophicClientLoopFailure(self):
        self._storage = self.openClientStorage()

        class Evil:
            # Registered in the client socket map below; blows up when
            # asked whether it is writable.
            def writable(self):
                raise SystemError("I'm evil")

        # Capture critical log calls as (formatted message, kwargs) pairs.
        log = []
        ZEO.zrpc.connection.client_logger.critical = (
            lambda m, *a, **kw: log.append((m % a, kw))
            )
        try:
            ZEO.zrpc.connection.client_map[None] = Evil()

            try:
                ZEO.zrpc.connection.client_trigger.pull_trigger()
            except DisconnectedError:
                pass

            time.sleep(.1)
            self.failIf(self._storage.is_connected())
            self.assertEqual(len(ZEO.zrpc.connection.client_map), 1)
        finally:
            # Always remove the patched attribute, even when an assertion
            # above fails, so the capture does not leak into other tests.
            del ZEO.zrpc.connection.client_logger.critical

        # The expected text must match the logged message verbatim
        # (including its spelling), so do not "correct" it here.
        self.assertEqual(log[0][0], 'The ZEO cient loop failed.')
        self.assert_('exc_info' in log[0][1])
        self.assertEqual(log[1][0], "Couldn't close a dispatcher.")
        self.assert_('exc_info' in log[1][1])


class ConnectionInvalidationOnReconnect(
    ZEO.tests.ConnectionTests.CommonSetupTearDown):
    """Test that the registered DB's cache is invalidated on reconnect
    """

    def getConfig(self, path, create, read_only):
        """Return the server's storage configuration; arguments are unused."""
        # NOTE(review): section tag restored; confirm against upstream.
        return """<mappingstorage 1/>"""

    def checkConnectionInvalidationOnReconnect(self):

        storage = ClientStorage(self.addr, wait=1, min_disconnect_poll=0.1)
        self._storage = storage

        # and we'll wait for the storage to be connected:
        for i in range(100):
            if storage.is_connected():
                break
            time.sleep(0.1)
        else:
            raise AssertionError("Couldn't connect to server")

        class DummyDB:
            # Counts how many times the storage asks for a full cache
            # invalidation.
            _invalidatedCache = 0
            def invalidateCache(self):
                self._invalidatedCache += 1
            def invalidate(*a, **k):
                pass

        db = DummyDB()
        storage.registerDB(db, None)

        base = db._invalidatedCache

        # Now we'll force a disconnection and reconnection
        storage._connection.close()

        # and we'll wait for the storage to be reconnected:
        for i in range(100):
            if storage.is_connected():
                break
            time.sleep(0.1)
        else:
            raise AssertionError("Couldn't connect to server")

        # Now, the root object in the connection should have been invalidated:
        self.assertEqual(db._invalidatedCache, base+1)


class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase):
    """Run the DemoStorage wrapper tests with a ClientStorage as the base."""

    def getConfig(self):
        """Return the server's storage configuration (a MappingStorage)."""
        # Named "1" to match the storage name requested in
        # _makeBaseStorage().
        # NOTE(review): section tag restored; confirm against upstream.
        return """<mappingstorage 1/>"""

    def _makeBaseStorage(self):
        """Start a ZEO server and return a ClientStorage connected to it."""
        logger.info("setUp() %s", self.id())
        port = get_port()
        zconf = forker.ZEOConfig(('', port))
        zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
                                                              zconf, port)
        # Remembered so that tearDown() can stop and reap the server and
        # remove its configuration file.
        self._pids = [pid]
        self._servers = [adminaddr]
        self._conf_path = path
        _base = ClientStorage(zport, '1', cache_size=20000000,
                              min_disconnect_poll=0.5, wait=1,
                              wait_timeout=60)
        _base.registerDB(DummyDB(), None)
        return _base

    def tearDown(self):
        """Run the base tearDown, then remove the config and stop the server."""
        DemoStorageWrappedBase.tearDown(self)
        os.remove(self._conf_path)
        for server in self._servers:
            forker.shutdown_zeo_server(server)
        if hasattr(os, 'waitpid'):
            # Not in Windows Python until 2.3
            for pid in self._pids:
                os.waitpid(pid, 0)


test_classes = [OneTimeTests,
                FileStorageTests,
                MappingStorageTests,
                DemoStorageWrappedAroundClientStorage,
                HeartbeatTests,
                CatastrophicClientLoopFailure,
                ConnectionInvalidationOnReconnect,
               ]


def test_suite():
    """Build a suite from every "check*" method of the classes above."""
    suite = unittest.TestSuite()
    for klass in test_classes:
        sub = unittest.makeSuite(klass, "check")
        suite.addTest(sub)
    return suite


if __name__ == "__main__":
    unittest.main(defaultTest="test_suite")