RAPI: Export beparams as dict. The patch also enables LUQueryInstances to accept...
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import httplib
36 import logging
37
38 import simplejson
39
40 from ganeti import utils
41 from ganeti import objects
42
43
44 class NodeController:
45   """Node-handling class.
46
47   For each node that we speak with, we create an instance of this
48   class, so that we have a safe place to store the details of this
49   individual call.
50
51   """
52   def __init__(self, parent, node, address=None):
53     """Constructor for the node controller.
54
55     @type parent: L{Client}
56     @param parent: the C{Client} instance which holds global parameters for
57         the call
58     @type node: str
59     @param node: the name of the node we connect to; it is used for error
60         messages and in cases we the address paramater is not passed
61     @type address: str
62     @keyword address: the node's address, in case we know it, so that we
63         don't need to resolve it; testing shows that httplib has high
64         overhead in resolving addresses (even when speficied in /etc/hosts)
65
66     """
67     self.parent = parent
68     self.node = node
69     if address is None:
70       address = node
71     self.failed = False
72
73     self.http_conn = hc = httplib.HTTPConnection(address, parent.port)
74     try:
75       hc.connect()
76       hc.putrequest('PUT', "/%s" % parent.procedure,
77                     skip_accept_encoding=True)
78       hc.putheader('Content-Length', parent.body_length)
79       hc.endheaders()
80       hc.send(parent.body)
81     except socket.error:
82       logging.exception("Error connecting to node %s", node)
83       self.failed = True
84
85   def GetResponse(self):
86     """Try to process the response from the node.
87
88     """
89     if self.failed:
90       # we already failed in connect
91       return False
92     resp = self.http_conn.getresponse()
93     if resp.status != 200:
94       return False
95     try:
96       length = int(resp.getheader('Content-Length', '0'))
97     except ValueError:
98       return False
99     if not length:
100       logging.error("Zero-length reply from node %s", self.node)
101       return False
102     payload = resp.read(length)
103     unload = simplejson.loads(payload)
104     return unload
105
106
107 class Client:
108   """RPC Client class.
109
110   This class, given a (remote) method name, a list of parameters and a
111   list of nodes, will contact (in parallel) all nodes, and return a
112   dict of results (key: node name, value: result).
113
114   One current bug is that generic failure is still signalled by
115   'False' result, which is not good. This overloading of values can
116   cause bugs.
117
118   @var body_length: cached string value of the length of the body (so that
119       individual C{NodeController} instances don't have to recompute it)
120
121   """
122   result_set = False
123   result = False
124   allresult = []
125
126   def __init__(self, procedure, args):
127     self.port = utils.GetNodeDaemonPort()
128     self.nodepw = utils.GetNodeDaemonPassword()
129     self.nc = {}
130     self.results = {}
131     self.procedure = procedure
132     self.args = args
133     self.body = simplejson.dumps(args)
134     self.body_length = str(len(self.body))
135
136   #--- generic connector -------------
137
138   def ConnectList(self, node_list, address_list=None):
139     """Add a list of nodes to the target nodes.
140
141     @type node_list: list
142     @param node_list: the list of node names to connect
143     @type address_list: list or None
144     @keyword address_list: either None or a list with node addresses,
145         which must have the same length as the node list
146
147     """
148     if address_list is None:
149       address_list = [None for _ in node_list]
150     else:
151       assert len(node_list) == len(address_list), \
152              "Name and address lists should have the same length"
153     for node, address in zip(node_list, address_list):
154       self.ConnectNode(node, address)
155
156   def ConnectNode(self, name, address=None):
157     """Add a node to the target list.
158
159     @type name: str
160     @param name: the node name
161     @type address: str
162     @keyword address: the node address, if known
163
164     """
165     self.nc[name] = NodeController(self, name, address)
166
167   def GetResults(self):
168     """Return the results of the call.
169
170     """
171     return self.results
172
173   def Run(self):
174     """Gather results from the node controllers.
175
176     This function simply calls GetResponse() for each of our node
177     controllers.
178
179     """
180     for node, nc in self.nc.items():
181       self.results[node] = nc.GetResponse()
182
183
184 class RpcRunner(object):
185   """RPC runner class"""
186
187   def __init__(self, cfg):
188     """Initialized the rpc runner.
189
190     @type cfg:  C{config.ConfigWriter}
191     @param cfg: the configuration object that will be used to get data
192                 about the cluster
193
194     """
195     self._cfg = cfg
196
197   def _InstDict(self, instance):
198     """Convert the given instance to a dict.
199
200     This is done via the instance's ToDict() method and additionally
201     we fill the hvparams with the cluster defaults.
202
203     @type instance: L{objects.Instance}
204     @param instance: an Instance object
205     @rtype: dict
206     @return: the instance dict, with the hvparams filled with the
207         cluster defaults
208
209     """
210     idict = instance.ToDict()
211     cluster = self._cfg.GetClusterInfo()
212     idict["hvparams"] = cluster.FillHV(instance)
213     idict["beparams"] = cluster.FillBE(instance)
214     return idict
215
216   def _ConnectList(self, client, node_list):
217     """Helper for computing node addresses.
218
219     @type client: L{Client}
220     @param client: a C{Client} instance
221     @type node_list: list
222     @param node_list: the node list we should connect
223
224     """
225     all_nodes = self._cfg.GetAllNodesInfo()
226     addr_list = []
227     for node in node_list:
228       if node in all_nodes:
229         val = all_nodes[node].primary_ip
230       else:
231         val = None
232       addr_list.append(val)
233     client.ConnectList(node_list, address_list=addr_list)
234
235   def _ConnectNode(self, client, node):
236     """Helper for computing one node's address.
237
238     @type client: L{Client}
239     @param client: a C{Client} instance
240     @type node: str
241     @param node: the node we should connect
242
243     """
244     node_info = self._cfg.GetNodeInfo(node)
245     if node_info is not None:
246       addr = node_info.primary_ip
247     else:
248       addr = None
249     client.ConnectNode(node, address=addr)
250
251   def call_volume_list(self, node_list, vg_name):
252     """Gets the logical volumes present in a given volume group.
253
254     This is a multi-node call.
255
256     """
257     c = Client("volume_list", [vg_name])
258     self._ConnectList(c, node_list)
259     c.Run()
260     return c.GetResults()
261
262   def call_vg_list(self, node_list):
263     """Gets the volume group list.
264
265     This is a multi-node call.
266
267     """
268     c = Client("vg_list", [])
269     self._ConnectList(c, node_list)
270     c.Run()
271     return c.GetResults()
272
273   def call_bridges_exist(self, node, bridges_list):
274     """Checks if a node has all the bridges given.
275
276     This method checks if all bridges given in the bridges_list are
277     present on the remote node, so that an instance that uses interfaces
278     on those bridges can be started.
279
280     This is a single-node call.
281
282     """
283     c = Client("bridges_exist", [bridges_list])
284     self._ConnectNode(c, node)
285     c.Run()
286     return c.GetResults().get(node, False)
287
288   def call_instance_start(self, node, instance, extra_args):
289     """Starts an instance.
290
291     This is a single-node call.
292
293     """
294     c = Client("instance_start", [self._InstDict(instance), extra_args])
295     self._ConnectNode(c, node)
296     c.Run()
297     return c.GetResults().get(node, False)
298
299   def call_instance_shutdown(self, node, instance):
300     """Stops an instance.
301
302     This is a single-node call.
303
304     """
305     c = Client("instance_shutdown", [self._InstDict(instance)])
306     self._ConnectNode(c, node)
307     c.Run()
308     return c.GetResults().get(node, False)
309
310   def call_instance_migrate(self, node, instance, target, live):
311     """Migrate an instance.
312
313     This is a single-node call.
314
315     @type node: string
316     @param node: the node on which the instance is currently running
317     @type instance: C{objects.Instance}
318     @param instance: the instance definition
319     @type target: string
320     @param target: the target node name
321     @type live: boolean
322     @param live: whether the migration should be done live or not (the
323         interpretation of this parameter is left to the hypervisor)
324
325     """
326     c = Client("instance_migrate", [self._InstDict(instance), target, live])
327     self._ConnectNode(c, node)
328     c.Run()
329     return c.GetResults().get(node, False)
330
331   def call_instance_reboot(self, node, instance, reboot_type, extra_args):
332     """Reboots an instance.
333
334     This is a single-node call.
335
336     """
337     c = Client("instance_reboot", [self._InstDict(instance),
338                                    reboot_type, extra_args])
339     self._ConnectNode(c, node)
340     c.Run()
341     return c.GetResults().get(node, False)
342
343   def call_instance_os_add(self, node, inst):
344     """Installs an OS on the given instance.
345
346     This is a single-node call.
347
348     """
349     params = [self._InstDict(inst)]
350     c = Client("instance_os_add", params)
351     self._ConnectNode(c, node)
352     c.Run()
353     return c.GetResults().get(node, False)
354
355   def call_instance_run_rename(self, node, inst, old_name):
356     """Run the OS rename script for an instance.
357
358     This is a single-node call.
359
360     """
361     params = [self._InstDict(inst), old_name]
362     c = Client("instance_run_rename", params)
363     self._ConnectNode(c, node)
364     c.Run()
365     return c.GetResults().get(node, False)
366
367   def call_instance_info(self, node, instance, hname):
368     """Returns information about a single instance.
369
370     This is a single-node call.
371
372     @type node_list: list
373     @param node_list: the list of nodes to query
374     @type instance: string
375     @param instance: the instance name
376     @type hname: string
377     @param hname: the hypervisor type of the instance
378
379     """
380     c = Client("instance_info", [instance, hname])
381     self._ConnectNode(c, node)
382     c.Run()
383     return c.GetResults().get(node, False)
384
385   def call_all_instances_info(self, node_list, hypervisor_list):
386     """Returns information about all instances on the given nodes.
387
388     This is a multi-node call.
389
390     @type node_list: list
391     @param node_list: the list of nodes to query
392     @type hypervisor_list: list
393     @param hypervisor_list: the hypervisors to query for instances
394
395     """
396     c = Client("all_instances_info", [hypervisor_list])
397     self._ConnectList(c, node_list)
398     c.Run()
399     return c.GetResults()
400
401   def call_instance_list(self, node_list, hypervisor_list):
402     """Returns the list of running instances on a given node.
403
404     This is a multi-node call.
405
406     @type node_list: list
407     @param node_list: the list of nodes to query
408     @type hypervisor_list: list
409     @param hypervisor_list: the hypervisors to query for instances
410
411     """
412     c = Client("instance_list", [hypervisor_list])
413     self._ConnectList(c, node_list)
414     c.Run()
415     return c.GetResults()
416
417   def call_node_tcp_ping(self, node, source, target, port, timeout,
418                          live_port_needed):
419     """Do a TcpPing on the remote node
420
421     This is a single-node call.
422
423     """
424     c = Client("node_tcp_ping", [source, target, port, timeout,
425                                  live_port_needed])
426     self._ConnectNode(c, node)
427     c.Run()
428     return c.GetResults().get(node, False)
429
430   def call_node_has_ip_address(self, node, address):
431     """Checks if a node has the given IP address.
432
433     This is a single-node call.
434
435     """
436     c = Client("node_has_ip_address", [address])
437     self._ConnectNode(c, node)
438     c.Run()
439     return c.GetResults().get(node, False)
440
441   def call_node_info(self, node_list, vg_name, hypervisor_type):
442     """Return node information.
443
444     This will return memory information and volume group size and free
445     space.
446
447     This is a multi-node call.
448
449     @type node_list: list
450     @param node_list: the list of nodes to query
451     @type vgname: C{string}
452     @param vgname: the name of the volume group to ask for disk space
453         information
454     @type hypervisor_type: C{str}
455     @param hypervisor_type: the name of the hypervisor to ask for
456         memory information
457
458     """
459     c = Client("node_info", [vg_name, hypervisor_type])
460     self._ConnectList(c, node_list)
461     c.Run()
462     retux = c.GetResults()
463
464     for node_name in retux:
465       ret = retux.get(node_name, False)
466       if type(ret) != dict:
467         logging.error("could not connect to node %s", node_name)
468         ret = {}
469
470       utils.CheckDict(ret,
471                       { 'memory_total' : '-',
472                         'memory_dom0' : '-',
473                         'memory_free' : '-',
474                         'vg_size' : 'node_unreachable',
475                         'vg_free' : '-' },
476                       "call_node_info",
477                       )
478     return retux
479
480   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
481     """Add a node to the cluster.
482
483     This is a single-node call.
484
485     """
486     params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
487     c = Client("node_add", params)
488     self._ConnectNode(c, node)
489     c.Run()
490     return c.GetResults().get(node, False)
491
492   def call_node_verify(self, node_list, checkdict, cluster_name):
493     """Request verification of given parameters.
494
495     This is a multi-node call.
496
497     """
498     c = Client("node_verify", [checkdict, cluster_name])
499     self._ConnectList(c, node_list)
500     c.Run()
501     return c.GetResults()
502
503   @staticmethod
504   def call_node_start_master(node, start_daemons):
505     """Tells a node to activate itself as a master.
506
507     This is a single-node call.
508
509     """
510     c = Client("node_start_master", [start_daemons])
511     c.ConnectNode(node)
512     c.Run()
513     return c.GetResults().get(node, False)
514
515   @staticmethod
516   def call_node_stop_master(node, stop_daemons):
517     """Tells a node to demote itself from master status.
518
519     This is a single-node call.
520
521     """
522     c = Client("node_stop_master", [stop_daemons])
523     c.ConnectNode(node)
524     c.Run()
525     return c.GetResults().get(node, False)
526
527   @staticmethod
528   def call_master_info(node_list):
529     """Query master info.
530
531     This is a multi-node call.
532
533     """
534     # TODO: should this method query down nodes?
535     c = Client("master_info", [])
536     c.ConnectList(node_list)
537     c.Run()
538     return c.GetResults()
539
540   def call_version(self, node_list):
541     """Query node version.
542
543     This is a multi-node call.
544
545     """
546     c = Client("version", [])
547     self._ConnectList(c, node_list)
548     c.Run()
549     return c.GetResults()
550
551   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
552     """Request creation of a given block device.
553
554     This is a single-node call.
555
556     """
557     params = [bdev.ToDict(), size, owner, on_primary, info]
558     c = Client("blockdev_create", params)
559     self._ConnectNode(c, node)
560     c.Run()
561     return c.GetResults().get(node, False)
562
563   def call_blockdev_remove(self, node, bdev):
564     """Request removal of a given block device.
565
566     This is a single-node call.
567
568     """
569     c = Client("blockdev_remove", [bdev.ToDict()])
570     self._ConnectNode(c, node)
571     c.Run()
572     return c.GetResults().get(node, False)
573
574   def call_blockdev_rename(self, node, devlist):
575     """Request rename of the given block devices.
576
577     This is a single-node call.
578
579     """
580     params = [(d.ToDict(), uid) for d, uid in devlist]
581     c = Client("blockdev_rename", params)
582     self._ConnectNode(c, node)
583     c.Run()
584     return c.GetResults().get(node, False)
585
586   def call_blockdev_assemble(self, node, disk, owner, on_primary):
587     """Request assembling of a given block device.
588
589     This is a single-node call.
590
591     """
592     params = [disk.ToDict(), owner, on_primary]
593     c = Client("blockdev_assemble", params)
594     self._ConnectNode(c, node)
595     c.Run()
596     return c.GetResults().get(node, False)
597
598   def call_blockdev_shutdown(self, node, disk):
599     """Request shutdown of a given block device.
600
601     This is a single-node call.
602
603     """
604     c = Client("blockdev_shutdown", [disk.ToDict()])
605     self._ConnectNode(c, node)
606     c.Run()
607     return c.GetResults().get(node, False)
608
609   def call_blockdev_addchildren(self, node, bdev, ndevs):
610     """Request adding a list of children to a (mirroring) device.
611
612     This is a single-node call.
613
614     """
615     params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
616     c = Client("blockdev_addchildren", params)
617     self._ConnectNode(c, node)
618     c.Run()
619     return c.GetResults().get(node, False)
620
621   def call_blockdev_removechildren(self, node, bdev, ndevs):
622     """Request removing a list of children from a (mirroring) device.
623
624     This is a single-node call.
625
626     """
627     params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
628     c = Client("blockdev_removechildren", params)
629     self._ConnectNode(c, node)
630     c.Run()
631     return c.GetResults().get(node, False)
632
633   def call_blockdev_getmirrorstatus(self, node, disks):
634     """Request status of a (mirroring) device.
635
636     This is a single-node call.
637
638     """
639     params = [dsk.ToDict() for dsk in disks]
640     c = Client("blockdev_getmirrorstatus", params)
641     self._ConnectNode(c, node)
642     c.Run()
643     return c.GetResults().get(node, False)
644
645   def call_blockdev_find(self, node, disk):
646     """Request identification of a given block device.
647
648     This is a single-node call.
649
650     """
651     c = Client("blockdev_find", [disk.ToDict()])
652     self._ConnectNode(c, node)
653     c.Run()
654     return c.GetResults().get(node, False)
655
656   def call_blockdev_close(self, node, disks):
657     """Closes the given block devices.
658
659     This is a single-node call.
660
661     """
662     params = [cf.ToDict() for cf in disks]
663     c = Client("blockdev_close", params)
664     self._ConnectNode(c, node)
665     c.Run()
666     return c.GetResults().get(node, False)
667
668   @staticmethod
669   def call_upload_file(node_list, file_name, address_list=None):
670     """Upload a file.
671
672     The node will refuse the operation in case the file is not on the
673     approved file list.
674
675     This is a multi-node call.
676
677     @type node_list: list
678     @param node_list: the list of node names to upload to
679     @type file_name: str
680     @param file_name: the filename to upload
681     @type address_list: list or None
682     @keyword address_list: an optional list of node addresses, in order
683         to optimize the RPC speed
684
685     """
686     fh = file(file_name)
687     try:
688       data = fh.read()
689     finally:
690       fh.close()
691     st = os.stat(file_name)
692     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
693               st.st_atime, st.st_mtime]
694     c = Client("upload_file", params)
695     c.ConnectList(node_list, address_list=address_list)
696     c.Run()
697     return c.GetResults()
698
699   def call_os_diagnose(self, node_list):
700     """Request a diagnose of OS definitions.
701
702     This is a multi-node call.
703
704     """
705     c = Client("os_diagnose", [])
706     self._ConnectList(c, node_list)
707     c.Run()
708     result = c.GetResults()
709     new_result = {}
710     for node_name in result:
711       if result[node_name]:
712         nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
713       else:
714         nr = []
715       new_result[node_name] = nr
716     return new_result
717
718   def call_os_get(self, node, name):
719     """Returns an OS definition.
720
721     This is a single-node call.
722
723     """
724     c = Client("os_get", [name])
725     self._ConnectNode(c, node)
726     c.Run()
727     result = c.GetResults().get(node, False)
728     if isinstance(result, dict):
729       return objects.OS.FromDict(result)
730     else:
731       return result
732
733   def call_hooks_runner(self, node_list, hpath, phase, env):
734     """Call the hooks runner.
735
736     Args:
737       - op: the OpCode instance
738       - env: a dictionary with the environment
739
740     This is a multi-node call.
741
742     """
743     params = [hpath, phase, env]
744     c = Client("hooks_runner", params)
745     self._ConnectList(c, node_list)
746     c.Run()
747     result = c.GetResults()
748     return result
749
750   def call_iallocator_runner(self, node, name, idata):
751     """Call an iallocator on a remote node
752
753     Args:
754       - name: the iallocator name
755       - input: the json-encoded input string
756
757     This is a single-node call.
758
759     """
760     params = [name, idata]
761     c = Client("iallocator_runner", params)
762     self._ConnectNode(c, node)
763     c.Run()
764     result = c.GetResults().get(node, False)
765     return result
766
767   def call_blockdev_grow(self, node, cf_bdev, amount):
768     """Request a snapshot of the given block device.
769
770     This is a single-node call.
771
772     """
773     c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
774     self._ConnectNode(c, node)
775     c.Run()
776     return c.GetResults().get(node, False)
777
778   def call_blockdev_snapshot(self, node, cf_bdev):
779     """Request a snapshot of the given block device.
780
781     This is a single-node call.
782
783     """
784     c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
785     self._ConnectNode(c, node)
786     c.Run()
787     return c.GetResults().get(node, False)
788
789   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
790                            cluster_name):
791     """Request the export of a given snapshot.
792
793     This is a single-node call.
794
795     """
796     params = [snap_bdev.ToDict(), dest_node,
797               self._InstDict(instance), cluster_name]
798     c = Client("snapshot_export", params)
799     self._ConnectNode(c, node)
800     c.Run()
801     return c.GetResults().get(node, False)
802
803   def call_finalize_export(self, node, instance, snap_disks):
804     """Request the completion of an export operation.
805
806     This writes the export config file, etc.
807
808     This is a single-node call.
809
810     """
811     flat_disks = []
812     for disk in snap_disks:
813       flat_disks.append(disk.ToDict())
814     params = [self._InstDict(instance), flat_disks]
815     c = Client("finalize_export", params)
816     self._ConnectNode(c, node)
817     c.Run()
818     return c.GetResults().get(node, False)
819
820   def call_export_info(self, node, path):
821     """Queries the export information in a given path.
822
823     This is a single-node call.
824
825     """
826     c = Client("export_info", [path])
827     self._ConnectNode(c, node)
828     c.Run()
829     result = c.GetResults().get(node, False)
830     if not result:
831       return result
832     return objects.SerializableConfigParser.Loads(str(result))
833
834   def call_instance_os_import(self, node, inst, src_node, src_images,
835                               cluster_name):
836     """Request the import of a backup into an instance.
837
838     This is a single-node call.
839
840     """
841     params = [self._InstDict(inst), src_node, src_images, cluster_name]
842     c = Client("instance_os_import", params)
843     self._ConnectNode(c, node)
844     c.Run()
845     return c.GetResults().get(node, False)
846
847   def call_export_list(self, node_list):
848     """Gets the stored exports list.
849
850     This is a multi-node call.
851
852     """
853     c = Client("export_list", [])
854     self._ConnectList(c, node_list)
855     c.Run()
856     result = c.GetResults()
857     return result
858
859   def call_export_remove(self, node, export):
860     """Requests removal of a given export.
861
862     This is a single-node call.
863
864     """
865     c = Client("export_remove", [export])
866     self._ConnectNode(c, node)
867     c.Run()
868     return c.GetResults().get(node, False)
869
870   @staticmethod
871   def call_node_leave_cluster(node):
872     """Requests a node to clean the cluster information it has.
873
874     This will remove the configuration information from the ganeti data
875     dir.
876
877     This is a single-node call.
878
879     """
880     c = Client("node_leave_cluster", [])
881     c.ConnectNode(node)
882     c.Run()
883     return c.GetResults().get(node, False)
884
885   def call_node_volumes(self, node_list):
886     """Gets all volumes on node(s).
887
888     This is a multi-node call.
889
890     """
891     c = Client("node_volumes", [])
892     self._ConnectList(c, node_list)
893     c.Run()
894     return c.GetResults()
895
896   def call_test_delay(self, node_list, duration):
897     """Sleep for a fixed time on given node(s).
898
899     This is a multi-node call.
900
901     """
902     c = Client("test_delay", [duration])
903     self._ConnectList(c, node_list)
904     c.Run()
905     return c.GetResults()
906
907   def call_file_storage_dir_create(self, node, file_storage_dir):
908     """Create the given file storage directory.
909
910     This is a single-node call.
911
912     """
913     c = Client("file_storage_dir_create", [file_storage_dir])
914     self._ConnectNode(c, node)
915     c.Run()
916     return c.GetResults().get(node, False)
917
918   def call_file_storage_dir_remove(self, node, file_storage_dir):
919     """Remove the given file storage directory.
920
921     This is a single-node call.
922
923     """
924     c = Client("file_storage_dir_remove", [file_storage_dir])
925     self._ConnectNode(c, node)
926     c.Run()
927     return c.GetResults().get(node, False)
928
929   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
930                                    new_file_storage_dir):
931     """Rename file storage directory.
932
933     This is a single-node call.
934
935     """
936     c = Client("file_storage_dir_rename",
937                [old_file_storage_dir, new_file_storage_dir])
938     self._ConnectNode(c, node)
939     c.Run()
940     return c.GetResults().get(node, False)
941
942   @staticmethod
943   def call_jobqueue_update(node_list, address_list, file_name, content):
944     """Update job queue.
945
946     This is a multi-node call.
947
948     """
949     c = Client("jobqueue_update", [file_name, content])
950     c.ConnectList(node_list, address_list=address_list)
951     c.Run()
952     result = c.GetResults()
953     return result
954
955   @staticmethod
956   def call_jobqueue_purge(node):
957     """Purge job queue.
958
959     This is a single-node call.
960
961     """
962     c = Client("jobqueue_purge", [])
963     c.ConnectNode(node)
964     c.Run()
965     return c.GetResults().get(node, False)
966
967   @staticmethod
968   def call_jobqueue_rename(node_list, address_list, old, new):
969     """Rename a job queue file.
970
971     This is a multi-node call.
972
973     """
974     c = Client("jobqueue_rename", [old, new])
975     c.ConnectList(node_list, address_list=address_list)
976     c.Run()
977     result = c.GetResults()
978     return result
979
980
981   @staticmethod
982   def call_jobqueue_set_drain(node_list, drain_flag):
983     """Set the drain flag on the queue.
984
985     This is a multi-node call.
986
987     @type node_list: list
988     @param node_list: the list of nodes to query
989     @type drain_flag: bool
990     @param drain_flag: if True, will set the drain flag, otherwise reset it.
991
992     """
993     c = Client("jobqueue_set_drain", [drain_flag])
994     c.ConnectList(node_list)
995     c.Run()
996     result = c.GetResults()
997     return result
998
999
1000   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1001     """Validate the hypervisor params.
1002
1003     This is a multi-node call.
1004
1005     @type node_list: list
1006     @param node_list: the list of nodes to query
1007     @type hvname: string
1008     @param hvname: the hypervisor name
1009     @type hvparams: dict
1010     @param hvparams: the hypervisor parameters to be validated
1011
1012     """
1013     cluster = self._cfg.GetClusterInfo()
1014     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1015     c = Client("hypervisor_validate_params", [hvname, hv_full])
1016     self._ConnectList(c, node_list)
1017     c.Run()
1018     result = c.GetResults()
1019     return result