Catch BlockDeviceError when starting instance
[ganeti-local] / lib / rpc.py
index 5850cd2..a9619f9 100644 (file)
 
 import os
 import socket
 
 import os
 import socket
-import httplib
 import logging
 import logging
+import zlib
+import base64
 
 from ganeti import utils
 from ganeti import objects
 from ganeti import http
 from ganeti import serializer
 
 from ganeti import utils
 from ganeti import objects
 from ganeti import http
 from ganeti import serializer
+from ganeti import constants
+from ganeti import errors
+
+import ganeti.http.client
 
 
 # Module level variable
 
 
 # Module level variable
@@ -55,7 +60,7 @@ def Init():
 
   assert not _http_manager, "RPC module initialized more than once"
 
 
   assert not _http_manager, "RPC module initialized more than once"
 
-  _http_manager = http.HttpClientManager()
+  _http_manager = http.client.HttpClientManager()
 
 
 def Shutdown():
 
 
 def Shutdown():
@@ -71,6 +76,54 @@ def Shutdown():
     _http_manager = None
 
 
     _http_manager = None
 
 
+class RpcResult(object):
+  """RPC Result class.
+
+  This class holds an RPC result. It is needed since in multi-node
+  calls we can't raise an exception just because one one out of many
+  failed, and therefore we use this class to encapsulate the result.
+
+  @ivar data: the data payload, for successfull results, or None
+  @type failed: boolean
+  @ivar failed: whether the operation failed at RPC level (not
+      application level on the remote node)
+  @ivar call: the name of the RPC call
+  @ivar node: the name of the node to which we made the call
+  @ivar offline: whether the operation failed because the node was
+      offline, as opposed to actual failure; offline=True will always
+      imply failed=True, in order to allow simpler checking if
+      the user doesn't care about the exact failure mode
+
+  """
+  def __init__(self, data=None, failed=False, offline=False,
+               call=None, node=None):
+    self.failed = failed
+    self.offline = offline
+    self.call = call
+    self.node = node
+    if offline:
+      self.failed = True
+      self.error = "Node is marked offline"
+      self.data = None
+    elif failed:
+      self.error = data
+      self.data = None
+    else:
+      self.data = data
+      self.error = None
+
+  def Raise(self):
+    """If the result has failed, raise an OpExecError.
+
+    This is used so that LU code doesn't have to check for each
+    result, but instead can call this function.
+
+    """
+    if self.failed:
+      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
+                               (self.call, self.node, self.error))
+
+
 class Client:
   """RPC Client class.
 
 class Client:
   """RPC Client class.
 
@@ -83,15 +136,16 @@ class Client:
   cause bugs.
 
   """
   cause bugs.
 
   """
-  def __init__(self, procedure, args):
+  def __init__(self, procedure, body, port):
     self.procedure = procedure
     self.procedure = procedure
-    self.args = args
-    self.body = serializer.DumpJson(args, indent=False)
-
-    self.port = utils.GetNodeDaemonPort()
-    self.nodepw = utils.GetNodeDaemonPassword()
+    self.body = body
+    self.port = port
     self.nc = {}
 
     self.nc = {}
 
+    self._ssl_params = \
+      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
+                         ssl_cert_path=constants.SSL_CERT_FILE)
+
   def ConnectList(self, node_list, address_list=None):
     """Add a list of nodes to the target nodes.
 
   def ConnectList(self, node_list, address_list=None):
     """Add a list of nodes to the target nodes.
 
@@ -122,9 +176,12 @@ class Client:
     if address is None:
       address = name
 
     if address is None:
       address = name
 
-    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
-                                           "/%s" % self.procedure,
-                                           post_data=self.body)
+    self.nc[name] = \
+      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
+                                    "/%s" % self.procedure,
+                                    post_data=self.body,
+                                    ssl_params=self._ssl_params,
+                                    ssl_verify_peer=True)
 
   def GetResults(self):
     """Call nodes and return results.
 
   def GetResults(self):
     """Call nodes and return results.
@@ -140,17 +197,20 @@ class Client:
     results = {}
 
     for name, req in self.nc.iteritems():
     results = {}
 
     for name, req in self.nc.iteritems():
