4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client # pylint: disable-msg=W0611
49 # Module level variable
# Various time constants for the timeout table; every value is a number
# of seconds and is meant to be passed to the _RpcTimeout decorator
_TMO_URGENT = 60 # one minute
_TMO_FAST = 5 * 60 # five minutes
_TMO_NORMAL = 15 * 60 # 15 minutes
_TMO_SLOW = 3600 # one hour
60 # Timeout table that will be built later by decorators
61 # Guidelines for choosing timeouts:
62 # - call used during watcher: timeout -> 1min, _TMO_URGENT
63 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
64 # - other calls: 15 min, _TMO_NORMAL
65 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
72 """Initializes the module-global HTTP client manager.
74 Must be called before using any RPC function.
77 global _http_manager # pylint: disable-msg=W0603
79 assert not _http_manager, "RPC module initialized more than once"
83 _http_manager = http.client.HttpClientManager()
87 """Stops the module-global HTTP client manager.
89 Must be called before quitting the program.
92 global _http_manager # pylint: disable-msg=W0603
95 _http_manager.Shutdown()
def _RpcTimeout(secs):
  """Build a decorator that records the timeout of an RPC wrapper.

  The returned decorator expects a C{call_*} function. It strips the
  C{call_} prefix from the function name and stores C{secs} under the
  remaining name in the module-level C{_TIMEOUTS} table.

  @param secs: the timeout to record for the decorated call

  """
  prefix = "call_"

  # NOTE(review): the inner wrapper lines are not visible in the listing
  # this block was taken from; they are reconstructed from the way the
  # decorator is applied (it must hand the function back unchanged).
  def _Register(fn):
    fn_name = fn.__name__
    assert fn_name.startswith(prefix)
    _TIMEOUTS[fn_name[len(prefix):]] = secs
    return fn

  return _Register
class RpcResult(object):
  """Container for the outcome of one RPC call to one node.

  In multi-node calls we cannot raise an exception just because one out
  of many nodes failed, so each per-node outcome is wrapped in an
  instance of this class instead.

  @ivar data: the raw data as received, or None for offline/failed calls
  @ivar payload: the second element of a successful reply, otherwise None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to an actual failure
  @ivar fail_msg: the error message if the call failed, None on success

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    # Work on locals first and publish the three result attributes in
    # one place at the end.
    error = None
    payload = None
    raw = None

    if offline:
      error = "Node is marked offline"
    elif failed:
      error = self._EnsureErr(data)
    else:
      raw = data
      if not isinstance(data, (tuple, list)):
        error = ("RPC layer error: invalid result type (%s)" %
                 type(data))
      elif len(data) != 2:
        error = ("RPC layer error: invalid result length (%d), "
                 "expected 2" % len(data))
      elif data[0]:
        # a (true status, value) pair: finally success
        payload = data[1]
      else:
        error = self._EnsureErr(data[1])

    self.data = raw
    self.payload = payload
    self.fail_msg = error

    assert hasattr(self, "call")
    assert hasattr(self, "data")
    assert hasattr(self, "fail_msg")
    assert hasattr(self, "node")
    assert hasattr(self, "offline")
    assert hasattr(self, "payload")

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if not val:
      return "No error information"
    return val

  def Raise(self, msg, prereq=False, ecode=None):
    """Raise an exception if this result represents a failure.

    This lets LU code call one function instead of checking every
    result by hand. Nothing happens when C{fail_msg} is empty.

    @param msg: prefix for the error text; a false value (e.g. None)
        selects a default message naming the call and the node
    @param prereq: if true raise L{errors.OpPrereqError}, otherwise
        L{errors.OpExecError}
    @param ecode: optional second argument for the exception

    """
    if not self.fail_msg:
      return

    if msg:
      text = "%s: %s" % (msg, self.fail_msg)
    else: # one could pass None for default message
      text = ("Call '%s' to node '%s' has failed: %s" %
              (self.call, self.node, self.fail_msg))

    if prereq:
      exc_class = errors.OpPrereqError
    else:
      exc_class = errors.OpExecError

    exc_args = (text, )
    if ecode is not None:
      exc_args = (text, ecode)
    raise exc_class(*exc_args) # pylint: disable-msg=W0142
205 This class, given a (remote) method name, a list of parameters and a
206 list of nodes, will contact (in parallel) all nodes, and return a
207 dict of results (key: node name, value: result).
209 One current bug is that generic failure is still signaled by
210 'False' result, which is not good. This overloading of values can
214 def __init__(self, procedure, body, port):
215 assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
217 self.procedure = procedure
223 http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
224 ssl_cert_path=constants.NODED_CERT_FILE)
def ConnectList(self, node_list, address_list=None, read_timeout=None):
  """Register several nodes as targets of this call.

  Every node is handed to L{ConnectNode} together with its address (or
  None when no address list was given).

  @type node_list: list
  @param node_list: the list of node names to connect
  @type address_list: list or None
  @keyword address_list: either None or a list with node addresses,
      which must have the same length as the node list
  @type read_timeout: int
  @param read_timeout: overwrites the default read timeout for the call

  """
  if address_list is not None:
    assert len(node_list) == len(address_list), \
           "Name and address lists should have the same length"
    targets = zip(node_list, address_list)
  else:
    targets = [(name, None) for name in node_list]

  for (name, addr) in targets:
    self.ConnectNode(name, addr, read_timeout=read_timeout)
247 def ConnectNode(self, name, address=None, read_timeout=None):
248 """Add a node to the target list.
251 @param name: the node name
253 @keyword address: the node address, if known
259 if read_timeout is None:
260 read_timeout = _TIMEOUTS[self.procedure]
263 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
264 "/%s" % self.procedure,
266 ssl_params=self._ssl_params,
267 ssl_verify_peer=True,
268 read_timeout=read_timeout)
270 def GetResults(self):
271 """Call nodes and return results.
274 @return: List of RPC results
277 assert _http_manager, "RPC module not initialized"
279 _http_manager.ExecRequests(self.nc.values())
283 for name, req in self.nc.iteritems():
284 if req.success and req.resp_status_code == http.HTTP_OK:
285 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
286 node=name, call=self.procedure)
289 # TODO: Better error reporting
295 logging.error("RPC error in %s from node %s: %s",
296 self.procedure, name, msg)
297 results[name] = RpcResult(data=msg, failed=True, node=name,
303 def _EncodeImportExportIO(ieio, ieioargs):
304 """Encodes import/export I/O information.
307 if ieio == constants.IEIO_RAW_DISK:
308 assert len(ieioargs) == 1
309 return (ieioargs[0].ToDict(), )
311 if ieio == constants.IEIO_SCRIPT:
312 assert len(ieioargs) == 2
313 return (ieioargs[0].ToDict(), ieioargs[1])
318 class RpcRunner(object):
319 """RPC runner class"""
def __init__(self, cfg):
  """Set up the rpc runner.

  @type cfg: C{config.ConfigWriter}
  @param cfg: the configuration object that will be used to get data
      (cluster and node information) when preparing calls

  """
  noded_port = utils.GetDaemonPort(constants.NODED)
  self._cfg = cfg
  self.port = noded_port
