4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
48 # pylint has a bug here, doesn't see this import
49 import ganeti.http.client # pylint: disable-msg=W0611
52 # Timeout for connecting to nodes (seconds)
53 _RPC_CONNECT_TIMEOUT = 5
55 _RPC_CLIENT_HEADERS = [
56 "Content-type: %s" % http.HTTP_APP_JSON,
59 # Various time constants for the timeout table
60 _TMO_URGENT = 60 # one minute
61 _TMO_FAST = 5 * 60 # five minutes
62 _TMO_NORMAL = 15 * 60 # 15 minutes
63 _TMO_SLOW = 3600 # one hour
67 # Timeout table that will be built later by decorators
68 # Guidelines for choosing timeouts:
69 # - call used during watcher: timeout -> 1min, _TMO_URGENT
70 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
71 # - other calls: 15 min, _TMO_NORMAL
72 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
79 """Initializes the module-global HTTP client manager.
81 Must be called before using any RPC function and while exactly one thread is
85 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86 # one thread running. This check is just a safety measure -- it doesn't
88 assert threading.activeCount() == 1, \
89 "Found more than one active thread when initializing pycURL"
91 logging.info("Using PycURL %s", pycurl.version)
93 pycurl.global_init(pycurl.GLOBAL_ALL)
97 """Stops the module-global HTTP client manager.
99 Must be called before quitting the program and while exactly one thread is
103 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Applies the settings shared by every RPC connection to a cURL handle.

  The node daemon certificate file serves as the trusted CA bundle and as
  the client certificate and key (all in PEM format). Redirects are not
  followed and connection attempts are bounded by L{_RPC_CONNECT_TIMEOUT}.

  @param curl: the pycURL handle to configure

  """
  cert_path = str(constants.NODED_CERT_FILE)

  # Options are applied in this exact order
  settings = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_path),
    # The peer certificate is verified, but the host name is not checked
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_path),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_path),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]
  for (option, value) in settings:
    curl.setopt(option, value)
120 class _RpcThreadLocal(threading.local):
121 def GetHttpClientPool(self):
122 """Returns a per-thread HTTP client pool.
124 @rtype: L{http.client.HttpClientPool}
129 except AttributeError:
130 pool = http.client.HttpClientPool(_ConfigRpcCurl)
136 _thread_local = _RpcThreadLocal()
139 def _RpcTimeout(secs):
140 """Timeout decorator.
142 When applied to a rpc call_* function, it updates the global timeout
143 table with the given function/timeout.
148 assert name.startswith("call_")
149 _TIMEOUTS[name[len("call_"):]] = secs
155 """RPC-wrapper decorator.
157 When applied to a function, it runs it with the RPC system
158 initialized, and it shutsdown the system afterwards. This means the
159 function must be called without RPC being initialized.
162 def wrapper(*args, **kwargs):
165 return fn(*args, **kwargs)
171 class RpcResult(object):
174 This class holds an RPC result. It is needed since in multi-node
175 calls we can't raise an exception just because one one out of many
176 failed, and therefore we use this class to encapsulate the result.
178 @ivar data: the data payload, for successful results, or None
179 @ivar call: the name of the RPC call
180 @ivar node: the name of the node to which we made the call
181 @ivar offline: whether the operation failed because the node was
182 offline, as opposed to actual failure; offline=True will always
183 imply failed=True, in order to allow simpler checking if
184 the user doesn't care about the exact failure mode
185 @ivar fail_msg: the error message if the call failed
188 def __init__(self, data=None, failed=False, offline=False,
189 call=None, node=None):
190 self.offline = offline
195 self.fail_msg = "Node is marked offline"
196 self.data = self.payload = None
198 self.fail_msg = self._EnsureErr(data)
199 self.data = self.payload = None
202 if not isinstance(self.data, (tuple, list)):
203 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
207 self.fail_msg = ("RPC layer error: invalid result length (%d), "
208 "expected 2" % len(self.data))
210 elif not self.data[0]:
211 self.fail_msg = self._EnsureErr(self.data[1])
216 self.payload = data[1]
218 assert hasattr(self, "call")
219 assert hasattr(self, "data")
220 assert hasattr(self, "fail_msg")
221 assert hasattr(self, "node")
222 assert hasattr(self, "offline")
223 assert hasattr(self, "payload")
227 """Helper to ensure we return a 'True' value for error."""
231 return "No error information"
233 def Raise(self, msg, prereq=False, ecode=None):
234 """If the result has failed, raise an OpExecError.
236 This is used so that LU code doesn't have to check for each
237 result, but instead can call this function.
240 if not self.fail_msg:
243 if not msg: # one could pass None for default message
244 msg = ("Call '%s' to node '%s' has failed: %s" %
245 (self.call, self.node, self.fail_msg))
247 msg = "%s: %s" % (msg, self.fail_msg)
249 ec = errors.OpPrereqError
251 ec = errors.OpExecError
252 if ecode is not None:
256 raise ec(*args) # pylint: disable-msg=W0142
262 This class, given a (remote) method name, a list of parameters and a
263 list of nodes, will contact (in parallel) all nodes, and return a
264 dict of results (key: node name, value: result).
266 One current bug is that generic failure is still signaled by
267 'False' result, which is not good. This overloading of values can
271 def __init__(self, procedure, body, port):
272 assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
274 self.procedure = procedure
279 def ConnectList(self, node_list, address_list=None, read_timeout=None):
280 """Add a list of nodes to the target nodes.
282 @type node_list: list
283 @param node_list: the list of node names to connect
284 @type address_list: list or None
285 @keyword address_list: either None or a list with node addresses,
286 which must have the same length as the node list
287 @type read_timeout: int
288 @param read_timeout: overwrites the default read timeout for the
292 if address_list is None:
293 address_list = [None for _ in node_list]
295 assert len(node_list) == len(address_list), \
296 "Name and address lists should have the same length"
297 for node, address in zip(node_list, address_list):
298 self.ConnectNode(node, address, read_timeout=read_timeout)
300 def ConnectNode(self, name, address=None, read_timeout=None):
301 """Add a node to the target list.
304 @param name: the node name
306 @keyword address: the node address, if known
312 if read_timeout is None:
313 read_timeout = _TIMEOUTS[self.procedure]
315 self._request[name] = \
316 http.client.HttpClientRequest(str(address), self.port,
317 http.HTTP_PUT, str("/%s" % self.procedure),
318 headers=_RPC_CLIENT_HEADERS,
319 post_data=str(self.body),
320 read_timeout=read_timeout)
322 def GetResults(self, http_pool=None):
323 """Call nodes and return results.
326 @return: List of RPC results
330 http_pool = _thread_local.GetHttpClientPool()
332 http_pool.ProcessRequests(self._request.values())
336 for name, req in self._request.iteritems():
337 if req.success and req.resp_status_code == http.HTTP_OK:
338 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
339 node=name, call=self.procedure)
342 # TODO: Better error reporting
348 logging.error("RPC error in %s from node %s: %s",
349 self.procedure, name, msg)
350 results[name] = RpcResult(data=msg, failed=True, node=name,
356 def _EncodeImportExportIO(ieio, ieioargs):
357 """Encodes import/export I/O information.
360 if ieio == constants.IEIO_RAW_DISK:
361 assert len(ieioargs) == 1
362 return (ieioargs[0].ToDict(), )
364 if ieio == constants.IEIO_SCRIPT:
365 assert len(ieioargs) == 2
366 return (ieioargs[0].ToDict(), ieioargs[1])
371 class RpcRunner(object):
372 """RPC runner class"""
374 def __init__(self, cfg):
375 """Initialized the rpc runner.
377 @type cfg: C{config.ConfigWriter}
378 @param cfg: the configuration object that will be used to get data
383 self.port = netutils.GetDaemonPort(constants.NODED)
385 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
386 """Convert the given instance to a dict.
388 This is done via the instance's ToDict() method and additionally
389 we fill the hvparams with the cluster defaults.
391 @type instance: L{objects.Instance}
392 @param instance: an Instance object
393 @type hvp: dict or None
394 @param hvp: a dictionary with overridden hypervisor parameters
395 @type bep: dict or None
396 @param bep: a dictionary with overridden backend parameters
397 @type osp: dict or None
398 @param osp: a dictionary with overriden os parameters
400 @return: the instance dict, with the hvparams filled with the
404 idict = instance.ToDict()
405 cluster = self._cfg.GetClusterInfo()
406 idict["hvparams"] = cluster.FillHV(instance)
408 idict["hvparams"].update(hvp)
409 idict["beparams"] = cluster.FillBE(instance)
411 idict["beparams"].update(bep)
412 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
414 idict["osparams"].update(osp)
415 for nic in idict["nics"]:
416 nic['nicparams'] = objects.FillDict(
417 cluster.nicparams[constants.PP_DEFAULT],
421 def _ConnectList(self, client, node_list, call, read_timeout=None):
422 """Helper for computing node addresses.
424 @type client: L{ganeti.rpc.Client}
425 @param client: a C{Client} instance
426 @type node_list: list
427 @param node_list: the node list we should connect
429 @param call: the name of the remote procedure call, for filling in
430 correctly any eventual offline nodes' results
431 @type read_timeout: int
432 @param read_timeout: overwrites the default read timeout for the
436 all_nodes = self._cfg.GetAllNodesInfo()
440 for node in node_list:
441 if node in all_nodes:
442 if all_nodes[node].offline:
443 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
445 val = all_nodes[node].primary_ip
448 addr_list.append(val)
449 name_list.append(node)
451 client.ConnectList(name_list, address_list=addr_list,
452 read_timeout=read_timeout)
455 def _ConnectNode(self, client, node, call, read_timeout=None):
456 """Helper for computing one node's address.
458 @type client: L{ganeti.rpc.Client}
459 @param client: a C{Client} instance
461 @param node: the node we should connect
463 @param call: the name of the remote procedure call, for filling in
464 correctly any eventual offline nodes' results
465 @type read_timeout: int
466 @param read_timeout: overwrites the default read timeout for the
470 node_info = self._cfg.GetNodeInfo(node)
471 if node_info is not None:
472 if node_info.offline:
473 return RpcResult(node=node, offline=True, call=call)
474 addr = node_info.primary_ip
477 client.ConnectNode(node, address=addr, read_timeout=read_timeout)
479 def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
480 """Helper for making a multi-node call
483 body = serializer.DumpJson(args, indent=False)
484 c = Client(procedure, body, self.port)
485 skip_dict = self._ConnectList(c, node_list, procedure,
486 read_timeout=read_timeout)
487 skip_dict.update(c.GetResults())
def _StaticMultiNodeCall(cls, node_list, procedure, args,
                         address_list=None, read_timeout=None):
  """Runs one RPC on several nodes without consulting the configuration.

  The arguments are JSON-encoded once and sent to the node daemon port
  of every node in C{node_list}.

  @type node_list: list
  @param node_list: names of the nodes to contact
  @param procedure: the remote procedure name
  @param args: the procedure arguments (JSON-serialized)
  @type address_list: list or None
  @param address_list: optional node addresses, parallel to C{node_list}
  @type read_timeout: int
  @param read_timeout: overrides the procedure's default read timeout
  @return: the results collected by L{Client.GetResults}

  """
  client = Client(procedure, serializer.DumpJson(args, indent=False),
                  netutils.GetDaemonPort(constants.NODED))
  client.ConnectList(node_list, address_list=address_list,
                     read_timeout=read_timeout)
  return client.GetResults()
502 def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
503 """Helper for making a single-node call
506 body = serializer.DumpJson(args, indent=False)
507 c = Client(procedure, body, self.port)
508 result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
510 # we did connect, node is not offline
511 result = c.GetResults()[node]
def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
  """Runs one RPC on a single node without consulting the configuration.

  @param node: name of the node to contact
  @param procedure: the remote procedure name
  @param args: the procedure arguments (JSON-serialized)
  @type read_timeout: int
  @param read_timeout: overrides the procedure's default read timeout
  @return: the result for C{node} from L{Client.GetResults}

  """
  payload = serializer.DumpJson(args, indent=False)
  daemon_port = netutils.GetDaemonPort(constants.NODED)
  client = Client(procedure, payload, daemon_port)
  client.ConnectNode(node, read_timeout=read_timeout)
  per_node = client.GetResults()
  return per_node[node]
526 """Compresses a string for transport over RPC.
528 Small amounts of data are not compressed.
533 @return: Encoded data to send
536 # Small amounts of data are not compressed
538 return (constants.RPC_ENCODING_NONE, data)
540 # Compress with zlib and encode in base64
541 return (constants.RPC_ENCODING_ZLIB_BASE64,
542 base64.b64encode(zlib.compress(data, 3)))
@_RpcTimeout(_TMO_URGENT)
def call_lv_list(self, node_list, vg_name):
  """Lists the logical volumes of a volume group on several nodes.

  This is a multi-node call (remote procedure C{lv_list}).

  @type node_list: list
  @param node_list: names of the nodes to query
  @param vg_name: the volume group whose logical volumes are listed

  """
  rpc_args = [vg_name]
  return self._MultiNodeCall(node_list, "lv_list", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_vg_list(self, node_list):
  """Fetches the volume group list from each of the given nodes.

  This is a multi-node call (remote procedure C{vg_list}).

  @type node_list: list
  @param node_list: names of the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "vg_list", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
  """Lists storage units on each of the given nodes.

  This is a multi-node call (remote procedure C{storage_list}). The
  storage arguments are forwarded unchanged, in the order
  C{su_name, su_args, name, fields}.

  @type node_list: list
  @param node_list: names of the nodes to query

  """
  storage_args = [su_name, su_args, name, fields]
  return self._MultiNodeCall(node_list, "storage_list", storage_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_modify(self, node, su_name, su_args, name, changes):
  """Applies changes to a storage unit on one node.

  This is a single-node call (remote procedure C{storage_modify}). The
  arguments are forwarded unchanged, in the order
  C{su_name, su_args, name, changes}.

  @param node: name of the node holding the storage unit

  """
  storage_args = [su_name, su_args, name, changes]
  return self._SingleNodeCall(node, "storage_modify", storage_args)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_execute(self, node, su_name, su_args, name, op):
  """Runs an operation against a storage unit on one node.

  This is a single-node call (remote procedure C{storage_execute}). The
  arguments are forwarded unchanged, in the order
  C{su_name, su_args, name, op}.

  @param node: name of the node holding the storage unit

  """
  storage_args = [su_name, su_args, name, op]
  return self._SingleNodeCall(node, "storage_execute", storage_args)
@_RpcTimeout(_TMO_URGENT)
def call_bridges_exist(self, node, bridges_list):
  """Asks a node whether every one of the given bridges is present.

  Used to make sure that an instance whose interfaces are attached to
  these bridges can be started on the node.

  This is a single-node call (remote procedure C{bridges_exist}).

  @param node: name of the node to check
  @type bridges_list: list
  @param bridges_list: the bridges that must exist on the node

  """
  rpc_args = [bridges_list]
  return self._SingleNodeCall(node, "bridges_exist", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_start(self, node, instance, hvp, bep):
  """Starts an instance on a node.

  The instance is serialized through L{_InstDict}, which applies the
  given hypervisor and backend parameter overrides.

  This is a single-node call (remote procedure C{instance_start}).

  @param node: the node on which the instance is started
  @type instance: L{objects.Instance}
  @param instance: the instance to start
  @type hvp: dict or None
  @param hvp: overridden hypervisor parameters
  @type bep: dict or None
  @param bep: overridden backend parameters

  """
  return self._SingleNodeCall(node, "instance_start",
                              [self._InstDict(instance, hvp=hvp, bep=bep)])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_shutdown(self, node, instance, timeout):
  """Stops an instance running on a node.

  This is a single-node call (remote procedure C{instance_shutdown}).

  @param node: the node running the instance
  @type instance: L{objects.Instance}
  @param instance: the instance to stop
  @param timeout: shutdown timeout, forwarded unchanged to the node

  """
  inst_data = self._InstDict(instance)
  return self._SingleNodeCall(node, "instance_shutdown",
                              [inst_data, timeout])
@_RpcTimeout(_TMO_NORMAL)
def call_migration_info(self, node, instance):
  """Collects the data needed to prepare the migration of an instance.

  This is a single-node call (remote procedure C{migration_info}).

  @type node: string
  @param node: the node currently running the instance
  @type instance: C{objects.Instance}
  @param instance: the instance definition

  """
  inst_data = self._InstDict(instance)
  return self._SingleNodeCall(node, "migration_info", [inst_data])
@_RpcTimeout(_TMO_NORMAL)
def call_accept_instance(self, node, instance, info, target):
  """Prepares a node to receive a migrating instance.

  This is a single-node call (remote procedure C{accept_instance}).

  @type node: string
  @param node: the migration target node
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: the result of the L{call_migration_info} call
  @type target: string
  @param target: the target host name (usually an IP address) as seen
      on the node itself

  """
  inst_data = self._InstDict(instance)
  return self._SingleNodeCall(node, "accept_instance",
                              [inst_data, info, target])
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_migration(self, node, instance, info, success):
  """Completes any migration-specific work on the target node.

  Called after both successful and failed migrations; in the failure
  case the node is expected to abort the migration.

  This is a single-node call (remote procedure C{finalize_migration}).

  @type node: string
  @param node: the migration target node
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: the result of the L{call_migration_info} call
  @type success: boolean
  @param success: whether the migration succeeded

  """
  inst_data = self._InstDict(instance)
  return self._SingleNodeCall(node, "finalize_migration",
                              [inst_data, info, success])
@_RpcTimeout(_TMO_SLOW)
def call_instance_migrate(self, node, instance, target, live):
  """Migrates an instance away from the node it is running on.

  This is a single-node call (remote procedure C{instance_migrate}).

  @type node: string
  @param node: the node currently running the instance
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: name of the destination node
  @type live: boolean
  @param live: whether to migrate live; the exact meaning is up to the
      hypervisor

  """
  inst_data = self._InstDict(instance)
  return self._SingleNodeCall(node, "instance_migrate",
                              [inst_data, target, live])
705 @_RpcTimeout(_TMO_NORMAL)
706 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
707 """Reboots an instance.
709 This is a single-node call.
712 return self._SingleNodeCall(node, "instance_reboot",
713 [self._InstDict(inst), reboot_type,
@_RpcTimeout(_TMO_1DAY)
def call_instance_os_add(self, node, inst, reinstall, debug):
  """Installs the operating system of an instance.

  This is a single-node call (remote procedure C{instance_os_add}).

  @param node: the node on which the installation runs
  @type inst: L{objects.Instance}
  @param inst: the instance to install
  @param reinstall: reinstall flag, forwarded unchanged to the node
  @param debug: debug flag, forwarded unchanged to the node

  """
  inst_data = self._InstDict(inst)
  return self._SingleNodeCall(node, "instance_os_add",
                              [inst_data, reinstall, debug])
@_RpcTimeout(_TMO_SLOW)
def call_instance_run_rename(self, node, inst, old_name, debug):
  """Runs the OS rename script of an instance.

  This is a single-node call (remote procedure C{instance_run_rename}).

  @param node: the node on which the script runs
  @type inst: L{objects.Instance}
  @param inst: the (renamed) instance
  @param old_name: the instance's previous name
  @param debug: debug flag, forwarded unchanged to the node

  """
  inst_data = self._InstDict(inst)
  return self._SingleNodeCall(node, "instance_run_rename",
                              [inst_data, old_name, debug])
@_RpcTimeout(_TMO_URGENT)
def call_instance_info(self, node, instance, hname):
  """Returns information about a single instance.

  This is a single-node call (remote procedure C{instance_info}).

  @type node: string
  @param node: the node to query (a single node name, not a list)
  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  """
  return self._SingleNodeCall(node, "instance_info", [instance, hname])
@_RpcTimeout(_TMO_NORMAL)
def call_instance_migratable(self, node, instance):
  """Asks a node whether the given instance can be migrated.

  This is a single-node call (remote procedure C{instance_migratable}).

  @type node: string
  @param node: the node to query
  @type instance: L{objects.Instance}
  @param instance: the instance to check

  """
  inst_data = self._InstDict(instance)
  return self._SingleNodeCall(node, "instance_migratable", [inst_data])
767 @_RpcTimeout(_TMO_URGENT)
768 def call_all_instances_info(self, node_list, hypervisor_list):
769 """Returns information about all instances on the given nodes.
771 This is a multi-node call.
773 @type node_list: list
774 @param node_list: the list of nodes to query
775 @type hypervisor_list: list
776 @param hypervisor_list: the hypervisors to query for instances
779 return self._MultiNodeCall(node_list, "all_instances_info",
@_RpcTimeout(_TMO_URGENT)
def call_instance_list(self, node_list, hypervisor_list):
  """Returns the list of running instances on each of the given nodes.

  This is a multi-node call (remote procedure C{instance_list}).

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances

  """
  return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
796 @_RpcTimeout(_TMO_FAST)
797 def call_node_tcp_ping(self, node, source, target, port, timeout,
799 """Do a TcpPing on the remote node
801 This is a single-node call.
804 return self._SingleNodeCall(node, "node_tcp_ping",
805 [source, target, port, timeout,
@_RpcTimeout(_TMO_FAST)
def call_node_has_ip_address(self, node, address):
  """Asks a node whether it owns the given IP address.

  This is a single-node call (remote procedure C{node_has_ip_address}).

  @param node: name of the node to check
  @param address: the IP address to look for

  """
  rpc_args = [address]
  return self._SingleNodeCall(node, "node_has_ip_address", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_node_info(self, node_list, vg_name, hypervisor_type):
  """Returns node information (memory, volume group size and free space).

  This is a multi-node call (remote procedure C{node_info}).

  @type node_list: list
  @param node_list: the nodes to query
  @type vg_name: C{string}
  @param vg_name: the volume group to report disk space for
  @type hypervisor_type: C{str}
  @param hypervisor_type: the hypervisor to ask for memory information

  """
  rpc_args = [vg_name, hypervisor_type]
  return self._MultiNodeCall(node_list, "node_info", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Adds a node to the cluster.

  The key arguments are forwarded unchanged, in the order
  C{dsa, dsapub, rsa, rsapub, ssh, sshpub}.

  This is a single-node call (remote procedure C{node_add}).

  @param node: name of the node being added

  """
  key_args = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
  return self._SingleNodeCall(node, "node_add", key_args)
@_RpcTimeout(_TMO_NORMAL)
def call_node_verify(self, node_list, checkdict, cluster_name):
  """Asks the given nodes to verify a set of parameters.

  This is a multi-node call (remote procedure C{node_verify}).

  @type node_list: list
  @param node_list: names of the nodes to verify
  @param checkdict: the checks to perform, forwarded unchanged
  @param cluster_name: the cluster name

  """
  rpc_args = [checkdict, cluster_name]
  return self._MultiNodeCall(node_list, "node_verify", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_start_master(cls, node, start_daemons, no_voting):
  """Tells a node to take over the master role.

  This is a single-node call (remote procedure C{node_start_master})
  that does not consult the cluster configuration.

  @param node: name of the node to activate as master
  @param start_daemons: flag forwarded unchanged to the node
  @param no_voting: flag forwarded unchanged to the node

  """
  rpc_args = [start_daemons, no_voting]
  return cls._StaticSingleNodeCall(node, "node_start_master", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_node_stop_master(cls, node, stop_daemons):
  """Tells a node to give up the master role.

  This is a single-node call (remote procedure C{node_stop_master})
  that does not consult the cluster configuration.

  @param node: name of the node to demote
  @param stop_daemons: flag forwarded unchanged to the node

  """
  rpc_args = [stop_daemons]
  return cls._StaticSingleNodeCall(node, "node_stop_master", rpc_args)
@_RpcTimeout(_TMO_URGENT)
def call_master_info(cls, node_list):
  """Queries the given nodes for master information.

  This is a multi-node call (remote procedure C{master_info}) that does
  not consult the cluster configuration.

  @type node_list: list
  @param node_list: names of the nodes to query

  """
  # TODO: should this method query down nodes?
  no_args = []
  return cls._StaticMultiNodeCall(node_list, "master_info", no_args)
@_RpcTimeout(_TMO_URGENT)
def call_version(cls, node_list):
  """Queries the software version of the given nodes.

  This is a multi-node call (remote procedure C{version}) that does not
  consult the cluster configuration.

  @type node_list: list
  @param node_list: names of the nodes to query

  """
  no_args = []
  return cls._StaticMultiNodeCall(node_list, "version", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
  """Asks a node to create a block device.

  This is a single-node call (remote procedure C{blockdev_create}).

  @param node: the node on which the device is created
  @param bdev: the disk object to create (sent via its C{ToDict()})
  @param size: the device size, forwarded unchanged
  @param owner: the device owner, forwarded unchanged
  @param on_primary: flag forwarded unchanged
  @param info: extra information, forwarded unchanged

  """
  disk_data = bdev.ToDict()
  return self._SingleNodeCall(node, "blockdev_create",
                              [disk_data, size, owner, on_primary, info])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_remove(self, node, bdev):
  """Asks a node to remove a block device.

  This is a single-node call (remote procedure C{blockdev_remove}).

  @param node: the node holding the device
  @param bdev: the disk object to remove (sent via its C{ToDict()})

  """
  disk_data = bdev.ToDict()
  return self._SingleNodeCall(node, "blockdev_remove", [disk_data])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_rename(self, node, devlist):
  """Asks a node to rename a set of block devices.

  This is a single-node call (remote procedure C{blockdev_rename}).

  @param node: the node holding the devices
  @param devlist: iterable of C{(disk, uid)} pairs; each disk is sent
      via its C{ToDict()} together with its C{uid}

  """
  renames = [(dev.ToDict(), new_id) for (dev, new_id) in devlist]
  return self._SingleNodeCall(node, "blockdev_rename", renames)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
  """Asks a node to assemble a block device.

  This is a single-node call (remote procedure C{blockdev_assemble}).

  @param node: the node on which the device is assembled
  @param disk: the disk object (sent via its C{ToDict()})
  @param owner: the device owner, forwarded unchanged
  @param on_primary: flag forwarded unchanged

  """
  disk_data = disk.ToDict()
  return self._SingleNodeCall(node, "blockdev_assemble",
                              [disk_data, owner, on_primary])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_shutdown(self, node, disk):
  """Asks a node to shut down a block device.

  This is a single-node call (remote procedure C{blockdev_shutdown}).

  @param node: the node holding the device
  @param disk: the disk object (sent via its C{ToDict()})

  """
  disk_data = disk.ToDict()
  return self._SingleNodeCall(node, "blockdev_shutdown", [disk_data])
949 @_RpcTimeout(_TMO_NORMAL)
950 def call_blockdev_addchildren(self, node, bdev, ndevs):
951 """Request adding a list of children to a (mirroring) device.
953 This is a single-node call.
956 return self._SingleNodeCall(node, "blockdev_addchildren",
958 [disk.ToDict() for disk in ndevs]])
960 @_RpcTimeout(_TMO_NORMAL)
961 def call_blockdev_removechildren(self, node, bdev, ndevs):
962 """Request removing a list of children from a (mirroring) device.
964 This is a single-node call.
967 return self._SingleNodeCall(node, "blockdev_removechildren",
969 [disk.ToDict() for disk in ndevs]])
971 @_RpcTimeout(_TMO_NORMAL)
972 def call_blockdev_getmirrorstatus(self, node, disks):
973 """Request status of a (mirroring) device.
975 This is a single-node call.
978 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
979 [dsk.ToDict() for dsk in disks])
980 if not result.fail_msg:
981 result.payload = [objects.BlockDevStatus.FromDict(i)
982 for i in result.payload]
985 @_RpcTimeout(_TMO_NORMAL)
986 def call_blockdev_find(self, node, disk):
987 """Request identification of a given block device.
989 This is a single-node call.
992 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
993 if not result.fail_msg and result.payload is not None:
994 result.payload = objects.BlockDevStatus.FromDict(result.payload)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_close(self, node, instance_name, disks):
  """Asks a node to close the given block devices.

  This is a single-node call (remote procedure C{blockdev_close}).

  @param node: the node holding the devices
  @param instance_name: name of the instance owning the devices
  @param disks: the disk objects (each sent via its C{ToDict()})

  """
  disk_dicts = [dev.ToDict() for dev in disks]
  return self._SingleNodeCall(node, "blockdev_close",
                              [instance_name, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getsize(self, node, disks):
  """Returns the sizes of the given disks.

  This is a single-node call (remote procedure C{blockdev_getsize}).

  @param node: the node holding the disks
  @param disks: the disk objects (each sent via its C{ToDict()})

  """
  params = [[cf.ToDict() for cf in disks]]
  return self._SingleNodeCall(node, "blockdev_getsize", params)

# _RpcTimeout derives the timeout-table key from the method name, and
# Client requires the procedure name to be in that table; the method was
# previously named call_blockdev_getsizes (key "blockdev_getsizes") while
# sending "blockdev_getsize", so every call failed. Keep the old name as
# an alias for existing callers.
call_blockdev_getsizes = call_blockdev_getsize
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
  """Disconnects the network of the given DRBD devices.

  This is a multi-node call (remote procedure C{drbd_disconnect_net}).

  @type node_list: list
  @param node_list: names of the nodes to contact
  @param nodes_ip: node IP information, forwarded unchanged
  @param disks: the disk objects (each sent via its C{ToDict()})

  """
  disk_dicts = [dev.ToDict() for dev in disks]
  return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                             [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
  """Attaches (connects) the network of the given DRBD devices.

  This is a multi-node call (remote procedure C{drbd_attach_net}).

  @type node_list: list
  @param node_list: names of the nodes to contact
  @param nodes_ip: node IP information, forwarded unchanged
  @param disks: the disk objects (each sent via its C{ToDict()})
  @param instance_name: name of the instance owning the disks
  @param multimaster: flag forwarded unchanged to the nodes

  """
  return self._MultiNodeCall(node_list, "drbd_attach_net",
                             [nodes_ip, [cf.ToDict() for cf in disks],
                              instance_name, multimaster])
@_RpcTimeout(_TMO_SLOW)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
  """Waits until the synchronization of the given DRBD devices completes.

  This is a multi-node call (remote procedure C{drbd_wait_sync}).

  @type node_list: list
  @param node_list: names of the nodes to contact
  @param nodes_ip: node IP information, forwarded unchanged
  @param disks: the disk objects (each sent via its C{ToDict()})

  """
  disk_dicts = [dev.ToDict() for dev in disks]
  return self._MultiNodeCall(node_list, "drbd_wait_sync",
                             [nodes_ip, disk_dicts])
@_RpcTimeout(_TMO_URGENT)
def call_drbd_helper(self, node_list):
  """Queries the DRBD helper configured on the given nodes.

  This is a multi-node call (remote procedure C{drbd_helper}).

  @type node_list: list
  @param node_list: names of the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "drbd_helper", no_args)
@_RpcTimeout(_TMO_NORMAL)
def call_upload_file(cls, node_list, file_name, address_list=None):
  """Uploads a local file to the given nodes.

  The contents are encoded with L{_Compress}. The file's mode, owner
  (uid/gid), access time and modification time are sent along. The node
  will refuse the operation for files it does not accept.

  This is a multi-node call (remote procedure C{upload_file}).

  @type node_list: list
  @param node_list: the list of node names to upload to
  @type file_name: str
  @param file_name: the filename to upload
  @type address_list: list or None
  @keyword address_list: an optional list of node addresses, in order
      to optimize the RPC speed

  """
  contents = utils.ReadFile(file_name)
  encoded = cls._Compress(contents)
  stat_info = os.stat(file_name)
  params = [file_name, encoded,
            stat_info.st_mode, stat_info.st_uid, stat_info.st_gid,
            stat_info.st_atime, stat_info.st_mtime]
  return cls._StaticMultiNodeCall(node_list, "upload_file", params,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_write_ssconf_files(cls, node_list, values):
  """Writes the ssconf files on the given nodes.

  This is a multi-node call (remote procedure C{write_ssconf_files})
  that does not consult the cluster configuration.

  @type node_list: list
  @param node_list: names of the nodes to update
  @param values: the ssconf values, forwarded unchanged

  """
  rpc_args = [values]
  return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_os_diagnose(self, node_list):
  """Asks the given nodes to diagnose their OS definitions.

  This is a multi-node call (remote procedure C{os_diagnose}).

  @type node_list: list
  @param node_list: names of the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "os_diagnose", no_args)
1104 @_RpcTimeout(_TMO_FAST)
1105 def call_os_get(self, node, name):
1106 """Returns an OS definition.
1108 This is a single-node call.
1111 result = self._SingleNodeCall(node, "os_get", [name])
1112 if not result.fail_msg and isinstance(result.payload, dict):
1113 result.payload = objects.OS.FromDict(result.payload)
@_RpcTimeout(_TMO_FAST)
def call_os_validate(self, required, nodes, name, checks, params):
  """Runs the validation routine of an OS on several nodes.

  The arguments are forwarded in the order
  C{required, name, checks, params}.

  This is a multi-node call (remote procedure C{os_validate}).

  @param required: flag forwarded unchanged to the nodes
  @type nodes: list
  @param nodes: names of the nodes on which to validate
  @param name: the OS name
  @param checks: the checks to run, forwarded unchanged
  @param params: the OS parameters, forwarded unchanged

  """
  validate_args = [required, name, checks, params]
  return self._MultiNodeCall(nodes, "os_validate", validate_args)
@_RpcTimeout(_TMO_NORMAL)
def call_hooks_runner(self, node_list, hpath, phase, env):
  """Calls the hooks runner on the given nodes.

  This is a multi-node call (remote procedure C{hooks_runner}).

  @type node_list: list
  @param node_list: names of the nodes on which to run the hooks
  @param hpath: the hooks path, forwarded unchanged
  @param phase: the hooks phase, forwarded unchanged
  @type env: dict
  @param env: a dictionary with the environment

  """
  params = [hpath, phase, env]
  return self._MultiNodeCall(node_list, "hooks_runner", params)
@_RpcTimeout(_TMO_NORMAL)
def call_iallocator_runner(self, node, name, idata):
  """Calls an iallocator on a remote node.

  This is a single-node call (remote procedure C{iallocator_runner}).

  @param node: the node on which to run the iallocator
  @param name: the iallocator name
  @param idata: the json-encoded input string

  """
  return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount):
  """Requests growing the given block device.

  This is a single-node call (remote procedure C{blockdev_grow}).

  @param node: the node holding the device
  @param cf_bdev: the disk object (sent via its C{ToDict()})
  @param amount: the amount to grow by, forwarded unchanged

  """
  return self._SingleNodeCall(node, "blockdev_grow",
                              [cf_bdev.ToDict(), amount])
1163 @_RpcTimeout(_TMO_1DAY)
1164 def call_blockdev_export(self, node, cf_bdev,
1165 dest_node, dest_path, cluster_name):
1166 """Export a given disk to another node.
1168 This is a single-node call.
1171 return self._SingleNodeCall(node, "blockdev_export",
1172 [cf_bdev.ToDict(), dest_node, dest_path,
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
  """Asks a node to snapshot the given block device.

  This is a single-node call (remote procedure C{blockdev_snapshot}).

  @param node: the node holding the device
  @param cf_bdev: the disk object (sent via its C{ToDict()})

  """
  disk_data = cf_bdev.ToDict()
  return self._SingleNodeCall(node, "blockdev_snapshot", [disk_data])
1184 @_RpcTimeout(_TMO_NORMAL)
1185 def call_finalize_export(self, node, instance, snap_disks):
1186 """Request the completion of an export operation.
1188 This writes the export config file, etc.
1190 This is a single-node call.
1194 for disk in snap_disks:
1195 if isinstance(disk, bool):
1196 flat_disks.append(disk)
1198 flat_disks.append(disk.ToDict())
1200 return self._SingleNodeCall(node, "finalize_export",
1201 [self._InstDict(instance), flat_disks])
@_RpcTimeout(_TMO_FAST)
def call_export_info(self, node, path):
  """Queries the export information stored under a path on a node.

  This is a single-node call (remote procedure C{export_info}).

  @param node: the node to query
  @param path: the export path

  """
  rpc_args = [path]
  return self._SingleNodeCall(node, "export_info", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_export_list(self, node_list):
  """Fetches the list of stored exports from the given nodes.

  This is a multi-node call (remote procedure C{export_list}).

  @type node_list: list
  @param node_list: names of the nodes to query

  """
  no_args = []
  return self._MultiNodeCall(node_list, "export_list", no_args)
@_RpcTimeout(_TMO_FAST)
def call_export_remove(self, node, export):
  """Asks a node to remove the given export.

  This is a single-node call (remote procedure C{export_remove}).

  @param node: the node holding the export
  @param export: the export to remove, forwarded unchanged

  """
  rpc_args = [export]
  return self._SingleNodeCall(node, "export_remove", rpc_args)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
  """Requests a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  dir.

  This is a single-node call.

  @param node: the node to contact
  @param modify_ssh_setup: forwarded unchanged to the node (presumably
      whether to also undo the SSH setup -- TODO confirm)

  """
  # The body only uses cls (static call path), so this must be a
  # classmethod; @classmethod is applied last, after _RpcTimeout.
  return cls._StaticSingleNodeCall(node, "node_leave_cluster",
                                   [modify_ssh_setup])
@_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
  """Gets all volumes on node(s).

  This is a multi-node call.

  @param node_list: the nodes to query

  """
  return self._MultiNodeCall(node_list, "node_volumes", [])
@_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
  """Demote a node from the master candidate role.

  This is a single-node call.

  @param node: the node to demote

  """
  return self._SingleNodeCall(node, "node_demote_from_mc", [])
@_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
  """Tries to powercycle a node.

  This is a single-node call.

  @param node: the node to powercycle
  @param hypervisor: the hypervisor name, forwarded unchanged

  """
  return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
# NOTE(review): the decorator line of this method was lost; None is assumed
# because the call below passes read_timeout explicitly -- confirm upstream.
@_RpcTimeout(None)
def call_test_delay(self, node_list, duration):
  """Sleep for a fixed time on given node(s).

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param duration: how long the nodes should sleep (presumably seconds)

  """
  # An explicit read timeout is derived from the requested delay plus a
  # margin of 5, so the caller does not give up before the sleep ends.
  return self._MultiNodeCall(node_list, "test_delay", [duration],
                             read_timeout=int(duration + 5))
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
  """Create the given file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param file_storage_dir: the directory to create

  """
  return self._SingleNodeCall(node, "file_storage_dir_create",
                              [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
  """Remove the given file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param file_storage_dir: the directory to remove

  """
  return self._SingleNodeCall(node, "file_storage_dir_remove",
                              [file_storage_dir])
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Rename file storage directory.

  This is a single-node call.

  @param node: the node to contact
  @param old_file_storage_dir: the current directory name
  @param new_file_storage_dir: the new directory name

  """
  return self._SingleNodeCall(node, "file_storage_dir_rename",
                              [old_file_storage_dir, new_file_storage_dir])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
  """Update job queue.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param address_list: forwarded as C{address_list} to the static
      multi-node call (presumably addresses matching C{node_list})
  @param file_name: the job queue file to update
  @param content: the file content; compressed with C{cls._Compress}
      before sending

  """
  # The body only uses cls, so this must be a classmethod (applied last).
  return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
                                  [file_name, cls._Compress(content)],
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
  """Purge job queue.

  This is a single-node call.

  @param node: the node to contact

  """
  # The body only uses cls, so this must be a classmethod (applied last).
  return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_rename(cls, node_list, address_list, rename):
  """Rename a job queue file.

  This is a multi-node call.

  @param node_list: the nodes to contact
  @param address_list: forwarded as C{address_list} to the static
      multi-node call (presumably addresses matching C{node_list})
  @param rename: used directly as the complete RPC argument list (it is
      not wrapped in another list); presumably a list of (old, new) name
      pairs -- TODO confirm

  """
  # The body only uses cls, so this must be a classmethod (applied last).
  return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                  address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
  """Validate the hypervisor params.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated

  """
  cluster = self._cfg.GetClusterInfo()
  # Combine the cluster-level parameters for this hypervisor (empty if
  # none are set) with the given ones; assumes objects.FillDict(defaults,
  # custom) lets custom values override defaults.
  hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
  return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                             [hvname, hv_full])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
  """Creates a new X509 certificate for SSL/TLS.

  This is a single-node call.

  @param node: the node to contact
  @param validity: Validity in seconds

  """
  return self._SingleNodeCall(node, "x509_cert_create", [validity])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
  """Removes a X509 certificate.

  This is a single-node call.

  @param node: the node to contact
  @param name: Certificate name

  """
  return self._SingleNodeCall(node, "x509_cert_remove", [name])
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, dest, dest_args):
  """Starts a listener for an import.

  This is a single-node call.

  @param node: Node name
  @param opts: import/export options object, serialized via C{ToDict()}
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param dest: the import destination; sent as-is and also used to encode
      C{dest_args}
  @param dest_args: destination arguments, encoded together with C{dest}
      by C{_EncodeImportExportIO}

  """
  return self._SingleNodeCall(node, "import_start",
                              [opts.ToDict(),
                               self._InstDict(instance), dest,
                               _EncodeImportExportIO(dest, dest_args)])
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
  """Starts an export daemon.

  This is a single-node call.

  @param node: Node name
  @param opts: import/export options object, serialized via C{ToDict()}
  @param host: forwarded unchanged (presumably the host receiving the
      export -- TODO confirm)
  @param port: forwarded unchanged (presumably the port on C{host})
  @type instance: C{objects.Instance}
  @param instance: Instance object
  @param source: the export source; sent as-is and also used to encode
      C{source_args}
  @param source_args: source arguments, encoded together with C{source}
      by C{_EncodeImportExportIO}

  """
  return self._SingleNodeCall(node, "export_start",
                              [opts.ToDict(), host, port,
                               self._InstDict(instance), source,
                               _EncodeImportExportIO(source, source_args)])
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
  """Gets the status of an import or export.

  This is a single-node call.

  @param node: Node name
  @type names: List of strings
  @param names: Import/export names
  @return: the RPC result; if it succeeded (no C{fail_msg}), its
      C{payload} is replaced by a list holding, for each named
      import/export, an L{objects.ImportExportStatus} instance or None if
      a status couldn't be retrieved

  """
  result = self._SingleNodeCall(node, "impexp_status", [names])

  # Only decode successful payloads; a failed result is returned untouched
  # so callers can inspect fail_msg.
  if not result.fail_msg:
    result.payload = [
      None if status is None
      else objects.ImportExportStatus.FromDict(status)
      for status in result.payload
      ]

  return result
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
  """Aborts an import or export.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  return self._SingleNodeCall(node, "impexp_abort", [name])
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
  """Cleans up after an import or export.

  This is a single-node call.

  @param node: Node name
  @param name: Import/export name

  """
  return self._SingleNodeCall(node, "impexp_cleanup", [name])