-      if req.success and req.resp_status == http.HTTP_OK:
-        results[name] = serializer.LoadJson(req.resp_body)
+      if req.success and req.resp_status_code == http.HTTP_OK:
+        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
+                                  node=name, call=self.procedure)
         continue
 
         continue
 
+      # TODO: Better error reporting
       if req.error:
         msg = req.error
       else:
         msg = req.resp_body
 
       logging.error("RPC error from node %s: %s", name, msg)
       if req.error:
         msg = req.error
       else:
         msg = req.resp_body
 
       logging.error("RPC error from node %s: %s", name, msg)
-      results[name] = False
+      results[name] = RpcResult(data=msg, failed=True, node=name,
+                                call=self.procedure)
 
     return results
 
 
     return results
 
@@ -167,6 +227,7 @@ class RpcRunner(object):
 
     """
     self._cfg = cfg
 
     """
     self._cfg = cfg
+    self.port = utils.GetNodeDaemonPort()
 
   def _InstDict(self, instance):
     """Convert the given instance to a dict.
 
   def _InstDict(self, instance):
     """Convert the given instance to a dict.
@@ -197,14 +258,22 @@ class RpcRunner(object):
 
     """
     all_nodes = self._cfg.GetAllNodesInfo()
 
     """
     all_nodes = self._cfg.GetAllNodesInfo()
+    name_list = []
     addr_list = []
     addr_list = []
+    skip_dict = {}
     for node in node_list:
       if node in all_nodes:
     for node in node_list:
       if node in all_nodes:
+        if all_nodes[node].offline:
+          skip_dict[node] = RpcResult(node=node, offline=True)
+          continue
         val = all_nodes[node].primary_ip
       else:
         val = None
       addr_list.append(val)
         val = all_nodes[node].primary_ip
       else:
         val = None
       addr_list.append(val)
-    client.ConnectList(node_list, address_list=addr_list)
+      name_list.append(node)
+    if name_list:
+      client.ConnectList(name_list, address_list=addr_list)
+    return skip_dict
 
   def _ConnectNode(self, client, node):
     """Helper for computing one node's address.
 
   def _ConnectNode(self, client, node):
     """Helper for computing one node's address.
@@ -217,43 +286,79 @@ class RpcRunner(object):
     """
     node_info = self._cfg.GetNodeInfo(node)
     if node_info is not None:
     """
     node_info = self._cfg.GetNodeInfo(node)
     if node_info is not None:
+      if node_info.offline:
+        return RpcResult(node=node, offline=True)
       addr = node_info.primary_ip
     else:
       addr = None
     client.ConnectNode(node, address=addr)
 
       addr = node_info.primary_ip
     else:
       addr = None
     client.ConnectNode(node, address=addr)
 
-  def _MultiNodeCall(self, node_list, procedure, args,
-                     address_list=None):
-    c = Client(procedure, args)
-    if address_list is None:
-      self._ConnectList(c, node_list)
-    else:
-      c.ConnectList(node_list, address_list=address_list)
-    return c.GetResults()
+  def _MultiNodeCall(self, node_list, procedure, args):
+    """Helper for making a multi-node call
+
+    """
+    body = serializer.DumpJson(args, indent=False)
+    c = Client(procedure, body, self.port)
+    skip_dict = self._ConnectList(c, node_list)
+    skip_dict.update(c.GetResults())
+    return skip_dict
 
   @classmethod
   def _StaticMultiNodeCall(cls, node_list, procedure, args,
                            address_list=None):
 
   @classmethod
   def _StaticMultiNodeCall(cls, node_list, procedure, args,
                            address_list=None):
-    c = Client(procedure, args)
+    """Helper for making a multi-node static call
+
+    """
+    body = serializer.DumpJson(args, indent=False)
+    c = Client(procedure, body, utils.GetNodeDaemonPort())
     c.ConnectList(node_list, address_list=address_list)
     return c.GetResults()
 
   def _SingleNodeCall(self, node, procedure, args):
     c.ConnectList(node_list, address_list=address_list)
     return c.GetResults()
 
   def _SingleNodeCall(self, node, procedure, args):
