4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client # pylint: disable-msg=W0611
49 # Module level variable
54 """Initializes the module-global HTTP client manager.
56 Must be called before using any RPC function.
59 global _http_manager # pylint: disable-msg=W0603
61 assert not _http_manager, "RPC module initialized more than once"
65 _http_manager = http.client.HttpClientManager()
69 """Stops the module-global HTTP client manager.
71 Must be called before quitting the program.
74 global _http_manager # pylint: disable-msg=W0603
77 _http_manager.Shutdown()
81 class RpcResult(object):
84 This class holds an RPC result. It is needed since in multi-node
85 calls we can't raise an exception just because one out of many
86 failed, and therefore we use this class to encapsulate the result.
88 @ivar data: the data payload, for successful results, or None
89 @ivar call: the name of the RPC call
90 @ivar node: the name of the node to which we made the call
91 @ivar offline: whether the operation failed because the node was
92 offline, as opposed to actual failure; offline=True will always
93 imply failed=True, in order to allow simpler checking if
94 the user doesn't care about the exact failure mode
95 @ivar fail_msg: the error message if the call failed
98 def __init__(self, data=None, failed=False, offline=False,
99 call=None, node=None):
100 self.offline = offline
105 self.fail_msg = "Node is marked offline"
106 self.data = self.payload = None
108 self.fail_msg = self._EnsureErr(data)
109 self.data = self.payload = None
112 if not isinstance(self.data, (tuple, list)):
113 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
117 self.fail_msg = ("RPC layer error: invalid result length (%d), "
118 "expected 2" % len(self.data))
120 elif not self.data[0]:
121 self.fail_msg = self._EnsureErr(self.data[1])
126 self.payload = data[1]
128 assert hasattr(self, "call")
129 assert hasattr(self, "data")
130 assert hasattr(self, "fail_msg")
131 assert hasattr(self, "node")
132 assert hasattr(self, "offline")
133 assert hasattr(self, "payload")
137 """Helper to ensure we return a 'True' value for error."""
141 return "No error information"
143 def Raise(self, msg, prereq=False, ecode=None):
144 """If the result has failed, raise an OpExecError.
146 This is used so that LU code doesn't have to check for each
147 result, but instead can call this function.
150 if not self.fail_msg:
153 if not msg: # one could pass None for default message
154 msg = ("Call '%s' to node '%s' has failed: %s" %
155 (self.call, self.node, self.fail_msg))
157 msg = "%s: %s" % (msg, self.fail_msg)
159 ec = errors.OpPrereqError
161 ec = errors.OpExecError
162 if ecode is not None:
166 raise ec(*args) # pylint: disable-msg=W0142
172 This class, given a (remote) method name, a list of parameters and a
173 list of nodes, will contact (in parallel) all nodes, and return a
174 dict of results (key: node name, value: result).
176 One current bug is that generic failure is still signaled by
177 'False' result, which is not good. This overloading of values can
181 def __init__(self, procedure, body, port):
182 self.procedure = procedure
188 http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189 ssl_cert_path=constants.NODED_CERT_FILE)
def ConnectList(self, node_list, address_list=None):
    """Add a list of nodes to the target nodes.

    Every node name is paired with the address at the same position
    and handed to L{ConnectNode}.

    @type node_list: list
    @param node_list: the list of node names to connect
    @type address_list: list or None
    @keyword address_list: either None or a list with node addresses,
        which must have the same length as the node list

    """
    if address_list is None:
        # No addresses known: ConnectNode receives None for every node
        address_list = [None for _ in node_list]

    assert len(node_list) == len(address_list), \
        "Name and address lists should have the same length"
    for node, address in zip(node_list, address_list):
        self.ConnectNode(node, address)
209 def ConnectNode(self, name, address=None):
210 """Add a node to the target list.
213 @param name: the node name
215 @keyword address: the node address, if known
222 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
223 "/%s" % self.procedure,
225 ssl_params=self._ssl_params,
226 ssl_verify_peer=True)
228 def GetResults(self):
229 """Call nodes and return results.
232 @return: List of RPC results
235 assert _http_manager, "RPC module not initialized"
237 _http_manager.ExecRequests(self.nc.values())
241 for name, req in self.nc.iteritems():
242 if req.success and req.resp_status_code == http.HTTP_OK:
243 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
244 node=name, call=self.procedure)
247 # TODO: Better error reporting
253 logging.error("RPC error in %s from node %s: %s",
254 self.procedure, name, msg)
255 results[name] = RpcResult(data=msg, failed=True, node=name,
261 def _EncodeImportExportIO(ieio, ieioargs):
262 """Encodes import/export I/O information.
265 if ieio == constants.IEIO_RAW_DISK:
266 assert len(ieioargs) == 1
267 return (ieioargs[0].ToDict(), )
269 if ieio == constants.IEIO_SCRIPT:
270 assert len(ieioargs) == 2
271 return (ieioargs[0].ToDict(), ieioargs[1])
276 class RpcRunner(object):
277 """RPC runner class"""
def __init__(self, cfg):
    """Initializes the RPC runner.

    @type cfg: C{config.ConfigWriter}
    @param cfg: the configuration object that will be used to get data

    """
    # The helper methods of this class read cluster and node data
    # through self._cfg
    self._cfg = cfg
    self.port = utils.GetDaemonPort(constants.NODED)
