Catch BlockDeviceError when starting instance
[ganeti-local] / lib / rpc.py
index 86a6a3f..a9619f9 100644 (file)
@@ -33,6 +33,8 @@
 import os
 import socket
 import logging
 import os
 import socket
 import logging
+import zlib
+import base64
 
 from ganeti import utils
 from ganeti import objects
 
 from ganeti import utils
 from ganeti import objects
@@ -81,12 +83,29 @@ class RpcResult(object):
   calls we can't raise an exception just because one one out of many
   failed, and therefore we use this class to encapsulate the result.
 
   calls we can't raise an exception just because one one out of many
   failed, and therefore we use this class to encapsulate the result.
 
+  @ivar data: the data payload, for successfull results, or None
+  @type failed: boolean
+  @ivar failed: whether the operation failed at RPC level (not
+      application level on the remote node)
+  @ivar call: the name of the RPC call
+  @ivar node: the name of the node to which we made the call
+  @ivar offline: whether the operation failed because the node was
+      offline, as opposed to actual failure; offline=True will always
+      imply failed=True, in order to allow simpler checking if
+      the user doesn't care about the exact failure mode
+
   """
   """
-  def __init__(self, data, failed=False, call=None, node=None):
+  def __init__(self, data=None, failed=False, offline=False,
+               call=None, node=None):
     self.failed = failed
     self.failed = failed
-    self.call = None
-    self.node = None
-    if failed:
+    self.offline = offline
+    self.call = call
+    self.node = node
+    if offline:
+      self.failed = True
+      self.error = "Node is marked offline"
+      self.data = None
+    elif failed:
       self.error = data
       self.data = None
     else:
       self.error = data
       self.data = None
     else:
@@ -239,14 +258,22 @@ class RpcRunner(object):
 
     """
     all_nodes = self._cfg.GetAllNodesInfo()
 
     """
     all_nodes = self._cfg.GetAllNodesInfo()
+    name_list = []
     addr_list = []
     addr_list = []
+    skip_dict = {}
     for node in node_list:
       if node in all_nodes:
     for node in node_list:
       if node in all_nodes:
+        if all_nodes[node].offline:
+          skip_dict[node] = RpcResult(node=node, offline=True)
+          continue
         val = all_nodes[node].primary_ip
       else:
         val = None
       addr_list.append(val)
         val = all_nodes[node].primary_ip
       else:
         val = None
       addr_list.append(val)
-    client.ConnectList(node_list, address_list=addr_list)
+      name_list.append(node)
+    if name_list:
+      client.ConnectList(name_list, address_list=addr_list)
+    return skip_dict
 
   def _ConnectNode(self, client, node):
     """Helper for computing one node's address.
 
   def _ConnectNode(self, client, node):
     """Helper for computing one node's address.
@@ -259,23 +286,22 @@ class RpcRunner(object):
     """
     node_info = self._cfg.GetNodeInfo(node)
     if node_info is not None:
     """
     node_info = self._cfg.GetNodeInfo(node)
     if node_info is not None:
+      if node_info.offline:
+        return RpcResult(node=node, offline=True)
       addr = node_info.primary_ip
     else:
       addr = None
     client.ConnectNode(node, address=addr)
 
       addr = node_info.primary_ip
     else:
       addr = None
     client.ConnectNode(node, address=addr)
 
-  def _MultiNodeCall(self, node_list, procedure, args,
-                     address_list=None):
+  def _MultiNodeCall(self, node_list, procedure, args):
     """Helper for making a multi-node call
 
     """
     body = serializer.DumpJson(args, indent=False)
     c = Client(procedure, body, self.port)
     """Helper for making a multi-node call
 
     """
     body = serializer.DumpJson(args, indent=False)
     c = Client(procedure, body, self.port)
