X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/19ddc57a517d84f59c3c24da232fd98e7e85efd9..297b0cd3faa9e114c40079e3490eb95cf0ec9701:/lib/rpc.py diff --git a/lib/rpc.py b/lib/rpc.py index b2fb7cb..e482a20 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2010 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -45,6 +45,7 @@ from ganeti import constants from ganeti import errors from ganeti import netutils from ganeti import ssconf +from ganeti import runtime # pylint has a bug here, doesn't see this import import ganeti.http.client # pylint: disable-msg=W0611 @@ -55,6 +56,7 @@ _RPC_CONNECT_TIMEOUT = 5 _RPC_CLIENT_HEADERS = [ "Content-type: %s" % http.HTTP_APP_JSON, + "Expect:", ] # Various time constants for the timeout table @@ -118,7 +120,12 @@ def _ConfigRpcCurl(curl): curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT) -class _RpcThreadLocal(threading.local): +# Aliasing this module avoids the following warning by epydoc: "Warning: No +# information available for ganeti.rpc._RpcThreadLocal's base threading.local" +_threading = threading + + +class _RpcThreadLocal(_threading.local): def GetHttpClientPool(self): """Returns a per-thread HTTP client pool. @@ -134,6 +141,10 @@ class _RpcThreadLocal(threading.local): return pool +# Remove module alias (see above) +del _threading + + _thread_local = _RpcThreadLocal() @@ -216,12 +227,9 @@ class RpcResult(object): self.fail_msg = None self.payload = data[1] - assert hasattr(self, "call") - assert hasattr(self, "data") - assert hasattr(self, "fail_msg") - assert hasattr(self, "node") - assert hasattr(self, "offline") - assert hasattr(self, "payload") + for attr_name in ["call", "data", "fail_msg", + "node", "offline", "payload"]: + assert hasattr(self, attr_name), "Missing attribute %s" % attr_name @staticmethod def _EnsureErr(val): @@ -432,7 +440,7 @@ class RpcRunner(object): @type bep: dict or None @param bep: a dictionary with overridden backend parameters @type osp: dict or None - @param osp: a dictionary with overriden os parameters + @param osp: a dictionary with overridden os parameters @rtype: dict @return: the instance dict, with the hvparams filled with the cluster defaults @@ -583,6 +591,15 @@ class RpcRunner(object): # @_RpcTimeout(_TMO_URGENT) + def call_bdev_sizes(self, node_list, devices): + """Gets the sizes of requested block devices present on a node + + This is a multi-node call. + + """ + return self._MultiNodeCall(node_list, "bdev_sizes", [devices]) + + @_RpcTimeout(_TMO_URGENT) def call_lv_list(self, node_list, vg_name): """Gets the logical volumes present in a given volume group. @@ -751,14 +768,15 @@ class RpcRunner(object): shutdown_timeout]) @_RpcTimeout(_TMO_1DAY) - def call_instance_os_add(self, node, inst, reinstall, debug): + def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None): """Installs an OS on the given instance. This is a single-node call. """ return self._SingleNodeCall(node, "instance_os_add", - [self._InstDict(inst), reinstall, debug]) + [self._InstDict(inst, osp=osparams), + reinstall, debug]) @_RpcTimeout(_TMO_SLOW) def call_instance_run_rename(self, node, inst, old_name, debug): @@ -951,6 +969,16 @@ class RpcRunner(object): return self._SingleNodeCall(node, "blockdev_create", [bdev.ToDict(), size, owner, on_primary, info]) + @_RpcTimeout(_TMO_SLOW) + def call_blockdev_wipe(self, node, bdev, offset, size): + """Request wipe at given offset with given size of a block device. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "blockdev_wipe", + [bdev.ToDict(), offset, size]) + @_RpcTimeout(_TMO_NORMAL) def call_blockdev_remove(self, node, bdev): """Request removal of a given block device. @@ -971,14 +999,24 @@ class RpcRunner(object): [(d.ToDict(), uid) for d, uid in devlist]) @_RpcTimeout(_TMO_NORMAL) - def call_blockdev_assemble(self, node, disk, owner, on_primary): + def call_blockdev_pause_resume_sync(self, node, disks, pause): + """Request a pause/resume of given block device. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "blockdev_pause_resume_sync", + [[bdev.ToDict() for bdev in disks], pause]) + + @_RpcTimeout(_TMO_NORMAL) + def call_blockdev_assemble(self, node, disk, owner, on_primary, idx): """Request assembling of a given block device. This is a single-node call. """ return self._SingleNodeCall(node, "blockdev_assemble", - [disk.ToDict(), owner, on_primary]) + [disk.ToDict(), owner, on_primary, idx]) @_RpcTimeout(_TMO_NORMAL) def call_blockdev_shutdown(self, node, disk): @@ -1026,6 +1064,26 @@ class RpcRunner(object): return result @_RpcTimeout(_TMO_NORMAL) + def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks): + """Request status of (mirroring) devices from multiple nodes. + + This is a multi-node call. + + """ + result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi", + [dict((name, [dsk.ToDict() for dsk in disks]) + for name, disks in node_disks.items())]) + for nres in result.values(): + if nres.fail_msg: + continue + + for idx, (success, status) in enumerate(nres.payload): + if success: + nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status)) + + return result + + @_RpcTimeout(_TMO_NORMAL) def call_blockdev_find(self, node, disk): """Request identification of a given block device. @@ -1048,7 +1106,7 @@ class RpcRunner(object): return self._SingleNodeCall(node, "blockdev_close", params) @_RpcTimeout(_TMO_NORMAL) - def call_blockdev_getsizes(self, node, disks): + def call_blockdev_getsize(self, node, disks): """Returns the size of the given disks. This is a single-node call. @@ -1120,8 +1178,9 @@ class RpcRunner(object): file_contents = utils.ReadFile(file_name) data = cls._Compress(file_contents) st = os.stat(file_name) - params = [file_name, data, st.st_mode, st.st_uid, st.st_gid, - st.st_atime, st.st_mtime] + getents = runtime.GetEnts() + params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid), + getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime] return cls._StaticMultiNodeCall(node_list, "upload_file", params, address_list=address_list) @@ -1135,6 +1194,16 @@ class RpcRunner(object): """ return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values]) + @_RpcTimeout(_TMO_NORMAL) + def call_run_oob(self, node, oob_program, command, remote_node, timeout): + """Runs OOB. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "run_oob", [oob_program, command, + remote_node, timeout]) + @_RpcTimeout(_TMO_FAST) def call_os_diagnose(self, node_list): """Request a diagnose of OS definitions. @@ -1194,14 +1263,14 @@ class RpcRunner(object): return self._SingleNodeCall(node, "iallocator_runner", [name, idata]) @_RpcTimeout(_TMO_NORMAL) - def call_blockdev_grow(self, node, cf_bdev, amount): + def call_blockdev_grow(self, node, cf_bdev, amount, dryrun): """Request a snapshot of the given block device. This is a single-node call. """ return self._SingleNodeCall(node, "blockdev_grow", - [cf_bdev.ToDict(), amount]) + [cf_bdev.ToDict(), amount, dryrun]) @_RpcTimeout(_TMO_1DAY) def call_blockdev_export(self, node, cf_bdev, @@ -1353,7 +1422,7 @@ class RpcRunner(object): [old_file_storage_dir, new_file_storage_dir]) @classmethod - @_RpcTimeout(_TMO_FAST) + @_RpcTimeout(_TMO_URGENT) def call_jobqueue_update(cls, node_list, address_list, file_name, content): """Update job queue. @@ -1375,7 +1444,7 @@ class RpcRunner(object): return cls._StaticSingleNodeCall(node, "jobqueue_purge", []) @classmethod - @_RpcTimeout(_TMO_FAST) + @_RpcTimeout(_TMO_URGENT) def call_jobqueue_rename(cls, node_list, address_list, rename): """Rename a job queue file.