290 def _InstDict(self, instance, hvp=None, bep=None):
291 """Convert the given instance to a dict.
293 This is done via the instance's ToDict() method and additionally
294 we fill the hvparams with the cluster defaults.
296 @type instance: L{objects.Instance}
297 @param instance: an Instance object
298 @type hvp: dict or None
299 @param hvp: a dictionary with overridden hypervisor parameters
300 @type bep: dict or None
301 @param bep: a dictionary with overridden backend parameters
303 @return: the instance dict, with the hvparams filled with the
307 idict = instance.ToDict()
308 cluster = self._cfg.GetClusterInfo()
309 idict["hvparams"] = cluster.FillHV(instance)
311 idict["hvparams"].update(hvp)
312 idict["beparams"] = cluster.FillBE(instance)
314 idict["beparams"].update(bep)
315 for nic in idict["nics"]:
316 nic['nicparams'] = objects.FillDict(
317 cluster.nicparams[constants.PP_DEFAULT],
321 def _ConnectList(self, client, node_list, call):
322 """Helper for computing node addresses.
324 @type client: L{ganeti.rpc.Client}
325 @param client: a C{Client} instance
326 @type node_list: list
327 @param node_list: the node list we should connect
329 @param call: the name of the remote procedure call, for filling in
330 correctly any eventual offline nodes' results
333 all_nodes = self._cfg.GetAllNodesInfo()
337 for node in node_list:
338 if node in all_nodes:
339 if all_nodes[node].offline:
340 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
342 val = all_nodes[node].primary_ip
345 addr_list.append(val)
346 name_list.append(node)
348 client.ConnectList(name_list, address_list=addr_list)
351 def _ConnectNode(self, client, node, call):
352 """Helper for computing one node's address.
354 @type client: L{ganeti.rpc.Client}
355 @param client: a C{Client} instance
357 @param node: the node we should connect
359 @param call: the name of the remote procedure call, for filling in
360 correctly any eventual offline nodes' results
363 node_info = self._cfg.GetNodeInfo(node)
364 if node_info is not None:
365 if node_info.offline:
366 return RpcResult(node=node, offline=True, call=call)
367 addr = node_info.primary_ip
370 client.ConnectNode(node, address=addr)
def _MultiNodeCall(self, node_list, procedure, args):
    """Helper for making a multi-node call.

    @param node_list: the nodes to contact
    @param procedure: the name of the remote procedure
    @param args: the call arguments; serialized to JSON as request body
    @return: the dict returned by L{_ConnectList}, updated with the
        results gathered by the client

    """
    body = serializer.DumpJson(args, indent=False)
    c = Client(procedure, body, self.port)
    skip_dict = self._ConnectList(c, node_list, procedure)
    skip_dict.update(c.GetResults())
    return skip_dict
@classmethod
def _StaticMultiNodeCall(cls, node_list, procedure, args,
                         address_list=None):
    """Helper for making a multi-node static call.

    Unlike the instance-level helper this does not consult any
    configuration object; the daemon port is looked up on each call.

    @param node_list: the nodes to contact
    @param procedure: the name of the remote procedure
    @param args: the call arguments; serialized to JSON as request body
    @keyword address_list: passed through to the client's ConnectList
    @return: the value of the client's GetResults()

    """
    body = serializer.DumpJson(args, indent=False)
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
    c.ConnectList(node_list, address_list=address_list)
    return c.GetResults()
393 def _SingleNodeCall(self, node, procedure, args):
394 """Helper for making a single-node call
397 body = serializer.DumpJson(args, indent=False)
398 c = Client(procedure, body, self.port)
399 result = self._ConnectNode(c, node, procedure)
401 # we did connect, node is not offline
402 result = c.GetResults()[node]
@classmethod
def _StaticSingleNodeCall(cls, node, procedure, args):
    """Helper for making a single-node static call.

    @param node: the node to contact
    @param procedure: the name of the remote procedure
    @param args: the call arguments; serialized to JSON as request body
    @return: the entry for C{node} from the client's GetResults()

    """
    body = serializer.DumpJson(args, indent=False)
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
    # The node has to be registered with the client, otherwise
    # GetResults() would have no entry to index below
    c.ConnectNode(node)
    return c.GetResults()[node]
