ensure-dirs: Set permissions on queue lock file
[ganeti-local] / lib / rpc.py
index 3e1e632..c93c16d 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -45,6 +45,7 @@ from ganeti import constants
 from ganeti import errors
 from ganeti import netutils
 from ganeti import ssconf
+from ganeti import runtime
 
 # pylint has a bug here, doesn't see this import
 import ganeti.http.client  # pylint: disable-msg=W0611
@@ -55,6 +56,7 @@ _RPC_CONNECT_TIMEOUT = 5
 
 _RPC_CLIENT_HEADERS = [
   "Content-type: %s" % http.HTTP_APP_JSON,
+  "Expect:",
   ]
 
 # Various time constants for the timeout table
@@ -118,7 +120,12 @@ def _ConfigRpcCurl(curl):
   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
 
 
-class _RpcThreadLocal(threading.local):
+# Aliasing this module avoids the following warning by epydoc: "Warning: No
+# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
+_threading = threading
+
+
+class _RpcThreadLocal(_threading.local):
   def GetHttpClientPool(self):
     """Returns a per-thread HTTP client pool.
 
@@ -134,6 +141,10 @@ class _RpcThreadLocal(threading.local):
     return pool
 
 
+# Remove module alias (see above)
+del _threading
+
+
 _thread_local = _RpcThreadLocal()
 
 
@@ -216,12 +227,9 @@ class RpcResult(object):
         self.fail_msg = None
         self.payload = data[1]
 
-    assert hasattr(self, "call")
-    assert hasattr(self, "data")
-    assert hasattr(self, "fail_msg")
-    assert hasattr(self, "node")
-    assert hasattr(self, "offline")
-    assert hasattr(self, "payload")
+    for attr_name in ["call", "data", "fail_msg",
+                      "node", "offline", "payload"]:
+      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
 
   @staticmethod
   def _EnsureErr(val):
@@ -266,8 +274,8 @@ def _AddressLookup(node_list,
   @param node_list: List of node names
   @type ssc: class
   @param ssc: SimpleStore class that is used to obtain node->ip mappings
-  @type lookup_fn: callable
-  @param lookup_fn: function use to do NS lookup
+  @type nslookup_fn: callable
+  @param nslookup_fn: function use to do NS lookup
   @rtype: list of addresses and/or None's
   @returns: List of corresponding addresses, if found
 
@@ -432,7 +440,7 @@ class RpcRunner(object):
     @type bep: dict or None
     @param bep: a dictionary with overridden backend parameters
     @type osp: dict or None
-    @param osp: a dictionary with overriden os parameters
+    @param osp: a dictionary with overridden os parameters
     @rtype: dict
     @return: the instance dict, with the hvparams filled with the
         cluster defaults
@@ -583,6 +591,15 @@ class RpcRunner(object):
   #
 
   @_RpcTimeout(_TMO_URGENT)
+  def call_bdev_sizes(self, node_list, devices):
+    """Gets the sizes of requested block devices present on a node
+
+    This is a multi-node call.
+
+    """
+    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
+
+  @_RpcTimeout(_TMO_URGENT)
   def call_lv_list(self, node_list, vg_name):
     """Gets the logical volumes present in a given volume group.
 
@@ -644,14 +661,14 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
 
   @_RpcTimeout(_TMO_NORMAL)
-  def call_instance_start(self, node, instance, hvp, bep):
+  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
     """Starts an instance.
 
     This is a single-node call.
 
     """
     idict = self._InstDict(instance, hvp=hvp, bep=bep)
-    return self._SingleNodeCall(node, "instance_start", [idict])
+    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
 
   @_RpcTimeout(_TMO_NORMAL)
   def call_instance_shutdown(self, node, instance, timeout):
@@ -751,14 +768,15 @@ class RpcRunner(object):
                                  shutdown_timeout])
 
   @_RpcTimeout(_TMO_1DAY)
-  def call_instance_os_add(self, node, inst, reinstall, debug):
+  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
     """Installs an OS on the given instance.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "instance_os_add",
-                                [self._InstDict(inst), reinstall, debug])
+                                [self._InstDict(inst, osp=osparams),
+                                 reinstall, debug])
 
   @_RpcTimeout(_TMO_SLOW)
   def call_instance_run_rename(self, node, inst, old_name, debug):
@@ -874,6 +892,22 @@ class RpcRunner(object):
                                [vg_name, hypervisor_type])
 
   @_RpcTimeout(_TMO_NORMAL)
+  def call_etc_hosts_modify(self, node, mode, name, ip):
+    """Modify hosts file with name
+
+    @type node: string
+    @param node: The node to call
+    @type mode: string
+    @param mode: The mode to operate. Currently "add" or "remove"
+    @type name: string
+    @param name: The host name to be modified
+    @type ip: string
+    @param ip: The ip of the entry (just valid if mode is "add")
+
+    """
+    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
+
+  @_RpcTimeout(_TMO_NORMAL)
   def call_node_verify(self, node_list, checkdict, cluster_name):
     """Request verification of given parameters.
 
@@ -935,6 +969,16 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "blockdev_create",
                                 [bdev.ToDict(), size, owner, on_primary, info])
 
+  @_RpcTimeout(_TMO_SLOW)
+  def call_blockdev_wipe(self, node, bdev, offset, size):
+    """Request wipe at given offset with given size of a block device.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "blockdev_wipe",
+                                [bdev.ToDict(), offset, size])
+
   @_RpcTimeout(_TMO_NORMAL)
   def call_blockdev_remove(self, node, bdev):
     """Request removal of a given block device.
