drbd: change the semantics of Attach vs. Assemble
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import logging
36 import zlib
37 import base64
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45
46 import ganeti.http.client
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   _http_manager = http.client.HttpClientManager()
64
65
66 def Shutdown():
67   """Stops the module-global HTTP client manager.
68
69   Must be called before quitting the program.
70
71   """
72   global _http_manager
73
74   if _http_manager:
75     _http_manager.Shutdown()
76     _http_manager = None
77
78
79 class RpcResult(object):
80   """RPC Result class.
81
82   This class holds an RPC result. It is needed since in multi-node
83   calls we can't raise an exception just because one one out of many
84   failed, and therefore we use this class to encapsulate the result.
85
86   @ivar data: the data payload, for successfull results, or None
87   @type failed: boolean
88   @ivar failed: whether the operation failed at RPC level (not
89       application level on the remote node)
90   @ivar call: the name of the RPC call
91   @ivar node: the name of the node to which we made the call
92   @ivar offline: whether the operation failed because the node was
93       offline, as opposed to actual failure; offline=True will always
94       imply failed=True, in order to allow simpler checking if
95       the user doesn't care about the exact failure mode
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.error = "Node is marked offline"
107       self.data = None
108     elif failed:
109       self.error = data
110       self.data = None
111     else:
112       self.data = data
113       self.error = None
114
115   def Raise(self):
116     """If the result has failed, raise an OpExecError.
117
118     This is used so that LU code doesn't have to check for each
119     result, but instead can call this function.
120
121     """
122     if self.failed:
123       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
124                                (self.call, self.node, self.error))
125
126   def RemoteFailMsg(self):
127     """Check if the remote procedure failed.
128
129     This is valid only for RPC calls which return result of the form
130     (status, data | error_msg).
131
132     @return: empty string for succcess, otherwise an error message
133
134     """
135     def _EnsureErr(val):
136       """Helper to ensure we return a 'True' value for error."""
137       if val:
138         return val
139       else:
140         return "No error information"
141
142     if self.failed:
143       return _EnsureErr(self.error)
144     if not isinstance(self.data, (tuple, list)):
145       return "Invalid result type (%s)" % type(self.data)
146     if len(self.data) != 2:
147       return "Invalid result length (%d), expected 2" % len(self.data)
148     if not self.data[0]:
149       return _EnsureErr(self.data[1])
150     return ""
151
152
153 class Client:
154   """RPC Client class.
155
156   This class, given a (remote) method name, a list of parameters and a
157   list of nodes, will contact (in parallel) all nodes, and return a
158   dict of results (key: node name, value: result).
159
160   One current bug is that generic failure is still signalled by
161   'False' result, which is not good. This overloading of values can
162   cause bugs.
163
164   """
165   def __init__(self, procedure, body, port):
166     self.procedure = procedure
167     self.body = body
168     self.port = port
169     self.nc = {}
170
171     self._ssl_params = \
172       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
173                          ssl_cert_path=constants.SSL_CERT_FILE)
174
175   def ConnectList(self, node_list, address_list=None):
176     """Add a list of nodes to the target nodes.
177
178     @type node_list: list
179     @param node_list: the list of node names to connect
180     @type address_list: list or None
181     @keyword address_list: either None or a list with node addresses,
182         which must have the same length as the node list
183
184     """
185     if address_list is None:
186       address_list = [None for _ in node_list]
187     else:
188       assert len(node_list) == len(address_list), \
189              "Name and address lists should have the same length"
190     for node, address in zip(node_list, address_list):
191       self.ConnectNode(node, address)
192
193   def ConnectNode(self, name, address=None):
194     """Add a node to the target list.
195
196     @type name: str
197     @param name: the node name
198     @type address: str
199     @keyword address: the node address, if known
200
201     """
202     if address is None:
203       address = name
204
205     self.nc[name] = \
206       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
207                                     "/%s" % self.procedure,
208                                     post_data=self.body,
209                                     ssl_params=self._ssl_params,
210                                     ssl_verify_peer=True)
211
212   def GetResults(self):
213     """Call nodes and return results.
214
215     @rtype: list
216     @returns: List of RPC results
217
218     """
219     assert _http_manager, "RPC module not intialized"
220
221     _http_manager.ExecRequests(self.nc.values())
222
223     results = {}
224
225     for name, req in self.nc.iteritems():
226       if req.success and req.resp_status_code == http.HTTP_OK:
227         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
228                                   node=name, call=self.procedure)
229         continue
230
231       # TODO: Better error reporting
232       if req.error:
233         msg = req.error
234       else:
235         msg = req.resp_body
236
237       logging.error("RPC error from node %s: %s", name, msg)
238       results[name] = RpcResult(data=msg, failed=True, node=name,
239                                 call=self.procedure)
240
241     return results
242
243
244 class RpcRunner(object):
245   """RPC runner class"""
246
247   def __init__(self, cfg):
248     """Initialized the rpc runner.
249
250     @type cfg:  C{config.ConfigWriter}
251     @param cfg: the configuration object that will be used to get data
252                 about the cluster
253
254     """
255     self._cfg = cfg
256     self.port = utils.GetNodeDaemonPort()
257
258   def _InstDict(self, instance):
259     """Convert the given instance to a dict.
260
261     This is done via the instance's ToDict() method and additionally
262     we fill the hvparams with the cluster defaults.
263
264     @type instance: L{objects.Instance}
265     @param instance: an Instance object
266     @rtype: dict
267     @return: the instance dict, with the hvparams filled with the
268         cluster defaults
269
270     """
271     idict = instance.ToDict()
272     cluster = self._cfg.GetClusterInfo()
273     idict["hvparams"] = cluster.FillHV(instance)
274     idict["beparams"] = cluster.FillBE(instance)
275     return idict
276
277   def _ConnectList(self, client, node_list):
278     """Helper for computing node addresses.
279
280     @type client: L{Client}
281     @param client: a C{Client} instance
282     @type node_list: list
283     @param node_list: the node list we should connect
284
285     """
286     all_nodes = self._cfg.GetAllNodesInfo()
287     name_list = []
288     addr_list = []
289     skip_dict = {}
290     for node in node_list:
291       if node in all_nodes:
292         if all_nodes[node].offline:
293           skip_dict[node] = RpcResult(node=node, offline=True)
294           continue
295         val = all_nodes[node].primary_ip
296       else:
297         val = None
298       addr_list.append(val)
299       name_list.append(node)
300     if name_list:
301       client.ConnectList(name_list, address_list=addr_list)
302     return skip_dict
303
304   def _ConnectNode(self, client, node):
305     """Helper for computing one node's address.
306
307     @type client: L{Client}
308     @param client: a C{Client} instance
309     @type node: str
310     @param node: the node we should connect
311
312     """
313     node_info = self._cfg.GetNodeInfo(node)
314     if node_info is not None:
315       if node_info.offline:
316         return RpcResult(node=node, offline=True)
317       addr = node_info.primary_ip
318     else:
319       addr = None
320     client.ConnectNode(node, address=addr)
321
322   def _MultiNodeCall(self, node_list, procedure, args):
323     """Helper for making a multi-node call
324
325     """
326     body = serializer.DumpJson(args, indent=False)
327     c = Client(procedure, body, self.port)
328     skip_dict = self._ConnectList(c, node_list)
329     skip_dict.update(c.GetResults())
330     return skip_dict
331
332   @classmethod
333   def _StaticMultiNodeCall(cls, node_list, procedure, args,
334                            address_list=None):
335     """Helper for making a multi-node static call
336
337     """
338     body = serializer.DumpJson(args, indent=False)
339     c = Client(procedure, body, utils.GetNodeDaemonPort())
340     c.ConnectList(node_list, address_list=address_list)
341     return c.GetResults()
342
343   def _SingleNodeCall(self, node, procedure, args):
344     """Helper for making a single-node call
345
346     """
347     body = serializer.DumpJson(args, indent=False)
348     c = Client(procedure, body, self.port)
349     result = self._ConnectNode(c, node)
350     if result is None:
351       # we did connect, node is not offline
352       result = c.GetResults()[node]
353     return result
354
355   @classmethod
356   def _StaticSingleNodeCall(cls, node, procedure, args):
357     """Helper for making a single-node static call
358
359     """
360     body = serializer.DumpJson(args, indent=False)
361     c = Client(procedure, body, utils.GetNodeDaemonPort())
362     c.ConnectNode(node)
363     return c.GetResults()[node]
364
365   @staticmethod
366   def _Compress(data):
367     """Compresses a string for transport over RPC.
368
369     Small amounts of data are not compressed.
370
371     @type data: str
372     @param data: Data
373     @rtype: tuple
374     @return: Encoded data to send
375
376     """
377     # Small amounts of data are not compressed
378     if len(data) < 512:
379       return (constants.RPC_ENCODING_NONE, data)
380
381     # Compress with zlib and encode in base64
382     return (constants.RPC_ENCODING_ZLIB_BASE64,
383             base64.b64encode(zlib.compress(data, 3)))
384
385   #
386   # Begin RPC calls
387   #
388
389   def call_volume_list(self, node_list, vg_name):
390     """Gets the logical volumes present in a given volume group.
391
392     This is a multi-node call.
393
394     """
395     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
396
397   def call_vg_list(self, node_list):
398     """Gets the volume group list.
399
400     This is a multi-node call.
401
402     """
403     return self._MultiNodeCall(node_list, "vg_list", [])
404
405   def call_bridges_exist(self, node, bridges_list):
406     """Checks if a node has all the bridges given.
407
408     This method checks if all bridges given in the bridges_list are
409     present on the remote node, so that an instance that uses interfaces
410     on those bridges can be started.
411
412     This is a single-node call.
413
414     """
415     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
416
417   def call_instance_start(self, node, instance, extra_args):
418     """Starts an instance.
419
420     This is a single-node call.
421
422     """
423     return self._SingleNodeCall(node, "instance_start",
424                                 [self._InstDict(instance), extra_args])
425
426   def call_instance_shutdown(self, node, instance):
427     """Stops an instance.
428
429     This is a single-node call.
430
431     """
432     return self._SingleNodeCall(node, "instance_shutdown",
433                                 [self._InstDict(instance)])
434
435   def call_instance_migrate(self, node, instance, target, live):
436     """Migrate an instance.
437
438     This is a single-node call.
439
440     @type node: string
441     @param node: the node on which the instance is currently running
442     @type instance: C{objects.Instance}
443     @param instance: the instance definition
444     @type target: string
445     @param target: the target node name
446     @type live: boolean
447     @param live: whether the migration should be done live or not (the
448         interpretation of this parameter is left to the hypervisor)
449
450     """
451     return self._SingleNodeCall(node, "instance_migrate",
452                                 [self._InstDict(instance), target, live])
453
454   def call_instance_reboot(self, node, instance, reboot_type, extra_args):
455     """Reboots an instance.
456
457     This is a single-node call.
458
459     """
460     return self._SingleNodeCall(node, "instance_reboot",
461                                 [self._InstDict(instance), reboot_type,
462                                  extra_args])
463
464   def call_instance_os_add(self, node, inst):
465     """Installs an OS on the given instance.
466
467     This is a single-node call.
468
469     """
470     return self._SingleNodeCall(node, "instance_os_add",
471                                 [self._InstDict(inst)])
472
473   def call_instance_run_rename(self, node, inst, old_name):
474     """Run the OS rename script for an instance.
475
476     This is a single-node call.
477
478     """
479     return self._SingleNodeCall(node, "instance_run_rename",
480                                 [self._InstDict(inst), old_name])
481
482   def call_instance_info(self, node, instance, hname):
483     """Returns information about a single instance.
484
485     This is a single-node call.
486
487     @type node: list
488     @param node: the list of nodes to query
489     @type instance: string
490     @param instance: the instance name
491     @type hname: string
492     @param hname: the hypervisor type of the instance
493
494     """
495     return self._SingleNodeCall(node, "instance_info", [instance, hname])
496
497   def call_instance_migratable(self, node, instance):
498     """Checks whether the given instance can be migrated.
499
500     This is a single-node call.
501
502     @param node: the node to query
503     @type instance: L{objects.Instance}
504     @param instance: the instance to check
505
506
507     """
508     return self._SingleNodeCall(node, "instance_migratable",
509                                 [self._InstDict(instance)])
510
511   def call_all_instances_info(self, node_list, hypervisor_list):
512     """Returns information about all instances on the given nodes.
513
514     This is a multi-node call.
515
516     @type node_list: list
517     @param node_list: the list of nodes to query
518     @type hypervisor_list: list
519     @param hypervisor_list: the hypervisors to query for instances
520
521     """
522     return self._MultiNodeCall(node_list, "all_instances_info",
523                                [hypervisor_list])
524
525   def call_instance_list(self, node_list, hypervisor_list):
526     """Returns the list of running instances on a given node.
527
528     This is a multi-node call.
529
530     @type node_list: list
531     @param node_list: the list of nodes to query
532     @type hypervisor_list: list
533     @param hypervisor_list: the hypervisors to query for instances
534
535     """
536     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
537
538   def call_node_tcp_ping(self, node, source, target, port, timeout,
539                          live_port_needed):
540     """Do a TcpPing on the remote node
541
542     This is a single-node call.
543
544     """
545     return self._SingleNodeCall(node, "node_tcp_ping",
546                                 [source, target, port, timeout,
547                                  live_port_needed])
548
549   def call_node_has_ip_address(self, node, address):
550     """Checks if a node has the given IP address.
551
552     This is a single-node call.
553
554     """
555     return self._SingleNodeCall(node, "node_has_ip_address", [address])
556
557   def call_node_info(self, node_list, vg_name, hypervisor_type):
558     """Return node information.
559
560     This will return memory information and volume group size and free
561     space.
562
563     This is a multi-node call.
564
565     @type node_list: list
566     @param node_list: the list of nodes to query
567     @type vg_name: C{string}
568     @param vg_name: the name of the volume group to ask for disk space
569         information
570     @type hypervisor_type: C{str}
571     @param hypervisor_type: the name of the hypervisor to ask for
572         memory information
573
574     """
575     retux = self._MultiNodeCall(node_list, "node_info",
576                                 [vg_name, hypervisor_type])
577
578     for result in retux.itervalues():
579       if result.failed or not isinstance(result.data, dict):
580         result.data = {}
581       if result.offline:
582         log_name = None
583       else:
584         log_name = "call_node_info"
585
586       utils.CheckDict(result.data, {
587         'memory_total' : '-',
588         'memory_dom0' : '-',
589         'memory_free' : '-',
590         'vg_size' : 'node_unreachable',
591         'vg_free' : '-',
592         }, log_name)
593     return retux
594
595   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
596     """Add a node to the cluster.
597
598     This is a single-node call.
599
600     """
601     return self._SingleNodeCall(node, "node_add",
602                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
603
604   def call_node_verify(self, node_list, checkdict, cluster_name):
605     """Request verification of given parameters.
606
607     This is a multi-node call.
608
609     """
610     return self._MultiNodeCall(node_list, "node_verify",
611                                [checkdict, cluster_name])
612
613   @classmethod
614   def call_node_start_master(cls, node, start_daemons):
615     """Tells a node to activate itself as a master.
616
617     This is a single-node call.
618
619     """
620     return cls._StaticSingleNodeCall(node, "node_start_master",
621                                      [start_daemons])
622
623   @classmethod
624   def call_node_stop_master(cls, node, stop_daemons):
625     """Tells a node to demote itself from master status.
626
627     This is a single-node call.
628
629     """
630     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
631
632   @classmethod
633   def call_master_info(cls, node_list):
634     """Query master info.
635
636     This is a multi-node call.
637
638     """
639     # TODO: should this method query down nodes?
640     return cls._StaticMultiNodeCall(node_list, "master_info", [])
641
642   def call_version(self, node_list):
643     """Query node version.
644
645     This is a multi-node call.
646
647     """
648     return self._MultiNodeCall(node_list, "version", [])
649
650   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
651     """Request creation of a given block device.
652
653     This is a single-node call.
654
655     """
656     return self._SingleNodeCall(node, "blockdev_create",
657                                 [bdev.ToDict(), size, owner, on_primary, info])
658
659   def call_blockdev_remove(self, node, bdev):
660     """Request removal of a given block device.
661
662     This is a single-node call.
663
664     """
665     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
666
667   def call_blockdev_rename(self, node, devlist):
668     """Request rename of the given block devices.
669
670     This is a single-node call.
671
672     """
673     return self._SingleNodeCall(node, "blockdev_rename",
674                                 [(d.ToDict(), uid) for d, uid in devlist])
675
676   def call_blockdev_assemble(self, node, disk, owner, on_primary):
677     """Request assembling of a given block device.
678
679     This is a single-node call.
680
681     """
682     return self._SingleNodeCall(node, "blockdev_assemble",
683                                 [disk.ToDict(), owner, on_primary])
684
685   def call_blockdev_shutdown(self, node, disk):
686     """Request shutdown of a given block device.
687
688     This is a single-node call.
689
690     """
691     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
692
693   def call_blockdev_addchildren(self, node, bdev, ndevs):
694     """Request adding a list of children to a (mirroring) device.
695
696     This is a single-node call.
697
698     """
699     return self._SingleNodeCall(node, "blockdev_addchildren",
700                                 [bdev.ToDict(),
701                                  [disk.ToDict() for disk in ndevs]])
702
703   def call_blockdev_removechildren(self, node, bdev, ndevs):
704     """Request removing a list of children from a (mirroring) device.
705
706     This is a single-node call.
707
708     """
709     return self._SingleNodeCall(node, "blockdev_removechildren",
710                                 [bdev.ToDict(),
711                                  [disk.ToDict() for disk in ndevs]])
712
713   def call_blockdev_getmirrorstatus(self, node, disks):
714     """Request status of a (mirroring) device.
715
716     This is a single-node call.
717
718     """
719     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
720                                 [dsk.ToDict() for dsk in disks])
721
722   def call_blockdev_find(self, node, disk):
723     """Request identification of a given block device.
724
725     This is a single-node call.
726
727     """
728     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
729
730   def call_blockdev_close(self, node, instance_name, disks):
731     """Closes the given block devices.
732
733     This is a single-node call.
734
735     """
736     params = [instance_name, [cf.ToDict() for cf in disks]]
737     return self._SingleNodeCall(node, "blockdev_close", params)
738
739   @classmethod
740   def call_upload_file(cls, node_list, file_name, address_list=None):
741     """Upload a file.
742
743     The node will refuse the operation in case the file is not on the
744     approved file list.
745
746     This is a multi-node call.
747
748     @type node_list: list
749     @param node_list: the list of node names to upload to
750     @type file_name: str
751     @param file_name: the filename to upload
752     @type address_list: list or None
753     @keyword address_list: an optional list of node addresses, in order
754         to optimize the RPC speed
755
756     """
757     file_contents = utils.ReadFile(file_name)
758     data = cls._Compress(file_contents)
759     st = os.stat(file_name)
760     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
761               st.st_atime, st.st_mtime]
762     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
763                                     address_list=address_list)
764
765   @classmethod
766   def call_write_ssconf_files(cls, node_list, values):
767     """Write ssconf files.
768
769     This is a multi-node call.
770
771     """
772     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
773
774   def call_os_diagnose(self, node_list):
775     """Request a diagnose of OS definitions.
776
777     This is a multi-node call.
778
779     """
780     result = self._MultiNodeCall(node_list, "os_diagnose", [])
781
782     for node_result in result.values():
783       if not node_result.failed and node_result.data:
784         node_result.data = [objects.OS.FromDict(oss)
785                             for oss in node_result.data]
786     return result
787
788   def call_os_get(self, node, name):
789     """Returns an OS definition.
790
791     This is a single-node call.
792
793     """
794     result = self._SingleNodeCall(node, "os_get", [name])
795     if not result.failed and isinstance(result.data, dict):
796       result.data = objects.OS.FromDict(result.data)
797     return result
798
799   def call_hooks_runner(self, node_list, hpath, phase, env):
800     """Call the hooks runner.
801
802     Args:
803       - op: the OpCode instance
804       - env: a dictionary with the environment
805
806     This is a multi-node call.
807
808     """
809     params = [hpath, phase, env]
810     return self._MultiNodeCall(node_list, "hooks_runner", params)
811
812   def call_iallocator_runner(self, node, name, idata):
813     """Call an iallocator on a remote node
814
815     Args:
816       - name: the iallocator name
817       - input: the json-encoded input string
818
819     This is a single-node call.
820
821     """
822     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
823
824   def call_blockdev_grow(self, node, cf_bdev, amount):
825     """Request a snapshot of the given block device.
826
827     This is a single-node call.
828
829     """
830     return self._SingleNodeCall(node, "blockdev_grow",
831                                 [cf_bdev.ToDict(), amount])
832
833   def call_blockdev_snapshot(self, node, cf_bdev):
834     """Request a snapshot of the given block device.
835
836     This is a single-node call.
837
838     """
839     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
840
841   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
842                            cluster_name, idx):
843     """Request the export of a given snapshot.
844
845     This is a single-node call.
846
847     """
848     return self._SingleNodeCall(node, "snapshot_export",
849                                 [snap_bdev.ToDict(), dest_node,
850                                  self._InstDict(instance), cluster_name, idx])
851
852   def call_finalize_export(self, node, instance, snap_disks):
853     """Request the completion of an export operation.
854
855     This writes the export config file, etc.
856
857     This is a single-node call.
858
859     """
860     flat_disks = []
861     for disk in snap_disks:
862       flat_disks.append(disk.ToDict())
863
864     return self._SingleNodeCall(node, "finalize_export",
865                                 [self._InstDict(instance), flat_disks])
866
867   def call_export_info(self, node, path):
868     """Queries the export information in a given path.
869
870     This is a single-node call.
871
872     """
873     result = self._SingleNodeCall(node, "export_info", [path])
874     if not result.failed and result.data:
875       result.data = objects.SerializableConfigParser.Loads(str(result.data))
876     return result
877
878   def call_instance_os_import(self, node, inst, src_node, src_images,
879                               cluster_name):
880     """Request the import of a backup into an instance.
881
882     This is a single-node call.
883
884     """
885     return self._SingleNodeCall(node, "instance_os_import",
886                                 [self._InstDict(inst), src_node, src_images,
887                                  cluster_name])
888
889   def call_export_list(self, node_list):
890     """Gets the stored exports list.
891
892     This is a multi-node call.
893
894     """
895     return self._MultiNodeCall(node_list, "export_list", [])
896
897   def call_export_remove(self, node, export):
898     """Requests removal of a given export.
899
900     This is a single-node call.
901
902     """
903     return self._SingleNodeCall(node, "export_remove", [export])
904
905   @classmethod
906   def call_node_leave_cluster(cls, node):
907     """Requests a node to clean the cluster information it has.
908
909     This will remove the configuration information from the ganeti data
910     dir.
911
912     This is a single-node call.
913
914     """
915     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
916
917   def call_node_volumes(self, node_list):
918     """Gets all volumes on node(s).
919
920     This is a multi-node call.
921
922     """
923     return self._MultiNodeCall(node_list, "node_volumes", [])
924
925   def call_node_demote_from_mc(self, node):
926     """Demote a node from the master candidate role.
927
928     This is a single-node call.
929
930     """
931     return self._SingleNodeCall(node, "node_demote_from_mc", [])
932
933   def call_test_delay(self, node_list, duration):
934     """Sleep for a fixed time on given node(s).
935
936     This is a multi-node call.
937
938     """
939     return self._MultiNodeCall(node_list, "test_delay", [duration])
940
941   def call_file_storage_dir_create(self, node, file_storage_dir):
942     """Create the given file storage directory.
943
944     This is a single-node call.
945
946     """
947     return self._SingleNodeCall(node, "file_storage_dir_create",
948                                 [file_storage_dir])
949
950   def call_file_storage_dir_remove(self, node, file_storage_dir):
951     """Remove the given file storage directory.
952
953     This is a single-node call.
954
955     """
956     return self._SingleNodeCall(node, "file_storage_dir_remove",
957                                 [file_storage_dir])
958
959   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
960                                    new_file_storage_dir):
961     """Rename file storage directory.
962
963     This is a single-node call.
964
965     """
966     return self._SingleNodeCall(node, "file_storage_dir_rename",
967                                 [old_file_storage_dir, new_file_storage_dir])
968
969   @classmethod
970   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
971     """Update job queue.
972
973     This is a multi-node call.
974
975     """
976     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
977                                     [file_name, cls._Compress(content)],
978                                     address_list=address_list)
979
980   @classmethod
981   def call_jobqueue_purge(cls, node):
982     """Purge job queue.
983
984     This is a single-node call.
985
986     """
987     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
988
989   @classmethod
990   def call_jobqueue_rename(cls, node_list, address_list, rename):
991     """Rename a job queue file.
992
993     This is a multi-node call.
994
995     """
996     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
997                                     address_list=address_list)
998
999   @classmethod
1000   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1001     """Set the drain flag on the queue.
1002
1003     This is a multi-node call.
1004
1005     @type node_list: list
1006     @param node_list: the list of nodes to query
1007     @type drain_flag: bool
1008     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1009
1010     """
1011     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1012                                     [drain_flag])
1013
1014   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1015     """Validate the hypervisor params.
1016
1017     This is a multi-node call.
1018
1019     @type node_list: list
1020     @param node_list: the list of nodes to query
1021     @type hvname: string
1022     @param hvname: the hypervisor name
1023     @type hvparams: dict
1024     @param hvparams: the hypervisor parameters to be validated
1025
1026     """
1027     cluster = self._cfg.GetClusterInfo()
1028     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1029     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1030                                [hvname, hv_full])