-    """
+    """Helper for making a single-node call
 
     """
 
     """
-    c = Client(procedure, args)
-    self._ConnectNode(c, node)
-    return c.GetResults().get(node, False)
+    body = serializer.DumpJson(args, indent=False)
+    c = Client(procedure, body, self.port)
+    result = self._ConnectNode(c, node)
+    if result is None:
+      # we did connect, node is not offline
+      result = c.GetResults()[node]
+    return result
 
   @classmethod
   def _StaticSingleNodeCall(cls, node, procedure, args):
 
   @classmethod
   def _StaticSingleNodeCall(cls, node, procedure, args):
+    """Helper for making a single-node static call
+
     """
     """
+    body = serializer.DumpJson(args, indent=False)
+    c = Client(procedure, body, utils.GetNodeDaemonPort())
+    c.ConnectNode(node)
+    return c.GetResults()[node]
+
+  @staticmethod
+  def _Compress(data):
+    """Compresses a string for transport over RPC.
+
+    Small amounts of data are not compressed.
+
+    @type data: str
+    @param data: Data
+    @rtype: tuple
+    @return: Encoded data to send
 
     """
 
     """
-    c = Client(procedure, args)
-    c.ConnectNode(c, node)
-    return c.GetResults().get(node, False)
+    # Small amounts of data are not compressed
+    if len(data) < 512:
+      return (constants.RPC_ENCODING_NONE, data)
+
+    # Compress with zlib and encode in base64
+    return (constants.RPC_ENCODING_ZLIB_BASE64,
+            base64.b64encode(zlib.compress(data, 3)))
+
+  #
+  # Begin RPC calls
+  #
 
   def call_volume_list(self, node_list, vg_name):
     """Gets the logical volumes present in a given volume group.
 
   def call_volume_list(self, node_list, vg_name):
     """Gets the logical volumes present in a given volume group.
@@ -419,8 +524,8 @@ class RpcRunner(object):
 
     @type node_list: list
     @param node_list: the list of nodes to query
 
     @type node_list: list
     @param node_list: the list of nodes to query
-    @type vgname: C{string}
-    @param vgname: the name of the volume group to ask for disk space
+    @type vg_name: C{string}
+    @param vg_name: the name of the volume group to ask for disk space
         information
     @type hypervisor_type: C{str}
     @param hypervisor_type: the name of the hypervisor to ask for
         information
     @type hypervisor_type: C{str}
     @param hypervisor_type: the name of the hypervisor to ask for
@@ -430,19 +535,17 @@ class RpcRunner(object):
     retux = self._MultiNodeCall(node_list, "node_info",
                                 [vg_name, hypervisor_type])
 
     retux = self._MultiNodeCall(node_list, "node_info",
                                 [vg_name, hypervisor_type])
 
-    for node_name in retux:
-      ret = retux.get(node_name, False)
-      if type(ret) != dict:
-        logging.error("could not connect to node %s", node_name)
-        ret = {}
-
-      utils.CheckDict(ret, {
-                        'memory_total' : '-',
-                        'memory_dom0' : '-',
-                        'memory_free' : '-',
-                        'vg_size' : 'node_unreachable',
-                        'vg_free' : '-',
-                      }, "call_node_info")
+    for result in retux.itervalues():
+      if result.failed or not isinstance(result.data, dict):
+        result.data = {}
+
+      utils.CheckDict(result.data, {
+        'memory_total' : '-',
+        'memory_dom0' : '-',
+        'memory_free' : '-',
+        'vg_size' : 'node_unreachable',
+        'vg_free' : '-',
+        }, "call_node_info")
     return retux
 
   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
     return retux
 
   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
@@ -607,7 +710,8 @@ class RpcRunner(object):
         to optimize the RPC speed
 
     """
         to optimize the RPC speed
 
     """
-    data = utils.ReadFile(file_name)
+    file_contents = utils.ReadFile(file_name)
+    data = cls._Compress(file_contents)
     st = os.stat(file_name)
     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
               st.st_atime, st.st_mtime]
     st = os.stat(file_name)
     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
               st.st_atime, st.st_mtime]
