Fix Filehandler / FileHandler typo
[ganeti-local] / lib / rpc.py
index 289b9fb..6de365b 100644 (file)
@@ -42,7 +42,8 @@ from ganeti import serializer
 from ganeti import constants
 from ganeti import errors
 
-import ganeti.http.client
+# pylint has a bug here, doesn't see this import
+import ganeti.http.client  # pylint: disable-msg=W0611
 
 
 # Module level variable
@@ -55,10 +56,12 @@ def Init():
   Must be called before using any RPC function.
 
   """
-  global _http_manager
+  global _http_manager # pylint: disable-msg=W0603
 
   assert not _http_manager, "RPC module initialized more than once"
 
+  http.InitSsl()
+
   _http_manager = http.client.HttpClientManager()
 
 
@@ -68,7 +71,7 @@ def Shutdown():
   Must be called before quitting the program.
 
   """
-  global _http_manager
+  global _http_manager # pylint: disable-msg=W0603
 
   if _http_manager:
     _http_manager.Shutdown()
@@ -83,9 +86,6 @@ class RpcResult(object):
   failed, and therefore we use this class to encapsulate the result.
 
   @ivar data: the data payload, for successful results, or None
-  @type failed: boolean
-  @ivar failed: whether the operation failed at transport level (not
-      application level on the remote node)
   @ivar call: the name of the RPC call
   @ivar node: the name of the node to which we made the call
   @ivar offline: whether the operation failed because the node was
@@ -97,12 +97,11 @@ class RpcResult(object):
   """
   def __init__(self, data=None, failed=False, offline=False,
                call=None, node=None):
-    self.failed = failed
     self.offline = offline
     self.call = call
     self.node = node
+
     if offline:
-      self.failed = True
       self.fail_msg = "Node is marked offline"
       self.data = self.payload = None
     elif failed:
@@ -113,16 +112,26 @@ class RpcResult(object):
       if not isinstance(self.data, (tuple, list)):
         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                          type(self.data))
+        self.payload = None
       elif len(data) != 2:
         self.fail_msg = ("RPC layer error: invalid result length (%d), "
                          "expected 2" % len(self.data))
+        self.payload = None
       elif not self.data[0]:
         self.fail_msg = self._EnsureErr(self.data[1])
+        self.payload = None
       else:
         # finally success
         self.fail_msg = None
         self.payload = data[1]
 
+    assert hasattr(self, "call")
+    assert hasattr(self, "data")
+    assert hasattr(self, "fail_msg")
+    assert hasattr(self, "node")
+    assert hasattr(self, "offline")
+    assert hasattr(self, "payload")
+
   @staticmethod
   def _EnsureErr(val):
     """Helper to ensure we return a 'True' value for error."""
@@ -131,7 +140,7 @@ class RpcResult(object):
     else:
       return "No error information"
 
-  def Raise(self, msg, prereq=False):
+  def Raise(self, msg, prereq=False, ecode=None):
     """If the result has failed, raise an OpExecError.
 
     This is used so that LU code doesn't have to check for each
@@ -150,15 +159,11 @@ class RpcResult(object):
       ec = errors.OpPrereqError
     else:
       ec = errors.OpExecError
-    raise ec(msg)
-
-  def RemoteFailMsg(self):
-    """Check if the remote procedure failed.
-
-    @return: the fail_msg attribute
-
-    """
-    return self.fail_msg
+    if ecode is not None:
+      args = (msg, prereq)
+    else:
+      args = (msg, )
+    raise ec(*args) # pylint: disable-msg=W0142
 
 
 class Client:
@@ -180,8 +185,8 @@ class Client:
     self.nc = {}
 
     self._ssl_params = \
-      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
-                         ssl_cert_path=constants.SSL_CERT_FILE)
+      http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
+                         ssl_cert_path=constants.NODED_CERT_FILE)
 
   def ConnectList(self, node_list, address_list=None):
     """Add a list of nodes to the target nodes.
@@ -450,6 +455,15 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "storage_modify",
                                 [su_name, su_args, name, changes])
 
+  def call_storage_execute(self, node, su_name, su_args, name, op):
+    """Executes an operation on a storage unit.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "storage_execute",
+                                [su_name, su_args, name, op])
+
   def call_bridges_exist(self, node, bridges_list):
     """Checks if a node has all the bridges given.
 
@@ -471,14 +485,14 @@ class RpcRunner(object):
     idict = self._InstDict(instance, hvp=hvp, bep=bep)
     return self._SingleNodeCall(node, "instance_start", [idict])
 
-  def call_instance_shutdown(self, node, instance):
+  def call_instance_shutdown(self, node, instance, timeout):
     """Stops an instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_shutdown",
-                                [self._InstDict(instance)])
+                                [self._InstDict(instance), timeout])
 
   def call_migration_info(self, node, instance):
     """Gather the information necessary to prepare an instance migration.
@@ -552,32 +566,33 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "instance_migrate",
                                 [self._InstDict(instance), target, live])
 
-  def call_instance_reboot(self, node, instance, reboot_type):
+  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
     """Reboots an instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_reboot",