-    if address_list is None:
-      self._ConnectList(c, node_list)
-    else:
-      c.ConnectList(node_list, address_list=address_list)
-    return c.GetResults()
+    skip_dict = self._ConnectList(c, node_list)
+    skip_dict.update(c.GetResults())
+    return skip_dict
 
   @classmethod
   def _StaticMultiNodeCall(cls, node_list, procedure, args,
 
   @classmethod
   def _StaticMultiNodeCall(cls, node_list, procedure, args,
@@ -294,8 +320,11 @@ class RpcRunner(object):
     """
     body = serializer.DumpJson(args, indent=False)
     c = Client(procedure, body, self.port)
     """
     body = serializer.DumpJson(args, indent=False)
     c = Client(procedure, body, self.port)
-    self._ConnectNode(c, node)
-    return c.GetResults().get(node, False)
+    result = self._ConnectNode(c, node)
+    if result is None:
+      # we did connect, node is not offline
+      result = c.GetResults()[node]
+    return result
 
   @classmethod
   def _StaticSingleNodeCall(cls, node, procedure, args):
 
   @classmethod
   def _StaticSingleNodeCall(cls, node, procedure, args):
@@ -305,7 +334,27 @@ class RpcRunner(object):
     body = serializer.DumpJson(args, indent=False)
     c = Client(procedure, body, utils.GetNodeDaemonPort())
     c.ConnectNode(node)
     body = serializer.DumpJson(args, indent=False)
     c = Client(procedure, body, utils.GetNodeDaemonPort())
     c.ConnectNode(node)
-    return c.GetResults().get(node, False)
+    return c.GetResults()[node]
+
+  @staticmethod
+  def _Compress(data):
+    """Compresses a string for transport over RPC.
+
+    Small amounts of data are not compressed.
+
+    @type data: str
+    @param data: Data
+    @rtype: tuple
+    @return: Encoded data to send
+
+    """
+    # Small amounts of data are not compressed
+    if len(data) < 512:
+      return (constants.RPC_ENCODING_NONE, data)
+
+    # Compress with zlib and encode in base64
+    return (constants.RPC_ENCODING_ZLIB_BASE64,
+            base64.b64encode(zlib.compress(data, 3)))
 
   #
   # Begin RPC calls
 
   #
   # Begin RPC calls
@@ -475,8 +524,8 @@ class RpcRunner(object):
 
     @type node_list: list
     @param node_list: the list of nodes to query
 
     @type node_list: list
     @param node_list: the list of nodes to query
-    @type vgname: C{string}
-    @param vgname: the name of the volume group to ask for disk space
+    @type vg_name: C{string}
+    @param vg_name: the name of the volume group to ask for disk space
         information
     @type hypervisor_type: C{str}
     @param hypervisor_type: the name of the hypervisor to ask for
         information
     @type hypervisor_type: C{str}
     @param hypervisor_type: the name of the hypervisor to ask for
@@ -661,7 +710,8 @@ class RpcRunner(object):
         to optimize the RPC speed
 
     """
         to optimize the RPC speed
 
     """
-    data = utils.ReadFile(file_name)
+    file_contents = utils.ReadFile(file_name)
+    data = cls._Compress(file_contents)
     st = os.stat(file_name)
     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
               st.st_atime, st.st_mtime]
     st = os.stat(file_name)
     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
               st.st_atime, st.st_mtime]
@@ -685,7 +735,7 @@ class RpcRunner(object):
     """
     result = self._MultiNodeCall(node_list, "os_diagnose", [])
 
     """
     result = self._MultiNodeCall(node_list, "os_diagnose", [])
 
-    for node_name, node_result in result.iteritems():
+    for node_result in result.values():
       if not node_result.failed and node_result.data:
         node_result.data = [objects.OS.FromDict(oss)
                             for oss in node_result.data]
       if not node_result.failed and node_result.data:
         node_result.data = [objects.OS.FromDict(oss)
                             for oss in node_result.data]
@@ -880,7 +930,7 @@ class RpcRunner(object):
 
     """
     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
 
     """
     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
-                                    [file_name, content],
+                                    [file_name, cls._Compress(content)],
                                     address_list=address_list)
 
   @classmethod
                                     address_list=address_list)
 
   @classmethod
@@ -893,13 +943,13 @@ class RpcRunner(object):
     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
 
   @classmethod
     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
 
   @classmethod
-  def call_jobqueue_rename(cls, node_list, address_list, old, new):
+  def call_jobqueue_rename(cls, node_list, address_list, rename):
     """Rename a job queue file.
 
     This is a multi-node call.
 
     """
     """Rename a job queue file.
 
     This is a multi-node call.
 
     """
-    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
+    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                     address_list=address_list)
 
   @classmethod
                                     address_list=address_list)
 
   @classmethod