417 """Compresses a string for transport over RPC.
419 Small amounts of data are not compressed.
424 @return: Encoded data to send
427 # Small amounts of data are not compressed
429 return (constants.RPC_ENCODING_NONE, data)
431 # Compress with zlib and encode in base64
432 return (constants.RPC_ENCODING_ZLIB_BASE64,
433 base64.b64encode(zlib.compress(data, 3)))
def call_lv_list(self, node_list, vg_name):
    """Gets the logical volumes present in a given volume group.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param vg_name: the volume group name, sent as the only argument

    """
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
def call_vg_list(self, node_list):
    """Gets the volume group list.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    return self._MultiNodeCall(node_list, "vg_list", [])
def call_storage_list(self, node_list, su_name, su_args, name, fields):
    """Get list of storage units.

    This is a multi-node call.

    The arguments after C{node_list} are sent unchanged, in order.

    """
    params = [su_name, su_args, name, fields]
    return self._MultiNodeCall(node_list, "storage_list", params)
def call_storage_modify(self, node, su_name, su_args, name, changes):
    """Modify a storage unit.

    This is a single-node call.

    The arguments after C{node} are sent unchanged, in order.

    """
    params = [su_name, su_args, name, changes]
    return self._SingleNodeCall(node, "storage_modify", params)
def call_storage_execute(self, node, su_name, su_args, name, op):
    """Executes an operation on a storage unit.

    This is a single-node call.

    The arguments after C{node} are sent unchanged, in order.

    """
    params = [su_name, su_args, name, op]
    return self._SingleNodeCall(node, "storage_execute", params)
def call_bridges_exist(self, node, bridges_list):
    """Checks if a node has all the bridges given.

    This method checks if all bridges given in the bridges_list are
    present on the remote node, so that an instance that uses interfaces
    on those bridges can be started.

    This is a single-node call.

    """
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
def call_instance_start(self, node, instance, hvp, bep):
    """Starts an instance.

    This is a single-node call.

    @param instance: the instance, converted with L{_InstDict}
    @param hvp: forwarded to L{_InstDict} as C{hvp}
    @param bep: forwarded to L{_InstDict} as C{bep}

    """
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
    return self._SingleNodeCall(node, "instance_start", [idict])
def call_instance_shutdown(self, node, instance, timeout):
    """Stops an instance.

    This is a single-node call.

    @param instance: the instance, converted with L{_InstDict}
    @param timeout: sent unchanged as the second argument

    """
    params = [self._InstDict(instance), timeout]
    return self._SingleNodeCall(node, "instance_shutdown", params)
def call_migration_info(self, node, instance):
    """Gather the information necessary to prepare an instance migration.

    This is a single-node call.

    @type node: string
    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition

    """
    params = [self._InstDict(instance)]
    return self._SingleNodeCall(node, "migration_info", params)