332 def _InstDict(self, instance, hvp=None, bep=None):
333 """Convert the given instance to a dict.
335 This is done via the instance's ToDict() method and additionally
336 we fill the hvparams with the cluster defaults.
338 @type instance: L{objects.Instance}
339 @param instance: an Instance object
340 @type hvp: dict or None
341 @param hvp: a dictionary with overridden hypervisor parameters
342 @type bep: dict or None
343 @param bep: a dictionary with overridden backend parameters
345 @return: the instance dict, with the hvparams filled with the
349 idict = instance.ToDict()
350 cluster = self._cfg.GetClusterInfo()
351 idict["hvparams"] = cluster.FillHV(instance)
353 idict["hvparams"].update(hvp)
354 idict["beparams"] = cluster.FillBE(instance)
356 idict["beparams"].update(bep)
357 for nic in idict["nics"]:
358 nic['nicparams'] = objects.FillDict(
359 cluster.nicparams[constants.PP_DEFAULT],
def _ConnectList(self, client, node_list, call, read_timeout=None):
  """Connect a client to all reachable nodes of a list.

  Nodes that the configuration marks as offline are not connected; an
  offline L{RpcResult} is returned for them instead. Known nodes are
  connected via their primary IP, unknown ones without an address.

  @type client: L{ganeti.rpc.Client}
  @param client: a C{Client} instance
  @type node_list: list
  @param node_list: the node list we should connect
  @param call: the name of the remote procedure call, for filling in
      correctly any eventual offline nodes' results
  @type read_timeout: int
  @param read_timeout: overwrites the default read timeout for the call
  @return: dict mapping each offline node name to its L{RpcResult}

  """
  known_nodes = self._cfg.GetAllNodesInfo()
  offline_results = {}
  targets = []

  for name in node_list:
    if name not in known_nodes:
      targets.append((name, None))
    elif known_nodes[name].offline:
      offline_results[name] = RpcResult(node=name, offline=True, call=call)
    else:
      targets.append((name, known_nodes[name].primary_ip))

  if targets:
    client.ConnectList([name for (name, _) in targets],
                       address_list=[addr for (_, addr) in targets],
                       read_timeout=read_timeout)
  return offline_results
def _ConnectNode(self, client, node, call, read_timeout=None):
  """Connect a client to a single node, unless the node is offline.

  @type client: L{ganeti.rpc.Client}
  @param client: a C{Client} instance
  @param node: the node we should connect
  @param call: the name of the remote procedure call, for filling in
      correctly any eventual offline nodes' results
  @type read_timeout: int
  @param read_timeout: overwrites the default read timeout for the call
  @return: an offline L{RpcResult} if the node is marked offline (no
      connection is made then), otherwise None

  """
  info = self._cfg.GetNodeInfo(node)
  if info is None:
    # node unknown to the configuration: connect without an address
    address = None
  elif info.offline:
    return RpcResult(node=node, offline=True, call=call)
  else:
    address = info.primary_ip

  client.ConnectNode(node, address=address, read_timeout=read_timeout)
  return None
def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
  """Run a procedure on several nodes and collect all results.

  @param node_list: the nodes to contact
  @param procedure: the name of the remote procedure
  @param args: the call arguments; they are serialized to JSON
  @param read_timeout: overwrites the default read timeout for the call
  @return: dict of node name to L{RpcResult}, including the entries
      for nodes that were skipped because they are offline

  """
  payload = serializer.DumpJson(args, indent=False)
  client = Client(procedure, payload, self.port)
  results = self._ConnectList(client, node_list, procedure,
                              read_timeout=read_timeout)
  results.update(client.GetResults())
  return results
@classmethod
def _StaticMultiNodeCall(cls, node_list, procedure, args,
                         address_list=None, read_timeout=None):
  """Run a procedure on several nodes without using the configuration.

  Declared as a classmethod: the helper takes C{cls} and the wrappers
  built on it invoke it through C{cls}, so it must be callable on the
  class itself. The node daemon port is looked up on every call.

  @param node_list: the nodes to contact
  @param procedure: the name of the remote procedure
  @param args: the call arguments; they are serialized to JSON
  @param address_list: optional list of node addresses, parallel to
      C{node_list}
  @param read_timeout: overwrites the default read timeout for the call
  @return: the results as returned by C{Client.GetResults}

  """
  payload = serializer.DumpJson(args, indent=False)
  client = Client(procedure, payload, utils.GetDaemonPort(constants.NODED))
  client.ConnectList(node_list, address_list=address_list,
                     read_timeout=read_timeout)
  return client.GetResults()
def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
  """Run a procedure on one node and return its result.

  @param node: the node to contact
  @param procedure: the name of the remote procedure
  @param args: the call arguments; they are serialized to JSON
  @param read_timeout: overwrites the default read timeout for the call
  @return: the L{RpcResult} for the node; for an offline node this is
      the offline result and no request is sent

  """
  payload = serializer.DumpJson(args, indent=False)
  client = Client(procedure, payload, self.port)
  offline_result = self._ConnectNode(client, node, procedure,
                                     read_timeout=read_timeout)
  if offline_result is not None:
    return offline_result
  # we did connect, node is not offline
  return client.GetResults()[node]
@classmethod
def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
  """Run a procedure on one node without using the configuration.

  Declared as a classmethod: the helper takes C{cls} and the wrappers
  built on it invoke it through C{cls}, so it must be callable on the
  class itself. The node daemon port is looked up on every call.

  @param node: the node to contact
  @param procedure: the name of the remote procedure
  @param args: the call arguments; they are serialized to JSON
  @param read_timeout: overwrites the default read timeout for the call
  @return: the result for C{node} out of C{Client.GetResults}

  """
  payload = serializer.DumpJson(args, indent=False)
  client = Client(procedure, payload, utils.GetDaemonPort(constants.NODED))
  client.ConnectNode(node, read_timeout=read_timeout)
  return client.GetResults()[node]
468 """Compresses a string for transport over RPC.
470 Small amounts of data are not compressed.
475 @return: Encoded data to send
478 # Small amounts of data are not compressed
480 return (constants.RPC_ENCODING_NONE, data)
482 # Compress with zlib and encode in base64
483 return (constants.RPC_ENCODING_ZLIB_BASE64,
484 base64.b64encode(zlib.compress(data, 3)))
@_RpcTimeout(_TMO_URGENT)
def call_lv_list(self, node_list, vg_name):
  """Gets the logical volumes present in a given volume group.

  This is a multi-node call.

  @param node_list: the nodes to query
  @param vg_name: the volume group name, sent as the only argument

  """
  params = [vg_name]
  return self._MultiNodeCall(node_list, "lv_list", params)
