############################################################################## # # Copyright (c) 2003 Zope Corporation and Contributors. # All Rights Reserved. # # This software is subject to the provisions of the Zope Public License, # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # FOR A PARTICULAR PURPOSE. # ############################################################################## """Test the ZServer configuration machinery.""" import cStringIO as StringIO import os import tempfile import unittest import ZConfig import ZServer.datatypes TEMPFILENAME = tempfile.mktemp() class BaseTest(unittest.TestCase): schema = None def get_schema(self): if self.schema is None: sio = StringIO.StringIO(""" """) schema = ZConfig.loadSchemaFile(sio) BaseTest.schema = schema return self.schema def load_factory(self, text): conf, xxx = ZConfig.loadConfigFile(self.get_schema(), StringIO.StringIO(text)) self.assertEqual(len(conf.servers), 1) return conf.servers[0] def check_prepare(self, factory, defaulthost='127.0.0.1'): # On Linux the default hostname is an empty string, but on Windows # it's "localhost". So factory.prepare() will replace factory.host # with "127.0.0.1" only on non-Windows boxes. expected_factory_host = factory.host or defaulthost port = factory.port o = object() factory.prepare(defaulthost, o, "module", {"key": "value"}, portbase=9300) self.assert_(factory.dnsresolver is o) self.assertEqual(factory.module, "module") self.assertEqual(factory.cgienv.items(), [("key", "value")]) if port is None: self.assert_(factory.host is None, factory.host) self.assert_(factory.port is None, factory.port) else: self.assertEqual(factory.host, expected_factory_host) self.assertEqual(factory.port, 9300 + port) class WarningInterceptor: _old_stderr = None _our_stderr_stream = None def _trap_warning_output( self ): if self._old_stderr is not None: return import sys from StringIO import StringIO self._old_stderr = sys.stderr self._our_stderr_stream = sys.stderr = StringIO() def _free_warning_output( self ): if self._old_stderr is None: return import sys sys.stderr = self._old_stderr class ZServerConfigurationTestCase(BaseTest, WarningInterceptor): def setUp(self): BaseTest.setUp(self) def tearDown(self): self._free_warning_output() BaseTest.tearDown(self) def load_unix_domain_factory(self, text): fn = TEMPFILENAME f = open(fn, 'w') f.close() try: factory = self.load_factory(text % fn) finally: os.unlink(fn) self.assert_(factory.host is None) self.assert_(factory.port is None) self.assertEqual(factory.path, fn) return factory def test_http_factory(self): factory = self.load_factory("""\ address 81 force-connection-close true webdav-source-clients cadaever """) self.assert_(isinstance(factory, ZServer.datatypes.HTTPServerFactory)) self.assert_(factory.force_connection_close) self.assertEqual(factory.host, "") self.assertEqual(factory.port, 81) self.assertEqual(factory.webdav_source_clients, "cadaever") self.check_prepare(factory) server = factory.create() self.assertEqual(server.ip, '127.0.0.1') self.assertEqual(server.port, 9381) server.close() def test_http_factory_defaulthost(self): factory = self.load_factory("""\ address 81 force-connection-close true webdav-source-clients cadaever """) self.check_prepare(factory, defaulthost='0.0.0.0') server = factory.create() self.assertEqual(server.ip, '0.0.0.0', 'Zope Collector issue #1507/1728 (ignoring ' 'defaulthost): %r != \'0.0.0.0\'' % server.ip) self.assertEqual(server.port, 9381) server.close() def test_webdav_source_factory(self): factory = self.load_factory("""\ address 82 force-connection-close true """) self.assert_(isinstance(factory, ZServer.datatypes.WebDAVSourceServerFactory)) self.assert_(factory.force_connection_close) self.assertEqual(factory.host, "") self.assertEqual(factory.port, 82) self.check_prepare(factory) server = factory.create() self.assertEqual(server.ip, '127.0.0.1') self.assertEqual(server.port, 9382) server.close() def test_pcgi_factory(self): factory = self.load_unix_domain_factory("""\ path %s """) self.assert_(isinstance(factory, ZServer.datatypes.PCGIServerFactory)) def test_fcgi_factory(self): self._trap_warning_output() factory = self.load_factory("""\ address 83 """) self.assert_(isinstance(factory, ZServer.datatypes.FCGIServerFactory)) self.assertEqual(factory.host, "") self.assertEqual(factory.port, 83) self.assertEqual(factory.path, None) self.check_prepare(factory) factory.create().close() factory = self.load_unix_domain_factory("""\ address %s """) self.assert_(isinstance(factory, ZServer.datatypes.FCGIServerFactory)) self.check_prepare(factory) def test_ftp_factory(self): factory = self.load_factory("""\ address 84 """) self.assert_(isinstance(factory, ZServer.datatypes.FTPServerFactory)) self.assertEqual(factory.host, "") self.assertEqual(factory.port, 84) self.check_prepare(factory) factory.create().close() def test_icp_factory(self): factory = self.load_factory("""\ address 86 """) self.assert_(isinstance(factory, ZServer.datatypes.ICPServerFactory)) self.assertEqual(factory.host, "") self.assertEqual(factory.port, 86) self.check_prepare(factory) factory.create().close() def test_clockserver_factory(self): factory = self.load_factory("""\ method /foo/bar period 30 user chrism password 123 host www.example.com """) self.assert_(isinstance(factory, ZServer.datatypes.ClockServerFactory)) self.assertEqual(factory.method, '/foo/bar') self.assertEqual(factory.period, 30) self.assertEqual(factory.user, 'chrism') self.assertEqual(factory.password, '123') self.assertEqual(factory.hostheader, 'www.example.com') factory.create().close() class MonitorServerConfigurationTestCase(BaseTest): def setUp(self): from AccessControl import User self.__emergency_user = User.emergency_user class FakeUser: def _getPassword(self): return "foo" def tearDown(self): from AccessControl import User User.emergency_user = self.__emergency_user def setUser(self, null): from AccessControl import User u = self.FakeUser() if null: u.__null_user__ = True User.emergency_user = u def create(self): factory = self.load_factory("""\ address 85 """) self.assert_(isinstance(factory, ZServer.datatypes.MonitorServerFactory)) self.assertEqual(factory.host, "") self.assertEqual(factory.port, 85) self.check_prepare(factory) return factory.create() def test_monitor_factory_without_emergency_user(self): self.setUser(True) self.assert_(self.create() is None) def test_monitor_factory_with_emergency_user(self): self.setUser(False) self.create().close() def test_suite(): suite = unittest.makeSuite(ZServerConfigurationTestCase) suite.addTest(unittest.makeSuite(MonitorServerConfigurationTestCase)) return suite if __name__ == "__main__": unittest.main(defaultTest="test_suite")