Fix epydoc format warnings
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import logging
36 import zlib
37 import base64
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45
46 import ganeti.http.client
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   _http_manager = http.client.HttpClientManager()
64
65
66 def Shutdown():
67   """Stops the module-global HTTP client manager.
68
69   Must be called before quitting the program.
70
71   """
72   global _http_manager
73
74   if _http_manager:
75     _http_manager.Shutdown()
76     _http_manager = None
77
78
79 class RpcResult(object):
80   """RPC Result class.
81
82   This class holds an RPC result. It is needed since in multi-node
83   calls we can't raise an exception just because one one out of many
84   failed, and therefore we use this class to encapsulate the result.
85
86   @ivar data: the data payload, for successfull results, or None
87   @type failed: boolean
88   @ivar failed: whether the operation failed at RPC level (not
89       application level on the remote node)
90   @ivar call: the name of the RPC call
91   @ivar node: the name of the node to which we made the call
92   @ivar offline: whether the operation failed because the node was
93       offline, as opposed to actual failure; offline=True will always
94       imply failed=True, in order to allow simpler checking if
95       the user doesn't care about the exact failure mode
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.error = "Node is marked offline"
107       self.data = None
108     elif failed:
109       self.error = data
110       self.data = None
111     else:
112       self.data = data
113       self.error = None
114
115   def Raise(self):
116     """If the result has failed, raise an OpExecError.
117
118     This is used so that LU code doesn't have to check for each
119     result, but instead can call this function.
120
121     """
122     if self.failed:
123       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
124                                (self.call, self.node, self.error))
125
126
127 class Client:
128   """RPC Client class.
129
130   This class, given a (remote) method name, a list of parameters and a
131   list of nodes, will contact (in parallel) all nodes, and return a
132   dict of results (key: node name, value: result).
133
134   One current bug is that generic failure is still signalled by
135   'False' result, which is not good. This overloading of values can
136   cause bugs.
137
138   """
139   def __init__(self, procedure, body, port):
140     self.procedure = procedure
141     self.body = body
142     self.port = port
143     self.nc = {}
144
145     self._ssl_params = \
146       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
147                          ssl_cert_path=constants.SSL_CERT_FILE)
148
149   def ConnectList(self, node_list, address_list=None):
150     """Add a list of nodes to the target nodes.
151
152     @type node_list: list
153     @param node_list: the list of node names to connect
154     @type address_list: list or None
155     @keyword address_list: either None or a list with node addresses,
156         which must have the same length as the node list
157
158     """
159     if address_list is None:
160       address_list = [None for _ in node_list]
161     else:
162       assert len(node_list) == len(address_list), \
163              "Name and address lists should have the same length"
164     for node, address in zip(node_list, address_list):
165       self.ConnectNode(node, address)
166
167   def ConnectNode(self, name, address=None):
168     """Add a node to the target list.
169
170     @type name: str
171     @param name: the node name
172     @type address: str
173     @keyword address: the node address, if known
174
175     """
176     if address is None:
177       address = name
178
179     self.nc[name] = \
180       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
181                                     "/%s" % self.procedure,
182                                     post_data=self.body,
183                                     ssl_params=self._ssl_params,
184                                     ssl_verify_peer=True)
185
186   def GetResults(self):
187     """Call nodes and return results.
188
189     @rtype: list
190     @returns: List of RPC results
191
192     """
193     assert _http_manager, "RPC module not intialized"
194
195     _http_manager.ExecRequests(self.nc.values())
196
197     results = {}
198
199     for name, req in self.nc.iteritems():
200       if req.success and req.resp_status_code == http.HTTP_OK:
201         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
202                                   node=name, call=self.procedure)
203         continue
204
205       # TODO: Better error reporting
206       if req.error:
207         msg = req.error
208       else:
209         msg = req.resp_body
210
211       logging.error("RPC error from node %s: %s", name, msg)
212       results[name] = RpcResult(data=msg, failed=True, node=name,
213                                 call=self.procedure)
214
215     return results
216
217
218 class RpcRunner(object):
219   """RPC runner class"""
220
221   def __init__(self, cfg):
222     """Initialized the rpc runner.
223
224     @type cfg:  C{config.ConfigWriter}
225     @param cfg: the configuration object that will be used to get data
226                 about the cluster
227
228     """
229     self._cfg = cfg
230     self.port = utils.GetNodeDaemonPort()
231
232   def _InstDict(self, instance):
233     """Convert the given instance to a dict.
234
235     This is done via the instance's ToDict() method and additionally
236     we fill the hvparams with the cluster defaults.
237
238     @type instance: L{objects.Instance}
239     @param instance: an Instance object
240     @rtype: dict
241     @return: the instance dict, with the hvparams filled with the
242         cluster defaults
243
244     """
245     idict = instance.ToDict()
246     cluster = self._cfg.GetClusterInfo()
247     idict["hvparams"] = cluster.FillHV(instance)
248     idict["beparams"] = cluster.FillBE(instance)
249     return idict
250
251   def _ConnectList(self, client, node_list):
252     """Helper for computing node addresses.
253
254     @type client: L{Client}
255     @param client: a C{Client} instance
256     @type node_list: list
257     @param node_list: the node list we should connect
258
259     """
260     all_nodes = self._cfg.GetAllNodesInfo()
261     name_list = []
262     addr_list = []
263     skip_dict = {}
264     for node in node_list:
265       if node in all_nodes:
266         if all_nodes[node].offline:
267           skip_dict[node] = RpcResult(node=node, offline=True)
268           continue
269         val = all_nodes[node].primary_ip
270       else:
271         val = None
272       addr_list.append(val)
273       name_list.append(node)
274     if name_list:
275       client.ConnectList(name_list, address_list=addr_list)
276     return skip_dict
277
278   def _ConnectNode(self, client, node):
279     """Helper for computing one node's address.
280
281     @type client: L{Client}
282     @param client: a C{Client} instance
283     @type node: str
284     @param node: the node we should connect
285
286     """
287     node_info = self._cfg.GetNodeInfo(node)
288     if node_info is not None:
289       if node_info.offline:
290         return RpcResult(node=node, offline=True)
291       addr = node_info.primary_ip
292     else:
293       addr = None
294     client.ConnectNode(node, address=addr)
295
296   def _MultiNodeCall(self, node_list, procedure, args):
297     """Helper for making a multi-node call
298
299     """
300     body = serializer.DumpJson(args, indent=False)
301     c = Client(procedure, body, self.port)
302     skip_dict = self._ConnectList(c, node_list)
303     skip_dict.update(c.GetResults())
304     return skip_dict
305
306   @classmethod
307   def _StaticMultiNodeCall(cls, node_list, procedure, args,
308                            address_list=None):
309     """Helper for making a multi-node static call
310
311     """
312     body = serializer.DumpJson(args, indent=False)
313     c = Client(procedure, body, utils.GetNodeDaemonPort())
314     c.ConnectList(node_list, address_list=address_list)
315     return c.GetResults()
316
317   def _SingleNodeCall(self, node, procedure, args):
318     """Helper for making a single-node call
319
320     """
321     body = serializer.DumpJson(args, indent=False)
322     c = Client(procedure, body, self.port)
323     result = self._ConnectNode(c, node)
324     if result is None:
325       # we did connect, node is not offline
326       result = c.GetResults()[node]
327     return result
328
329   @classmethod
330   def _StaticSingleNodeCall(cls, node, procedure, args):
331     """Helper for making a single-node static call
332
333     """
334     body = serializer.DumpJson(args, indent=False)
335     c = Client(procedure, body, utils.GetNodeDaemonPort())
336     c.ConnectNode(node)
337     return c.GetResults()[node]
338
339   @staticmethod
340   def _Compress(data):
341     """Compresses a string for transport over RPC.
342
343     Small amounts of data are not compressed.
344
345     @type data: str
346     @param data: Data
347     @rtype: tuple
348     @return: Encoded data to send
349
350     """
351     # Small amounts of data are not compressed
352     if len(data) < 512:
353       return (constants.RPC_ENCODING_NONE, data)
354
355     # Compress with zlib and encode in base64
356     return (constants.RPC_ENCODING_ZLIB_BASE64,
357             base64.b64encode(zlib.compress(data, 3)))
358
359   #
360   # Begin RPC calls
361   #
362
363   def call_volume_list(self, node_list, vg_name):
364     """Gets the logical volumes present in a given volume group.
365
366     This is a multi-node call.
367
368     """
369     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
370
371   def call_vg_list(self, node_list):
372     """Gets the volume group list.
373
374     This is a multi-node call.
375
376     """
377     return self._MultiNodeCall(node_list, "vg_list", [])
378
379   def call_bridges_exist(self, node, bridges_list):
380     """Checks if a node has all the bridges given.
381
382     This method checks if all bridges given in the bridges_list are
383     present on the remote node, so that an instance that uses interfaces
384     on those bridges can be started.
385
386     This is a single-node call.
387
388     """
389     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
390
391   def call_instance_start(self, node, instance, extra_args):
392     """Starts an instance.
393
394     This is a single-node call.
395
396     """
397     return self._SingleNodeCall(node, "instance_start",
398                                 [self._InstDict(instance), extra_args])
399
400   def call_instance_shutdown(self, node, instance):
401     """Stops an instance.
402
403     This is a single-node call.
404
405     """
406     return self._SingleNodeCall(node, "instance_shutdown",
407                                 [self._InstDict(instance)])
408
409   def call_instance_migrate(self, node, instance, target, live):
410     """Migrate an instance.
411
412     This is a single-node call.
413
414     @type node: string
415     @param node: the node on which the instance is currently running
416     @type instance: C{objects.Instance}
417     @param instance: the instance definition
418     @type target: string
419     @param target: the target node name
420     @type live: boolean
421     @param live: whether the migration should be done live or not (the
422         interpretation of this parameter is left to the hypervisor)
423
424     """
425     return self._SingleNodeCall(node, "instance_migrate",
426                                 [self._InstDict(instance), target, live])
427
428   def call_instance_reboot(self, node, instance, reboot_type, extra_args):
429     """Reboots an instance.
430
431     This is a single-node call.
432
433     """
434     return self._SingleNodeCall(node, "instance_reboot",
435                                 [self._InstDict(instance), reboot_type,
436                                  extra_args])
437
438   def call_instance_os_add(self, node, inst):
439     """Installs an OS on the given instance.
440
441     This is a single-node call.
442
443     """
444     return self._SingleNodeCall(node, "instance_os_add",
445                                 [self._InstDict(inst)])
446
447   def call_instance_run_rename(self, node, inst, old_name):
448     """Run the OS rename script for an instance.
449
450     This is a single-node call.
451
452     """
453     return self._SingleNodeCall(node, "instance_run_rename",
454                                 [self._InstDict(inst), old_name])
455
456   def call_instance_info(self, node, instance, hname):
457     """Returns information about a single instance.
458
459     This is a single-node call.
460
461     @type node: list
462     @param node: the list of nodes to query
463     @type instance: string
464     @param instance: the instance name
465     @type hname: string
466     @param hname: the hypervisor type of the instance
467
468     """
469     return self._SingleNodeCall(node, "instance_info", [instance, hname])
470
471   def call_all_instances_info(self, node_list, hypervisor_list):
472     """Returns information about all instances on the given nodes.
473
474     This is a multi-node call.
475
476     @type node_list: list
477     @param node_list: the list of nodes to query
478     @type hypervisor_list: list
479     @param hypervisor_list: the hypervisors to query for instances
480
481     """
482     return self._MultiNodeCall(node_list, "all_instances_info",
483                                [hypervisor_list])
484
485   def call_instance_list(self, node_list, hypervisor_list):
486     """Returns the list of running instances on a given node.
487
488     This is a multi-node call.
489
490     @type node_list: list
491     @param node_list: the list of nodes to query
492     @type hypervisor_list: list
493     @param hypervisor_list: the hypervisors to query for instances
494
495     """
496     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
497
498   def call_node_tcp_ping(self, node, source, target, port, timeout,
499                          live_port_needed):
500     """Do a TcpPing on the remote node
501
502     This is a single-node call.
503
504     """
505     return self._SingleNodeCall(node, "node_tcp_ping",
506                                 [source, target, port, timeout,
507                                  live_port_needed])
508
509   def call_node_has_ip_address(self, node, address):
510     """Checks if a node has the given IP address.
511
512     This is a single-node call.
513
514     """
515     return self._SingleNodeCall(node, "node_has_ip_address", [address])
516
517   def call_node_info(self, node_list, vg_name, hypervisor_type):
518     """Return node information.
519
520     This will return memory information and volume group size and free
521     space.
522
523     This is a multi-node call.
524
525     @type node_list: list
526     @param node_list: the list of nodes to query
527     @type vg_name: C{string}
528     @param vg_name: the name of the volume group to ask for disk space
529         information
530     @type hypervisor_type: C{str}
531     @param hypervisor_type: the name of the hypervisor to ask for
532         memory information
533
534     """
535     retux = self._MultiNodeCall(node_list, "node_info",
536                                 [vg_name, hypervisor_type])
537
538     for result in retux.itervalues():
539       if result.failed or not isinstance(result.data, dict):
540         result.data = {}
541
542       utils.CheckDict(result.data, {
543         'memory_total' : '-',
544         'memory_dom0' : '-',
545         'memory_free' : '-',
546         'vg_size' : 'node_unreachable',
547         'vg_free' : '-',
548         }, "call_node_info")
549     return retux
550
551   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
552     """Add a node to the cluster.
553
554     This is a single-node call.
555
556     """
557     return self._SingleNodeCall(node, "node_add",
558                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
559
560   def call_node_verify(self, node_list, checkdict, cluster_name):
561     """Request verification of given parameters.
562
563     This is a multi-node call.
564
565     """
566     return self._MultiNodeCall(node_list, "node_verify",
567                                [checkdict, cluster_name])
568
569   @classmethod
570   def call_node_start_master(cls, node, start_daemons):
571     """Tells a node to activate itself as a master.
572
573     This is a single-node call.
574
575     """
576     return cls._StaticSingleNodeCall(node, "node_start_master",
577                                      [start_daemons])
578
579   @classmethod
580   def call_node_stop_master(cls, node, stop_daemons):
581     """Tells a node to demote itself from master status.
582
583     This is a single-node call.
584
585     """
586     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
587
588   @classmethod
589   def call_master_info(cls, node_list):
590     """Query master info.
591
592     This is a multi-node call.
593
594     """
595     # TODO: should this method query down nodes?
596     return cls._StaticMultiNodeCall(node_list, "master_info", [])
597
598   def call_version(self, node_list):
599     """Query node version.
600
601     This is a multi-node call.
602
603     """
604     return self._MultiNodeCall(node_list, "version", [])
605
606   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
607     """Request creation of a given block device.
608
609     This is a single-node call.
610
611     """
612     return self._SingleNodeCall(node, "blockdev_create",
613                                 [bdev.ToDict(), size, owner, on_primary, info])
614
615   def call_blockdev_remove(self, node, bdev):
616     """Request removal of a given block device.
617
618     This is a single-node call.
619
620     """
621     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
622
623   def call_blockdev_rename(self, node, devlist):
624     """Request rename of the given block devices.
625
626     This is a single-node call.
627
628     """
629     return self._SingleNodeCall(node, "blockdev_rename",
630                                 [(d.ToDict(), uid) for d, uid in devlist])
631
632   def call_blockdev_assemble(self, node, disk, owner, on_primary):
633     """Request assembling of a given block device.
634
635     This is a single-node call.
636
637     """
638     return self._SingleNodeCall(node, "blockdev_assemble",
639                                 [disk.ToDict(), owner, on_primary])
640
641   def call_blockdev_shutdown(self, node, disk):
642     """Request shutdown of a given block device.
643
644     This is a single-node call.
645
646     """
647     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
648
649   def call_blockdev_addchildren(self, node, bdev, ndevs):
650     """Request adding a list of children to a (mirroring) device.
651
652     This is a single-node call.
653
654     """
655     return self._SingleNodeCall(node, "blockdev_addchildren",
656                                 [bdev.ToDict(),
657                                  [disk.ToDict() for disk in ndevs]])
658
659   def call_blockdev_removechildren(self, node, bdev, ndevs):
660     """Request removing a list of children from a (mirroring) device.
661
662     This is a single-node call.
663
664     """
665     return self._SingleNodeCall(node, "blockdev_removechildren",
666                                 [bdev.ToDict(),
667                                  [disk.ToDict() for disk in ndevs]])
668
669   def call_blockdev_getmirrorstatus(self, node, disks):
670     """Request status of a (mirroring) device.
671
672     This is a single-node call.
673
674     """
675     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
676                                 [dsk.ToDict() for dsk in disks])
677
678   def call_blockdev_find(self, node, disk):
679     """Request identification of a given block device.
680
681     This is a single-node call.
682
683     """
684     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
685
686   def call_blockdev_close(self, node, disks):
687     """Closes the given block devices.
688
689     This is a single-node call.
690
691     """
692     return self._SingleNodeCall(node, "blockdev_close",
693                                 [cf.ToDict() for cf in disks])
694
695   @classmethod
696   def call_upload_file(cls, node_list, file_name, address_list=None):
697     """Upload a file.
698
699     The node will refuse the operation in case the file is not on the
700     approved file list.
701
702     This is a multi-node call.
703
704     @type node_list: list
705     @param node_list: the list of node names to upload to
706     @type file_name: str
707     @param file_name: the filename to upload
708     @type address_list: list or None
709     @keyword address_list: an optional list of node addresses, in order
710         to optimize the RPC speed
711
712     """
713     file_contents = utils.ReadFile(file_name)
714     data = cls._Compress(file_contents)
715     st = os.stat(file_name)
716     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
717               st.st_atime, st.st_mtime]
718     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
719                                     address_list=address_list)
720
721   @classmethod
722   def call_write_ssconf_files(cls, node_list, values):
723     """Write ssconf files.
724
725     This is a multi-node call.
726
727     """
728     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
729
730   def call_os_diagnose(self, node_list):
731     """Request a diagnose of OS definitions.
732
733     This is a multi-node call.
734
735     """
736     result = self._MultiNodeCall(node_list, "os_diagnose", [])
737
738     for node_name, node_result in result.iteritems():
739       if not node_result.failed and node_result.data:
740         node_result.data = [objects.OS.FromDict(oss)
741                             for oss in node_result.data]
742     return result
743
744   def call_os_get(self, node, name):
745     """Returns an OS definition.
746
747     This is a single-node call.
748
749     """
750     result = self._SingleNodeCall(node, "os_get", [name])
751     if not result.failed and isinstance(result.data, dict):
752       result.data = objects.OS.FromDict(result.data)
753     return result
754
755   def call_hooks_runner(self, node_list, hpath, phase, env):
756     """Call the hooks runner.
757
758     Args:
759       - op: the OpCode instance
760       - env: a dictionary with the environment
761
762     This is a multi-node call.
763
764     """
765     params = [hpath, phase, env]
766     return self._MultiNodeCall(node_list, "hooks_runner", params)
767
768   def call_iallocator_runner(self, node, name, idata):
769     """Call an iallocator on a remote node
770
771     Args:
772       - name: the iallocator name
773       - input: the json-encoded input string
774
775     This is a single-node call.
776
777     """
778     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
779
780   def call_blockdev_grow(self, node, cf_bdev, amount):
781     """Request a snapshot of the given block device.
782
783     This is a single-node call.
784
785     """
786     return self._SingleNodeCall(node, "blockdev_grow",
787                                 [cf_bdev.ToDict(), amount])
788
789   def call_blockdev_snapshot(self, node, cf_bdev):
790     """Request a snapshot of the given block device.
791
792     This is a single-node call.
793
794     """
795     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
796
797   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
798                            cluster_name, idx):
799     """Request the export of a given snapshot.
800
801     This is a single-node call.
802
803     """
804     return self._SingleNodeCall(node, "snapshot_export",
805                                 [snap_bdev.ToDict(), dest_node,
806                                  self._InstDict(instance), cluster_name, idx])
807
808   def call_finalize_export(self, node, instance, snap_disks):
809     """Request the completion of an export operation.
810
811     This writes the export config file, etc.
812
813     This is a single-node call.
814
815     """
816     flat_disks = []
817     for disk in snap_disks:
818       flat_disks.append(disk.ToDict())
819
820     return self._SingleNodeCall(node, "finalize_export",
821                                 [self._InstDict(instance), flat_disks])
822
823   def call_export_info(self, node, path):
824     """Queries the export information in a given path.
825
826     This is a single-node call.
827
828     """
829     result = self._SingleNodeCall(node, "export_info", [path])
830     if not result.failed and result.data:
831       result.data = objects.SerializableConfigParser.Loads(str(result.data))
832     return result
833
834   def call_instance_os_import(self, node, inst, src_node, src_images,
835                               cluster_name):
836     """Request the import of a backup into an instance.
837
838     This is a single-node call.
839
840     """
841     return self._SingleNodeCall(node, "instance_os_import",
842                                 [self._InstDict(inst), src_node, src_images,
843                                  cluster_name])
844
845   def call_export_list(self, node_list):
846     """Gets the stored exports list.
847
848     This is a multi-node call.
849
850     """
851     return self._MultiNodeCall(node_list, "export_list", [])
852
853   def call_export_remove(self, node, export):
854     """Requests removal of a given export.
855
856     This is a single-node call.
857
858     """
859     return self._SingleNodeCall(node, "export_remove", [export])
860
861   @classmethod
862   def call_node_leave_cluster(cls, node):
863     """Requests a node to clean the cluster information it has.
864
865     This will remove the configuration information from the ganeti data
866     dir.
867
868     This is a single-node call.
869
870     """
871     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
872
873   def call_node_volumes(self, node_list):
874     """Gets all volumes on node(s).
875
876     This is a multi-node call.
877
878     """
879     return self._MultiNodeCall(node_list, "node_volumes", [])
880
881   def call_node_demote_from_mc(self, node):
882     """Demote a node from the master candidate role.
883
884     This is a single-node call.
885
886     """
887     return self._SingleNodeCall(node, "node_demote_from_mc", [])
888
889   def call_test_delay(self, node_list, duration):
890     """Sleep for a fixed time on given node(s).
891
892     This is a multi-node call.
893
894     """
895     return self._MultiNodeCall(node_list, "test_delay", [duration])
896
897   def call_file_storage_dir_create(self, node, file_storage_dir):
898     """Create the given file storage directory.
899
900     This is a single-node call.
901
902     """
903     return self._SingleNodeCall(node, "file_storage_dir_create",
904                                 [file_storage_dir])
905
906   def call_file_storage_dir_remove(self, node, file_storage_dir):
907     """Remove the given file storage directory.
908
909     This is a single-node call.
910
911     """
912     return self._SingleNodeCall(node, "file_storage_dir_remove",
913                                 [file_storage_dir])
914
915   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
916                                    new_file_storage_dir):
917     """Rename file storage directory.
918
919     This is a single-node call.
920
921     """
922     return self._SingleNodeCall(node, "file_storage_dir_rename",
923                                 [old_file_storage_dir, new_file_storage_dir])
924
925   @classmethod
926   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
927     """Update job queue.
928
929     This is a multi-node call.
930
931     """
932     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
933                                     [file_name, cls._Compress(content)],
934                                     address_list=address_list)
935
936   @classmethod
937   def call_jobqueue_purge(cls, node):
938     """Purge job queue.
939
940     This is a single-node call.
941
942     """
943     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
944
945   @classmethod
946   def call_jobqueue_rename(cls, node_list, address_list, old, new):
947     """Rename a job queue file.
948
949     This is a multi-node call.
950
951     """
952     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
953                                     address_list=address_list)
954
955   @classmethod
956   def call_jobqueue_set_drain(cls, node_list, drain_flag):
957     """Set the drain flag on the queue.
958
959     This is a multi-node call.
960
961     @type node_list: list
962     @param node_list: the list of nodes to query
963     @type drain_flag: bool
964     @param drain_flag: if True, will set the drain flag, otherwise reset it.
965
966     """
967     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
968                                     [drain_flag])
969
970   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
971     """Validate the hypervisor params.
972
973     This is a multi-node call.
974
975     @type node_list: list
976     @param node_list: the list of nodes to query
977     @type hvname: string
978     @param hvname: the hypervisor name
979     @type hvparams: dict
980     @param hvparams: the hypervisor parameters to be validated
981
982     """
983     cluster = self._cfg.GetClusterInfo()
984     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
985     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
986                                [hvname, hv_full])