Cleanup the config file on demotion from candidate
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import logging
36
37 from ganeti import utils
38 from ganeti import objects
39 from ganeti import http
40 from ganeti import serializer
41 from ganeti import constants
42 from ganeti import errors
43
44 import ganeti.http.client
45
46
47 # Module level variable
48 _http_manager = None
49
50
51 def Init():
52   """Initializes the module-global HTTP client manager.
53
54   Must be called before using any RPC function.
55
56   """
57   global _http_manager
58
59   assert not _http_manager, "RPC module initialized more than once"
60
61   _http_manager = http.client.HttpClientManager()
62
63
64 def Shutdown():
65   """Stops the module-global HTTP client manager.
66
67   Must be called before quitting the program.
68
69   """
70   global _http_manager
71
72   if _http_manager:
73     _http_manager.Shutdown()
74     _http_manager = None
75
76
77 class RpcResult(object):
78   """RPC Result class.
79
80   This class holds an RPC result. It is needed since in multi-node
81   calls we can't raise an exception just because one one out of many
82   failed, and therefore we use this class to encapsulate the result.
83
84   """
85   def __init__(self, data, failed=False, call=None, node=None):
86     self.failed = failed
87     self.call = None
88     self.node = None
89     if failed:
90       self.error = data
91       self.data = None
92     else:
93       self.data = data
94       self.error = None
95
96   def Raise(self):
97     """If the result has failed, raise an OpExecError.
98
99     This is used so that LU code doesn't have to check for each
100     result, but instead can call this function.
101
102     """
103     if self.failed:
104       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
105                                (self.call, self.node, self.error))
106
107
108 class Client:
109   """RPC Client class.
110
111   This class, given a (remote) method name, a list of parameters and a
112   list of nodes, will contact (in parallel) all nodes, and return a
113   dict of results (key: node name, value: result).
114
115   One current bug is that generic failure is still signalled by
116   'False' result, which is not good. This overloading of values can
117   cause bugs.
118
119   """
120   def __init__(self, procedure, body, port):
121     self.procedure = procedure
122     self.body = body
123     self.port = port
124     self.nc = {}
125
126     self._ssl_params = \
127       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
128                          ssl_cert_path=constants.SSL_CERT_FILE)
129
130   def ConnectList(self, node_list, address_list=None):
131     """Add a list of nodes to the target nodes.
132
133     @type node_list: list
134     @param node_list: the list of node names to connect
135     @type address_list: list or None
136     @keyword address_list: either None or a list with node addresses,
137         which must have the same length as the node list
138
139     """
140     if address_list is None:
141       address_list = [None for _ in node_list]
142     else:
143       assert len(node_list) == len(address_list), \
144              "Name and address lists should have the same length"
145     for node, address in zip(node_list, address_list):
146       self.ConnectNode(node, address)
147
148   def ConnectNode(self, name, address=None):
149     """Add a node to the target list.
150
151     @type name: str
152     @param name: the node name
153     @type address: str
154     @keyword address: the node address, if known
155
156     """
157     if address is None:
158       address = name
159
160     self.nc[name] = \
161       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
162                                     "/%s" % self.procedure,
163                                     post_data=self.body,
164                                     ssl_params=self._ssl_params,
165                                     ssl_verify_peer=True)
166
167   def GetResults(self):
168     """Call nodes and return results.
169
170     @rtype: list
171     @returns: List of RPC results
172
173     """
174     assert _http_manager, "RPC module not intialized"
175
176     _http_manager.ExecRequests(self.nc.values())
177
178     results = {}
179
180     for name, req in self.nc.iteritems():
181       if req.success and req.resp_status_code == http.HTTP_OK:
182         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
183                                   node=name, call=self.procedure)
184         continue
185
186       # TODO: Better error reporting
187       if req.error:
188         msg = req.error
189       else:
190         msg = req.resp_body
191
192       logging.error("RPC error from node %s: %s", name, msg)
193       results[name] = RpcResult(data=msg, failed=True, node=name,
194                                 call=self.procedure)
195
196     return results
197
198
199 class RpcRunner(object):
200   """RPC runner class"""
201
202   def __init__(self, cfg):
203     """Initialized the rpc runner.
204
205     @type cfg:  C{config.ConfigWriter}
206     @param cfg: the configuration object that will be used to get data
207                 about the cluster
208
209     """
210     self._cfg = cfg
211     self.port = utils.GetNodeDaemonPort()
212
213   def _InstDict(self, instance):
214     """Convert the given instance to a dict.
215
216     This is done via the instance's ToDict() method and additionally
217     we fill the hvparams with the cluster defaults.
218
219     @type instance: L{objects.Instance}
220     @param instance: an Instance object
221     @rtype: dict
222     @return: the instance dict, with the hvparams filled with the
223         cluster defaults
224
225     """
226     idict = instance.ToDict()
227     cluster = self._cfg.GetClusterInfo()
228     idict["hvparams"] = cluster.FillHV(instance)
229     idict["beparams"] = cluster.FillBE(instance)
230     return idict
231
232   def _ConnectList(self, client, node_list):
233     """Helper for computing node addresses.
234
235     @type client: L{Client}
236     @param client: a C{Client} instance
237     @type node_list: list
238     @param node_list: the node list we should connect
239
240     """
241     all_nodes = self._cfg.GetAllNodesInfo()
242     addr_list = []
243     for node in node_list:
244       if node in all_nodes:
245         val = all_nodes[node].primary_ip
246       else:
247         val = None
248       addr_list.append(val)
249     client.ConnectList(node_list, address_list=addr_list)
250
251   def _ConnectNode(self, client, node):
252     """Helper for computing one node's address.
253
254     @type client: L{Client}
255     @param client: a C{Client} instance
256     @type node: str
257     @param node: the node we should connect
258
259     """
260     node_info = self._cfg.GetNodeInfo(node)
261     if node_info is not None:
262       addr = node_info.primary_ip
263     else:
264       addr = None
265     client.ConnectNode(node, address=addr)
266
267   def _MultiNodeCall(self, node_list, procedure, args,
268                      address_list=None):
269     """Helper for making a multi-node call
270
271     """
272     body = serializer.DumpJson(args, indent=False)
273     c = Client(procedure, body, self.port)
274     if address_list is None:
275       self._ConnectList(c, node_list)
276     else:
277       c.ConnectList(node_list, address_list=address_list)
278     return c.GetResults()
279
280   @classmethod
281   def _StaticMultiNodeCall(cls, node_list, procedure, args,
282                            address_list=None):
283     """Helper for making a multi-node static call
284
285     """
286     body = serializer.DumpJson(args, indent=False)
287     c = Client(procedure, body, utils.GetNodeDaemonPort())
288     c.ConnectList(node_list, address_list=address_list)
289     return c.GetResults()
290
291   def _SingleNodeCall(self, node, procedure, args):
292     """Helper for making a single-node call
293
294     """
295     body = serializer.DumpJson(args, indent=False)
296     c = Client(procedure, body, self.port)
297     self._ConnectNode(c, node)
298     return c.GetResults().get(node, False)
299
300   @classmethod
301   def _StaticSingleNodeCall(cls, node, procedure, args):
302     """Helper for making a single-node static call
303
304     """
305     body = serializer.DumpJson(args, indent=False)
306     c = Client(procedure, body, utils.GetNodeDaemonPort())
307     c.ConnectNode(node)
308     return c.GetResults().get(node, False)
309
310   #
311   # Begin RPC calls
312   #
313
314   def call_volume_list(self, node_list, vg_name):
315     """Gets the logical volumes present in a given volume group.
316
317     This is a multi-node call.
318
319     """
320     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
321
322   def call_vg_list(self, node_list):
323     """Gets the volume group list.
324
325     This is a multi-node call.
326
327     """
328     return self._MultiNodeCall(node_list, "vg_list", [])
329
330   def call_bridges_exist(self, node, bridges_list):
331     """Checks if a node has all the bridges given.
332
333     This method checks if all bridges given in the bridges_list are
334     present on the remote node, so that an instance that uses interfaces
335     on those bridges can be started.
336
337     This is a single-node call.
338
339     """
340     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
341
342   def call_instance_start(self, node, instance, extra_args):
343     """Starts an instance.
344
345     This is a single-node call.
346
347     """
348     return self._SingleNodeCall(node, "instance_start",
349                                 [self._InstDict(instance), extra_args])
350
351   def call_instance_shutdown(self, node, instance):
352     """Stops an instance.
353
354     This is a single-node call.
355
356     """
357     return self._SingleNodeCall(node, "instance_shutdown",
358                                 [self._InstDict(instance)])
359
360   def call_instance_migrate(self, node, instance, target, live):
361     """Migrate an instance.
362
363     This is a single-node call.
364
365     @type node: string
366     @param node: the node on which the instance is currently running
367     @type instance: C{objects.Instance}
368     @param instance: the instance definition
369     @type target: string
370     @param target: the target node name
371     @type live: boolean
372     @param live: whether the migration should be done live or not (the
373         interpretation of this parameter is left to the hypervisor)
374
375     """
376     return self._SingleNodeCall(node, "instance_migrate",
377                                 [self._InstDict(instance), target, live])
378
379   def call_instance_reboot(self, node, instance, reboot_type, extra_args):
380     """Reboots an instance.
381
382     This is a single-node call.
383
384     """
385     return self._SingleNodeCall(node, "instance_reboot",
386                                 [self._InstDict(instance), reboot_type,
387                                  extra_args])
388
389   def call_instance_os_add(self, node, inst):
390     """Installs an OS on the given instance.
391
392     This is a single-node call.
393
394     """
395     return self._SingleNodeCall(node, "instance_os_add",
396                                 [self._InstDict(inst)])
397
398   def call_instance_run_rename(self, node, inst, old_name):
399     """Run the OS rename script for an instance.
400
401     This is a single-node call.
402
403     """
404     return self._SingleNodeCall(node, "instance_run_rename",
405                                 [self._InstDict(inst), old_name])
406
407   def call_instance_info(self, node, instance, hname):
408     """Returns information about a single instance.
409
410     This is a single-node call.
411
412     @type node: list
413     @param node: the list of nodes to query
414     @type instance: string
415     @param instance: the instance name
416     @type hname: string
417     @param hname: the hypervisor type of the instance
418
419     """
420     return self._SingleNodeCall(node, "instance_info", [instance, hname])
421
422   def call_all_instances_info(self, node_list, hypervisor_list):
423     """Returns information about all instances on the given nodes.
424
425     This is a multi-node call.
426
427     @type node_list: list
428     @param node_list: the list of nodes to query
429     @type hypervisor_list: list
430     @param hypervisor_list: the hypervisors to query for instances
431
432     """
433     return self._MultiNodeCall(node_list, "all_instances_info",
434                                [hypervisor_list])
435
436   def call_instance_list(self, node_list, hypervisor_list):
437     """Returns the list of running instances on a given node.
438
439     This is a multi-node call.
440
441     @type node_list: list
442     @param node_list: the list of nodes to query
443     @type hypervisor_list: list
444     @param hypervisor_list: the hypervisors to query for instances
445
446     """
447     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
448
449   def call_node_tcp_ping(self, node, source, target, port, timeout,
450                          live_port_needed):
451     """Do a TcpPing on the remote node
452
453     This is a single-node call.
454
455     """
456     return self._SingleNodeCall(node, "node_tcp_ping",
457                                 [source, target, port, timeout,
458                                  live_port_needed])
459
460   def call_node_has_ip_address(self, node, address):
461     """Checks if a node has the given IP address.
462
463     This is a single-node call.
464
465     """
466     return self._SingleNodeCall(node, "node_has_ip_address", [address])
467
468   def call_node_info(self, node_list, vg_name, hypervisor_type):
469     """Return node information.
470
471     This will return memory information and volume group size and free
472     space.
473
474     This is a multi-node call.
475
476     @type node_list: list
477     @param node_list: the list of nodes to query
478     @type vgname: C{string}
479     @param vgname: the name of the volume group to ask for disk space
480         information
481     @type hypervisor_type: C{str}
482     @param hypervisor_type: the name of the hypervisor to ask for
483         memory information
484
485     """
486     retux = self._MultiNodeCall(node_list, "node_info",
487                                 [vg_name, hypervisor_type])
488
489     for result in retux.itervalues():
490       if result.failed or not isinstance(result.data, dict):
491         result.data = {}
492
493       utils.CheckDict(result.data, {
494         'memory_total' : '-',
495         'memory_dom0' : '-',
496         'memory_free' : '-',
497         'vg_size' : 'node_unreachable',
498         'vg_free' : '-',
499         }, "call_node_info")
500     return retux
501
502   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
503     """Add a node to the cluster.
504
505     This is a single-node call.
506
507     """
508     return self._SingleNodeCall(node, "node_add",
509                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
510
511   def call_node_verify(self, node_list, checkdict, cluster_name):
512     """Request verification of given parameters.
513
514     This is a multi-node call.
515
516     """
517     return self._MultiNodeCall(node_list, "node_verify",
518                                [checkdict, cluster_name])
519
520   @classmethod
521   def call_node_start_master(cls, node, start_daemons):
522     """Tells a node to activate itself as a master.
523
524     This is a single-node call.
525
526     """
527     return cls._StaticSingleNodeCall(node, "node_start_master",
528                                      [start_daemons])
529
530   @classmethod
531   def call_node_stop_master(cls, node, stop_daemons):
532     """Tells a node to demote itself from master status.
533
534     This is a single-node call.
535
536     """
537     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
538
539   @classmethod
540   def call_master_info(cls, node_list):
541     """Query master info.
542
543     This is a multi-node call.
544
545     """
546     # TODO: should this method query down nodes?
547     return cls._StaticMultiNodeCall(node_list, "master_info", [])
548
549   def call_version(self, node_list):
550     """Query node version.
551
552     This is a multi-node call.
553
554     """
555     return self._MultiNodeCall(node_list, "version", [])
556
557   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
558     """Request creation of a given block device.
559
560     This is a single-node call.
561
562     """
563     return self._SingleNodeCall(node, "blockdev_create",
564                                 [bdev.ToDict(), size, owner, on_primary, info])
565
566   def call_blockdev_remove(self, node, bdev):
567     """Request removal of a given block device.
568
569     This is a single-node call.
570
571     """
572     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
573
574   def call_blockdev_rename(self, node, devlist):
575     """Request rename of the given block devices.
576
577     This is a single-node call.
578
579     """
580     return self._SingleNodeCall(node, "blockdev_rename",
581                                 [(d.ToDict(), uid) for d, uid in devlist])
582
583   def call_blockdev_assemble(self, node, disk, owner, on_primary):
584     """Request assembling of a given block device.
585
586     This is a single-node call.
587
588     """
589     return self._SingleNodeCall(node, "blockdev_assemble",
590                                 [disk.ToDict(), owner, on_primary])
591
592   def call_blockdev_shutdown(self, node, disk):
593     """Request shutdown of a given block device.
594
595     This is a single-node call.
596
597     """
598     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
599
600   def call_blockdev_addchildren(self, node, bdev, ndevs):
601     """Request adding a list of children to a (mirroring) device.
602
603     This is a single-node call.
604
605     """
606     return self._SingleNodeCall(node, "blockdev_addchildren",
607                                 [bdev.ToDict(),
608                                  [disk.ToDict() for disk in ndevs]])
609
610   def call_blockdev_removechildren(self, node, bdev, ndevs):
611     """Request removing a list of children from a (mirroring) device.
612
613     This is a single-node call.
614
615     """
616     return self._SingleNodeCall(node, "blockdev_removechildren",
617                                 [bdev.ToDict(),
618                                  [disk.ToDict() for disk in ndevs]])
619
620   def call_blockdev_getmirrorstatus(self, node, disks):
621     """Request status of a (mirroring) device.
622
623     This is a single-node call.
624
625     """
626     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
627                                 [dsk.ToDict() for dsk in disks])
628
629   def call_blockdev_find(self, node, disk):
630     """Request identification of a given block device.
631
632     This is a single-node call.
633
634     """
635     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
636
637   def call_blockdev_close(self, node, disks):
638     """Closes the given block devices.
639
640     This is a single-node call.
641
642     """
643     return self._SingleNodeCall(node, "blockdev_close",
644                                 [cf.ToDict() for cf in disks])
645
646   @classmethod
647   def call_upload_file(cls, node_list, file_name, address_list=None):
648     """Upload a file.
649
650     The node will refuse the operation in case the file is not on the
651     approved file list.
652
653     This is a multi-node call.
654
655     @type node_list: list
656     @param node_list: the list of node names to upload to
657     @type file_name: str
658     @param file_name: the filename to upload
659     @type address_list: list or None
660     @keyword address_list: an optional list of node addresses, in order
661         to optimize the RPC speed
662
663     """
664     data = utils.ReadFile(file_name)
665     st = os.stat(file_name)
666     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
667               st.st_atime, st.st_mtime]
668     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
669                                     address_list=address_list)
670
671   @classmethod
672   def call_write_ssconf_files(cls, node_list, values):
673     """Write ssconf files.
674
675     This is a multi-node call.
676
677     """
678     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
679
680   def call_os_diagnose(self, node_list):
681     """Request a diagnose of OS definitions.
682
683     This is a multi-node call.
684
685     """
686     result = self._MultiNodeCall(node_list, "os_diagnose", [])
687
688     for node_name, node_result in result.iteritems():
689       if not node_result.failed and node_result.data:
690         node_result.data = [objects.OS.FromDict(oss)
691                             for oss in node_result.data]
692     return result
693
694   def call_os_get(self, node, name):
695     """Returns an OS definition.
696
697     This is a single-node call.
698
699     """
700     result = self._SingleNodeCall(node, "os_get", [name])
701     if not result.failed and isinstance(result.data, dict):
702       result.data = objects.OS.FromDict(result.data)
703     return result
704
705   def call_hooks_runner(self, node_list, hpath, phase, env):
706     """Call the hooks runner.
707
708     Args:
709       - op: the OpCode instance
710       - env: a dictionary with the environment
711
712     This is a multi-node call.
713
714     """
715     params = [hpath, phase, env]
716     return self._MultiNodeCall(node_list, "hooks_runner", params)
717
718   def call_iallocator_runner(self, node, name, idata):
719     """Call an iallocator on a remote node
720
721     Args:
722       - name: the iallocator name
723       - input: the json-encoded input string
724
725     This is a single-node call.
726
727     """
728     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
729
730   def call_blockdev_grow(self, node, cf_bdev, amount):
731     """Request a snapshot of the given block device.
732
733     This is a single-node call.
734
735     """
736     return self._SingleNodeCall(node, "blockdev_grow",
737                                 [cf_bdev.ToDict(), amount])
738
739   def call_blockdev_snapshot(self, node, cf_bdev):
740     """Request a snapshot of the given block device.
741
742     This is a single-node call.
743
744     """
745     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
746
747   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
748                            cluster_name, idx):
749     """Request the export of a given snapshot.
750
751     This is a single-node call.
752
753     """
754     return self._SingleNodeCall(node, "snapshot_export",
755                                 [snap_bdev.ToDict(), dest_node,
756                                  self._InstDict(instance), cluster_name, idx])
757
758   def call_finalize_export(self, node, instance, snap_disks):
759     """Request the completion of an export operation.
760
761     This writes the export config file, etc.
762
763     This is a single-node call.
764
765     """
766     flat_disks = []
767     for disk in snap_disks:
768       flat_disks.append(disk.ToDict())
769
770     return self._SingleNodeCall(node, "finalize_export",
771                                 [self._InstDict(instance), flat_disks])
772
773   def call_export_info(self, node, path):
774     """Queries the export information in a given path.
775
776     This is a single-node call.
777
778     """
779     result = self._SingleNodeCall(node, "export_info", [path])
780     if not result.failed and result.data:
781       result.data = objects.SerializableConfigParser.Loads(str(result.data))
782     return result
783
784   def call_instance_os_import(self, node, inst, src_node, src_images,
785                               cluster_name):
786     """Request the import of a backup into an instance.
787
788     This is a single-node call.
789
790     """
791     return self._SingleNodeCall(node, "instance_os_import",
792                                 [self._InstDict(inst), src_node, src_images,
793                                  cluster_name])
794
795   def call_export_list(self, node_list):
796     """Gets the stored exports list.
797
798     This is a multi-node call.
799
800     """
801     return self._MultiNodeCall(node_list, "export_list", [])
802
803   def call_export_remove(self, node, export):
804     """Requests removal of a given export.
805
806     This is a single-node call.
807
808     """
809     return self._SingleNodeCall(node, "export_remove", [export])
810
811   @classmethod
812   def call_node_leave_cluster(cls, node):
813     """Requests a node to clean the cluster information it has.
814
815     This will remove the configuration information from the ganeti data
816     dir.
817
818     This is a single-node call.
819
820     """
821     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
822
823   def call_node_volumes(self, node_list):
824     """Gets all volumes on node(s).
825
826     This is a multi-node call.
827
828     """
829     return self._MultiNodeCall(node_list, "node_volumes", [])
830
831   def call_node_demote_from_mc(self, node):
832     """Demote a node from the master candidate role.
833
834     This is a single-node call.
835
836     """
837     return self._SingleNodeCall(node, "node_demote_from_mc", [])
838
839   def call_test_delay(self, node_list, duration):
840     """Sleep for a fixed time on given node(s).
841
842     This is a multi-node call.
843
844     """
845     return self._MultiNodeCall(node_list, "test_delay", [duration])
846
847   def call_file_storage_dir_create(self, node, file_storage_dir):
848     """Create the given file storage directory.
849
850     This is a single-node call.
851
852     """
853     return self._SingleNodeCall(node, "file_storage_dir_create",
854                                 [file_storage_dir])
855
856   def call_file_storage_dir_remove(self, node, file_storage_dir):
857     """Remove the given file storage directory.
858
859     This is a single-node call.
860
861     """
862     return self._SingleNodeCall(node, "file_storage_dir_remove",
863                                 [file_storage_dir])
864
865   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
866                                    new_file_storage_dir):
867     """Rename file storage directory.
868
869     This is a single-node call.
870
871     """
872     return self._SingleNodeCall(node, "file_storage_dir_rename",
873                                 [old_file_storage_dir, new_file_storage_dir])
874
875   @classmethod
876   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
877     """Update job queue.
878
879     This is a multi-node call.
880
881     """
882     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
883                                     [file_name, content],
884                                     address_list=address_list)
885
886   @classmethod
887   def call_jobqueue_purge(cls, node):
888     """Purge job queue.
889
890     This is a single-node call.
891
892     """
893     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
894
895   @classmethod
896   def call_jobqueue_rename(cls, node_list, address_list, old, new):
897     """Rename a job queue file.
898
899     This is a multi-node call.
900
901     """
902     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
903                                     address_list=address_list)
904
905   @classmethod
906   def call_jobqueue_set_drain(cls, node_list, drain_flag):
907     """Set the drain flag on the queue.
908
909     This is a multi-node call.
910
911     @type node_list: list
912     @param node_list: the list of nodes to query
913     @type drain_flag: bool
914     @param drain_flag: if True, will set the drain flag, otherwise reset it.
915
916     """
917     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
918                                     [drain_flag])
919
920   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
921     """Validate the hypervisor params.
922
923     This is a multi-node call.
924
925     @type node_list: list
926     @param node_list: the list of nodes to query
927     @type hvname: string
928     @param hvname: the hypervisor name
929     @type hvparams: dict
930     @param hvparams: the hypervisor parameters to be validated
931
932     """
933     cluster = self._cfg.GetClusterInfo()
934     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
935     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
936                                [hvname, hv_full])