KVM: allow netboot
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import logging
36 import zlib
37 import base64
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45
46 import ganeti.http.client
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   _http_manager = http.client.HttpClientManager()
64
65
66 def Shutdown():
67   """Stops the module-global HTTP client manager.
68
69   Must be called before quitting the program.
70
71   """
72   global _http_manager
73
74   if _http_manager:
75     _http_manager.Shutdown()
76     _http_manager = None
77
78
79 class RpcResult(object):
80   """RPC Result class.
81
82   This class holds an RPC result. It is needed since in multi-node
83   calls we can't raise an exception just because one one out of many
84   failed, and therefore we use this class to encapsulate the result.
85
86   @ivar data: the data payload, for successfull results, or None
87   @type failed: boolean
88   @ivar failed: whether the operation failed at RPC level (not
89       application level on the remote node)
90   @ivar call: the name of the RPC call
91   @ivar node: the name of the node to which we made the call
92   @ivar offline: whether the operation failed because the node was
93       offline, as opposed to actual failure; offline=True will always
94       imply failed=True, in order to allow simpler checking if
95       the user doesn't care about the exact failure mode
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.error = "Node is marked offline"
107       self.data = self.payload = None
108     elif failed:
109       self.error = data
110       self.data = self.payload = None
111     else:
112       self.data = data
113       self.error = None
114       if isinstance(data, (tuple, list)) and len(data) == 2:
115         self.payload = data[1]
116       else:
117         self.payload = None
118
119   def Raise(self):
120     """If the result has failed, raise an OpExecError.
121
122     This is used so that LU code doesn't have to check for each
123     result, but instead can call this function.
124
125     """
126     if self.failed:
127       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
128                                (self.call, self.node, self.error))
129
130   def RemoteFailMsg(self):
131     """Check if the remote procedure failed.
132
133     This is valid only for RPC calls which return result of the form
134     (status, data | error_msg).
135
136     @return: empty string for succcess, otherwise an error message
137
138     """
139     def _EnsureErr(val):
140       """Helper to ensure we return a 'True' value for error."""
141       if val:
142         return val
143       else:
144         return "No error information"
145
146     if self.failed:
147       return _EnsureErr(self.error)
148     if not isinstance(self.data, (tuple, list)):
149       return "Invalid result type (%s)" % type(self.data)
150     if len(self.data) != 2:
151       return "Invalid result length (%d), expected 2" % len(self.data)
152     if not self.data[0]:
153       return _EnsureErr(self.data[1])
154     return ""
155
156
157 class Client:
158   """RPC Client class.
159
160   This class, given a (remote) method name, a list of parameters and a
161   list of nodes, will contact (in parallel) all nodes, and return a
162   dict of results (key: node name, value: result).
163
164   One current bug is that generic failure is still signalled by
165   'False' result, which is not good. This overloading of values can
166   cause bugs.
167
168   """
169   def __init__(self, procedure, body, port):
170     self.procedure = procedure
171     self.body = body
172     self.port = port
173     self.nc = {}
174
175     self._ssl_params = \
176       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
177                          ssl_cert_path=constants.SSL_CERT_FILE)
178
179   def ConnectList(self, node_list, address_list=None):
180     """Add a list of nodes to the target nodes.
181
182     @type node_list: list
183     @param node_list: the list of node names to connect
184     @type address_list: list or None
185     @keyword address_list: either None or a list with node addresses,
186         which must have the same length as the node list
187
188     """
189     if address_list is None:
190       address_list = [None for _ in node_list]
191     else:
192       assert len(node_list) == len(address_list), \
193              "Name and address lists should have the same length"
194     for node, address in zip(node_list, address_list):
195       self.ConnectNode(node, address)
196
197   def ConnectNode(self, name, address=None):
198     """Add a node to the target list.
199
200     @type name: str
201     @param name: the node name
202     @type address: str
203     @keyword address: the node address, if known
204
205     """
206     if address is None:
207       address = name
208
209     self.nc[name] = \
210       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
211                                     "/%s" % self.procedure,
212                                     post_data=self.body,
213                                     ssl_params=self._ssl_params,
214                                     ssl_verify_peer=True)
215
216   def GetResults(self):
217     """Call nodes and return results.
218
219     @rtype: list
220     @returns: List of RPC results
221
222     """
223     assert _http_manager, "RPC module not intialized"
224
225     _http_manager.ExecRequests(self.nc.values())
226
227     results = {}
228
229     for name, req in self.nc.iteritems():
230       if req.success and req.resp_status_code == http.HTTP_OK:
231         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
232                                   node=name, call=self.procedure)
233         continue
234
235       # TODO: Better error reporting
236       if req.error:
237         msg = req.error
238       else:
239         msg = req.resp_body
240
241       logging.error("RPC error in %s from node %s: %s",
242                     self.procedure, name, msg)
243       results[name] = RpcResult(data=msg, failed=True, node=name,
244                                 call=self.procedure)
245
246     return results
247
248
249 class RpcRunner(object):
250   """RPC runner class"""
251
252   def __init__(self, cfg):
253     """Initialized the rpc runner.
254
255     @type cfg:  C{config.ConfigWriter}
256     @param cfg: the configuration object that will be used to get data
257                 about the cluster
258
259     """
260     self._cfg = cfg
261     self.port = utils.GetNodeDaemonPort()
262
263   def _InstDict(self, instance):
264     """Convert the given instance to a dict.
265
266     This is done via the instance's ToDict() method and additionally
267     we fill the hvparams with the cluster defaults.
268
269     @type instance: L{objects.Instance}
270     @param instance: an Instance object
271     @rtype: dict
272     @return: the instance dict, with the hvparams filled with the
273         cluster defaults
274
275     """
276     idict = instance.ToDict()
277     cluster = self._cfg.GetClusterInfo()
278     idict["hvparams"] = cluster.FillHV(instance)
279     idict["beparams"] = cluster.FillBE(instance)
280     return idict
281
282   def _ConnectList(self, client, node_list, call):
283     """Helper for computing node addresses.
284
285     @type client: L{Client}
286     @param client: a C{Client} instance
287     @type node_list: list
288     @param node_list: the node list we should connect
289     @type call: string
290     @param call: the name of the remote procedure call, for filling in
291         correctly any eventual offline nodes' results
292
293     """
294     all_nodes = self._cfg.GetAllNodesInfo()
295     name_list = []
296     addr_list = []
297     skip_dict = {}
298     for node in node_list:
299       if node in all_nodes:
300         if all_nodes[node].offline:
301           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
302           continue
303         val = all_nodes[node].primary_ip
304       else:
305         val = None
306       addr_list.append(val)
307       name_list.append(node)
308     if name_list:
309       client.ConnectList(name_list, address_list=addr_list)
310     return skip_dict
311
312   def _ConnectNode(self, client, node, call):
313     """Helper for computing one node's address.
314
315     @type client: L{Client}
316     @param client: a C{Client} instance
317     @type node: str
318     @param node: the node we should connect
319     @type call: string
320     @param call: the name of the remote procedure call, for filling in
321         correctly any eventual offline nodes' results
322
323     """
324     node_info = self._cfg.GetNodeInfo(node)
325     if node_info is not None:
326       if node_info.offline:
327         return RpcResult(node=node, offline=True, call=call)
328       addr = node_info.primary_ip
329     else:
330       addr = None
331     client.ConnectNode(node, address=addr)
332
333   def _MultiNodeCall(self, node_list, procedure, args):
334     """Helper for making a multi-node call
335
336     """
337     body = serializer.DumpJson(args, indent=False)
338     c = Client(procedure, body, self.port)
339     skip_dict = self._ConnectList(c, node_list, procedure)
340     skip_dict.update(c.GetResults())
341     return skip_dict
342
343   @classmethod
344   def _StaticMultiNodeCall(cls, node_list, procedure, args,
345                            address_list=None):
346     """Helper for making a multi-node static call
347
348     """
349     body = serializer.DumpJson(args, indent=False)
350     c = Client(procedure, body, utils.GetNodeDaemonPort())
351     c.ConnectList(node_list, address_list=address_list)
352     return c.GetResults()
353
354   def _SingleNodeCall(self, node, procedure, args):
355     """Helper for making a single-node call
356
357     """
358     body = serializer.DumpJson(args, indent=False)
359     c = Client(procedure, body, self.port)
360     result = self._ConnectNode(c, node, procedure)
361     if result is None:
362       # we did connect, node is not offline
363       result = c.GetResults()[node]
364     return result
365
366   @classmethod
367   def _StaticSingleNodeCall(cls, node, procedure, args):
368     """Helper for making a single-node static call
369
370     """
371     body = serializer.DumpJson(args, indent=False)
372     c = Client(procedure, body, utils.GetNodeDaemonPort())
373     c.ConnectNode(node)
374     return c.GetResults()[node]
375
376   @staticmethod
377   def _Compress(data):
378     """Compresses a string for transport over RPC.
379
380     Small amounts of data are not compressed.
381
382     @type data: str
383     @param data: Data
384     @rtype: tuple
385     @return: Encoded data to send
386
387     """
388     # Small amounts of data are not compressed
389     if len(data) < 512:
390       return (constants.RPC_ENCODING_NONE, data)
391
392     # Compress with zlib and encode in base64
393     return (constants.RPC_ENCODING_ZLIB_BASE64,
394             base64.b64encode(zlib.compress(data, 3)))
395
396   #
397   # Begin RPC calls
398   #
399
400   def call_volume_list(self, node_list, vg_name):
401     """Gets the logical volumes present in a given volume group.
402
403     This is a multi-node call.
404
405     """
406     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
407
408   def call_vg_list(self, node_list):
409     """Gets the volume group list.
410
411     This is a multi-node call.
412
413     """
414     return self._MultiNodeCall(node_list, "vg_list", [])
415
416   def call_bridges_exist(self, node, bridges_list):
417     """Checks if a node has all the bridges given.
418
419     This method checks if all bridges given in the bridges_list are
420     present on the remote node, so that an instance that uses interfaces
421     on those bridges can be started.
422
423     This is a single-node call.
424
425     """
426     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
427
428   def call_instance_start(self, node, instance, extra_args):
429     """Starts an instance.
430
431     This is a single-node call.
432
433     """
434     return self._SingleNodeCall(node, "instance_start",
435                                 [self._InstDict(instance), extra_args])
436
437   def call_instance_shutdown(self, node, instance):
438     """Stops an instance.
439
440     This is a single-node call.
441
442     """
443     return self._SingleNodeCall(node, "instance_shutdown",
444                                 [self._InstDict(instance)])
445
446   def call_migration_info(self, node, instance):
447     """Gather the information necessary to prepare an instance migration.
448
449     This is a single-node call.
450
451     @type node: string
452     @param node: the node on which the instance is currently running
453     @type instance: C{objects.Instance}
454     @param instance: the instance definition
455
456     """
457     return self._SingleNodeCall(node, "migration_info",
458                                 [self._InstDict(instance)])
459
460   def call_accept_instance(self, node, instance, info, target):
461     """Prepare a node to accept an instance.
462
463     This is a single-node call.
464
465     @type node: string
466     @param node: the target node for the migration
467     @type instance: C{objects.Instance}
468     @param instance: the instance definition
469     @type info: opaque/hypervisor specific (string/data)
470     @param info: result for the call_migration_info call
471     @type target: string
472     @param target: target hostname (usually ip address) (on the node itself)
473
474     """
475     return self._SingleNodeCall(node, "accept_instance",
476                                 [self._InstDict(instance), info, target])
477
478   def call_finalize_migration(self, node, instance, info, success):
479     """Finalize any target-node migration specific operation.
480
481     This is called both in case of a successful migration and in case of error
482     (in which case it should abort the migration).
483
484     This is a single-node call.
485
486     @type node: string
487     @param node: the target node for the migration
488     @type instance: C{objects.Instance}
489     @param instance: the instance definition
490     @type info: opaque/hypervisor specific (string/data)
491     @param info: result for the call_migration_info call
492     @type success: boolean
493     @param success: whether the migration was a success or a failure
494
495     """
496     return self._SingleNodeCall(node, "finalize_migration",
497                                 [self._InstDict(instance), info, success])
498
499   def call_instance_migrate(self, node, instance, target, live):
500     """Migrate an instance.
501
502     This is a single-node call.
503
504     @type node: string
505     @param node: the node on which the instance is currently running
506     @type instance: C{objects.Instance}
507     @param instance: the instance definition
508     @type target: string
509     @param target: the target node name
510     @type live: boolean
511     @param live: whether the migration should be done live or not (the
512         interpretation of this parameter is left to the hypervisor)
513
514     """
515     return self._SingleNodeCall(node, "instance_migrate",
516                                 [self._InstDict(instance), target, live])
517
518   def call_instance_reboot(self, node, instance, reboot_type, extra_args):
519     """Reboots an instance.
520
521     This is a single-node call.
522
523     """
524     return self._SingleNodeCall(node, "instance_reboot",
525                                 [self._InstDict(instance), reboot_type,
526                                  extra_args])
527
528   def call_instance_os_add(self, node, inst):
529     """Installs an OS on the given instance.
530
531     This is a single-node call.
532
533     """
534     return self._SingleNodeCall(node, "instance_os_add",
535                                 [self._InstDict(inst)])
536
537   def call_instance_run_rename(self, node, inst, old_name):
538     """Run the OS rename script for an instance.
539
540     This is a single-node call.
541
542     """
543     return self._SingleNodeCall(node, "instance_run_rename",
544                                 [self._InstDict(inst), old_name])
545
546   def call_instance_info(self, node, instance, hname):
547     """Returns information about a single instance.
548
549     This is a single-node call.
550
551     @type node: list
552     @param node: the list of nodes to query
553     @type instance: string
554     @param instance: the instance name
555     @type hname: string
556     @param hname: the hypervisor type of the instance
557
558     """
559     return self._SingleNodeCall(node, "instance_info", [instance, hname])
560
561   def call_instance_migratable(self, node, instance):
562     """Checks whether the given instance can be migrated.
563
564     This is a single-node call.
565
566     @param node: the node to query
567     @type instance: L{objects.Instance}
568     @param instance: the instance to check
569
570
571     """
572     return self._SingleNodeCall(node, "instance_migratable",
573                                 [self._InstDict(instance)])
574
575   def call_all_instances_info(self, node_list, hypervisor_list):
576     """Returns information about all instances on the given nodes.
577
578     This is a multi-node call.
579
580     @type node_list: list
581     @param node_list: the list of nodes to query
582     @type hypervisor_list: list
583     @param hypervisor_list: the hypervisors to query for instances
584
585     """
586     return self._MultiNodeCall(node_list, "all_instances_info",
587                                [hypervisor_list])
588
589   def call_instance_list(self, node_list, hypervisor_list):
590     """Returns the list of running instances on a given node.
591
592     This is a multi-node call.
593
594     @type node_list: list
595     @param node_list: the list of nodes to query
596     @type hypervisor_list: list
597     @param hypervisor_list: the hypervisors to query for instances
598
599     """
600     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
601
602   def call_node_tcp_ping(self, node, source, target, port, timeout,
603                          live_port_needed):
604     """Do a TcpPing on the remote node
605
606     This is a single-node call.
607
608     """
609     return self._SingleNodeCall(node, "node_tcp_ping",
610                                 [source, target, port, timeout,
611                                  live_port_needed])
612
613   def call_node_has_ip_address(self, node, address):
614     """Checks if a node has the given IP address.
615
616     This is a single-node call.
617
618     """
619     return self._SingleNodeCall(node, "node_has_ip_address", [address])
620
621   def call_node_info(self, node_list, vg_name, hypervisor_type):
622     """Return node information.
623
624     This will return memory information and volume group size and free
625     space.
626
627     This is a multi-node call.
628
629     @type node_list: list
630     @param node_list: the list of nodes to query
631     @type vg_name: C{string}
632     @param vg_name: the name of the volume group to ask for disk space
633         information
634     @type hypervisor_type: C{str}
635     @param hypervisor_type: the name of the hypervisor to ask for
636         memory information
637
638     """
639     retux = self._MultiNodeCall(node_list, "node_info",
640                                 [vg_name, hypervisor_type])
641
642     for result in retux.itervalues():
643       if result.failed or not isinstance(result.data, dict):
644         result.data = {}
645       if result.offline:
646         log_name = None
647       else:
648         log_name = "call_node_info"
649
650       utils.CheckDict(result.data, {
651         'memory_total' : '-',
652         'memory_dom0' : '-',
653         'memory_free' : '-',
654         'vg_size' : 'node_unreachable',
655         'vg_free' : '-',
656         }, log_name)
657     return retux
658
659   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
660     """Add a node to the cluster.
661
662     This is a single-node call.
663
664     """
665     return self._SingleNodeCall(node, "node_add",
666                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
667
668   def call_node_verify(self, node_list, checkdict, cluster_name):
669     """Request verification of given parameters.
670
671     This is a multi-node call.
672
673     """
674     return self._MultiNodeCall(node_list, "node_verify",
675                                [checkdict, cluster_name])
676
677   @classmethod
678   def call_node_start_master(cls, node, start_daemons):
679     """Tells a node to activate itself as a master.
680
681     This is a single-node call.
682
683     """
684     return cls._StaticSingleNodeCall(node, "node_start_master",
685                                      [start_daemons])
686
687   @classmethod
688   def call_node_stop_master(cls, node, stop_daemons):
689     """Tells a node to demote itself from master status.
690
691     This is a single-node call.
692
693     """
694     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
695
696   @classmethod
697   def call_master_info(cls, node_list):
698     """Query master info.
699
700     This is a multi-node call.
701
702     """
703     # TODO: should this method query down nodes?
704     return cls._StaticMultiNodeCall(node_list, "master_info", [])
705
706   def call_version(self, node_list):
707     """Query node version.
708
709     This is a multi-node call.
710
711     """
712     return self._MultiNodeCall(node_list, "version", [])
713
714   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
715     """Request creation of a given block device.
716
717     This is a single-node call.
718
719     """
720     return self._SingleNodeCall(node, "blockdev_create",
721                                 [bdev.ToDict(), size, owner, on_primary, info])
722
723   def call_blockdev_remove(self, node, bdev):
724     """Request removal of a given block device.
725
726     This is a single-node call.
727
728     """
729     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
730
731   def call_blockdev_rename(self, node, devlist):
732     """Request rename of the given block devices.
733
734     This is a single-node call.
735
736     """
737     return self._SingleNodeCall(node, "blockdev_rename",
738                                 [(d.ToDict(), uid) for d, uid in devlist])
739
740   def call_blockdev_assemble(self, node, disk, owner, on_primary):
741     """Request assembling of a given block device.
742
743     This is a single-node call.
744
745     """
746     return self._SingleNodeCall(node, "blockdev_assemble",
747                                 [disk.ToDict(), owner, on_primary])
748
749   def call_blockdev_shutdown(self, node, disk):
750     """Request shutdown of a given block device.
751
752     This is a single-node call.
753
754     """
755     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
756
757   def call_blockdev_addchildren(self, node, bdev, ndevs):
758     """Request adding a list of children to a (mirroring) device.
759
760     This is a single-node call.
761
762     """
763     return self._SingleNodeCall(node, "blockdev_addchildren",
764                                 [bdev.ToDict(),
765                                  [disk.ToDict() for disk in ndevs]])
766
767   def call_blockdev_removechildren(self, node, bdev, ndevs):
768     """Request removing a list of children from a (mirroring) device.
769
770     This is a single-node call.
771
772     """
773     return self._SingleNodeCall(node, "blockdev_removechildren",
774                                 [bdev.ToDict(),
775                                  [disk.ToDict() for disk in ndevs]])
776
777   def call_blockdev_getmirrorstatus(self, node, disks):
778     """Request status of a (mirroring) device.
779
780     This is a single-node call.
781
782     """
783     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
784                                 [dsk.ToDict() for dsk in disks])
785
786   def call_blockdev_find(self, node, disk):
787     """Request identification of a given block device.
788
789     This is a single-node call.
790
791     """
792     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
793
794   def call_blockdev_close(self, node, instance_name, disks):
795     """Closes the given block devices.
796
797     This is a single-node call.
798
799     """
800     params = [instance_name, [cf.ToDict() for cf in disks]]
801     return self._SingleNodeCall(node, "blockdev_close", params)
802
803   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
804     """Disconnects the network of the given drbd devices.
805
806     This is a multi-node call.
807
808     """
809     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
810                                [nodes_ip, [cf.ToDict() for cf in disks]])
811
812   def call_drbd_attach_net(self, node_list, nodes_ip,
813                            disks, instance_name, multimaster):
814     """Disconnects the given drbd devices.
815
816     This is a multi-node call.
817
818     """
819     return self._MultiNodeCall(node_list, "drbd_attach_net",
820                                [nodes_ip, [cf.ToDict() for cf in disks],
821                                 instance_name, multimaster])
822
823   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
824     """Waits for the synchronization of drbd devices is complete.
825
826     This is a multi-node call.
827
828     """
829     return self._MultiNodeCall(node_list, "drbd_wait_sync",
830                                [nodes_ip, [cf.ToDict() for cf in disks]])
831
832   @classmethod
833   def call_upload_file(cls, node_list, file_name, address_list=None):
834     """Upload a file.
835
836     The node will refuse the operation in case the file is not on the
837     approved file list.
838
839     This is a multi-node call.
840
841     @type node_list: list
842     @param node_list: the list of node names to upload to
843     @type file_name: str
844     @param file_name: the filename to upload
845     @type address_list: list or None
846     @keyword address_list: an optional list of node addresses, in order
847         to optimize the RPC speed
848
849     """
850     file_contents = utils.ReadFile(file_name)
851     data = cls._Compress(file_contents)
852     st = os.stat(file_name)
853     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
854               st.st_atime, st.st_mtime]
855     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
856                                     address_list=address_list)
857
858   @classmethod
859   def call_write_ssconf_files(cls, node_list, values):
860     """Write ssconf files.
861
862     This is a multi-node call.
863
864     """
865     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
866
867   def call_os_diagnose(self, node_list):
868     """Request a diagnose of OS definitions.
869
870     This is a multi-node call.
871
872     """
873     result = self._MultiNodeCall(node_list, "os_diagnose", [])
874
875     for node_result in result.values():
876       if not node_result.failed and node_result.data:
877         node_result.data = [objects.OS.FromDict(oss)
878                             for oss in node_result.data]
879     return result
880
881   def call_os_get(self, node, name):
882     """Returns an OS definition.
883
884     This is a single-node call.
885
886     """
887     result = self._SingleNodeCall(node, "os_get", [name])
888     if not result.failed and isinstance(result.data, dict):
889       result.data = objects.OS.FromDict(result.data)
890     return result
891
892   def call_hooks_runner(self, node_list, hpath, phase, env):
893     """Call the hooks runner.
894
895     Args:
896       - op: the OpCode instance
897       - env: a dictionary with the environment
898
899     This is a multi-node call.
900
901     """
902     params = [hpath, phase, env]
903     return self._MultiNodeCall(node_list, "hooks_runner", params)
904
905   def call_iallocator_runner(self, node, name, idata):
906     """Call an iallocator on a remote node
907
908     Args:
909       - name: the iallocator name
910       - input: the json-encoded input string
911
912     This is a single-node call.
913
914     """
915     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
916
917   def call_blockdev_grow(self, node, cf_bdev, amount):
918     """Request a snapshot of the given block device.
919
920     This is a single-node call.
921
922     """
923     return self._SingleNodeCall(node, "blockdev_grow",
924                                 [cf_bdev.ToDict(), amount])
925
926   def call_blockdev_snapshot(self, node, cf_bdev):
927     """Request a snapshot of the given block device.
928
929     This is a single-node call.
930
931     """
932     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
933
934   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
935                            cluster_name, idx):
936     """Request the export of a given snapshot.
937
938     This is a single-node call.
939
940     """
941     return self._SingleNodeCall(node, "snapshot_export",
942                                 [snap_bdev.ToDict(), dest_node,
943                                  self._InstDict(instance), cluster_name, idx])
944
945   def call_finalize_export(self, node, instance, snap_disks):
946     """Request the completion of an export operation.
947
948     This writes the export config file, etc.
949
950     This is a single-node call.
951
952     """
953     flat_disks = []
954     for disk in snap_disks:
955       flat_disks.append(disk.ToDict())
956
957     return self._SingleNodeCall(node, "finalize_export",
958                                 [self._InstDict(instance), flat_disks])
959
960   def call_export_info(self, node, path):
961     """Queries the export information in a given path.
962
963     This is a single-node call.
964
965     """
966     result = self._SingleNodeCall(node, "export_info", [path])
967     if not result.failed and result.data:
968       result.data = objects.SerializableConfigParser.Loads(str(result.data))
969     return result
970
971   def call_instance_os_import(self, node, inst, src_node, src_images,
972                               cluster_name):
973     """Request the import of a backup into an instance.
974
975     This is a single-node call.
976
977     """
978     return self._SingleNodeCall(node, "instance_os_import",
979                                 [self._InstDict(inst), src_node, src_images,
980                                  cluster_name])
981
982   def call_export_list(self, node_list):
983     """Gets the stored exports list.
984
985     This is a multi-node call.
986
987     """
988     return self._MultiNodeCall(node_list, "export_list", [])
989
990   def call_export_remove(self, node, export):
991     """Requests removal of a given export.
992
993     This is a single-node call.
994
995     """
996     return self._SingleNodeCall(node, "export_remove", [export])
997
998   @classmethod
999   def call_node_leave_cluster(cls, node):
1000     """Requests a node to clean the cluster information it has.
1001
1002     This will remove the configuration information from the ganeti data
1003     dir.
1004
1005     This is a single-node call.
1006
1007     """
1008     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1009
1010   def call_node_volumes(self, node_list):
1011     """Gets all volumes on node(s).
1012
1013     This is a multi-node call.
1014
1015     """
1016     return self._MultiNodeCall(node_list, "node_volumes", [])
1017
1018   def call_node_demote_from_mc(self, node):
1019     """Demote a node from the master candidate role.
1020
1021     This is a single-node call.
1022
1023     """
1024     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1025
1026   def call_test_delay(self, node_list, duration):
1027     """Sleep for a fixed time on given node(s).
1028
1029     This is a multi-node call.
1030
1031     """
1032     return self._MultiNodeCall(node_list, "test_delay", [duration])
1033
1034   def call_file_storage_dir_create(self, node, file_storage_dir):
1035     """Create the given file storage directory.
1036
1037     This is a single-node call.
1038
1039     """
1040     return self._SingleNodeCall(node, "file_storage_dir_create",
1041                                 [file_storage_dir])
1042
1043   def call_file_storage_dir_remove(self, node, file_storage_dir):
1044     """Remove the given file storage directory.
1045
1046     This is a single-node call.
1047
1048     """
1049     return self._SingleNodeCall(node, "file_storage_dir_remove",
1050                                 [file_storage_dir])
1051
1052   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1053                                    new_file_storage_dir):
1054     """Rename file storage directory.
1055
1056     This is a single-node call.
1057
1058     """
1059     return self._SingleNodeCall(node, "file_storage_dir_rename",
1060                                 [old_file_storage_dir, new_file_storage_dir])
1061
1062   @classmethod
1063   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1064     """Update job queue.
1065
1066     This is a multi-node call.
1067
1068     """
1069     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1070                                     [file_name, cls._Compress(content)],
1071                                     address_list=address_list)
1072
1073   @classmethod
1074   def call_jobqueue_purge(cls, node):
1075     """Purge job queue.
1076
1077     This is a single-node call.
1078
1079     """
1080     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1081
1082   @classmethod
1083   def call_jobqueue_rename(cls, node_list, address_list, rename):
1084     """Rename a job queue file.
1085
1086     This is a multi-node call.
1087
1088     """
1089     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1090                                     address_list=address_list)
1091
1092   @classmethod
1093   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1094     """Set the drain flag on the queue.
1095
1096     This is a multi-node call.
1097
1098     @type node_list: list
1099     @param node_list: the list of nodes to query
1100     @type drain_flag: bool
1101     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1102
1103     """
1104     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1105                                     [drain_flag])
1106
1107   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1108     """Validate the hypervisor params.
1109
1110     This is a multi-node call.
1111
1112     @type node_list: list
1113     @param node_list: the list of nodes to query
1114     @type hvname: string
1115     @param hvname: the hypervisor name
1116     @type hvparams: dict
1117     @param hvparams: the hypervisor parameters to be validated
1118
1119     """
1120     cluster = self._cfg.GetClusterInfo()
1121     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1122     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1123                                [hvname, hv_full])