Fix handling of failures in create instance disks
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import logging
36 import zlib
37 import base64
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45
46 import ganeti.http.client
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   _http_manager = http.client.HttpClientManager()
64
65
66 def Shutdown():
67   """Stops the module-global HTTP client manager.
68
69   Must be called before quitting the program.
70
71   """
72   global _http_manager
73
74   if _http_manager:
75     _http_manager.Shutdown()
76     _http_manager = None
77
78
79 class RpcResult(object):
80   """RPC Result class.
81
82   This class holds an RPC result. It is needed since in multi-node
83   calls we can't raise an exception just because one one out of many
84   failed, and therefore we use this class to encapsulate the result.
85
86   @ivar data: the data payload, for successfull results, or None
87   @type failed: boolean
88   @ivar failed: whether the operation failed at RPC level (not
89       application level on the remote node)
90   @ivar call: the name of the RPC call
91   @ivar node: the name of the node to which we made the call
92   @ivar offline: whether the operation failed because the node was
93       offline, as opposed to actual failure; offline=True will always
94       imply failed=True, in order to allow simpler checking if
95       the user doesn't care about the exact failure mode
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.error = "Node is marked offline"
107       self.data = None
108     elif failed:
109       self.error = data
110       self.data = None
111     else:
112       self.data = data
113       self.error = None
114
115   def Raise(self):
116     """If the result has failed, raise an OpExecError.
117
118     This is used so that LU code doesn't have to check for each
119     result, but instead can call this function.
120
121     """
122     if self.failed:
123       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
124                                (self.call, self.node, self.error))
125
126   def RemoteFailMsg(self):
127     """Check if the remote procedure failed.
128
129     This is valid only for RPC calls which return result of the form
130     (status, data | error_msg).
131
132     @return: empty string for succcess, otherwise an error message
133
134     """
135     def _EnsureErr(val):
136       """Helper to ensure we return a 'True' value for error."""
137       if val:
138         return val
139       else:
140         return "No error information"
141
142     if self.failed:
143       return _EnsureErr(self.error)
144     if not isinstance(self.data, (tuple, list)):
145       return "Invalid result type (%s)" % type(self.data)
146     if len(self.data) != 2:
147       return "Invalid result length (%d), expected 2" % len(self.data)
148     if not self.data[0]:
149       return _EnsureErr(self.data[1])
150     return ""
151
152
153 class Client:
154   """RPC Client class.
155
156   This class, given a (remote) method name, a list of parameters and a
157   list of nodes, will contact (in parallel) all nodes, and return a
158   dict of results (key: node name, value: result).
159
160   One current bug is that generic failure is still signalled by
161   'False' result, which is not good. This overloading of values can
162   cause bugs.
163
164   """
165   def __init__(self, procedure, body, port):
166     self.procedure = procedure
167     self.body = body
168     self.port = port
169     self.nc = {}
170
171     self._ssl_params = \
172       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
173                          ssl_cert_path=constants.SSL_CERT_FILE)
174
175   def ConnectList(self, node_list, address_list=None):
176     """Add a list of nodes to the target nodes.
177
178     @type node_list: list
179     @param node_list: the list of node names to connect
180     @type address_list: list or None
181     @keyword address_list: either None or a list with node addresses,
182         which must have the same length as the node list
183
184     """
185     if address_list is None:
186       address_list = [None for _ in node_list]
187     else:
188       assert len(node_list) == len(address_list), \
189              "Name and address lists should have the same length"
190     for node, address in zip(node_list, address_list):
191       self.ConnectNode(node, address)
192
193   def ConnectNode(self, name, address=None):
194     """Add a node to the target list.
195
196     @type name: str
197     @param name: the node name
198     @type address: str
199     @keyword address: the node address, if known
200
201     """
202     if address is None:
203       address = name
204
205     self.nc[name] = \
206       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
207                                     "/%s" % self.procedure,
208                                     post_data=self.body,
209                                     ssl_params=self._ssl_params,
210                                     ssl_verify_peer=True)
211
212   def GetResults(self):
213     """Call nodes and return results.
214
215     @rtype: list
216     @returns: List of RPC results
217
218     """
219     assert _http_manager, "RPC module not intialized"
220
221     _http_manager.ExecRequests(self.nc.values())
222
223     results = {}
224
225     for name, req in self.nc.iteritems():
226       if req.success and req.resp_status_code == http.HTTP_OK:
227         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
228                                   node=name, call=self.procedure)
229         continue
230
231       # TODO: Better error reporting
232       if req.error:
233         msg = req.error
234       else:
235         msg = req.resp_body
236
237       logging.error("RPC error from node %s: %s", name, msg)
238       results[name] = RpcResult(data=msg, failed=True, node=name,
239                                 call=self.procedure)
240
241     return results
242
243
244 class RpcRunner(object):
245   """RPC runner class"""
246
247   def __init__(self, cfg):
248     """Initialized the rpc runner.
249
250     @type cfg:  C{config.ConfigWriter}
251     @param cfg: the configuration object that will be used to get data
252                 about the cluster
253
254     """
255     self._cfg = cfg
256     self.port = utils.GetNodeDaemonPort()
257
258   def _InstDict(self, instance):
259     """Convert the given instance to a dict.
260
261     This is done via the instance's ToDict() method and additionally
262     we fill the hvparams with the cluster defaults.
263
264     @type instance: L{objects.Instance}
265     @param instance: an Instance object
266     @rtype: dict
267     @return: the instance dict, with the hvparams filled with the
268         cluster defaults
269
270     """
271     idict = instance.ToDict()
272     cluster = self._cfg.GetClusterInfo()
273     idict["hvparams"] = cluster.FillHV(instance)
274     idict["beparams"] = cluster.FillBE(instance)
275     return idict
276
277   def _ConnectList(self, client, node_list):
278     """Helper for computing node addresses.
279
280     @type client: L{Client}
281     @param client: a C{Client} instance
282     @type node_list: list
283     @param node_list: the node list we should connect
284
285     """
286     all_nodes = self._cfg.GetAllNodesInfo()
287     name_list = []
288     addr_list = []
289     skip_dict = {}
290     for node in node_list:
291       if node in all_nodes:
292         if all_nodes[node].offline:
293           skip_dict[node] = RpcResult(node=node, offline=True)
294           continue
295         val = all_nodes[node].primary_ip
296       else:
297         val = None
298       addr_list.append(val)
299       name_list.append(node)
300     if name_list:
301       client.ConnectList(name_list, address_list=addr_list)
302     return skip_dict
303
304   def _ConnectNode(self, client, node):
305     """Helper for computing one node's address.
306
307     @type client: L{Client}
308     @param client: a C{Client} instance
309     @type node: str
310     @param node: the node we should connect
311
312     """
313     node_info = self._cfg.GetNodeInfo(node)
314     if node_info is not None:
315       if node_info.offline:
316         return RpcResult(node=node, offline=True)
317       addr = node_info.primary_ip
318     else:
319       addr = None
320     client.ConnectNode(node, address=addr)
321
322   def _MultiNodeCall(self, node_list, procedure, args):
323     """Helper for making a multi-node call
324
325     """
326     body = serializer.DumpJson(args, indent=False)
327     c = Client(procedure, body, self.port)
328     skip_dict = self._ConnectList(c, node_list)
329     skip_dict.update(c.GetResults())
330     return skip_dict
331
332   @classmethod
333   def _StaticMultiNodeCall(cls, node_list, procedure, args,
334                            address_list=None):
335     """Helper for making a multi-node static call
336
337     """
338     body = serializer.DumpJson(args, indent=False)
339     c = Client(procedure, body, utils.GetNodeDaemonPort())
340     c.ConnectList(node_list, address_list=address_list)
341     return c.GetResults()
342
343   def _SingleNodeCall(self, node, procedure, args):
344     """Helper for making a single-node call
345
346     """
347     body = serializer.DumpJson(args, indent=False)
348     c = Client(procedure, body, self.port)
349     result = self._ConnectNode(c, node)
350     if result is None:
351       # we did connect, node is not offline
352       result = c.GetResults()[node]
353     return result
354
355   @classmethod
356   def _StaticSingleNodeCall(cls, node, procedure, args):
357     """Helper for making a single-node static call
358
359     """
360     body = serializer.DumpJson(args, indent=False)
361     c = Client(procedure, body, utils.GetNodeDaemonPort())
362     c.ConnectNode(node)
363     return c.GetResults()[node]
364
365   @staticmethod
366   def _Compress(data):
367     """Compresses a string for transport over RPC.
368
369     Small amounts of data are not compressed.
370
371     @type data: str
372     @param data: Data
373     @rtype: tuple
374     @return: Encoded data to send
375
376     """
377     # Small amounts of data are not compressed
378     if len(data) < 512:
379       return (constants.RPC_ENCODING_NONE, data)
380
381     # Compress with zlib and encode in base64
382     return (constants.RPC_ENCODING_ZLIB_BASE64,
383             base64.b64encode(zlib.compress(data, 3)))
384
385   #
386   # Begin RPC calls
387   #
388
389   def call_volume_list(self, node_list, vg_name):
390     """Gets the logical volumes present in a given volume group.
391
392     This is a multi-node call.
393
394     """
395     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
396
397   def call_vg_list(self, node_list):
398     """Gets the volume group list.
399
400     This is a multi-node call.
401
402     """
403     return self._MultiNodeCall(node_list, "vg_list", [])
404
405   def call_bridges_exist(self, node, bridges_list):
406     """Checks if a node has all the bridges given.
407
408     This method checks if all bridges given in the bridges_list are
409     present on the remote node, so that an instance that uses interfaces
410     on those bridges can be started.
411
412     This is a single-node call.
413
414     """
415     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
416
417   def call_instance_start(self, node, instance, extra_args):
418     """Starts an instance.
419
420     This is a single-node call.
421
422     """
423     return self._SingleNodeCall(node, "instance_start",
424                                 [self._InstDict(instance), extra_args])
425
426   def call_instance_shutdown(self, node, instance):
427     """Stops an instance.
428
429     This is a single-node call.
430
431     """
432     return self._SingleNodeCall(node, "instance_shutdown",
433                                 [self._InstDict(instance)])
434
435   def call_instance_migrate(self, node, instance, target, live):
436     """Migrate an instance.
437
438     This is a single-node call.
439
440     @type node: string
441     @param node: the node on which the instance is currently running
442     @type instance: C{objects.Instance}
443     @param instance: the instance definition
444     @type target: string
445     @param target: the target node name
446     @type live: boolean
447     @param live: whether the migration should be done live or not (the
448         interpretation of this parameter is left to the hypervisor)
449
450     """
451     return self._SingleNodeCall(node, "instance_migrate",
452                                 [self._InstDict(instance), target, live])
453
454   def call_instance_reboot(self, node, instance, reboot_type, extra_args):
455     """Reboots an instance.
456
457     This is a single-node call.
458
459     """
460     return self._SingleNodeCall(node, "instance_reboot",
461                                 [self._InstDict(instance), reboot_type,
462                                  extra_args])
463
464   def call_instance_os_add(self, node, inst):
465     """Installs an OS on the given instance.
466
467     This is a single-node call.
468
469     """
470     return self._SingleNodeCall(node, "instance_os_add",
471                                 [self._InstDict(inst)])
472
473   def call_instance_run_rename(self, node, inst, old_name):
474     """Run the OS rename script for an instance.
475
476     This is a single-node call.
477
478     """
479     return self._SingleNodeCall(node, "instance_run_rename",
480                                 [self._InstDict(inst), old_name])
481
482   def call_instance_info(self, node, instance, hname):
483     """Returns information about a single instance.
484
485     This is a single-node call.
486
487     @type node: list
488     @param node: the list of nodes to query
489     @type instance: string
490     @param instance: the instance name
491     @type hname: string
492     @param hname: the hypervisor type of the instance
493
494     """
495     return self._SingleNodeCall(node, "instance_info", [instance, hname])
496
497   def call_instance_migratable(self, node, instance):
498     """Checks whether the given instance can be migrated.
499
500     This is a single-node call.
501
502     @param node: the node to query
503     @type instance: L{objects.Instance}
504     @param instance: the instance to check
505
506
507     """
508     return self._SingleNodeCall(node, "instance_migratable",
509                                 [self._InstDict(instance)])
510
511   def call_all_instances_info(self, node_list, hypervisor_list):
512     """Returns information about all instances on the given nodes.
513
514     This is a multi-node call.
515
516     @type node_list: list
517     @param node_list: the list of nodes to query
518     @type hypervisor_list: list
519     @param hypervisor_list: the hypervisors to query for instances
520
521     """
522     return self._MultiNodeCall(node_list, "all_instances_info",
523                                [hypervisor_list])
524
525   def call_instance_list(self, node_list, hypervisor_list):
526     """Returns the list of running instances on a given node.
527
528     This is a multi-node call.
529
530     @type node_list: list
531     @param node_list: the list of nodes to query
532     @type hypervisor_list: list
533     @param hypervisor_list: the hypervisors to query for instances
534
535     """
536     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
537
538   def call_node_tcp_ping(self, node, source, target, port, timeout,
539                          live_port_needed):
540     """Do a TcpPing on the remote node
541
542     This is a single-node call.
543
544     """
545     return self._SingleNodeCall(node, "node_tcp_ping",
546                                 [source, target, port, timeout,
547                                  live_port_needed])
548
549   def call_node_has_ip_address(self, node, address):
550     """Checks if a node has the given IP address.
551
552     This is a single-node call.
553
554     """
555     return self._SingleNodeCall(node, "node_has_ip_address", [address])
556
557   def call_node_info(self, node_list, vg_name, hypervisor_type):
558     """Return node information.
559
560     This will return memory information and volume group size and free
561     space.
562
563     This is a multi-node call.
564
565     @type node_list: list
566     @param node_list: the list of nodes to query
567     @type vg_name: C{string}
568     @param vg_name: the name of the volume group to ask for disk space
569         information
570     @type hypervisor_type: C{str}
571     @param hypervisor_type: the name of the hypervisor to ask for
572         memory information
573
574     """
575     retux = self._MultiNodeCall(node_list, "node_info",
576                                 [vg_name, hypervisor_type])
577
578     for result in retux.itervalues():
579       if result.failed or not isinstance(result.data, dict):
580         result.data = {}
581       if result.offline:
582         log_name = None
583       else:
584         log_name = "call_node_info"
585
586       utils.CheckDict(result.data, {
587         'memory_total' : '-',
588         'memory_dom0' : '-',
589         'memory_free' : '-',
590         'vg_size' : 'node_unreachable',
591         'vg_free' : '-',
592         }, log_name)
593     return retux
594
595   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
596     """Add a node to the cluster.
597
598     This is a single-node call.
599
600     """
601     return self._SingleNodeCall(node, "node_add",
602                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
603
604   def call_node_verify(self, node_list, checkdict, cluster_name):
605     """Request verification of given parameters.
606
607     This is a multi-node call.
608
609     """
610     return self._MultiNodeCall(node_list, "node_verify",
611                                [checkdict, cluster_name])
612
613   @classmethod
614   def call_node_start_master(cls, node, start_daemons):
615     """Tells a node to activate itself as a master.
616
617     This is a single-node call.
618
619     """
620     return cls._StaticSingleNodeCall(node, "node_start_master",
621                                      [start_daemons])
622
623   @classmethod
624   def call_node_stop_master(cls, node, stop_daemons):
625     """Tells a node to demote itself from master status.
626
627     This is a single-node call.
628
629     """
630     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
631
632   @classmethod
633   def call_master_info(cls, node_list):
634     """Query master info.
635
636     This is a multi-node call.
637
638     """
639     # TODO: should this method query down nodes?
640     return cls._StaticMultiNodeCall(node_list, "master_info", [])
641
642   def call_version(self, node_list):
643     """Query node version.
644
645     This is a multi-node call.
646
647     """
648     return self._MultiNodeCall(node_list, "version", [])
649
650   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
651     """Request creation of a given block device.
652
653     This is a single-node call.
654
655     """
656     return self._SingleNodeCall(node, "blockdev_create",
657                                 [bdev.ToDict(), size, owner, on_primary, info])
658
659   def call_blockdev_remove(self, node, bdev):
660     """Request removal of a given block device.
661
662     This is a single-node call.
663
664     """
665     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
666
667   def call_blockdev_rename(self, node, devlist):
668     """Request rename of the given block devices.
669
670     This is a single-node call.
671
672     """
673     return self._SingleNodeCall(node, "blockdev_rename",
674                                 [(d.ToDict(), uid) for d, uid in devlist])
675
676   def call_blockdev_assemble(self, node, disk, owner, on_primary):
677     """Request assembling of a given block device.
678
679     This is a single-node call.
680
681     """
682     return self._SingleNodeCall(node, "blockdev_assemble",
683                                 [disk.ToDict(), owner, on_primary])
684
685   def call_blockdev_shutdown(self, node, disk):
686     """Request shutdown of a given block device.
687
688     This is a single-node call.
689
690     """
691     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
692
693   def call_blockdev_addchildren(self, node, bdev, ndevs):
694     """Request adding a list of children to a (mirroring) device.
695
696     This is a single-node call.
697
698     """
699     return self._SingleNodeCall(node, "blockdev_addchildren",
700                                 [bdev.ToDict(),
701                                  [disk.ToDict() for disk in ndevs]])
702
703   def call_blockdev_removechildren(self, node, bdev, ndevs):
704     """Request removing a list of children from a (mirroring) device.
705
706     This is a single-node call.
707
708     """
709     return self._SingleNodeCall(node, "blockdev_removechildren",
710                                 [bdev.ToDict(),
711                                  [disk.ToDict() for disk in ndevs]])
712
713   def call_blockdev_getmirrorstatus(self, node, disks):
714     """Request status of a (mirroring) device.
715
716     This is a single-node call.
717
718     """
719     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
720                                 [dsk.ToDict() for dsk in disks])
721
722   def call_blockdev_find(self, node, disk):
723     """Request identification of a given block device.
724
725     This is a single-node call.
726
727     """
728     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
729
730   def call_blockdev_close(self, node, instance_name, disks):
731     """Closes the given block devices.
732
733     This is a single-node call.
734
735     """
736     params = [instance_name, [cf.ToDict() for cf in disks]]
737     return self._SingleNodeCall(node, "blockdev_close", params)
738
739   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
740     """Disconnects the network of the given drbd devices.
741
742     This is a multi-node call.
743
744     """
745     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
746                                [nodes_ip, [cf.ToDict() for cf in disks]])
747
748   def call_drbd_attach_net(self, node_list, nodes_ip,
749                            disks, instance_name, multimaster):
750     """Disconnects the given drbd devices.
751
752     This is a multi-node call.
753
754     """
755     return self._MultiNodeCall(node_list, "drbd_attach_net",
756                                [nodes_ip, [cf.ToDict() for cf in disks],
757                                 instance_name, multimaster])
758
759   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
760     """Waits for the synchronization of drbd devices is complete.
761
762     This is a multi-node call.
763
764     """
765     return self._MultiNodeCall(node_list, "drbd_wait_sync",
766                                [nodes_ip, [cf.ToDict() for cf in disks]])
767
768   @classmethod
769   def call_upload_file(cls, node_list, file_name, address_list=None):
770     """Upload a file.
771
772     The node will refuse the operation in case the file is not on the
773     approved file list.
774
775     This is a multi-node call.
776
777     @type node_list: list
778     @param node_list: the list of node names to upload to
779     @type file_name: str
780     @param file_name: the filename to upload
781     @type address_list: list or None
782     @keyword address_list: an optional list of node addresses, in order
783         to optimize the RPC speed
784
785     """
786     file_contents = utils.ReadFile(file_name)
787     data = cls._Compress(file_contents)
788     st = os.stat(file_name)
789     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
790               st.st_atime, st.st_mtime]
791     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
792                                     address_list=address_list)
793
794   @classmethod
795   def call_write_ssconf_files(cls, node_list, values):
796     """Write ssconf files.
797
798     This is a multi-node call.
799
800     """
801     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
802
803   def call_os_diagnose(self, node_list):
804     """Request a diagnose of OS definitions.
805
806     This is a multi-node call.
807
808     """
809     result = self._MultiNodeCall(node_list, "os_diagnose", [])
810
811     for node_result in result.values():
812       if not node_result.failed and node_result.data:
813         node_result.data = [objects.OS.FromDict(oss)
814                             for oss in node_result.data]
815     return result
816
817   def call_os_get(self, node, name):
818     """Returns an OS definition.
819
820     This is a single-node call.
821
822     """
823     result = self._SingleNodeCall(node, "os_get", [name])
824     if not result.failed and isinstance(result.data, dict):
825       result.data = objects.OS.FromDict(result.data)
826     return result
827
828   def call_hooks_runner(self, node_list, hpath, phase, env):
829     """Call the hooks runner.
830
831     Args:
832       - op: the OpCode instance
833       - env: a dictionary with the environment
834
835     This is a multi-node call.
836
837     """
838     params = [hpath, phase, env]
839     return self._MultiNodeCall(node_list, "hooks_runner", params)
840
841   def call_iallocator_runner(self, node, name, idata):
842     """Call an iallocator on a remote node
843
844     Args:
845       - name: the iallocator name
846       - input: the json-encoded input string
847
848     This is a single-node call.
849
850     """
851     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
852
853   def call_blockdev_grow(self, node, cf_bdev, amount):
854     """Request a snapshot of the given block device.
855
856     This is a single-node call.
857
858     """
859     return self._SingleNodeCall(node, "blockdev_grow",
860                                 [cf_bdev.ToDict(), amount])
861
862   def call_blockdev_snapshot(self, node, cf_bdev):
863     """Request a snapshot of the given block device.
864
865     This is a single-node call.
866
867     """
868     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
869
870   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
871                            cluster_name, idx):
872     """Request the export of a given snapshot.
873
874     This is a single-node call.
875
876     """
877     return self._SingleNodeCall(node, "snapshot_export",
878                                 [snap_bdev.ToDict(), dest_node,
879                                  self._InstDict(instance), cluster_name, idx])
880
881   def call_finalize_export(self, node, instance, snap_disks):
882     """Request the completion of an export operation.
883
884     This writes the export config file, etc.
885
886     This is a single-node call.
887
888     """
889     flat_disks = []
890     for disk in snap_disks:
891       flat_disks.append(disk.ToDict())
892
893     return self._SingleNodeCall(node, "finalize_export",
894                                 [self._InstDict(instance), flat_disks])
895
896   def call_export_info(self, node, path):
897     """Queries the export information in a given path.
898
899     This is a single-node call.
900
901     """
902     result = self._SingleNodeCall(node, "export_info", [path])
903     if not result.failed and result.data:
904       result.data = objects.SerializableConfigParser.Loads(str(result.data))
905     return result
906
907   def call_instance_os_import(self, node, inst, src_node, src_images,
908                               cluster_name):
909     """Request the import of a backup into an instance.
910
911     This is a single-node call.
912
913     """
914     return self._SingleNodeCall(node, "instance_os_import",
915                                 [self._InstDict(inst), src_node, src_images,
916                                  cluster_name])
917
918   def call_export_list(self, node_list):
919     """Gets the stored exports list.
920
921     This is a multi-node call.
922
923     """
924     return self._MultiNodeCall(node_list, "export_list", [])
925
926   def call_export_remove(self, node, export):
927     """Requests removal of a given export.
928
929     This is a single-node call.
930
931     """
932     return self._SingleNodeCall(node, "export_remove", [export])
933
934   @classmethod
935   def call_node_leave_cluster(cls, node):
936     """Requests a node to clean the cluster information it has.
937
938     This will remove the configuration information from the ganeti data
939     dir.
940
941     This is a single-node call.
942
943     """
944     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
945
946   def call_node_volumes(self, node_list):
947     """Gets all volumes on node(s).
948
949     This is a multi-node call.
950
951     """
952     return self._MultiNodeCall(node_list, "node_volumes", [])
953
954   def call_node_demote_from_mc(self, node):
955     """Demote a node from the master candidate role.
956
957     This is a single-node call.
958
959     """
960     return self._SingleNodeCall(node, "node_demote_from_mc", [])
961
962   def call_test_delay(self, node_list, duration):
963     """Sleep for a fixed time on given node(s).
964
965     This is a multi-node call.
966
967     """
968     return self._MultiNodeCall(node_list, "test_delay", [duration])
969
970   def call_file_storage_dir_create(self, node, file_storage_dir):
971     """Create the given file storage directory.
972
973     This is a single-node call.
974
975     """
976     return self._SingleNodeCall(node, "file_storage_dir_create",
977                                 [file_storage_dir])
978
979   def call_file_storage_dir_remove(self, node, file_storage_dir):
980     """Remove the given file storage directory.
981
982     This is a single-node call.
983
984     """
985     return self._SingleNodeCall(node, "file_storage_dir_remove",
986                                 [file_storage_dir])
987
988   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
989                                    new_file_storage_dir):
990     """Rename file storage directory.
991
992     This is a single-node call.
993
994     """
995     return self._SingleNodeCall(node, "file_storage_dir_rename",
996                                 [old_file_storage_dir, new_file_storage_dir])
997
998   @classmethod
999   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1000     """Update job queue.
1001
1002     This is a multi-node call.
1003
1004     """
1005     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1006                                     [file_name, cls._Compress(content)],
1007                                     address_list=address_list)
1008
1009   @classmethod
1010   def call_jobqueue_purge(cls, node):
1011     """Purge job queue.
1012
1013     This is a single-node call.
1014
1015     """
1016     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1017
1018   @classmethod
1019   def call_jobqueue_rename(cls, node_list, address_list, rename):
1020     """Rename a job queue file.
1021
1022     This is a multi-node call.
1023
1024     """
1025     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1026                                     address_list=address_list)
1027
1028   @classmethod
1029   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1030     """Set the drain flag on the queue.
1031
1032     This is a multi-node call.
1033
1034     @type node_list: list
1035     @param node_list: the list of nodes to query
1036     @type drain_flag: bool
1037     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1038
1039     """
1040     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1041                                     [drain_flag])
1042
1043   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1044     """Validate the hypervisor params.
1045
1046     This is a multi-node call.
1047
1048     @type node_list: list
1049     @param node_list: the list of nodes to query
1050     @type hvname: string
1051     @param hvname: the hypervisor name
1052     @type hvparams: dict
1053     @param hvparams: the hypervisor parameters to be validated
1054
1055     """
1056     cluster = self._cfg.GetClusterInfo()
1057     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1058     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1059                                [hvname, hv_full])