Reduce duplication of work in rpc.Client
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import httplib
36 import logging
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43
44
45 # Module level variable
46 _http_manager = None
47
48
49 def Init():
50   """Initializes the module-global HTTP client manager.
51
52   Must be called before using any RPC function.
53
54   """
55   global _http_manager
56
57   assert not _http_manager, "RPC module initialized more than once"
58
59   _http_manager = http.HttpClientManager()
60
61
62 def Shutdown():
63   """Stops the module-global HTTP client manager.
64
65   Must be called before quitting the program.
66
67   """
68   global _http_manager
69
70   if _http_manager:
71     _http_manager.Shutdown()
72     _http_manager = None
73
74
75 class Client:
76   """RPC Client class.
77
78   This class, given a (remote) method name, a list of parameters and a
79   list of nodes, will contact (in parallel) all nodes, and return a
80   dict of results (key: node name, value: result).
81
82   One current bug is that generic failure is still signalled by
83   'False' result, which is not good. This overloading of values can
84   cause bugs.
85
86   """
87   def __init__(self, procedure, body, port):
88     self.procedure = procedure
89     self.body = body
90     self.port = port
91     self.nc = {}
92
93     self._ssl_params = \
94       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
95                          ssl_cert_path=constants.SSL_CERT_FILE)
96
97   def ConnectList(self, node_list, address_list=None):
98     """Add a list of nodes to the target nodes.
99
100     @type node_list: list
101     @param node_list: the list of node names to connect
102     @type address_list: list or None
103     @keyword address_list: either None or a list with node addresses,
104         which must have the same length as the node list
105
106     """
107     if address_list is None:
108       address_list = [None for _ in node_list]
109     else:
110       assert len(node_list) == len(address_list), \
111              "Name and address lists should have the same length"
112     for node, address in zip(node_list, address_list):
113       self.ConnectNode(node, address)
114
115   def ConnectNode(self, name, address=None):
116     """Add a node to the target list.
117
118     @type name: str
119     @param name: the node name
120     @type address: str
121     @keyword address: the node address, if known
122
123     """
124     if address is None:
125       address = name
126
127     self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
128                                            "/%s" % self.procedure,
129                                            post_data=self.body,
130                                            ssl_params=self._ssl_params,
131                                            ssl_verify_peer=True)
132
133   def GetResults(self):
134     """Call nodes and return results.
135
136     @rtype: list
137     @returns: List of RPC results
138
139     """
140     assert _http_manager, "RPC module not intialized"
141
142     _http_manager.ExecRequests(self.nc.values())
143
144     results = {}
145
146     for name, req in self.nc.iteritems():
147       if req.success and req.resp_status == http.HTTP_OK:
148         results[name] = serializer.LoadJson(req.resp_body)
149         continue
150
151       # TODO: Better error reporting
152       if req.error:
153         msg = req.error
154       else:
155         msg = req.resp_body
156
157       logging.error("RPC error from node %s: %s", name, msg)
158       results[name] = False
159
160     return results
161
162
163 class RpcRunner(object):
164   """RPC runner class"""
165
166   def __init__(self, cfg):
167     """Initialized the rpc runner.
168
169     @type cfg:  C{config.ConfigWriter}
170     @param cfg: the configuration object that will be used to get data
171                 about the cluster
172
173     """
174     self._cfg = cfg
175     self.port = utils.GetNodeDaemonPort()
176
177   def _InstDict(self, instance):
178     """Convert the given instance to a dict.
179
180     This is done via the instance's ToDict() method and additionally
181     we fill the hvparams with the cluster defaults.
182
183     @type instance: L{objects.Instance}
184     @param instance: an Instance object
185     @rtype: dict
186     @return: the instance dict, with the hvparams filled with the
187         cluster defaults
188
189     """
190     idict = instance.ToDict()
191     cluster = self._cfg.GetClusterInfo()
192     idict["hvparams"] = cluster.FillHV(instance)
193     idict["beparams"] = cluster.FillBE(instance)
194     return idict
195
196   def _ConnectList(self, client, node_list):
197     """Helper for computing node addresses.
198
199     @type client: L{Client}
200     @param client: a C{Client} instance
201     @type node_list: list
202     @param node_list: the node list we should connect
203
204     """
205     all_nodes = self._cfg.GetAllNodesInfo()
206     addr_list = []
207     for node in node_list:
208       if node in all_nodes:
209         val = all_nodes[node].primary_ip
210       else:
211         val = None
212       addr_list.append(val)
213     client.ConnectList(node_list, address_list=addr_list)
214
215   def _ConnectNode(self, client, node):
216     """Helper for computing one node's address.
217
218     @type client: L{Client}
219     @param client: a C{Client} instance
220     @type node: str
221     @param node: the node we should connect
222
223     """
224     node_info = self._cfg.GetNodeInfo(node)
225     if node_info is not None:
226       addr = node_info.primary_ip
227     else:
228       addr = None
229     client.ConnectNode(node, address=addr)
230
231   def _MultiNodeCall(self, node_list, procedure, args,
232                      address_list=None):
233     """Helper for making a multi-node call
234
235     """
236     body = serializer.DumpJson(args, indent=False)
237     c = Client(procedure, body, self.port)
238     if address_list is None:
239       self._ConnectList(c, node_list)
240     else:
241       c.ConnectList(node_list, address_list=address_list)
242     return c.GetResults()
243
244   @classmethod
245   def _StaticMultiNodeCall(cls, node_list, procedure, args,
246                            address_list=None):
247     """Helper for making a multi-node static call
248
249     """
250     body = serializer.DumpJson(args, indent=False)
251     c = Client(procedure, body, utils.GetNodeDaemonPort())
252     c.ConnectList(node_list, address_list=address_list)
253     return c.GetResults()
254
255   def _SingleNodeCall(self, node, procedure, args):
256     """Helper for making a single-node call
257
258     """
259     body = serializer.DumpJson(args, indent=False)
260     c = Client(procedure, body, self.port)
261     self._ConnectNode(c, node)
262     return c.GetResults().get(node, False)
263
264   @classmethod
265   def _StaticSingleNodeCall(cls, node, procedure, args):
266     """Helper for making a single-node static call
267
268     """
269     body = serializer.DumpJson(args, indent=False)
270     c = Client(procedure, body, utils.GetNodeDaemonPort())
271     c.ConnectNode(c, node)
272     return c.GetResults().get(node, False)
273
274   def call_volume_list(self, node_list, vg_name):
275     """Gets the logical volumes present in a given volume group.
276
277     This is a multi-node call.
278
279     """
280     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
281
282   def call_vg_list(self, node_list):
283     """Gets the volume group list.
284
285     This is a multi-node call.
286
287     """
288     return self._MultiNodeCall(node_list, "vg_list", [])
289
290   def call_bridges_exist(self, node, bridges_list):
291     """Checks if a node has all the bridges given.
292
293     This method checks if all bridges given in the bridges_list are
294     present on the remote node, so that an instance that uses interfaces
295     on those bridges can be started.
296
297     This is a single-node call.
298
299     """
300     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
301
302   def call_instance_start(self, node, instance, extra_args):
303     """Starts an instance.
304
305     This is a single-node call.
306
307     """
308     return self._SingleNodeCall(node, "instance_start",
309                                 [self._InstDict(instance), extra_args])
310
311   def call_instance_shutdown(self, node, instance):
312     """Stops an instance.
313
314     This is a single-node call.
315
316     """
317     return self._SingleNodeCall(node, "instance_shutdown",
318                                 [self._InstDict(instance)])
319
320   def call_instance_migrate(self, node, instance, target, live):
321     """Migrate an instance.
322
323     This is a single-node call.
324
325     @type node: string
326     @param node: the node on which the instance is currently running
327     @type instance: C{objects.Instance}
328     @param instance: the instance definition
329     @type target: string
330     @param target: the target node name
331     @type live: boolean
332     @param live: whether the migration should be done live or not (the
333         interpretation of this parameter is left to the hypervisor)
334
335     """
336     return self._SingleNodeCall(node, "instance_migrate",
337                                 [self._InstDict(instance), target, live])
338
339   def call_instance_reboot(self, node, instance, reboot_type, extra_args):
340     """Reboots an instance.
341
342     This is a single-node call.
343
344     """
345     return self._SingleNodeCall(node, "instance_reboot",
346                                 [self._InstDict(instance), reboot_type,
347                                  extra_args])
348
349   def call_instance_os_add(self, node, inst):
350     """Installs an OS on the given instance.
351
352     This is a single-node call.
353
354     """
355     return self._SingleNodeCall(node, "instance_os_add",
356                                 [self._InstDict(inst)])
357
358   def call_instance_run_rename(self, node, inst, old_name):
359     """Run the OS rename script for an instance.
360
361     This is a single-node call.
362
363     """
364     return self._SingleNodeCall(node, "instance_run_rename",
365                                 [self._InstDict(inst), old_name])
366
367   def call_instance_info(self, node, instance, hname):
368     """Returns information about a single instance.
369
370     This is a single-node call.
371
372     @type node: list
373     @param node: the list of nodes to query
374     @type instance: string
375     @param instance: the instance name
376     @type hname: string
377     @param hname: the hypervisor type of the instance
378
379     """
380     return self._SingleNodeCall(node, "instance_info", [instance, hname])
381
382   def call_all_instances_info(self, node_list, hypervisor_list):
383     """Returns information about all instances on the given nodes.
384
385     This is a multi-node call.
386
387     @type node_list: list
388     @param node_list: the list of nodes to query
389     @type hypervisor_list: list
390     @param hypervisor_list: the hypervisors to query for instances
391
392     """
393     return self._MultiNodeCall(node_list, "all_instances_info",
394                                [hypervisor_list])
395
396   def call_instance_list(self, node_list, hypervisor_list):
397     """Returns the list of running instances on a given node.
398
399     This is a multi-node call.
400
401     @type node_list: list
402     @param node_list: the list of nodes to query
403     @type hypervisor_list: list
404     @param hypervisor_list: the hypervisors to query for instances
405
406     """
407     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
408
409   def call_node_tcp_ping(self, node, source, target, port, timeout,
410                          live_port_needed):
411     """Do a TcpPing on the remote node
412
413     This is a single-node call.
414
415     """
416     return self._SingleNodeCall(node, "node_tcp_ping",
417                                 [source, target, port, timeout,
418                                  live_port_needed])
419
420   def call_node_has_ip_address(self, node, address):
421     """Checks if a node has the given IP address.
422
423     This is a single-node call.
424
425     """
426     return self._SingleNodeCall(node, "node_has_ip_address", [address])
427
428   def call_node_info(self, node_list, vg_name, hypervisor_type):
429     """Return node information.
430
431     This will return memory information and volume group size and free
432     space.
433
434     This is a multi-node call.
435
436     @type node_list: list
437     @param node_list: the list of nodes to query
438     @type vgname: C{string}
439     @param vgname: the name of the volume group to ask for disk space
440         information
441     @type hypervisor_type: C{str}
442     @param hypervisor_type: the name of the hypervisor to ask for
443         memory information
444
445     """
446     retux = self._MultiNodeCall(node_list, "node_info",
447                                 [vg_name, hypervisor_type])
448
449     for node_name in retux:
450       ret = retux.get(node_name, False)
451       if type(ret) != dict:
452         logging.error("could not connect to node %s", node_name)
453         ret = {}
454
455       utils.CheckDict(ret, {
456                         'memory_total' : '-',
457                         'memory_dom0' : '-',
458                         'memory_free' : '-',
459                         'vg_size' : 'node_unreachable',
460                         'vg_free' : '-',
461                       }, "call_node_info")
462     return retux
463
464   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
465     """Add a node to the cluster.
466
467     This is a single-node call.
468
469     """
470     return self._SingleNodeCall(node, "node_add",
471                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
472
473   def call_node_verify(self, node_list, checkdict, cluster_name):
474     """Request verification of given parameters.
475
476     This is a multi-node call.
477
478     """
479     return self._MultiNodeCall(node_list, "node_verify",
480                                [checkdict, cluster_name])
481
482   @classmethod
483   def call_node_start_master(cls, node, start_daemons):
484     """Tells a node to activate itself as a master.
485
486     This is a single-node call.
487
488     """
489     return cls._StaticSingleNodeCall(node, "node_start_master",
490                                      [start_daemons])
491
492   @classmethod
493   def call_node_stop_master(cls, node, stop_daemons):
494     """Tells a node to demote itself from master status.
495
496     This is a single-node call.
497
498     """
499     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
500
501   @classmethod
502   def call_master_info(cls, node_list):
503     """Query master info.
504
505     This is a multi-node call.
506
507     """
508     # TODO: should this method query down nodes?
509     return cls._StaticMultiNodeCall(node_list, "master_info", [])
510
511   def call_version(self, node_list):
512     """Query node version.
513
514     This is a multi-node call.
515
516     """
517     return self._MultiNodeCall(node_list, "version", [])
518
519   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
520     """Request creation of a given block device.
521
522     This is a single-node call.
523
524     """
525     return self._SingleNodeCall(node, "blockdev_create",
526                                 [bdev.ToDict(), size, owner, on_primary, info])
527
528   def call_blockdev_remove(self, node, bdev):
529     """Request removal of a given block device.
530
531     This is a single-node call.
532
533     """
534     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
535
536   def call_blockdev_rename(self, node, devlist):
537     """Request rename of the given block devices.
538
539     This is a single-node call.
540
541     """
542     return self._SingleNodeCall(node, "blockdev_rename",
543                                 [(d.ToDict(), uid) for d, uid in devlist])
544
545   def call_blockdev_assemble(self, node, disk, owner, on_primary):
546     """Request assembling of a given block device.
547
548     This is a single-node call.
549
550     """
551     return self._SingleNodeCall(node, "blockdev_assemble",
552                                 [disk.ToDict(), owner, on_primary])
553
554   def call_blockdev_shutdown(self, node, disk):
555     """Request shutdown of a given block device.
556
557     This is a single-node call.
558
559     """
560     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
561
562   def call_blockdev_addchildren(self, node, bdev, ndevs):
563     """Request adding a list of children to a (mirroring) device.
564
565     This is a single-node call.
566
567     """
568     return self._SingleNodeCall(node, "blockdev_addchildren",
569                                 [bdev.ToDict(),
570                                  [disk.ToDict() for disk in ndevs]])
571
572   def call_blockdev_removechildren(self, node, bdev, ndevs):
573     """Request removing a list of children from a (mirroring) device.
574
575     This is a single-node call.
576
577     """
578     return self._SingleNodeCall(node, "blockdev_removechildren",
579                                 [bdev.ToDict(),
580                                  [disk.ToDict() for disk in ndevs]])
581
582   def call_blockdev_getmirrorstatus(self, node, disks):
583     """Request status of a (mirroring) device.
584
585     This is a single-node call.
586
587     """
588     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
589                                 [dsk.ToDict() for dsk in disks])
590
591   def call_blockdev_find(self, node, disk):
592     """Request identification of a given block device.
593
594     This is a single-node call.
595
596     """
597     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
598
599   def call_blockdev_close(self, node, disks):
600     """Closes the given block devices.
601
602     This is a single-node call.
603
604     """
605     return self._SingleNodeCall(node, "blockdev_close",
606                                 [cf.ToDict() for cf in disks])
607
608   @classmethod
609   def call_upload_file(cls, node_list, file_name, address_list=None):
610     """Upload a file.
611
612     The node will refuse the operation in case the file is not on the
613     approved file list.
614
615     This is a multi-node call.
616
617     @type node_list: list
618     @param node_list: the list of node names to upload to
619     @type file_name: str
620     @param file_name: the filename to upload
621     @type address_list: list or None
622     @keyword address_list: an optional list of node addresses, in order
623         to optimize the RPC speed
624
625     """
626     data = utils.ReadFile(file_name)
627     st = os.stat(file_name)
628     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
629               st.st_atime, st.st_mtime]
630     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
631                                     address_list=address_list)
632
633   @classmethod
634   def call_write_ssconf_files(cls, node_list, values):
635     """Write ssconf files.
636
637     This is a multi-node call.
638
639     """
640     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
641
642   def call_os_diagnose(self, node_list):
643     """Request a diagnose of OS definitions.
644
645     This is a multi-node call.
646
647     """
648     result = self._MultiNodeCall(node_list, "os_diagnose", [])
649
650     new_result = {}
651     for node_name in result:
652       if result[node_name]:
653         nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
654       else:
655         nr = []
656       new_result[node_name] = nr
657     return new_result
658
659   def call_os_get(self, node, name):
660     """Returns an OS definition.
661
662     This is a single-node call.
663
664     """
665     result = self._SingleNodeCall(node, "os_get", [name])
666     if isinstance(result, dict):
667       return objects.OS.FromDict(result)
668     else:
669       return result
670
671   def call_hooks_runner(self, node_list, hpath, phase, env):
672     """Call the hooks runner.
673
674     Args:
675       - op: the OpCode instance
676       - env: a dictionary with the environment
677
678     This is a multi-node call.
679
680     """
681     params = [hpath, phase, env]
682     return self._MultiNodeCall(node_list, "hooks_runner", params)
683
684   def call_iallocator_runner(self, node, name, idata):
685     """Call an iallocator on a remote node
686
687     Args:
688       - name: the iallocator name
689       - input: the json-encoded input string
690
691     This is a single-node call.
692
693     """
694     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
695
696   def call_blockdev_grow(self, node, cf_bdev, amount):
697     """Request a snapshot of the given block device.
698
699     This is a single-node call.
700
701     """
702     return self._SingleNodeCall(node, "blockdev_grow",
703                                 [cf_bdev.ToDict(), amount])
704
705   def call_blockdev_snapshot(self, node, cf_bdev):
706     """Request a snapshot of the given block device.
707
708     This is a single-node call.
709
710     """
711     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
712
713   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
714                            cluster_name, idx):
715     """Request the export of a given snapshot.
716
717     This is a single-node call.
718
719     """
720     return self._SingleNodeCall(node, "snapshot_export",
721                                 [snap_bdev.ToDict(), dest_node,
722                                  self._InstDict(instance), cluster_name, idx])
723
724   def call_finalize_export(self, node, instance, snap_disks):
725     """Request the completion of an export operation.
726
727     This writes the export config file, etc.
728
729     This is a single-node call.
730
731     """
732     flat_disks = []
733     for disk in snap_disks:
734       flat_disks.append(disk.ToDict())
735
736     return self._SingleNodeCall(node, "finalize_export",
737                                 [self._InstDict(instance), flat_disks])
738
739   def call_export_info(self, node, path):
740     """Queries the export information in a given path.
741
742     This is a single-node call.
743
744     """
745     result = self._SingleNodeCall(node, "export_info", [path])
746     if not result:
747       return result
748     return objects.SerializableConfigParser.Loads(str(result))
749
750   def call_instance_os_import(self, node, inst, src_node, src_images,
751                               cluster_name):
752     """Request the import of a backup into an instance.
753
754     This is a single-node call.
755
756     """
757     return self._SingleNodeCall(node, "instance_os_import",
758                                 [self._InstDict(inst), src_node, src_images,
759                                  cluster_name])
760
761   def call_export_list(self, node_list):
762     """Gets the stored exports list.
763
764     This is a multi-node call.
765
766     """
767     return self._MultiNodeCall(node_list, "export_list", [])
768
769   def call_export_remove(self, node, export):
770     """Requests removal of a given export.
771
772     This is a single-node call.
773
774     """
775     return self._SingleNodeCall(node, "export_remove", [export])
776
777   @classmethod
778   def call_node_leave_cluster(cls, node):
779     """Requests a node to clean the cluster information it has.
780
781     This will remove the configuration information from the ganeti data
782     dir.
783
784     This is a single-node call.
785
786     """
787     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
788
789   def call_node_volumes(self, node_list):
790     """Gets all volumes on node(s).
791
792     This is a multi-node call.
793
794     """
795     return self._MultiNodeCall(node_list, "node_volumes", [])
796
797   def call_test_delay(self, node_list, duration):
798     """Sleep for a fixed time on given node(s).
799
800     This is a multi-node call.
801
802     """
803     return self._MultiNodeCall(node_list, "test_delay", [duration])
804
805   def call_file_storage_dir_create(self, node, file_storage_dir):
806     """Create the given file storage directory.
807
808     This is a single-node call.
809
810     """
811     return self._SingleNodeCall(node, "file_storage_dir_create",
812                                 [file_storage_dir])
813
814   def call_file_storage_dir_remove(self, node, file_storage_dir):
815     """Remove the given file storage directory.
816
817     This is a single-node call.
818
819     """
820     return self._SingleNodeCall(node, "file_storage_dir_remove",
821                                 [file_storage_dir])
822
823   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
824                                    new_file_storage_dir):
825     """Rename file storage directory.
826
827     This is a single-node call.
828
829     """
830     return self._SingleNodeCall(node, "file_storage_dir_rename",
831                                 [old_file_storage_dir, new_file_storage_dir])
832
833   @classmethod
834   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
835     """Update job queue.
836
837     This is a multi-node call.
838
839     """
840     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
841                                     [file_name, content],
842                                     address_list=address_list)
843
844   @classmethod
845   def call_jobqueue_purge(cls, node):
846     """Purge job queue.
847
848     This is a single-node call.
849
850     """
851     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
852
853   @classmethod
854   def call_jobqueue_rename(cls, node_list, address_list, old, new):
855     """Rename a job queue file.
856
857     This is a multi-node call.
858
859     """
860     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
861                                     address_list=address_list)
862
863   @classmethod
864   def call_jobqueue_set_drain(cls, node_list, drain_flag):
865     """Set the drain flag on the queue.
866
867     This is a multi-node call.
868
869     @type node_list: list
870     @param node_list: the list of nodes to query
871     @type drain_flag: bool
872     @param drain_flag: if True, will set the drain flag, otherwise reset it.
873
874     """
875     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
876                                     [drain_flag])
877
878   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
879     """Validate the hypervisor params.
880
881     This is a multi-node call.
882
883     @type node_list: list
884     @param node_list: the list of nodes to query
885     @type hvname: string
886     @param hvname: the hypervisor name
887     @type hvparams: dict
888     @param hvparams: the hypervisor parameters to be validated
889
890     """
891     cluster = self._cfg.GetClusterInfo()
892     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
893     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
894                                [hvname, hv_full])