Add RPC calls to import and export instance data
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client  # pylint: disable-msg=W0611
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager # pylint: disable-msg=W0603
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   http.InitSsl()
64
65   _http_manager = http.client.HttpClientManager()
66
67
68 def Shutdown():
69   """Stops the module-global HTTP client manager.
70
71   Must be called before quitting the program.
72
73   """
74   global _http_manager # pylint: disable-msg=W0603
75
76   if _http_manager:
77     _http_manager.Shutdown()
78     _http_manager = None
79
80
81 class RpcResult(object):
82   """RPC Result class.
83
84   This class holds an RPC result. It is needed since in multi-node
85   calls we can't raise an exception just because one one out of many
86   failed, and therefore we use this class to encapsulate the result.
87
88   @ivar data: the data payload, for successful results, or None
89   @ivar call: the name of the RPC call
90   @ivar node: the name of the node to which we made the call
91   @ivar offline: whether the operation failed because the node was
92       offline, as opposed to actual failure; offline=True will always
93       imply failed=True, in order to allow simpler checking if
94       the user doesn't care about the exact failure mode
95   @ivar fail_msg: the error message if the call failed
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.offline = offline
101     self.call = call
102     self.node = node
103
104     if offline:
105       self.fail_msg = "Node is marked offline"
106       self.data = self.payload = None
107     elif failed:
108       self.fail_msg = self._EnsureErr(data)
109       self.data = self.payload = None
110     else:
111       self.data = data
112       if not isinstance(self.data, (tuple, list)):
113         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
114                          type(self.data))
115         self.payload = None
116       elif len(data) != 2:
117         self.fail_msg = ("RPC layer error: invalid result length (%d), "
118                          "expected 2" % len(self.data))
119         self.payload = None
120       elif not self.data[0]:
121         self.fail_msg = self._EnsureErr(self.data[1])
122         self.payload = None
123       else:
124         # finally success
125         self.fail_msg = None
126         self.payload = data[1]
127
128     assert hasattr(self, "call")
129     assert hasattr(self, "data")
130     assert hasattr(self, "fail_msg")
131     assert hasattr(self, "node")
132     assert hasattr(self, "offline")
133     assert hasattr(self, "payload")
134
135   @staticmethod
136   def _EnsureErr(val):
137     """Helper to ensure we return a 'True' value for error."""
138     if val:
139       return val
140     else:
141       return "No error information"
142
143   def Raise(self, msg, prereq=False, ecode=None):
144     """If the result has failed, raise an OpExecError.
145
146     This is used so that LU code doesn't have to check for each
147     result, but instead can call this function.
148
149     """
150     if not self.fail_msg:
151       return
152
153     if not msg: # one could pass None for default message
154       msg = ("Call '%s' to node '%s' has failed: %s" %
155              (self.call, self.node, self.fail_msg))
156     else:
157       msg = "%s: %s" % (msg, self.fail_msg)
158     if prereq:
159       ec = errors.OpPrereqError
160     else:
161       ec = errors.OpExecError
162     if ecode is not None:
163       args = (msg, prereq)
164     else:
165       args = (msg, )
166     raise ec(*args) # pylint: disable-msg=W0142
167
168
169 class Client:
170   """RPC Client class.
171
172   This class, given a (remote) method name, a list of parameters and a
173   list of nodes, will contact (in parallel) all nodes, and return a
174   dict of results (key: node name, value: result).
175
176   One current bug is that generic failure is still signaled by
177   'False' result, which is not good. This overloading of values can
178   cause bugs.
179
180   """
181   def __init__(self, procedure, body, port):
182     self.procedure = procedure
183     self.body = body
184     self.port = port
185     self.nc = {}
186
187     self._ssl_params = \
188       http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189                          ssl_cert_path=constants.NODED_CERT_FILE)
190
191   def ConnectList(self, node_list, address_list=None):
192     """Add a list of nodes to the target nodes.
193
194     @type node_list: list
195     @param node_list: the list of node names to connect
196     @type address_list: list or None
197     @keyword address_list: either None or a list with node addresses,
198         which must have the same length as the node list
199
200     """
201     if address_list is None:
202       address_list = [None for _ in node_list]
203     else:
204       assert len(node_list) == len(address_list), \
205              "Name and address lists should have the same length"
206     for node, address in zip(node_list, address_list):
207       self.ConnectNode(node, address)
208
209   def ConnectNode(self, name, address=None):
210     """Add a node to the target list.
211
212     @type name: str
213     @param name: the node name
214     @type address: str
215     @keyword address: the node address, if known
216
217     """
218     if address is None:
219       address = name
220
221     self.nc[name] = \
222       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
223                                     "/%s" % self.procedure,
224                                     post_data=self.body,
225                                     ssl_params=self._ssl_params,
226                                     ssl_verify_peer=True)
227
228   def GetResults(self):
229     """Call nodes and return results.
230
231     @rtype: list
232     @return: List of RPC results
233
234     """
235     assert _http_manager, "RPC module not initialized"
236
237     _http_manager.ExecRequests(self.nc.values())
238
239     results = {}
240
241     for name, req in self.nc.iteritems():
242       if req.success and req.resp_status_code == http.HTTP_OK:
243         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
244                                   node=name, call=self.procedure)
245         continue
246
247       # TODO: Better error reporting
248       if req.error:
249         msg = req.error
250       else:
251         msg = req.resp_body
252
253       logging.error("RPC error in %s from node %s: %s",
254                     self.procedure, name, msg)
255       results[name] = RpcResult(data=msg, failed=True, node=name,
256                                 call=self.procedure)
257
258     return results
259
260
261 def _EncodeImportExportIO(ieio, ieioargs):
262   """Encodes import/export I/O information.
263
264   """
265   if ieio == constants.IEIO_RAW_DISK:
266     assert len(ieioargs) == 1
267     return (ieioargs[0].ToDict(), )
268
269   if ieio == constants.IEIO_SCRIPT:
270     assert len(ieioargs) == 2
271     return (ieioargs[0].ToDict(), ieioargs[1])
272
273   return ieioargs
274
275
276 class RpcRunner(object):
277   """RPC runner class"""
278
279   def __init__(self, cfg):
280     """Initialized the rpc runner.
281
282     @type cfg:  C{config.ConfigWriter}
283     @param cfg: the configuration object that will be used to get data
284                 about the cluster
285
286     """
287     self._cfg = cfg
288     self.port = utils.GetDaemonPort(constants.NODED)
289
290   def _InstDict(self, instance, hvp=None, bep=None):
291     """Convert the given instance to a dict.
292
293     This is done via the instance's ToDict() method and additionally
294     we fill the hvparams with the cluster defaults.
295
296     @type instance: L{objects.Instance}
297     @param instance: an Instance object
298     @type hvp: dict or None
299     @param hvp: a dictionary with overridden hypervisor parameters
300     @type bep: dict or None
301     @param bep: a dictionary with overridden backend parameters
302     @rtype: dict
303     @return: the instance dict, with the hvparams filled with the
304         cluster defaults
305
306     """
307     idict = instance.ToDict()
308     cluster = self._cfg.GetClusterInfo()
309     idict["hvparams"] = cluster.FillHV(instance)
310     if hvp is not None:
311       idict["hvparams"].update(hvp)
312     idict["beparams"] = cluster.FillBE(instance)
313     if bep is not None:
314       idict["beparams"].update(bep)
315     for nic in idict["nics"]:
316       nic['nicparams'] = objects.FillDict(
317         cluster.nicparams[constants.PP_DEFAULT],
318         nic['nicparams'])
319     return idict
320
321   def _ConnectList(self, client, node_list, call):
322     """Helper for computing node addresses.
323
324     @type client: L{ganeti.rpc.Client}
325     @param client: a C{Client} instance
326     @type node_list: list
327     @param node_list: the node list we should connect
328     @type call: string
329     @param call: the name of the remote procedure call, for filling in
330         correctly any eventual offline nodes' results
331
332     """
333     all_nodes = self._cfg.GetAllNodesInfo()
334     name_list = []
335     addr_list = []
336     skip_dict = {}
337     for node in node_list:
338       if node in all_nodes:
339         if all_nodes[node].offline:
340           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
341           continue
342         val = all_nodes[node].primary_ip
343       else:
344         val = None
345       addr_list.append(val)
346       name_list.append(node)
347     if name_list:
348       client.ConnectList(name_list, address_list=addr_list)
349     return skip_dict
350
351   def _ConnectNode(self, client, node, call):
352     """Helper for computing one node's address.
353
354     @type client: L{ganeti.rpc.Client}
355     @param client: a C{Client} instance
356     @type node: str
357     @param node: the node we should connect
358     @type call: string
359     @param call: the name of the remote procedure call, for filling in
360         correctly any eventual offline nodes' results
361
362     """
363     node_info = self._cfg.GetNodeInfo(node)
364     if node_info is not None:
365       if node_info.offline:
366         return RpcResult(node=node, offline=True, call=call)
367       addr = node_info.primary_ip
368     else:
369       addr = None
370     client.ConnectNode(node, address=addr)
371
372   def _MultiNodeCall(self, node_list, procedure, args):
373     """Helper for making a multi-node call
374
375     """
376     body = serializer.DumpJson(args, indent=False)
377     c = Client(procedure, body, self.port)
378     skip_dict = self._ConnectList(c, node_list, procedure)
379     skip_dict.update(c.GetResults())
380     return skip_dict
381
382   @classmethod
383   def _StaticMultiNodeCall(cls, node_list, procedure, args,
384                            address_list=None):
385     """Helper for making a multi-node static call
386
387     """
388     body = serializer.DumpJson(args, indent=False)
389     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
390     c.ConnectList(node_list, address_list=address_list)
391     return c.GetResults()
392
393   def _SingleNodeCall(self, node, procedure, args):
394     """Helper for making a single-node call
395
396     """
397     body = serializer.DumpJson(args, indent=False)
398     c = Client(procedure, body, self.port)
399     result = self._ConnectNode(c, node, procedure)
400     if result is None:
401       # we did connect, node is not offline
402       result = c.GetResults()[node]
403     return result
404
405   @classmethod
406   def _StaticSingleNodeCall(cls, node, procedure, args):
407     """Helper for making a single-node static call
408
409     """
410     body = serializer.DumpJson(args, indent=False)
411     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
412     c.ConnectNode(node)
413     return c.GetResults()[node]
414
415   @staticmethod
416   def _Compress(data):
417     """Compresses a string for transport over RPC.
418
419     Small amounts of data are not compressed.
420
421     @type data: str
422     @param data: Data
423     @rtype: tuple
424     @return: Encoded data to send
425
426     """
427     # Small amounts of data are not compressed
428     if len(data) < 512:
429       return (constants.RPC_ENCODING_NONE, data)
430
431     # Compress with zlib and encode in base64
432     return (constants.RPC_ENCODING_ZLIB_BASE64,
433             base64.b64encode(zlib.compress(data, 3)))
434
435   #
436   # Begin RPC calls
437   #
438
439   def call_lv_list(self, node_list, vg_name):
440     """Gets the logical volumes present in a given volume group.
441
442     This is a multi-node call.
443
444     """
445     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
446
447   def call_vg_list(self, node_list):
448     """Gets the volume group list.
449
450     This is a multi-node call.
451
452     """
453     return self._MultiNodeCall(node_list, "vg_list", [])
454
455   def call_storage_list(self, node_list, su_name, su_args, name, fields):
456     """Get list of storage units.
457
458     This is a multi-node call.
459
460     """
461     return self._MultiNodeCall(node_list, "storage_list",
462                                [su_name, su_args, name, fields])
463
464   def call_storage_modify(self, node, su_name, su_args, name, changes):
465     """Modify a storage unit.
466
467     This is a single-node call.
468
469     """
470     return self._SingleNodeCall(node, "storage_modify",
471                                 [su_name, su_args, name, changes])
472
473   def call_storage_execute(self, node, su_name, su_args, name, op):
474     """Executes an operation on a storage unit.
475
476     This is a single-node call.
477
478     """
479     return self._SingleNodeCall(node, "storage_execute",
480                                 [su_name, su_args, name, op])
481
482   def call_bridges_exist(self, node, bridges_list):
483     """Checks if a node has all the bridges given.
484
485     This method checks if all bridges given in the bridges_list are
486     present on the remote node, so that an instance that uses interfaces
487     on those bridges can be started.
488
489     This is a single-node call.
490
491     """
492     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
493
494   def call_instance_start(self, node, instance, hvp, bep):
495     """Starts an instance.
496
497     This is a single-node call.
498
499     """
500     idict = self._InstDict(instance, hvp=hvp, bep=bep)
501     return self._SingleNodeCall(node, "instance_start", [idict])
502
503   def call_instance_shutdown(self, node, instance, timeout):
504     """Stops an instance.
505
506     This is a single-node call.
507
508     """
509     return self._SingleNodeCall(node, "instance_shutdown",
510                                 [self._InstDict(instance), timeout])
511
512   def call_migration_info(self, node, instance):
513     """Gather the information necessary to prepare an instance migration.
514
515     This is a single-node call.
516
517     @type node: string
518     @param node: the node on which the instance is currently running
519     @type instance: C{objects.Instance}
520     @param instance: the instance definition
521
522     """
523     return self._SingleNodeCall(node, "migration_info",
524                                 [self._InstDict(instance)])
525
526   def call_accept_instance(self, node, instance, info, target):
527     """Prepare a node to accept an instance.
528
529     This is a single-node call.
530
531     @type node: string
532     @param node: the target node for the migration
533     @type instance: C{objects.Instance}
534     @param instance: the instance definition
535     @type info: opaque/hypervisor specific (string/data)
536     @param info: result for the call_migration_info call
537     @type target: string
538     @param target: target hostname (usually ip address) (on the node itself)
539
540     """
541     return self._SingleNodeCall(node, "accept_instance",
542                                 [self._InstDict(instance), info, target])
543
544   def call_finalize_migration(self, node, instance, info, success):
545     """Finalize any target-node migration specific operation.
546
547     This is called both in case of a successful migration and in case of error
548     (in which case it should abort the migration).
549
550     This is a single-node call.
551
552     @type node: string
553     @param node: the target node for the migration
554     @type instance: C{objects.Instance}
555     @param instance: the instance definition
556     @type info: opaque/hypervisor specific (string/data)
557     @param info: result for the call_migration_info call
558     @type success: boolean
559     @param success: whether the migration was a success or a failure
560
561     """
562     return self._SingleNodeCall(node, "finalize_migration",
563                                 [self._InstDict(instance), info, success])
564
565   def call_instance_migrate(self, node, instance, target, live):
566     """Migrate an instance.
567
568     This is a single-node call.
569
570     @type node: string
571     @param node: the node on which the instance is currently running
572     @type instance: C{objects.Instance}
573     @param instance: the instance definition
574     @type target: string
575     @param target: the target node name
576     @type live: boolean
577     @param live: whether the migration should be done live or not (the
578         interpretation of this parameter is left to the hypervisor)
579
580     """
581     return self._SingleNodeCall(node, "instance_migrate",
582                                 [self._InstDict(instance), target, live])
583
584   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
585     """Reboots an instance.
586
587     This is a single-node call.
588
589     """
590     return self._SingleNodeCall(node, "instance_reboot",
591                                 [self._InstDict(inst), reboot_type,
592                                  shutdown_timeout])
593
594   def call_instance_os_add(self, node, inst, reinstall, debug):
595     """Installs an OS on the given instance.
596
597     This is a single-node call.
598
599     """
600     return self._SingleNodeCall(node, "instance_os_add",
601                                 [self._InstDict(inst), reinstall, debug])
602
603   def call_instance_run_rename(self, node, inst, old_name, debug):
604     """Run the OS rename script for an instance.
605
606     This is a single-node call.
607
608     """
609     return self._SingleNodeCall(node, "instance_run_rename",
610                                 [self._InstDict(inst), old_name, debug])
611
612   def call_instance_info(self, node, instance, hname):
613     """Returns information about a single instance.
614
615     This is a single-node call.
616
617     @type node: list
618     @param node: the list of nodes to query
619     @type instance: string
620     @param instance: the instance name
621     @type hname: string
622     @param hname: the hypervisor type of the instance
623
624     """
625     return self._SingleNodeCall(node, "instance_info", [instance, hname])
626
627   def call_instance_migratable(self, node, instance):
628     """Checks whether the given instance can be migrated.
629
630     This is a single-node call.
631
632     @param node: the node to query
633     @type instance: L{objects.Instance}
634     @param instance: the instance to check
635
636
637     """
638     return self._SingleNodeCall(node, "instance_migratable",
639                                 [self._InstDict(instance)])
640
641   def call_all_instances_info(self, node_list, hypervisor_list):
642     """Returns information about all instances on the given nodes.
643
644     This is a multi-node call.
645
646     @type node_list: list
647     @param node_list: the list of nodes to query
648     @type hypervisor_list: list
649     @param hypervisor_list: the hypervisors to query for instances
650
651     """
652     return self._MultiNodeCall(node_list, "all_instances_info",
653                                [hypervisor_list])
654
655   def call_instance_list(self, node_list, hypervisor_list):
656     """Returns the list of running instances on a given node.
657
658     This is a multi-node call.
659
660     @type node_list: list
661     @param node_list: the list of nodes to query
662     @type hypervisor_list: list
663     @param hypervisor_list: the hypervisors to query for instances
664
665     """
666     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
667
668   def call_node_tcp_ping(self, node, source, target, port, timeout,
669                          live_port_needed):
670     """Do a TcpPing on the remote node
671
672     This is a single-node call.
673
674     """
675     return self._SingleNodeCall(node, "node_tcp_ping",
676                                 [source, target, port, timeout,
677                                  live_port_needed])
678
679   def call_node_has_ip_address(self, node, address):
680     """Checks if a node has the given IP address.
681
682     This is a single-node call.
683
684     """
685     return self._SingleNodeCall(node, "node_has_ip_address", [address])
686
687   def call_node_info(self, node_list, vg_name, hypervisor_type):
688     """Return node information.
689
690     This will return memory information and volume group size and free
691     space.
692
693     This is a multi-node call.
694
695     @type node_list: list
696     @param node_list: the list of nodes to query
697     @type vg_name: C{string}
698     @param vg_name: the name of the volume group to ask for disk space
699         information
700     @type hypervisor_type: C{str}
701     @param hypervisor_type: the name of the hypervisor to ask for
702         memory information
703
704     """
705     return self._MultiNodeCall(node_list, "node_info",
706                                [vg_name, hypervisor_type])
707
708   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
709     """Add a node to the cluster.
710
711     This is a single-node call.
712
713     """
714     return self._SingleNodeCall(node, "node_add",
715                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
716
717   def call_node_verify(self, node_list, checkdict, cluster_name):
718     """Request verification of given parameters.
719
720     This is a multi-node call.
721
722     """
723     return self._MultiNodeCall(node_list, "node_verify",
724                                [checkdict, cluster_name])
725
726   @classmethod
727   def call_node_start_master(cls, node, start_daemons, no_voting):
728     """Tells a node to activate itself as a master.
729
730     This is a single-node call.
731
732     """
733     return cls._StaticSingleNodeCall(node, "node_start_master",
734                                      [start_daemons, no_voting])
735
736   @classmethod
737   def call_node_stop_master(cls, node, stop_daemons):
738     """Tells a node to demote itself from master status.
739
740     This is a single-node call.
741
742     """
743     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
744
745   @classmethod
746   def call_master_info(cls, node_list):
747     """Query master info.
748
749     This is a multi-node call.
750
751     """
752     # TODO: should this method query down nodes?
753     return cls._StaticMultiNodeCall(node_list, "master_info", [])
754
755   @classmethod
756   def call_version(cls, node_list):
757     """Query node version.
758
759     This is a multi-node call.
760
761     """
762     return cls._StaticMultiNodeCall(node_list, "version", [])
763
764   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
765     """Request creation of a given block device.
766
767     This is a single-node call.
768
769     """
770     return self._SingleNodeCall(node, "blockdev_create",
771                                 [bdev.ToDict(), size, owner, on_primary, info])
772
773   def call_blockdev_remove(self, node, bdev):
774     """Request removal of a given block device.
775
776     This is a single-node call.
777
778     """
779     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
780
781   def call_blockdev_rename(self, node, devlist):
782     """Request rename of the given block devices.
783
784     This is a single-node call.
785
786     """
787     return self._SingleNodeCall(node, "blockdev_rename",
788                                 [(d.ToDict(), uid) for d, uid in devlist])
789
790   def call_blockdev_assemble(self, node, disk, owner, on_primary):
791     """Request assembling of a given block device.
792
793     This is a single-node call.
794
795     """
796     return self._SingleNodeCall(node, "blockdev_assemble",
797                                 [disk.ToDict(), owner, on_primary])
798
799   def call_blockdev_shutdown(self, node, disk):
800     """Request shutdown of a given block device.
801
802     This is a single-node call.
803
804     """
805     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
806
807   def call_blockdev_addchildren(self, node, bdev, ndevs):
808     """Request adding a list of children to a (mirroring) device.
809
810     This is a single-node call.
811
812     """
813     return self._SingleNodeCall(node, "blockdev_addchildren",
814                                 [bdev.ToDict(),
815                                  [disk.ToDict() for disk in ndevs]])
816
817   def call_blockdev_removechildren(self, node, bdev, ndevs):
818     """Request removing a list of children from a (mirroring) device.
819
820     This is a single-node call.
821
822     """
823     return self._SingleNodeCall(node, "blockdev_removechildren",
824                                 [bdev.ToDict(),
825                                  [disk.ToDict() for disk in ndevs]])
826
827   def call_blockdev_getmirrorstatus(self, node, disks):
828     """Request status of a (mirroring) device.
829
830     This is a single-node call.
831
832     """
833     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
834                                   [dsk.ToDict() for dsk in disks])
835     if not result.fail_msg:
836       result.payload = [objects.BlockDevStatus.FromDict(i)
837                         for i in result.payload]
838     return result
839
840   def call_blockdev_find(self, node, disk):
841     """Request identification of a given block device.
842
843     This is a single-node call.
844
845     """
846     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
847     if not result.fail_msg and result.payload is not None:
848       result.payload = objects.BlockDevStatus.FromDict(result.payload)
849     return result
850
851   def call_blockdev_close(self, node, instance_name, disks):
852     """Closes the given block devices.
853
854     This is a single-node call.
855
856     """
857     params = [instance_name, [cf.ToDict() for cf in disks]]
858     return self._SingleNodeCall(node, "blockdev_close", params)
859
860   def call_blockdev_getsizes(self, node, disks):
861     """Returns the size of the given disks.
862
863     This is a single-node call.
864
865     """
866     params = [[cf.ToDict() for cf in disks]]
867     return self._SingleNodeCall(node, "blockdev_getsize", params)
868
869   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
870     """Disconnects the network of the given drbd devices.
871
872     This is a multi-node call.
873
874     """
875     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
876                                [nodes_ip, [cf.ToDict() for cf in disks]])
877
878   def call_drbd_attach_net(self, node_list, nodes_ip,
879                            disks, instance_name, multimaster):
880     """Disconnects the given drbd devices.
881
882     This is a multi-node call.
883
884     """
885     return self._MultiNodeCall(node_list, "drbd_attach_net",
886                                [nodes_ip, [cf.ToDict() for cf in disks],
887                                 instance_name, multimaster])
888
889   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
890     """Waits for the synchronization of drbd devices is complete.
891
892     This is a multi-node call.
893
894     """
895     return self._MultiNodeCall(node_list, "drbd_wait_sync",
896                                [nodes_ip, [cf.ToDict() for cf in disks]])
897
898   @classmethod
899   def call_upload_file(cls, node_list, file_name, address_list=None):
900     """Upload a file.
901
902     The node will refuse the operation in case the file is not on the
903     approved file list.
904
905     This is a multi-node call.
906
907     @type node_list: list
908     @param node_list: the list of node names to upload to
909     @type file_name: str
910     @param file_name: the filename to upload
911     @type address_list: list or None
912     @keyword address_list: an optional list of node addresses, in order
913         to optimize the RPC speed
914
915     """
916     file_contents = utils.ReadFile(file_name)
917     data = cls._Compress(file_contents)
918     st = os.stat(file_name)
919     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
920               st.st_atime, st.st_mtime]
921     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
922                                     address_list=address_list)
923
924   @classmethod
925   def call_write_ssconf_files(cls, node_list, values):
926     """Write ssconf files.
927
928     This is a multi-node call.
929
930     """
931     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
932
933   def call_os_diagnose(self, node_list):
934     """Request a diagnose of OS definitions.
935
936     This is a multi-node call.
937
938     """
939     return self._MultiNodeCall(node_list, "os_diagnose", [])
940
941   def call_os_get(self, node, name):
942     """Returns an OS definition.
943
944     This is a single-node call.
945
946     """
947     result = self._SingleNodeCall(node, "os_get", [name])
948     if not result.fail_msg and isinstance(result.payload, dict):
949       result.payload = objects.OS.FromDict(result.payload)
950     return result
951
952   def call_hooks_runner(self, node_list, hpath, phase, env):
953     """Call the hooks runner.
954
955     Args:
956       - op: the OpCode instance
957       - env: a dictionary with the environment
958
959     This is a multi-node call.
960
961     """
962     params = [hpath, phase, env]
963     return self._MultiNodeCall(node_list, "hooks_runner", params)
964
965   def call_iallocator_runner(self, node, name, idata):
966     """Call an iallocator on a remote node
967
968     Args:
969       - name: the iallocator name
970       - input: the json-encoded input string
971
972     This is a single-node call.
973
974     """
975     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
976
977   def call_blockdev_grow(self, node, cf_bdev, amount):
978     """Request a snapshot of the given block device.
979
980     This is a single-node call.
981
982     """
983     return self._SingleNodeCall(node, "blockdev_grow",
984                                 [cf_bdev.ToDict(), amount])
985
986   def call_blockdev_export(self, node, cf_bdev,
987                            dest_node, dest_path, cluster_name):
988     """Export a given disk to another node.
989
990     This is a single-node call.
991
992     """
993     return self._SingleNodeCall(node, "blockdev_export",
994                                 [cf_bdev.ToDict(), dest_node, dest_path,
995                                  cluster_name])
996
997   def call_blockdev_snapshot(self, node, cf_bdev):
998     """Request a snapshot of the given block device.
999
1000     This is a single-node call.
1001
1002     """
1003     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1004
1005   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
1006                            cluster_name, idx, debug):
1007     """Request the export of a given snapshot.
1008
1009     This is a single-node call.
1010
1011     """
1012     return self._SingleNodeCall(node, "snapshot_export",
1013                                 [snap_bdev.ToDict(), dest_node,
1014                                  self._InstDict(instance), cluster_name,
1015                                  idx, debug])
1016
1017   def call_finalize_export(self, node, instance, snap_disks):
1018     """Request the completion of an export operation.
1019
1020     This writes the export config file, etc.
1021
1022     This is a single-node call.
1023
1024     """
1025     flat_disks = []
1026     for disk in snap_disks:
1027       if isinstance(disk, bool):
1028         flat_disks.append(disk)
1029       else:
1030         flat_disks.append(disk.ToDict())
1031
1032     return self._SingleNodeCall(node, "finalize_export",
1033                                 [self._InstDict(instance), flat_disks])
1034
1035   def call_export_info(self, node, path):
1036     """Queries the export information in a given path.
1037
1038     This is a single-node call.
1039
1040     """
1041     return self._SingleNodeCall(node, "export_info", [path])
1042
1043   def call_instance_os_import(self, node, inst, src_node, src_images,
1044                               cluster_name, debug):
1045     """Request the import of a backup into an instance.
1046
1047     This is a single-node call.
1048
1049     """
1050     return self._SingleNodeCall(node, "instance_os_import",
1051                                 [self._InstDict(inst), src_node, src_images,
1052                                  cluster_name, debug])
1053
1054   def call_export_list(self, node_list):
1055     """Gets the stored exports list.
1056
1057     This is a multi-node call.
1058
1059     """
1060     return self._MultiNodeCall(node_list, "export_list", [])
1061
1062   def call_export_remove(self, node, export):
1063     """Requests removal of a given export.
1064
1065     This is a single-node call.
1066
1067     """
1068     return self._SingleNodeCall(node, "export_remove", [export])
1069
1070   @classmethod
1071   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1072     """Requests a node to clean the cluster information it has.
1073
1074     This will remove the configuration information from the ganeti data
1075     dir.
1076
1077     This is a single-node call.
1078
1079     """
1080     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1081                                      [modify_ssh_setup])
1082
1083   def call_node_volumes(self, node_list):
1084     """Gets all volumes on node(s).
1085
1086     This is a multi-node call.
1087
1088     """
1089     return self._MultiNodeCall(node_list, "node_volumes", [])
1090
1091   def call_node_demote_from_mc(self, node):
1092     """Demote a node from the master candidate role.
1093
1094     This is a single-node call.
1095
1096     """
1097     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1098
1099   def call_node_powercycle(self, node, hypervisor):
1100     """Tries to powercycle a node.
1101
1102     This is a single-node call.
1103
1104     """
1105     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1106
1107   def call_test_delay(self, node_list, duration):
1108     """Sleep for a fixed time on given node(s).
1109
1110     This is a multi-node call.
1111
1112     """
1113     return self._MultiNodeCall(node_list, "test_delay", [duration])
1114
1115   def call_file_storage_dir_create(self, node, file_storage_dir):
1116     """Create the given file storage directory.
1117
1118     This is a single-node call.
1119
1120     """
1121     return self._SingleNodeCall(node, "file_storage_dir_create",
1122                                 [file_storage_dir])
1123
1124   def call_file_storage_dir_remove(self, node, file_storage_dir):
1125     """Remove the given file storage directory.
1126
1127     This is a single-node call.
1128
1129     """
1130     return self._SingleNodeCall(node, "file_storage_dir_remove",
1131                                 [file_storage_dir])
1132
1133   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1134                                    new_file_storage_dir):
1135     """Rename file storage directory.
1136
1137     This is a single-node call.
1138
1139     """
1140     return self._SingleNodeCall(node, "file_storage_dir_rename",
1141                                 [old_file_storage_dir, new_file_storage_dir])
1142
1143   @classmethod
1144   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1145     """Update job queue.
1146
1147     This is a multi-node call.
1148
1149     """
1150     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1151                                     [file_name, cls._Compress(content)],
1152                                     address_list=address_list)
1153
1154   @classmethod
1155   def call_jobqueue_purge(cls, node):
1156     """Purge job queue.
1157
1158     This is a single-node call.
1159
1160     """
1161     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1162
1163   @classmethod
1164   def call_jobqueue_rename(cls, node_list, address_list, rename):
1165     """Rename a job queue file.
1166
1167     This is a multi-node call.
1168
1169     """
1170     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1171                                     address_list=address_list)
1172
1173   @classmethod
1174   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1175     """Set the drain flag on the queue.
1176
1177     This is a multi-node call.
1178
1179     @type node_list: list
1180     @param node_list: the list of nodes to query
1181     @type drain_flag: bool
1182     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1183
1184     """
1185     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1186                                     [drain_flag])
1187
1188   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1189     """Validate the hypervisor params.
1190
1191     This is a multi-node call.
1192
1193     @type node_list: list
1194     @param node_list: the list of nodes to query
1195     @type hvname: string
1196     @param hvname: the hypervisor name
1197     @type hvparams: dict
1198     @param hvparams: the hypervisor parameters to be validated
1199
1200     """
1201     cluster = self._cfg.GetClusterInfo()
1202     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1203     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1204                                [hvname, hv_full])
1205
1206   def call_create_x509_certificate(self, node, validity):
1207     """Creates a new X509 certificate for SSL/TLS.
1208
1209     This is a single-node call.
1210
1211     @type validity: int
1212     @param validity: Validity in seconds
1213
1214     """
1215     return self._SingleNodeCall(node, "create_x509_certificate", [validity])
1216
1217   def call_remove_x509_certificate(self, node, name):
1218     """Removes a X509 certificate.
1219
1220     This is a single-node call.
1221
1222     @type name: string
1223     @param name: Certificate name
1224
1225     """
1226     return self._SingleNodeCall(node, "remove_x509_certificate", [name])
1227
1228   def call_start_import_listener(self, node, x509_key_name, source_x509_ca,
1229                                  instance, dest, dest_args):
1230     """Starts a listener for an import.
1231
1232     This is a single-node call.
1233
1234     @type node: string
1235     @param node: Node name
1236     @type instance: C{objects.Instance}
1237     @param instance: Instance object
1238
1239     """
1240     return self._SingleNodeCall(node, "start_import_listener",
1241                                 [x509_key_name, source_x509_ca,
1242                                  self._InstDict(instance), dest,
1243                                  _EncodeImportExportIO(dest, dest_args)])
1244
1245   def call_start_export(self, node, x509_key_name, dest_x509_ca, host, port,
1246                         instance, source, source_args):
1247     """Starts an export daemon.
1248
1249     This is a single-node call.
1250
1251     @type node: string
1252     @param node: Node name
1253     @type instance: C{objects.Instance}
1254     @param instance: Instance object
1255
1256     """
1257     return self._SingleNodeCall(node, "start_export",
1258                                 [x509_key_name, dest_x509_ca, host, port,
1259                                  self._InstDict(instance), source,
1260                                  _EncodeImportExportIO(source, source_args)])
1261
1262   def call_get_import_export_status(self, node, names):
1263     """Gets the status of an import or export.
1264
1265     This is a single-node call.
1266
1267     @type node: string
1268     @param node: Node name
1269     @type names: List of strings
1270     @param names: Import/export names
1271     @rtype: List of L{objects.ImportExportStatus} instances
1272     @return: Returns a list of the state of each named import/export or None if
1273              a status couldn't be retrieved
1274
1275     """
1276     result = self._SingleNodeCall(node, "get_import_export_status", [names])
1277
1278     if not result.fail_msg:
1279       decoded = []
1280
1281       for i in result.payload:
1282         if i is None:
1283           decoded.append(None)
1284           continue
1285         decoded.append(objects.ImportExportStatus.FromDict(i))
1286
1287       result.payload = decoded
1288
1289     return result
1290
1291   def call_cleanup_import_export(self, node, name):
1292     """Cleans up after an import or export.
1293
1294     This is a single-node call.
1295
1296     @type node: string
1297     @param node: Node name
1298     @type name: string
1299     @param name: Import/export name
1300
1301     """
1302     return self._SingleNodeCall(node, "cleanup_import_export", [name])