@@ -615,13 +719,13 @@ class RpcRunner(object):
                                     address_list=address_list)
 
   @classmethod
                                     address_list=address_list)
 
   @classmethod
-  def call_write_ssconf_files(cls, node_list):
+  def call_write_ssconf_files(cls, node_list, values):
     """Write ssconf files.
 
     This is a multi-node call.
 
     """
     """Write ssconf files.
 
     This is a multi-node call.
 
     """
-    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [])
+    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
 
   def call_os_diagnose(self, node_list):
     """Request a diagnose of OS definitions.
 
   def call_os_diagnose(self, node_list):
     """Request a diagnose of OS definitions.
@@ -631,14 +735,11 @@ class RpcRunner(object):
     """
     result = self._MultiNodeCall(node_list, "os_diagnose", [])
 
     """
     result = self._MultiNodeCall(node_list, "os_diagnose", [])
 
-    new_result = {}
-    for node_name in result:
-      if result[node_name]:
-        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
-      else:
-        nr = []
-      new_result[node_name] = nr
-    return new_result
+    for node_result in result.values():
+      if not node_result.failed and node_result.data:
+        node_result.data = [objects.OS.FromDict(oss)
+                            for oss in node_result.data]
+    return result
 
   def call_os_get(self, node, name):
     """Returns an OS definition.
 
   def call_os_get(self, node, name):
     """Returns an OS definition.
@@ -647,10 +748,9 @@ class RpcRunner(object):
 
     """
     result = self._SingleNodeCall(node, "os_get", [name])
 
     """
     result = self._SingleNodeCall(node, "os_get", [name])
-    if isinstance(result, dict):
-      return objects.OS.FromDict(result)
-    else:
-      return result
+    if not result.failed and isinstance(result.data, dict):
+      result.data = objects.OS.FromDict(result.data)
+    return result
 
   def call_hooks_runner(self, node_list, hpath, phase, env):
     """Call the hooks runner.
 
   def call_hooks_runner(self, node_list, hpath, phase, env):
     """Call the hooks runner.
@@ -727,9 +827,9 @@ class RpcRunner(object):
 
     """
     result = self._SingleNodeCall(node, "export_info", [path])
 
     """
     result = self._SingleNodeCall(node, "export_info", [path])
-    if not result:
-      return result
-    return objects.SerializableConfigParser.Loads(str(result))
+    if not result.failed and result.data:
+      result.data = objects.SerializableConfigParser.Loads(str(result.data))
+    return result
 
   def call_instance_os_import(self, node, inst, src_node, src_images,
                               cluster_name):
 
   def call_instance_os_import(self, node, inst, src_node, src_images,
                               cluster_name):
@@ -778,6 +878,14 @@ class RpcRunner(object):
     """
     return self._MultiNodeCall(node_list, "node_volumes", [])
 
     """
     return self._MultiNodeCall(node_list, "node_volumes", [])
 
+  def call_node_demote_from_mc(self, node):
+    """Demote a node from the master candidate role.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "node_demote_from_mc", [])
+
   def call_test_delay(self, node_list, duration):
     """Sleep for a fixed time on given node(s).
 
   def call_test_delay(self, node_list, duration):
     """Sleep for a fixed time on given node(s).
 
@@ -822,7 +930,7 @@ class RpcRunner(object):
 
     """
     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
 
     """
     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
-                                    [file_name, content],
+                                    [file_name, cls._Compress(content)],
                                     address_list=address_list)
 
   @classmethod
                                     address_list=address_list)
 
   @classmethod
@@ -835,13 +943,13 @@ class RpcRunner(object):
     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
 
   @classmethod
     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
 
   @classmethod
-  def call_jobqueue_rename(cls, node_list, address_list, old, new):
+  def call_jobqueue_rename(cls, node_list, address_list, rename):
     """Rename a job queue file.
 
     This is a multi-node call.
 
     """
     """Rename a job queue file.
 
     This is a multi-node call.
 
     """
-    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
+    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                     address_list=address_list)
 
   @classmethod
                                     address_list=address_list)
 
   @classmethod