From 8d5b316cd0e1816ac02aa81f01598c4004d312dc Mon Sep 17 00:00:00 2001 From: Iustin Pop <iustin@google.com> Date: Thu, 22 Jan 2009 16:39:40 +0000 Subject: [PATCH] luxi: close and reopen the socket on errors This is less of an actual issue for regular gnt-* clients, but it's easily reproducible with burnin and possible with RAPI (depending on how the program uses luxi.Client(s)). In case of burnin, if we interrupt the client (^C) while it polls the job, it will abort and raise an error. After that, burnin issues a remove instance job, and at this point, we send the submit job (remove) call but the first thing we read from the socket will be the response to the previous poll job request, since that was queued already from the master. To solve this, whenever we detect an error in Transport.Call(), we close that transport and re-create a new one, to start anew. The other alternative would be to introduce a sequence to the protocol, but this is something that would be design-level change and it's not recommended at this stage. Reviewed-by: imsnah --- lib/luxi.py | 41 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/lib/luxi.py b/lib/luxi.py index d248a741a..7b735aa5b 100644 --- a/lib/luxi.py +++ b/lib/luxi.py @@ -21,7 +21,7 @@ """Module for the unix socket protocol -This module implements the local unix socket protocl. You only need +This module implements the local unix socket protocol. You only need this module and the opcodes module in the client program in order to communicate with the master. @@ -252,7 +252,32 @@ class Client(object): """ if address is None: address = constants.MASTER_SOCKET - self.transport = transport(address, timeouts=timeouts) + self.address = address + self.timeouts = timeouts + self.transport_class = transport + self.transport = None + self._InitTransport() + + def _InitTransport(self): + """(Re)initialize the transport if needed. + + """ + if self.transport is None: + self.transport = self.transport_class(self.address, + timeouts=self.timeouts) + + def _CloseTransport(self): + """Close the transport, ignoring errors. + + """ + if self.transport is None: + return + try: + old_transp = self.transport + self.transport = None + old_transp.Close() + except Exception, err: + pass def CallMethod(self, method, args): """Send a generic request and return the response. @@ -264,8 +289,18 @@ class Client(object): KEY_ARGS: args, } + # Serialize the request + send_data = serializer.DumpJson(request, indent=False) + # Send request and wait for response - result = self.transport.Call(serializer.DumpJson(request, indent=False)) + try: + self._InitTransport() + result = self.transport.Call(send_data) + except Exception: + self._CloseTransport() + raise + + # Parse the result try: data = serializer.LoadJson(result) except Exception, err: -- GitLab