@@ -955,14 +999,24 @@ class RpcRunner(object):
                                 [(d.ToDict(), uid) for d, uid in devlist])
 
   @_RpcTimeout(_TMO_NORMAL)
-  def call_blockdev_assemble(self, node, disk, owner, on_primary):
+  def call_blockdev_pause_resume_sync(self, node, disks, pause):
+    """Request a pause/resume of given block device.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
+                                [[bdev.ToDict() for bdev in disks], pause])
+
+  @_RpcTimeout(_TMO_NORMAL)
+  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
     """Request assembling of a given block device.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "blockdev_assemble",
-                                [disk.ToDict(), owner, on_primary])
+                                [disk.ToDict(), owner, on_primary, idx])
 
   @_RpcTimeout(_TMO_NORMAL)
   def call_blockdev_shutdown(self, node, disk):
@@ -1010,6 +1064,26 @@ class RpcRunner(object):
     return result
 
   @_RpcTimeout(_TMO_NORMAL)
+  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
+    """Request status of (mirroring) devices from multiple nodes.
+
+    This is a multi-node call.
+
+    """
+    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
+                                 [dict((name, [dsk.ToDict() for dsk in disks])
+                                       for name, disks in node_disks.items())])
+    for nres in result.values():
+      if nres.fail_msg:
+        continue
+
+      for idx, (success, status) in enumerate(nres.payload):
+        if success:
+          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
+
+    return result
+
+  @_RpcTimeout(_TMO_NORMAL)
   def call_blockdev_find(self, node, disk):
     """Request identification of a given block device.
 
@@ -1032,7 +1106,7 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "blockdev_close", params)
 
   @_RpcTimeout(_TMO_NORMAL)
-  def call_blockdev_getsizes(self, node, disks):
+  def call_blockdev_getsize(self, node, disks):
     """Returns the size of the given disks.
 
     This is a single-node call.
@@ -1104,8 +1178,9 @@ class RpcRunner(object):
     file_contents = utils.ReadFile(file_name)
     data = cls._Compress(file_contents)
     st = os.stat(file_name)
-    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
-              st.st_atime, st.st_mtime]
+    getents = runtime.GetEnts()
+    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
+              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
                                     address_list=address_list)
 
@@ -1119,6 +1194,16 @@ class RpcRunner(object):
     """
     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
 
+  @_RpcTimeout(_TMO_NORMAL)
+  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
+    """Runs OOB.
+
+    This is a single-node call.
+
+    """
+    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
+                                                  remote_node, timeout])
+
   @_RpcTimeout(_TMO_FAST)
   def call_os_diagnose(self, node_list):
     """Request a diagnose of OS definitions.
@@ -1178,14 +1263,14 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
 
   @_RpcTimeout(_TMO_NORMAL)
-  def call_blockdev_grow(self, node, cf_bdev, amount):
+  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
     """Request a snapshot of the given block device.
 
     This is a single-node call.
 
     """
     return self._SingleNodeCall(node, "blockdev_grow",
-                                [cf_bdev.ToDict(), amount])
+                                [cf_bdev.ToDict(), amount, dryrun])
 
   @_RpcTimeout(_TMO_1DAY)
   def call_blockdev_export(self, node, cf_bdev,
@@ -1337,7 +1422,7 @@ class RpcRunner(object):
                                 [old_file_storage_dir, new_file_storage_dir])
 
   @classmethod
-  @_RpcTimeout(_TMO_FAST)
+  @_RpcTimeout(_TMO_URGENT)
   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
     """Update job queue.
 
@@ -1359,7 +1444,7 @@ class RpcRunner(object):
     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
 
   @classmethod
-  @_RpcTimeout(_TMO_FAST)
+  @_RpcTimeout(_TMO_URGENT)
   def call_jobqueue_rename(cls, node_list, address_list, rename):
     """Rename a job queue file.
 
@@ -1413,7 +1498,8 @@ class RpcRunner(object):
     return self._SingleNodeCall(node, "x509_cert_remove", [name])
 
   @_RpcTimeout(_TMO_NORMAL)
-  def call_import_start(self, node, opts, instance, dest, dest_args):
+  def call_import_start(self, node, opts, instance, component,
+                        dest, dest_args):
     """Starts a listener for an import.
 
     This is a single-node call.
@@ -1422,16 +1508,18 @@ class RpcRunner(object):
     @param node: Node name
     @type instance: C{objects.Instance}
     @param instance: Instance object
+    @type component: string
+    @param component: which part of the instance is being imported
 
     """
     return self._SingleNodeCall(node, "import_start",
                                 [opts.ToDict(),
-                                 self._InstDict(instance), dest,
+                                 self._InstDict(instance), component, dest,
                                  _EncodeImportExportIO(dest, dest_args)])
 
   @_RpcTimeout(_TMO_NORMAL)
   def call_export_start(self, node, opts, host, port,
-                        instance, source, source_args):
+                        instance, component, source, source_args):
     """Starts an export daemon.
 
     This is a single-node call.
@@ -1440,11 +1528,14 @@ class RpcRunner(object):
     @param node: Node name
     @type instance: C{objects.Instance}
     @param instance: Instance object
+    @type component: string
+    @param component: which part of the instance is being imported
 
     """
     return self._SingleNodeCall(node, "export_start",
                                 [opts.ToDict(), host, port,
-                                 self._InstDict(instance), source,
+                                 self._InstDict(instance),
+                                 component, source,
                                  _EncodeImportExportIO(source, source_args)])
 
   @_RpcTimeout(_TMO_FAST)