The last few days I have taken a considerable time to remove all the errors from the Ubuntu One tests on Windows. The tests were leaving tcp connections in the twisted reactor that on some Windows systems will result on a DirtyReactorError which is a PITA. Due to a refactor the pattern I described here was remove (ouch!) and I had to re-add it. In this case, instead of doing the pattern everywhere is needed I created some helper classes that will ensure that the clients and servers are correctly cleaned up after you use them.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 | from twisted.internet import defer, protocol, reactor from twisted.spread import pb from ubuntuone.devtools.testcases import BaseTestCase # no init method + twisted common warnings # pylint: disable=W0232, C0103, E1101 def server_protocol_factory(cls): """Factory to create tidy protocols.""" if cls is None: cls = protocol.Protocol class ServerSaveProtocol(cls): """A tidy protocol.""" def connectionLost(self, *args): """Lost the connection.""" cls.connectionLost(self, *args) # lets tell everyone # pylint: disable=W0212 if (self.factory._disconnecting and self.factory.testserver_on_connection_lost is not None and not self.factory.testserver_on_connection_lost.called): self.factory.testserver_on_connection_lost.callback(self) # pylint: enable=W0212 return ServerSaveProtocol def client_protocol_factory(cls): """Factory to create tidy protocols.""" if cls is None: cls = protocol.Protocol class ClientSaveProtocol(cls): """A tidy protocol.""" def connectionMade(self): """Connection made.""" if (self.factory.testserver_on_connection_made is not None and not self.factory.testserver_on_connection_made.called): self.factory.testserver_on_connection_made.callback(self) cls.connectionMade(self) def connectionLost(self, *a): """Connection list.""" # pylint: disable=W0212 if (self.factory._disconnecting and self.factory.testserver_on_connection_lost is not None and not self.factory.testserver_on_connection_lost.called): self.factory.testserver_on_connection_lost.callback(self) # pylint: enable=W0212 cls.connectionLost(self, *a) return ClientSaveProtocol class TidyTCPServer(object): """Ensure that twisted services are correctly managed in tests. Closing a twisted service is a complicated matter. In order to do so you have to ensure that three different deferreds are fired: 1. The server must stop listening. 2. The client connection must disconnect. 3. The server connection must disconnect. This class allows to create a service and a client that will ensure that the reactor is left clean by following the pattern described at http://mumak.net/stuff/twisted-disconnect.html """ def __init__(self): """Create a new instance.""" self.listener = None self.server_factory = None self.connector = None self.client_factory = None def listen_server(self, server_class, *args, **kwargs): """Start a server in a random port.""" # pylint: disable=W0621 from twisted.internet import reactor # pylint: enable=W0621 self.server_factory = server_class(*args, **kwargs) self.server_factory._disconnecting = False self.server_factory.testserver_on_connection_lost = defer.Deferred() self.server_factory.protocol = server_protocol_factory( self.server_factory.protocol) self.listener = reactor.listenTCP(0, self.server_factory) return self.server_factory def connect_client(self, client_class, *args, **kwargs): """Conect a client to a given service.""" if self.server_factory is None: raise ValueError('Server Factory was not provided.') if self.listener is None: raise ValueError('%s has not started listening.', self.server_factory) self.client_factory = client_class(*args, **kwargs) self.client_factory._disconnecting = False self.client_factory.protocol = client_protocol_factory( self.client_factory.protocol) self.client_factory.testserver_on_connection_made = defer.Deferred() self.client_factory.testserver_on_connection_lost = defer.Deferred() self.connector = reactor.connectTCP('localhost', self.listener.getHost().port, self.client_factory) return self.client_factory def clean_up(self): """Action to be performed for clean up.""" if self.server_factory is None or self.listener is None: # nothing to clean return defer.succeed(None) if self.listener and self.connector: # clean client and server self.server_factory._disconnecting = True self.client_factory._disconnecting = True d = defer.maybeDeferred(self.listener.stopListening) self.connector.disconnect() return defer.gatherResults([d, self.client_factory.testserver_on_connection_lost, self.server_factory.testserver_on_connection_lost]) if self.listener: # just clean the server since there is no client self.server_factory._disconnecting = True return defer.maybeDeferred(self.listener.stopListening) class TCPServerTestCase(BaseTestCase): """Test that uses a single twisted service.""" @defer.inlineCallbacks def setUp(self): """Set the diff tests.""" yield super(TCPServerTestCase, self).setUp() self.service_runner = TidyTCPServer() self.server_factory = None self.client_factory = None self.server_disconnected = None self.client_connected = None self.client_disconnected = None self.listener = None self.connector = None self.addCleanup(self.tear_down_server_client) def listen_server(self, server_class, *args, **kwargs): """Listen a server. The method takes the server class and the arguments that should be passed to the server constructor. """ self.server_factory = self.service_runner.listen_server(server_class, *args, **kwargs) self.server_disconnected = self.server_factory.testserver_on_connection_lost self.listener = self.service_runner.listener def connect_client(self, client_class, *args, **kwargs): """Connect the client. The method takes the client factory class and the arguments that should be passed to the client constructor. """ self.client_factory = self.service_runner.connect_client(client_class, *args, **kwargs) self.client_connected = self.client_factory.testserver_on_connection_made self.client_disconnected = self.client_factory.testserver_on_connection_lost self.connector = self.service_runner.connector def tear_down_server_client(self): """Clean the server and client.""" if self.server_factory and self.client_factory: return self.service_runner.clean_up() class PbServiceTestCase(TCPServerTestCase): """Test a pb service.""" def listen_server(self, *args, **kwargs): """Listen a pb server.""" super(PbServiceTestCase, self).listen_server(pb.PBServerFactory, *args, **kwargs) def connect_client(self, *args, **kwargs): """Connect a pb client.""" super(PbServiceTestCase, self).connect_client(pb.PBClientFactory, *args, **kwargs) |
The above classes can be used in the following way:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | from twisted.internet import defer, protocol from twisted.spread import pb from twisted.trial.unittest import TestCase from ubuntuone.devtools.testcases.txtcpserver import ( client_protocol_factory, server_protocol_factory, PbServiceTestCase, TidyTCPServer, TCPServerTestCase, ) # no init # pylint: disable=W0232 class Adder(pb.Referenceable): """A remote adder.""" def remote_add(self, first, second): """Remote adding numbers.""" return first + second class Calculator(pb.Root): """A calculator ran somewhere on the net.""" def __init__(self, adder): """Create a new instance.""" # pb.Root has no __init__ self.adder = adder def remote_get_adder(self): """Get the remote added.""" return self.adder def remote_check_adder(self, other_adder): """Check if the are the same.""" return self.adder == other_adder class Echoer(pb.Root): """An echoer that repeats what we say.""" def remote_say(self, sentence): """Echo what we want to say.""" return 'Echoer: %s' % sentence # pylint: enable=W0232 class MultipleSercicesTestCase(TestCase): """Ensure that several services can be ran.""" timeout = 2 @defer.inlineCallbacks def setUp(self): """Set the diff tests.""" yield super(MultipleSercicesTestCase, self).setUp() self.first_tcp_server = TidyTCPServer() self.second_tcp_server = TidyTCPServer() self.adder = Adder() self.calculator = Calculator(self.adder) self.echoer = Echoer() @defer.inlineCallbacks def test_single_service(self): """Test setting a single service.""" first_number = 1 second_number = 2 self.first_tcp_server.listen_server(pb.PBServerFactory, self.calculator) self.addCleanup(self.first_tcp_server.clean_up) calculator_c = self.first_tcp_server.connect_client(pb.PBClientFactory) # ensure we do have connected yield calculator_c.testserver_on_connection_made calculator = yield calculator_c.getRootObject() adder = yield calculator.callRemote('get_adder') result = yield adder.callRemote('add', first_number, second_number) self.assertEqual(first_number + second_number, result) @defer.inlineCallbacks def test_multiple_services(self): """Test setting multiple services.""" first_number = 1 second_number = 2 # first service self.first_tcp_server.listen_server(pb.PBServerFactory, self.calculator) self.addCleanup(self.first_tcp_server.clean_up) # second service self.second_tcp_server.listen_server(pb.PBServerFactory, self.echoer) self.addCleanup(self.second_tcp_server.clean_up) # connect the diff clients calculator_c = self.first_tcp_server.connect_client(pb.PBClientFactory) echoer_c = self.second_tcp_server.connect_client(pb.PBClientFactory) # ensure we do have connected yield calculator_c.testserver_on_connection_made yield echoer_c.testserver_on_connection_made calculator = yield calculator_c.getRootObject() adder = yield calculator.callRemote('get_adder') result = yield adder.callRemote('add', first_number, second_number) self.assertEqual(first_number + second_number, result) echoer = yield echoer_c.getRootObject() echo = yield echoer.callRemote('say', 'hello') self.assertEqual(self.echoer.remote_say('hello'), echo) |
On top of this helper classes I added a twisted site that would follow a similar pattern:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | """A tx based web server.""" from twisted.internet import defer, reactor, ssl from twisted.protocols.policies import WrappingFactory from twisted.web import server from ubuntuone.devtools.testcases.txtcpserver import server_protocol_factory # no init method + twisted common warnings # pylint: disable=W0232, C0103, E1101 class WebServer(object): """Webserver used to perform requests in tests.""" def __init__(self, root_resource, ssl_settings=None): """Create and start the instance. The ssl_settings parameter contains a dictionary with the key and cert that will be used to perform ssl connections. The root_resource contains the resource with all its childre. """ self.root = root_resource self.ssl_settings = ssl_settings self.port = None # use an http.HTTPFactory that was modified to ensure that we have # clean close connections self.site = server.Site(self.root, timeout=None) self.wrapper = WrappingFactory(self.site) self.wrapper.testserver_on_connection_lost = defer.Deferred() self.wrapper.protocol = server_protocol_factory(self.wrapper.protocol) self.wrapper._disconnecting = False def _listen(self, site, ssl_settings=None): """Listen a port to allow the tests.""" if ssl_settings is None: return reactor.listenTCP(0, site) else: ssl_context = ssl.DefaultOpenSSLContextFactory( ssl_settings['key'], ssl_settings['cert']) return reactor.listenSSL(0, site, ssl_context) def get_iri(self): """Build the iri for this mock server.""" url = u"http://127.0.0.1:%d/" return url % self.get_port() def get_uri(self): """Build the uri for this mock server.""" url = "http://127.0.0.1:%d/" return url % self.get_port() def get_ssl_iri(self): """Build the ssl iri for this mock server.""" if self.ssl_settings: url = u"https://127.0.0.1:%d/" return url % self.get_ssl_port() def get_ssl_uri(self): """Build the ssl iri for this mock server.""" if self.ssl_settings: url = "https://127.0.0.1:%d/" return url % self.get_ssl_port() def get_port(self): """Return the port where we are listening.""" return self.port.getHost().port def get_ssl_port(self): """Return the ssl port where we are listening.""" # pylint: disable=W0212 if self.ssl_settings: return self.port.getHost().port # pylint: enable=W0212 def start(self): """Start the service.""" self.port = self._listen(self.wrapper, self.ssl_settings) def stop(self): """Shut it down.""" if self.port: self.wrapper._disconnecting = True connected = self.wrapper.protocols.keys() if connected: for con in connected: con.transport.loseConnection() else: self.wrapper.testserver_on_connection_lost = defer.succeed(None) d = defer.maybeDeferred(self.port.stopListening) return defer.gatherResults([d, self.wrapper.testserver_on_connection_lost]) return defer.succeed(None) |
When using this webserver you have to be careful because we do not longer pay attention to the client protocols, if you do not use twisted to access it you have no problems (libsoup, qtnetwork etc..) but if, for example, you use the HTTPClientFactory you have to do something similar to this in your TestCase setUp:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | class TxWebClientTestCase(WebClientTestCase): """Test case for txweb.""" webclient_factory = txweb.WebClient @defer.inlineCallbacks def setUp(self): """Set the diff tests.""" # delay import, otherwise a default reactor gets installed from twisted.web import client self.factory = client.HTTPClientFactory # set the factory to be used # Hook the server's buildProtocol to make the protocol instance # accessible to tests and ensure that the reactor is clean! build_protocol = self.factory.buildProtocol self.serverProtocol = None def remember_protocol_instance(my_self, addr): """Remember the protocol used in the test.""" protocol = build_protocol(my_self, addr) self.serverProtocol = protocol on_connection_lost = defer.Deferred() connection_lost = protocol.connectionLost def defer_connection_lost(protocol, *a): """Lost connection.""" if not on_connection_lost.called: on_connection_lost.callback(None) connection_lost(protocol, *a) self.patch(protocol, 'connectionLost', defer_connection_lost) def cleanup(): """Clean the connection.""" if self.serverProtocol.transport is not None: self.serverProtocol.transport.loseConnection() return on_connection_lost self.addCleanup(cleanup) return protocol self.factory.buildProtocol = remember_protocol_instance self.addCleanup(self.set_build_protocol, build_protocol) txweb.WebClient.client_factory = self.factory yield super(TxWebClientTestCase, self).setUp() def set_build_protocol(self, method): """Set the method back.""" self.factory.buildProtocol = method |
I have spent some time on this so let me know if you know about any improvements that I can add my brain does not longer want to know about DirtyReactors…
Read more
Latest Official Posts