Put common import/export daemon options into object
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client  # pylint: disable-msg=W0611
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager # pylint: disable-msg=W0603
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   http.InitSsl()
64
65   _http_manager = http.client.HttpClientManager()
66
67
68 def Shutdown():
69   """Stops the module-global HTTP client manager.
70
71   Must be called before quitting the program.
72
73   """
74   global _http_manager # pylint: disable-msg=W0603
75
76   if _http_manager:
77     _http_manager.Shutdown()
78     _http_manager = None
79
80
81 class RpcResult(object):
82   """RPC Result class.
83
84   This class holds an RPC result. It is needed since in multi-node
85   calls we can't raise an exception just because one one out of many
86   failed, and therefore we use this class to encapsulate the result.
87
88   @ivar data: the data payload, for successful results, or None
89   @ivar call: the name of the RPC call
90   @ivar node: the name of the node to which we made the call
91   @ivar offline: whether the operation failed because the node was
92       offline, as opposed to actual failure; offline=True will always
93       imply failed=True, in order to allow simpler checking if
94       the user doesn't care about the exact failure mode
95   @ivar fail_msg: the error message if the call failed
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.offline = offline
101     self.call = call
102     self.node = node
103
104     if offline:
105       self.fail_msg = "Node is marked offline"
106       self.data = self.payload = None
107     elif failed:
108       self.fail_msg = self._EnsureErr(data)
109       self.data = self.payload = None
110     else:
111       self.data = data
112       if not isinstance(self.data, (tuple, list)):
113         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
114                          type(self.data))
115         self.payload = None
116       elif len(data) != 2:
117         self.fail_msg = ("RPC layer error: invalid result length (%d), "
118                          "expected 2" % len(self.data))
119         self.payload = None
120       elif not self.data[0]:
121         self.fail_msg = self._EnsureErr(self.data[1])
122         self.payload = None
123       else:
124         # finally success
125         self.fail_msg = None
126         self.payload = data[1]
127
128     assert hasattr(self, "call")
129     assert hasattr(self, "data")
130     assert hasattr(self, "fail_msg")
131     assert hasattr(self, "node")
132     assert hasattr(self, "offline")
133     assert hasattr(self, "payload")
134
135   @staticmethod
136   def _EnsureErr(val):
137     """Helper to ensure we return a 'True' value for error."""
138     if val:
139       return val
140     else:
141       return "No error information"
142
143   def Raise(self, msg, prereq=False, ecode=None):
144     """If the result has failed, raise an OpExecError.
145
146     This is used so that LU code doesn't have to check for each
147     result, but instead can call this function.
148
149     """
150     if not self.fail_msg:
151       return
152
153     if not msg: # one could pass None for default message
154       msg = ("Call '%s' to node '%s' has failed: %s" %
155              (self.call, self.node, self.fail_msg))
156     else:
157       msg = "%s: %s" % (msg, self.fail_msg)
158     if prereq:
159       ec = errors.OpPrereqError
160     else:
161       ec = errors.OpExecError
162     if ecode is not None:
163       args = (msg, prereq)
164     else:
165       args = (msg, )
166     raise ec(*args) # pylint: disable-msg=W0142
167
168
169 class Client:
170   """RPC Client class.
171
172   This class, given a (remote) method name, a list of parameters and a
173   list of nodes, will contact (in parallel) all nodes, and return a
174   dict of results (key: node name, value: result).
175
176   One current bug is that generic failure is still signaled by
177   'False' result, which is not good. This overloading of values can
178   cause bugs.
179
180   """
181   def __init__(self, procedure, body, port):
182     self.procedure = procedure
183     self.body = body
184     self.port = port
185     self.nc = {}
186
187     self._ssl_params = \
188       http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189                          ssl_cert_path=constants.NODED_CERT_FILE)
190
191   def ConnectList(self, node_list, address_list=None):
192     """Add a list of nodes to the target nodes.
193
194     @type node_list: list
195     @param node_list: the list of node names to connect
196     @type address_list: list or None
197     @keyword address_list: either None or a list with node addresses,
198         which must have the same length as the node list
199
200     """
201     if address_list is None:
202       address_list = [None for _ in node_list]
203     else:
204       assert len(node_list) == len(address_list), \
205              "Name and address lists should have the same length"
206     for node, address in zip(node_list, address_list):
207       self.ConnectNode(node, address)
208
209   def ConnectNode(self, name, address=None):
210     """Add a node to the target list.
211
212     @type name: str
213     @param name: the node name
214     @type address: str
215     @keyword address: the node address, if known
216
217     """
218     if address is None:
219       address = name
220
221     self.nc[name] = \
222       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
223                                     "/%s" % self.procedure,
224                                     post_data=self.body,
225                                     ssl_params=self._ssl_params,
226                                     ssl_verify_peer=True)
227
228   def GetResults(self):
229     """Call nodes and return results.
230
231     @rtype: list
232     @return: List of RPC results
233
234     """
235     assert _http_manager, "RPC module not initialized"
236
237     _http_manager.ExecRequests(self.nc.values())
238
239     results = {}
240
241     for name, req in self.nc.iteritems():
242       if req.success and req.resp_status_code == http.HTTP_OK:
243         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
244                                   node=name, call=self.procedure)
245         continue
246
247       # TODO: Better error reporting
248       if req.error:
249         msg = req.error
250       else:
251         msg = req.resp_body
252
253       logging.error("RPC error in %s from node %s: %s",
254                     self.procedure, name, msg)
255       results[name] = RpcResult(data=msg, failed=True, node=name,
256                                 call=self.procedure)
257
258     return results
259
260
261 def _EncodeImportExportIO(ieio, ieioargs):
262   """Encodes import/export I/O information.
263
264   """
265   if ieio == constants.IEIO_RAW_DISK:
266     assert len(ieioargs) == 1
267     return (ieioargs[0].ToDict(), )
268
269   if ieio == constants.IEIO_SCRIPT:
270     assert len(ieioargs) == 2
271     return (ieioargs[0].ToDict(), ieioargs[1])
272
273   return ieioargs
274
275
276 class RpcRunner(object):
277   """RPC runner class"""
278
279   def __init__(self, cfg):
280     """Initialized the rpc runner.
281
282     @type cfg:  C{config.ConfigWriter}
283     @param cfg: the configuration object that will be used to get data
284                 about the cluster
285
286     """
287     self._cfg = cfg
288     self.port = utils.GetDaemonPort(constants.NODED)
289
290   def _InstDict(self, instance, hvp=None, bep=None):
291     """Convert the given instance to a dict.
292
293     This is done via the instance's ToDict() method and additionally
294     we fill the hvparams with the cluster defaults.
295
296     @type instance: L{objects.Instance}
297     @param instance: an Instance object
298     @type hvp: dict or None
299     @param hvp: a dictionary with overridden hypervisor parameters
300     @type bep: dict or None
301     @param bep: a dictionary with overridden backend parameters
302     @rtype: dict
303     @return: the instance dict, with the hvparams filled with the
304         cluster defaults
305
306     """
307     idict = instance.ToDict()
308     cluster = self._cfg.GetClusterInfo()
309     idict["hvparams"] = cluster.FillHV(instance)
310     if hvp is not None:
311       idict["hvparams"].update(hvp)
312     idict["beparams"] = cluster.FillBE(instance)
313     if bep is not None:
314       idict["beparams"].update(bep)
315     for nic in idict["nics"]:
316       nic['nicparams'] = objects.FillDict(
317         cluster.nicparams[constants.PP_DEFAULT],
318         nic['nicparams'])
319     return idict
320
321   def _ConnectList(self, client, node_list, call):
322     """Helper for computing node addresses.
323
324     @type client: L{ganeti.rpc.Client}
325     @param client: a C{Client} instance
326     @type node_list: list
327     @param node_list: the node list we should connect
328     @type call: string
329     @param call: the name of the remote procedure call, for filling in
330         correctly any eventual offline nodes' results
331
332     """
333     all_nodes = self._cfg.GetAllNodesInfo()
334     name_list = []
335     addr_list = []
336     skip_dict = {}
337     for node in node_list:
338       if node in all_nodes:
339         if all_nodes[node].offline:
340           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
341           continue
342         val = all_nodes[node].primary_ip
343       else:
344         val = None
345       addr_list.append(val)
346       name_list.append(node)
347     if name_list:
348       client.ConnectList(name_list, address_list=addr_list)
349     return skip_dict
350
351   def _ConnectNode(self, client, node, call):
352     """Helper for computing one node's address.
353
354     @type client: L{ganeti.rpc.Client}
355     @param client: a C{Client} instance
356     @type node: str
357     @param node: the node we should connect
358     @type call: string
359     @param call: the name of the remote procedure call, for filling in
360         correctly any eventual offline nodes' results
361
362     """
363     node_info = self._cfg.GetNodeInfo(node)
364     if node_info is not None:
365       if node_info.offline:
366         return RpcResult(node=node, offline=True, call=call)
367       addr = node_info.primary_ip
368     else:
369       addr = None
370     client.ConnectNode(node, address=addr)
371
372   def _MultiNodeCall(self, node_list, procedure, args):
373     """Helper for making a multi-node call
374
375     """
376     body = serializer.DumpJson(args, indent=False)
377     c = Client(procedure, body, self.port)
378     skip_dict = self._ConnectList(c, node_list, procedure)
379     skip_dict.update(c.GetResults())
380     return skip_dict
381
382   @classmethod
383   def _StaticMultiNodeCall(cls, node_list, procedure, args,
384                            address_list=None):
385     """Helper for making a multi-node static call
386
387     """
388     body = serializer.DumpJson(args, indent=False)
389     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
390     c.ConnectList(node_list, address_list=address_list)
391     return c.GetResults()
392
393   def _SingleNodeCall(self, node, procedure, args):
394     """Helper for making a single-node call
395
396     """
397     body = serializer.DumpJson(args, indent=False)
398     c = Client(procedure, body, self.port)
399     result = self._ConnectNode(c, node, procedure)
400     if result is None:
401       # we did connect, node is not offline
402       result = c.GetResults()[node]
403     return result
404
405   @classmethod
406   def _StaticSingleNodeCall(cls, node, procedure, args):
407     """Helper for making a single-node static call
408
409     """
410     body = serializer.DumpJson(args, indent=False)
411     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
412     c.ConnectNode(node)
413     return c.GetResults()[node]
414
415   @staticmethod
416   def _Compress(data):
417     """Compresses a string for transport over RPC.
418
419     Small amounts of data are not compressed.
420
421     @type data: str
422     @param data: Data
423     @rtype: tuple
424     @return: Encoded data to send
425
426     """
427     # Small amounts of data are not compressed
428     if len(data) < 512:
429       return (constants.RPC_ENCODING_NONE, data)
430
431     # Compress with zlib and encode in base64
432     return (constants.RPC_ENCODING_ZLIB_BASE64,
433             base64.b64encode(zlib.compress(data, 3)))
434
435   #
436   # Begin RPC calls
437   #
438
439   def call_lv_list(self, node_list, vg_name):
440     """Gets the logical volumes present in a given volume group.
441
442     This is a multi-node call.
443
444     """
445     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
446
447   def call_vg_list(self, node_list):
448     """Gets the volume group list.
449
450     This is a multi-node call.
451
452     """
453     return self._MultiNodeCall(node_list, "vg_list", [])
454
455   def call_storage_list(self, node_list, su_name, su_args, name, fields):
456     """Get list of storage units.
457
458     This is a multi-node call.
459
460     """
461     return self._MultiNodeCall(node_list, "storage_list",
462                                [su_name, su_args, name, fields])
463
464   def call_storage_modify(self, node, su_name, su_args, name, changes):
465     """Modify a storage unit.
466
467     This is a single-node call.
468
469     """
470     return self._SingleNodeCall(node, "storage_modify",
471                                 [su_name, su_args, name, changes])
472
473   def call_storage_execute(self, node, su_name, su_args, name, op):
474     """Executes an operation on a storage unit.
475
476     This is a single-node call.
477
478     """
479     return self._SingleNodeCall(node, "storage_execute",
480                                 [su_name, su_args, name, op])
481
482   def call_bridges_exist(self, node, bridges_list):
483     """Checks if a node has all the bridges given.
484
485     This method checks if all bridges given in the bridges_list are
486     present on the remote node, so that an instance that uses interfaces
487     on those bridges can be started.
488
489     This is a single-node call.
490
491     """
492     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
493
494   def call_instance_start(self, node, instance, hvp, bep):
495     """Starts an instance.
496
497     This is a single-node call.
498
499     """
500     idict = self._InstDict(instance, hvp=hvp, bep=bep)
501     return self._SingleNodeCall(node, "instance_start", [idict])
502
503   def call_instance_shutdown(self, node, instance, timeout):
504     """Stops an instance.
505
506     This is a single-node call.
507
508     """
509     return self._SingleNodeCall(node, "instance_shutdown",
510                                 [self._InstDict(instance), timeout])
511
512   def call_migration_info(self, node, instance):
513     """Gather the information necessary to prepare an instance migration.
514
515     This is a single-node call.
516
517     @type node: string
518     @param node: the node on which the instance is currently running
519     @type instance: C{objects.Instance}
520     @param instance: the instance definition
521
522     """
523     return self._SingleNodeCall(node, "migration_info",
524                                 [self._InstDict(instance)])
525
526   def call_accept_instance(self, node, instance, info, target):
527     """Prepare a node to accept an instance.
528
529     This is a single-node call.
530
531     @type node: string
532     @param node: the target node for the migration
533     @type instance: C{objects.Instance}
534     @param instance: the instance definition
535     @type info: opaque/hypervisor specific (string/data)
536     @param info: result for the call_migration_info call
537     @type target: string
538     @param target: target hostname (usually ip address) (on the node itself)
539
540     """
541     return self._SingleNodeCall(node, "accept_instance",
542                                 [self._InstDict(instance), info, target])
543
544   def call_finalize_migration(self, node, instance, info, success):
545     """Finalize any target-node migration specific operation.
546
547     This is called both in case of a successful migration and in case of error
548     (in which case it should abort the migration).
549
550     This is a single-node call.
551
552     @type node: string
553     @param node: the target node for the migration
554     @type instance: C{objects.Instance}
555     @param instance: the instance definition
556     @type info: opaque/hypervisor specific (string/data)
557     @param info: result for the call_migration_info call
558     @type success: boolean
559     @param success: whether the migration was a success or a failure
560
561     """
562     return self._SingleNodeCall(node, "finalize_migration",
563                                 [self._InstDict(instance), info, success])
564
565   def call_instance_migrate(self, node, instance, target, live):
566     """Migrate an instance.
567
568     This is a single-node call.
569
570     @type node: string
571     @param node: the node on which the instance is currently running
572     @type instance: C{objects.Instance}
573     @param instance: the instance definition
574     @type target: string
575     @param target: the target node name
576     @type live: boolean
577     @param live: whether the migration should be done live or not (the
578         interpretation of this parameter is left to the hypervisor)
579
580     """
581     return self._SingleNodeCall(node, "instance_migrate",
582                                 [self._InstDict(instance), target, live])
583
584   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
585     """Reboots an instance.
586
587     This is a single-node call.
588
589     """
590     return self._SingleNodeCall(node, "instance_reboot",
591                                 [self._InstDict(inst), reboot_type,
592                                  shutdown_timeout])
593
594   def call_instance_os_add(self, node, inst, reinstall, debug):
595     """Installs an OS on the given instance.
596
597     This is a single-node call.
598
599     """
600     return self._SingleNodeCall(node, "instance_os_add",
601                                 [self._InstDict(inst), reinstall, debug])
602
603   def call_instance_run_rename(self, node, inst, old_name, debug):
604     """Run the OS rename script for an instance.
605
606     This is a single-node call.
607
608     """
609     return self._SingleNodeCall(node, "instance_run_rename",
610                                 [self._InstDict(inst), old_name, debug])
611
612   def call_instance_info(self, node, instance, hname):
613     """Returns information about a single instance.
614
615     This is a single-node call.
616
617     @type node: list
618     @param node: the list of nodes to query
619     @type instance: string
620     @param instance: the instance name
621     @type hname: string
622     @param hname: the hypervisor type of the instance
623
624     """
625     return self._SingleNodeCall(node, "instance_info", [instance, hname])
626
627   def call_instance_migratable(self, node, instance):
628     """Checks whether the given instance can be migrated.
629
630     This is a single-node call.
631
632     @param node: the node to query
633     @type instance: L{objects.Instance}
634     @param instance: the instance to check
635
636
637     """
638     return self._SingleNodeCall(node, "instance_migratable",
639                                 [self._InstDict(instance)])
640
641   def call_all_instances_info(self, node_list, hypervisor_list):
642     """Returns information about all instances on the given nodes.
643
644     This is a multi-node call.
645
646     @type node_list: list
647     @param node_list: the list of nodes to query
648     @type hypervisor_list: list
649     @param hypervisor_list: the hypervisors to query for instances
650
651     """
652     return self._MultiNodeCall(node_list, "all_instances_info",
653                                [hypervisor_list])
654
655   def call_instance_list(self, node_list, hypervisor_list):
656     """Returns the list of running instances on a given node.
657
658     This is a multi-node call.
659
660     @type node_list: list
661     @param node_list: the list of nodes to query
662     @type hypervisor_list: list
663     @param hypervisor_list: the hypervisors to query for instances
664
665     """
666     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
667
668   def call_node_tcp_ping(self, node, source, target, port, timeout,
669                          live_port_needed):
670     """Do a TcpPing on the remote node
671
672     This is a single-node call.
673
674     """
675     return self._SingleNodeCall(node, "node_tcp_ping",
676                                 [source, target, port, timeout,
677                                  live_port_needed])
678
679   def call_node_has_ip_address(self, node, address):
680     """Checks if a node has the given IP address.
681
682     This is a single-node call.
683
684     """
685     return self._SingleNodeCall(node, "node_has_ip_address", [address])
686
687   def call_node_info(self, node_list, vg_name, hypervisor_type):
688     """Return node information.
689
690     This will return memory information and volume group size and free
691     space.
692
693     This is a multi-node call.
694
695     @type node_list: list
696     @param node_list: the list of nodes to query
697     @type vg_name: C{string}
698     @param vg_name: the name of the volume group to ask for disk space
699         information
700     @type hypervisor_type: C{str}
701     @param hypervisor_type: the name of the hypervisor to ask for
702         memory information
703
704     """
705     return self._MultiNodeCall(node_list, "node_info",
706                                [vg_name, hypervisor_type])
707
708   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
709     """Add a node to the cluster.
710
711     This is a single-node call.
712
713     """
714     return self._SingleNodeCall(node, "node_add",
715                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
716
717   def call_node_verify(self, node_list, checkdict, cluster_name):
718     """Request verification of given parameters.
719
720     This is a multi-node call.
721
722     """
723     return self._MultiNodeCall(node_list, "node_verify",
724                                [checkdict, cluster_name])
725
726   @classmethod
727   def call_node_start_master(cls, node, start_daemons, no_voting):
728     """Tells a node to activate itself as a master.
729
730     This is a single-node call.
731
732     """
733     return cls._StaticSingleNodeCall(node, "node_start_master",
734                                      [start_daemons, no_voting])
735
736   @classmethod
737   def call_node_stop_master(cls, node, stop_daemons):
738     """Tells a node to demote itself from master status.
739
740     This is a single-node call.
741
742     """
743     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
744
745   @classmethod
746   def call_master_info(cls, node_list):
747     """Query master info.
748
749     This is a multi-node call.
750
751     """
752     # TODO: should this method query down nodes?
753     return cls._StaticMultiNodeCall(node_list, "master_info", [])
754
755   @classmethod
756   def call_version(cls, node_list):
757     """Query node version.
758
759     This is a multi-node call.
760
761     """
762     return cls._StaticMultiNodeCall(node_list, "version", [])
763
764   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
765     """Request creation of a given block device.
766
767     This is a single-node call.
768
769     """
770     return self._SingleNodeCall(node, "blockdev_create",
771                                 [bdev.ToDict(), size, owner, on_primary, info])
772
773   def call_blockdev_remove(self, node, bdev):
774     """Request removal of a given block device.
775
776     This is a single-node call.
777
778     """
779     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
780
781   def call_blockdev_rename(self, node, devlist):
782     """Request rename of the given block devices.
783
784     This is a single-node call.
785
786     """
787     return self._SingleNodeCall(node, "blockdev_rename",
788                                 [(d.ToDict(), uid) for d, uid in devlist])
789
790   def call_blockdev_assemble(self, node, disk, owner, on_primary):
791     """Request assembling of a given block device.
792
793     This is a single-node call.
794
795     """
796     return self._SingleNodeCall(node, "blockdev_assemble",
797                                 [disk.ToDict(), owner, on_primary])
798
799   def call_blockdev_shutdown(self, node, disk):
800     """Request shutdown of a given block device.
801
802     This is a single-node call.
803
804     """
805     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
806
807   def call_blockdev_addchildren(self, node, bdev, ndevs):
808     """Request adding a list of children to a (mirroring) device.
809
810     This is a single-node call.
811
812     """
813     return self._SingleNodeCall(node, "blockdev_addchildren",
814                                 [bdev.ToDict(),
815                                  [disk.ToDict() for disk in ndevs]])
816
817   def call_blockdev_removechildren(self, node, bdev, ndevs):
818     """Request removing a list of children from a (mirroring) device.
819
820     This is a single-node call.
821
822     """
823     return self._SingleNodeCall(node, "blockdev_removechildren",
824                                 [bdev.ToDict(),
825                                  [disk.ToDict() for disk in ndevs]])
826
827   def call_blockdev_getmirrorstatus(self, node, disks):
828     """Request status of a (mirroring) device.
829
830     This is a single-node call.
831
832     """
833     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
834                                   [dsk.ToDict() for dsk in disks])
835     if not result.fail_msg:
836       result.payload = [objects.BlockDevStatus.FromDict(i)
837                         for i in result.payload]
838     return result
839
840   def call_blockdev_find(self, node, disk):
841     """Request identification of a given block device.
842
843     This is a single-node call.
844
845     """
846     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
847     if not result.fail_msg and result.payload is not None:
848       result.payload = objects.BlockDevStatus.FromDict(result.payload)
849     return result
850
851   def call_blockdev_close(self, node, instance_name, disks):
852     """Closes the given block devices.
853
854     This is a single-node call.
855
856     """
857     params = [instance_name, [cf.ToDict() for cf in disks]]
858     return self._SingleNodeCall(node, "blockdev_close", params)
859
860   def call_blockdev_getsizes(self, node, disks):
861     """Returns the size of the given disks.
862
863     This is a single-node call.
864
865     """
866     params = [[cf.ToDict() for cf in disks]]
867     return self._SingleNodeCall(node, "blockdev_getsize", params)
868
869   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
870     """Disconnects the network of the given drbd devices.
871
872     This is a multi-node call.
873
874     """
875     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
876                                [nodes_ip, [cf.ToDict() for cf in disks]])
877
878   def call_drbd_attach_net(self, node_list, nodes_ip,
879                            disks, instance_name, multimaster):
880     """Disconnects the given drbd devices.
881
882     This is a multi-node call.
883
884     """
885     return self._MultiNodeCall(node_list, "drbd_attach_net",
886                                [nodes_ip, [cf.ToDict() for cf in disks],
887                                 instance_name, multimaster])
888
889   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
890     """Waits for the synchronization of drbd devices is complete.
891
892     This is a multi-node call.
893
894     """
895     return self._MultiNodeCall(node_list, "drbd_wait_sync",
896                                [nodes_ip, [cf.ToDict() for cf in disks]])
897
898   @classmethod
899   def call_upload_file(cls, node_list, file_name, address_list=None):
900     """Upload a file.
901
902     The node will refuse the operation in case the file is not on the
903     approved file list.
904
905     This is a multi-node call.
906
907     @type node_list: list
908     @param node_list: the list of node names to upload to
909     @type file_name: str
910     @param file_name: the filename to upload
911     @type address_list: list or None
912     @keyword address_list: an optional list of node addresses, in order
913         to optimize the RPC speed
914
915     """
916     file_contents = utils.ReadFile(file_name)
917     data = cls._Compress(file_contents)
918     st = os.stat(file_name)
919     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
920               st.st_atime, st.st_mtime]
921     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
922                                     address_list=address_list)
923
924   @classmethod
925   def call_write_ssconf_files(cls, node_list, values):
926     """Write ssconf files.
927
928     This is a multi-node call.
929
930     """
931     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
932
933   def call_os_diagnose(self, node_list):
934     """Request a diagnose of OS definitions.
935
936     This is a multi-node call.
937
938     """
939     return self._MultiNodeCall(node_list, "os_diagnose", [])
940
941   def call_os_get(self, node, name):
942     """Returns an OS definition.
943
944     This is a single-node call.
945
946     """
947     result = self._SingleNodeCall(node, "os_get", [name])
948     if not result.fail_msg and isinstance(result.payload, dict):
949       result.payload = objects.OS.FromDict(result.payload)
950     return result
951
952   def call_hooks_runner(self, node_list, hpath, phase, env):
953     """Call the hooks runner.
954
955     Args:
956       - op: the OpCode instance
957       - env: a dictionary with the environment
958
959     This is a multi-node call.
960
961     """
962     params = [hpath, phase, env]
963     return self._MultiNodeCall(node_list, "hooks_runner", params)
964
965   def call_iallocator_runner(self, node, name, idata):
966     """Call an iallocator on a remote node
967
968     Args:
969       - name: the iallocator name
970       - input: the json-encoded input string
971
972     This is a single-node call.
973
974     """
975     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
976
977   def call_blockdev_grow(self, node, cf_bdev, amount):
978     """Request a snapshot of the given block device.
979
980     This is a single-node call.
981
982     """
983     return self._SingleNodeCall(node, "blockdev_grow",
984                                 [cf_bdev.ToDict(), amount])
985
986   def call_blockdev_export(self, node, cf_bdev,
987                            dest_node, dest_path, cluster_name):
988     """Export a given disk to another node.
989
990     This is a single-node call.
991
992     """
993     return self._SingleNodeCall(node, "blockdev_export",
994                                 [cf_bdev.ToDict(), dest_node, dest_path,
995                                  cluster_name])
996
997   def call_blockdev_snapshot(self, node, cf_bdev):
998     """Request a snapshot of the given block device.
999
1000     This is a single-node call.
1001
1002     """
1003     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1004
1005   def call_finalize_export(self, node, instance, snap_disks):
1006     """Request the completion of an export operation.
1007
1008     This writes the export config file, etc.
1009
1010     This is a single-node call.
1011
1012     """
1013     flat_disks = []
1014     for disk in snap_disks:
1015       if isinstance(disk, bool):
1016         flat_disks.append(disk)
1017       else:
1018         flat_disks.append(disk.ToDict())
1019
1020     return self._SingleNodeCall(node, "finalize_export",
1021                                 [self._InstDict(instance), flat_disks])
1022
1023   def call_export_info(self, node, path):
1024     """Queries the export information in a given path.
1025
1026     This is a single-node call.
1027
1028     """
1029     return self._SingleNodeCall(node, "export_info", [path])
1030
1031   def call_export_list(self, node_list):
1032     """Gets the stored exports list.
1033
1034     This is a multi-node call.
1035
1036     """
1037     return self._MultiNodeCall(node_list, "export_list", [])
1038
1039   def call_export_remove(self, node, export):
1040     """Requests removal of a given export.
1041
1042     This is a single-node call.
1043
1044     """
1045     return self._SingleNodeCall(node, "export_remove", [export])
1046
1047   @classmethod
1048   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1049     """Requests a node to clean the cluster information it has.
1050
1051     This will remove the configuration information from the ganeti data
1052     dir.
1053
1054     This is a single-node call.
1055
1056     """
1057     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1058                                      [modify_ssh_setup])
1059
1060   def call_node_volumes(self, node_list):
1061     """Gets all volumes on node(s).
1062
1063     This is a multi-node call.
1064
1065     """
1066     return self._MultiNodeCall(node_list, "node_volumes", [])
1067
1068   def call_node_demote_from_mc(self, node):
1069     """Demote a node from the master candidate role.
1070
1071     This is a single-node call.
1072
1073     """
1074     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1075
1076   def call_node_powercycle(self, node, hypervisor):
1077     """Tries to powercycle a node.
1078
1079     This is a single-node call.
1080
1081     """
1082     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1083
1084   def call_test_delay(self, node_list, duration):
1085     """Sleep for a fixed time on given node(s).
1086
1087     This is a multi-node call.
1088
1089     """
1090     return self._MultiNodeCall(node_list, "test_delay", [duration])
1091
1092   def call_file_storage_dir_create(self, node, file_storage_dir):
1093     """Create the given file storage directory.
1094
1095     This is a single-node call.
1096
1097     """
1098     return self._SingleNodeCall(node, "file_storage_dir_create",
1099                                 [file_storage_dir])
1100
1101   def call_file_storage_dir_remove(self, node, file_storage_dir):
1102     """Remove the given file storage directory.
1103
1104     This is a single-node call.
1105
1106     """
1107     return self._SingleNodeCall(node, "file_storage_dir_remove",
1108                                 [file_storage_dir])
1109
1110   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1111                                    new_file_storage_dir):
1112     """Rename file storage directory.
1113
1114     This is a single-node call.
1115
1116     """
1117     return self._SingleNodeCall(node, "file_storage_dir_rename",
1118                                 [old_file_storage_dir, new_file_storage_dir])
1119
1120   @classmethod
1121   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1122     """Update job queue.
1123
1124     This is a multi-node call.
1125
1126     """
1127     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1128                                     [file_name, cls._Compress(content)],
1129                                     address_list=address_list)
1130
1131   @classmethod
1132   def call_jobqueue_purge(cls, node):
1133     """Purge job queue.
1134
1135     This is a single-node call.
1136
1137     """
1138     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1139
1140   @classmethod
1141   def call_jobqueue_rename(cls, node_list, address_list, rename):
1142     """Rename a job queue file.
1143
1144     This is a multi-node call.
1145
1146     """
1147     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1148                                     address_list=address_list)
1149
1150   @classmethod
1151   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1152     """Set the drain flag on the queue.
1153
1154     This is a multi-node call.
1155
1156     @type node_list: list
1157     @param node_list: the list of nodes to query
1158     @type drain_flag: bool
1159     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1160
1161     """
1162     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1163                                     [drain_flag])
1164
1165   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1166     """Validate the hypervisor params.
1167
1168     This is a multi-node call.
1169
1170     @type node_list: list
1171     @param node_list: the list of nodes to query
1172     @type hvname: string
1173     @param hvname: the hypervisor name
1174     @type hvparams: dict
1175     @param hvparams: the hypervisor parameters to be validated
1176
1177     """
1178     cluster = self._cfg.GetClusterInfo()
1179     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1180     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1181                                [hvname, hv_full])
1182
1183   def call_x509_cert_create(self, node, validity):
1184     """Creates a new X509 certificate for SSL/TLS.
1185
1186     This is a single-node call.
1187
1188     @type validity: int
1189     @param validity: Validity in seconds
1190
1191     """
1192     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1193
1194   def call_x509_cert_remove(self, node, name):
1195     """Removes a X509 certificate.
1196
1197     This is a single-node call.
1198
1199     @type name: string
1200     @param name: Certificate name
1201
1202     """
1203     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1204
1205   def call_import_start(self, node, opts, instance, dest, dest_args):
1206     """Starts a listener for an import.
1207
1208     This is a single-node call.
1209
1210     @type node: string
1211     @param node: Node name
1212     @type instance: C{objects.Instance}
1213     @param instance: Instance object
1214
1215     """
1216     return self._SingleNodeCall(node, "import_start",
1217                                 [opts.ToDict(),
1218                                  self._InstDict(instance), dest,
1219                                  _EncodeImportExportIO(dest, dest_args)])
1220
1221   def call_export_start(self, node, opts, host, port,
1222                         instance, source, source_args):
1223     """Starts an export daemon.
1224
1225     This is a single-node call.
1226
1227     @type node: string
1228     @param node: Node name
1229     @type instance: C{objects.Instance}
1230     @param instance: Instance object
1231
1232     """
1233     return self._SingleNodeCall(node, "export_start",
1234                                 [opts.ToDict(), host, port,
1235                                  self._InstDict(instance), source,
1236                                  _EncodeImportExportIO(source, source_args)])
1237
1238   def call_impexp_status(self, node, names):
1239     """Gets the status of an import or export.
1240
1241     This is a single-node call.
1242
1243     @type node: string
1244     @param node: Node name
1245     @type names: List of strings
1246     @param names: Import/export names
1247     @rtype: List of L{objects.ImportExportStatus} instances
1248     @return: Returns a list of the state of each named import/export or None if
1249              a status couldn't be retrieved
1250
1251     """
1252     result = self._SingleNodeCall(node, "impexp_status", [names])
1253
1254     if not result.fail_msg:
1255       decoded = []
1256
1257       for i in result.payload:
1258         if i is None:
1259           decoded.append(None)
1260           continue
1261         decoded.append(objects.ImportExportStatus.FromDict(i))
1262
1263       result.payload = decoded
1264
1265     return result
1266
1267   def call_impexp_abort(self, node, name):
1268     """Aborts an import or export.
1269
1270     This is a single-node call.
1271
1272     @type node: string
1273     @param node: Node name
1274     @type name: string
1275     @param name: Import/export name
1276
1277     """
1278     return self._SingleNodeCall(node, "impexp_abort", [name])
1279
1280   def call_impexp_cleanup(self, node, name):
1281     """Cleans up after an import or export.
1282
1283     This is a single-node call.
1284
1285     @type node: string
1286     @param node: Node name
1287     @type name: string
1288     @param name: Import/export name
1289
1290     """
1291     return self._SingleNodeCall(node, "impexp_cleanup", [name])