@_RpcTimeout(_TMO_URGENT)
def call_vg_list(self, node_list):
  """Gets the volume group list.

  This is a multi-node call; the remote procedure takes no arguments.

  @param node_list: the nodes to query

  """
  no_params = []
  return self._MultiNodeCall(node_list, "vg_list", no_params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_list(self, node_list, su_name, su_args, name, fields):
  """Get list of storage units.

  This is a multi-node call. The four storage arguments are forwarded
  unchanged, in signature order.

  """
  params = [su_name, su_args, name, fields]
  return self._MultiNodeCall(node_list, "storage_list", params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_modify(self, node, su_name, su_args, name, changes):
  """Modify a storage unit.

  This is a single-node call. The four storage arguments are forwarded
  unchanged, in signature order.

  """
  params = [su_name, su_args, name, changes]
  return self._SingleNodeCall(node, "storage_modify", params)
@_RpcTimeout(_TMO_NORMAL)
def call_storage_execute(self, node, su_name, su_args, name, op):
  """Executes an operation on a storage unit.

  This is a single-node call. The four storage arguments are forwarded
  unchanged, in signature order.

  """
  params = [su_name, su_args, name, op]
  return self._SingleNodeCall(node, "storage_execute", params)
@_RpcTimeout(_TMO_URGENT)
def call_bridges_exist(self, node, bridges_list):
  """Checks if a node has all the bridges given.

  The remote node is asked whether every bridge in C{bridges_list} is
  present, so that an instance using interfaces on those bridges can be
  started there.

  This is a single-node call.

  """
  params = [bridges_list]
  return self._SingleNodeCall(node, "bridges_exist", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_start(self, node, instance, hvp, bep):
  """Starts an instance.

  This is a single-node call.

  @param instance: the instance object; it is sent as a dict built by
      C{_InstDict}
  @param hvp: hypervisor parameter overrides passed to C{_InstDict}
  @param bep: backend parameter overrides passed to C{_InstDict}

  """
  params = [self._InstDict(instance, hvp=hvp, bep=bep)]
  return self._SingleNodeCall(node, "instance_start", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_shutdown(self, node, instance, timeout):
  """Stops an instance.

  This is a single-node call.

  @param instance: the instance object; it is sent as a dict built by
      C{_InstDict}
  @param timeout: forwarded unchanged as the second argument

  """
  params = [self._InstDict(instance), timeout]
  return self._SingleNodeCall(node, "instance_shutdown", params)
@_RpcTimeout(_TMO_NORMAL)
def call_migration_info(self, node, instance):
  """Gather the information necessary to prepare an instance migration.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition

  """
  params = [self._InstDict(instance)]
  return self._SingleNodeCall(node, "migration_info", params)
@_RpcTimeout(_TMO_NORMAL)
def call_accept_instance(self, node, instance, info, target):
  """Prepare a node to accept an instance.

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @param target: target hostname (usually ip address) (on the node itself)

  """
  params = [self._InstDict(instance), info, target]
  return self._SingleNodeCall(node, "accept_instance", params)
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_migration(self, node, instance, info, success):
  """Finalize any target-node migration specific operation.

  This is called both after a successful migration and after a failed
  one (in which case it should abort the migration).

  This is a single-node call.

  @param node: the target node for the migration
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type info: opaque/hypervisor specific (string/data)
  @param info: result for the call_migration_info call
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  params = [self._InstDict(instance), info, success]
  return self._SingleNodeCall(node, "finalize_migration", params)
@_RpcTimeout(_TMO_SLOW)
def call_instance_migrate(self, node, instance, target, live):
  """Migrate an instance.

  This is a single-node call.

  @param node: the node on which the instance is currently running
  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @param target: the target node name
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)

  """
  params = [self._InstDict(instance), target, live]
  return self._SingleNodeCall(node, "instance_migrate", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
  """Reboots an instance.

  This is a single-node call.

  @param inst: the instance object; it is sent as a dict built by
      C{_InstDict}
  @param reboot_type: forwarded unchanged
  @param shutdown_timeout: forwarded unchanged

  """
  params = [self._InstDict(inst), reboot_type, shutdown_timeout]
  return self._SingleNodeCall(node, "instance_reboot", params)
@_RpcTimeout(_TMO_1DAY)
def call_instance_os_add(self, node, inst, reinstall, debug):
  """Installs an OS on the given instance.

  This is a single-node call.

  @param inst: the instance object; it is sent as a dict built by
      C{_InstDict}
  @param reinstall: forwarded unchanged
  @param debug: forwarded unchanged

  """
  params = [self._InstDict(inst), reinstall, debug]
  return self._SingleNodeCall(node, "instance_os_add", params)
@_RpcTimeout(_TMO_SLOW)
def call_instance_run_rename(self, node, inst, old_name, debug):
  """Run the OS rename script for an instance.

  This is a single-node call.

  @param inst: the instance object; it is sent as a dict built by
      C{_InstDict}
  @param old_name: forwarded unchanged
  @param debug: forwarded unchanged

  """
  params = [self._InstDict(inst), old_name, debug]
  return self._SingleNodeCall(node, "instance_run_rename", params)
@_RpcTimeout(_TMO_URGENT)
def call_instance_info(self, node, instance, hname):
  """Returns information about a single instance.

  This is a single-node call.

  @param node: the node to query
  @type instance: string
  @param instance: the instance name
  @param hname: the hypervisor type of the instance

  """
  params = [instance, hname]
  return self._SingleNodeCall(node, "instance_info", params)
@_RpcTimeout(_TMO_NORMAL)
def call_instance_migratable(self, node, instance):
  """Checks whether the given instance can be migrated.

  This is a single-node call.

  @param node: the node to query
  @type instance: L{objects.Instance}
  @param instance: the instance to check

  """
  params = [self._InstDict(instance)]
  return self._SingleNodeCall(node, "instance_migratable", params)
@_RpcTimeout(_TMO_URGENT)
def call_all_instances_info(self, node_list, hypervisor_list):
  """Returns information about all instances on the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances

  """
  params = [hypervisor_list]
  return self._MultiNodeCall(node_list, "all_instances_info", params)
@_RpcTimeout(_TMO_URGENT)
def call_instance_list(self, node_list, hypervisor_list):
  """Returns the list of running instances on the given nodes.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type hypervisor_list: list
  @param hypervisor_list: the hypervisors to query for instances

  """
  params = [hypervisor_list]
  return self._MultiNodeCall(node_list, "instance_list", params)
738 @_RpcTimeout(_TMO_FAST)
739 def call_node_tcp_ping(self, node, source, target, port, timeout,
741 """Do a TcpPing on the remote node
743 This is a single-node call.
746 return self._SingleNodeCall(node, "node_tcp_ping",
747 [source, target, port, timeout,
@_RpcTimeout(_TMO_FAST)
def call_node_has_ip_address(self, node, address):
  """Checks if a node has the given IP address.

  This is a single-node call.

  @param node: the node to query
  @param address: the address to look for, sent as the only argument

  """
  params = [address]
  return self._SingleNodeCall(node, "node_has_ip_address", params)
@_RpcTimeout(_TMO_URGENT)
def call_node_info(self, node_list, vg_name, hypervisor_type):
  """Return node information.

  The reply carries memory information and volume group size/free
  data.

  This is a multi-node call.

  @type node_list: list
  @param node_list: the list of nodes to query
  @type vg_name: C{string}
  @param vg_name: the name of the volume group to ask for disk space
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for

  """
  params = [vg_name, hypervisor_type]
  return self._MultiNodeCall(node_list, "node_info", params)
@_RpcTimeout(_TMO_NORMAL)
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Add a node to the cluster.

  This is a single-node call. The six key arguments are forwarded
  unchanged, in signature order.

  """
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
  return self._SingleNodeCall(node, "node_add", params)
@_RpcTimeout(_TMO_NORMAL)
def call_node_verify(self, node_list, checkdict, cluster_name):
  """Request verification of given parameters.

  This is a multi-node call.

  @param node_list: the nodes to query
  @param checkdict: forwarded unchanged as the first argument
  @param cluster_name: forwarded unchanged as the second argument

  """
  params = [checkdict, cluster_name]
  return self._MultiNodeCall(node_list, "node_verify", params)
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_start_master(cls, node, start_daemons, no_voting):
  """Tells a node to activate itself as a master.

  This is a single-node call. It is a classmethod (it takes C{cls} and
  uses the static call helper), so no runner instance is required.

  @param node: the node to contact
  @param start_daemons: forwarded unchanged as the first argument
  @param no_voting: forwarded unchanged as the second argument

  """
  params = [start_daemons, no_voting]
  return cls._StaticSingleNodeCall(node, "node_start_master", params)
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_node_stop_master(cls, node, stop_daemons):
  """Tells a node to demote itself from master status.

  This is a single-node call. It is a classmethod (it takes C{cls} and
  uses the static call helper), so no runner instance is required.

  @param node: the node to contact
  @param stop_daemons: forwarded unchanged as the only argument

  """
  params = [stop_daemons]
  return cls._StaticSingleNodeCall(node, "node_stop_master", params)
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_master_info(cls, node_list):
  """Query master info.

  This is a multi-node call. It is a classmethod (it takes C{cls} and
  uses the static call helper), so no runner instance is required.

  @param node_list: the nodes to query

  """
  # TODO: should this method query down nodes?
  no_params = []
  return cls._StaticMultiNodeCall(node_list, "master_info", no_params)
@classmethod
@_RpcTimeout(_TMO_URGENT)
def call_version(cls, node_list):
  """Query node version.

  This is a multi-node call. It is a classmethod (it takes C{cls} and
  uses the static call helper), so no runner instance is required.

  @param node_list: the nodes to query

  """
  no_params = []
  return cls._StaticMultiNodeCall(node_list, "version", no_params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
  """Request creation of a given block device.

  This is a single-node call.

  @param bdev: the device object; it is sent in its C{ToDict()} form
  @param size: forwarded unchanged
  @param owner: forwarded unchanged
  @param on_primary: forwarded unchanged
  @param info: forwarded unchanged

  """
  params = [bdev.ToDict(), size, owner, on_primary, info]
  return self._SingleNodeCall(node, "blockdev_create", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_remove(self, node, bdev):
  """Request removal of a given block device.

  This is a single-node call.

  @param bdev: the device object; it is sent in its C{ToDict()} form

  """
  params = [bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_remove", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_rename(self, node, devlist):
  """Request rename of the given block devices.

  This is a single-node call.

  @param devlist: iterable of (device, uid) pairs; each device is sent
      in its C{ToDict()} form together with its uid

  """
  encoded = [(dev.ToDict(), new_uid) for (dev, new_uid) in devlist]
  return self._SingleNodeCall(node, "blockdev_rename", encoded)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
  """Request assembling of a given block device.

  This is a single-node call.

  @param disk: the disk object; it is sent in its C{ToDict()} form
  @param owner: forwarded unchanged
  @param on_primary: forwarded unchanged

  """
  params = [disk.ToDict(), owner, on_primary]
  return self._SingleNodeCall(node, "blockdev_assemble", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_shutdown(self, node, disk):
  """Request shutdown of a given block device.

  This is a single-node call.

  @param disk: the disk object; it is sent in its C{ToDict()} form

  """
  params = [disk.ToDict()]
  return self._SingleNodeCall(node, "blockdev_shutdown", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_addchildren(self, node, bdev, ndevs):
  """Request adding a list of children to a (mirroring) device.

  This is a single-node call.

  @param bdev: the parent device; it is sent in its C{ToDict()} form
  @param ndevs: the child devices; each is sent in its C{ToDict()} form

  """
  children = [disk.ToDict() for disk in ndevs]
  params = [bdev.ToDict(), children]
  return self._SingleNodeCall(node, "blockdev_addchildren", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_removechildren(self, node, bdev, ndevs):
  """Request removing a list of children from a (mirroring) device.

  This is a single-node call.

  @param bdev: the parent device; it is sent in its C{ToDict()} form
  @param ndevs: the child devices; each is sent in its C{ToDict()} form

  """
  children = [disk.ToDict() for disk in ndevs]
  params = [bdev.ToDict(), children]
  return self._SingleNodeCall(node, "blockdev_removechildren", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getmirrorstatus(self, node, disks):
  """Request status of a (mirroring) device.

  This is a single-node call.

  @param disks: the disk objects; each is sent in its C{ToDict()} form
  @return: the call result; on success every payload entry has been
      converted with C{objects.BlockDevStatus.FromDict}

  """
  params = [dsk.ToDict() for dsk in disks]
  result = self._SingleNodeCall(node, "blockdev_getmirrorstatus", params)
  if result.fail_msg:
    # failed calls are handed back untouched
    return result
  decode = objects.BlockDevStatus.FromDict
  result.payload = [decode(entry) for entry in result.payload]
  return result
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_find(self, node, disk):
  """Request identification of a given block device.

  This is a single-node call.

  @param disk: the disk object; it is sent in its C{ToDict()} form
  @return: the call result; a successful payload that is not None has
      been converted with C{objects.BlockDevStatus.FromDict}

  """
  result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
  if result.fail_msg or result.payload is None:
    return result
  result.payload = objects.BlockDevStatus.FromDict(result.payload)
  return result
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_close(self, node, instance_name, disks):
  """Closes the given block devices.

  This is a single-node call.

  @param instance_name: forwarded unchanged as the first argument
  @param disks: the disk objects; each is sent in its C{ToDict()} form

  """
  encoded_disks = [cf.ToDict() for cf in disks]
  return self._SingleNodeCall(node, "blockdev_close",
                              [instance_name, encoded_disks])
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_getsize(self, node, disks):
  """Returns the size of the given disks.

  This is a single-node call.

  The method carries the name of the remote procedure
  ("blockdev_getsize") on purpose: the timeout decorator derives the
  timeout-table key from the method name, and the RPC client refuses
  procedures that are missing from that table. Under the plural name
  only "blockdev_getsizes" was registered, so every call failed.

  @param node: the node to query
  @param disks: the disk objects; each is sent in its C{ToDict()} form

  """
  params = [[cf.ToDict() for cf in disks]]
  return self._SingleNodeCall(node, "blockdev_getsize", params)

# Historical (plural) name, kept so that existing callers continue to work
call_blockdev_getsizes = call_blockdev_getsize
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
  """Disconnects the network of the given drbd devices.

  This is a multi-node call.

  @param nodes_ip: forwarded unchanged as the first argument
  @param disks: the disk objects; each is sent in its C{ToDict()} form

  """
  encoded_disks = [cf.ToDict() for cf in disks]
  return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                             [nodes_ip, encoded_disks])
@_RpcTimeout(_TMO_NORMAL)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
  """Attaches the network of the given drbd devices.

  This is a multi-node call (remote procedure "drbd_attach_net").

  @param nodes_ip: forwarded unchanged as the first argument
  @param disks: the disk objects; each is sent in its C{ToDict()} form
  @param instance_name: forwarded unchanged
  @param multimaster: forwarded unchanged

  """
  encoded_disks = [cf.ToDict() for cf in disks]
  params = [nodes_ip, encoded_disks, instance_name, multimaster]
  return self._MultiNodeCall(node_list, "drbd_attach_net", params)
@_RpcTimeout(_TMO_SLOW)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
  """Waits until the synchronization of drbd devices is complete.

  This is a multi-node call.

  @param nodes_ip: forwarded unchanged as the first argument
  @param disks: the disk objects; each is sent in its C{ToDict()} form

  """
  encoded_disks = [cf.ToDict() for cf in disks]
  return self._MultiNodeCall(node_list, "drbd_wait_sync",
                             [nodes_ip, encoded_disks])
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_upload_file(cls, node_list, file_name, address_list=None):
  """Upload a file.

  The file is read locally, encoded with C{_Compress} and sent together
  with its mode, owner, group and access/modification times. The node
  will refuse the operation if the file is not one it accepts.

  This is a multi-node call. It is a classmethod (it takes C{cls} and
  uses the static call helper), so no runner instance is required.

  @type node_list: list
  @param node_list: the list of node names to upload to
  @type file_name: str
  @param file_name: the filename to upload
  @type address_list: list or None
  @keyword address_list: an optional list of node addresses, in order
      to optimize the RPC speed

  """
  encoded = cls._Compress(utils.ReadFile(file_name))
  st = os.stat(file_name)
  params = [file_name, encoded, st.st_mode, st.st_uid, st.st_gid,
            st.st_atime, st.st_mtime]
  return cls._StaticMultiNodeCall(node_list, "upload_file", params,
                                  address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_write_ssconf_files(cls, node_list, values):
  """Write ssconf files.

  This is a multi-node call. It is a classmethod (it takes C{cls} and
  uses the static call helper), so no runner instance is required.

  @param node_list: the nodes to contact
  @param values: forwarded unchanged as the only argument

  """
  params = [values]
  return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", params)
@_RpcTimeout(_TMO_FAST)
def call_os_diagnose(self, node_list):
  """Request a diagnose of OS definitions.

  This is a multi-node call; the remote procedure takes no arguments.

  @param node_list: the nodes to query

  """
  no_params = []
  return self._MultiNodeCall(node_list, "os_diagnose", no_params)
@_RpcTimeout(_TMO_FAST)
def call_os_get(self, node, name):
  """Returns an OS definition.

  This is a single-node call.

  @param node: the node to query
  @param name: the OS name, sent as the only argument
  @return: the call result; a successful payload that is a dict has
      been converted with C{objects.OS.FromDict}

  """
  result = self._SingleNodeCall(node, "os_get", [name])
  if result.fail_msg:
    return result
  if isinstance(result.payload, dict):
    result.payload = objects.OS.FromDict(result.payload)
  return result
@_RpcTimeout(_TMO_NORMAL)
def call_hooks_runner(self, node_list, hpath, phase, env):
  """Call the hooks runner.

  This is a multi-node call.

  @param node_list: the nodes on which to run the hooks
  @param hpath: forwarded unchanged as the first argument
  @param phase: forwarded unchanged as the second argument
  @param env: a dictionary with the environment

  """
  return self._MultiNodeCall(node_list, "hooks_runner",
                             [hpath, phase, env])
@_RpcTimeout(_TMO_NORMAL)
def call_iallocator_runner(self, node, name, idata):
  """Call an iallocator on a remote node.

  This is a single-node call.

  @param node: the node on which to run the iallocator
  @param name: the iallocator name
  @param idata: the json-encoded input string

  """
  params = [name, idata]
  return self._SingleNodeCall(node, "iallocator_runner", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_grow(self, node, cf_bdev, amount):
  """Request that the given block device be grown.

  This is a single-node call (remote procedure "blockdev_grow").

  @param cf_bdev: the device object; it is sent in its C{ToDict()} form
  @param amount: forwarded unchanged (presumably the size increase; the
      unit is not visible here)

  """
  params = [cf_bdev.ToDict(), amount]
  return self._SingleNodeCall(node, "blockdev_grow", params)
@_RpcTimeout(_TMO_1DAY)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
  """Export a given disk to another node.

  This is a single-node call.

  @param cf_bdev: the device object; it is sent in its C{ToDict()} form
  @param dest_node: forwarded unchanged
  @param dest_path: forwarded unchanged
  @param cluster_name: forwarded unchanged

  """
  params = [cf_bdev.ToDict(), dest_node, dest_path, cluster_name]
  return self._SingleNodeCall(node, "blockdev_export", params)
@_RpcTimeout(_TMO_NORMAL)
def call_blockdev_snapshot(self, node, cf_bdev):
  """Request a snapshot of the given block device.

  This is a single-node call.

  @param cf_bdev: the device object; it is sent in its C{ToDict()} form

  """
  params = [cf_bdev.ToDict()]
  return self._SingleNodeCall(node, "blockdev_snapshot", params)
@_RpcTimeout(_TMO_NORMAL)
def call_finalize_export(self, node, instance, snap_disks):
  """Request the completion of an export operation.

  This writes the export config file, etc.

  This is a single-node call.

  @param instance: the instance object; it is sent as a dict built by
      C{_InstDict}
  @param snap_disks: the snapshot disks; boolean entries are sent as
      they are, all others in their C{ToDict()} form

  """
  def _Flatten(disk):
    # booleans are placeholders and have no ToDict() to call
    if isinstance(disk, bool):
      return disk
    return disk.ToDict()

  flat_disks = [_Flatten(disk) for disk in snap_disks]
  params = [self._InstDict(instance), flat_disks]
  return self._SingleNodeCall(node, "finalize_export", params)
@_RpcTimeout(_TMO_FAST)
def call_export_info(self, node, path):
  """Queries the export information in a given path.

  This is a single-node call.

  @param node: the node to query
  @param path: the path to inspect, sent as the only argument

  """
  params = [path]
  return self._SingleNodeCall(node, "export_info", params)
@_RpcTimeout(_TMO_FAST)
def call_export_list(self, node_list):
  """Gets the stored exports list.

  This is a multi-node call; the remote procedure takes no arguments.

  @param node_list: the nodes to query

  """
  no_params = []
  return self._MultiNodeCall(node_list, "export_list", no_params)
@_RpcTimeout(_TMO_FAST)
def call_export_remove(self, node, export):
  """Requests removal of a given export.

  This is a single-node call.

  @param node: the node holding the export
  @param export: the export to remove, sent as the only argument

  """
  params = [export]
  return self._SingleNodeCall(node, "export_remove", params)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_node_leave_cluster(cls, node, modify_ssh_setup):
  """Requests a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  on the node.

  This is a single-node call. It is a classmethod (it takes C{cls} and
  uses the static call helper), so no runner instance is required.

  @param node: the node to contact
  @param modify_ssh_setup: forwarded unchanged as the only argument

  """
  params = [modify_ssh_setup]
  return cls._StaticSingleNodeCall(node, "node_leave_cluster", params)
@_RpcTimeout(_TMO_FAST)
def call_node_volumes(self, node_list):
  """Gets all volumes on node(s).

  This is a multi-node call; the remote procedure takes no arguments.

  @param node_list: the nodes to query

  """
  no_params = []
  return self._MultiNodeCall(node_list, "node_volumes", no_params)
@_RpcTimeout(_TMO_FAST)
def call_node_demote_from_mc(self, node):
  """Demote a node from the master candidate role.

  This is a single-node call; the remote procedure takes no arguments.

  @param node: the node to demote

  """
  no_params = []
  return self._SingleNodeCall(node, "node_demote_from_mc", no_params)
@_RpcTimeout(_TMO_NORMAL)
def call_node_powercycle(self, node, hypervisor):
  """Tries to powercycle a node.

  This is a single-node call.

  @param node: the node to powercycle
  @param hypervisor: forwarded unchanged as the only argument

  """
  params = [hypervisor]
  return self._SingleNodeCall(node, "node_powercycle", params)
# The table entry only declares the call (the RPC client refuses
# procedures missing from the timeout table); the effective timeout is
# computed from the requested duration on every invocation.
@_RpcTimeout(None)
def call_test_delay(self, node_list, duration):
  """Sleep for a fixed time on given node(s).

  This is a multi-node call.

  @param node_list: the nodes that should sleep
  @param duration: the delay, sent as the only argument; the read
      timeout of the call is the duration plus five, truncated to an
      integer

  """
  return self._MultiNodeCall(node_list, "test_delay", [duration],
                             read_timeout=int(duration + 5))
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_create(self, node, file_storage_dir):
  """Create the given file storage directory.

  This is a single-node call.

  @param node: the node on which to create the directory
  @param file_storage_dir: the directory, sent as the only argument

  """
  params = [file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_create", params)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_remove(self, node, file_storage_dir):
  """Remove the given file storage directory.

  This is a single-node call.

  @param node: the node on which to remove the directory
  @param file_storage_dir: the directory, sent as the only argument

  """
  params = [file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_remove", params)
@_RpcTimeout(_TMO_FAST)
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Rename file storage directory.

  This is a single-node call.

  @param node: the node on which to rename the directory
  @param old_file_storage_dir: sent as the first argument
  @param new_file_storage_dir: sent as the second argument

  """
  params = [old_file_storage_dir, new_file_storage_dir]
  return self._SingleNodeCall(node, "file_storage_dir_rename", params)
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Update job queue.

    This is a multi-node call.

    @param node_list: the nodes whose job queue should be updated
    @param address_list: passed through as the C{address_list} keyword of
        C{_StaticMultiNodeCall}
    @param file_name: sent as the first RPC argument
    @param content: sent as the second RPC argument after being passed
        through C{cls._Compress}
    @return: whatever C{_StaticMultiNodeCall} returns for this request

    """
    rpc_args = [file_name, cls._Compress(content)]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update", rpc_args,
                                    address_list=address_list)
@classmethod
@_RpcTimeout(_TMO_NORMAL)
def call_jobqueue_purge(cls, node):
    """Purge job queue.

    This is a single-node call.

    @param node: the node whose job queue should be purged
    @return: whatever C{_StaticSingleNodeCall} returns for this request

    """
    # The remote procedure takes no arguments.
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
@_RpcTimeout(_TMO_FAST)
def call_jobqueue_rename(cls, node_list, address_list, rename):
    """Rename a job queue file.

    This is a multi-node call.

    @param node_list: the nodes on which to perform the rename
    @param address_list: passed through as the C{address_list} keyword of
        C{_StaticMultiNodeCall}
    @param rename: used directly as the RPC argument list (it is not
        wrapped in another list)
    @return: whatever C{_StaticMultiNodeCall} returns for this request

    """
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                    address_list=address_list)
@_RpcTimeout(_TMO_NORMAL)
def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
    """Validate the hypervisor params.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hvname: string
    @param hvname: the hypervisor name
    @type hvparams: dict
    @param hvparams: the hypervisor parameters to be validated
    @return: whatever C{_MultiNodeCall} returns for this request

    """
    # Combine the cluster-level settings stored for this hypervisor (an empty
    # dict if there are none) with the caller's parameters. Presumably
    # FillDict lets hvparams override the cluster values -- confirm.
    cluster_info = self._cfg.GetClusterInfo()
    cluster_defaults = cluster_info.hvparams.get(hvname, {})
    hv_full = objects.FillDict(cluster_defaults, hvparams)

    # NOTE(review): the argument list is reconstructed from the two values
    # that were otherwise unused (hvname and the merged hv_full).
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                               [hvname, hv_full])
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_create(self, node, validity):
    """Creates a new X509 certificate for SSL/TLS.

    This is a single-node call.

    @param node: the node on which to create the certificate
    @param validity: Validity in seconds
    @return: whatever C{_SingleNodeCall} returns for this request

    """
    rpc_args = [validity]
    return self._SingleNodeCall(node, "x509_cert_create", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_x509_cert_remove(self, node, name):
    """Removes a X509 certificate.

    This is a single-node call.

    @param node: the node on which to remove the certificate
    @param name: Certificate name
    @return: whatever C{_SingleNodeCall} returns for this request

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "x509_cert_remove", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_import_start(self, node, opts, instance, dest, dest_args):
    """Starts a listener for an import.

    This is a single-node call.

    @param node: Node name
    @param opts: import options; sent in their C{ToDict()} form
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param dest: sent unchanged, and also passed to
        C{_EncodeImportExportIO} together with C{dest_args}
    @param dest_args: encoded via C{_EncodeImportExportIO} before sending
    @return: whatever C{_SingleNodeCall} returns for this request

    """
    # NOTE(review): the first list element is reconstructed by analogy with
    # call_export_start, which opens its argument list with opts.ToDict();
    # opts was otherwise unused here. Confirm against the node daemon.
    rpc_args = [
        opts.ToDict(),
        self._InstDict(instance),
        dest,
        _EncodeImportExportIO(dest, dest_args),
    ]
    return self._SingleNodeCall(node, "import_start", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
    """Starts an export daemon.

    This is a single-node call.

    @param node: Node name
    @param opts: export options; sent in their C{ToDict()} form
    @param host: sent unchanged as the second RPC argument
    @param port: sent unchanged as the third RPC argument
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param source: sent unchanged, and also passed to
        C{_EncodeImportExportIO} together with C{source_args}
    @param source_args: encoded via C{_EncodeImportExportIO} before sending
    @return: whatever C{_SingleNodeCall} returns for this request

    """
    rpc_args = [
        opts.ToDict(),
        host,
        port,
        self._InstDict(instance),
        source,
        _EncodeImportExportIO(source, source_args),
    ]
    return self._SingleNodeCall(node, "export_start", rpc_args)
@_RpcTimeout(_TMO_FAST)
def call_impexp_status(self, node, names):
    """Gets the status of an import or export.

    This is a single-node call.

    @param node: Node name
    @type names: List of strings
    @param names: Import/export names
    @return: the RPC result; when the call did not fail, its payload is
        replaced by a list holding, for each named import/export, either an
        L{objects.ImportExportStatus} instance or None if a status couldn't
        be retrieved

    """
    result = self._SingleNodeCall(node, "impexp_status", [names])

    # Leave the payload of a failed call untouched.
    if not result.fail_msg:
        # NOTE(review): the None check, the initialisation of the list and
        # the final return are reconstructed from the documented contract
        # ("... or None if a status couldn't be retrieved"); confirm.
        decoded = []
        for entry in result.payload:
            if entry is None:
                decoded.append(None)
            else:
                decoded.append(objects.ImportExportStatus.FromDict(entry))
        result.payload = decoded

    return result
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_abort(self, node, name):
    """Aborts an import or export.

    This is a single-node call.

    @param node: Node name
    @param name: Import/export name
    @return: whatever C{_SingleNodeCall} returns for this request

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "impexp_abort", rpc_args)
@_RpcTimeout(_TMO_NORMAL)
def call_impexp_cleanup(self, node, name):
    """Cleans up after an import or export.

    This is a single-node call.

    @param node: Node name
    @param name: Import/export name
    @return: whatever C{_SingleNodeCall} returns for this request

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "impexp_cleanup", rpc_args)