call_instance_start: add optional hv/be parameters
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import logging
36 import zlib
37 import base64
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45
46 import ganeti.http.client
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   _http_manager = http.client.HttpClientManager()
64
65
66 def Shutdown():
67   """Stops the module-global HTTP client manager.
68
69   Must be called before quitting the program.
70
71   """
72   global _http_manager
73
74   if _http_manager:
75     _http_manager.Shutdown()
76     _http_manager = None
77
78
79 class RpcResult(object):
80   """RPC Result class.
81
82   This class holds an RPC result. It is needed since in multi-node
83   calls we can't raise an exception just because one one out of many
84   failed, and therefore we use this class to encapsulate the result.
85
86   @ivar data: the data payload, for successfull results, or None
87   @type failed: boolean
88   @ivar failed: whether the operation failed at RPC level (not
89       application level on the remote node)
90   @ivar call: the name of the RPC call
91   @ivar node: the name of the node to which we made the call
92   @ivar offline: whether the operation failed because the node was
93       offline, as opposed to actual failure; offline=True will always
94       imply failed=True, in order to allow simpler checking if
95       the user doesn't care about the exact failure mode
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.error = "Node is marked offline"
107       self.data = self.payload = None
108     elif failed:
109       self.error = data
110       self.data = self.payload = None
111     else:
112       self.data = data
113       self.error = None
114       if isinstance(data, (tuple, list)) and len(data) == 2:
115         self.payload = data[1]
116       else:
117         self.payload = None
118
119   def Raise(self):
120     """If the result has failed, raise an OpExecError.
121
122     This is used so that LU code doesn't have to check for each
123     result, but instead can call this function.
124
125     """
126     if self.failed:
127       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
128                                (self.call, self.node, self.error))
129
130   def RemoteFailMsg(self):
131     """Check if the remote procedure failed.
132
133     This is valid only for RPC calls which return result of the form
134     (status, data | error_msg).
135
136     @return: empty string for succcess, otherwise an error message
137
138     """
139     def _EnsureErr(val):
140       """Helper to ensure we return a 'True' value for error."""
141       if val:
142         return val
143       else:
144         return "No error information"
145
146     if self.failed:
147       return _EnsureErr(self.error)
148     if not isinstance(self.data, (tuple, list)):
149       return "Invalid result type (%s)" % type(self.data)
150     if len(self.data) != 2:
151       return "Invalid result length (%d), expected 2" % len(self.data)
152     if not self.data[0]:
153       return _EnsureErr(self.data[1])
154     return ""
155
156
157 class Client:
158   """RPC Client class.
159
160   This class, given a (remote) method name, a list of parameters and a
161   list of nodes, will contact (in parallel) all nodes, and return a
162   dict of results (key: node name, value: result).
163
164   One current bug is that generic failure is still signalled by
165   'False' result, which is not good. This overloading of values can
166   cause bugs.
167
168   """
169   def __init__(self, procedure, body, port):
170     self.procedure = procedure
171     self.body = body
172     self.port = port
173     self.nc = {}
174
175     self._ssl_params = \
176       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
177                          ssl_cert_path=constants.SSL_CERT_FILE)
178
179   def ConnectList(self, node_list, address_list=None):
180     """Add a list of nodes to the target nodes.
181
182     @type node_list: list
183     @param node_list: the list of node names to connect
184     @type address_list: list or None
185     @keyword address_list: either None or a list with node addresses,
186         which must have the same length as the node list
187
188     """
189     if address_list is None:
190       address_list = [None for _ in node_list]
191     else:
192       assert len(node_list) == len(address_list), \
193              "Name and address lists should have the same length"
194     for node, address in zip(node_list, address_list):
195       self.ConnectNode(node, address)
196
197   def ConnectNode(self, name, address=None):
198     """Add a node to the target list.
199
200     @type name: str
201     @param name: the node name
202     @type address: str
203     @keyword address: the node address, if known
204
205     """
206     if address is None:
207       address = name
208
209     self.nc[name] = \
210       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
211                                     "/%s" % self.procedure,
212                                     post_data=self.body,
213                                     ssl_params=self._ssl_params,
214                                     ssl_verify_peer=True)
215
216   def GetResults(self):
217     """Call nodes and return results.
218
219     @rtype: list
220     @return: List of RPC results
221
222     """
223     assert _http_manager, "RPC module not intialized"
224
225     _http_manager.ExecRequests(self.nc.values())
226
227     results = {}
228
229     for name, req in self.nc.iteritems():
230       if req.success and req.resp_status_code == http.HTTP_OK:
231         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
232                                   node=name, call=self.procedure)
233         continue
234
235       # TODO: Better error reporting
236       if req.error:
237         msg = req.error
238       else:
239         msg = req.resp_body
240
241       logging.error("RPC error in %s from node %s: %s",
242                     self.procedure, name, msg)
243       results[name] = RpcResult(data=msg, failed=True, node=name,
244                                 call=self.procedure)
245
246     return results
247
248
249 class RpcRunner(object):
250   """RPC runner class"""
251
252   def __init__(self, cfg):
253     """Initialized the rpc runner.
254
255     @type cfg:  C{config.ConfigWriter}
256     @param cfg: the configuration object that will be used to get data
257                 about the cluster
258
259     """
260     self._cfg = cfg
261     self.port = utils.GetNodeDaemonPort()
262
263   def _InstDict(self, instance, hvp=None, bep=None):
264     """Convert the given instance to a dict.
265
266     This is done via the instance's ToDict() method and additionally
267     we fill the hvparams with the cluster defaults.
268
269     @type instance: L{objects.Instance}
270     @param instance: an Instance object
271     @type hvp: dict or None
272     @param hvp: a dictionary with overriden hypervisor parameters
273     @type bep: dict or None
274     @param bep: a dictionary with overriden backend parameters
275     @rtype: dict
276     @return: the instance dict, with the hvparams filled with the
277         cluster defaults
278
279     """
280     idict = instance.ToDict()
281     cluster = self._cfg.GetClusterInfo()
282     idict["hvparams"] = cluster.FillHV(instance)
283     if hvp is not None:
284       idict["hvparams"].update(hvp)
285     idict["beparams"] = cluster.FillBE(instance)
286     if bep is not None:
287       idict["beparams"].update(bep)
288     return idict
289
290   def _ConnectList(self, client, node_list, call):
291     """Helper for computing node addresses.
292
293     @type client: L{Client}
294     @param client: a C{Client} instance
295     @type node_list: list
296     @param node_list: the node list we should connect
297     @type call: string
298     @param call: the name of the remote procedure call, for filling in
299         correctly any eventual offline nodes' results
300
301     """
302     all_nodes = self._cfg.GetAllNodesInfo()
303     name_list = []
304     addr_list = []
305     skip_dict = {}
306     for node in node_list:
307       if node in all_nodes:
308         if all_nodes[node].offline:
309           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
310           continue
311         val = all_nodes[node].primary_ip
312       else:
313         val = None
314       addr_list.append(val)
315       name_list.append(node)
316     if name_list:
317       client.ConnectList(name_list, address_list=addr_list)
318     return skip_dict
319
320   def _ConnectNode(self, client, node, call):
321     """Helper for computing one node's address.
322
323     @type client: L{Client}
324     @param client: a C{Client} instance
325     @type node: str
326     @param node: the node we should connect
327     @type call: string
328     @param call: the name of the remote procedure call, for filling in
329         correctly any eventual offline nodes' results
330
331     """
332     node_info = self._cfg.GetNodeInfo(node)
333     if node_info is not None:
334       if node_info.offline:
335         return RpcResult(node=node, offline=True, call=call)
336       addr = node_info.primary_ip
337     else:
338       addr = None
339     client.ConnectNode(node, address=addr)
340
341   def _MultiNodeCall(self, node_list, procedure, args):
342     """Helper for making a multi-node call
343
344     """
345     body = serializer.DumpJson(args, indent=False)
346     c = Client(procedure, body, self.port)
347     skip_dict = self._ConnectList(c, node_list, procedure)
348     skip_dict.update(c.GetResults())
349     return skip_dict
350
351   @classmethod
352   def _StaticMultiNodeCall(cls, node_list, procedure, args,
353                            address_list=None):
354     """Helper for making a multi-node static call
355
356     """
357     body = serializer.DumpJson(args, indent=False)
358     c = Client(procedure, body, utils.GetNodeDaemonPort())
359     c.ConnectList(node_list, address_list=address_list)
360     return c.GetResults()
361
362   def _SingleNodeCall(self, node, procedure, args):
363     """Helper for making a single-node call
364
365     """
366     body = serializer.DumpJson(args, indent=False)
367     c = Client(procedure, body, self.port)
368     result = self._ConnectNode(c, node, procedure)
369     if result is None:
370       # we did connect, node is not offline
371       result = c.GetResults()[node]
372     return result
373
374   @classmethod
375   def _StaticSingleNodeCall(cls, node, procedure, args):
376     """Helper for making a single-node static call
377
378     """
379     body = serializer.DumpJson(args, indent=False)
380     c = Client(procedure, body, utils.GetNodeDaemonPort())
381     c.ConnectNode(node)
382     return c.GetResults()[node]
383
384   @staticmethod
385   def _Compress(data):
386     """Compresses a string for transport over RPC.
387
388     Small amounts of data are not compressed.
389
390     @type data: str
391     @param data: Data
392     @rtype: tuple
393     @return: Encoded data to send
394
395     """
396     # Small amounts of data are not compressed
397     if len(data) < 512:
398       return (constants.RPC_ENCODING_NONE, data)
399
400     # Compress with zlib and encode in base64
401     return (constants.RPC_ENCODING_ZLIB_BASE64,
402             base64.b64encode(zlib.compress(data, 3)))
403
404   #
405   # Begin RPC calls
406   #
407
408   def call_volume_list(self, node_list, vg_name):
409     """Gets the logical volumes present in a given volume group.
410
411     This is a multi-node call.
412
413     """
414     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
415
416   def call_vg_list(self, node_list):
417     """Gets the volume group list.
418
419     This is a multi-node call.
420
421     """
422     return self._MultiNodeCall(node_list, "vg_list", [])
423
424   def call_bridges_exist(self, node, bridges_list):
425     """Checks if a node has all the bridges given.
426
427     This method checks if all bridges given in the bridges_list are
428     present on the remote node, so that an instance that uses interfaces
429     on those bridges can be started.
430
431     This is a single-node call.
432
433     """
434     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
435
436   def call_instance_start(self, node, instance, hvp, bep):
437     """Starts an instance.
438
439     This is a single-node call.
440
441     """
442     idict = self._InstDict(instance, hvp=hvp, bep=bep)
443     return self._SingleNodeCall(node, "instance_start", [idict])
444
445   def call_instance_shutdown(self, node, instance):
446     """Stops an instance.
447
448     This is a single-node call.
449
450     """
451     return self._SingleNodeCall(node, "instance_shutdown",
452                                 [self._InstDict(instance)])
453
454   def call_migration_info(self, node, instance):
455     """Gather the information necessary to prepare an instance migration.
456
457     This is a single-node call.
458
459     @type node: string
460     @param node: the node on which the instance is currently running
461     @type instance: C{objects.Instance}
462     @param instance: the instance definition
463
464     """
465     return self._SingleNodeCall(node, "migration_info",
466                                 [self._InstDict(instance)])
467
468   def call_accept_instance(self, node, instance, info, target):
469     """Prepare a node to accept an instance.
470
471     This is a single-node call.
472
473     @type node: string
474     @param node: the target node for the migration
475     @type instance: C{objects.Instance}
476     @param instance: the instance definition
477     @type info: opaque/hypervisor specific (string/data)
478     @param info: result for the call_migration_info call
479     @type target: string
480     @param target: target hostname (usually ip address) (on the node itself)
481
482     """
483     return self._SingleNodeCall(node, "accept_instance",
484                                 [self._InstDict(instance), info, target])
485
486   def call_finalize_migration(self, node, instance, info, success):
487     """Finalize any target-node migration specific operation.
488
489     This is called both in case of a successful migration and in case of error
490     (in which case it should abort the migration).
491
492     This is a single-node call.
493
494     @type node: string
495     @param node: the target node for the migration
496     @type instance: C{objects.Instance}
497     @param instance: the instance definition
498     @type info: opaque/hypervisor specific (string/data)
499     @param info: result for the call_migration_info call
500     @type success: boolean
501     @param success: whether the migration was a success or a failure
502
503     """
504     return self._SingleNodeCall(node, "finalize_migration",
505                                 [self._InstDict(instance), info, success])
506
507   def call_instance_migrate(self, node, instance, target, live):
508     """Migrate an instance.
509
510     This is a single-node call.
511
512     @type node: string
513     @param node: the node on which the instance is currently running
514     @type instance: C{objects.Instance}
515     @param instance: the instance definition
516     @type target: string
517     @param target: the target node name
518     @type live: boolean
519     @param live: whether the migration should be done live or not (the
520         interpretation of this parameter is left to the hypervisor)
521
522     """
523     return self._SingleNodeCall(node, "instance_migrate",
524                                 [self._InstDict(instance), target, live])
525
526   def call_instance_reboot(self, node, instance, reboot_type):
527     """Reboots an instance.
528
529     This is a single-node call.
530
531     """
532     return self._SingleNodeCall(node, "instance_reboot",
533                                 [self._InstDict(instance), reboot_type])
534
535   def call_instance_os_add(self, node, inst):
536     """Installs an OS on the given instance.
537
538     This is a single-node call.
539
540     """
541     return self._SingleNodeCall(node, "instance_os_add",
542                                 [self._InstDict(inst)])
543
544   def call_instance_run_rename(self, node, inst, old_name):
545     """Run the OS rename script for an instance.
546
547     This is a single-node call.
548
549     """
550     return self._SingleNodeCall(node, "instance_run_rename",
551                                 [self._InstDict(inst), old_name])
552
553   def call_instance_info(self, node, instance, hname):
554     """Returns information about a single instance.
555
556     This is a single-node call.
557
558     @type node: list
559     @param node: the list of nodes to query
560     @type instance: string
561     @param instance: the instance name
562     @type hname: string
563     @param hname: the hypervisor type of the instance
564
565     """
566     return self._SingleNodeCall(node, "instance_info", [instance, hname])
567
568   def call_instance_migratable(self, node, instance):
569     """Checks whether the given instance can be migrated.
570
571     This is a single-node call.
572
573     @param node: the node to query
574     @type instance: L{objects.Instance}
575     @param instance: the instance to check
576
577
578     """
579     return self._SingleNodeCall(node, "instance_migratable",
580                                 [self._InstDict(instance)])
581
582   def call_all_instances_info(self, node_list, hypervisor_list):
583     """Returns information about all instances on the given nodes.
584
585     This is a multi-node call.
586
587     @type node_list: list
588     @param node_list: the list of nodes to query
589     @type hypervisor_list: list
590     @param hypervisor_list: the hypervisors to query for instances
591
592     """
593     return self._MultiNodeCall(node_list, "all_instances_info",
594                                [hypervisor_list])
595
596   def call_instance_list(self, node_list, hypervisor_list):
597     """Returns the list of running instances on a given node.
598
599     This is a multi-node call.
600
601     @type node_list: list
602     @param node_list: the list of nodes to query
603     @type hypervisor_list: list
604     @param hypervisor_list: the hypervisors to query for instances
605
606     """
607     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
608
609   def call_node_tcp_ping(self, node, source, target, port, timeout,
610                          live_port_needed):
611     """Do a TcpPing on the remote node
612
613     This is a single-node call.
614
615     """
616     return self._SingleNodeCall(node, "node_tcp_ping",
617                                 [source, target, port, timeout,
618                                  live_port_needed])
619
620   def call_node_has_ip_address(self, node, address):
621     """Checks if a node has the given IP address.
622
623     This is a single-node call.
624
625     """
626     return self._SingleNodeCall(node, "node_has_ip_address", [address])
627
628   def call_node_info(self, node_list, vg_name, hypervisor_type):
629     """Return node information.
630
631     This will return memory information and volume group size and free
632     space.
633
634     This is a multi-node call.
635
636     @type node_list: list
637     @param node_list: the list of nodes to query
638     @type vg_name: C{string}
639     @param vg_name: the name of the volume group to ask for disk space
640         information
641     @type hypervisor_type: C{str}
642     @param hypervisor_type: the name of the hypervisor to ask for
643         memory information
644
645     """
646     retux = self._MultiNodeCall(node_list, "node_info",
647                                 [vg_name, hypervisor_type])
648
649     for result in retux.itervalues():
650       if result.failed or not isinstance(result.data, dict):
651         result.data = {}
652       if result.offline:
653         log_name = None
654       else:
655         log_name = "call_node_info"
656
657       utils.CheckDict(result.data, {
658         'memory_total' : '-',
659         'memory_dom0' : '-',
660         'memory_free' : '-',
661         'vg_size' : 'node_unreachable',
662         'vg_free' : '-',
663         }, log_name)
664     return retux
665
666   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
667     """Add a node to the cluster.
668
669     This is a single-node call.
670
671     """
672     return self._SingleNodeCall(node, "node_add",
673                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
674
675   def call_node_verify(self, node_list, checkdict, cluster_name):
676     """Request verification of given parameters.
677
678     This is a multi-node call.
679
680     """
681     return self._MultiNodeCall(node_list, "node_verify",
682                                [checkdict, cluster_name])
683
684   @classmethod
685   def call_node_start_master(cls, node, start_daemons):
686     """Tells a node to activate itself as a master.
687
688     This is a single-node call.
689
690     """
691     return cls._StaticSingleNodeCall(node, "node_start_master",
692                                      [start_daemons])
693
694   @classmethod
695   def call_node_stop_master(cls, node, stop_daemons):
696     """Tells a node to demote itself from master status.
697
698     This is a single-node call.
699
700     """
701     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
702
703   @classmethod
704   def call_master_info(cls, node_list):
705     """Query master info.
706
707     This is a multi-node call.
708
709     """
710     # TODO: should this method query down nodes?
711     return cls._StaticMultiNodeCall(node_list, "master_info", [])
712
713   def call_version(self, node_list):
714     """Query node version.
715
716     This is a multi-node call.
717
718     """
719     return self._MultiNodeCall(node_list, "version", [])
720
721   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
722     """Request creation of a given block device.
723
724     This is a single-node call.
725
726     """
727     return self._SingleNodeCall(node, "blockdev_create",
728                                 [bdev.ToDict(), size, owner, on_primary, info])
729
730   def call_blockdev_remove(self, node, bdev):
731     """Request removal of a given block device.
732
733     This is a single-node call.
734
735     """
736     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
737
738   def call_blockdev_rename(self, node, devlist):
739     """Request rename of the given block devices.
740
741     This is a single-node call.
742
743     """
744     return self._SingleNodeCall(node, "blockdev_rename",
745                                 [(d.ToDict(), uid) for d, uid in devlist])
746
747   def call_blockdev_assemble(self, node, disk, owner, on_primary):
748     """Request assembling of a given block device.
749
750     This is a single-node call.
751
752     """
753     return self._SingleNodeCall(node, "blockdev_assemble",
754                                 [disk.ToDict(), owner, on_primary])
755
756   def call_blockdev_shutdown(self, node, disk):
757     """Request shutdown of a given block device.
758
759     This is a single-node call.
760
761     """
762     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
763
764   def call_blockdev_addchildren(self, node, bdev, ndevs):
765     """Request adding a list of children to a (mirroring) device.
766
767     This is a single-node call.
768
769     """
770     return self._SingleNodeCall(node, "blockdev_addchildren",
771                                 [bdev.ToDict(),
772                                  [disk.ToDict() for disk in ndevs]])
773
774   def call_blockdev_removechildren(self, node, bdev, ndevs):
775     """Request removing a list of children from a (mirroring) device.
776
777     This is a single-node call.
778
779     """
780     return self._SingleNodeCall(node, "blockdev_removechildren",
781                                 [bdev.ToDict(),
782                                  [disk.ToDict() for disk in ndevs]])
783
784   def call_blockdev_getmirrorstatus(self, node, disks):
785     """Request status of a (mirroring) device.
786
787     This is a single-node call.
788
789     """
790     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
791                                 [dsk.ToDict() for dsk in disks])
792
793   def call_blockdev_find(self, node, disk):
794     """Request identification of a given block device.
795
796     This is a single-node call.
797
798     """
799     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
800
801   def call_blockdev_close(self, node, instance_name, disks):
802     """Closes the given block devices.
803
804     This is a single-node call.
805
806     """
807     params = [instance_name, [cf.ToDict() for cf in disks]]
808     return self._SingleNodeCall(node, "blockdev_close", params)
809
810   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
811     """Disconnects the network of the given drbd devices.
812
813     This is a multi-node call.
814
815     """
816     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
817                                [nodes_ip, [cf.ToDict() for cf in disks]])
818
819   def call_drbd_attach_net(self, node_list, nodes_ip,
820                            disks, instance_name, multimaster):
821     """Disconnects the given drbd devices.
822
823     This is a multi-node call.
824
825     """
826     return self._MultiNodeCall(node_list, "drbd_attach_net",
827                                [nodes_ip, [cf.ToDict() for cf in disks],
828                                 instance_name, multimaster])
829
830   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
831     """Waits for the synchronization of drbd devices is complete.
832
833     This is a multi-node call.
834
835     """
836     return self._MultiNodeCall(node_list, "drbd_wait_sync",
837                                [nodes_ip, [cf.ToDict() for cf in disks]])
838
839   @classmethod
840   def call_upload_file(cls, node_list, file_name, address_list=None):
841     """Upload a file.
842
843     The node will refuse the operation in case the file is not on the
844     approved file list.
845
846     This is a multi-node call.
847
848     @type node_list: list
849     @param node_list: the list of node names to upload to
850     @type file_name: str
851     @param file_name: the filename to upload
852     @type address_list: list or None
853     @keyword address_list: an optional list of node addresses, in order
854         to optimize the RPC speed
855
856     """
857     file_contents = utils.ReadFile(file_name)
858     data = cls._Compress(file_contents)
859     st = os.stat(file_name)
860     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
861               st.st_atime, st.st_mtime]
862     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
863                                     address_list=address_list)
864
865   @classmethod
866   def call_write_ssconf_files(cls, node_list, values):
867     """Write ssconf files.
868
869     This is a multi-node call.
870
871     """
872     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
873
874   def call_os_diagnose(self, node_list):
875     """Request a diagnose of OS definitions.
876
877     This is a multi-node call.
878
879     """
880     result = self._MultiNodeCall(node_list, "os_diagnose", [])
881
882     for node_result in result.values():
883       if not node_result.failed and node_result.data:
884         node_result.data = [objects.OS.FromDict(oss)
885                             for oss in node_result.data]
886     return result
887
888   def call_os_get(self, node, name):
889     """Returns an OS definition.
890
891     This is a single-node call.
892
893     """
894     result = self._SingleNodeCall(node, "os_get", [name])
895     if not result.failed and isinstance(result.data, dict):
896       result.data = objects.OS.FromDict(result.data)
897     return result
898
899   def call_hooks_runner(self, node_list, hpath, phase, env):
900     """Call the hooks runner.
901
902     Args:
903       - op: the OpCode instance
904       - env: a dictionary with the environment
905
906     This is a multi-node call.
907
908     """
909     params = [hpath, phase, env]
910     return self._MultiNodeCall(node_list, "hooks_runner", params)
911
912   def call_iallocator_runner(self, node, name, idata):
913     """Call an iallocator on a remote node
914
915     Args:
916       - name: the iallocator name
917       - input: the json-encoded input string
918
919     This is a single-node call.
920
921     """
922     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
923
924   def call_blockdev_grow(self, node, cf_bdev, amount):
925     """Request a snapshot of the given block device.
926
927     This is a single-node call.
928
929     """
930     return self._SingleNodeCall(node, "blockdev_grow",
931                                 [cf_bdev.ToDict(), amount])
932
933   def call_blockdev_snapshot(self, node, cf_bdev):
934     """Request a snapshot of the given block device.
935
936     This is a single-node call.
937
938     """
939     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
940
941   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
942                            cluster_name, idx):
943     """Request the export of a given snapshot.
944
945     This is a single-node call.
946
947     """
948     return self._SingleNodeCall(node, "snapshot_export",
949                                 [snap_bdev.ToDict(), dest_node,
950                                  self._InstDict(instance), cluster_name, idx])
951
952   def call_finalize_export(self, node, instance, snap_disks):
953     """Request the completion of an export operation.
954
955     This writes the export config file, etc.
956
957     This is a single-node call.
958
959     """
960     flat_disks = []
961     for disk in snap_disks:
962       flat_disks.append(disk.ToDict())
963
964     return self._SingleNodeCall(node, "finalize_export",
965                                 [self._InstDict(instance), flat_disks])
966
967   def call_export_info(self, node, path):
968     """Queries the export information in a given path.
969
970     This is a single-node call.
971
972     """
973     result = self._SingleNodeCall(node, "export_info", [path])
974     if not result.failed and result.data:
975       result.data = objects.SerializableConfigParser.Loads(str(result.data))
976     return result
977
978   def call_instance_os_import(self, node, inst, src_node, src_images,
979                               cluster_name):
980     """Request the import of a backup into an instance.
981
982     This is a single-node call.
983
984     """
985     return self._SingleNodeCall(node, "instance_os_import",
986                                 [self._InstDict(inst), src_node, src_images,
987                                  cluster_name])
988
989   def call_export_list(self, node_list):
990     """Gets the stored exports list.
991
992     This is a multi-node call.
993
994     """
995     return self._MultiNodeCall(node_list, "export_list", [])
996
997   def call_export_remove(self, node, export):
998     """Requests removal of a given export.
999
1000     This is a single-node call.
1001
1002     """
1003     return self._SingleNodeCall(node, "export_remove", [export])
1004
1005   @classmethod
1006   def call_node_leave_cluster(cls, node):
1007     """Requests a node to clean the cluster information it has.
1008
1009     This will remove the configuration information from the ganeti data
1010     dir.
1011
1012     This is a single-node call.
1013
1014     """
1015     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1016
1017   def call_node_volumes(self, node_list):
1018     """Gets all volumes on node(s).
1019
1020     This is a multi-node call.
1021
1022     """
1023     return self._MultiNodeCall(node_list, "node_volumes", [])
1024
1025   def call_node_demote_from_mc(self, node):
1026     """Demote a node from the master candidate role.
1027
1028     This is a single-node call.
1029
1030     """
1031     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1032
1033   def call_test_delay(self, node_list, duration):
1034     """Sleep for a fixed time on given node(s).
1035
1036     This is a multi-node call.
1037
1038     """
1039     return self._MultiNodeCall(node_list, "test_delay", [duration])
1040
1041   def call_file_storage_dir_create(self, node, file_storage_dir):
1042     """Create the given file storage directory.
1043
1044     This is a single-node call.
1045
1046     """
1047     return self._SingleNodeCall(node, "file_storage_dir_create",
1048                                 [file_storage_dir])
1049
1050   def call_file_storage_dir_remove(self, node, file_storage_dir):
1051     """Remove the given file storage directory.
1052
1053     This is a single-node call.
1054
1055     """
1056     return self._SingleNodeCall(node, "file_storage_dir_remove",
1057                                 [file_storage_dir])
1058
1059   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1060                                    new_file_storage_dir):
1061     """Rename file storage directory.
1062
1063     This is a single-node call.
1064
1065     """
1066     return self._SingleNodeCall(node, "file_storage_dir_rename",
1067                                 [old_file_storage_dir, new_file_storage_dir])
1068
1069   @classmethod
1070   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1071     """Update job queue.
1072
1073     This is a multi-node call.
1074
1075     """
1076     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1077                                     [file_name, cls._Compress(content)],
1078                                     address_list=address_list)
1079
1080   @classmethod
1081   def call_jobqueue_purge(cls, node):
1082     """Purge job queue.
1083
1084     This is a single-node call.
1085
1086     """
1087     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1088
1089   @classmethod
1090   def call_jobqueue_rename(cls, node_list, address_list, rename):
1091     """Rename a job queue file.
1092
1093     This is a multi-node call.
1094
1095     """
1096     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1097                                     address_list=address_list)
1098
1099   @classmethod
1100   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1101     """Set the drain flag on the queue.
1102
1103     This is a multi-node call.
1104
1105     @type node_list: list
1106     @param node_list: the list of nodes to query
1107     @type drain_flag: bool
1108     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1109
1110     """
1111     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1112                                     [drain_flag])
1113
1114   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1115     """Validate the hypervisor params.
1116
1117     This is a multi-node call.
1118
1119     @type node_list: list
1120     @param node_list: the list of nodes to query
1121     @type hvname: string
1122     @param hvname: the hypervisor name
1123     @type hvparams: dict
1124     @param hvparams: the hypervisor parameters to be validated
1125
1126     """
1127     cluster = self._cfg.GetClusterInfo()
1128     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1129     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1130                                [hvname, hv_full])