Return error messages in node add ssh handling
[ganeti-local] / lib / luxi.py
index 9117253..7b735aa 100644 (file)
 
 """Module for the unix socket protocol
 
 
 """Module for the unix socket protocol
 
-This module implements the local unix socket protocl. You only need
+This module implements the local unix socket protocol. You only need
 this module and the opcodes module in the client program in order to
 communicate with the master.
 
 this module and the opcodes module in the client program in order to
 communicate with the master.
 
-The module is also be used by the master daemon.
+The module is also used by the master daemon.
 
 """
 
 
 """
 
@@ -54,6 +54,7 @@ REQ_QUERY_INSTANCES = "QueryInstances"
 REQ_QUERY_NODES = "QueryNodes"
 REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
 REQ_QUERY_NODES = "QueryNodes"
 REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
+REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
 
 DEF_CTMO = 10
 DEF_RWTO = 60
 
 DEF_CTMO = 10
 DEF_RWTO = 60
@@ -251,7 +252,32 @@ class Client(object):
     """
     if address is None:
       address = constants.MASTER_SOCKET
     """
     if address is None:
       address = constants.MASTER_SOCKET
-    self.transport = transport(address, timeouts=timeouts)
+    self.address = address
+    self.timeouts = timeouts
+    self.transport_class = transport
+    self.transport = None
+    self._InitTransport()
+
+  def _InitTransport(self):
+    """(Re)initialize the transport if needed.
+
+    """
+    if self.transport is None:
+      self.transport = self.transport_class(self.address,
+                                            timeouts=self.timeouts)
+
+  def _CloseTransport(self):
+    """Close the transport, ignoring errors.
+
+    """
+    if self.transport is None:
+      return
+    try:
+      old_transp = self.transport
+      self.transport = None
+      old_transp.Close()
+    except Exception, err:
+      pass
 
   def CallMethod(self, method, args):
     """Send a generic request and return the response.
 
   def CallMethod(self, method, args):
     """Send a generic request and return the response.
@@ -263,8 +289,18 @@ class Client(object):
       KEY_ARGS: args,
       }
 
       KEY_ARGS: args,
       }
 
+    # Serialize the request
+    send_data = serializer.DumpJson(request, indent=False)
+
     # Send request and wait for response
     # Send request and wait for response
-    result = self.transport.Call(serializer.DumpJson(request, indent=False))
+    try:
+      self._InitTransport()
+      result = self.transport.Call(send_data)
+    except Exception:
+      self._CloseTransport()
+      raise
+
+    # Parse the result
     try:
       data = serializer.LoadJson(result)
     except Exception, err:
     try:
       data = serializer.LoadJson(result)
     except Exception, err:
@@ -291,6 +327,9 @@ class Client(object):
 
     return result
 
 
     return result
 
+  def SetQueueDrainFlag(self, drain_flag):
+    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
+
   def SubmitJob(self, ops):
     ops_state = map(lambda op: op.__getstate__(), ops)
     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
   def SubmitJob(self, ops):
     ops_state = map(lambda op: op.__getstate__(), ops)
     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
@@ -302,7 +341,8 @@ class Client(object):
     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
 
   def AutoArchiveJobs(self, age):
     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
 
   def AutoArchiveJobs(self, age):
-    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)
+    timeout = (DEF_RWTO - 1) / 2
+    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
 
   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
     timeout = (DEF_RWTO - 1) / 2
 
   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
     timeout = (DEF_RWTO - 1) / 2
@@ -329,4 +369,5 @@ class Client(object):
   def QueryConfigValues(self, fields):
     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
 
   def QueryConfigValues(self, fields):
     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
 
+
 # TODO: class Server(object)
 # TODO: class Server(object)