X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/b43dcc5a11c2bd743b9d457319ab6f162ffcc82b..5c097318202f47e88b6a168f97920549416fb15a:/lib/rpc.py diff --git a/lib/rpc.py b/lib/rpc.py index 6373e1a..4e2693e 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2010 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -55,6 +55,7 @@ _RPC_CONNECT_TIMEOUT = 5 _RPC_CLIENT_HEADERS = [ "Content-type: %s" % http.HTTP_APP_JSON, + "Expect:", ] # Various time constants for the timeout table @@ -118,7 +119,12 @@ def _ConfigRpcCurl(curl): curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT) -class _RpcThreadLocal(threading.local): +# Aliasing this module avoids the following warning by epydoc: "Warning: No +# information available for ganeti.rpc._RpcThreadLocal's base threading.local" +_threading = threading + + +class _RpcThreadLocal(_threading.local): def GetHttpClientPool(self): """Returns a per-thread HTTP client pool. @@ -134,6 +140,10 @@ class _RpcThreadLocal(threading.local): return pool +# Remove module alias (see above) +del _threading + + _thread_local = _RpcThreadLocal() @@ -216,12 +226,9 @@ class RpcResult(object): self.fail_msg = None self.payload = data[1] - assert hasattr(self, "call") - assert hasattr(self, "data") - assert hasattr(self, "fail_msg") - assert hasattr(self, "node") - assert hasattr(self, "offline") - assert hasattr(self, "payload") + for attr_name in ["call", "data", "fail_msg", + "node", "offline", "payload"]: + assert hasattr(self, attr_name), "Missing attribute %s" % attr_name @staticmethod def _EnsureErr(val): @@ -266,8 +273,8 @@ def _AddressLookup(node_list, @param node_list: List of node names @type ssc: class @param ssc: SimpleStore class that is used to obtain node->ip mappings - @type lookup_fn: callable - @param lookup_fn: function use to do NS lookup + @type nslookup_fn: callable + @param nslookup_fn: function use to do NS lookup @rtype: list of addresses and/or None's @returns: List of corresponding addresses, if found @@ -432,7 +439,7 @@ class RpcRunner(object): @type bep: dict or None @param bep: a dictionary with overridden backend parameters @type osp: dict or None - @param osp: a dictionary with overriden os parameters + @param osp: a dictionary with overridden os parameters @rtype: dict @return: the instance dict, with the hvparams filled with the cluster defaults @@ -751,14 +758,15 @@ class RpcRunner(object): shutdown_timeout]) @_RpcTimeout(_TMO_1DAY) - def call_instance_os_add(self, node, inst, reinstall, debug): + def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None): """Installs an OS on the given instance. This is a single-node call. """ return self._SingleNodeCall(node, "instance_os_add", - [self._InstDict(inst), reinstall, debug]) + [self._InstDict(inst, osp=osparams), + reinstall, debug]) @_RpcTimeout(_TMO_SLOW) def call_instance_run_rename(self, node, inst, old_name, debug): @@ -874,14 +882,20 @@ class RpcRunner(object): [vg_name, hypervisor_type]) @_RpcTimeout(_TMO_NORMAL) - def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub): - """Add a node to the cluster. + def call_etc_hosts_modify(self, node, mode, name, ip): + """Modify hosts file with name - This is a single-node call. + @type node: string + @param node: The node to call + @type mode: string + @param mode: The mode to operate. Currently "add" or "remove" + @type name: string + @param name: The host name to be modified + @type ip: string + @param ip: The ip of the entry (just valid if mode is "add") """ - return self._SingleNodeCall(node, "node_add", - [dsa, dsapub, rsa, rsapub, ssh, sshpub]) + return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip]) @_RpcTimeout(_TMO_NORMAL) def call_node_verify(self, node_list, checkdict, cluster_name): @@ -945,6 +959,16 @@ class RpcRunner(object): return self._SingleNodeCall(node, "blockdev_create", [bdev.ToDict(), size, owner, on_primary, info]) + @_RpcTimeout(_TMO_SLOW) + def call_blockdev_wipe(self, node, bdev, offset, size): + """Request wipe at given offset with given size of a block device. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "blockdev_wipe", + [bdev.ToDict(), offset, size]) + @_RpcTimeout(_TMO_NORMAL) def call_blockdev_remove(self, node, bdev): """Request removal of a given block device. @@ -965,14 +989,24 @@ class RpcRunner(object): [(d.ToDict(), uid) for d, uid in devlist]) @_RpcTimeout(_TMO_NORMAL) - def call_blockdev_assemble(self, node, disk, owner, on_primary): + def call_blockdev_pause_resume_sync(self, node, disks, pause): + """Request a pause/resume of given block device. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "blockdev_pause_resume_sync", + [[bdev.ToDict() for bdev in disks], pause]) + + @_RpcTimeout(_TMO_NORMAL) + def call_blockdev_assemble(self, node, disk, owner, on_primary, idx): """Request assembling of a given block device. This is a single-node call. """ return self._SingleNodeCall(node, "blockdev_assemble", - [disk.ToDict(), owner, on_primary]) + [disk.ToDict(), owner, on_primary, idx]) @_RpcTimeout(_TMO_NORMAL) def call_blockdev_shutdown(self, node, disk): @@ -1020,6 +1054,26 @@ class RpcRunner(object): return result @_RpcTimeout(_TMO_NORMAL) + def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks): + """Request status of (mirroring) devices from multiple nodes. + + This is a multi-node call. + + """ + result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi", + [dict((name, [dsk.ToDict() for dsk in disks]) + for name, disks in node_disks.items())]) + for nres in result.values(): + if nres.fail_msg: + continue + + for idx, (success, status) in enumerate(nres.payload): + if success: + nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status)) + + return result + + @_RpcTimeout(_TMO_NORMAL) def call_blockdev_find(self, node, disk): """Request identification of a given block device. @@ -1042,7 +1096,7 @@ class RpcRunner(object): return self._SingleNodeCall(node, "blockdev_close", params) @_RpcTimeout(_TMO_NORMAL) - def call_blockdev_getsizes(self, node, disks): + def call_blockdev_getsize(self, node, disks): """Returns the size of the given disks. This is a single-node call. @@ -1129,6 +1183,16 @@ class RpcRunner(object): """ return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values]) + @_RpcTimeout(_TMO_NORMAL) + def call_run_oob(self, node, oob_program, command, remote_node, timeout): + """Runs OOB. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "run_oob", [oob_program, command, + remote_node, timeout]) + @_RpcTimeout(_TMO_FAST) def call_os_diagnose(self, node_list): """Request a diagnose of OS definitions. @@ -1347,7 +1411,7 @@ class RpcRunner(object): [old_file_storage_dir, new_file_storage_dir]) @classmethod - @_RpcTimeout(_TMO_FAST) + @_RpcTimeout(_TMO_URGENT) def call_jobqueue_update(cls, node_list, address_list, file_name, content): """Update job queue. @@ -1369,7 +1433,7 @@ class RpcRunner(object): return cls._StaticSingleNodeCall(node, "jobqueue_purge", []) @classmethod - @_RpcTimeout(_TMO_FAST) + @_RpcTimeout(_TMO_URGENT) def call_jobqueue_rename(cls, node_list, address_list, rename): """Rename a job queue file.