-                                [self._InstDict(instance), reboot_type])
+                                [self._InstDict(inst), reboot_type,
+                                 shutdown_timeout])
 
-  def call_instance_os_add(self, node, inst, reinstall):
+  def call_instance_os_add(self, node, inst, reinstall, debug):
     """Installs an OS on the given instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_os_add",
-                                [self._InstDict(inst), reinstall])
+                                [self._InstDict(inst), reinstall, debug])
 
-  def call_instance_run_rename(self, node, inst, old_name):
+  def call_instance_run_rename(self, node, inst, old_name, debug):
     """Run the OS rename script for an instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_run_rename",
-                                [self._InstDict(inst), old_name])
+                                [self._InstDict(inst), old_name, debug])
 
   def call_instance_info(self, node, instance, hname):
     """Returns information about a single instance.
@@ -722,13 +737,14 @@ class RpcRunner(object):
     # TODO: should this method query down nodes?
     return cls._StaticMultiNodeCall(node_list, "master_info", [])
 
-  def call_version(self, node_list):
+  @classmethod
+  def call_version(cls, node_list):
     """Query node version.
 
     This is a multi-node call.
 
     """
-    return self._MultiNodeCall(node_list, "version", [])
+    return cls._StaticMultiNodeCall(node_list, "version", [])
 
   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
     """Request creation of a given block device.
@@ -801,7 +817,7 @@ class RpcRunner(object):
     """
     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
                                   [dsk.ToDict() for dsk in disks])
-    if not result.failed:
+    if not result.fail_msg:
       result.payload = [objects.BlockDevStatus.FromDict(i)
                         for i in result.payload]
     return result
@@ -813,7 +829,7 @@ class RpcRunner(object):
 
     """
     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
-    if not result.failed and result.payload is not None:
+    if not result.fail_msg and result.payload is not None:
       result.payload = objects.BlockDevStatus.FromDict(result.payload)
     return result
 
@@ -826,6 +842,15 @@ class RpcRunner(object):
     params = [instance_name, [cf.ToDict() for cf in disks]]
     return self._SingleNodeCall(node, "blockdev_close", params)
 
+  def call_blockdev_getsizes(self, node, disks):
+    """Returns the size of the given disks.
+
+    This is a single-node call.
+
+    """
+    params = [[cf.ToDict() for cf in disks]]
+    return self._SingleNodeCall(node, "blockdev_getsize", params)
+
   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
     """Disconnects the network of the given drbd devices.
 
@@ -905,8 +930,8 @@ class RpcRunner(object):
 
     """
     result = self._SingleNodeCall(node, "os_get", [name])
-    if not result.failed and isinstance(result.data, dict):
-      result.data = objects.OS.FromDict(result.data)
+    if not result.fail_msg and isinstance(result.payload, dict):
+      result.payload = objects.OS.FromDict(result.payload)
     return result
 
   def call_hooks_runner(self, node_list, hpath, phase, env):
@@ -943,6 +968,17 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "blockdev_grow",
                                 [cf_bdev.ToDict(), amount])
 
+  def call_blockdev_export(self, node, cf_bdev,
+                           dest_node, dest_path, cluster_name):
+    """Export a given disk to another node.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "blockdev_export",
+                                [cf_bdev.ToDict(), dest_node, dest_path,
+                                 cluster_name])
+
   def call_blockdev_snapshot(self, node, cf_bdev):
     """Request a snapshot of the given block device.
 
@@ -952,7 +988,7 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
 
   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
-                           cluster_name, idx):
+                           cluster_name, idx, debug):
     """Request the export of a given snapshot.
 
     This is a single-node call.
@@ -960,7 +996,8 @@ class RpcRunner(object):
     """
     return self._SingleNodeCall(node, "snapshot_export",
                                 [snap_bdev.ToDict(), dest_node,
-                                 self._InstDict(instance), cluster_name, idx])
+                                 self._InstDict(instance), cluster_name,
+                                 idx, debug])
 
   def call_finalize_export(self, node, instance, snap_disks):
     """Request the completion of an export operation.
@@ -989,7 +1026,7 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "export_info", [path])
 
   def call_instance_os_import(self, node, inst, src_node, src_images,
-                              cluster_name):
+                              cluster_name, debug):
     """Request the import of a backup into an instance.
 
     This is a single-node call.
@@ -997,7 +1034,7 @@ class RpcRunner(object):
     """
     return self._SingleNodeCall(node, "instance_os_import",
                                 [self._InstDict(inst), src_node, src_images,
-                                 cluster_name])
+                                 cluster_name, debug])
 
   def call_export_list(self, node_list):
     """Gets the stored exports list.
@@ -1016,7 +1053,7 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "export_remove", [export])
 
   @classmethod
-  def call_node_leave_cluster(cls, node):
+  def call_node_leave_cluster(cls, node, modify_ssh_setup):
     """Requests a node to clean the cluster information it has.
 
     This will remove the configuration information from the ganeti data
@@ -1025,7 +1062,8 @@ class RpcRunner(object):
     This is a single-node call.
 
     """
-    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
+    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
+                                     [modify_ssh_setup])
 
   def call_node_volumes(self, node_list):
     """Gets all volumes on node(s).