Merge branch 'devel-2.1'
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client  # pylint: disable-msg=W0611
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager # pylint: disable-msg=W0603
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   http.InitSsl()
64
65   _http_manager = http.client.HttpClientManager()
66
67
68 def Shutdown():
69   """Stops the module-global HTTP client manager.
70
71   Must be called before quitting the program.
72
73   """
74   global _http_manager # pylint: disable-msg=W0603
75
76   if _http_manager:
77     _http_manager.Shutdown()
78     _http_manager = None
79
80
81 class RpcResult(object):
82   """RPC Result class.
83
84   This class holds an RPC result. It is needed since in multi-node
85   calls we can't raise an exception just because one one out of many
86   failed, and therefore we use this class to encapsulate the result.
87
88   @ivar data: the data payload, for successful results, or None
89   @ivar call: the name of the RPC call
90   @ivar node: the name of the node to which we made the call
91   @ivar offline: whether the operation failed because the node was
92       offline, as opposed to actual failure; offline=True will always
93       imply failed=True, in order to allow simpler checking if
94       the user doesn't care about the exact failure mode
95   @ivar fail_msg: the error message if the call failed
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.offline = offline
101     self.call = call
102     self.node = node
103
104     if offline:
105       self.fail_msg = "Node is marked offline"
106       self.data = self.payload = None
107     elif failed:
108       self.fail_msg = self._EnsureErr(data)
109       self.data = self.payload = None
110     else:
111       self.data = data
112       if not isinstance(self.data, (tuple, list)):
113         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
114                          type(self.data))
115         self.payload = None
116       elif len(data) != 2:
117         self.fail_msg = ("RPC layer error: invalid result length (%d), "
118                          "expected 2" % len(self.data))
119         self.payload = None
120       elif not self.data[0]:
121         self.fail_msg = self._EnsureErr(self.data[1])
122         self.payload = None
123       else:
124         # finally success
125         self.fail_msg = None
126         self.payload = data[1]
127
128     assert hasattr(self, "call")
129     assert hasattr(self, "data")
130     assert hasattr(self, "fail_msg")
131     assert hasattr(self, "node")
132     assert hasattr(self, "offline")
133     assert hasattr(self, "payload")
134
135   @staticmethod
136   def _EnsureErr(val):
137     """Helper to ensure we return a 'True' value for error."""
138     if val:
139       return val
140     else:
141       return "No error information"
142
143   def Raise(self, msg, prereq=False, ecode=None):
144     """If the result has failed, raise an OpExecError.
145
146     This is used so that LU code doesn't have to check for each
147     result, but instead can call this function.
148
149     """
150     if not self.fail_msg:
151       return
152
153     if not msg: # one could pass None for default message
154       msg = ("Call '%s' to node '%s' has failed: %s" %
155              (self.call, self.node, self.fail_msg))
156     else:
157       msg = "%s: %s" % (msg, self.fail_msg)
158     if prereq:
159       ec = errors.OpPrereqError
160     else:
161       ec = errors.OpExecError
162     if ecode is not None:
163       args = (msg, prereq)
164     else:
165       args = (msg, )
166     raise ec(*args) # pylint: disable-msg=W0142
167
168
169 class Client:
170   """RPC Client class.
171
172   This class, given a (remote) method name, a list of parameters and a
173   list of nodes, will contact (in parallel) all nodes, and return a
174   dict of results (key: node name, value: result).
175
176   One current bug is that generic failure is still signaled by
177   'False' result, which is not good. This overloading of values can
178   cause bugs.
179
180   """
181   def __init__(self, procedure, body, port):
182     self.procedure = procedure
183     self.body = body
184     self.port = port
185     self.nc = {}
186
187     self._ssl_params = \
188       http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189                          ssl_cert_path=constants.NODED_CERT_FILE)
190
191   def ConnectList(self, node_list, address_list=None):
192     """Add a list of nodes to the target nodes.
193
194     @type node_list: list
195     @param node_list: the list of node names to connect
196     @type address_list: list or None
197     @keyword address_list: either None or a list with node addresses,
198         which must have the same length as the node list
199
200     """
201     if address_list is None:
202       address_list = [None for _ in node_list]
203     else:
204       assert len(node_list) == len(address_list), \
205              "Name and address lists should have the same length"
206     for node, address in zip(node_list, address_list):
207       self.ConnectNode(node, address)
208
209   def ConnectNode(self, name, address=None):
210     """Add a node to the target list.
211
212     @type name: str
213     @param name: the node name
214     @type address: str
215     @keyword address: the node address, if known
216
217     """
218     if address is None:
219       address = name
220
221     self.nc[name] = \
222       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
223                                     "/%s" % self.procedure,
224                                     post_data=self.body,
225                                     ssl_params=self._ssl_params,
226                                     ssl_verify_peer=True)
227
228   def GetResults(self):
229     """Call nodes and return results.
230
231     @rtype: list
232     @return: List of RPC results
233
234     """
235     assert _http_manager, "RPC module not initialized"
236
237     _http_manager.ExecRequests(self.nc.values())
238
239     results = {}
240
241     for name, req in self.nc.iteritems():
242       if req.success and req.resp_status_code == http.HTTP_OK:
243         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
244                                   node=name, call=self.procedure)
245         continue
246
247       # TODO: Better error reporting
248       if req.error:
249         msg = req.error
250       else:
251         msg = req.resp_body
252
253       logging.error("RPC error in %s from node %s: %s",
254                     self.procedure, name, msg)
255       results[name] = RpcResult(data=msg, failed=True, node=name,
256                                 call=self.procedure)
257
258     return results
259
260
261 class RpcRunner(object):
262   """RPC runner class"""
263
264   def __init__(self, cfg):
265     """Initialized the rpc runner.
266
267     @type cfg:  C{config.ConfigWriter}
268     @param cfg: the configuration object that will be used to get data
269                 about the cluster
270
271     """
272     self._cfg = cfg
273     self.port = utils.GetDaemonPort(constants.NODED)
274
275   def _InstDict(self, instance, hvp=None, bep=None):
276     """Convert the given instance to a dict.
277
278     This is done via the instance's ToDict() method and additionally
279     we fill the hvparams with the cluster defaults.
280
281     @type instance: L{objects.Instance}
282     @param instance: an Instance object
283     @type hvp: dict or None
284     @param hvp: a dictionary with overridden hypervisor parameters
285     @type bep: dict or None
286     @param bep: a dictionary with overridden backend parameters
287     @rtype: dict
288     @return: the instance dict, with the hvparams filled with the
289         cluster defaults
290
291     """
292     idict = instance.ToDict()
293     cluster = self._cfg.GetClusterInfo()
294     idict["hvparams"] = cluster.FillHV(instance)
295     if hvp is not None:
296       idict["hvparams"].update(hvp)
297     idict["beparams"] = cluster.FillBE(instance)
298     if bep is not None:
299       idict["beparams"].update(bep)
300     for nic in idict["nics"]:
301       nic['nicparams'] = objects.FillDict(
302         cluster.nicparams[constants.PP_DEFAULT],
303         nic['nicparams'])
304     return idict
305
306   def _ConnectList(self, client, node_list, call):
307     """Helper for computing node addresses.
308
309     @type client: L{ganeti.rpc.Client}
310     @param client: a C{Client} instance
311     @type node_list: list
312     @param node_list: the node list we should connect
313     @type call: string
314     @param call: the name of the remote procedure call, for filling in
315         correctly any eventual offline nodes' results
316
317     """
318     all_nodes = self._cfg.GetAllNodesInfo()
319     name_list = []
320     addr_list = []
321     skip_dict = {}
322     for node in node_list:
323       if node in all_nodes:
324         if all_nodes[node].offline:
325           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
326           continue
327         val = all_nodes[node].primary_ip
328       else:
329         val = None
330       addr_list.append(val)
331       name_list.append(node)
332     if name_list:
333       client.ConnectList(name_list, address_list=addr_list)
334     return skip_dict
335
336   def _ConnectNode(self, client, node, call):
337     """Helper for computing one node's address.
338
339     @type client: L{ganeti.rpc.Client}
340     @param client: a C{Client} instance
341     @type node: str
342     @param node: the node we should connect
343     @type call: string
344     @param call: the name of the remote procedure call, for filling in
345         correctly any eventual offline nodes' results
346
347     """
348     node_info = self._cfg.GetNodeInfo(node)
349     if node_info is not None:
350       if node_info.offline:
351         return RpcResult(node=node, offline=True, call=call)
352       addr = node_info.primary_ip
353     else:
354       addr = None
355     client.ConnectNode(node, address=addr)
356
357   def _MultiNodeCall(self, node_list, procedure, args):
358     """Helper for making a multi-node call
359
360     """
361     body = serializer.DumpJson(args, indent=False)
362     c = Client(procedure, body, self.port)
363     skip_dict = self._ConnectList(c, node_list, procedure)
364     skip_dict.update(c.GetResults())
365     return skip_dict
366
367   @classmethod
368   def _StaticMultiNodeCall(cls, node_list, procedure, args,
369                            address_list=None):
370     """Helper for making a multi-node static call
371
372     """
373     body = serializer.DumpJson(args, indent=False)
374     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
375     c.ConnectList(node_list, address_list=address_list)
376     return c.GetResults()
377
378   def _SingleNodeCall(self, node, procedure, args):
379     """Helper for making a single-node call
380
381     """
382     body = serializer.DumpJson(args, indent=False)
383     c = Client(procedure, body, self.port)
384     result = self._ConnectNode(c, node, procedure)
385     if result is None:
386       # we did connect, node is not offline
387       result = c.GetResults()[node]
388     return result
389
390   @classmethod
391   def _StaticSingleNodeCall(cls, node, procedure, args):
392     """Helper for making a single-node static call
393
394     """
395     body = serializer.DumpJson(args, indent=False)
396     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
397     c.ConnectNode(node)
398     return c.GetResults()[node]
399
400   @staticmethod
401   def _Compress(data):
402     """Compresses a string for transport over RPC.
403
404     Small amounts of data are not compressed.
405
406     @type data: str
407     @param data: Data
408     @rtype: tuple
409     @return: Encoded data to send
410
411     """
412     # Small amounts of data are not compressed
413     if len(data) < 512:
414       return (constants.RPC_ENCODING_NONE, data)
415
416     # Compress with zlib and encode in base64
417     return (constants.RPC_ENCODING_ZLIB_BASE64,
418             base64.b64encode(zlib.compress(data, 3)))
419
420   #
421   # Begin RPC calls
422   #
423
424   def call_lv_list(self, node_list, vg_name):
425     """Gets the logical volumes present in a given volume group.
426
427     This is a multi-node call.
428
429     """
430     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
431
432   def call_vg_list(self, node_list):
433     """Gets the volume group list.
434
435     This is a multi-node call.
436
437     """
438     return self._MultiNodeCall(node_list, "vg_list", [])
439
440   def call_storage_list(self, node_list, su_name, su_args, name, fields):
441     """Get list of storage units.
442
443     This is a multi-node call.
444
445     """
446     return self._MultiNodeCall(node_list, "storage_list",
447                                [su_name, su_args, name, fields])
448
449   def call_storage_modify(self, node, su_name, su_args, name, changes):
450     """Modify a storage unit.
451
452     This is a single-node call.
453
454     """
455     return self._SingleNodeCall(node, "storage_modify",
456                                 [su_name, su_args, name, changes])
457
458   def call_storage_execute(self, node, su_name, su_args, name, op):
459     """Executes an operation on a storage unit.
460
461     This is a single-node call.
462
463     """
464     return self._SingleNodeCall(node, "storage_execute",
465                                 [su_name, su_args, name, op])
466
467   def call_bridges_exist(self, node, bridges_list):
468     """Checks if a node has all the bridges given.
469
470     This method checks if all bridges given in the bridges_list are
471     present on the remote node, so that an instance that uses interfaces
472     on those bridges can be started.
473
474     This is a single-node call.
475
476     """
477     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
478
479   def call_instance_start(self, node, instance, hvp, bep):
480     """Starts an instance.
481
482     This is a single-node call.
483
484     """
485     idict = self._InstDict(instance, hvp=hvp, bep=bep)
486     return self._SingleNodeCall(node, "instance_start", [idict])
487
488   def call_instance_shutdown(self, node, instance, timeout):
489     """Stops an instance.
490
491     This is a single-node call.
492
493     """
494     return self._SingleNodeCall(node, "instance_shutdown",
495                                 [self._InstDict(instance), timeout])
496
497   def call_migration_info(self, node, instance):
498     """Gather the information necessary to prepare an instance migration.
499
500     This is a single-node call.
501
502     @type node: string
503     @param node: the node on which the instance is currently running
504     @type instance: C{objects.Instance}
505     @param instance: the instance definition
506
507     """
508     return self._SingleNodeCall(node, "migration_info",
509                                 [self._InstDict(instance)])
510
511   def call_accept_instance(self, node, instance, info, target):
512     """Prepare a node to accept an instance.
513
514     This is a single-node call.
515
516     @type node: string
517     @param node: the target node for the migration
518     @type instance: C{objects.Instance}
519     @param instance: the instance definition
520     @type info: opaque/hypervisor specific (string/data)
521     @param info: result for the call_migration_info call
522     @type target: string
523     @param target: target hostname (usually ip address) (on the node itself)
524
525     """
526     return self._SingleNodeCall(node, "accept_instance",
527                                 [self._InstDict(instance), info, target])
528
529   def call_finalize_migration(self, node, instance, info, success):
530     """Finalize any target-node migration specific operation.
531
532     This is called both in case of a successful migration and in case of error
533     (in which case it should abort the migration).
534
535     This is a single-node call.
536
537     @type node: string
538     @param node: the target node for the migration
539     @type instance: C{objects.Instance}
540     @param instance: the instance definition
541     @type info: opaque/hypervisor specific (string/data)
542     @param info: result for the call_migration_info call
543     @type success: boolean
544     @param success: whether the migration was a success or a failure
545
546     """
547     return self._SingleNodeCall(node, "finalize_migration",
548                                 [self._InstDict(instance), info, success])
549
550   def call_instance_migrate(self, node, instance, target, live):
551     """Migrate an instance.
552
553     This is a single-node call.
554
555     @type node: string
556     @param node: the node on which the instance is currently running
557     @type instance: C{objects.Instance}
558     @param instance: the instance definition
559     @type target: string
560     @param target: the target node name
561     @type live: boolean
562     @param live: whether the migration should be done live or not (the
563         interpretation of this parameter is left to the hypervisor)
564
565     """
566     return self._SingleNodeCall(node, "instance_migrate",
567                                 [self._InstDict(instance), target, live])
568
569   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
570     """Reboots an instance.
571
572     This is a single-node call.
573
574     """
575     return self._SingleNodeCall(node, "instance_reboot",
576                                 [self._InstDict(inst), reboot_type,
577                                  shutdown_timeout])
578
579   def call_instance_os_add(self, node, inst, reinstall, debug):
580     """Installs an OS on the given instance.
581
582     This is a single-node call.
583
584     """
585     return self._SingleNodeCall(node, "instance_os_add",
586                                 [self._InstDict(inst), reinstall, debug])
587
588   def call_instance_run_rename(self, node, inst, old_name, debug):
589     """Run the OS rename script for an instance.
590
591     This is a single-node call.
592
593     """
594     return self._SingleNodeCall(node, "instance_run_rename",
595                                 [self._InstDict(inst), old_name, debug])
596
597   def call_instance_info(self, node, instance, hname):
598     """Returns information about a single instance.
599
600     This is a single-node call.
601
602     @type node: list
603     @param node: the list of nodes to query
604     @type instance: string
605     @param instance: the instance name
606     @type hname: string
607     @param hname: the hypervisor type of the instance
608
609     """
610     return self._SingleNodeCall(node, "instance_info", [instance, hname])
611
612   def call_instance_migratable(self, node, instance):
613     """Checks whether the given instance can be migrated.
614
615     This is a single-node call.
616
617     @param node: the node to query
618     @type instance: L{objects.Instance}
619     @param instance: the instance to check
620
621
622     """
623     return self._SingleNodeCall(node, "instance_migratable",
624                                 [self._InstDict(instance)])
625
626   def call_all_instances_info(self, node_list, hypervisor_list):
627     """Returns information about all instances on the given nodes.
628
629     This is a multi-node call.
630
631     @type node_list: list
632     @param node_list: the list of nodes to query
633     @type hypervisor_list: list
634     @param hypervisor_list: the hypervisors to query for instances
635
636     """
637     return self._MultiNodeCall(node_list, "all_instances_info",
638                                [hypervisor_list])
639
640   def call_instance_list(self, node_list, hypervisor_list):
641     """Returns the list of running instances on a given node.
642
643     This is a multi-node call.
644
645     @type node_list: list
646     @param node_list: the list of nodes to query
647     @type hypervisor_list: list
648     @param hypervisor_list: the hypervisors to query for instances
649
650     """
651     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
652
653   def call_node_tcp_ping(self, node, source, target, port, timeout,
654                          live_port_needed):
655     """Do a TcpPing on the remote node
656
657     This is a single-node call.
658
659     """
660     return self._SingleNodeCall(node, "node_tcp_ping",
661                                 [source, target, port, timeout,
662                                  live_port_needed])
663
664   def call_node_has_ip_address(self, node, address):
665     """Checks if a node has the given IP address.
666
667     This is a single-node call.
668
669     """
670     return self._SingleNodeCall(node, "node_has_ip_address", [address])
671
672   def call_node_info(self, node_list, vg_name, hypervisor_type):
673     """Return node information.
674
675     This will return memory information and volume group size and free
676     space.
677
678     This is a multi-node call.
679
680     @type node_list: list
681     @param node_list: the list of nodes to query
682     @type vg_name: C{string}
683     @param vg_name: the name of the volume group to ask for disk space
684         information
685     @type hypervisor_type: C{str}
686     @param hypervisor_type: the name of the hypervisor to ask for
687         memory information
688
689     """
690     return self._MultiNodeCall(node_list, "node_info",
691                                [vg_name, hypervisor_type])
692
693   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
694     """Add a node to the cluster.
695
696     This is a single-node call.
697
698     """
699     return self._SingleNodeCall(node, "node_add",
700                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
701
702   def call_node_verify(self, node_list, checkdict, cluster_name):
703     """Request verification of given parameters.
704
705     This is a multi-node call.
706
707     """
708     return self._MultiNodeCall(node_list, "node_verify",
709                                [checkdict, cluster_name])
710
711   @classmethod
712   def call_node_start_master(cls, node, start_daemons, no_voting):
713     """Tells a node to activate itself as a master.
714
715     This is a single-node call.
716
717     """
718     return cls._StaticSingleNodeCall(node, "node_start_master",
719                                      [start_daemons, no_voting])
720
721   @classmethod
722   def call_node_stop_master(cls, node, stop_daemons):
723     """Tells a node to demote itself from master status.
724
725     This is a single-node call.
726
727     """
728     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
729
730   @classmethod
731   def call_master_info(cls, node_list):
732     """Query master info.
733
734     This is a multi-node call.
735
736     """
737     # TODO: should this method query down nodes?
738     return cls._StaticMultiNodeCall(node_list, "master_info", [])
739
740   @classmethod
741   def call_version(cls, node_list):
742     """Query node version.
743
744     This is a multi-node call.
745
746     """
747     return cls._StaticMultiNodeCall(node_list, "version", [])
748
749   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
750     """Request creation of a given block device.
751
752     This is a single-node call.
753
754     """
755     return self._SingleNodeCall(node, "blockdev_create",
756                                 [bdev.ToDict(), size, owner, on_primary, info])
757
758   def call_blockdev_remove(self, node, bdev):
759     """Request removal of a given block device.
760
761     This is a single-node call.
762
763     """
764     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
765
766   def call_blockdev_rename(self, node, devlist):
767     """Request rename of the given block devices.
768
769     This is a single-node call.
770
771     """
772     return self._SingleNodeCall(node, "blockdev_rename",
773                                 [(d.ToDict(), uid) for d, uid in devlist])
774
775   def call_blockdev_assemble(self, node, disk, owner, on_primary):
776     """Request assembling of a given block device.
777
778     This is a single-node call.
779
780     """
781     return self._SingleNodeCall(node, "blockdev_assemble",
782                                 [disk.ToDict(), owner, on_primary])
783
784   def call_blockdev_shutdown(self, node, disk):
785     """Request shutdown of a given block device.
786
787     This is a single-node call.
788
789     """
790     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
791
792   def call_blockdev_addchildren(self, node, bdev, ndevs):
793     """Request adding a list of children to a (mirroring) device.
794
795     This is a single-node call.
796
797     """
798     return self._SingleNodeCall(node, "blockdev_addchildren",
799                                 [bdev.ToDict(),
800                                  [disk.ToDict() for disk in ndevs]])
801
802   def call_blockdev_removechildren(self, node, bdev, ndevs):
803     """Request removing a list of children from a (mirroring) device.
804
805     This is a single-node call.
806
807     """
808     return self._SingleNodeCall(node, "blockdev_removechildren",
809                                 [bdev.ToDict(),
810                                  [disk.ToDict() for disk in ndevs]])
811
812   def call_blockdev_getmirrorstatus(self, node, disks):
813     """Request status of a (mirroring) device.
814
815     This is a single-node call.
816
817     """
818     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
819                                   [dsk.ToDict() for dsk in disks])
820     if not result.fail_msg:
821       result.payload = [objects.BlockDevStatus.FromDict(i)
822                         for i in result.payload]
823     return result
824
825   def call_blockdev_find(self, node, disk):
826     """Request identification of a given block device.
827
828     This is a single-node call.
829
830     """
831     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
832     if not result.fail_msg and result.payload is not None:
833       result.payload = objects.BlockDevStatus.FromDict(result.payload)
834     return result
835
836   def call_blockdev_close(self, node, instance_name, disks):
837     """Closes the given block devices.
838
839     This is a single-node call.
840
841     """
842     params = [instance_name, [cf.ToDict() for cf in disks]]
843     return self._SingleNodeCall(node, "blockdev_close", params)
844
845   def call_blockdev_getsizes(self, node, disks):
846     """Returns the size of the given disks.
847
848     This is a single-node call.
849
850     """
851     params = [[cf.ToDict() for cf in disks]]
852     return self._SingleNodeCall(node, "blockdev_getsize", params)
853
854   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
855     """Disconnects the network of the given drbd devices.
856
857     This is a multi-node call.
858
859     """
860     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
861                                [nodes_ip, [cf.ToDict() for cf in disks]])
862
863   def call_drbd_attach_net(self, node_list, nodes_ip,
864                            disks, instance_name, multimaster):
865     """Disconnects the given drbd devices.
866
867     This is a multi-node call.
868
869     """
870     return self._MultiNodeCall(node_list, "drbd_attach_net",
871                                [nodes_ip, [cf.ToDict() for cf in disks],
872                                 instance_name, multimaster])
873
874   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
875     """Waits for the synchronization of drbd devices is complete.
876
877     This is a multi-node call.
878
879     """
880     return self._MultiNodeCall(node_list, "drbd_wait_sync",
881                                [nodes_ip, [cf.ToDict() for cf in disks]])
882
883   @classmethod
884   def call_upload_file(cls, node_list, file_name, address_list=None):
885     """Upload a file.
886
887     The node will refuse the operation in case the file is not on the
888     approved file list.
889
890     This is a multi-node call.
891
892     @type node_list: list
893     @param node_list: the list of node names to upload to
894     @type file_name: str
895     @param file_name: the filename to upload
896     @type address_list: list or None
897     @keyword address_list: an optional list of node addresses, in order
898         to optimize the RPC speed
899
900     """
901     file_contents = utils.ReadFile(file_name)
902     data = cls._Compress(file_contents)
903     st = os.stat(file_name)
904     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
905               st.st_atime, st.st_mtime]
906     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
907                                     address_list=address_list)
908
909   @classmethod
910   def call_write_ssconf_files(cls, node_list, values):
911     """Write ssconf files.
912
913     This is a multi-node call.
914
915     """
916     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
917
918   def call_os_diagnose(self, node_list):
919     """Request a diagnose of OS definitions.
920
921     This is a multi-node call.
922
923     """
924     return self._MultiNodeCall(node_list, "os_diagnose", [])
925
926   def call_os_get(self, node, name):
927     """Returns an OS definition.
928
929     This is a single-node call.
930
931     """
932     result = self._SingleNodeCall(node, "os_get", [name])
933     if not result.fail_msg and isinstance(result.payload, dict):
934       result.payload = objects.OS.FromDict(result.payload)
935     return result
936
937   def call_hooks_runner(self, node_list, hpath, phase, env):
938     """Call the hooks runner.
939
940     Args:
941       - op: the OpCode instance
942       - env: a dictionary with the environment
943
944     This is a multi-node call.
945
946     """
947     params = [hpath, phase, env]
948     return self._MultiNodeCall(node_list, "hooks_runner", params)
949
950   def call_iallocator_runner(self, node, name, idata):
951     """Call an iallocator on a remote node
952
953     Args:
954       - name: the iallocator name
955       - input: the json-encoded input string
956
957     This is a single-node call.
958
959     """
960     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
961
962   def call_blockdev_grow(self, node, cf_bdev, amount):
963     """Request a snapshot of the given block device.
964
965     This is a single-node call.
966
967     """
968     return self._SingleNodeCall(node, "blockdev_grow",
969                                 [cf_bdev.ToDict(), amount])
970
971   def call_blockdev_export(self, node, cf_bdev,
972                            dest_node, dest_path, cluster_name):
973     """Export a given disk to another node.
974
975     This is a single-node call.
976
977     """
978     return self._SingleNodeCall(node, "blockdev_export",
979                                 [cf_bdev.ToDict(), dest_node, dest_path,
980                                  cluster_name])
981
982   def call_blockdev_snapshot(self, node, cf_bdev):
983     """Request a snapshot of the given block device.
984
985     This is a single-node call.
986
987     """
988     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
989
990   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
991                            cluster_name, idx, debug):
992     """Request the export of a given snapshot.
993
994     This is a single-node call.
995
996     """
997     return self._SingleNodeCall(node, "snapshot_export",
998                                 [snap_bdev.ToDict(), dest_node,
999                                  self._InstDict(instance), cluster_name,
1000                                  idx, debug])
1001
1002   def call_finalize_export(self, node, instance, snap_disks):
1003     """Request the completion of an export operation.
1004
1005     This writes the export config file, etc.
1006
1007     This is a single-node call.
1008
1009     """
1010     flat_disks = []
1011     for disk in snap_disks:
1012       if isinstance(disk, bool):
1013         flat_disks.append(disk)
1014       else:
1015         flat_disks.append(disk.ToDict())
1016
1017     return self._SingleNodeCall(node, "finalize_export",
1018                                 [self._InstDict(instance), flat_disks])
1019
1020   def call_export_info(self, node, path):
1021     """Queries the export information in a given path.
1022
1023     This is a single-node call.
1024
1025     """
1026     return self._SingleNodeCall(node, "export_info", [path])
1027
1028   def call_instance_os_import(self, node, inst, src_node, src_images,
1029                               cluster_name, debug):
1030     """Request the import of a backup into an instance.
1031
1032     This is a single-node call.
1033
1034     """
1035     return self._SingleNodeCall(node, "instance_os_import",
1036                                 [self._InstDict(inst), src_node, src_images,
1037                                  cluster_name, debug])
1038
1039   def call_export_list(self, node_list):
1040     """Gets the stored exports list.
1041
1042     This is a multi-node call.
1043
1044     """
1045     return self._MultiNodeCall(node_list, "export_list", [])
1046
1047   def call_export_remove(self, node, export):
1048     """Requests removal of a given export.
1049
1050     This is a single-node call.
1051
1052     """
1053     return self._SingleNodeCall(node, "export_remove", [export])
1054
1055   @classmethod
1056   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1057     """Requests a node to clean the cluster information it has.
1058
1059     This will remove the configuration information from the ganeti data
1060     dir.
1061
1062     This is a single-node call.
1063
1064     """
1065     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1066                                      [modify_ssh_setup])
1067
1068   def call_node_volumes(self, node_list):
1069     """Gets all volumes on node(s).
1070
1071     This is a multi-node call.
1072
1073     """
1074     return self._MultiNodeCall(node_list, "node_volumes", [])
1075
1076   def call_node_demote_from_mc(self, node):
1077     """Demote a node from the master candidate role.
1078
1079     This is a single-node call.
1080
1081     """
1082     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1083
1084   def call_node_powercycle(self, node, hypervisor):
1085     """Tries to powercycle a node.
1086
1087     This is a single-node call.
1088
1089     """
1090     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1091
1092   def call_test_delay(self, node_list, duration):
1093     """Sleep for a fixed time on given node(s).
1094
1095     This is a multi-node call.
1096
1097     """
1098     return self._MultiNodeCall(node_list, "test_delay", [duration])
1099
1100   def call_file_storage_dir_create(self, node, file_storage_dir):
1101     """Create the given file storage directory.
1102
1103     This is a single-node call.
1104
1105     """
1106     return self._SingleNodeCall(node, "file_storage_dir_create",
1107                                 [file_storage_dir])
1108
1109   def call_file_storage_dir_remove(self, node, file_storage_dir):
1110     """Remove the given file storage directory.
1111
1112     This is a single-node call.
1113
1114     """
1115     return self._SingleNodeCall(node, "file_storage_dir_remove",
1116                                 [file_storage_dir])
1117
1118   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1119                                    new_file_storage_dir):
1120     """Rename file storage directory.
1121
1122     This is a single-node call.
1123
1124     """
1125     return self._SingleNodeCall(node, "file_storage_dir_rename",
1126                                 [old_file_storage_dir, new_file_storage_dir])
1127
1128   @classmethod
1129   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1130     """Update job queue.
1131
1132     This is a multi-node call.
1133
1134     """
1135     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1136                                     [file_name, cls._Compress(content)],
1137                                     address_list=address_list)
1138
1139   @classmethod
1140   def call_jobqueue_purge(cls, node):
1141     """Purge job queue.
1142
1143     This is a single-node call.
1144
1145     """
1146     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1147
1148   @classmethod
1149   def call_jobqueue_rename(cls, node_list, address_list, rename):
1150     """Rename a job queue file.
1151
1152     This is a multi-node call.
1153
1154     """
1155     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1156                                     address_list=address_list)
1157
1158   @classmethod
1159   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1160     """Set the drain flag on the queue.
1161
1162     This is a multi-node call.
1163
1164     @type node_list: list
1165     @param node_list: the list of nodes to query
1166     @type drain_flag: bool
1167     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1168
1169     """
1170     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1171                                     [drain_flag])
1172
1173   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1174     """Validate the hypervisor params.
1175
1176     This is a multi-node call.
1177
1178     @type node_list: list
1179     @param node_list: the list of nodes to query
1180     @type hvname: string
1181     @param hvname: the hypervisor name
1182     @type hvparams: dict
1183     @param hvparams: the hypervisor parameters to be validated
1184
1185     """
1186     cluster = self._cfg.GetClusterInfo()
1187     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1188     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1189                                [hvname, hv_full])
1190
1191   def call_create_x509_certificate(self, node, validity):
1192     """Creates a new X509 certificate for SSL/TLS.
1193
1194     This is a single-node call.
1195
1196     @type validity: int
1197     @param validity: Validity in seconds
1198
1199     """
1200     return self._SingleNodeCall(node, "create_x509_certificate", [validity])
1201
1202   def call_remove_x509_certificate(self, node, name):
1203     """Removes a X509 certificate.
1204
1205     This is a single-node call.
1206
1207     @type name: string
1208     @param name: Certificate name
1209
1210     """
1211     return self._SingleNodeCall(node, "remove_x509_certificate", [name])