Convert export_info rpc to new style result
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import logging
36 import zlib
37 import base64
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45
46 import ganeti.http.client
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   _http_manager = http.client.HttpClientManager()
64
65
66 def Shutdown():
67   """Stops the module-global HTTP client manager.
68
69   Must be called before quitting the program.
70
71   """
72   global _http_manager
73
74   if _http_manager:
75     _http_manager.Shutdown()
76     _http_manager = None
77
78
79 class RpcResult(object):
80   """RPC Result class.
81
82   This class holds an RPC result. It is needed since in multi-node
83   calls we can't raise an exception just because one one out of many
84   failed, and therefore we use this class to encapsulate the result.
85
86   @ivar data: the data payload, for successfull results, or None
87   @type failed: boolean
88   @ivar failed: whether the operation failed at RPC level (not
89       application level on the remote node)
90   @ivar call: the name of the RPC call
91   @ivar node: the name of the node to which we made the call
92   @ivar offline: whether the operation failed because the node was
93       offline, as opposed to actual failure; offline=True will always
94       imply failed=True, in order to allow simpler checking if
95       the user doesn't care about the exact failure mode
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.error = "Node is marked offline"
107       self.data = self.payload = None
108     elif failed:
109       self.error = data
110       self.data = self.payload = None
111     else:
112       self.data = data
113       self.error = None
114       if isinstance(data, (tuple, list)) and len(data) == 2:
115         self.payload = data[1]
116       else:
117         self.payload = None
118
119   def Raise(self):
120     """If the result has failed, raise an OpExecError.
121
122     This is used so that LU code doesn't have to check for each
123     result, but instead can call this function.
124
125     """
126     if self.failed:
127       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
128                                (self.call, self.node, self.error))
129
130   def RemoteFailMsg(self):
131     """Check if the remote procedure failed.
132
133     This is valid only for RPC calls which return result of the form
134     (status, data | error_msg).
135
136     @return: empty string for succcess, otherwise an error message
137
138     """
139     def _EnsureErr(val):
140       """Helper to ensure we return a 'True' value for error."""
141       if val:
142         return val
143       else:
144         return "No error information"
145
146     if self.failed:
147       return _EnsureErr(self.error)
148     if not isinstance(self.data, (tuple, list)):
149       return "Invalid result type (%s)" % type(self.data)
150     if len(self.data) != 2:
151       return "Invalid result length (%d), expected 2" % len(self.data)
152     if not self.data[0]:
153       return _EnsureErr(self.data[1])
154     return ""
155
156
157 class Client:
158   """RPC Client class.
159
160   This class, given a (remote) method name, a list of parameters and a
161   list of nodes, will contact (in parallel) all nodes, and return a
162   dict of results (key: node name, value: result).
163
164   One current bug is that generic failure is still signalled by
165   'False' result, which is not good. This overloading of values can
166   cause bugs.
167
168   """
169   def __init__(self, procedure, body, port):
170     self.procedure = procedure
171     self.body = body
172     self.port = port
173     self.nc = {}
174
175     self._ssl_params = \
176       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
177                          ssl_cert_path=constants.SSL_CERT_FILE)
178
179   def ConnectList(self, node_list, address_list=None):
180     """Add a list of nodes to the target nodes.
181
182     @type node_list: list
183     @param node_list: the list of node names to connect
184     @type address_list: list or None
185     @keyword address_list: either None or a list with node addresses,
186         which must have the same length as the node list
187
188     """
189     if address_list is None:
190       address_list = [None for _ in node_list]
191     else:
192       assert len(node_list) == len(address_list), \
193              "Name and address lists should have the same length"
194     for node, address in zip(node_list, address_list):
195       self.ConnectNode(node, address)
196
197   def ConnectNode(self, name, address=None):
198     """Add a node to the target list.
199
200     @type name: str
201     @param name: the node name
202     @type address: str
203     @keyword address: the node address, if known
204
205     """
206     if address is None:
207       address = name
208
209     self.nc[name] = \
210       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
211                                     "/%s" % self.procedure,
212                                     post_data=self.body,
213                                     ssl_params=self._ssl_params,
214                                     ssl_verify_peer=True)
215
216   def GetResults(self):
217     """Call nodes and return results.
218
219     @rtype: list
220     @return: List of RPC results
221
222     """
223     assert _http_manager, "RPC module not intialized"
224
225     _http_manager.ExecRequests(self.nc.values())
226
227     results = {}
228
229     for name, req in self.nc.iteritems():
230       if req.success and req.resp_status_code == http.HTTP_OK:
231         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
232                                   node=name, call=self.procedure)
233         continue
234
235       # TODO: Better error reporting
236       if req.error:
237         msg = req.error
238       else:
239         msg = req.resp_body
240
241       logging.error("RPC error in %s from node %s: %s",
242                     self.procedure, name, msg)
243       results[name] = RpcResult(data=msg, failed=True, node=name,
244                                 call=self.procedure)
245
246     return results
247
248
249 class RpcRunner(object):
250   """RPC runner class"""
251
252   def __init__(self, cfg):
253     """Initialized the rpc runner.
254
255     @type cfg:  C{config.ConfigWriter}
256     @param cfg: the configuration object that will be used to get data
257                 about the cluster
258
259     """
260     self._cfg = cfg
261     self.port = utils.GetNodeDaemonPort()
262
263   def _InstDict(self, instance, hvp=None, bep=None):
264     """Convert the given instance to a dict.
265
266     This is done via the instance's ToDict() method and additionally
267     we fill the hvparams with the cluster defaults.
268
269     @type instance: L{objects.Instance}
270     @param instance: an Instance object
271     @type hvp: dict or None
272     @param hvp: a dictionary with overriden hypervisor parameters
273     @type bep: dict or None
274     @param bep: a dictionary with overriden backend parameters
275     @rtype: dict
276     @return: the instance dict, with the hvparams filled with the
277         cluster defaults
278
279     """
280     idict = instance.ToDict()
281     cluster = self._cfg.GetClusterInfo()
282     idict["hvparams"] = cluster.FillHV(instance)
283     if hvp is not None:
284       idict["hvparams"].update(hvp)
285     idict["beparams"] = cluster.FillBE(instance)
286     if bep is not None:
287       idict["beparams"].update(bep)
288     for nic in idict["nics"]:
289       nic['nicparams'] = objects.FillDict(
290         cluster.nicparams[constants.PP_DEFAULT],
291         nic['nicparams'])
292     return idict
293
294   def _ConnectList(self, client, node_list, call):
295     """Helper for computing node addresses.
296
297     @type client: L{Client}
298     @param client: a C{Client} instance
299     @type node_list: list
300     @param node_list: the node list we should connect
301     @type call: string
302     @param call: the name of the remote procedure call, for filling in
303         correctly any eventual offline nodes' results
304
305     """
306     all_nodes = self._cfg.GetAllNodesInfo()
307     name_list = []
308     addr_list = []
309     skip_dict = {}
310     for node in node_list:
311       if node in all_nodes:
312         if all_nodes[node].offline:
313           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
314           continue
315         val = all_nodes[node].primary_ip
316       else:
317         val = None
318       addr_list.append(val)
319       name_list.append(node)
320     if name_list:
321       client.ConnectList(name_list, address_list=addr_list)
322     return skip_dict
323
324   def _ConnectNode(self, client, node, call):
325     """Helper for computing one node's address.
326
327     @type client: L{Client}
328     @param client: a C{Client} instance
329     @type node: str
330     @param node: the node we should connect
331     @type call: string
332     @param call: the name of the remote procedure call, for filling in
333         correctly any eventual offline nodes' results
334
335     """
336     node_info = self._cfg.GetNodeInfo(node)
337     if node_info is not None:
338       if node_info.offline:
339         return RpcResult(node=node, offline=True, call=call)
340       addr = node_info.primary_ip
341     else:
342       addr = None
343     client.ConnectNode(node, address=addr)
344
345   def _MultiNodeCall(self, node_list, procedure, args):
346     """Helper for making a multi-node call
347
348     """
349     body = serializer.DumpJson(args, indent=False)
350     c = Client(procedure, body, self.port)
351     skip_dict = self._ConnectList(c, node_list, procedure)
352     skip_dict.update(c.GetResults())
353     return skip_dict
354
355   @classmethod
356   def _StaticMultiNodeCall(cls, node_list, procedure, args,
357                            address_list=None):
358     """Helper for making a multi-node static call
359
360     """
361     body = serializer.DumpJson(args, indent=False)
362     c = Client(procedure, body, utils.GetNodeDaemonPort())
363     c.ConnectList(node_list, address_list=address_list)
364     return c.GetResults()
365
366   def _SingleNodeCall(self, node, procedure, args):
367     """Helper for making a single-node call
368
369     """
370     body = serializer.DumpJson(args, indent=False)
371     c = Client(procedure, body, self.port)
372     result = self._ConnectNode(c, node, procedure)
373     if result is None:
374       # we did connect, node is not offline
375       result = c.GetResults()[node]
376     return result
377
378   @classmethod
379   def _StaticSingleNodeCall(cls, node, procedure, args):
380     """Helper for making a single-node static call
381
382     """
383     body = serializer.DumpJson(args, indent=False)
384     c = Client(procedure, body, utils.GetNodeDaemonPort())
385     c.ConnectNode(node)
386     return c.GetResults()[node]
387
388   @staticmethod
389   def _Compress(data):
390     """Compresses a string for transport over RPC.
391
392     Small amounts of data are not compressed.
393
394     @type data: str
395     @param data: Data
396     @rtype: tuple
397     @return: Encoded data to send
398
399     """
400     # Small amounts of data are not compressed
401     if len(data) < 512:
402       return (constants.RPC_ENCODING_NONE, data)
403
404     # Compress with zlib and encode in base64
405     return (constants.RPC_ENCODING_ZLIB_BASE64,
406             base64.b64encode(zlib.compress(data, 3)))
407
408   #
409   # Begin RPC calls
410   #
411
412   def call_volume_list(self, node_list, vg_name):
413     """Gets the logical volumes present in a given volume group.
414
415     This is a multi-node call.
416
417     """
418     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
419
420   def call_vg_list(self, node_list):
421     """Gets the volume group list.
422
423     This is a multi-node call.
424
425     """
426     return self._MultiNodeCall(node_list, "vg_list", [])
427
428   def call_bridges_exist(self, node, bridges_list):
429     """Checks if a node has all the bridges given.
430
431     This method checks if all bridges given in the bridges_list are
432     present on the remote node, so that an instance that uses interfaces
433     on those bridges can be started.
434
435     This is a single-node call.
436
437     """
438     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
439
440   def call_instance_start(self, node, instance, hvp, bep):
441     """Starts an instance.
442
443     This is a single-node call.
444
445     """
446     idict = self._InstDict(instance, hvp=hvp, bep=bep)
447     return self._SingleNodeCall(node, "instance_start", [idict])
448
449   def call_instance_shutdown(self, node, instance):
450     """Stops an instance.
451
452     This is a single-node call.
453
454     """
455     return self._SingleNodeCall(node, "instance_shutdown",
456                                 [self._InstDict(instance)])
457
458   def call_migration_info(self, node, instance):
459     """Gather the information necessary to prepare an instance migration.
460
461     This is a single-node call.
462
463     @type node: string
464     @param node: the node on which the instance is currently running
465     @type instance: C{objects.Instance}
466     @param instance: the instance definition
467
468     """
469     return self._SingleNodeCall(node, "migration_info",
470                                 [self._InstDict(instance)])
471
472   def call_accept_instance(self, node, instance, info, target):
473     """Prepare a node to accept an instance.
474
475     This is a single-node call.
476
477     @type node: string
478     @param node: the target node for the migration
479     @type instance: C{objects.Instance}
480     @param instance: the instance definition
481     @type info: opaque/hypervisor specific (string/data)
482     @param info: result for the call_migration_info call
483     @type target: string
484     @param target: target hostname (usually ip address) (on the node itself)
485
486     """
487     return self._SingleNodeCall(node, "accept_instance",
488                                 [self._InstDict(instance), info, target])
489
490   def call_finalize_migration(self, node, instance, info, success):
491     """Finalize any target-node migration specific operation.
492
493     This is called both in case of a successful migration and in case of error
494     (in which case it should abort the migration).
495
496     This is a single-node call.
497
498     @type node: string
499     @param node: the target node for the migration
500     @type instance: C{objects.Instance}
501     @param instance: the instance definition
502     @type info: opaque/hypervisor specific (string/data)
503     @param info: result for the call_migration_info call
504     @type success: boolean
505     @param success: whether the migration was a success or a failure
506
507     """
508     return self._SingleNodeCall(node, "finalize_migration",
509                                 [self._InstDict(instance), info, success])
510
511   def call_instance_migrate(self, node, instance, target, live):
512     """Migrate an instance.
513
514     This is a single-node call.
515
516     @type node: string
517     @param node: the node on which the instance is currently running
518     @type instance: C{objects.Instance}
519     @param instance: the instance definition
520     @type target: string
521     @param target: the target node name
522     @type live: boolean
523     @param live: whether the migration should be done live or not (the
524         interpretation of this parameter is left to the hypervisor)
525
526     """
527     return self._SingleNodeCall(node, "instance_migrate",
528                                 [self._InstDict(instance), target, live])
529
530   def call_instance_reboot(self, node, instance, reboot_type):
531     """Reboots an instance.
532
533     This is a single-node call.
534
535     """
536     return self._SingleNodeCall(node, "instance_reboot",
537                                 [self._InstDict(instance), reboot_type])
538
539   def call_instance_os_add(self, node, inst, reinstall):
540     """Installs an OS on the given instance.
541
542     This is a single-node call.
543
544     """
545     return self._SingleNodeCall(node, "instance_os_add",
546                                 [self._InstDict(inst), reinstall])
547
548   def call_instance_run_rename(self, node, inst, old_name):
549     """Run the OS rename script for an instance.
550
551     This is a single-node call.
552
553     """
554     return self._SingleNodeCall(node, "instance_run_rename",
555                                 [self._InstDict(inst), old_name])
556
557   def call_instance_info(self, node, instance, hname):
558     """Returns information about a single instance.
559
560     This is a single-node call.
561
562     @type node: list
563     @param node: the list of nodes to query
564     @type instance: string
565     @param instance: the instance name
566     @type hname: string
567     @param hname: the hypervisor type of the instance
568
569     """
570     return self._SingleNodeCall(node, "instance_info", [instance, hname])
571
572   def call_instance_migratable(self, node, instance):
573     """Checks whether the given instance can be migrated.
574
575     This is a single-node call.
576
577     @param node: the node to query
578     @type instance: L{objects.Instance}
579     @param instance: the instance to check
580
581
582     """
583     return self._SingleNodeCall(node, "instance_migratable",
584                                 [self._InstDict(instance)])
585
586   def call_all_instances_info(self, node_list, hypervisor_list):
587     """Returns information about all instances on the given nodes.
588
589     This is a multi-node call.
590
591     @type node_list: list
592     @param node_list: the list of nodes to query
593     @type hypervisor_list: list
594     @param hypervisor_list: the hypervisors to query for instances
595
596     """
597     return self._MultiNodeCall(node_list, "all_instances_info",
598                                [hypervisor_list])
599
600   def call_instance_list(self, node_list, hypervisor_list):
601     """Returns the list of running instances on a given node.
602
603     This is a multi-node call.
604
605     @type node_list: list
606     @param node_list: the list of nodes to query
607     @type hypervisor_list: list
608     @param hypervisor_list: the hypervisors to query for instances
609
610     """
611     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
612
613   def call_node_tcp_ping(self, node, source, target, port, timeout,
614                          live_port_needed):
615     """Do a TcpPing on the remote node
616
617     This is a single-node call.
618
619     """
620     return self._SingleNodeCall(node, "node_tcp_ping",
621                                 [source, target, port, timeout,
622                                  live_port_needed])
623
624   def call_node_has_ip_address(self, node, address):
625     """Checks if a node has the given IP address.
626
627     This is a single-node call.
628
629     """
630     return self._SingleNodeCall(node, "node_has_ip_address", [address])
631
632   def call_node_info(self, node_list, vg_name, hypervisor_type):
633     """Return node information.
634
635     This will return memory information and volume group size and free
636     space.
637
638     This is a multi-node call.
639
640     @type node_list: list
641     @param node_list: the list of nodes to query
642     @type vg_name: C{string}
643     @param vg_name: the name of the volume group to ask for disk space
644         information
645     @type hypervisor_type: C{str}
646     @param hypervisor_type: the name of the hypervisor to ask for
647         memory information
648
649     """
650     retux = self._MultiNodeCall(node_list, "node_info",
651                                 [vg_name, hypervisor_type])
652
653     for result in retux.itervalues():
654       if result.failed or not isinstance(result.data, dict):
655         result.data = {}
656       if result.offline:
657         log_name = None
658       else:
659         log_name = "call_node_info"
660
661       utils.CheckDict(result.data, {
662         'memory_total' : '-',
663         'memory_dom0' : '-',
664         'memory_free' : '-',
665         'vg_size' : 'node_unreachable',
666         'vg_free' : '-',
667         }, log_name)
668     return retux
669
670   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
671     """Add a node to the cluster.
672
673     This is a single-node call.
674
675     """
676     return self._SingleNodeCall(node, "node_add",
677                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
678
679   def call_node_verify(self, node_list, checkdict, cluster_name):
680     """Request verification of given parameters.
681
682     This is a multi-node call.
683
684     """
685     return self._MultiNodeCall(node_list, "node_verify",
686                                [checkdict, cluster_name])
687
688   @classmethod
689   def call_node_start_master(cls, node, start_daemons):
690     """Tells a node to activate itself as a master.
691
692     This is a single-node call.
693
694     """
695     return cls._StaticSingleNodeCall(node, "node_start_master",
696                                      [start_daemons])
697
698   @classmethod
699   def call_node_stop_master(cls, node, stop_daemons):
700     """Tells a node to demote itself from master status.
701
702     This is a single-node call.
703
704     """
705     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
706
707   @classmethod
708   def call_master_info(cls, node_list):
709     """Query master info.
710
711     This is a multi-node call.
712
713     """
714     # TODO: should this method query down nodes?
715     return cls._StaticMultiNodeCall(node_list, "master_info", [])
716
717   def call_version(self, node_list):
718     """Query node version.
719
720     This is a multi-node call.
721
722     """
723     return self._MultiNodeCall(node_list, "version", [])
724
725   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
726     """Request creation of a given block device.
727
728     This is a single-node call.
729
730     """
731     return self._SingleNodeCall(node, "blockdev_create",
732                                 [bdev.ToDict(), size, owner, on_primary, info])
733
734   def call_blockdev_remove(self, node, bdev):
735     """Request removal of a given block device.
736
737     This is a single-node call.
738
739     """
740     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
741
742   def call_blockdev_rename(self, node, devlist):
743     """Request rename of the given block devices.
744
745     This is a single-node call.
746
747     """
748     return self._SingleNodeCall(node, "blockdev_rename",
749                                 [(d.ToDict(), uid) for d, uid in devlist])
750
751   def call_blockdev_assemble(self, node, disk, owner, on_primary):
752     """Request assembling of a given block device.
753
754     This is a single-node call.
755
756     """
757     return self._SingleNodeCall(node, "blockdev_assemble",
758                                 [disk.ToDict(), owner, on_primary])
759
760   def call_blockdev_shutdown(self, node, disk):
761     """Request shutdown of a given block device.
762
763     This is a single-node call.
764
765     """
766     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
767
768   def call_blockdev_addchildren(self, node, bdev, ndevs):
769     """Request adding a list of children to a (mirroring) device.
770
771     This is a single-node call.
772
773     """
774     return self._SingleNodeCall(node, "blockdev_addchildren",
775                                 [bdev.ToDict(),
776                                  [disk.ToDict() for disk in ndevs]])
777
778   def call_blockdev_removechildren(self, node, bdev, ndevs):
779     """Request removing a list of children from a (mirroring) device.
780
781     This is a single-node call.
782
783     """
784     return self._SingleNodeCall(node, "blockdev_removechildren",
785                                 [bdev.ToDict(),
786                                  [disk.ToDict() for disk in ndevs]])
787
788   def call_blockdev_getmirrorstatus(self, node, disks):
789     """Request status of a (mirroring) device.
790
791     This is a single-node call.
792
793     """
794     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
795                                 [dsk.ToDict() for dsk in disks])
796
797   def call_blockdev_find(self, node, disk):
798     """Request identification of a given block device.
799
800     This is a single-node call.
801
802     """
803     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
804
805   def call_blockdev_close(self, node, instance_name, disks):
806     """Closes the given block devices.
807
808     This is a single-node call.
809
810     """
811     params = [instance_name, [cf.ToDict() for cf in disks]]
812     return self._SingleNodeCall(node, "blockdev_close", params)
813
814   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
815     """Disconnects the network of the given drbd devices.
816
817     This is a multi-node call.
818
819     """
820     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
821                                [nodes_ip, [cf.ToDict() for cf in disks]])
822
823   def call_drbd_attach_net(self, node_list, nodes_ip,
824                            disks, instance_name, multimaster):
825     """Disconnects the given drbd devices.
826
827     This is a multi-node call.
828
829     """
830     return self._MultiNodeCall(node_list, "drbd_attach_net",
831                                [nodes_ip, [cf.ToDict() for cf in disks],
832                                 instance_name, multimaster])
833
834   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
835     """Waits for the synchronization of drbd devices is complete.
836
837     This is a multi-node call.
838
839     """
840     return self._MultiNodeCall(node_list, "drbd_wait_sync",
841                                [nodes_ip, [cf.ToDict() for cf in disks]])
842
843   @classmethod
844   def call_upload_file(cls, node_list, file_name, address_list=None):
845     """Upload a file.
846
847     The node will refuse the operation in case the file is not on the
848     approved file list.
849
850     This is a multi-node call.
851
852     @type node_list: list
853     @param node_list: the list of node names to upload to
854     @type file_name: str
855     @param file_name: the filename to upload
856     @type address_list: list or None
857     @keyword address_list: an optional list of node addresses, in order
858         to optimize the RPC speed
859
860     """
861     file_contents = utils.ReadFile(file_name)
862     data = cls._Compress(file_contents)
863     st = os.stat(file_name)
864     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
865               st.st_atime, st.st_mtime]
866     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
867                                     address_list=address_list)
868
869   @classmethod
870   def call_write_ssconf_files(cls, node_list, values):
871     """Write ssconf files.
872
873     This is a multi-node call.
874
875     """
876     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
877
878   def call_os_diagnose(self, node_list):
879     """Request a diagnose of OS definitions.
880
881     This is a multi-node call.
882
883     """
884     result = self._MultiNodeCall(node_list, "os_diagnose", [])
885
886     for node_result in result.values():
887       if not node_result.failed and node_result.data:
888         node_result.data = [objects.OS.FromDict(oss)
889                             for oss in node_result.data]
890     return result
891
892   def call_os_get(self, node, name):
893     """Returns an OS definition.
894
895     This is a single-node call.
896
897     """
898     result = self._SingleNodeCall(node, "os_get", [name])
899     if not result.failed and isinstance(result.data, dict):
900       result.data = objects.OS.FromDict(result.data)
901     return result
902
903   def call_hooks_runner(self, node_list, hpath, phase, env):
904     """Call the hooks runner.
905
906     Args:
907       - op: the OpCode instance
908       - env: a dictionary with the environment
909
910     This is a multi-node call.
911
912     """
913     params = [hpath, phase, env]
914     return self._MultiNodeCall(node_list, "hooks_runner", params)
915
916   def call_iallocator_runner(self, node, name, idata):
917     """Call an iallocator on a remote node
918
919     Args:
920       - name: the iallocator name
921       - input: the json-encoded input string
922
923     This is a single-node call.
924
925     """
926     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
927
928   def call_blockdev_grow(self, node, cf_bdev, amount):
929     """Request a snapshot of the given block device.
930
931     This is a single-node call.
932
933     """
934     return self._SingleNodeCall(node, "blockdev_grow",
935                                 [cf_bdev.ToDict(), amount])
936
937   def call_blockdev_snapshot(self, node, cf_bdev):
938     """Request a snapshot of the given block device.
939
940     This is a single-node call.
941
942     """
943     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
944
945   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
946                            cluster_name, idx):
947     """Request the export of a given snapshot.
948
949     This is a single-node call.
950
951     """
952     return self._SingleNodeCall(node, "snapshot_export",
953                                 [snap_bdev.ToDict(), dest_node,
954                                  self._InstDict(instance), cluster_name, idx])
955
956   def call_finalize_export(self, node, instance, snap_disks):
957     """Request the completion of an export operation.
958
959     This writes the export config file, etc.
960
961     This is a single-node call.
962
963     """
964     flat_disks = []
965     for disk in snap_disks:
966       flat_disks.append(disk.ToDict())
967
968     return self._SingleNodeCall(node, "finalize_export",
969                                 [self._InstDict(instance), flat_disks])
970
971   def call_export_info(self, node, path):
972     """Queries the export information in a given path.
973
974     This is a single-node call.
975
976     """
977     return self._SingleNodeCall(node, "export_info", [path])
978
979   def call_instance_os_import(self, node, inst, src_node, src_images,
980                               cluster_name):
981     """Request the import of a backup into an instance.
982
983     This is a single-node call.
984
985     """
986     return self._SingleNodeCall(node, "instance_os_import",
987                                 [self._InstDict(inst), src_node, src_images,
988                                  cluster_name])
989
990   def call_export_list(self, node_list):
991     """Gets the stored exports list.
992
993     This is a multi-node call.
994
995     """
996     return self._MultiNodeCall(node_list, "export_list", [])
997
998   def call_export_remove(self, node, export):
999     """Requests removal of a given export.
1000
1001     This is a single-node call.
1002
1003     """
1004     return self._SingleNodeCall(node, "export_remove", [export])
1005
1006   @classmethod
1007   def call_node_leave_cluster(cls, node):
1008     """Requests a node to clean the cluster information it has.
1009
1010     This will remove the configuration information from the ganeti data
1011     dir.
1012
1013     This is a single-node call.
1014
1015     """
1016     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1017
1018   def call_node_volumes(self, node_list):
1019     """Gets all volumes on node(s).
1020
1021     This is a multi-node call.
1022
1023     """
1024     return self._MultiNodeCall(node_list, "node_volumes", [])
1025
1026   def call_node_demote_from_mc(self, node):
1027     """Demote a node from the master candidate role.
1028
1029     This is a single-node call.
1030
1031     """
1032     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1033
1034
1035   def call_node_powercycle(self, node, hypervisor):
1036     """Tries to powercycle a node.
1037
1038     This is a single-node call.
1039
1040     """
1041     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1042
1043
1044   def call_test_delay(self, node_list, duration):
1045     """Sleep for a fixed time on given node(s).
1046
1047     This is a multi-node call.
1048
1049     """
1050     return self._MultiNodeCall(node_list, "test_delay", [duration])
1051
1052   def call_file_storage_dir_create(self, node, file_storage_dir):
1053     """Create the given file storage directory.
1054
1055     This is a single-node call.
1056
1057     """
1058     return self._SingleNodeCall(node, "file_storage_dir_create",
1059                                 [file_storage_dir])
1060
1061   def call_file_storage_dir_remove(self, node, file_storage_dir):
1062     """Remove the given file storage directory.
1063
1064     This is a single-node call.
1065
1066     """
1067     return self._SingleNodeCall(node, "file_storage_dir_remove",
1068                                 [file_storage_dir])
1069
1070   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1071                                    new_file_storage_dir):
1072     """Rename file storage directory.
1073
1074     This is a single-node call.
1075
1076     """
1077     return self._SingleNodeCall(node, "file_storage_dir_rename",
1078                                 [old_file_storage_dir, new_file_storage_dir])
1079
1080   @classmethod
1081   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1082     """Update job queue.
1083
1084     This is a multi-node call.
1085
1086     """
1087     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1088                                     [file_name, cls._Compress(content)],
1089                                     address_list=address_list)
1090
1091   @classmethod
1092   def call_jobqueue_purge(cls, node):
1093     """Purge job queue.
1094
1095     This is a single-node call.
1096
1097     """
1098     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1099
1100   @classmethod
1101   def call_jobqueue_rename(cls, node_list, address_list, rename):
1102     """Rename a job queue file.
1103
1104     This is a multi-node call.
1105
1106     """
1107     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1108                                     address_list=address_list)
1109
1110   @classmethod
1111   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1112     """Set the drain flag on the queue.
1113
1114     This is a multi-node call.
1115
1116     @type node_list: list
1117     @param node_list: the list of nodes to query
1118     @type drain_flag: bool
1119     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1120
1121     """
1122     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1123                                     [drain_flag])
1124
1125   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1126     """Validate the hypervisor params.
1127
1128     This is a multi-node call.
1129
1130     @type node_list: list
1131     @param node_list: the list of nodes to query
1132     @type hvname: string
1133     @param hvname: the hypervisor name
1134     @type hvparams: dict
1135     @param hvparams: the hypervisor parameters to be validated
1136
1137     """
1138     cluster = self._cfg.GetClusterInfo()
1139     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1140     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1141                                [hvname, hv_full])