X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/8d8c4eff2135d74b919c3c56dbb51027b76aa680..5c097318202f47e88b6a168f97920549416fb15a:/lib/rpc.py diff --git a/lib/rpc.py b/lib/rpc.py index ffe64c3..4e2693e 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -119,7 +119,12 @@ def _ConfigRpcCurl(curl): curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT) -class _RpcThreadLocal(threading.local): +# Aliasing this module avoids the following warning by epydoc: "Warning: No +# information available for ganeti.rpc._RpcThreadLocal's base threading.local" +_threading = threading + + +class _RpcThreadLocal(_threading.local): def GetHttpClientPool(self): """Returns a per-thread HTTP client pool. @@ -135,6 +140,10 @@ class _RpcThreadLocal(threading.local): return pool +# Remove module alias (see above) +del _threading + + _thread_local = _RpcThreadLocal() @@ -217,12 +226,9 @@ class RpcResult(object): self.fail_msg = None self.payload = data[1] - assert hasattr(self, "call") - assert hasattr(self, "data") - assert hasattr(self, "fail_msg") - assert hasattr(self, "node") - assert hasattr(self, "offline") - assert hasattr(self, "payload") + for attr_name in ["call", "data", "fail_msg", + "node", "offline", "payload"]: + assert hasattr(self, attr_name), "Missing attribute %s" % attr_name @staticmethod def _EnsureErr(val): @@ -953,6 +959,16 @@ class RpcRunner(object): return self._SingleNodeCall(node, "blockdev_create", [bdev.ToDict(), size, owner, on_primary, info]) + @_RpcTimeout(_TMO_SLOW) + def call_blockdev_wipe(self, node, bdev, offset, size): + """Request wipe at given offset with given size of a block device. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "blockdev_wipe", + [bdev.ToDict(), offset, size]) + @_RpcTimeout(_TMO_NORMAL) def call_blockdev_remove(self, node, bdev): """Request removal of a given block device. @@ -973,14 +989,24 @@ class RpcRunner(object): [(d.ToDict(), uid) for d, uid in devlist]) @_RpcTimeout(_TMO_NORMAL) - def call_blockdev_assemble(self, node, disk, owner, on_primary): + def call_blockdev_pause_resume_sync(self, node, disks, pause): + """Request a pause/resume of given block device. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "blockdev_pause_resume_sync", + [[bdev.ToDict() for bdev in disks], pause]) + + @_RpcTimeout(_TMO_NORMAL) + def call_blockdev_assemble(self, node, disk, owner, on_primary, idx): """Request assembling of a given block device. This is a single-node call. """ return self._SingleNodeCall(node, "blockdev_assemble", - [disk.ToDict(), owner, on_primary]) + [disk.ToDict(), owner, on_primary, idx]) @_RpcTimeout(_TMO_NORMAL) def call_blockdev_shutdown(self, node, disk): @@ -1028,6 +1054,26 @@ class RpcRunner(object): return result @_RpcTimeout(_TMO_NORMAL) + def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks): + """Request status of (mirroring) devices from multiple nodes. + + This is a multi-node call. + + """ + result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi", + [dict((name, [dsk.ToDict() for dsk in disks]) + for name, disks in node_disks.items())]) + for nres in result.values(): + if nres.fail_msg: + continue + + for idx, (success, status) in enumerate(nres.payload): + if success: + nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status)) + + return result + + @_RpcTimeout(_TMO_NORMAL) def call_blockdev_find(self, node, disk): """Request identification of a given block device. @@ -1050,7 +1096,7 @@ class RpcRunner(object): return self._SingleNodeCall(node, "blockdev_close", params) @_RpcTimeout(_TMO_NORMAL) - def call_blockdev_getsizes(self, node, disks): + def call_blockdev_getsize(self, node, disks): """Returns the size of the given disks. This is a single-node call. @@ -1137,6 +1183,16 @@ class RpcRunner(object): """ return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values]) + @_RpcTimeout(_TMO_NORMAL) + def call_run_oob(self, node, oob_program, command, remote_node, timeout): + """Runs OOB. + + This is a single-node call. + + """ + return self._SingleNodeCall(node, "run_oob", [oob_program, command, + remote_node, timeout]) + @_RpcTimeout(_TMO_FAST) def call_os_diagnose(self, node_list): """Request a diagnose of OS definitions. @@ -1355,7 +1411,7 @@ class RpcRunner(object): [old_file_storage_dir, new_file_storage_dir]) @classmethod - @_RpcTimeout(_TMO_FAST) + @_RpcTimeout(_TMO_URGENT) def call_jobqueue_update(cls, node_list, address_list, file_name, content): """Update job queue. @@ -1377,7 +1433,7 @@ class RpcRunner(object): return cls._StaticSingleNodeCall(node, "jobqueue_purge", []) @classmethod - @_RpcTimeout(_TMO_FAST) + @_RpcTimeout(_TMO_URGENT) def call_jobqueue_rename(cls, node_list, address_list, rename): """Rename a job queue file.