X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/e8d61457f16974cbf0d77479f9d06f4c6345a02e..247ee81f3e3970506f082530f1eaf8f407bfa85f:/lib/rpc.py diff --git a/lib/rpc.py b/lib/rpc.py index 3e1e632..c93c16d 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2010 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -45,6 +45,7 @@ from ganeti import constants from ganeti import errors from ganeti import netutils from ganeti import ssconf +from ganeti import runtime # pylint has a bug here, doesn't see this import import ganeti.http.client # pylint: disable-msg=W0611 @@ -55,6 +56,7 @@ _RPC_CONNECT_TIMEOUT = 5 _RPC_CLIENT_HEADERS = [ "Content-type: %s" % http.HTTP_APP_JSON, + "Expect:", ] # Various time constants for the timeout table @@ -118,7 +120,12 @@ def _ConfigRpcCurl(curl): curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT) -class _RpcThreadLocal(threading.local): +# Aliasing this module avoids the following warning by epydoc: "Warning: No +# information available for ganeti.rpc._RpcThreadLocal's base threading.local" +_threading = threading + + +class _RpcThreadLocal(_threading.local): def GetHttpClientPool(self): """Returns a per-thread HTTP client pool. @@ -134,6 +141,10 @@ class _RpcThreadLocal(threading.local): return pool +# Remove module alias (see above) +del _threading + + _thread_local = _RpcThreadLocal() @@ -216,12 +227,9 @@ class RpcResult(object): self.fail_msg = None self.payload = data[1] - assert hasattr(self, "call") - assert hasattr(self, "data") - assert hasattr(self, "fail_msg") - assert hasattr(self, "node") - assert hasattr(self, "offline") - assert hasattr(self, "payload") + for attr_name in ["call", "data", "fail_msg", + "node", "offline", "payload"]: + assert hasattr(self, attr_name), "Missing attribute %s" % attr_name @staticmethod def _EnsureErr(val): @@ -266,8 +274,8 @@ def _AddressLookup(node_list, @param node_list: List of node names @type ssc: class @param ssc: SimpleStore class that is used to obtain node->ip mappings - @type lookup_fn: callable - @param lookup_fn: function use to do NS lookup + @type nslookup_fn: callable + @param nslookup_fn: function use to do NS lookup @rtype: list of addresses and/or None's @returns: List of corresponding addresses, if found @@ -432,7 +440,7 @@ class RpcRunner(object): @type bep: dict or None @param bep: a dictionary with overridden backend parameters @type osp: dict or None - @param osp: a dictionary with overriden os parameters + @param osp: a dictionary with overridden os parameters @rtype: dict @return: the instance dict, with the hvparams filled with the cluster defaults @@ -583,6 +591,15 @@ class RpcRunner(object): # @_RpcTimeout(_TMO_URGENT) + def call_bdev_sizes(self, node_list, devices): + """Gets the sizes of requested block devices present on a node + + This is a multi-node call. + + """ + return self._MultiNodeCall(node_list, "bdev_sizes", [devices]) + + @_RpcTimeout(_TMO_URGENT) def call_lv_list(self, node_list, vg_name): """Gets the logical volumes present in a given volume group. @@ -644,14 +661,14 @@ class RpcRunner(object): return self._SingleNodeCall(node, "bridges_exist", [bridges_list]) @_RpcTimeout(_TMO_NORMAL) - def call_instance_start(self, node, instance, hvp, bep): + def call_instance_start(self, node, instance, hvp, bep, startup_paused): """Starts an instance. This is a single-node call. """ idict = self._InstDict(instance, hvp=hvp, bep=bep) - return self._SingleNodeCall(node, "instance_start", [idict]) + return self._SingleNodeCall(node, "instance_start", [idict, startup_paused]) @_RpcTimeout(_TMO_NORMAL) def call_instance_shutdown(self, node, instance, timeout): @@ -751,14 +768,15 @@ class RpcRunner(object): shutdown_timeout]) @_RpcTimeout(_TMO_1DAY) - def call_instance_os_add(self, node, inst, reinstall, debug): + def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None): """Installs an OS on the given instance. This is a single-node call. """ return self._SingleNodeCall(node, "instance_os_add", - [self._InstDict(inst), reinstall, debug]) + [self._InstDict(inst, osp=osparams), + reinstall, debug]) @_RpcTimeout(_TMO_SLOW) def call_instance_run_rename(self, node, inst, old_name, debug): @@ -874,6 +892,22 @@ class RpcRunner(object): [vg_name, hypervisor_type]) @_RpcTimeout(_TMO_NORMAL) + def call_etc_hosts_modify(self, node, mode, name, ip): + """Modify hosts file with name + + @type node: string + @param node: The node to call + @type mode: string + @param mode: The mode to operate. Currently "add" or "remove" + @type name: string + @param name: The host name to be modified + @type ip: string + @param ip: The ip of the entry (just valid if mode is "add") + + """ + return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip]) + + @_RpcTimeout(_TMO_NORMAL) def call_node_verify(self, node_list, checkdict, cluster_name): """Request verification of given parameters. @@ -935,6 +969,16 @@ class RpcRunner(object): return self._SingleNodeCall(node, "blockdev_create", [bdev.ToDict(), size, owner, on_primary, info]) + @_RpcTimeout(_TMO_SLOW) + def call_blockdev_wipe(self, node, bdev, offset, size): + """Request wipe at given offset with given size of a block device. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "blockdev_wipe", + [bdev.ToDict(), offset, size]) + @_RpcTimeout(_TMO_NORMAL) def call_blockdev_remove(self, node, bdev): """Request removal of a given block device. @@ -955,14 +999,24 @@ class RpcRunner(object): [(d.ToDict(), uid) for d, uid in devlist]) @_RpcTimeout(_TMO_NORMAL) - def call_blockdev_assemble(self, node, disk, owner, on_primary): + def call_blockdev_pause_resume_sync(self, node, disks, pause): + """Request a pause/resume of given block device. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "blockdev_pause_resume_sync", + [[bdev.ToDict() for bdev in disks], pause]) + + @_RpcTimeout(_TMO_NORMAL) + def call_blockdev_assemble(self, node, disk, owner, on_primary, idx): """Request assembling of a given block device. This is a single-node call. """ return self._SingleNodeCall(node, "blockdev_assemble", - [disk.ToDict(), owner, on_primary]) + [disk.ToDict(), owner, on_primary, idx]) @_RpcTimeout(_TMO_NORMAL) def call_blockdev_shutdown(self, node, disk): @@ -1010,6 +1064,26 @@ class RpcRunner(object): return result @_RpcTimeout(_TMO_NORMAL) + def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks): + """Request status of (mirroring) devices from multiple nodes. + + This is a multi-node call. + + """ + result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi", + [dict((name, [dsk.ToDict() for dsk in disks]) + for name, disks in node_disks.items())]) + for nres in result.values(): + if nres.fail_msg: + continue + + for idx, (success, status) in enumerate(nres.payload): + if success: + nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status)) + + return result + + @_RpcTimeout(_TMO_NORMAL) def call_blockdev_find(self, node, disk): """Request identification of a given block device. @@ -1032,7 +1106,7 @@ class RpcRunner(object): return self._SingleNodeCall(node, "blockdev_close", params) @_RpcTimeout(_TMO_NORMAL) - def call_blockdev_getsizes(self, node, disks): + def call_blockdev_getsize(self, node, disks): """Returns the size of the given disks. This is a single-node call. @@ -1104,8 +1178,9 @@ class RpcRunner(object): file_contents = utils.ReadFile(file_name) data = cls._Compress(file_contents) st = os.stat(file_name) - params = [file_name, data, st.st_mode, st.st_uid, st.st_gid, - st.st_atime, st.st_mtime] + getents = runtime.GetEnts() + params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid), + getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime] return cls._StaticMultiNodeCall(node_list, "upload_file", params, address_list=address_list) @@ -1119,6 +1194,16 @@ class RpcRunner(object): """ return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values]) + @_RpcTimeout(_TMO_NORMAL) + def call_run_oob(self, node, oob_program, command, remote_node, timeout): + """Runs OOB. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "run_oob", [oob_program, command, + remote_node, timeout]) + @_RpcTimeout(_TMO_FAST) def call_os_diagnose(self, node_list): """Request a diagnose of OS definitions. @@ -1178,14 +1263,14 @@ class RpcRunner(object): return self._SingleNodeCall(node, "iallocator_runner", [name, idata]) @_RpcTimeout(_TMO_NORMAL) - def call_blockdev_grow(self, node, cf_bdev, amount): + def call_blockdev_grow(self, node, cf_bdev, amount, dryrun): """Request a snapshot of the given block device. This is a single-node call. """ return self._SingleNodeCall(node, "blockdev_grow", - [cf_bdev.ToDict(), amount]) + [cf_bdev.ToDict(), amount, dryrun]) @_RpcTimeout(_TMO_1DAY) def call_blockdev_export(self, node, cf_bdev, @@ -1337,7 +1422,7 @@ class RpcRunner(object): [old_file_storage_dir, new_file_storage_dir]) @classmethod - @_RpcTimeout(_TMO_FAST) + @_RpcTimeout(_TMO_URGENT) def call_jobqueue_update(cls, node_list, address_list, file_name, content): """Update job queue. @@ -1359,7 +1444,7 @@ class RpcRunner(object): return cls._StaticSingleNodeCall(node, "jobqueue_purge", []) @classmethod - @_RpcTimeout(_TMO_FAST) + @_RpcTimeout(_TMO_URGENT) def call_jobqueue_rename(cls, node_list, address_list, rename): """Rename a job queue file. @@ -1413,7 +1498,8 @@ class RpcRunner(object): return self._SingleNodeCall(node, "x509_cert_remove", [name]) @_RpcTimeout(_TMO_NORMAL) - def call_import_start(self, node, opts, instance, dest, dest_args): + def call_import_start(self, node, opts, instance, component, + dest, dest_args): """Starts a listener for an import. This is a single-node call. @@ -1422,16 +1508,18 @@ class RpcRunner(object): @param node: Node name @type instance: C{objects.Instance} @param instance: Instance object + @type component: string + @param component: which part of the instance is being imported """ return self._SingleNodeCall(node, "import_start", [opts.ToDict(), - self._InstDict(instance), dest, + self._InstDict(instance), component, dest, _EncodeImportExportIO(dest, dest_args)]) @_RpcTimeout(_TMO_NORMAL) def call_export_start(self, node, opts, host, port, - instance, source, source_args): + instance, component, source, source_args): """Starts an export daemon. This is a single-node call. @@ -1440,11 +1528,14 @@ class RpcRunner(object): @param node: Node name @type instance: C{objects.Instance} @param instance: Instance object + @type component: string + @param component: which part of the instance is being imported """ return self._SingleNodeCall(node, "export_start", [opts.ToDict(), host, port, - self._InstDict(instance), source, + self._InstDict(instance), + component, source, _EncodeImportExportIO(source, source_args)]) @_RpcTimeout(_TMO_FAST)