I really hate my voice when I hear myself in videos etc.. but well, it happens to most of us. Here is the small rant ralsina and I had about multi-platform programming with python:
Read moreI really hate my voice when I hear myself in videos etc.. but well, it happens to most of us. Here is the small rant ralsina and I had about multi-platform programming with python:
Read moreThe following behaviour completely got me out of place:
>>> byte_string = b'Test' >>> byte_string[0] == b'T' False >>> byte_string[0] in b'T' True >>> byte_string[0] 84 >>> b'T' b'T' |
Turns out this is documented (blame me I did not read the docs!!!). The correct way to do it is:
>>> byte_string = b'Test' >>> byte_string[:1] == b'T' True |
I really hope we don’t have any code that does this operations.
Read moreSo yet again I have been confronted with broken tests in Ubuntu One. As I have already mentioned before I have spent a significant amount of time ensuring that the tests of Ubuntu One (which use twisted a lot) are deterministic and we do not leave a dirty reactor in the way. In order to do that a few week a go I wrote the following code that will help the rest of the team write such tests:
import os import shutil import tempfile from twisted.internet import defer, endpoints, protocol from twisted.spread import pb from ubuntuone.devtools.testcases import BaseTestCase # no init method + twisted common warnings # pylint: disable=W0232, C0103, E1101 def server_protocol_factory(cls): """Factory to create tidy protocols.""" if cls is None: cls = protocol.Protocol class ServerTidyProtocol(cls): """A tidy protocol.""" def connectionLost(self, *args): """Lost the connection.""" cls.connectionLost(self, *args) # lets tell everyone # pylint: disable=W0212 if (self.factory._disconnecting and self.factory.testserver_on_connection_lost is not None and not self.factory.testserver_on_connection_lost.called): self.factory.testserver_on_connection_lost.callback(self) # pylint: enable=W0212 return ServerTidyProtocol def client_protocol_factory(cls): """Factory to create tidy protocols.""" if cls is None: cls = protocol.Protocol class ClientTidyProtocol(cls): """A tidy protocol.""" def connectionLost(self, *a): """Connection list.""" # pylint: disable=W0212 if (self.factory._disconnecting and self.factory.testserver_on_connection_lost is not None and not self.factory.testserver_on_connection_lost.called): self.factory.testserver_on_connection_lost.callback(self) # pylint: enable=W0212 cls.connectionLost(self, *a) return ClientTidyProtocol class TidySocketServer(object): """Ensure that twisted servers are correctly managed in tests. Closing a twisted server is a complicated matter. In order to do so you have to ensure that three different deferreds are fired: 1. The server must stop listening. 2. The client connection must disconnect. 3. The server connection must disconnect. This class allows to create a server and a client that will ensure that the reactor is left clean by following the pattern described at http://mumak.net/stuff/twisted-disconnect.html """ def __init__(self): """Create a new instance.""" self.listener = None self.server_factory = None self.connector = None self.client_factory = None def get_server_endpoint(self): """Return the server endpoint description.""" raise NotImplementedError('To be implemented by child classes.') def get_client_endpoint(self): """Return the client endpoint description.""" raise NotImplementedError('To be implemented by child classes.') @defer.inlineCallbacks def listen_server(self, server_class, *args, **kwargs): """Start a server in a random port.""" from twisted.internet import reactor self.server_factory = server_class(*args, **kwargs) self.server_factory._disconnecting = False self.server_factory.testserver_on_connection_lost = defer.Deferred() self.server_factory.protocol = server_protocol_factory( self.server_factory.protocol) endpoint = endpoints.serverFromString(reactor, self.get_server_endpoint()) self.listener = yield endpoint.listen(self.server_factory) defer.returnValue(self.server_factory) @defer.inlineCallbacks def connect_client(self, client_class, *args, **kwargs): """Conect a client to a given server.""" from twisted.internet import reactor if self.server_factory is None: raise ValueError('Server Factory was not provided.') if self.listener is None: raise ValueError('%s has not started listening.', self.server_factory) self.client_factory = client_class(*args, **kwargs) self.client_factory._disconnecting = False self.client_factory.protocol = client_protocol_factory( self.client_factory.protocol) self.client_factory.testserver_on_connection_lost = defer.Deferred() endpoint = endpoints.clientFromString(reactor, self.get_client_endpoint()) self.connector = yield endpoint.connect(self.client_factory) defer.returnValue(self.client_factory) def clean_up(self): """Action to be performed for clean up.""" if self.server_factory is None or self.listener is None: # nothing to clean return defer.succeed(None) if self.listener and self.connector: # clean client and server self.server_factory._disconnecting = True self.client_factory._disconnecting = True self.connector.transport.loseConnection() d = defer.maybeDeferred(self.listener.stopListening) return defer.gatherResults([d, self.client_factory.testserver_on_connection_lost, self.server_factory.testserver_on_connection_lost]) if self.listener: # just clean the server since there is no client self.server_factory._disconnecting = True return defer.maybeDeferred(self.listener.stopListening) class TidyTCPServer(TidySocketServer): """A tidy tcp domain sockets server.""" client_endpoint_pattern = 'tcp:host=127.0.0.1:port=%s' server_endpoint_pattern = 'tcp:0:interface=127.0.0.1' def get_server_endpoint(self): """Return the server endpoint description.""" return self.server_endpoint_pattern def get_client_endpoint(self): """Return the client endpoint description.""" if self.server_factory is None: raise ValueError('Server Factory was not provided.') if self.listener is None: raise ValueError('%s has not started listening.', self.server_factory) return self.client_endpoint_pattern % self.listener.getHost().port class TidyUnixServer(TidySocketServer): """A tidy unix domain sockets server.""" client_endpoint_pattern = 'unix:path=%s' server_endpoint_pattern = 'unix:%s' def __init__(self): """Create a new instance.""" super(TidyUnixServer, self).__init__() self.temp_dir = tempfile.mkdtemp() self.path = os.path.join(self.temp_dir, 'tidy_unix_server') def get_server_endpoint(self): """Return the server endpoint description.""" return self.server_endpoint_pattern % self.path def get_client_endpoint(self): """Return the client endpoint description.""" return self.client_endpoint_pattern % self.path def clean_up(self): """Action to be performed for clean up.""" result = super(TidyUnixServer, self).clean_up() # remove the dir once we are disconnected result.addCallback(lambda _: shutil.rmtree(self.temp_dir)) return result class ServerTestCase(BaseTestCase): """Base test case for tidy servers.""" @defer.inlineCallbacks def setUp(self): """Set the diff tests.""" yield super(ServerTestCase, self).setUp() try: self.server_runner = self.get_server() except NotImplementedError: self.server_runner = None self.server_factory = None self.client_factory = None self.server_disconnected = None self.client_connected = None self.client_disconnected = None self.listener = None self.connector = None self.addCleanup(self.tear_down_server_client) def get_server(self): """Return the server to be used to run the tests.""" raise NotImplementedError('To be implemented by child classes.') @defer.inlineCallbacks def listen_server(self, server_class, *args, **kwargs): """Listen a server. The method takes the server class and the arguments that should be passed to the server constructor. """ self.server_factory = yield self.server_runner.listen_server( server_class, *args, **kwargs) self.server_disconnected = self.server_factory.testserver_on_connection_lost self.listener = self.server_runner.listener @defer.inlineCallbacks def connect_client(self, client_class, *args, **kwargs): """Connect the client. The method takes the client factory class and the arguments that should be passed to the client constructor. """ self.client_factory = yield self.server_runner.connect_client( client_class, *args, **kwargs) self.client_disconnected = self.client_factory.testserver_on_connection_lost self.connector = self.server_runner.connector def tear_down_server_client(self): """Clean the server and client.""" if self.server_runner: return self.server_runner.clean_up() class TCPServerTestCase(ServerTestCase): """Test that uses a single twisted server.""" def get_server(self): """Return the server to be used to run the tests.""" return TidyTCPServer() class UnixServerTestCase(ServerTestCase): """Test that uses a single twisted server.""" def get_server(self): """Return the server to be used to run the tests.""" return TidyUnixServer() class PbServerTestCase(ServerTestCase): """Test a pb server.""" def get_server(self): """Return the server to be used to run the tests.""" raise NotImplementedError('To be implemented by child classes.') @defer.inlineCallbacks def listen_server(self, *args, **kwargs): """Listen a pb server.""" yield super(PbServerTestCase, self).listen_server(pb.PBServerFactory, *args, **kwargs) @defer.inlineCallbacks def connect_client(self, *args, **kwargs): """Connect a pb client.""" yield super(PbServerTestCase, self).connect_client(pb.PBClientFactory, *args, **kwargs) class TCPPbServerTestCase(PbServerTestCase): """Test a pb server over TCP.""" def get_server(self): """Return the server to be used to run the tests.""" return TidyTCPServer() class UnixPbServerTestCase(PbServerTestCase): """Test a pb server over Unix domain sockets.""" def get_server(self): """Return the server to be used to run the tests.""" return TidyUnixServer() |
The idea of the code is that developers do not need to worry about how to stop listening ports in their tests and just write tests like the following:
class TCPMultipleServersTestCase(TestCase): """Ensure that several servers can be ran.""" timeout = 2 @defer.inlineCallbacks def setUp(self): """Set the diff tests.""" yield super(TCPMultipleServersTestCase, self).setUp() self.first_tcp_server = self.get_server() self.second_tcp_server = self.get_server() self.adder = Adder() self.calculator = Calculator(self.adder) self.echoer = Echoer() def get_server(self): """Return the server to be used to run the tests.""" return TidyTCPServer() @defer.inlineCallbacks def test_single_server(self): """Test setting a single server.""" first_number = 1 second_number = 2 yield self.first_tcp_server.listen_server(pb.PBServerFactory, self.calculator) self.addCleanup(self.first_tcp_server.clean_up) calculator_c = yield self.first_tcp_server.connect_client( pb.PBClientFactory) calculator = yield calculator_c.getRootObject() adder = yield calculator.callRemote('get_adder') result = yield adder.callRemote('add', first_number, second_number) self.assertEqual(first_number + second_number, result) @defer.inlineCallbacks def test_multiple_server(self): """Test setting multiple server.""" first_number = 1 second_number = 2 # first server yield self.first_tcp_server.listen_server(pb.PBServerFactory, self.calculator) self.addCleanup(self.first_tcp_server.clean_up) # second server yield self.second_tcp_server.listen_server(pb.PBServerFactory, self.echoer) self.addCleanup(self.second_tcp_server.clean_up) # connect the diff clients calculator_c = yield self.first_tcp_server.connect_client( pb.PBClientFactory) echoer_c = yield self.second_tcp_server.connect_client( pb.PBClientFactory) calculator = yield calculator_c.getRootObject() adder = yield calculator.callRemote('get_adder') result = yield adder.callRemote('add', first_number, second_number) self.assertEqual(first_number + second_number, result) echoer = yield echoer_c.getRootObject() echo = yield echoer.callRemote('say', 'hello') self.assertEqual(self.echoer.remote_say('hello'), echo) |
As you can see those tests do not give a rats ass about ensuring that the clients lose connection or we stop listening ports… Or so I though because the following code made such approach break in Mac OS X (although I suspect it was broken on Linux and Windows but we never experienced it):
class NullProtocol(protocol.Protocol): """A protocol that drops the connection.""" def connectionMade(self): """Just drop the connection.""" self.transport.loseConnection() class PortDetectFactory(protocol.ClientFactory): """Will detect if something is listening in a given port.""" protocol = NullProtocol def __init__(self): """Initialize this instance.""" self.d = defer.Deferred() def is_listening(self): """A deferred that will become True if something is listening.""" return self.d def buildProtocol(self, addr): """Connected.""" p = protocol.ClientFactory.buildProtocol(self, addr) if not self.d.called: self.d.callback(True) return p def clientConnectionLost(self, connector, reason): """The connection was lost.""" if not self.d.called: self.d.callback(False) def clientConnectionFailed(self, connector, reason): """The connection failed.""" if not self.d.called: self.d.callback(False) |
The code used to test the above was written as:
@defer.inlineCallbacks def test_is_already_running(self): """The is_already_running method returns True if already started.""" server = self.get_server() self.addCleanup(server.clean_up) class TestConnect(object): @defer.inlineCallbacks def connect(my_self, factory): connected_factory = yield server.connect_client(PortDetectFactory) self.patch(factory, 'is_listening', lambda: connected_factory.is_listening()) defer.returnValue(connected_factory) self.patch(tcpactivation, 'clientFromString', lambda *args: TestConnect()) yield server.listen_server(protocol.ServerFactory) # pylint: disable=E1101 ad = ActivationDetector(self.config) result = yield ad.is_already_running() self.assertTrue(result, "It should be already running.") |
While in all the other platforms the tests passed with no problems on Mac OS X the tests would block in the clean_up method from the server because the deferred that was called in the connectionLost from the ServerTidyProtocol was never fired… Interesting.. After digging in the code I realized that the main issue with the approach of the clean_up code was wrong. The problem relies on the way in which the NullProtocol works. As you can see in the code the protocol loses its connections as soon as it made. This results in to possible things:
When running the tests on Windows and Linux we were always facing the first scenario, buildProtocol was called which meant that connectionLost in the server protocol would be called. On the other hand, on Mac OS X, 1 out of 10 runs of the tests would block in the clean up because we would be in the second scenario, that is, no protocol would be build in the ServerFactory which results in the connectionLost never being called because it was no needed. The work around this issue is quite simple once you understand what is going on. The ServerFactory has to be modified to set the deferred when buildProtocol is called and not before ensuring that when we cleanup we check if the deferred is None and if it is not we wait for it to be fired. The fixed version of the helper code is the following:
import os import shutil import tempfile from twisted.internet import defer, endpoints, protocol from twisted.spread import pb from ubuntuone.devtools.testcases import BaseTestCase # no init method + twisted common warnings # pylint: disable=W0232, C0103, E1101 def server_protocol_factory(cls): """Factory to create tidy protocols.""" if cls is None: cls = protocol.Protocol class ServerTidyProtocol(cls): """A tidy protocol.""" def connectionLost(self, *args): """Lost the connection.""" cls.connectionLost(self, *args) # lets tell everyone # pylint: disable=W0212 if (self.factory._disconnecting and self.factory.testserver_on_connection_lost is not None and not self.factory.testserver_on_connection_lost.called): self.factory.testserver_on_connection_lost.callback(self) # pylint: enable=W0212 return ServerTidyProtocol def server_factory_factory(cls): """Factory that creates special types of factories for tests.""" if cls is None: cls = protocol.ServerFactory class TidyServerFactory(cls): """A tidy factory.""" testserver_on_connection_lost = None def buildProtocol(self, addr): prot = cls.buildProtocol(self, addr) self.testserver_on_connection_lost = defer.Deferred() return prot return TidyServerFactory def client_protocol_factory(cls): """Factory to create tidy protocols.""" if cls is None: cls = protocol.Protocol class ClientTidyProtocol(cls): """A tidy protocol.""" def connectionLost(self, *a): """Connection list.""" cls.connectionLost(self, *a) # pylint: disable=W0212 if (self.factory._disconnecting and self.factory.testserver_on_connection_lost is not None and not self.factory.testserver_on_connection_lost.called): self.factory.testserver_on_connection_lost.callback(self) # pylint: enable=W0212 return ClientTidyProtocol class TidySocketServer(object): """Ensure that twisted servers are correctly managed in tests. Closing a twisted server is a complicated matter. In order to do so you have to ensure that three different deferreds are fired: 1. The server must stop listening. 2. The client connection must disconnect. 3. The server connection must disconnect. This class allows to create a server and a client that will ensure that the reactor is left clean by following the pattern described at http://mumak.net/stuff/twisted-disconnect.html """ def __init__(self): """Create a new instance.""" self.listener = None self.server_factory = None self.connector = None self.client_factory = None def get_server_endpoint(self): """Return the server endpoint description.""" raise NotImplementedError('To be implemented by child classes.') def get_client_endpoint(self): """Return the client endpoint description.""" raise NotImplementedError('To be implemented by child classes.') @defer.inlineCallbacks def listen_server(self, server_class, *args, **kwargs): """Start a server in a random port.""" from twisted.internet import reactor tidy_class = server_factory_factory(server_class) self.server_factory = tidy_class(*args, **kwargs) self.server_factory._disconnecting = False self.server_factory.protocol = server_protocol_factory( self.server_factory.protocol) endpoint = endpoints.serverFromString(reactor, self.get_server_endpoint()) self.listener = yield endpoint.listen(self.server_factory) defer.returnValue(self.server_factory) @defer.inlineCallbacks def connect_client(self, client_class, *args, **kwargs): """Conect a client to a given server.""" from twisted.internet import reactor if self.server_factory is None: raise ValueError('Server Factory was not provided.') if self.listener is None: raise ValueError('%s has not started listening.', self.server_factory) self.client_factory = client_class(*args, **kwargs) self.client_factory._disconnecting = False self.client_factory.protocol = client_protocol_factory( self.client_factory.protocol) self.client_factory.testserver_on_connection_lost = defer.Deferred() endpoint = endpoints.clientFromString(reactor, self.get_client_endpoint()) self.connector = yield endpoint.connect(self.client_factory) defer.returnValue(self.client_factory) def clean_up(self): """Action to be performed for clean up.""" if self.server_factory is None or self.listener is None: # nothing to clean return defer.succeed(None) if self.listener and self.connector: # clean client and server self.server_factory._disconnecting = True self.client_factory._disconnecting = True d = defer.maybeDeferred(self.listener.stopListening) self.connector.transport.loseConnection() if self.server_factory.testserver_on_connection_lost: return defer.gatherResults([d, self.client_factory.testserver_on_connection_lost, self.server_factory.testserver_on_connection_lost]) else: return defer.gatherResults([d, self.client_factory.testserver_on_connection_lost]) if self.listener: # just clean the server since there is no client # pylint: disable=W0201 self.server_factory._disconnecting = True return defer.maybeDeferred(self.listener.stopListening) # pylint: enable=W0201 class TidyTCPServer(TidySocketServer): """A tidy tcp domain sockets server.""" client_endpoint_pattern = 'tcp:host=127.0.0.1:port=%s' server_endpoint_pattern = 'tcp:0:interface=127.0.0.1' def get_server_endpoint(self): """Return the server endpoint description.""" return self.server_endpoint_pattern def get_client_endpoint(self): """Return the client endpoint description.""" if self.server_factory is None: raise ValueError('Server Factory was not provided.') if self.listener is None: raise ValueError('%s has not started listening.', self.server_factory) return self.client_endpoint_pattern % self.listener.getHost().port class TidyUnixServer(TidySocketServer): """A tidy unix domain sockets server.""" client_endpoint_pattern = 'unix:path=%s' server_endpoint_pattern = 'unix:%s' def __init__(self): """Create a new instance.""" super(TidyUnixServer, self).__init__() self.temp_dir = tempfile.mkdtemp() self.path = os.path.join(self.temp_dir, 'tidy_unix_server') def get_server_endpoint(self): """Return the server endpoint description.""" return self.server_endpoint_pattern % self.path def get_client_endpoint(self): """Return the client endpoint description.""" return self.client_endpoint_pattern % self.path def clean_up(self): """Action to be performed for clean up.""" result = super(TidyUnixServer, self).clean_up() # remove the dir once we are disconnected result.addCallback(lambda _: shutil.rmtree(self.temp_dir)) return result class ServerTestCase(BaseTestCase): """Base test case for tidy servers.""" @defer.inlineCallbacks def setUp(self): """Set the diff tests.""" yield super(ServerTestCase, self).setUp() try: self.server_runner = self.get_server() except NotImplementedError: self.server_runner = None self.server_factory = None self.client_factory = None self.server_disconnected = None self.client_connected = None self.client_disconnected = None self.listener = None self.connector = None self.addCleanup(self.tear_down_server_client) def get_server(self): """Return the server to be used to run the tests.""" raise NotImplementedError('To be implemented by child classes.') @defer.inlineCallbacks def listen_server(self, server_class, *args, **kwargs): """Listen a server. The method takes the server class and the arguments that should be passed to the server constructor. """ self.server_factory = yield self.server_runner.listen_server( server_class, *args, **kwargs) self.server_disconnected = self.server_factory.testserver_on_connection_lost self.listener = self.server_runner.listener @defer.inlineCallbacks def connect_client(self, client_class, *args, **kwargs): """Connect the client. The method takes the client factory class and the arguments that should be passed to the client constructor. """ self.client_factory = yield self.server_runner.connect_client( client_class, *args, **kwargs) self.client_disconnected = self.client_factory.testserver_on_connection_lost self.connector = self.server_runner.connector def tear_down_server_client(self): """Clean the server and client.""" if self.server_runner: return self.server_runner.clean_up() class TCPServerTestCase(ServerTestCase): """Test that uses a single twisted server.""" def get_server(self): """Return the server to be used to run the tests.""" return TidyTCPServer() class UnixServerTestCase(ServerTestCase): """Test that uses a single twisted server.""" def get_server(self): """Return the server to be used to run the tests.""" return TidyUnixServer() class PbServerTestCase(ServerTestCase): """Test a pb server.""" def get_server(self): """Return the server to be used to run the tests.""" raise NotImplementedError('To be implemented by child classes.') @defer.inlineCallbacks def listen_server(self, *args, **kwargs): """Listen a pb server.""" yield super(PbServerTestCase, self).listen_server(pb.PBServerFactory, *args, **kwargs) @defer.inlineCallbacks def connect_client(self, *args, **kwargs): """Connect a pb client.""" yield super(PbServerTestCase, self).connect_client(pb.PBClientFactory, *args, **kwargs) class TCPPbServerTestCase(PbServerTestCase): """Test a pb server over TCP.""" def get_server(self): """Return the server to be used to run the tests.""" return TidyTCPServer() class UnixPbServerTestCase(PbServerTestCase): """Test a pb server over Unix domain sockets.""" def get_server(self): """Return the server to be used to run the tests.""" return TidyUnixServer() |
I wonder if at some point I should share this code for the people out there… any opinions?
Read moreThe last few days I have taken a considerable time to remove all the errors from the Ubuntu One tests on Windows. The tests were leaving tcp connections in the twisted reactor that on some Windows systems will result on a DirtyReactorError which is a PITA. Due to a refactor the pattern I described here was remove (ouch!) and I had to re-add it. In this case, instead of doing the pattern everywhere is needed I created some helper classes that will ensure that the clients and servers are correctly cleaned up after you use them.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 | from twisted.internet import defer, protocol, reactor from twisted.spread import pb from ubuntuone.devtools.testcases import BaseTestCase # no init method + twisted common warnings # pylint: disable=W0232, C0103, E1101 def server_protocol_factory(cls): """Factory to create tidy protocols.""" if cls is None: cls = protocol.Protocol class ServerSaveProtocol(cls): """A tidy protocol.""" def connectionLost(self, *args): """Lost the connection.""" cls.connectionLost(self, *args) # lets tell everyone # pylint: disable=W0212 if (self.factory._disconnecting and self.factory.testserver_on_connection_lost is not None and not self.factory.testserver_on_connection_lost.called): self.factory.testserver_on_connection_lost.callback(self) # pylint: enable=W0212 return ServerSaveProtocol def client_protocol_factory(cls): """Factory to create tidy protocols.""" if cls is None: cls = protocol.Protocol class ClientSaveProtocol(cls): """A tidy protocol.""" def connectionMade(self): """Connection made.""" if (self.factory.testserver_on_connection_made is not None and not self.factory.testserver_on_connection_made.called): self.factory.testserver_on_connection_made.callback(self) cls.connectionMade(self) def connectionLost(self, *a): """Connection list.""" # pylint: disable=W0212 if (self.factory._disconnecting and self.factory.testserver_on_connection_lost is not None and not self.factory.testserver_on_connection_lost.called): self.factory.testserver_on_connection_lost.callback(self) # pylint: enable=W0212 cls.connectionLost(self, *a) return ClientSaveProtocol class TidyTCPServer(object): """Ensure that twisted services are correctly managed in tests. Closing a twisted service is a complicated matter. In order to do so you have to ensure that three different deferreds are fired: 1. The server must stop listening. 2. The client connection must disconnect. 3. The server connection must disconnect. This class allows to create a service and a client that will ensure that the reactor is left clean by following the pattern described at http://mumak.net/stuff/twisted-disconnect.html """ def __init__(self): """Create a new instance.""" self.listener = None self.server_factory = None self.connector = None self.client_factory = None def listen_server(self, server_class, *args, **kwargs): """Start a server in a random port.""" # pylint: disable=W0621 from twisted.internet import reactor # pylint: enable=W0621 self.server_factory = server_class(*args, **kwargs) self.server_factory._disconnecting = False self.server_factory.testserver_on_connection_lost = defer.Deferred() self.server_factory.protocol = server_protocol_factory( self.server_factory.protocol) self.listener = reactor.listenTCP(0, self.server_factory) return self.server_factory def connect_client(self, client_class, *args, **kwargs): """Conect a client to a given service.""" if self.server_factory is None: raise ValueError('Server Factory was not provided.') if self.listener is None: raise ValueError('%s has not started listening.', self.server_factory) self.client_factory = client_class(*args, **kwargs) self.client_factory._disconnecting = False self.client_factory.protocol = client_protocol_factory( self.client_factory.protocol) self.client_factory.testserver_on_connection_made = defer.Deferred() self.client_factory.testserver_on_connection_lost = defer.Deferred() self.connector = reactor.connectTCP('localhost', self.listener.getHost().port, self.client_factory) return self.client_factory def clean_up(self): """Action to be performed for clean up.""" if self.server_factory is None or self.listener is None: # nothing to clean return defer.succeed(None) if self.listener and self.connector: # clean client and server self.server_factory._disconnecting = True self.client_factory._disconnecting = True d = defer.maybeDeferred(self.listener.stopListening) self.connector.disconnect() return defer.gatherResults([d, self.client_factory.testserver_on_connection_lost, self.server_factory.testserver_on_connection_lost]) if self.listener: # just clean the server since there is no client self.server_factory._disconnecting = True return defer.maybeDeferred(self.listener.stopListening) class TCPServerTestCase(BaseTestCase): """Test that uses a single twisted service.""" @defer.inlineCallbacks def setUp(self): """Set the diff tests.""" yield super(TCPServerTestCase, self).setUp() self.service_runner = TidyTCPServer() self.server_factory = None self.client_factory = None self.server_disconnected = None self.client_connected = None self.client_disconnected = None self.listener = None self.connector = None self.addCleanup(self.tear_down_server_client) def listen_server(self, server_class, *args, **kwargs): """Listen a server. The method takes the server class and the arguments that should be passed to the server constructor. """ self.server_factory = self.service_runner.listen_server(server_class, *args, **kwargs) self.server_disconnected = self.server_factory.testserver_on_connection_lost self.listener = self.service_runner.listener def connect_client(self, client_class, *args, **kwargs): """Connect the client. The method takes the client factory class and the arguments that should be passed to the client constructor. """ self.client_factory = self.service_runner.connect_client(client_class, *args, **kwargs) self.client_connected = self.client_factory.testserver_on_connection_made self.client_disconnected = self.client_factory.testserver_on_connection_lost self.connector = self.service_runner.connector def tear_down_server_client(self): """Clean the server and client.""" if self.server_factory and self.client_factory: return self.service_runner.clean_up() class PbServiceTestCase(TCPServerTestCase): """Test a pb service.""" def listen_server(self, *args, **kwargs): """Listen a pb server.""" super(PbServiceTestCase, self).listen_server(pb.PBServerFactory, *args, **kwargs) def connect_client(self, *args, **kwargs): """Connect a pb client.""" super(PbServiceTestCase, self).connect_client(pb.PBClientFactory, *args, **kwargs) |
The above classes can be used in the following way:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | from twisted.internet import defer, protocol from twisted.spread import pb from twisted.trial.unittest import TestCase from ubuntuone.devtools.testcases.txtcpserver import ( client_protocol_factory, server_protocol_factory, PbServiceTestCase, TidyTCPServer, TCPServerTestCase, ) # no init # pylint: disable=W0232 class Adder(pb.Referenceable): """A remote adder.""" def remote_add(self, first, second): """Remote adding numbers.""" return first + second class Calculator(pb.Root): """A calculator ran somewhere on the net.""" def __init__(self, adder): """Create a new instance.""" # pb.Root has no __init__ self.adder = adder def remote_get_adder(self): """Get the remote added.""" return self.adder def remote_check_adder(self, other_adder): """Check if the are the same.""" return self.adder == other_adder class Echoer(pb.Root): """An echoer that repeats what we say.""" def remote_say(self, sentence): """Echo what we want to say.""" return 'Echoer: %s' % sentence # pylint: enable=W0232 class MultipleSercicesTestCase(TestCase): """Ensure that several services can be ran.""" timeout = 2 @defer.inlineCallbacks def setUp(self): """Set the diff tests.""" yield super(MultipleSercicesTestCase, self).setUp() self.first_tcp_server = TidyTCPServer() self.second_tcp_server = TidyTCPServer() self.adder = Adder() self.calculator = Calculator(self.adder) self.echoer = Echoer() @defer.inlineCallbacks def test_single_service(self): """Test setting a single service.""" first_number = 1 second_number = 2 self.first_tcp_server.listen_server(pb.PBServerFactory, self.calculator) self.addCleanup(self.first_tcp_server.clean_up) calculator_c = self.first_tcp_server.connect_client(pb.PBClientFactory) # ensure we do have connected yield calculator_c.testserver_on_connection_made calculator = yield calculator_c.getRootObject() adder = yield calculator.callRemote('get_adder') result = yield adder.callRemote('add', first_number, second_number) self.assertEqual(first_number + second_number, result) @defer.inlineCallbacks def test_multiple_services(self): """Test setting multiple services.""" first_number = 1 second_number = 2 # first service self.first_tcp_server.listen_server(pb.PBServerFactory, self.calculator) self.addCleanup(self.first_tcp_server.clean_up) # second service self.second_tcp_server.listen_server(pb.PBServerFactory, self.echoer) self.addCleanup(self.second_tcp_server.clean_up) # connect the diff clients calculator_c = self.first_tcp_server.connect_client(pb.PBClientFactory) echoer_c = self.second_tcp_server.connect_client(pb.PBClientFactory) # ensure we do have connected yield calculator_c.testserver_on_connection_made yield echoer_c.testserver_on_connection_made calculator = yield calculator_c.getRootObject() adder = yield calculator.callRemote('get_adder') result = yield adder.callRemote('add', first_number, second_number) self.assertEqual(first_number + second_number, result) echoer = yield echoer_c.getRootObject() echo = yield echoer.callRemote('say', 'hello') self.assertEqual(self.echoer.remote_say('hello'), echo) |
On top of this helper classes I added a twisted site that would follow a similar pattern:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | """A tx based web server.""" from twisted.internet import defer, reactor, ssl from twisted.protocols.policies import WrappingFactory from twisted.web import server from ubuntuone.devtools.testcases.txtcpserver import server_protocol_factory # no init method + twisted common warnings # pylint: disable=W0232, C0103, E1101 class WebServer(object): """Webserver used to perform requests in tests.""" def __init__(self, root_resource, ssl_settings=None): """Create and start the instance. The ssl_settings parameter contains a dictionary with the key and cert that will be used to perform ssl connections. The root_resource contains the resource with all its childre. """ self.root = root_resource self.ssl_settings = ssl_settings self.port = None # use an http.HTTPFactory that was modified to ensure that we have # clean close connections self.site = server.Site(self.root, timeout=None) self.wrapper = WrappingFactory(self.site) self.wrapper.testserver_on_connection_lost = defer.Deferred() self.wrapper.protocol = server_protocol_factory(self.wrapper.protocol) self.wrapper._disconnecting = False def _listen(self, site, ssl_settings=None): """Listen a port to allow the tests.""" if ssl_settings is None: return reactor.listenTCP(0, site) else: ssl_context = ssl.DefaultOpenSSLContextFactory( ssl_settings['key'], ssl_settings['cert']) return reactor.listenSSL(0, site, ssl_context) def get_iri(self): """Build the iri for this mock server.""" url = u"http://127.0.0.1:%d/" return url % self.get_port() def get_uri(self): """Build the uri for this mock server.""" url = "http://127.0.0.1:%d/" return url % self.get_port() def get_ssl_iri(self): """Build the ssl iri for this mock server.""" if self.ssl_settings: url = u"https://127.0.0.1:%d/" return url % self.get_ssl_port() def get_ssl_uri(self): """Build the ssl iri for this mock server.""" if self.ssl_settings: url = "https://127.0.0.1:%d/" return url % self.get_ssl_port() def get_port(self): """Return the port where we are listening.""" return self.port.getHost().port def get_ssl_port(self): """Return the ssl port where we are listening.""" # pylint: disable=W0212 if self.ssl_settings: return self.port.getHost().port # pylint: enable=W0212 def start(self): """Start the service.""" self.port = self._listen(self.wrapper, self.ssl_settings) def stop(self): """Shut it down.""" if self.port: self.wrapper._disconnecting = True connected = self.wrapper.protocols.keys() if connected: for con in connected: con.transport.loseConnection() else: self.wrapper.testserver_on_connection_lost = defer.succeed(None) d = defer.maybeDeferred(self.port.stopListening) return defer.gatherResults([d, self.wrapper.testserver_on_connection_lost]) return defer.succeed(None) |
When using this webserver you have to be careful because we do not longer pay attention to the client protocols, if you do not use twisted to access it you have no problems (libsoup, qtnetwork etc..) but if, for example, you use the HTTPClientFactory you have to do something similar to this in your TestCase setUp:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | class TxWebClientTestCase(WebClientTestCase): """Test case for txweb.""" webclient_factory = txweb.WebClient @defer.inlineCallbacks def setUp(self): """Set the diff tests.""" # delay import, otherwise a default reactor gets installed from twisted.web import client self.factory = client.HTTPClientFactory # set the factory to be used # Hook the server's buildProtocol to make the protocol instance # accessible to tests and ensure that the reactor is clean! build_protocol = self.factory.buildProtocol self.serverProtocol = None def remember_protocol_instance(my_self, addr): """Remember the protocol used in the test.""" protocol = build_protocol(my_self, addr) self.serverProtocol = protocol on_connection_lost = defer.Deferred() connection_lost = protocol.connectionLost def defer_connection_lost(protocol, *a): """Lost connection.""" if not on_connection_lost.called: on_connection_lost.callback(None) connection_lost(protocol, *a) self.patch(protocol, 'connectionLost', defer_connection_lost) def cleanup(): """Clean the connection.""" if self.serverProtocol.transport is not None: self.serverProtocol.transport.loseConnection() return on_connection_lost self.addCleanup(cleanup) return protocol self.factory.buildProtocol = remember_protocol_instance self.addCleanup(self.set_build_protocol, build_protocol) txweb.WebClient.client_factory = self.factory yield super(TxWebClientTestCase, self).setUp() def set_build_protocol(self, method): """Set the method back.""" self.factory.buildProtocol = method |
I have spent some time on this so let me know if you know about any improvements that I can add my brain does not longer want to know about DirtyReactors…
Read moreDue to a small project I’m developing at the moment I had to access the Mac OS X keyring over python. There is some code of seen around but was not very ‘clean’. The following is my attempt to access the Mac OS X Keyring via python using ctypes, I hope it helps someone out there:
from ctypes import ( byref, c_int32, c_uint32, c_void_p, create_string_buffer, memmove, cdll, ) from ctypes.util import find_library from secrets.utils import are_not_none, valid_args # Types OSStatus = c_int32 SecKeychainItemRef = c_void_p SecKeychainRef = c_void_p # Constants errSecSuccess = 0 errSecItemNotFound = -25300 # Native libraries _security = cdll.LoadLibrary(find_library('Security')) _core = cdll.LoadLibrary(find_library('CoreServices')) def keyring_is_open(): """Assert that the keyring is not None.""" def keyring_not_none(name, value, method_name='Method'): """Ensure that the keyring is open.""" if not getattr(value, '', None): raise OSError('Keyring most be open.') return valid_args(keyring_not_none, (('Keyring', 0))) class Backend(object): """Keyring backend for Mac Os X.""" def __init__(self): """Create a new Mac OS X backend.""" super(Backend, self).__init__() self.keychain = None def _get_item(self, realm, username): """Return the item that matched the given info.""" item = SecKeychainItemRef() status = _security.SecKeychainFindGenericPassword(self.keychain, len(realm), realm, len(username), username, None, None, byref(item)) return item, status def open(self): """Open the keyring.""" # we should not be open if self.keychain is not None: raise OSError() self.keychain = SecKeychainRef() # try to open the keyring if _security.SecKeychainOpen('login.keychain', byref(self.keychain)): raise OSError("Can't access the keychain") @keyring_is_open def close(self): """Close the keyring.""" _core.CFRelease(self.keychain) self.keychain = None @keyring_is_open @are_not_none(( ('realm', 1), ('username', 2), ('password', 3))) def set_password(self, realm, username, password): """Set the password for the given real and username.""" item, status = self._get_item(realm, username) if status == errSecItemNotFound: status = _security.SecKeychainAddGenericPassword(self.keychain, len(realm), realm, len(username), username, len(password), password, None) if status: raise OSError() else: raise OSError() @keyring_is_open @are_not_none(( ('realm', 1), ('username', 2), ('password', 3))) def update_password(self, realm, username, password): """Update the password for the given realm.""" item, status = self._get_item(realm, username) if status and status == errSecItemNotFound: raise OSError("Can't store password in keychain") else: status = _security.SecKeychainItemModifyAttributesAndData(item, None, len(password), password) _core.CFRelease(item) if status: raise OSError() if status: raise OSError("Can't store password in keychain") @keyring_is_open @are_not_none(( ('realm', 1), ('username', 2))) def get_password(self, realm, username): """Get the password for the given real and username.""" length = c_uint32() data = c_void_p() status = _security.SecKeychainFindGenericPassword(self.keychain, len(realm), realm, len(username), username, byref(length), byref(data), None) if status == errSecSuccess: password = create_string_buffer(length.value) memmove(password, data.value, length.value) password = password.raw _security.SecKeychainItemFreeContent(None, data) elif status == errSecItemNotFound: password = None else: raise OSError("Can't fetch password from system") return password @keyring_is_open @are_not_none(( ('realm', 1), ('username', 2))) def delete_password(self, realm, username): """Delete the password of the given real and username.""" item, status = self._get_item(realm, username) if status == errSecSuccess: _security.SecKeychainItemDelete(item) _core.CFRelease(item) else: raise OSError() |
I have not added the code of the decorator because they are just noise, the only thing they do is to check that the keyring was indeed opened (self.keyring != None) and that the parameters with the given index are not None (I’m lazy and I prefer to use decorators for this mundane tasks that are done everywhere.
Read moreOn Windows Ubuntu One uses the twisted reactor to run the Qt UI. The main reason for this is that the IPC protocol that is used was written in twisted. This has been giving us a number of head aches like the one we experienced today.
The following code simply shows a dialog that will as the user for his proxy credentials and will store them in the key-ring of which ever platform is being used:
def main(): """Main method used to show the creds dialog.""" if sys.platform == 'win32': import qt4reactor qt4reactor.install() logger.debug('Qt reactor installed.') app = QApplication(sys.argv) args = parse_args() win = ProxyCredsDialog(domain=args.domain, retry=args.retry) return_code = win.exec_() if sys.platform == 'win32': from twisted.internet import reactor reactor.run() sys.exit(return_code) |
From the dialog the most interesting part is the one in which the credentials are stored:
@defer.inlineCallbacks def _on_save_clicked(self, *args): """Save the new credentials.""" username = unicode(self.ui.username_entry.text()).encode('utf8') password = unicode(self.ui.password_entry.text()).encode('utf8') creds = dict(username=username, password=password) try: logger.debug('Save credentials as for domain %s.', self.domain) yield self.keyring.set_credentials(self.domain, creds) except Exception, e: logger.exception('Could not set credentials:') self.done(EXCEPTION_RAISED) logger.debug('Stored creds') self.done(USER_SUCCESS) |
And to give even more details, the following is what is used to spawn a thread to store the credentials on windows:
def set_credentials(self, app_name, cred): """Set the credentials of the Ubuntu SSO item.""" # the windows keyring can only store a pair username-password # so we store the data using ubuntu_sso as the user name. Then # the cred will be stored as the string representation of the dict. return deferToThread(self.keyring.set_password, app_name, USERNAME, dumps(cred)) |
A priori there is nothing wrong with the code, or is it? Doing an IRL test you will see that the credentials are never stored, what’s even more none of the deferreds are called. But why? In theory the qt reactor should be taking care of everything which includes the deferreds, the deferToThread and the execution of the ui.. well, it is not. When we look a little closer we can see that we use the exec_ method from the QDialog and this is the root of the bug. Lets put an example, the following is possible in Qt:
import sys from PyQt4.QtGui import QApplication, QDialog app = QApplication(sys.argv) dialog = QDialog() dialog.exec_() |
As you can see we are launching the dialog but we did not execute the application, but why is that? The main reason is found in the implementation of exec in the QDialog class (this time in C++, ouch!):
int QDialog::exec() { Q_D(QDialog); if (d->eventLoop) { qWarning("QDialog::exec: Recursive call detected"); return -1; } bool deleteOnClose = testAttribute(Qt::WA_DeleteOnClose); setAttribute(Qt::WA_DeleteOnClose, false); bool wasShowModal = testAttribute(Qt::WA_ShowModal); setAttribute(Qt::WA_ShowModal, true); setResult(0); show(); QEventLoop eventLoop; d->eventLoop = &eventLoop; (void) eventLoop.exec(); d->eventLoop = 0; setAttribute(Qt::WA_ShowModal, wasShowModal); int res = result(); if (deleteOnClose) delete this; return res; } |
As you can see the implementation uses a QEventLoop, if you read the documentation you will noticed that using exec of the event loops and the default flag all the events will me processed by this event loop, which is not the main event loop, but a child one (small brain fuck here). This means that, due to the implementation of the qtreactor, when using a dialog exec_ (or better say a QEventLoop.exec method) the reactor main loop is not processing the events which means that all your deferreds, deferToThread etc.. will not be processed
Nevertheless there is a way to work around this bug in the qtreactor implementation (because is not the qt fault and it is well documented) which is doing the following workaround:
win = ProxyCredsDialog(domain=args.domain, retry=args.retry) win.show() win.finished.connect(exit_app) |
The above code ensures that the events will be handeled by the reactor main loop and not by a child QEventLoop.
In summary, qtreactor is buggy and will give you problems.. but if you really have to use it (like we do) do remember this detail, DO NOT use exec.
Read moreI have been writing some integration tests lately between Ubuntu One and proxies which use SSL certificates. The idea behind this tests was to be able to test that we deal correctly with those certificates that are not correct (notify the user, remember exceptions, etc..) For that I wrote this small function that I used to generate the certificates.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | import os from socket import gethostname from OpenSSL import crypto def generate_self_signed_cert(cert_dir, is_valid=True): """Generate a SSL certificate. If the cert_path and the key_path are present they will be overwritten. """ if not os.path.exists(cert_dir): os.makedirs(cert_dir) cert_path = os.path.join(cert_dir, 'squid.crt') key_path = os.path.join(cert_dir, 'squid.key') if os.path.exists(cert_path): os.unlink(cert_path) if os.path.exists(key_path): os.unlink(key_path) # create a key pair key = crypto.PKey() key.generate_key(crypto.TYPE_RSA, 1024) # create a self-signed cert cert = crypto.X509() cert.get_subject().C = 'UK' cert.get_subject().ST = 'London' cert.get_subject().L = 'London' cert.get_subject().O = 'Canonical' cert.get_subject().OU = 'Ubuntu One' cert.get_subject().CN = gethostname() if is_valid else gethostname()[::-1] cert.set_serial_number(1000) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) cert.set_issuer(cert.get_subject()) cert.set_pubkey(key) cert.sign(key, 'sha1') with open(cert_path, 'wt') as fd: fd.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) with open(key_path, 'wt') as fd: fd.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key)) return cert_path, key_path |
I leave to the reader to modify the function to match their needs.
Read moreRecently in Ubuntu One we have been working or using PyQt for the UI of our application so that we could keep a smaller code base. While doing the move I noticed that we needed to have a widget similar to GtkArrow and to my surprise there is not one.
The following is a small implementation fo such a widget that might help other people that are in need of it. Even maybe someone that cares enough will write it in C++ and will propose it to Qt, sorry but I don’t have the time.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | from PyQt4.QtGui import QPainter, QStyle, QStyleOption, QWidget class QArrow(QWidget): """Qt implementation similar to GtkArrow.""" UP = 0 DOWN = 1 LEFT = 2 RIGHT = 3 def __init__(self, direction, parent=None): """Create a new instance.""" QWidget.__init__(self, parent) if not direction in (QArrow.UP, QArrow.DOWN, QArrow.LEFT, QArrow.RIGHT): raise ValueError('Wrong arrow direction.') self._direction = direction def paintEvent(self, event): """Paint a nice primitive arrow.""" opt = QStyleOption() opt.initFrom(self) p = QPainter(self) if self._direction == QArrow.UP: primitive = QStyle.PE_IndicatorArrowUp elif self._direction == QArrow.DOWN: primitive = QStyle.PE_IndicatorArrowDown elif self._direction == QArrow.LEFT: primitive = QStyle.PE_IndicatorArrowLeft else: primitive = QStyle.PE_IndicatorArrowRight self.style().drawPrimitive(primitive, opt, p, self) |
Took longer to think the correct way to do it than the actual coding, at the end is very simple.
Read moreYesterday I was working on a small UI that uses a WebKit.WebView to show certain information to the user. My idea yesterday was to try and show a native context menu on the view that does not have the default menu items that the WebView has, the only way I’ve found to this is a as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | from gi.repository import GObject, Gdk, Gtk, WebKit from utils import get_data_file class CustomView(Gtk.EventBox): """Main window used to show the ui and generate signals.""" def __init__(self): """Create a new instance.""" super(CustomView, self).__init__() self.set_events(Gdk.EventMask.BUTTON_PRESS_MASK) self._create_context_menu() self.scroll = Gtk.ScrolledWindow() self.scroll.show() self.view = WebKit.WebView() self.view.show() self.scroll.add(self.view) # load the html used for the main window, from that point # on we will let the html and js do the work and raise # signals via the title self.view.open(get_data_file(MAIN_WINDOW_HMTL)) # lets no use the webkit context menu settings = self.view.get_settings() settings.set_property('enable-default-context-menu', False) self.view.set_settings(settings) self.add(self.scroll) # connect the on button pressed event of the event box # so that we can add a context menu self.connect('button_press_event', self._on_button_press_event) def _create_context_menu(self): """Create the context menu.""" self.menu = Gtk.Menu() delete_menu = Gtk.MenuItem("Delete Task") self.menu.append(delete_menu) def _on_button_press_event(self, widget, event): """Deal with the button press event.""" if event.button == 3: self.menu.popup(None, None, None, None, event.button, event.time) self.menu.show_all() |
Does any one out there know a better way to do this. I know that there is a “populate-popup” signal I can listen to but I cannot get my head around on how to do exactly what I want, which is remove all defaul menu items and add my own. And, does it make sense to have to do that for every popup signal?
Read moreWith GObject introspection is very simple to set the settings of your system trough python. Fist, lets use the command line to find out our current settings:
gsettings list-recursively org.gnome.system.proxy
The following script allows you to retrieve the http proxy settings that you are currently using:
from gi.repository import Gio def get_settings(): """Get proxy settings.""" http_settings = Gio.Settings.new('org.gnome.system.proxy.http') host = http_settings.get_string('host') port = http_settings.get_int('port') if http_settings.get_boolean('use-authentication'): username = http_settings.get_string('authentication_user') password = http_settings.get_string('authentication_password') else: username = password = None return host, port, username, password
Setting them is as easy as getting them:
from gi.repository import Gio def set_settings(host, port, username=None, password=None): """Set proxy settings.""" http_settings = Gio.Settings.new('org.gnome.system.proxy.http') http_settings.set_string('host', host) http_settings.set_int('port', port) if username is not None: http_settings.set_boolean('use-authentication', True) http_settings.set_string('authentication_user', username) http_settings.set_string('authentication_password', password)
This is not utterly complicated but I’m notice that there are not many examples out there, so there you go. There is no code there that can be considered hard but I’d like to point out that if you use the get_value method from the Settings object you will have to call the appropriate get_* method from the returned GVariant, that is:
host = http_settings.get_string('host')
is equal to the following:
host = http_settings.get_value('host').get_string()
I have had a nice riddle given to me by Tim-Erwin within the comments of Bitten in the ass by ReadDirectoryChangesW and multithreading which I really appreciate. His problem was with the following code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | import win32file import win32con import win32event import pywintypes FILE_LIST_DIRECTORY = 0x0001 folder = r"C:\some folder to watch" handle = win32file.CreateFile (folder, FILE_LIST_DIRECTORY, win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, None, win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS, None) buffer = win32file.AllocateReadBuffer(1024) overlapped = pywintypes.OVERLAPPED() overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None) flags = win32con.FILE_NOTIFY_CHANGE_FILE_NAME |\ win32con.FILE_NOTIFY_CHANGE_DIR_NAME |\ win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |\ win32con.FILE_NOTIFY_CHANGE_SIZE |\ win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |\ win32con.FILE_NOTIFY_CHANGE_SECURITY win32file.ReadDirectoryChangesW(handle, buffer, False, flags, overlapped) print "never reached until a change is done in the folder" win32event.WaitForSingleObject(overlapped.hEvent, 5000) |
As Tim-Erwin said in his comments, the above code was not being blocked in the WaitForSingleObject call but in the ReadDirecotryChangesW one. This is clearly my fault because in the blog post I did not give a very important detail that is needed to get the code working async. In order for the above to work correctly it is imperative that you pass to the CreateFile function the FILE_FLAG_OVERLAPPED flag. That means, that in order to fix the code and be blocked in WaitForSingleObject we have to modify the CreateFile call in the following way:
9 10 11 12 | handle = win32file.CreateFile (folder, FILE_LIST_DIRECTORY, win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, None, win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS | win32file.FILE_FLAG_OVERLAPPED , None) |
That fixes Tim-Erwin problem and should help any other person with the same issue. Sorry for not being explicit on such an important detail Tim-Erwin, please accept my apologies
.
In the past days I have been working on implementing a python TestCase that can be used to perform integration tests to the future implementation of proxy support that will be landing in Ubuntu One. The idea of the TestCase is the following:
Initially, one of the major problems I had was to start squid in two different ports so that:
The idea is simple, if you use port A you should never auth while you must in port B, and example configuration of the ACLs and ports is the following:
auth_param basic casesensitive on
# Use a default auth using ncsa and the passed generated file.
auth_param basic program ${auth_process} ${auth_file}
#Recommended minimum configuration:
acl all src all
acl manager proto cache_object
acl localhost src 127.0.0.1/32
acl to_localhost dst 127.0.0.0/8 0.0.0.0/32
#
# Example rule allowing access from your local networks.
# Adapt to list your (internal) IP networks from where browsing
# should be allowed
acl localnet src 10.0.0.0/8 # RFC1918 possible internal network
acl localnet src 172.16.0.0/12 # RFC1918 possible internal network
acl localnet src 192.168.0.0/16 # RFC1918 possible internal network
#
acl SSL_ports port 443 # https
acl SSL_ports port 563 # snews
acl SSL_ports port 873 # rsync
acl Safe_ports port 80 # http
acl Safe_ports port 21 # ftp
acl Safe_ports port 443 # https
acl Safe_ports port 70 # gopher
acl Safe_ports port 210 # wais
acl Safe_ports port 1025-65535 # unregistered ports
acl Safe_ports port 280 # http-mgmt
acl Safe_ports port 488 # gss-http
acl Safe_ports port 591 # filemaker
acl Safe_ports port 777 # multiling http
acl Safe_ports port 631 # cups
acl Safe_ports port 873 # rsync
acl Safe_ports port 901 # SWAT
acl purge method PURGE
acl CONNECT method CONNECT
# make an acl for users that have auth
acl password proxy_auth REQUIRED myportname ${auth_port_number}
acl auth_port_connected myportname ${auth_port_number}
acl nonauth_port_connected myportname ${noauth_port_number}
# Settings used for the tests:
# Allow users connected to the nonauth port
# Allow users authenticated AND connected to the auth port
http_access allow nonauth_port_connected
http_access allow password
#Recommended minimum configuration:
#
# Only allow cachemgr access from localhost
http_access allow manager localhost
http_access deny manager
# Only allow purge requests from localhost
http_access allow purge localhost
http_access deny purge
# Deny requests to unknown ports
http_access deny !Safe_ports
# Deny CONNECT to other than SSL ports
http_access deny CONNECT !SSL_ports
# Example rule allowing access from your local networks.
# Adapt localnet in the ACL section to list your (internal) IP networks
# from where browsing should be allowed
#http_access allow localnet
http_access allow localhost
# And finally deny all other access to this proxy
http_access deny all
#Allow ICP queries from local networks only
icp_access allow localnet
icp_access deny all
# Squid normally listens to port 3128 but we are going to listento two
# different ports, one for auth one for nonauth.
http_port ${noauth_port_number}
http_port ${auth_port_number}
#We recommend you to use at least the following line.
hierarchy_stoplist cgi-bin ?
# Default cache settings.
cache_dir ufs ${spool_temp} 100 16 256
# access log settings
access_log ${squid_temp}/access.log squid
#Default cache stroe log
cache_store_log ${squid_temp}/store.log
#Default pid file name
pid_filename ${squid_temp}/squid.pid
#Default netdb file name:
netdb_filename ${spool_temp}/logs/netdb.state
#Suggested default:
refresh_pattern ^ftp: 1440 20% 10080
refresh_pattern ^gopher: 1440 0% 1440
refresh_pattern -i (/cgi-bin/|\?) 0 0% 0
refresh_pattern (Release|Packages(.gz)*)$ 0 20% 2880
# example line deb packages
#refresh_pattern (\.deb|\.udeb)$ 129600 100% 129600
refresh_pattern . 0 20% 4320
# Don't upgrade ShoutCast responses to HTTP
acl shoutcast rep_header X-HTTP09-First-Line ^ICY.[0-9]
upgrade_http0.9 deny shoutcast
# Apache mod_gzip and mod_deflate known to be broken so don't trust
# Apache to signal ETag correctly on such responses
acl apache rep_header Server ^Apache
broken_vary_encoding allow apache
extension_methods REPORT MERGE MKACTIVITY CHECKOUT
hosts_file /etc/hosts
# Leave coredumps in the first cache dir
coredump_dir ${spool_temp}
Once the above was achieved the code of the test case was quite simple for Ubuntu O, unfortunatly, it was not that issues in Ubuntu P because there we have squid3 which supports http 1.1 and keeps the proxy keeps the connection alive. The fact that the connection is kept alive means that the reactor has a selectable running because the proxy keep it there. In order to solve the issue I wrote the code so that the server could say that the connection timedout. Here is the code that does it:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 | # -*- coding: utf-8 -*- # # Copyright 2011 Canonical Ltd. # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU General Public License version 3, as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranties of # MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR # PURPOSE. See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program. If not, see <http://www.gnu.org/licenses/>. """Test the squid test case.""" import base64 from twisted.application import internet, service from twisted.internet import defer, reactor from twisted.web import client, error, http, resource, server from ubuntuone.devtools.testcases.squid import SquidTestCase SAMPLE_RESOURCE = "<p>Hello World!</p>" SIMPLERESOURCE = "simpleresource" THROWERROR = "throwerror" UNAUTHORIZED = "unauthorized" # ignore common twisted lint errors # pylint: disable=C0103, W0212 class ProxyClientFactory(client.HTTPClientFactory): """Factory that supports proxy.""" def __init__(self, proxy_url, proxy_port, url, headers=None): # we set the proxy details before the init because the parent __init__ # calls setURL self.proxy_url = proxy_url self.proxy_port = proxy_port self.disconnected_d = defer.Deferred() client.HTTPClientFactory.__init__(self, url, headers=headers) def setURL(self, url): self.host = self.proxy_url self.port = self.proxy_port self.url = url self.path = url def clientConnectionLost(self, connector, reason, reconnecting=0): """Connection lost.""" self.disconnected_d.callback(self) class ProxyWebClient(object): """Provide useful web methods with proxy.""" def __init__(self, proxy_url=None, proxy_port=None, username=None, password=None): """Create a new instance with the proxy settings.""" self.proxy_url = proxy_url self.proxy_port = proxy_port self.username = username self.password = password self.factory = None self.connectors = [] def _connect(self, url, contextFactory): """Perform the connection.""" scheme, _, _, _ = client._parse(url) # pylint: disable=E1101 if scheme == 'https': from twisted.internet import ssl if contextFactory is None: contextFactory = ssl.ClientContextFactory() self.connectors.append(reactor.connectSSL(self.proxy_url, self.proxy_port, self.factory, contextFactory)) else: self.connectors.append(reactor.connectTCP(self.proxy_url, self.proxy_port, self.factory)) # pylint: enable=E1101 def _process_auth_error(self, failure, url, contextFactory): """Process an auth failure.""" failure.trap(error.Error) if failure.value.status == str(http.PROXY_AUTH_REQUIRED): # we try to get the page using the basic auth auth = base64.b64encode('%s:%s' % (self.username, self.password)) auth_header = 'Basic ' + auth.strip() self.factory = ProxyClientFactory(self.proxy_url, self.proxy_port, url, headers={'Proxy-Authorization': auth_header}) self._connect(url, contextFactory) return self.factory.deferred else: return failure def get_page(self, url, contextFactory=None, *args, **kwargs): """Download a webpage as a string. This method relies on the twisted.web.client.getPage but adds and extra step. If there is an auth error the method will perform a second try so that the username and password are used. """ self.factory = ProxyClientFactory(self.proxy_url, self.proxy_port, url, headers={'Connection': 'close'}) self._connect(url, contextFactory) self.factory.deferred.addErrback(self._process_auth_error, url, contextFactory) return self.factory.deferred @defer.inlineCallbacks def shutdown(self): """Clean all connectors.""" for connector in self.connectors: yield connector.disconnect() defer.returnValue(True) class SimpleResource(resource.Resource): """A simple web resource.""" def render_GET(self, request): """Make a bit of html out of these resource's content.""" return SAMPLE_RESOURCE class SaveHTTPChannel(http.HTTPChannel): """A save protocol to be used in tests.""" protocolInstance = None def connectionMade(self): """Keep track of the given protocol.""" SaveHTTPChannel.protocolInstance = self http.HTTPChannel.connectionMade(self) class SaveSite(server.Site): """A site that let us know when it closed.""" protocol = SaveHTTPChannel def __init__(self, *args, **kwargs): """Create a new instance.""" server.Site.__init__(self, *args, **kwargs) # we disable the timeout in the tests, we will deal with it manually. self.timeOut = None class MockWebServer(object): """A mock webserver for testing""" def __init__(self): """Start up this instance.""" root = resource.Resource() root.putChild(SIMPLERESOURCE, SimpleResource()) root.putChild(THROWERROR, resource.NoResource()) unauthorized_resource = resource.ErrorPage(resource.http.UNAUTHORIZED, "Unauthorized", "Unauthorized") root.putChild(UNAUTHORIZED, unauthorized_resource) self.site = SaveSite(root) application = service.Application('web') self.service_collection = service.IServiceCollection(application) #pylint: disable=E1101 self.tcpserver = internet.TCPServer(0, self.site) self.tcpserver.setServiceParent(self.service_collection) self.service_collection.startService() def get_url(self): """Build the url for this mock server.""" #pylint: disable=W0212 port_num = self.tcpserver._port.getHost().port return "http://localhost:%d/" % port_num @defer.inlineCallbacks def stop(self): """Shut it down.""" #pylint: disable=E1101 # make the connection time out so that is works with squid3 when # the connection is kept alive. if self.site.protocol.protocolInstance: self.site.protocol.protocolInstance.timeoutConnection() yield self.service_collection.stopService() class ProxyTestCase(SquidTestCase): """A squid test with no auth proxy.""" @defer.inlineCallbacks def setUp(self): """Set the tests.""" yield super(ProxyTestCase, self).setUp() self.ws = MockWebServer() self.proxy_client = None self.addCleanup(self.teardown_client_server) self.url = self.ws.get_url() + SIMPLERESOURCE def teardown_client_server(self): """Clean resources.""" if self.proxy_client is not None: self.proxy_client.shutdown() return defer.gatherResults([self.ws.stop(), self.proxy_client.shutdown(), self.proxy_client.factory.disconnected_d]) else: return self.ws.stop() def access_noauth_url(self, address, port): """Access a url throught the proxy.""" self.proxy_client = ProxyWebClient(proxy_url=address, proxy_port=port) return self.proxy_client.get_page(self.url) def access_auth_url(self, address, port, username, password): """Access a url throught the proxy.""" self.proxy_client = ProxyWebClient(proxy_url=address, proxy_port=port, username=username, password=password) return self.proxy_client.get_page(self.url) @defer.inlineCallbacks def test_noauth_url_access(self): """Test accessing to the url.""" settings = self.get_nonauth_proxy_settings() # if there is an exception we fail. data = yield self.access_noauth_url(settings['host'], settings['port']) self.assertEqual(SAMPLE_RESOURCE, data) @defer.inlineCallbacks def test_auth_url_access(self): """Test accessing to the url.""" settings = self.get_auth_proxy_settings() # if there is an exception we fail. data = yield self.access_auth_url(settings['host'], settings['port'], settings['username'], settings['password']) self.assertEqual(SAMPLE_RESOURCE, data) def test_auth_url_401(self): """Test failing accessing the url.""" settings = self.get_auth_proxy_settings() # swap password for username to fail d = self.failUnlessFailure(self.access_auth_url(settings['host'], settings['port'], settings['password'], settings['username']), error.Error) return d def test_auth_url_407(self): """Test failing accessing the url.""" settings = self.get_auth_proxy_settings() d = self.failUnlessFailure(self.access_noauth_url(settings['host'], settings['port']), error.Error) return d |
The above code is the tests for the test case and the important bits are:
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 | class SaveHTTPChannel(http.HTTPChannel): """A save protocol to be used in tests.""" protocolInstance = None def connectionMade(self): """Keep track of the given protocol.""" SaveHTTPChannel.protocolInstance = self http.HTTPChannel.connectionMade(self) class SaveSite(server.Site): """A site that let us know when it closed.""" protocol = SaveHTTPChannel def __init__(self, *args, **kwargs): """Create a new instance.""" server.Site.__init__(self, *args, **kwargs) # we disable the timeout in the tests, we will deal with it manually. self.timeOut = None |
The above defines a protocol that will know the instance that it was used so that we can trigger the time out in a clean up function.
190 191 | if self.site.protocol.protocolInstance: self.site.protocol.protocolInstance.timeoutConnection() |
This tells the server to time out.
207 208 209 210 211 212 213 214 215 | def teardown_client_server(self): """Clean resources.""" if self.proxy_client is not None: self.proxy_client.shutdown() return defer.gatherResults([self.ws.stop(), self.proxy_client.shutdown(), self.proxy_client.factory.disconnected_d]) else: return self.ws.stop() |
And the clean up function. That is all, now I guess I’ll move to add proxy support or ensure that the test case works on Windows, which is certainly going to be a diff issue.
Read moreThe following is some code in which I have been working (and stupidly wasting time in a small error) that allows to get a page using a methos similar to twisted.web.client.getPage through a proxy that uses base auth.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 | # -*- coding: utf-8 -*- # # Copyright 2011 Canonical Ltd. # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU General Public License version 3, as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranties of # MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR # PURPOSE. See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program. If not, see <http://www.gnu.org/licenses/>. """Test the squid test case.""" import base64 from twisted.internet import defer, reactor from twisted.web import client, error, http from ubuntuone.devtools.testcases.squid import SquidTestCase # ignore common twisted lint errors # pylint: disable=C0103, W0212 class ProxyClientFactory(client.HTTPClientFactory): """Factory that supports proxy.""" def __init__(self, proxy_url, proxy_port, url, headers=None): self.proxy_url = proxy_url self.proxy_port = proxy_port client.HTTPClientFactory.__init__(self, url, headers=headers) def setURL(self, url): self.host = self.proxy_url self.port = self.proxy_port self.url = url self.path = url class ProxyWebClient(object): """Provide useful web methods with proxy.""" def __init__(self, proxy_url=None, proxy_port=None, username=None, password=None): """Create a new instance with the proxy settings.""" self.proxy_url = proxy_url self.proxy_port = proxy_port self.username = username self.password = password def _process_auth_error(self, failure, url, contextFactory): """Process an auth failure.""" # we try to get the page using the basic auth failure.trap(error.Error) if failure.value.status == str(http.PROXY_AUTH_REQUIRED): auth = base64.b64encode('%s:%s' % (self.username, self.password)) auth_header = 'Basic ' + auth.strip() factory = ProxyClientFactory(self.proxy_url, self.proxy_port, url, headers={'Proxy-Authorization': auth_header}) # pylint: disable=E1101 reactor.connectTCP(self.proxy_url, self.proxy_port, factory) # pylint: enable=E1101 return factory.deferred else: return failure def get_page(self, url, contextFactory=None, *args, **kwargs): """Download a webpage as a string. This method relies on the twisted.web.client.getPage but adds and extra step. If there is an auth error the method will perform a second try so that the username and password are used. """ scheme, _, _, _ = client._parse(url) factory = ProxyClientFactory(self.proxy_url, self.proxy_port, url) if scheme == 'https': from twisted.internet import ssl if contextFactory is None: contextFactory = ssl.ClientContextFactory() # pylint: disable=E1101 reactor.connectSSL(self.proxy_url, self.proxy_port, factory, contextFactory) # pylint: enable=E1101 else: # pylint: disable=E1101 reactor.connectTCP(self.proxy_url, self.proxy_port, factory) # pylint: enable=E1101 factory.deferred.addErrback(self._process_auth_error, url, contextFactory) return factory.deferred |
I hope that this helps anyone out there
Recently a very interesting bug has been reported agains Ubuntu One on Windows. Apparently we try to sync a number of system folders that are present on Windows 7 to be backward compatible.
The actual problem in the code is that we are using os.listdir. While lisdir on python does return system folders (at the end of the day, they are there) os.walk does not, for example, lets imaging hat we have the following scenario:
Documents
My Pictures (System folder)
My Videos (System folder)
Random dir
Random Text.txt
If we run os.listdir we would have the following:
import os >> os.listdir('Documents') ['My Pictures', 'My Videos', 'Random dir', 'Random Text.txt']
While if we use os.walk we have:
import os path, dirs, files = os.walk('Documents') print dirs >> ['Random dir'] print files >> ['Random Text.txt']
The fix is very simple, simply filter the result from os.listdir using the following function:
import win32file INVALID_FILE_ATTRIBUTES = -1 def is_system_path(path): """Return if the function is a system path.""" attrs = win32file.GetFileAttributesW(path) if attrs == INVALID_FILE_ATTRIBUTES: return False return win32file.FILE_ATTRIBUTE_SYSTEM & attrs ==\ win32file.FILE_ATTRIBUTE_SYSTEM
An interesting question to ask after the above is, how does ReadDirectoryChangesW work with systen directories? Well, thankfully it works correctly. What does that mean? Well, it means the following:
The above means that if you have a system folder in a watch path you do not need to worry since the events will work correctly, which are very very good news.
Read moreHaving a DirtyReactorException in your tests is a PITA and bug 885342 was that type on annoying bug. Since I use this bug not only to tell others what I’m doing but as a log for myself here it is the way to clean the resources nicely when you are testing your PB clients and servers (I mention PB because we use that, a similar approach can be used with any protocol) inspired by this way more interesting post.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 | class SaveProtocolServerFactory(PBServerFactory): """A PBServerFactory that saves the latest connected client.""" protocolInstance = None def clientConnectionMade(self, protocol): """Keep track of the given protocol.""" self.protocolInstance = protocol class SaveClientFactory(PBClientFactory): """Client Factory that knows when we disconnected.""" def __init__(self, connected_d, disconnected_d): """Create a new instance.""" PBClientFactory.__init__(self) self.connected_d = connected_d self.disconnected_d = disconnected_d def clientConnectionMade(self, broker): """Connection made.""" PBClientFactory.clientConnectionMade(self, broker) self.connected_d.callback(True) def clientConnectionLost(self, connector, reason, reconnecting=0): """Connection lost.""" self.disconnected_d.callback(True) class ServerProtocol(Broker): """Server protocol that allows us to clean the tests.""" def connectionLost(self, *a): self.factory.onConnectionLost.callback(self) class ConnectedTestCase(TestCase): """Base test case with a client and a server.""" @defer.inlineCallbacks def setUp(self): """Set up for the tests.""" yield super(ConnectedTestCase, self).setUp() self.server_disconnected = defer.Deferred() self.client_disconnected = defer.Deferred() self.listener = None self.connector = None self.server_factory = None self.client_factory = None def setup_client_server(self, sso_root): """Set tests.""" port = get_sso_pb_port() self.listener = self._listen_server(sso_root, self.server_disconnected, port) connected = defer.Deferred() self.connector = self._connect_client(connected, self.client_disconnected, port) self.addCleanup(self.teardown_client_server) return connected def _listen_server(self, sso_root, d, port): """Start listenting.""" self.server_factory = SaveProtocolServerFactory(sso_root) self.server_factory.onConnectionLost = d self.server_factory.protocol = ServerProtocol return reactor.listenTCP(port, self.server_factory) def _connect_client(self, d1, d2, port): """Connect client.""" self.client_factory = SaveClientFactory(d1, d2) return reactor.connectTCP(LOCALHOST, port, self.client_factory) def teardown_client_server(self): """Clean resources.""" self.connector.disconnect() d = defer.maybeDeferred(self.listener.stopListening) return defer.gatherResults([d, self.client_disconnected, self.server_disconnected]) |
Two really good developers, Alecu and Diego, have discovered a very interestning bug in the os.path.expanduser function in Python. If you have a user in your Windows machine with a name hat uses Japanese characters like “??????” you will have the following in your system:
The above is clearly a problem, specially when the implementation of os.path.expanduser on Winodws is:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | def expanduser(path): """Expand ~ and ~user constructs. If user or $HOME is unknown, do nothing.""" if path[:1] != '~': return path i, n = 1, len(path) while i < n and path[i] not in '/\\': i = i + 1 if 'HOME' in os.environ: userhome = os.environ['HOME'] elif 'USERPROFILE' in os.environ: userhome = os.environ['USERPROFILE'] elif not 'HOMEPATH' in os.environ: return path else: try: drive = os.environ['HOMEDRIVE'] except KeyError: drive = '' userhome = join(drive, os.environ['HOMEPATH']) if i != 1: #~user userhome = join(dirname(userhome), path[1:i]) return userhome + path[i:] |
For the time being my proposed fix for Ubuntu One is to do the following:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | import ctypes from ctypes import windll, wintypes class GUID(ctypes.Structure): _fields_ = [ ('Data1', wintypes.DWORD), ('Data2', wintypes.WORD), ('Data3', wintypes.WORD), ('Data4', wintypes.BYTE * 8) ] def __init__(self, l, w1, w2, b1, b2, b3, b4, b5, b6, b7, b8): """Create a new GUID.""" self.Data1 = l self.Data2 = w1 self.Data3 = w2 self.Data4[:] = (b1, b2, b3, b4, b5, b6, b7, b8) def __repr__(self): b1, b2, b3, b4, b5, b6, b7, b8 = self.Data4 return 'GUID(%x-%x-%x-%x%x%x%x%x%x%x%x)' % ( self.Data1, self.Data2, self.Data3, b1, b2, b3, b4, b5, b6, b7, b8) # constants to be used according to the version on shell32 CSIDL_PROFILE = 40 FOLDERID_Profile = GUID(0x5E6C858F, 0x0E22, 0x4760, 0x9A, 0xFE, 0xEA, 0x33, 0x17, 0xB6, 0x71, 0x73) def expand_user(): # get the function that we can find from Vista up, not the one in XP get_folder_path = getattr(windll.shell32, 'SHGetKnownFolderPath', None) if get_folder_path is not None: # ok, we can use the new function which is recomended by the msdn ptr = ctypes.c_wchar_p() get_folder_path(ctypes.byref(FOLDERID_Profile), 0, 0, ctypes.byref(ptr)) return ptr.value else: # use the deprecated one found in XP and on for compatibility reasons get_folder_path = getattr(windll.shell32, 'SHGetSpecialFolderPathW', None) buf = ctypes.create_unicode_buffer(300) get_folder_path(None, buf, CSIDL_PROFILE, False) return buf.value |
The above code ensure that we only use SHGetFolderPathW when SHGetKnownFolderPathW is not available in the system. The reasoning for that is that SHGetFolderPathW is deprecated and new applications are encourage to use SHGetKnownFolderPathW.
A much better solution is to patch ntpath.py so that is something like what I propose for Ubuntu One. Does anyone know if this is fixed in Python 3? Shall I propose a fix?
PS: For ref I got the GUI value from here.
Read moreOn Ubuntu One we use BtiRock to create the packages for Windows. One of the new features I’m working on is to check if there are updates every so often so that the user gets notified. This code on Ubuntu is not needed because the Update Manger takes care of that, but when you work in an inferior OS…
In order to check for updates we use the generated auto-update.exe wizard from BitRock. Generating the wizard is very straight forward first, as with most of the BitRock stuff, we generate the XML to configure the generated .exe.
<autoUpdateProject>
<fullName>Ubuntu One</fullName>
<shortName>ubuntuone</shortName>
<vendor>Canonical</vendor>
<version>201</version>
<singleInstanceCheck>1</singleInstanceCheck>
<requireInstallationByRootUser>0</requireInstallationByRootUser>
<requestedExecutionLevel>asInvoker</requestedExecutionLevel>
</autoUpdateProject>
There is just a single thing that is worth mentioning about the above XML. The requireInstallationByRootUser is true because we use the generated .exe to check if there are updates present and we do not what the user to have to be root for that, it does not make sense. Once you have the above or similar XML you can execute:
{$bitrock_installation$}\autoupdate\bin\customize.exe" build ubuntuone_autoupdate.xml windows
Which generates the .exe (the output is in ~\Documents\AutoUpdate\output).
The following code provides an example of a couple of functions that can be used by the application, first to check for an update, and to perform the actual update.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | import os import sys # Avoid pylint error on Linux # pylint: disable=F0401 import win32api # pylint: enable=F0401 from twisted.internet import defer from twisted.internet.utils import getProcessValue AUTOUPDATE_EXE_NAME = 'autoupdate-windows.exe' def _get_update_path(): """Return the path in which the autoupdate command is found.""" if hasattr(sys, "frozen"): exec_path = os.path.abspath(sys.executable) else: exec_path = os.path.dirname(__file__) folder = os.path.dirname(exec_path) update_path = os.path.join(folder, AUTOUPDATE_EXE_NAME) if os.path.exists(update_path): return update_path return None @defer.inlineCallbacks def are_updates_present(logger): """Return if there are updates for Ubuntu One.""" update_path = _get_update_path() logger.debug('Update path %s', update_path) if update_path is not None: # If there is an update present we will get 0 and other number # otherwise retcode = yield getProcessValue(update_path, args=('--mode', 'unattended'), path=os.path.dirname(update_path)) logger.debug('Return code %s', retcode) if retcode == 0: logger.debug('Returning True') defer.returnValue(True) logger.debug('Returning False') defer.returnValue(False) def perform_update(): """Spawn the autoupdate process and call the stop function.""" update_path = _get_update_path() if update_path is not None: # lets call the updater with the commands that are required, win32api.ShellExecute(None, 'runas', update_path, '--unattendedmodeui none', '', 0) |
With the above you should be able to easily update the installation of your frozen python app on Windows when using BitRock.
Read moreFollowing my last post regarding how to list all installed applications using python here is the code that one will require to remove an installed msi from a Windows machine using python.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | class MsiException(Exception): """Raised when there are issues with the msi actions.""" def uninstall_product(uid): """Will remove the old beta from the users machine.""" # we use the msi lib to be able to uninstall apps property_name = u'LocalPackage' uninstall_path = get_property_for_product(uid, property_name) if uninstall_path is not None: # lets remove the package. command_line = u'REMOVE=ALL' result = windll.msi.MsiInstallProductW(uninstall_path, command_line) if result != ERROR_SUCCESS: raise MsiException('Could not remove product %s' % uid) |
The missing functions can be found in the last post about the topic.
Read moreThe new Ubuntu One Windows client is very close to be released (we have already been sending the new code to our beta testers) and in order to make life easier to new user we wanted to provide a migration script that will allow the user migrate his data to the new client and uninstall the old one. In order to be able to know if the old msi is present in the system we had to use the Windows Installer SDK to query the installed applications and find if the old Ubuntu One client is present.
The following code is a small script that contains the functions to query the installed software in the system which is very similar to the script found in WiLstPrd.vbs but using python instead of VB and ctypes.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | # This scripts allows to get a list of all installed products in a windows # machine. The code uses ctypes becuase there were a number of issues when # trying to achieve the same win win32com.client from collections import namedtuple from ctypes import byref, create_unicode_buffer, windll from ctypes.wintypes import DWORD from itertools import count # defined at http://msdn.microsoft.com/en-us/library/aa370101(v=VS.85).aspx UID_BUFFER_SIZE = 39 PROPERTY_BUFFER_SIZE = 256 ERROR_MORE_DATA = 234 ERROR_INVALID_PARAMETER = 87 ERROR_SUCCESS = 0 ERROR_NO_MORE_ITEMS = 259 ERROR_UNKNOWN_PRODUCT = 1605 # diff propoerties of a product, not all products have all properties PRODUCT_PROPERTIES = [u'Language', u'ProductName', u'PackageCode', u'Transforms', u'AssignmentType', u'PackageName', u'InstalledProductName', u'VersionString', u'RegCompany', u'RegOwner', u'ProductID', u'ProductIcon', u'InstallLocation', u'InstallSource', u'InstallDate', u'Publisher', u'LocalPackage', u'HelpLink', u'HelpTelephone', u'URLInfoAbout', u'URLUpdateInfo',] # class to be used for python users :) Product = namedtuple('Product', PRODUCT_PROPERTIES) def get_property_for_product(product, property, buf_size=PROPERTY_BUFFER_SIZE): """Retruns the value of a fiven property from a product.""" property_buffer = create_unicode_buffer(buf_size) size = DWORD(buf_size) result = windll.msi.MsiGetProductInfoW(product, property, property_buffer, byref(size)) if result == ERROR_MORE_DATA: return get_property_for_product(product, property, 2 * buf_size) elif result == ERROR_SUCCESS: return property_buffer.value else: return None def populate_product(uid): """Return a Product with the different present data.""" properties = [] for property in PRODUCT_PROPERTIES: properties.append(get_property_for_product(uid, property)) return Product(*properties) def get_installed_products_uids(): """Returns a list with all the different uid of the installed apps.""" # enum will return an error code according to the result of the app products = [] for i in count(0): uid_buffer = create_unicode_buffer(UID_BUFFER_SIZE) result = windll.msi.MsiEnumProductsW(i, uid_buffer) if result == ERROR_NO_MORE_ITEMS: # done interating over the collection break products.append(uid_buffer.value) return products def get_installed_products(): """Returns a collection of products that are installed in the system.""" products = [] for puid in get_installed_products_uids(): products.append(populate_product(puid)) return products def is_product_installed_uid(uid): """Return if a product with the given id is installed. uid Most be a unicode object with the uid of the product using the following format {uid} """ # we try to get the VersisonString for the uid, if we get an error it means # that the product is not installed in the system. buf_size = 256 uid_buffer = create_unicode_buffer(uid) property = u'VersionString' property_buffer = create_unicode_buffer(buf_size) size = DWORD(buf_size) result = windll.msi.MsiGetProductInfoW(uid_buffer, property, property_buffer, byref(size)) if result == ERROR_UNKNOWN_PRODUCT: return False else: return True |
The above code will allow a python developer to check which products are installed on Windows as well as to know if a product with the given UID is indeed installed.
Read moreIn a few days (well, if I find some kind person to take care of Iron) I will be attending the Ubuntu One Developer evening in which we should be able to hear Stuart will be talking about the bunch of crazy ideas he has for developers to use or infrastructure to do cool stuff. I’ll be there for two reason:
If you are going to be in Manchester you should join us, the event is FREE and trust me Stuart is a great guy to go out for drinks with (I’m not bad either, but I always get in trouble
).
Latest Official Posts
Featured Blogs
People
You can't take the sky from me
Alex Chiang
allenap
allenap/maas
Amit Kucheria
Andres Rodriguez
Andrew Glen-Young
Ara Pulido
Barry Warsaw
Bazaar team
Bitácora de Vuelo
Bjoern Michaelsen
Björn Tillenius
Blogging in the Wind
Bofu Chen
Brad Figg
Brad Marshall
Brian Fromme
Canonical Blog
Canonical Design Blog
Canonical ISD
Canonical Marketing Team Blog
cat /dev/ursula
cat /dev/ursula
Certifiably (Brendan Donegan's Ubuntu Blog)
Chad Miller
Chris Halse Rogers
Chris Johnston
Christian Reis
Code Singer: Gary Poster's blog
Corey Goldberg
Daniel Holbach's blog
Danilo Segan
Darryl Weaver
David Henningsson
David Murphy
David Murphy
David Owen
David Planella
Distributed Teams
Gavin Panella
Graham Binns
Guilherme Salgado
Gustavo Niemeyer
How Bazaar
Iain Lane
Illruminations
Inert Ramblings
James Tait
James Westby
Jamie Strandboge
jedimike's adventures in typing
Jeremy Kerr
Joey Stanford
John Pugh
Jono Bacon
Jorge Castro
Julian Edwards
Julien Funk
JussiP
Ken VanDine
Keng-Yu Lin
kevin gunn
KyleN Ubuntu
KyleN Ubuntu
Landscape Blog
Launchpad Blog
Launchpad Blog
Lee Jones
Louis Bouchard
Manuel de la Pena
Marcin Juszkiewicz
Mark Shuttleworth
Martin Albisetti's blog
Martin Pitt
Matt Fischer
Michael Hall's Blog
Michael Hudson
Michael Terry
Multi-touch on Ubuntu
Not Lucky All The Time, But Smart Everyday…
Olli's random thoughts and impressions
person@CANONICAL-DESK
person@CANONICAL-DESK
Pixoul Photography
Prakash Advani
racarr's blog
racarrs blog!
RedVoodoo.org
Ricardo Salveti
Rick Harding
Robert Ancell
Robert Ayres
Ryan Finnie
S3hh
Scott Sweeny
Sean Feole
Shang Wu
Shuduo
Sidnei da Silva
sil2100//vx web-page
Smackerel of Opinion
Something driven development
Stéphane Graber
Steve George
Steve Langasek
Stuart Bishop
Stuart Metcalfe
Subcritical
Ted Gould
The Dowdberrys
The Orange Notebook
The Quality Hour
The Raving Rick
Timo Jyrinki
tvoss@work
Ubuntu App Developer Blog
Ubuntu Kernel Team Blog
Ubuntu One Blog
Ubuntu Server Team
Ubuntu Server Team
Ubuntu Server Team Blog
utlemming
utlemming's blog
Victor Palau's Blog
Wanderings of a Kernel Engineer
ZhengPeng Hou
~apw
Canonical Voices© 2010 Canonical Ltd. Ubuntu and Canonical are registered trademarks of Canonical Ltd.