def call_accept_instance(self, node, instance, info, target):
    """Prepare a node to accept an instance.

    This is a single-node call.

    @type node: string
    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type target: string
    @param target: target hostname (usually ip address) (on the node itself)

    """
    params = [self._InstDict(instance), info, target]
    return self._SingleNodeCall(node, "accept_instance", params)
def call_finalize_migration(self, node, instance, info, success):
    """Finalize any target-node migration specific operation.

    This is called both in case of a successful migration and in case of
    error (in which case it should abort the migration).

    This is a single-node call.

    @type node: string
    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type success: boolean
    @param success: whether the migration was a success or a failure

    """
    params = [self._InstDict(instance), info, success]
    return self._SingleNodeCall(node, "finalize_migration", params)
def call_instance_migrate(self, node, instance, target, live):
    """Migrate an instance.

    This is a single-node call.

    @type node: string
    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type target: string
    @param target: the target node name
    @type live: boolean
    @param live: whether the migration should be done live or not (the
        interpretation of this parameter is left to the hypervisor)

    """
    params = [self._InstDict(instance), target, live]
    return self._SingleNodeCall(node, "instance_migrate", params)
def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
    """Reboots an instance.

    This is a single-node call.

    @param inst: the instance, converted with L{_InstDict}
    @param reboot_type: sent unchanged as the second argument
    @param shutdown_timeout: sent unchanged as the third argument

    """
    params = [self._InstDict(inst), reboot_type, shutdown_timeout]
    return self._SingleNodeCall(node, "instance_reboot", params)
def call_instance_os_add(self, node, inst, reinstall, debug):
    """Installs an OS on the given instance.

    This is a single-node call.

    @param inst: the instance, converted with L{_InstDict}
    @param reinstall: sent unchanged as the second argument
    @param debug: sent unchanged as the third argument

    """
    params = [self._InstDict(inst), reinstall, debug]
    return self._SingleNodeCall(node, "instance_os_add", params)
def call_instance_run_rename(self, node, inst, old_name, debug):
    """Run the OS rename script for an instance.

    This is a single-node call.

    @param inst: the instance, converted with L{_InstDict}
    @param old_name: sent unchanged as the second argument
    @param debug: sent unchanged as the third argument

    """
    params = [self._InstDict(inst), old_name, debug]
    return self._SingleNodeCall(node, "instance_run_rename", params)
def call_instance_info(self, node, instance, hname):
    """Returns information about a single instance.

    This is a single-node call.

    @type node: string
    @param node: the node to query
    @type instance: string
    @param instance: the instance name
    @type hname: string
    @param hname: the hypervisor type of the instance

    """
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
def call_instance_migratable(self, node, instance):
    """Checks whether the given instance can be migrated.

    This is a single-node call.

    @param node: the node to query
    @type instance: L{objects.Instance}
    @param instance: the instance to check

    """
    params = [self._InstDict(instance)]
    return self._SingleNodeCall(node, "instance_migratable", params)
def call_all_instances_info(self, node_list, hypervisor_list):
    """Returns information about all instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    params = [hypervisor_list]
    return self._MultiNodeCall(node_list, "all_instances_info", params)
def call_instance_list(self, node_list, hypervisor_list):
    """Returns the list of running instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    params = [hypervisor_list]
    return self._MultiNodeCall(node_list, "instance_list", params)
668 def call_node_tcp_ping(self, node, source, target, port, timeout,
670 """Do a TcpPing on the remote node
672 This is a single-node call.
675 return self._SingleNodeCall(node, "node_tcp_ping",
676 [source, target, port, timeout,
def call_node_has_ip_address(self, node, address):
    """Checks if a node has the given IP address.

    This is a single-node call.

    @param node: the node to query
    @param address: the address, sent as the only argument

    """
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
def call_node_info(self, node_list, vg_name, hypervisor_type):
    """Return node information.

    This will return memory information and volume group size and free
    space.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type vg_name: C{string}
    @param vg_name: the name of the volume group to ask for disk space
        information
    @type hypervisor_type: C{str}
    @param hypervisor_type: the name of the hypervisor to ask for
        memory information

    """
    params = [vg_name, hypervisor_type]
    return self._MultiNodeCall(node_list, "node_info", params)
def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
    """Add a node to the cluster.

    This is a single-node call.

    The arguments after C{node} are sent unchanged, in order.

    """
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
    return self._SingleNodeCall(node, "node_add", params)
def call_node_verify(self, node_list, checkdict, cluster_name):
    """Request verification of given parameters.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param checkdict: sent unchanged as the first argument
    @param cluster_name: sent unchanged as the second argument

    """
    params = [checkdict, cluster_name]
    return self._MultiNodeCall(node_list, "node_verify", params)
@classmethod
def call_node_start_master(cls, node, start_daemons, no_voting):
    """Tells a node to activate itself as a master.

    This is a single-node call.

    @param node: the node to contact
    @param start_daemons: sent unchanged as the first argument
    @param no_voting: sent unchanged as the second argument

    """
    params = [start_daemons, no_voting]
    return cls._StaticSingleNodeCall(node, "node_start_master", params)
@classmethod
def call_node_stop_master(cls, node, stop_daemons):
    """Tells a node to demote itself from master status.

    This is a single-node call.

    @param node: the node to contact
    @param stop_daemons: sent unchanged as the only argument

    """
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
@classmethod
def call_master_info(cls, node_list):
    """Query master info.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    # TODO: should this method query down nodes?
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
@classmethod
def call_version(cls, node_list):
    """Query node version.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    return cls._StaticMultiNodeCall(node_list, "version", [])
def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
    """Request creation of a given block device.

    This is a single-node call.

    @param bdev: the device; its ToDict() form is sent
    @param size: sent unchanged
    @param owner: sent unchanged
    @param on_primary: sent unchanged
    @param info: sent unchanged

    """
    params = [bdev.ToDict(), size, owner, on_primary, info]
    return self._SingleNodeCall(node, "blockdev_create", params)
def call_blockdev_remove(self, node, bdev):
    """Request removal of a given block device.

    This is a single-node call.

    @param bdev: the device; its ToDict() form is sent

    """
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
def call_blockdev_rename(self, node, devlist):
    """Request rename of the given block devices.

    This is a single-node call.

    @param devlist: iterable of (device, unique id) pairs; every device
        is replaced by its ToDict() form and the resulting list of
        pairs is itself the argument list of the call

    """
    params = [(d.ToDict(), uid) for d, uid in devlist]
    return self._SingleNodeCall(node, "blockdev_rename", params)
def call_blockdev_assemble(self, node, disk, owner, on_primary):
    """Request assembling of a given block device.

    This is a single-node call.

    @param disk: the device; its ToDict() form is sent
    @param owner: sent unchanged
    @param on_primary: sent unchanged

    """
    params = [disk.ToDict(), owner, on_primary]
    return self._SingleNodeCall(node, "blockdev_assemble", params)
def call_blockdev_shutdown(self, node, disk):
    """Request shutdown of a given block device.

    This is a single-node call.

    @param disk: the device; its ToDict() form is sent

    """
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
def call_blockdev_addchildren(self, node, bdev, ndevs):
    """Request adding a list of children to a (mirroring) device.

    This is a single-node call.

    @param bdev: the parent device; its ToDict() form is sent
    @param ndevs: the child devices; each is sent in its ToDict() form

    """
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
    return self._SingleNodeCall(node, "blockdev_addchildren", params)
def call_blockdev_removechildren(self, node, bdev, ndevs):
    """Request removing a list of children from a (mirroring) device.

    This is a single-node call.

    @param bdev: the parent device; its ToDict() form is sent
    @param ndevs: the child devices; each is sent in its ToDict() form

    """
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
    return self._SingleNodeCall(node, "blockdev_removechildren", params)
def call_blockdev_getmirrorstatus(self, node, disks):
    """Request status of a (mirroring) device.

    This is a single-node call.

    @param disks: the devices; the list of their ToDict() forms is
        itself the argument list of the call
    @return: the call result; on success every payload entry has been
        converted with L{objects.BlockDevStatus.FromDict}

    """
    params = [dsk.ToDict() for dsk in disks]
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus", params)
    if not result.fail_msg:
        # Only successful results are decoded
        result.payload = [objects.BlockDevStatus.FromDict(i)
                          for i in result.payload]
    return result
def call_blockdev_find(self, node, disk):
    """Request identification of a given block device.

    This is a single-node call.

    @param disk: the device; its ToDict() form is sent
    @return: the call result; a successful, non-None payload has been
        converted with L{objects.BlockDevStatus.FromDict}

    """
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
    if not result.fail_msg and result.payload is not None:
        result.payload = objects.BlockDevStatus.FromDict(result.payload)
    return result
def call_blockdev_close(self, node, instance_name, disks):
    """Closes the given block devices.

    This is a single-node call.

    @param instance_name: sent unchanged as the first argument
    @param disks: the devices; each is sent in its ToDict() form

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    return self._SingleNodeCall(node, "blockdev_close",
                                [instance_name, disk_dicts])
def call_blockdev_getsizes(self, node, disks):
    """Returns the size of the given disks.

    This is a single-node call.

    @param disks: the devices; each is sent in its ToDict() form

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    # The remote procedure name is singular, unlike this method's name
    return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
    """Disconnects the network of the given drbd devices.

    This is a multi-node call.

    @param nodes_ip: sent unchanged as the first argument
    @param disks: the devices; each is sent in its ToDict() form

    """
    params = [nodes_ip, [cf.ToDict() for cf in disks]]
    return self._MultiNodeCall(node_list, "drbd_disconnect_net", params)
def call_drbd_attach_net(self, node_list, nodes_ip,
                         disks, instance_name, multimaster):
    """Attaches the network of the given drbd devices.

    This is a multi-node call.

    @param nodes_ip: sent unchanged as the first argument
    @param disks: the devices; each is sent in its ToDict() form
    @param instance_name: sent unchanged as the third argument
    @param multimaster: sent unchanged as the fourth argument

    """
    params = [nodes_ip, [cf.ToDict() for cf in disks],
              instance_name, multimaster]
    return self._MultiNodeCall(node_list, "drbd_attach_net", params)
def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
    """Waits for the synchronization of drbd devices to complete.

    This is a multi-node call.

    @param nodes_ip: sent unchanged as the first argument
    @param disks: the devices; each is sent in its ToDict() form

    """
    params = [nodes_ip, [cf.ToDict() for cf in disks]]
    return self._MultiNodeCall(node_list, "drbd_wait_sync", params)
@classmethod
def call_upload_file(cls, node_list, file_name, address_list=None):
    """Upload a file.

    The file is read locally and sent, encoded by L{_Compress},
    together with its mode, owner, group and access/modification times.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of node names to upload to
    @type file_name: str
    @param file_name: the filename to upload
    @type address_list: list or None
    @keyword address_list: an optional list of node addresses, in order
        to optimize the RPC speed

    """
    file_contents = utils.ReadFile(file_name)
    data = cls._Compress(file_contents)
    st = os.stat(file_name)
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
              st.st_atime, st.st_mtime]
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
                                    address_list=address_list)
@classmethod
def call_write_ssconf_files(cls, node_list, values):
    """Write ssconf files.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param values: sent unchanged as the only argument

    """
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
def call_os_diagnose(self, node_list):
    """Request a diagnose of OS definitions.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    return self._MultiNodeCall(node_list, "os_diagnose", [])
def call_os_get(self, node, name):
    """Returns an OS definition.

    This is a single-node call.

    @param name: the OS name, sent as the only argument
    @return: the call result; a successful payload that is a dict has
        been converted with L{objects.OS.FromDict}

    """
    result = self._SingleNodeCall(node, "os_get", [name])
    if not result.fail_msg and isinstance(result.payload, dict):
        result.payload = objects.OS.FromDict(result.payload)
    return result
def call_hooks_runner(self, node_list, hpath, phase, env):
    """Call the hooks runner.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param hpath: sent unchanged as the first argument
    @param phase: sent unchanged as the second argument
    @param env: a dictionary with the environment

    """
    params = [hpath, phase, env]
    return self._MultiNodeCall(node_list, "hooks_runner", params)
def call_iallocator_runner(self, node, name, idata):
    """Call an iallocator on a remote node.

    This is a single-node call.

    @param node: the node to contact
    @param name: the iallocator name
    @param idata: the json-encoded input string

    """
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
def call_blockdev_grow(self, node, cf_bdev, amount):
    """Request growing of the given block device.

    This is a single-node call.

    @param cf_bdev: the device; its ToDict() form is sent
    @param amount: sent unchanged as the second argument

    """
    params = [cf_bdev.ToDict(), amount]
    return self._SingleNodeCall(node, "blockdev_grow", params)
def call_blockdev_export(self, node, cf_bdev,
                         dest_node, dest_path, cluster_name):
    """Export a given disk to another node.

    This is a single-node call.

    @param cf_bdev: the device; its ToDict() form is sent
    @param dest_node: sent unchanged
    @param dest_path: sent unchanged
    @param cluster_name: sent unchanged

    """
    params = [cf_bdev.ToDict(), dest_node, dest_path, cluster_name]
    return self._SingleNodeCall(node, "blockdev_export", params)
def call_blockdev_snapshot(self, node, cf_bdev):
    """Request a snapshot of the given block device.

    This is a single-node call.

    @param cf_bdev: the device; its ToDict() form is sent

    """
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
def call_finalize_export(self, node, instance, snap_disks):
    """Request the completion of an export operation.

    This writes the export config file, etc.

    This is a single-node call.

    @param instance: the instance, converted with L{_InstDict}
    @param snap_disks: the snapshot devices; boolean entries are sent
        as they are, any other entry in its ToDict() form

    """
    flat_disks = []
    for disk in snap_disks:
        if isinstance(disk, bool):
            # Boolean placeholders have no ToDict() and pass through
            flat_disks.append(disk)
        else:
            flat_disks.append(disk.ToDict())

    return self._SingleNodeCall(node, "finalize_export",
                                [self._InstDict(instance), flat_disks])
def call_export_info(self, node, path):
    """Queries the export information in a given path.

    This is a single-node call.

    @param path: the path, sent as the only argument

    """
    return self._SingleNodeCall(node, "export_info", [path])
def call_export_list(self, node_list):
    """Gets the stored exports list.

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    return self._MultiNodeCall(node_list, "export_list", [])
def call_export_remove(self, node, export):
    """Requests removal of a given export.

    This is a single-node call.

    @param export: the export, sent as the only argument

    """
    return self._SingleNodeCall(node, "export_remove", [export])
@classmethod
def call_node_leave_cluster(cls, node, modify_ssh_setup):
    """Requests a node to clean the cluster information it has.

    This will remove the configuration information from the ganeti data
    directory.

    This is a single-node call.

    @param node: the node to contact
    @param modify_ssh_setup: sent unchanged as the only argument

    """
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
                                     [modify_ssh_setup])
def call_node_volumes(self, node_list):
    """Gets all volumes on node(s).

    This is a multi-node call.

    @param node_list: the nodes to query

    """
    return self._MultiNodeCall(node_list, "node_volumes", [])
def call_node_demote_from_mc(self, node):
    """Demote a node from the master candidate role.

    This is a single-node call.

    @param node: the node to contact

    """
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
def call_node_powercycle(self, node, hypervisor):
    """Tries to powercycle a node.

    This is a single-node call.

    @param node: the node to contact
    @param hypervisor: sent unchanged as the only argument

    """
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
def call_test_delay(self, node_list, duration):
    """Sleep for a fixed time on given node(s).

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param duration: sent unchanged as the only argument

    """
    return self._MultiNodeCall(node_list, "test_delay", [duration])
def call_file_storage_dir_create(self, node, file_storage_dir):
    """Create the given file storage directory.

    This is a single-node call.

    @param node: the node to contact
    @param file_storage_dir: sent unchanged as the only argument

    """
    return self._SingleNodeCall(node, "file_storage_dir_create",
                                [file_storage_dir])
def call_file_storage_dir_remove(self, node, file_storage_dir):
    """Remove the given file storage directory.

    This is a single-node call.

    @param node: the node to contact
    @param file_storage_dir: sent unchanged as the only argument

    """
    return self._SingleNodeCall(node, "file_storage_dir_remove",
                                [file_storage_dir])
def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                 new_file_storage_dir):
    """Rename file storage directory.

    This is a single-node call.

    @param node: the node to contact
    @param old_file_storage_dir: sent unchanged as the first argument
    @param new_file_storage_dir: sent unchanged as the second argument

    """
    params = [old_file_storage_dir, new_file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_rename", params)
@classmethod
def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Update job queue.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param address_list: passed through as C{address_list}
    @param file_name: sent unchanged as the first argument
    @param content: sent after encoding with L{_Compress}

    """
    params = [file_name, cls._Compress(content)]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update", params,
                                    address_list=address_list)
@classmethod
def call_jobqueue_purge(cls, node):
    """Purge the job queue.

    This is a single-node call.

    @param node: the node to contact

    """
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
@classmethod
def call_jobqueue_rename(cls, node_list, address_list, rename):
    """Rename a job queue file.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param address_list: passed through as C{address_list}
    @param rename: used directly as the argument list of the call (it
        is not wrapped in another list)

    """
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
                                    address_list=address_list)
@classmethod
def call_jobqueue_set_drain(cls, node_list, drain_flag):
    """Set the drain flag on the queue.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type drain_flag: bool
    @param drain_flag: if True, will set the drain flag, otherwise reset it.

    """
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
                                    [drain_flag])
1165 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1166 """Validate the hypervisor params.
1168 This is a multi-node call.
1170 @type node_list: list
1171 @param node_list: the list of nodes to query
1172 @type hvname: string
1173 @param hvname: the hypervisor name
1174 @type hvparams: dict
1175 @param hvparams: the hypervisor parameters to be validated
1178 cluster = self._cfg.GetClusterInfo()
1179 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1180 return self._MultiNodeCall(node_list, "hypervisor_validate_params",
def call_x509_cert_create(self, node, validity):
    """Creates a new X509 certificate for SSL/TLS.

    This is a single-node call.

    @param node: the node to contact
    @type validity: int
    @param validity: Validity in seconds

    """
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
def call_x509_cert_remove(self, node, name):
    """Removes a X509 certificate.

    This is a single-node call.

    @param node: the node to contact
    @type name: string
    @param name: Certificate name

    """
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
def call_import_start(self, node, opts, instance, dest, dest_args):
    """Starts a listener for an import.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @param opts: the options object; its ToDict() form is sent
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param dest: sent unchanged; also selects how C{dest_args} is
        encoded by L{_EncodeImportExportIO}
    @param dest_args: sent after encoding

    """
    params = [opts.ToDict(),
              self._InstDict(instance), dest,
              _EncodeImportExportIO(dest, dest_args)]
    return self._SingleNodeCall(node, "import_start", params)
def call_export_start(self, node, opts, host, port,
                      instance, source, source_args):
    """Starts an export daemon.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @param opts: the options object; its ToDict() form is sent
    @param host: sent unchanged
    @param port: sent unchanged
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param source: sent unchanged; also selects how C{source_args} is
        encoded by L{_EncodeImportExportIO}
    @param source_args: sent after encoding

    """
    params = [opts.ToDict(), host, port,
              self._InstDict(instance), source,
              _EncodeImportExportIO(source, source_args)]
    return self._SingleNodeCall(node, "export_start", params)
1238 def call_impexp_status(self, node, names):
1239 """Gets the status of an import or export.
1241 This is a single-node call.
1244 @param node: Node name
1245 @type names: List of strings
1246 @param names: Import/export names
1247 @rtype: List of L{objects.ImportExportStatus} instances
1248 @return: Returns a list of the state of each named import/export or None if
1249 a status couldn't be retrieved
1252 result = self._SingleNodeCall(node, "impexp_status", [names])
1254 if not result.fail_msg:
1257 for i in result.payload:
1259 decoded.append(None)
1261 decoded.append(objects.ImportExportStatus.FromDict(i))
1263 result.payload = decoded
def call_impexp_abort(self, node, name):
    """Aborts an import or export.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @type name: string
    @param name: Import/export name

    """
    return self._SingleNodeCall(node, "impexp_abort", [name])
def call_impexp_cleanup(self, node, name):
    """Cleans up after an import or export.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @type name: string
    @param name: Import/export name

    """
    return self._SingleNodeCall(node, "impexp_cleanup", [name])