4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client # pylint: disable-msg=W0611
49 # Module level variable
54 """Initializes the module-global HTTP client manager.
56 Must be called before using any RPC function.
59 global _http_manager # pylint: disable-msg=W0603
61 assert not _http_manager, "RPC module initialized more than once"
65 _http_manager = http.client.HttpClientManager()
69 """Stops the module-global HTTP client manager.
71 Must be called before quitting the program.
74 global _http_manager # pylint: disable-msg=W0603
77 _http_manager.Shutdown()
81 class RpcResult(object):
84 This class holds an RPC result. It is needed since in multi-node
85 calls we can't raise an exception just because one one out of many
86 failed, and therefore we use this class to encapsulate the result.
88 @ivar data: the data payload, for successful results, or None
89 @ivar call: the name of the RPC call
90 @ivar node: the name of the node to which we made the call
91 @ivar offline: whether the operation failed because the node was
92 offline, as opposed to actual failure; offline=True will always
93 imply failed=True, in order to allow simpler checking if
94 the user doesn't care about the exact failure mode
95 @ivar fail_msg: the error message if the call failed
98 def __init__(self, data=None, failed=False, offline=False,
99 call=None, node=None):
100 self.offline = offline
105 self.fail_msg = "Node is marked offline"
106 self.data = self.payload = None
108 self.fail_msg = self._EnsureErr(data)
109 self.data = self.payload = None
112 if not isinstance(self.data, (tuple, list)):
113 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
117 self.fail_msg = ("RPC layer error: invalid result length (%d), "
118 "expected 2" % len(self.data))
120 elif not self.data[0]:
121 self.fail_msg = self._EnsureErr(self.data[1])
126 self.payload = data[1]
128 assert hasattr(self, "call")
129 assert hasattr(self, "data")
130 assert hasattr(self, "fail_msg")
131 assert hasattr(self, "node")
132 assert hasattr(self, "offline")
133 assert hasattr(self, "payload")
137 """Helper to ensure we return a 'True' value for error."""
141 return "No error information"
143 def Raise(self, msg, prereq=False, ecode=None):
144 """If the result has failed, raise an OpExecError.
146 This is used so that LU code doesn't have to check for each
147 result, but instead can call this function.
150 if not self.fail_msg:
153 if not msg: # one could pass None for default message
154 msg = ("Call '%s' to node '%s' has failed: %s" %
155 (self.call, self.node, self.fail_msg))
157 msg = "%s: %s" % (msg, self.fail_msg)
159 ec = errors.OpPrereqError
161 ec = errors.OpExecError
162 if ecode is not None:
166 raise ec(*args) # pylint: disable-msg=W0142
172 This class, given a (remote) method name, a list of parameters and a
173 list of nodes, will contact (in parallel) all nodes, and return a
174 dict of results (key: node name, value: result).
176 One current bug is that generic failure is still signaled by
177 'False' result, which is not good. This overloading of values can
181 def __init__(self, procedure, body, port):
182 self.procedure = procedure
188 http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189 ssl_cert_path=constants.NODED_CERT_FILE)
191 def ConnectList(self, node_list, address_list=None):
192 """Add a list of nodes to the target nodes.
194 @type node_list: list
195 @param node_list: the list of node names to connect
196 @type address_list: list or None
197 @keyword address_list: either None or a list with node addresses,
198 which must have the same length as the node list
201 if address_list is None:
202 address_list = [None for _ in node_list]
204 assert len(node_list) == len(address_list), \
205 "Name and address lists should have the same length"
206 for node, address in zip(node_list, address_list):
207 self.ConnectNode(node, address)
209 def ConnectNode(self, name, address=None):
210 """Add a node to the target list.
213 @param name: the node name
215 @keyword address: the node address, if known
222 http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
223 "/%s" % self.procedure,
225 ssl_params=self._ssl_params,
226 ssl_verify_peer=True)
228 def GetResults(self):
229 """Call nodes and return results.
232 @return: List of RPC results
235 assert _http_manager, "RPC module not initialized"
237 _http_manager.ExecRequests(self.nc.values())
241 for name, req in self.nc.iteritems():
242 if req.success and req.resp_status_code == http.HTTP_OK:
243 results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
244 node=name, call=self.procedure)
247 # TODO: Better error reporting
253 logging.error("RPC error in %s from node %s: %s",
254 self.procedure, name, msg)
255 results[name] = RpcResult(data=msg, failed=True, node=name,
261 def _EncodeImportExportIO(ieio, ieioargs):
262 """Encodes import/export I/O information.
265 if ieio == constants.IEIO_RAW_DISK:
266 assert len(ieioargs) == 1
267 return (ieioargs[0].ToDict(), )
269 if ieio == constants.IEIO_SCRIPT:
270 assert len(ieioargs) == 2
271 return (ieioargs[0].ToDict(), ieioargs[1])
276 class RpcRunner(object):
277 """RPC runner class"""
279 def __init__(self, cfg):
280 """Initialized the rpc runner.
282 @type cfg: C{config.ConfigWriter}
283 @param cfg: the configuration object that will be used to get data
288 self.port = utils.GetDaemonPort(constants.NODED)
290 def _InstDict(self, instance, hvp=None, bep=None):
291 """Convert the given instance to a dict.
293 This is done via the instance's ToDict() method and additionally
294 we fill the hvparams with the cluster defaults.
296 @type instance: L{objects.Instance}
297 @param instance: an Instance object
298 @type hvp: dict or None
299 @param hvp: a dictionary with overridden hypervisor parameters
300 @type bep: dict or None
301 @param bep: a dictionary with overridden backend parameters
303 @return: the instance dict, with the hvparams filled with the
307 idict = instance.ToDict()
308 cluster = self._cfg.GetClusterInfo()
309 idict["hvparams"] = cluster.FillHV(instance)
311 idict["hvparams"].update(hvp)
312 idict["beparams"] = cluster.FillBE(instance)
314 idict["beparams"].update(bep)
315 for nic in idict["nics"]:
316 nic['nicparams'] = objects.FillDict(
317 cluster.nicparams[constants.PP_DEFAULT],
321 def _ConnectList(self, client, node_list, call):
322 """Helper for computing node addresses.
324 @type client: L{ganeti.rpc.Client}
325 @param client: a C{Client} instance
326 @type node_list: list
327 @param node_list: the node list we should connect
329 @param call: the name of the remote procedure call, for filling in
330 correctly any eventual offline nodes' results
333 all_nodes = self._cfg.GetAllNodesInfo()
337 for node in node_list:
338 if node in all_nodes:
339 if all_nodes[node].offline:
340 skip_dict[node] = RpcResult(node=node, offline=True, call=call)
342 val = all_nodes[node].primary_ip
345 addr_list.append(val)
346 name_list.append(node)
348 client.ConnectList(name_list, address_list=addr_list)
351 def _ConnectNode(self, client, node, call):
352 """Helper for computing one node's address.
354 @type client: L{ganeti.rpc.Client}
355 @param client: a C{Client} instance
357 @param node: the node we should connect
359 @param call: the name of the remote procedure call, for filling in
360 correctly any eventual offline nodes' results
363 node_info = self._cfg.GetNodeInfo(node)
364 if node_info is not None:
365 if node_info.offline:
366 return RpcResult(node=node, offline=True, call=call)
367 addr = node_info.primary_ip
370 client.ConnectNode(node, address=addr)
372 def _MultiNodeCall(self, node_list, procedure, args):
373 """Helper for making a multi-node call
376 body = serializer.DumpJson(args, indent=False)
377 c = Client(procedure, body, self.port)
378 skip_dict = self._ConnectList(c, node_list, procedure)
379 skip_dict.update(c.GetResults())
383 def _StaticMultiNodeCall(cls, node_list, procedure, args,
385 """Helper for making a multi-node static call
388 body = serializer.DumpJson(args, indent=False)
389 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
390 c.ConnectList(node_list, address_list=address_list)
391 return c.GetResults()
393 def _SingleNodeCall(self, node, procedure, args):
394 """Helper for making a single-node call
397 body = serializer.DumpJson(args, indent=False)
398 c = Client(procedure, body, self.port)
399 result = self._ConnectNode(c, node, procedure)
401 # we did connect, node is not offline
402 result = c.GetResults()[node]
406 def _StaticSingleNodeCall(cls, node, procedure, args):
407 """Helper for making a single-node static call
410 body = serializer.DumpJson(args, indent=False)
411 c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
413 return c.GetResults()[node]
417 """Compresses a string for transport over RPC.
419 Small amounts of data are not compressed.
424 @return: Encoded data to send
427 # Small amounts of data are not compressed
429 return (constants.RPC_ENCODING_NONE, data)
431 # Compress with zlib and encode in base64
432 return (constants.RPC_ENCODING_ZLIB_BASE64,
433 base64.b64encode(zlib.compress(data, 3)))
439 def call_lv_list(self, node_list, vg_name):
440 """Gets the logical volumes present in a given volume group.
442 This is a multi-node call.
445 return self._MultiNodeCall(node_list, "lv_list", [vg_name])
447 def call_vg_list(self, node_list):
448 """Gets the volume group list.
450 This is a multi-node call.
453 return self._MultiNodeCall(node_list, "vg_list", [])
455 def call_storage_list(self, node_list, su_name, su_args, name, fields):
456 """Get list of storage units.
458 This is a multi-node call.
461 return self._MultiNodeCall(node_list, "storage_list",
462 [su_name, su_args, name, fields])
464 def call_storage_modify(self, node, su_name, su_args, name, changes):
465 """Modify a storage unit.
467 This is a single-node call.
470 return self._SingleNodeCall(node, "storage_modify",
471 [su_name, su_args, name, changes])
473 def call_storage_execute(self, node, su_name, su_args, name, op):
474 """Executes an operation on a storage unit.
476 This is a single-node call.
479 return self._SingleNodeCall(node, "storage_execute",
480 [su_name, su_args, name, op])
482 def call_bridges_exist(self, node, bridges_list):
483 """Checks if a node has all the bridges given.
485 This method checks if all bridges given in the bridges_list are
486 present on the remote node, so that an instance that uses interfaces
487 on those bridges can be started.
489 This is a single-node call.
492 return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
494 def call_instance_start(self, node, instance, hvp, bep):
495 """Starts an instance.
497 This is a single-node call.
500 idict = self._InstDict(instance, hvp=hvp, bep=bep)
501 return self._SingleNodeCall(node, "instance_start", [idict])
503 def call_instance_shutdown(self, node, instance, timeout):
504 """Stops an instance.
506 This is a single-node call.
509 return self._SingleNodeCall(node, "instance_shutdown",
510 [self._InstDict(instance), timeout])
512 def call_migration_info(self, node, instance):
513 """Gather the information necessary to prepare an instance migration.
515 This is a single-node call.
518 @param node: the node on which the instance is currently running
519 @type instance: C{objects.Instance}
520 @param instance: the instance definition
523 return self._SingleNodeCall(node, "migration_info",
524 [self._InstDict(instance)])
526 def call_accept_instance(self, node, instance, info, target):
527 """Prepare a node to accept an instance.
529 This is a single-node call.
532 @param node: the target node for the migration
533 @type instance: C{objects.Instance}
534 @param instance: the instance definition
535 @type info: opaque/hypervisor specific (string/data)
536 @param info: result for the call_migration_info call
538 @param target: target hostname (usually ip address) (on the node itself)
541 return self._SingleNodeCall(node, "accept_instance",
542 [self._InstDict(instance), info, target])
544 def call_finalize_migration(self, node, instance, info, success):
545 """Finalize any target-node migration specific operation.
547 This is called both in case of a successful migration and in case of error
548 (in which case it should abort the migration).
550 This is a single-node call.
553 @param node: the target node for the migration
554 @type instance: C{objects.Instance}
555 @param instance: the instance definition
556 @type info: opaque/hypervisor specific (string/data)
557 @param info: result for the call_migration_info call
558 @type success: boolean
559 @param success: whether the migration was a success or a failure
562 return self._SingleNodeCall(node, "finalize_migration",
563 [self._InstDict(instance), info, success])
565 def call_instance_migrate(self, node, instance, target, live):
566 """Migrate an instance.
568 This is a single-node call.
571 @param node: the node on which the instance is currently running
572 @type instance: C{objects.Instance}
573 @param instance: the instance definition
575 @param target: the target node name
577 @param live: whether the migration should be done live or not (the
578 interpretation of this parameter is left to the hypervisor)
581 return self._SingleNodeCall(node, "instance_migrate",
582 [self._InstDict(instance), target, live])
584 def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
585 """Reboots an instance.
587 This is a single-node call.
590 return self._SingleNodeCall(node, "instance_reboot",
591 [self._InstDict(inst), reboot_type,
594 def call_instance_os_add(self, node, inst, reinstall, debug):
595 """Installs an OS on the given instance.
597 This is a single-node call.
600 return self._SingleNodeCall(node, "instance_os_add",
601 [self._InstDict(inst), reinstall, debug])
603 def call_instance_run_rename(self, node, inst, old_name, debug):
604 """Run the OS rename script for an instance.
606 This is a single-node call.
609 return self._SingleNodeCall(node, "instance_run_rename",
610 [self._InstDict(inst), old_name, debug])
612 def call_instance_info(self, node, instance, hname):
613 """Returns information about a single instance.
615 This is a single-node call.
618 @param node: the list of nodes to query
619 @type instance: string
620 @param instance: the instance name
622 @param hname: the hypervisor type of the instance
625 return self._SingleNodeCall(node, "instance_info", [instance, hname])
627 def call_instance_migratable(self, node, instance):
628 """Checks whether the given instance can be migrated.
630 This is a single-node call.
632 @param node: the node to query
633 @type instance: L{objects.Instance}
634 @param instance: the instance to check
638 return self._SingleNodeCall(node, "instance_migratable",
639 [self._InstDict(instance)])
641 def call_all_instances_info(self, node_list, hypervisor_list):
642 """Returns information about all instances on the given nodes.
644 This is a multi-node call.
646 @type node_list: list
647 @param node_list: the list of nodes to query
648 @type hypervisor_list: list
649 @param hypervisor_list: the hypervisors to query for instances
652 return self._MultiNodeCall(node_list, "all_instances_info",
655 def call_instance_list(self, node_list, hypervisor_list):
656 """Returns the list of running instances on a given node.
658 This is a multi-node call.
660 @type node_list: list
661 @param node_list: the list of nodes to query
662 @type hypervisor_list: list
663 @param hypervisor_list: the hypervisors to query for instances
666 return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
668 def call_node_tcp_ping(self, node, source, target, port, timeout,
670 """Do a TcpPing on the remote node
672 This is a single-node call.
675 return self._SingleNodeCall(node, "node_tcp_ping",
676 [source, target, port, timeout,
679 def call_node_has_ip_address(self, node, address):
680 """Checks if a node has the given IP address.
682 This is a single-node call.
685 return self._SingleNodeCall(node, "node_has_ip_address", [address])
687 def call_node_info(self, node_list, vg_name, hypervisor_type):
688 """Return node information.
690 This will return memory information and volume group size and free
693 This is a multi-node call.
695 @type node_list: list
696 @param node_list: the list of nodes to query
697 @type vg_name: C{string}
698 @param vg_name: the name of the volume group to ask for disk space
700 @type hypervisor_type: C{str}
701 @param hypervisor_type: the name of the hypervisor to ask for
705 return self._MultiNodeCall(node_list, "node_info",
706 [vg_name, hypervisor_type])
708 def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
709 """Add a node to the cluster.
711 This is a single-node call.
714 return self._SingleNodeCall(node, "node_add",
715 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
717 def call_node_verify(self, node_list, checkdict, cluster_name):
718 """Request verification of given parameters.
720 This is a multi-node call.
723 return self._MultiNodeCall(node_list, "node_verify",
724 [checkdict, cluster_name])
727 def call_node_start_master(cls, node, start_daemons, no_voting):
728 """Tells a node to activate itself as a master.
730 This is a single-node call.
733 return cls._StaticSingleNodeCall(node, "node_start_master",
734 [start_daemons, no_voting])
737 def call_node_stop_master(cls, node, stop_daemons):
738 """Tells a node to demote itself from master status.
740 This is a single-node call.
743 return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
746 def call_master_info(cls, node_list):
747 """Query master info.
749 This is a multi-node call.
752 # TODO: should this method query down nodes?
753 return cls._StaticMultiNodeCall(node_list, "master_info", [])
756 def call_version(cls, node_list):
757 """Query node version.
759 This is a multi-node call.
762 return cls._StaticMultiNodeCall(node_list, "version", [])
764 def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
765 """Request creation of a given block device.
767 This is a single-node call.
770 return self._SingleNodeCall(node, "blockdev_create",
771 [bdev.ToDict(), size, owner, on_primary, info])
773 def call_blockdev_remove(self, node, bdev):
774 """Request removal of a given block device.
776 This is a single-node call.
779 return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
781 def call_blockdev_rename(self, node, devlist):
782 """Request rename of the given block devices.
784 This is a single-node call.
787 return self._SingleNodeCall(node, "blockdev_rename",
788 [(d.ToDict(), uid) for d, uid in devlist])
790 def call_blockdev_assemble(self, node, disk, owner, on_primary):
791 """Request assembling of a given block device.
793 This is a single-node call.
796 return self._SingleNodeCall(node, "blockdev_assemble",
797 [disk.ToDict(), owner, on_primary])
799 def call_blockdev_shutdown(self, node, disk):
800 """Request shutdown of a given block device.
802 This is a single-node call.
805 return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
807 def call_blockdev_addchildren(self, node, bdev, ndevs):
808 """Request adding a list of children to a (mirroring) device.
810 This is a single-node call.
813 return self._SingleNodeCall(node, "blockdev_addchildren",
815 [disk.ToDict() for disk in ndevs]])
817 def call_blockdev_removechildren(self, node, bdev, ndevs):
818 """Request removing a list of children from a (mirroring) device.
820 This is a single-node call.
823 return self._SingleNodeCall(node, "blockdev_removechildren",
825 [disk.ToDict() for disk in ndevs]])
827 def call_blockdev_getmirrorstatus(self, node, disks):
828 """Request status of a (mirroring) device.
830 This is a single-node call.
833 result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
834 [dsk.ToDict() for dsk in disks])
835 if not result.fail_msg:
836 result.payload = [objects.BlockDevStatus.FromDict(i)
837 for i in result.payload]
840 def call_blockdev_find(self, node, disk):
841 """Request identification of a given block device.
843 This is a single-node call.
846 result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
847 if not result.fail_msg and result.payload is not None:
848 result.payload = objects.BlockDevStatus.FromDict(result.payload)
851 def call_blockdev_close(self, node, instance_name, disks):
852 """Closes the given block devices.
854 This is a single-node call.
857 params = [instance_name, [cf.ToDict() for cf in disks]]
858 return self._SingleNodeCall(node, "blockdev_close", params)
860 def call_blockdev_getsizes(self, node, disks):
861 """Returns the size of the given disks.
863 This is a single-node call.
866 params = [[cf.ToDict() for cf in disks]]
867 return self._SingleNodeCall(node, "blockdev_getsize", params)
869 def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
870 """Disconnects the network of the given drbd devices.
872 This is a multi-node call.
875 return self._MultiNodeCall(node_list, "drbd_disconnect_net",
876 [nodes_ip, [cf.ToDict() for cf in disks]])
878 def call_drbd_attach_net(self, node_list, nodes_ip,
879 disks, instance_name, multimaster):
880 """Disconnects the given drbd devices.
882 This is a multi-node call.
885 return self._MultiNodeCall(node_list, "drbd_attach_net",
886 [nodes_ip, [cf.ToDict() for cf in disks],
887 instance_name, multimaster])
889 def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
890 """Waits for the synchronization of drbd devices is complete.
892 This is a multi-node call.
895 return self._MultiNodeCall(node_list, "drbd_wait_sync",
896 [nodes_ip, [cf.ToDict() for cf in disks]])
899 def call_upload_file(cls, node_list, file_name, address_list=None):
902 The node will refuse the operation in case the file is not on the
905 This is a multi-node call.
907 @type node_list: list
908 @param node_list: the list of node names to upload to
910 @param file_name: the filename to upload
911 @type address_list: list or None
912 @keyword address_list: an optional list of node addresses, in order
913 to optimize the RPC speed
916 file_contents = utils.ReadFile(file_name)
917 data = cls._Compress(file_contents)
918 st = os.stat(file_name)
919 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
920 st.st_atime, st.st_mtime]
921 return cls._StaticMultiNodeCall(node_list, "upload_file", params,
922 address_list=address_list)
925 def call_write_ssconf_files(cls, node_list, values):
926 """Write ssconf files.
928 This is a multi-node call.
931 return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
933 def call_os_diagnose(self, node_list):
934 """Request a diagnose of OS definitions.
936 This is a multi-node call.
939 return self._MultiNodeCall(node_list, "os_diagnose", [])
941 def call_os_get(self, node, name):
942 """Returns an OS definition.
944 This is a single-node call.
947 result = self._SingleNodeCall(node, "os_get", [name])
948 if not result.fail_msg and isinstance(result.payload, dict):
949 result.payload = objects.OS.FromDict(result.payload)
952 def call_hooks_runner(self, node_list, hpath, phase, env):
953 """Call the hooks runner.
956 - op: the OpCode instance
957 - env: a dictionary with the environment
959 This is a multi-node call.
962 params = [hpath, phase, env]
963 return self._MultiNodeCall(node_list, "hooks_runner", params)
965 def call_iallocator_runner(self, node, name, idata):
966 """Call an iallocator on a remote node
969 - name: the iallocator name
970 - input: the json-encoded input string
972 This is a single-node call.
975 return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
977 def call_blockdev_grow(self, node, cf_bdev, amount):
978 """Request a snapshot of the given block device.
980 This is a single-node call.
983 return self._SingleNodeCall(node, "blockdev_grow",
984 [cf_bdev.ToDict(), amount])
986 def call_blockdev_export(self, node, cf_bdev,
987 dest_node, dest_path, cluster_name):
988 """Export a given disk to another node.
990 This is a single-node call.
993 return self._SingleNodeCall(node, "blockdev_export",
994 [cf_bdev.ToDict(), dest_node, dest_path,
997 def call_blockdev_snapshot(self, node, cf_bdev):
998 """Request a snapshot of the given block device.
1000 This is a single-node call.
1003 return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1005 def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
1006 cluster_name, idx, debug):
1007 """Request the export of a given snapshot.
1009 This is a single-node call.
1012 return self._SingleNodeCall(node, "snapshot_export",
1013 [snap_bdev.ToDict(), dest_node,
1014 self._InstDict(instance), cluster_name,
1017 def call_finalize_export(self, node, instance, snap_disks):
1018 """Request the completion of an export operation.
1020 This writes the export config file, etc.
1022 This is a single-node call.
1026 for disk in snap_disks:
1027 if isinstance(disk, bool):
1028 flat_disks.append(disk)
1030 flat_disks.append(disk.ToDict())
1032 return self._SingleNodeCall(node, "finalize_export",
1033 [self._InstDict(instance), flat_disks])
1035 def call_export_info(self, node, path):
1036 """Queries the export information in a given path.
1038 This is a single-node call.
1041 return self._SingleNodeCall(node, "export_info", [path])
1043 def call_instance_os_import(self, node, inst, src_node, src_images,
1044 cluster_name, debug):
1045 """Request the import of a backup into an instance.
1047 This is a single-node call.
1050 return self._SingleNodeCall(node, "instance_os_import",
1051 [self._InstDict(inst), src_node, src_images,
1052 cluster_name, debug])
1054 def call_export_list(self, node_list):
1055 """Gets the stored exports list.
1057 This is a multi-node call.
1060 return self._MultiNodeCall(node_list, "export_list", [])
1062 def call_export_remove(self, node, export):
1063 """Requests removal of a given export.
1065 This is a single-node call.
1068 return self._SingleNodeCall(node, "export_remove", [export])
1071 def call_node_leave_cluster(cls, node, modify_ssh_setup):
1072 """Requests a node to clean the cluster information it has.
1074 This will remove the configuration information from the ganeti data
1077 This is a single-node call.
1080 return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1083 def call_node_volumes(self, node_list):
1084 """Gets all volumes on node(s).
1086 This is a multi-node call.
1089 return self._MultiNodeCall(node_list, "node_volumes", [])
1091 def call_node_demote_from_mc(self, node):
1092 """Demote a node from the master candidate role.
1094 This is a single-node call.
1097 return self._SingleNodeCall(node, "node_demote_from_mc", [])
1099 def call_node_powercycle(self, node, hypervisor):
1100 """Tries to powercycle a node.
1102 This is a single-node call.
1105 return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1107 def call_test_delay(self, node_list, duration):
1108 """Sleep for a fixed time on given node(s).
1110 This is a multi-node call.
1113 return self._MultiNodeCall(node_list, "test_delay", [duration])
1115 def call_file_storage_dir_create(self, node, file_storage_dir):
1116 """Create the given file storage directory.
1118 This is a single-node call.
1121 return self._SingleNodeCall(node, "file_storage_dir_create",
1124 def call_file_storage_dir_remove(self, node, file_storage_dir):
1125 """Remove the given file storage directory.
1127 This is a single-node call.
1130 return self._SingleNodeCall(node, "file_storage_dir_remove",
1133 def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1134 new_file_storage_dir):
1135 """Rename file storage directory.
1137 This is a single-node call.
1140 return self._SingleNodeCall(node, "file_storage_dir_rename",
1141 [old_file_storage_dir, new_file_storage_dir])
1144 def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1145 """Update job queue.
1147 This is a multi-node call.
1150 return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1151 [file_name, cls._Compress(content)],
1152 address_list=address_list)
1155 def call_jobqueue_purge(cls, node):
1158 This is a single-node call.
1161 return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1164 def call_jobqueue_rename(cls, node_list, address_list, rename):
1165 """Rename a job queue file.
1167 This is a multi-node call.
1170 return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1171 address_list=address_list)
1174 def call_jobqueue_set_drain(cls, node_list, drain_flag):
1175 """Set the drain flag on the queue.
1177 This is a multi-node call.
1179 @type node_list: list
1180 @param node_list: the list of nodes to query
1181 @type drain_flag: bool
1182 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1185 return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1188 def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1189 """Validate the hypervisor params.
1191 This is a multi-node call.
1193 @type node_list: list
1194 @param node_list: the list of nodes to query
1195 @type hvname: string
1196 @param hvname: the hypervisor name
1197 @type hvparams: dict
1198 @param hvparams: the hypervisor parameters to be validated
1201 cluster = self._cfg.GetClusterInfo()
1202 hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1203 return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1206 def call_create_x509_certificate(self, node, validity):
1207 """Creates a new X509 certificate for SSL/TLS.
1209 This is a single-node call.
1212 @param validity: Validity in seconds
1215 return self._SingleNodeCall(node, "create_x509_certificate", [validity])
1217 def call_remove_x509_certificate(self, node, name):
1218 """Removes a X509 certificate.
1220 This is a single-node call.
1223 @param name: Certificate name
1226 return self._SingleNodeCall(node, "remove_x509_certificate", [name])
1228 def call_start_import_listener(self, node, x509_key_name, source_x509_ca,
1229 instance, dest, dest_args):
1230 """Starts a listener for an import.
1232 This is a single-node call.
1235 @param node: Node name
1236 @type instance: C{objects.Instance}
1237 @param instance: Instance object
1240 return self._SingleNodeCall(node, "start_import_listener",
1241 [x509_key_name, source_x509_ca,
1242 self._InstDict(instance), dest,
1243 _EncodeImportExportIO(dest, dest_args)])
1245 def call_start_export(self, node, x509_key_name, dest_x509_ca, host, port,
1246 instance, source, source_args):
1247 """Starts an export daemon.
1249 This is a single-node call.
1252 @param node: Node name
1253 @type instance: C{objects.Instance}
1254 @param instance: Instance object
1257 return self._SingleNodeCall(node, "start_export",
1258 [x509_key_name, dest_x509_ca, host, port,
1259 self._InstDict(instance), source,
1260 _EncodeImportExportIO(source, source_args)])
1262 def call_get_import_export_status(self, node, names):
1263 """Gets the status of an import or export.
1265 This is a single-node call.
1268 @param node: Node name
1269 @type names: List of strings
1270 @param names: Import/export names
1271 @rtype: List of L{objects.ImportExportStatus} instances
1272 @return: Returns a list of the state of each named import/export or None if
1273 a status couldn't be retrieved
1276 result = self._SingleNodeCall(node, "get_import_export_status", [names])
1278 if not result.fail_msg:
1281 for i in result.payload:
1283 decoded.append(None)
1285 decoded.append(objects.ImportExportStatus.FromDict(i))
1287 result.payload = decoded
1291 def call_cleanup_import_export(self, node, name):
1292 """Cleans up after an import or export.
1294 This is a single-node call.
1297 @param node: Node name
1299 @param name: Import/export name
1302 return self._SingleNodeCall(node, "cleanup_import_export", [name])