KVM: Abstract runtime file removal in a function
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import logging
36 import zlib
37 import base64
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45
46 import ganeti.http.client
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   _http_manager = http.client.HttpClientManager()
64
65
66 def Shutdown():
67   """Stops the module-global HTTP client manager.
68
69   Must be called before quitting the program.
70
71   """
72   global _http_manager
73
74   if _http_manager:
75     _http_manager.Shutdown()
76     _http_manager = None
77
78
79 class RpcResult(object):
80   """RPC Result class.
81
82   This class holds an RPC result. It is needed since in multi-node
83   calls we can't raise an exception just because one one out of many
84   failed, and therefore we use this class to encapsulate the result.
85
86   @ivar data: the data payload, for successfull results, or None
87   @type failed: boolean
88   @ivar failed: whether the operation failed at RPC level (not
89       application level on the remote node)
90   @ivar call: the name of the RPC call
91   @ivar node: the name of the node to which we made the call
92   @ivar offline: whether the operation failed because the node was
93       offline, as opposed to actual failure; offline=True will always
94       imply failed=True, in order to allow simpler checking if
95       the user doesn't care about the exact failure mode
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.error = "Node is marked offline"
107       self.data = self.payload = None
108     elif failed:
109       self.error = data
110       self.data = self.payload = None
111     else:
112       self.data = data
113       self.error = None
114       if isinstance(data, (tuple, list)) and len(data) == 2:
115         self.payload = data[1]
116       else:
117         self.payload = None
118
119   def Raise(self):
120     """If the result has failed, raise an OpExecError.
121
122     This is used so that LU code doesn't have to check for each
123     result, but instead can call this function.
124
125     """
126     if self.failed:
127       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
128                                (self.call, self.node, self.error))
129
130   def RemoteFailMsg(self):
131     """Check if the remote procedure failed.
132
133     This is valid only for RPC calls which return result of the form
134     (status, data | error_msg).
135
136     @return: empty string for succcess, otherwise an error message
137
138     """
139     def _EnsureErr(val):
140       """Helper to ensure we return a 'True' value for error."""
141       if val:
142         return val
143       else:
144         return "No error information"
145
146     if self.failed:
147       return _EnsureErr(self.error)
148     if not isinstance(self.data, (tuple, list)):
149       return "Invalid result type (%s)" % type(self.data)
150     if len(self.data) != 2:
151       return "Invalid result length (%d), expected 2" % len(self.data)
152     if not self.data[0]:
153       return _EnsureErr(self.data[1])
154     return ""
155
156
157 class Client:
158   """RPC Client class.
159
160   This class, given a (remote) method name, a list of parameters and a
161   list of nodes, will contact (in parallel) all nodes, and return a
162   dict of results (key: node name, value: result).
163
164   One current bug is that generic failure is still signalled by
165   'False' result, which is not good. This overloading of values can
166   cause bugs.
167
168   """
169   def __init__(self, procedure, body, port):
170     self.procedure = procedure
171     self.body = body
172     self.port = port
173     self.nc = {}
174
175     self._ssl_params = \
176       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
177                          ssl_cert_path=constants.SSL_CERT_FILE)
178
179   def ConnectList(self, node_list, address_list=None):
180     """Add a list of nodes to the target nodes.
181
182     @type node_list: list
183     @param node_list: the list of node names to connect
184     @type address_list: list or None
185     @keyword address_list: either None or a list with node addresses,
186         which must have the same length as the node list
187
188     """
189     if address_list is None:
190       address_list = [None for _ in node_list]
191     else:
192       assert len(node_list) == len(address_list), \
193              "Name and address lists should have the same length"
194     for node, address in zip(node_list, address_list):
195       self.ConnectNode(node, address)
196
197   def ConnectNode(self, name, address=None):
198     """Add a node to the target list.
199
200     @type name: str
201     @param name: the node name
202     @type address: str
203     @keyword address: the node address, if known
204
205     """
206     if address is None:
207       address = name
208
209     self.nc[name] = \
210       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
211                                     "/%s" % self.procedure,
212                                     post_data=self.body,
213                                     ssl_params=self._ssl_params,
214                                     ssl_verify_peer=True)
215
216   def GetResults(self):
217     """Call nodes and return results.
218
219     @rtype: list
220     @return: List of RPC results
221
222     """
223     assert _http_manager, "RPC module not intialized"
224
225     _http_manager.ExecRequests(self.nc.values())
226
227     results = {}
228
229     for name, req in self.nc.iteritems():
230       if req.success and req.resp_status_code == http.HTTP_OK:
231         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
232                                   node=name, call=self.procedure)
233         continue
234
235       # TODO: Better error reporting
236       if req.error:
237         msg = req.error
238       else:
239         msg = req.resp_body
240
241       logging.error("RPC error in %s from node %s: %s",
242                     self.procedure, name, msg)
243       results[name] = RpcResult(data=msg, failed=True, node=name,
244                                 call=self.procedure)
245
246     return results
247
248
249 class RpcRunner(object):
250   """RPC runner class"""
251
252   def __init__(self, cfg):
253     """Initialized the rpc runner.
254
255     @type cfg:  C{config.ConfigWriter}
256     @param cfg: the configuration object that will be used to get data
257                 about the cluster
258
259     """
260     self._cfg = cfg
261     self.port = utils.GetNodeDaemonPort()
262
263   def _InstDict(self, instance):
264     """Convert the given instance to a dict.
265
266     This is done via the instance's ToDict() method and additionally
267     we fill the hvparams with the cluster defaults.
268
269     @type instance: L{objects.Instance}
270     @param instance: an Instance object
271     @rtype: dict
272     @return: the instance dict, with the hvparams filled with the
273         cluster defaults
274
275     """
276     idict = instance.ToDict()
277     cluster = self._cfg.GetClusterInfo()
278     idict["hvparams"] = cluster.FillHV(instance)
279     idict["beparams"] = cluster.FillBE(instance)
280     return idict
281
282   def _ConnectList(self, client, node_list, call):
283     """Helper for computing node addresses.
284
285     @type client: L{Client}
286     @param client: a C{Client} instance
287     @type node_list: list
288     @param node_list: the node list we should connect
289     @type call: string
290     @param call: the name of the remote procedure call, for filling in
291         correctly any eventual offline nodes' results
292
293     """
294     all_nodes = self._cfg.GetAllNodesInfo()
295     name_list = []
296     addr_list = []
297     skip_dict = {}
298     for node in node_list:
299       if node in all_nodes:
300         if all_nodes[node].offline:
301           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
302           continue
303         val = all_nodes[node].primary_ip
304       else:
305         val = None
306       addr_list.append(val)
307       name_list.append(node)
308     if name_list:
309       client.ConnectList(name_list, address_list=addr_list)
310     return skip_dict
311
312   def _ConnectNode(self, client, node, call):
313     """Helper for computing one node's address.
314
315     @type client: L{Client}
316     @param client: a C{Client} instance
317     @type node: str
318     @param node: the node we should connect
319     @type call: string
320     @param call: the name of the remote procedure call, for filling in
321         correctly any eventual offline nodes' results
322
323     """
324     node_info = self._cfg.GetNodeInfo(node)
325     if node_info is not None:
326       if node_info.offline:
327         return RpcResult(node=node, offline=True, call=call)
328       addr = node_info.primary_ip
329     else:
330       addr = None
331     client.ConnectNode(node, address=addr)
332
333   def _MultiNodeCall(self, node_list, procedure, args):
334     """Helper for making a multi-node call
335
336     """
337     body = serializer.DumpJson(args, indent=False)
338     c = Client(procedure, body, self.port)
339     skip_dict = self._ConnectList(c, node_list, procedure)
340     skip_dict.update(c.GetResults())
341     return skip_dict
342
343   @classmethod
344   def _StaticMultiNodeCall(cls, node_list, procedure, args,
345                            address_list=None):
346     """Helper for making a multi-node static call
347
348     """
349     body = serializer.DumpJson(args, indent=False)
350     c = Client(procedure, body, utils.GetNodeDaemonPort())
351     c.ConnectList(node_list, address_list=address_list)
352     return c.GetResults()
353
354   def _SingleNodeCall(self, node, procedure, args):
355     """Helper for making a single-node call
356
357     """
358     body = serializer.DumpJson(args, indent=False)
359     c = Client(procedure, body, self.port)
360     result = self._ConnectNode(c, node, procedure)
361     if result is None:
362       # we did connect, node is not offline
363       result = c.GetResults()[node]
364     return result
365
366   @classmethod
367   def _StaticSingleNodeCall(cls, node, procedure, args):
368     """Helper for making a single-node static call
369
370     """
371     body = serializer.DumpJson(args, indent=False)
372     c = Client(procedure, body, utils.GetNodeDaemonPort())
373     c.ConnectNode(node)
374     return c.GetResults()[node]
375
376   @staticmethod
377   def _Compress(data):
378     """Compresses a string for transport over RPC.
379
380     Small amounts of data are not compressed.
381
382     @type data: str
383     @param data: Data
384     @rtype: tuple
385     @return: Encoded data to send
386
387     """
388     # Small amounts of data are not compressed
389     if len(data) < 512:
390       return (constants.RPC_ENCODING_NONE, data)
391
392     # Compress with zlib and encode in base64
393     return (constants.RPC_ENCODING_ZLIB_BASE64,
394             base64.b64encode(zlib.compress(data, 3)))
395
396   #
397   # Begin RPC calls
398   #
399
400   def call_volume_list(self, node_list, vg_name):
401     """Gets the logical volumes present in a given volume group.
402
403     This is a multi-node call.
404
405     """
406     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
407
408   def call_vg_list(self, node_list):
409     """Gets the volume group list.
410
411     This is a multi-node call.
412
413     """
414     return self._MultiNodeCall(node_list, "vg_list", [])
415
416   def call_bridges_exist(self, node, bridges_list):
417     """Checks if a node has all the bridges given.
418
419     This method checks if all bridges given in the bridges_list are
420     present on the remote node, so that an instance that uses interfaces
421     on those bridges can be started.
422
423     This is a single-node call.
424
425     """
426     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
427
428   def call_instance_start(self, node, instance):
429     """Starts an instance.
430
431     This is a single-node call.
432
433     """
434     return self._SingleNodeCall(node, "instance_start",
435                                 [self._InstDict(instance)])
436
437   def call_instance_shutdown(self, node, instance):
438     """Stops an instance.
439
440     This is a single-node call.
441
442     """
443     return self._SingleNodeCall(node, "instance_shutdown",
444                                 [self._InstDict(instance)])
445
446   def call_migration_info(self, node, instance):
447     """Gather the information necessary to prepare an instance migration.
448
449     This is a single-node call.
450
451     @type node: string
452     @param node: the node on which the instance is currently running
453     @type instance: C{objects.Instance}
454     @param instance: the instance definition
455
456     """
457     return self._SingleNodeCall(node, "migration_info",
458                                 [self._InstDict(instance)])
459
460   def call_accept_instance(self, node, instance, info, target):
461     """Prepare a node to accept an instance.
462
463     This is a single-node call.
464
465     @type node: string
466     @param node: the target node for the migration
467     @type instance: C{objects.Instance}
468     @param instance: the instance definition
469     @type info: opaque/hypervisor specific (string/data)
470     @param info: result for the call_migration_info call
471     @type target: string
472     @param target: target hostname (usually ip address) (on the node itself)
473
474     """
475     return self._SingleNodeCall(node, "accept_instance",
476                                 [self._InstDict(instance), info, target])
477
478   def call_finalize_migration(self, node, instance, info, success):
479     """Finalize any target-node migration specific operation.
480
481     This is called both in case of a successful migration and in case of error
482     (in which case it should abort the migration).
483
484     This is a single-node call.
485
486     @type node: string
487     @param node: the target node for the migration
488     @type instance: C{objects.Instance}
489     @param instance: the instance definition
490     @type info: opaque/hypervisor specific (string/data)
491     @param info: result for the call_migration_info call
492     @type success: boolean
493     @param success: whether the migration was a success or a failure
494
495     """
496     return self._SingleNodeCall(node, "finalize_migration",
497                                 [self._InstDict(instance), info, success])
498
499   def call_instance_migrate(self, node, instance, target, live):
500     """Migrate an instance.
501
502     This is a single-node call.
503
504     @type node: string
505     @param node: the node on which the instance is currently running
506     @type instance: C{objects.Instance}
507     @param instance: the instance definition
508     @type target: string
509     @param target: the target node name
510     @type live: boolean
511     @param live: whether the migration should be done live or not (the
512         interpretation of this parameter is left to the hypervisor)
513
514     """
515     return self._SingleNodeCall(node, "instance_migrate",
516                                 [self._InstDict(instance), target, live])
517
518   def call_instance_reboot(self, node, instance, reboot_type):
519     """Reboots an instance.
520
521     This is a single-node call.
522
523     """
524     return self._SingleNodeCall(node, "instance_reboot",
525                                 [self._InstDict(instance), reboot_type])
526
527   def call_instance_os_add(self, node, inst, reinstall):
528     """Installs an OS on the given instance.
529
530     This is a single-node call.
531
532     """
533     return self._SingleNodeCall(node, "instance_os_add",
534                                 [self._InstDict(inst), reinstall])
535
536   def call_instance_run_rename(self, node, inst, old_name):
537     """Run the OS rename script for an instance.
538
539     This is a single-node call.
540
541     """
542     return self._SingleNodeCall(node, "instance_run_rename",
543                                 [self._InstDict(inst), old_name])
544
545   def call_instance_info(self, node, instance, hname):
546     """Returns information about a single instance.
547
548     This is a single-node call.
549
550     @type node: list
551     @param node: the list of nodes to query
552     @type instance: string
553     @param instance: the instance name
554     @type hname: string
555     @param hname: the hypervisor type of the instance
556
557     """
558     return self._SingleNodeCall(node, "instance_info", [instance, hname])
559
560   def call_instance_migratable(self, node, instance):
561     """Checks whether the given instance can be migrated.
562
563     This is a single-node call.
564
565     @param node: the node to query
566     @type instance: L{objects.Instance}
567     @param instance: the instance to check
568
569
570     """
571     return self._SingleNodeCall(node, "instance_migratable",
572                                 [self._InstDict(instance)])
573
574   def call_all_instances_info(self, node_list, hypervisor_list):
575     """Returns information about all instances on the given nodes.
576
577     This is a multi-node call.
578
579     @type node_list: list
580     @param node_list: the list of nodes to query
581     @type hypervisor_list: list
582     @param hypervisor_list: the hypervisors to query for instances
583
584     """
585     return self._MultiNodeCall(node_list, "all_instances_info",
586                                [hypervisor_list])
587
588   def call_instance_list(self, node_list, hypervisor_list):
589     """Returns the list of running instances on a given node.
590
591     This is a multi-node call.
592
593     @type node_list: list
594     @param node_list: the list of nodes to query
595     @type hypervisor_list: list
596     @param hypervisor_list: the hypervisors to query for instances
597
598     """
599     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
600
601   def call_node_tcp_ping(self, node, source, target, port, timeout,
602                          live_port_needed):
603     """Do a TcpPing on the remote node
604
605     This is a single-node call.
606
607     """
608     return self._SingleNodeCall(node, "node_tcp_ping",
609                                 [source, target, port, timeout,
610                                  live_port_needed])
611
612   def call_node_has_ip_address(self, node, address):
613     """Checks if a node has the given IP address.
614
615     This is a single-node call.
616
617     """
618     return self._SingleNodeCall(node, "node_has_ip_address", [address])
619
620   def call_node_info(self, node_list, vg_name, hypervisor_type):
621     """Return node information.
622
623     This will return memory information and volume group size and free
624     space.
625
626     This is a multi-node call.
627
628     @type node_list: list
629     @param node_list: the list of nodes to query
630     @type vg_name: C{string}
631     @param vg_name: the name of the volume group to ask for disk space
632         information
633     @type hypervisor_type: C{str}
634     @param hypervisor_type: the name of the hypervisor to ask for
635         memory information
636
637     """
638     retux = self._MultiNodeCall(node_list, "node_info",
639                                 [vg_name, hypervisor_type])
640
641     for result in retux.itervalues():
642       if result.failed or not isinstance(result.data, dict):
643         result.data = {}
644       if result.offline:
645         log_name = None
646       else:
647         log_name = "call_node_info"
648
649       utils.CheckDict(result.data, {
650         'memory_total' : '-',
651         'memory_dom0' : '-',
652         'memory_free' : '-',
653         'vg_size' : 'node_unreachable',
654         'vg_free' : '-',
655         }, log_name)
656     return retux
657
658   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
659     """Add a node to the cluster.
660
661     This is a single-node call.
662
663     """
664     return self._SingleNodeCall(node, "node_add",
665                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
666
667   def call_node_verify(self, node_list, checkdict, cluster_name):
668     """Request verification of given parameters.
669
670     This is a multi-node call.
671
672     """
673     return self._MultiNodeCall(node_list, "node_verify",
674                                [checkdict, cluster_name])
675
676   @classmethod
677   def call_node_start_master(cls, node, start_daemons):
678     """Tells a node to activate itself as a master.
679
680     This is a single-node call.
681
682     """
683     return cls._StaticSingleNodeCall(node, "node_start_master",
684                                      [start_daemons])
685
686   @classmethod
687   def call_node_stop_master(cls, node, stop_daemons):
688     """Tells a node to demote itself from master status.
689
690     This is a single-node call.
691
692     """
693     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
694
695   @classmethod
696   def call_master_info(cls, node_list):
697     """Query master info.
698
699     This is a multi-node call.
700
701     """
702     # TODO: should this method query down nodes?
703     return cls._StaticMultiNodeCall(node_list, "master_info", [])
704
705   def call_version(self, node_list):
706     """Query node version.
707
708     This is a multi-node call.
709
710     """
711     return self._MultiNodeCall(node_list, "version", [])
712
713   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
714     """Request creation of a given block device.
715
716     This is a single-node call.
717
718     """
719     return self._SingleNodeCall(node, "blockdev_create",
720                                 [bdev.ToDict(), size, owner, on_primary, info])
721
722   def call_blockdev_remove(self, node, bdev):
723     """Request removal of a given block device.
724
725     This is a single-node call.
726
727     """
728     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
729
730   def call_blockdev_rename(self, node, devlist):
731     """Request rename of the given block devices.
732
733     This is a single-node call.
734
735     """
736     return self._SingleNodeCall(node, "blockdev_rename",
737                                 [(d.ToDict(), uid) for d, uid in devlist])
738
739   def call_blockdev_assemble(self, node, disk, owner, on_primary):
740     """Request assembling of a given block device.
741
742     This is a single-node call.
743
744     """
745     return self._SingleNodeCall(node, "blockdev_assemble",
746                                 [disk.ToDict(), owner, on_primary])
747
748   def call_blockdev_shutdown(self, node, disk):
749     """Request shutdown of a given block device.
750
751     This is a single-node call.
752
753     """
754     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
755
756   def call_blockdev_addchildren(self, node, bdev, ndevs):
757     """Request adding a list of children to a (mirroring) device.
758
759     This is a single-node call.
760
761     """
762     return self._SingleNodeCall(node, "blockdev_addchildren",
763                                 [bdev.ToDict(),
764                                  [disk.ToDict() for disk in ndevs]])
765
766   def call_blockdev_removechildren(self, node, bdev, ndevs):
767     """Request removing a list of children from a (mirroring) device.
768
769     This is a single-node call.
770
771     """
772     return self._SingleNodeCall(node, "blockdev_removechildren",
773                                 [bdev.ToDict(),
774                                  [disk.ToDict() for disk in ndevs]])
775
776   def call_blockdev_getmirrorstatus(self, node, disks):
777     """Request status of a (mirroring) device.
778
779     This is a single-node call.
780
781     """
782     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
783                                 [dsk.ToDict() for dsk in disks])
784
785   def call_blockdev_find(self, node, disk):
786     """Request identification of a given block device.
787
788     This is a single-node call.
789
790     """
791     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
792
793   def call_blockdev_close(self, node, instance_name, disks):
794     """Closes the given block devices.
795
796     This is a single-node call.
797
798     """
799     params = [instance_name, [cf.ToDict() for cf in disks]]
800     return self._SingleNodeCall(node, "blockdev_close", params)
801
802   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
803     """Disconnects the network of the given drbd devices.
804
805     This is a multi-node call.
806
807     """
808     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
809                                [nodes_ip, [cf.ToDict() for cf in disks]])
810
811   def call_drbd_attach_net(self, node_list, nodes_ip,
812                            disks, instance_name, multimaster):
813     """Disconnects the given drbd devices.
814
815     This is a multi-node call.
816
817     """
818     return self._MultiNodeCall(node_list, "drbd_attach_net",
819                                [nodes_ip, [cf.ToDict() for cf in disks],
820                                 instance_name, multimaster])
821
822   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
823     """Waits for the synchronization of drbd devices is complete.
824
825     This is a multi-node call.
826
827     """
828     return self._MultiNodeCall(node_list, "drbd_wait_sync",
829                                [nodes_ip, [cf.ToDict() for cf in disks]])
830
831   @classmethod
832   def call_upload_file(cls, node_list, file_name, address_list=None):
833     """Upload a file.
834
835     The node will refuse the operation in case the file is not on the
836     approved file list.
837
838     This is a multi-node call.
839
840     @type node_list: list
841     @param node_list: the list of node names to upload to
842     @type file_name: str
843     @param file_name: the filename to upload
844     @type address_list: list or None
845     @keyword address_list: an optional list of node addresses, in order
846         to optimize the RPC speed
847
848     """
849     file_contents = utils.ReadFile(file_name)
850     data = cls._Compress(file_contents)
851     st = os.stat(file_name)
852     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
853               st.st_atime, st.st_mtime]
854     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
855                                     address_list=address_list)
856
857   @classmethod
858   def call_write_ssconf_files(cls, node_list, values):
859     """Write ssconf files.
860
861     This is a multi-node call.
862
863     """
864     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
865
866   def call_os_diagnose(self, node_list):
867     """Request a diagnose of OS definitions.
868
869     This is a multi-node call.
870
871     """
872     result = self._MultiNodeCall(node_list, "os_diagnose", [])
873
874     for node_result in result.values():
875       if not node_result.failed and node_result.data:
876         node_result.data = [objects.OS.FromDict(oss)
877                             for oss in node_result.data]
878     return result
879
880   def call_os_get(self, node, name):
881     """Returns an OS definition.
882
883     This is a single-node call.
884
885     """
886     result = self._SingleNodeCall(node, "os_get", [name])
887     if not result.failed and isinstance(result.data, dict):
888       result.data = objects.OS.FromDict(result.data)
889     return result
890
891   def call_hooks_runner(self, node_list, hpath, phase, env):
892     """Call the hooks runner.
893
894     Args:
895       - op: the OpCode instance
896       - env: a dictionary with the environment
897
898     This is a multi-node call.
899
900     """
901     params = [hpath, phase, env]
902     return self._MultiNodeCall(node_list, "hooks_runner", params)
903
904   def call_iallocator_runner(self, node, name, idata):
905     """Call an iallocator on a remote node
906
907     Args:
908       - name: the iallocator name
909       - input: the json-encoded input string
910
911     This is a single-node call.
912
913     """
914     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
915
916   def call_blockdev_grow(self, node, cf_bdev, amount):
917     """Request a snapshot of the given block device.
918
919     This is a single-node call.
920
921     """
922     return self._SingleNodeCall(node, "blockdev_grow",
923                                 [cf_bdev.ToDict(), amount])
924
925   def call_blockdev_snapshot(self, node, cf_bdev):
926     """Request a snapshot of the given block device.
927
928     This is a single-node call.
929
930     """
931     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
932
933   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
934                            cluster_name, idx):
935     """Request the export of a given snapshot.
936
937     This is a single-node call.
938
939     """
940     return self._SingleNodeCall(node, "snapshot_export",
941                                 [snap_bdev.ToDict(), dest_node,
942                                  self._InstDict(instance), cluster_name, idx])
943
944   def call_finalize_export(self, node, instance, snap_disks):
945     """Request the completion of an export operation.
946
947     This writes the export config file, etc.
948
949     This is a single-node call.
950
951     """
952     flat_disks = []
953     for disk in snap_disks:
954       flat_disks.append(disk.ToDict())
955
956     return self._SingleNodeCall(node, "finalize_export",
957                                 [self._InstDict(instance), flat_disks])
958
959   def call_export_info(self, node, path):
960     """Queries the export information in a given path.
961
962     This is a single-node call.
963
964     """
965     result = self._SingleNodeCall(node, "export_info", [path])
966     if not result.failed and result.data:
967       result.data = objects.SerializableConfigParser.Loads(str(result.data))
968     return result
969
970   def call_instance_os_import(self, node, inst, src_node, src_images,
971                               cluster_name):
972     """Request the import of a backup into an instance.
973
974     This is a single-node call.
975
976     """
977     return self._SingleNodeCall(node, "instance_os_import",
978                                 [self._InstDict(inst), src_node, src_images,
979                                  cluster_name])
980
981   def call_export_list(self, node_list):
982     """Gets the stored exports list.
983
984     This is a multi-node call.
985
986     """
987     return self._MultiNodeCall(node_list, "export_list", [])
988
989   def call_export_remove(self, node, export):
990     """Requests removal of a given export.
991
992     This is a single-node call.
993
994     """
995     return self._SingleNodeCall(node, "export_remove", [export])
996
997   @classmethod
998   def call_node_leave_cluster(cls, node):
999     """Requests a node to clean the cluster information it has.
1000
1001     This will remove the configuration information from the ganeti data
1002     dir.
1003
1004     This is a single-node call.
1005
1006     """
1007     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1008
1009   def call_node_volumes(self, node_list):
1010     """Gets all volumes on node(s).
1011
1012     This is a multi-node call.
1013
1014     """
1015     return self._MultiNodeCall(node_list, "node_volumes", [])
1016
1017   def call_node_demote_from_mc(self, node):
1018     """Demote a node from the master candidate role.
1019
1020     This is a single-node call.
1021
1022     """
1023     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1024
1025   def call_test_delay(self, node_list, duration):
1026     """Sleep for a fixed time on given node(s).
1027
1028     This is a multi-node call.
1029
1030     """
1031     return self._MultiNodeCall(node_list, "test_delay", [duration])
1032
1033   def call_file_storage_dir_create(self, node, file_storage_dir):
1034     """Create the given file storage directory.
1035
1036     This is a single-node call.
1037
1038     """
1039     return self._SingleNodeCall(node, "file_storage_dir_create",
1040                                 [file_storage_dir])
1041
1042   def call_file_storage_dir_remove(self, node, file_storage_dir):
1043     """Remove the given file storage directory.
1044
1045     This is a single-node call.
1046
1047     """
1048     return self._SingleNodeCall(node, "file_storage_dir_remove",
1049                                 [file_storage_dir])
1050
1051   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1052                                    new_file_storage_dir):
1053     """Rename file storage directory.
1054
1055     This is a single-node call.
1056
1057     """
1058     return self._SingleNodeCall(node, "file_storage_dir_rename",
1059                                 [old_file_storage_dir, new_file_storage_dir])
1060
1061   @classmethod
1062   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1063     """Update job queue.
1064
1065     This is a multi-node call.
1066
1067     """
1068     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1069                                     [file_name, cls._Compress(content)],
1070                                     address_list=address_list)
1071
1072   @classmethod
1073   def call_jobqueue_purge(cls, node):
1074     """Purge job queue.
1075
1076     This is a single-node call.
1077
1078     """
1079     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1080
1081   @classmethod
1082   def call_jobqueue_rename(cls, node_list, address_list, rename):
1083     """Rename a job queue file.
1084
1085     This is a multi-node call.
1086
1087     """
1088     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1089                                     address_list=address_list)
1090
1091   @classmethod
1092   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1093     """Set the drain flag on the queue.
1094
1095     This is a multi-node call.
1096
1097     @type node_list: list
1098     @param node_list: the list of nodes to query
1099     @type drain_flag: bool
1100     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1101
1102     """
1103     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1104                                     [drain_flag])
1105
1106   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1107     """Validate the hypervisor params.
1108
1109     This is a multi-node call.
1110
1111     @type node_list: list
1112     @param node_list: the list of nodes to query
1113     @type hvname: string
1114     @param hvname: the hypervisor name
1115     @type hvparams: dict
1116     @param hvparams: the hypervisor parameters to be validated
1117
1118     """
1119     cluster = self._cfg.GetClusterInfo()
1120     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1121     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1122                                [hvname, hv_full])