luxi: close and reopen the socket on errors
authorIustin Pop <iustin@google.com>
Thu, 22 Jan 2009 16:39:40 +0000 (16:39 +0000)
committerIustin Pop <iustin@google.com>
Thu, 22 Jan 2009 16:39:40 +0000 (16:39 +0000)
This is less of an actual issue for regular gnt-* clients, but it's
easily reproducible with burnin and possible with RAPI (depending on how
the program uses luxi.Client(s)).

In case of burnin, if we interrupt the client (^C) while it polls the
job, it will abort and raise an error. After that, burnin issues a
remove instance job, and at this point, we send the submit job (remove)
call but the first thing we read from the socket will be the response to
the previous poll job request, since that was queued already from the
master.

To solve this, whenever we detect an error in Transport.Call(), we close
that transport and re-create a new one, to start anew. The other
alternative would be to introduce a sequence to the protocol, but this
is something that would be design-level change and it's not recommended
at this stage.

Reviewed-by: imsnah

lib/luxi.py

index d248a74..7b735aa 100644 (file)
@@ -21,7 +21,7 @@
 
 """Module for the unix socket protocol
 
-This module implements the local unix socket protocl. You only need
+This module implements the local unix socket protocol. You only need
 this module and the opcodes module in the client program in order to
 communicate with the master.
 
@@ -252,7 +252,32 @@ class Client(object):
     """
     if address is None:
       address = constants.MASTER_SOCKET
-    self.transport = transport(address, timeouts=timeouts)
+    self.address = address
+    self.timeouts = timeouts
+    self.transport_class = transport
+    self.transport = None
+    self._InitTransport()
+
+  def _InitTransport(self):
+    """(Re)initialize the transport if needed.
+
+    """
+    if self.transport is None:
+      self.transport = self.transport_class(self.address,
+                                            timeouts=self.timeouts)
+
+  def _CloseTransport(self):
+    """Close the transport, ignoring errors.
+
+    """
+    if self.transport is None:
+      return
+    try:
+      old_transp = self.transport
+      self.transport = None
+      old_transp.Close()
+    except Exception, err:
+      pass
 
   def CallMethod(self, method, args):
     """Send a generic request and return the response.
@@ -264,8 +289,18 @@ class Client(object):
       KEY_ARGS: args,
       }
 
+    # Serialize the request
+    send_data = serializer.DumpJson(request, indent=False)
+
     # Send request and wait for response
-    result = self.transport.Call(serializer.DumpJson(request, indent=False))
+    try:
+      self._InitTransport()
+      result = self.transport.Call(send_data)
+    except Exception:
+      self._CloseTransport()
+      raise
+
+    # Parse the result
     try:
       data = serializer.LoadJson(result)
     except Exception, err: