Change bdev.LogicalVolume.GetPVInfo usage
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 import ganeti.http.client
46
47
48 # Module level variable
49 _http_manager = None
50
51
52 def Init():
53   """Initializes the module-global HTTP client manager.
54
55   Must be called before using any RPC function.
56
57   """
58   global _http_manager
59
60   assert not _http_manager, "RPC module initialized more than once"
61
62   _http_manager = http.client.HttpClientManager()
63
64
65 def Shutdown():
66   """Stops the module-global HTTP client manager.
67
68   Must be called before quitting the program.
69
70   """
71   global _http_manager
72
73   if _http_manager:
74     _http_manager.Shutdown()
75     _http_manager = None
76
77
78 class RpcResult(object):
79   """RPC Result class.
80
81   This class holds an RPC result. It is needed since in multi-node
82   calls we can't raise an exception just because one one out of many
83   failed, and therefore we use this class to encapsulate the result.
84
85   @ivar data: the data payload, for successful results, or None
86   @ivar call: the name of the RPC call
87   @ivar node: the name of the node to which we made the call
88   @ivar offline: whether the operation failed because the node was
89       offline, as opposed to actual failure; offline=True will always
90       imply failed=True, in order to allow simpler checking if
91       the user doesn't care about the exact failure mode
92   @ivar fail_msg: the error message if the call failed
93
94   """
95   def __init__(self, data=None, failed=False, offline=False,
96                call=None, node=None):
97     self.offline = offline
98     self.call = call
99     self.node = node
100
101     if offline:
102       self.fail_msg = "Node is marked offline"
103       self.data = self.payload = None
104     elif failed:
105       self.fail_msg = self._EnsureErr(data)
106       self.data = self.payload = None
107     else:
108       self.data = data
109       if not isinstance(self.data, (tuple, list)):
110         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
111                          type(self.data))
112         self.payload = None
113       elif len(data) != 2:
114         self.fail_msg = ("RPC layer error: invalid result length (%d), "
115                          "expected 2" % len(self.data))
116         self.payload = None
117       elif not self.data[0]:
118         self.fail_msg = self._EnsureErr(self.data[1])
119         self.payload = None
120       else:
121         # finally success
122         self.fail_msg = None
123         self.payload = data[1]
124
125     assert hasattr(self, "call")
126     assert hasattr(self, "data")
127     assert hasattr(self, "fail_msg")
128     assert hasattr(self, "node")
129     assert hasattr(self, "offline")
130     assert hasattr(self, "payload")
131
132   @staticmethod
133   def _EnsureErr(val):
134     """Helper to ensure we return a 'True' value for error."""
135     if val:
136       return val
137     else:
138       return "No error information"
139
140   def Raise(self, msg, prereq=False, ecode=None):
141     """If the result has failed, raise an OpExecError.
142
143     This is used so that LU code doesn't have to check for each
144     result, but instead can call this function.
145
146     """
147     if not self.fail_msg:
148       return
149
150     if not msg: # one could pass None for default message
151       msg = ("Call '%s' to node '%s' has failed: %s" %
152              (self.call, self.node, self.fail_msg))
153     else:
154       msg = "%s: %s" % (msg, self.fail_msg)
155     if prereq:
156       ec = errors.OpPrereqError
157     else:
158       ec = errors.OpExecError
159     if ecode is not None:
160       args = (msg, prereq)
161     else:
162       args = (msg, )
163     raise ec(*args)
164
165
166 class Client:
167   """RPC Client class.
168
169   This class, given a (remote) method name, a list of parameters and a
170   list of nodes, will contact (in parallel) all nodes, and return a
171   dict of results (key: node name, value: result).
172
173   One current bug is that generic failure is still signaled by
174   'False' result, which is not good. This overloading of values can
175   cause bugs.
176
177   """
178   def __init__(self, procedure, body, port):
179     self.procedure = procedure
180     self.body = body
181     self.port = port
182     self.nc = {}
183
184     self._ssl_params = \
185       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
186                          ssl_cert_path=constants.SSL_CERT_FILE)
187
188   def ConnectList(self, node_list, address_list=None):
189     """Add a list of nodes to the target nodes.
190
191     @type node_list: list
192     @param node_list: the list of node names to connect
193     @type address_list: list or None
194     @keyword address_list: either None or a list with node addresses,
195         which must have the same length as the node list
196
197     """
198     if address_list is None:
199       address_list = [None for _ in node_list]
200     else:
201       assert len(node_list) == len(address_list), \
202              "Name and address lists should have the same length"
203     for node, address in zip(node_list, address_list):
204       self.ConnectNode(node, address)
205
206   def ConnectNode(self, name, address=None):
207     """Add a node to the target list.
208
209     @type name: str
210     @param name: the node name
211     @type address: str
212     @keyword address: the node address, if known
213
214     """
215     if address is None:
216       address = name
217
218     self.nc[name] = \
219       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
220                                     "/%s" % self.procedure,
221                                     post_data=self.body,
222                                     ssl_params=self._ssl_params,
223                                     ssl_verify_peer=True)
224
225   def GetResults(self):
226     """Call nodes and return results.
227
228     @rtype: list
229     @return: List of RPC results
230
231     """
232     assert _http_manager, "RPC module not initialized"
233
234     _http_manager.ExecRequests(self.nc.values())
235
236     results = {}
237
238     for name, req in self.nc.iteritems():
239       if req.success and req.resp_status_code == http.HTTP_OK:
240         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
241                                   node=name, call=self.procedure)
242         continue
243
244       # TODO: Better error reporting
245       if req.error:
246         msg = req.error
247       else:
248         msg = req.resp_body
249
250       logging.error("RPC error in %s from node %s: %s",
251                     self.procedure, name, msg)
252       results[name] = RpcResult(data=msg, failed=True, node=name,
253                                 call=self.procedure)
254
255     return results
256
257
258 class RpcRunner(object):
259   """RPC runner class"""
260
261   def __init__(self, cfg):
262     """Initialized the rpc runner.
263
264     @type cfg:  C{config.ConfigWriter}
265     @param cfg: the configuration object that will be used to get data
266                 about the cluster
267
268     """
269     self._cfg = cfg
270     self.port = utils.GetDaemonPort(constants.NODED)
271
272   def _InstDict(self, instance, hvp=None, bep=None):
273     """Convert the given instance to a dict.
274
275     This is done via the instance's ToDict() method and additionally
276     we fill the hvparams with the cluster defaults.
277
278     @type instance: L{objects.Instance}
279     @param instance: an Instance object
280     @type hvp: dict or None
281     @param hvp: a dictionary with overridden hypervisor parameters
282     @type bep: dict or None
283     @param bep: a dictionary with overridden backend parameters
284     @rtype: dict
285     @return: the instance dict, with the hvparams filled with the
286         cluster defaults
287
288     """
289     idict = instance.ToDict()
290     cluster = self._cfg.GetClusterInfo()
291     idict["hvparams"] = cluster.FillHV(instance)
292     if hvp is not None:
293       idict["hvparams"].update(hvp)
294     idict["beparams"] = cluster.FillBE(instance)
295     if bep is not None:
296       idict["beparams"].update(bep)
297     for nic in idict["nics"]:
298       nic['nicparams'] = objects.FillDict(
299         cluster.nicparams[constants.PP_DEFAULT],
300         nic['nicparams'])
301     return idict
302
303   def _ConnectList(self, client, node_list, call):
304     """Helper for computing node addresses.
305
306     @type client: L{ganeti.rpc.Client}
307     @param client: a C{Client} instance
308     @type node_list: list
309     @param node_list: the node list we should connect
310     @type call: string
311     @param call: the name of the remote procedure call, for filling in
312         correctly any eventual offline nodes' results
313
314     """
315     all_nodes = self._cfg.GetAllNodesInfo()
316     name_list = []
317     addr_list = []
318     skip_dict = {}
319     for node in node_list:
320       if node in all_nodes:
321         if all_nodes[node].offline:
322           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
323           continue
324         val = all_nodes[node].primary_ip
325       else:
326         val = None
327       addr_list.append(val)
328       name_list.append(node)
329     if name_list:
330       client.ConnectList(name_list, address_list=addr_list)
331     return skip_dict
332
333   def _ConnectNode(self, client, node, call):
334     """Helper for computing one node's address.
335
336     @type client: L{ganeti.rpc.Client}
337     @param client: a C{Client} instance
338     @type node: str
339     @param node: the node we should connect
340     @type call: string
341     @param call: the name of the remote procedure call, for filling in
342         correctly any eventual offline nodes' results
343
344     """
345     node_info = self._cfg.GetNodeInfo(node)
346     if node_info is not None:
347       if node_info.offline:
348         return RpcResult(node=node, offline=True, call=call)
349       addr = node_info.primary_ip
350     else:
351       addr = None
352     client.ConnectNode(node, address=addr)
353
354   def _MultiNodeCall(self, node_list, procedure, args):
355     """Helper for making a multi-node call
356
357     """
358     body = serializer.DumpJson(args, indent=False)
359     c = Client(procedure, body, self.port)
360     skip_dict = self._ConnectList(c, node_list, procedure)
361     skip_dict.update(c.GetResults())
362     return skip_dict
363
364   @classmethod
365   def _StaticMultiNodeCall(cls, node_list, procedure, args,
366                            address_list=None):
367     """Helper for making a multi-node static call
368
369     """
370     body = serializer.DumpJson(args, indent=False)
371     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
372     c.ConnectList(node_list, address_list=address_list)
373     return c.GetResults()
374
375   def _SingleNodeCall(self, node, procedure, args):
376     """Helper for making a single-node call
377
378     """
379     body = serializer.DumpJson(args, indent=False)
380     c = Client(procedure, body, self.port)
381     result = self._ConnectNode(c, node, procedure)
382     if result is None:
383       # we did connect, node is not offline
384       result = c.GetResults()[node]
385     return result
386
387   @classmethod
388   def _StaticSingleNodeCall(cls, node, procedure, args):
389     """Helper for making a single-node static call
390
391     """
392     body = serializer.DumpJson(args, indent=False)
393     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
394     c.ConnectNode(node)
395     return c.GetResults()[node]
396
397   @staticmethod
398   def _Compress(data):
399     """Compresses a string for transport over RPC.
400
401     Small amounts of data are not compressed.
402
403     @type data: str
404     @param data: Data
405     @rtype: tuple
406     @return: Encoded data to send
407
408     """
409     # Small amounts of data are not compressed
410     if len(data) < 512:
411       return (constants.RPC_ENCODING_NONE, data)
412
413     # Compress with zlib and encode in base64
414     return (constants.RPC_ENCODING_ZLIB_BASE64,
415             base64.b64encode(zlib.compress(data, 3)))
416
417   #
418   # Begin RPC calls
419   #
420
421   def call_lv_list(self, node_list, vg_name):
422     """Gets the logical volumes present in a given volume group.
423
424     This is a multi-node call.
425
426     """
427     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
428
429   def call_vg_list(self, node_list):
430     """Gets the volume group list.
431
432     This is a multi-node call.
433
434     """
435     return self._MultiNodeCall(node_list, "vg_list", [])
436
437   def call_storage_list(self, node_list, su_name, su_args, name, fields):
438     """Get list of storage units.
439
440     This is a multi-node call.
441
442     """
443     return self._MultiNodeCall(node_list, "storage_list",
444                                [su_name, su_args, name, fields])
445
446   def call_storage_modify(self, node, su_name, su_args, name, changes):
447     """Modify a storage unit.
448
449     This is a single-node call.
450
451     """
452     return self._SingleNodeCall(node, "storage_modify",
453                                 [su_name, su_args, name, changes])
454
455   def call_storage_execute(self, node, su_name, su_args, name, op):
456     """Executes an operation on a storage unit.
457
458     This is a single-node call.
459
460     """
461     return self._SingleNodeCall(node, "storage_execute",
462                                 [su_name, su_args, name, op])
463
464   def call_bridges_exist(self, node, bridges_list):
465     """Checks if a node has all the bridges given.
466
467     This method checks if all bridges given in the bridges_list are
468     present on the remote node, so that an instance that uses interfaces
469     on those bridges can be started.
470
471     This is a single-node call.
472
473     """
474     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
475
476   def call_instance_start(self, node, instance, hvp, bep):
477     """Starts an instance.
478
479     This is a single-node call.
480
481     """
482     idict = self._InstDict(instance, hvp=hvp, bep=bep)
483     return self._SingleNodeCall(node, "instance_start", [idict])
484
485   def call_instance_shutdown(self, node, instance, timeout):
486     """Stops an instance.
487
488     This is a single-node call.
489
490     """
491     return self._SingleNodeCall(node, "instance_shutdown",
492                                 [self._InstDict(instance), timeout])
493
494   def call_migration_info(self, node, instance):
495     """Gather the information necessary to prepare an instance migration.
496
497     This is a single-node call.
498
499     @type node: string
500     @param node: the node on which the instance is currently running
501     @type instance: C{objects.Instance}
502     @param instance: the instance definition
503
504     """
505     return self._SingleNodeCall(node, "migration_info",
506                                 [self._InstDict(instance)])
507
508   def call_accept_instance(self, node, instance, info, target):
509     """Prepare a node to accept an instance.
510
511     This is a single-node call.
512
513     @type node: string
514     @param node: the target node for the migration
515     @type instance: C{objects.Instance}
516     @param instance: the instance definition
517     @type info: opaque/hypervisor specific (string/data)
518     @param info: result for the call_migration_info call
519     @type target: string
520     @param target: target hostname (usually ip address) (on the node itself)
521
522     """
523     return self._SingleNodeCall(node, "accept_instance",
524                                 [self._InstDict(instance), info, target])
525
526   def call_finalize_migration(self, node, instance, info, success):
527     """Finalize any target-node migration specific operation.
528
529     This is called both in case of a successful migration and in case of error
530     (in which case it should abort the migration).
531
532     This is a single-node call.
533
534     @type node: string
535     @param node: the target node for the migration
536     @type instance: C{objects.Instance}
537     @param instance: the instance definition
538     @type info: opaque/hypervisor specific (string/data)
539     @param info: result for the call_migration_info call
540     @type success: boolean
541     @param success: whether the migration was a success or a failure
542
543     """
544     return self._SingleNodeCall(node, "finalize_migration",
545                                 [self._InstDict(instance), info, success])
546
547   def call_instance_migrate(self, node, instance, target, live):
548     """Migrate an instance.
549
550     This is a single-node call.
551
552     @type node: string
553     @param node: the node on which the instance is currently running
554     @type instance: C{objects.Instance}
555     @param instance: the instance definition
556     @type target: string
557     @param target: the target node name
558     @type live: boolean
559     @param live: whether the migration should be done live or not (the
560         interpretation of this parameter is left to the hypervisor)
561
562     """
563     return self._SingleNodeCall(node, "instance_migrate",
564                                 [self._InstDict(instance), target, live])
565
566   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
567     """Reboots an instance.
568
569     This is a single-node call.
570
571     """
572     return self._SingleNodeCall(node, "instance_reboot",
573                                 [self._InstDict(inst), reboot_type,
574                                  shutdown_timeout])
575
576   def call_instance_os_add(self, node, inst, reinstall):
577     """Installs an OS on the given instance.
578
579     This is a single-node call.
580
581     """
582     return self._SingleNodeCall(node, "instance_os_add",
583                                 [self._InstDict(inst), reinstall])
584
585   def call_instance_run_rename(self, node, inst, old_name):
586     """Run the OS rename script for an instance.
587
588     This is a single-node call.
589
590     """
591     return self._SingleNodeCall(node, "instance_run_rename",
592                                 [self._InstDict(inst), old_name])
593
594   def call_instance_info(self, node, instance, hname):
595     """Returns information about a single instance.
596
597     This is a single-node call.
598
599     @type node: list
600     @param node: the list of nodes to query
601     @type instance: string
602     @param instance: the instance name
603     @type hname: string
604     @param hname: the hypervisor type of the instance
605
606     """
607     return self._SingleNodeCall(node, "instance_info", [instance, hname])
608
609   def call_instance_migratable(self, node, instance):
610     """Checks whether the given instance can be migrated.
611
612     This is a single-node call.
613
614     @param node: the node to query
615     @type instance: L{objects.Instance}
616     @param instance: the instance to check
617
618
619     """
620     return self._SingleNodeCall(node, "instance_migratable",
621                                 [self._InstDict(instance)])
622
623   def call_all_instances_info(self, node_list, hypervisor_list):
624     """Returns information about all instances on the given nodes.
625
626     This is a multi-node call.
627
628     @type node_list: list
629     @param node_list: the list of nodes to query
630     @type hypervisor_list: list
631     @param hypervisor_list: the hypervisors to query for instances
632
633     """
634     return self._MultiNodeCall(node_list, "all_instances_info",
635                                [hypervisor_list])
636
637   def call_instance_list(self, node_list, hypervisor_list):
638     """Returns the list of running instances on a given node.
639
640     This is a multi-node call.
641
642     @type node_list: list
643     @param node_list: the list of nodes to query
644     @type hypervisor_list: list
645     @param hypervisor_list: the hypervisors to query for instances
646
647     """
648     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
649
650   def call_node_tcp_ping(self, node, source, target, port, timeout,
651                          live_port_needed):
652     """Do a TcpPing on the remote node
653
654     This is a single-node call.
655
656     """
657     return self._SingleNodeCall(node, "node_tcp_ping",
658                                 [source, target, port, timeout,
659                                  live_port_needed])
660
661   def call_node_has_ip_address(self, node, address):
662     """Checks if a node has the given IP address.
663
664     This is a single-node call.
665
666     """
667     return self._SingleNodeCall(node, "node_has_ip_address", [address])
668
669   def call_node_info(self, node_list, vg_name, hypervisor_type):
670     """Return node information.
671
672     This will return memory information and volume group size and free
673     space.
674
675     This is a multi-node call.
676
677     @type node_list: list
678     @param node_list: the list of nodes to query
679     @type vg_name: C{string}
680     @param vg_name: the name of the volume group to ask for disk space
681         information
682     @type hypervisor_type: C{str}
683     @param hypervisor_type: the name of the hypervisor to ask for
684         memory information
685
686     """
687     return self._MultiNodeCall(node_list, "node_info",
688                                [vg_name, hypervisor_type])
689
690   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
691     """Add a node to the cluster.
692
693     This is a single-node call.
694
695     """
696     return self._SingleNodeCall(node, "node_add",
697                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
698
699   def call_node_verify(self, node_list, checkdict, cluster_name):
700     """Request verification of given parameters.
701
702     This is a multi-node call.
703
704     """
705     return self._MultiNodeCall(node_list, "node_verify",
706                                [checkdict, cluster_name])
707
708   @classmethod
709   def call_node_start_master(cls, node, start_daemons, no_voting):
710     """Tells a node to activate itself as a master.
711
712     This is a single-node call.
713
714     """
715     return cls._StaticSingleNodeCall(node, "node_start_master",
716                                      [start_daemons, no_voting])
717
718   @classmethod
719   def call_node_stop_master(cls, node, stop_daemons):
720     """Tells a node to demote itself from master status.
721
722     This is a single-node call.
723
724     """
725     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
726
727   @classmethod
728   def call_master_info(cls, node_list):
729     """Query master info.
730
731     This is a multi-node call.
732
733     """
734     # TODO: should this method query down nodes?
735     return cls._StaticMultiNodeCall(node_list, "master_info", [])
736
737   @classmethod
738   def call_version(cls, node_list):
739     """Query node version.
740
741     This is a multi-node call.
742
743     """
744     return cls._StaticMultiNodeCall(node_list, "version", [])
745
746   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
747     """Request creation of a given block device.
748
749     This is a single-node call.
750
751     """
752     return self._SingleNodeCall(node, "blockdev_create",
753                                 [bdev.ToDict(), size, owner, on_primary, info])
754
755   def call_blockdev_remove(self, node, bdev):
756     """Request removal of a given block device.
757
758     This is a single-node call.
759
760     """
761     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
762
763   def call_blockdev_rename(self, node, devlist):
764     """Request rename of the given block devices.
765
766     This is a single-node call.
767
768     """
769     return self._SingleNodeCall(node, "blockdev_rename",
770                                 [(d.ToDict(), uid) for d, uid in devlist])
771
772   def call_blockdev_assemble(self, node, disk, owner, on_primary):
773     """Request assembling of a given block device.
774
775     This is a single-node call.
776
777     """
778     return self._SingleNodeCall(node, "blockdev_assemble",
779                                 [disk.ToDict(), owner, on_primary])
780
781   def call_blockdev_shutdown(self, node, disk):
782     """Request shutdown of a given block device.
783
784     This is a single-node call.
785
786     """
787     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
788
789   def call_blockdev_addchildren(self, node, bdev, ndevs):
790     """Request adding a list of children to a (mirroring) device.
791
792     This is a single-node call.
793
794     """
795     return self._SingleNodeCall(node, "blockdev_addchildren",
796                                 [bdev.ToDict(),
797                                  [disk.ToDict() for disk in ndevs]])
798
799   def call_blockdev_removechildren(self, node, bdev, ndevs):
800     """Request removing a list of children from a (mirroring) device.
801
802     This is a single-node call.
803
804     """
805     return self._SingleNodeCall(node, "blockdev_removechildren",
806                                 [bdev.ToDict(),
807                                  [disk.ToDict() for disk in ndevs]])
808
809   def call_blockdev_getmirrorstatus(self, node, disks):
810     """Request status of a (mirroring) device.
811
812     This is a single-node call.
813
814     """
815     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
816                                   [dsk.ToDict() for dsk in disks])
817     if not result.fail_msg:
818       result.payload = [objects.BlockDevStatus.FromDict(i)
819                         for i in result.payload]
820     return result
821
822   def call_blockdev_find(self, node, disk):
823     """Request identification of a given block device.
824
825     This is a single-node call.
826
827     """
828     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
829     if not result.fail_msg and result.payload is not None:
830       result.payload = objects.BlockDevStatus.FromDict(result.payload)
831     return result
832
833   def call_blockdev_close(self, node, instance_name, disks):
834     """Closes the given block devices.
835
836     This is a single-node call.
837
838     """
839     params = [instance_name, [cf.ToDict() for cf in disks]]
840     return self._SingleNodeCall(node, "blockdev_close", params)
841
842   def call_blockdev_getsizes(self, node, disks):
843     """Returns the size of the given disks.
844
845     This is a single-node call.
846
847     """
848     params = [[cf.ToDict() for cf in disks]]
849     return self._SingleNodeCall(node, "blockdev_getsize", params)
850
851   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
852     """Disconnects the network of the given drbd devices.
853
854     This is a multi-node call.
855
856     """
857     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
858                                [nodes_ip, [cf.ToDict() for cf in disks]])
859
860   def call_drbd_attach_net(self, node_list, nodes_ip,
861                            disks, instance_name, multimaster):
862     """Disconnects the given drbd devices.
863
864     This is a multi-node call.
865
866     """
867     return self._MultiNodeCall(node_list, "drbd_attach_net",
868                                [nodes_ip, [cf.ToDict() for cf in disks],
869                                 instance_name, multimaster])
870
871   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
872     """Waits for the synchronization of drbd devices is complete.
873
874     This is a multi-node call.
875
876     """
877     return self._MultiNodeCall(node_list, "drbd_wait_sync",
878                                [nodes_ip, [cf.ToDict() for cf in disks]])
879
880   @classmethod
881   def call_upload_file(cls, node_list, file_name, address_list=None):
882     """Upload a file.
883
884     The node will refuse the operation in case the file is not on the
885     approved file list.
886
887     This is a multi-node call.
888
889     @type node_list: list
890     @param node_list: the list of node names to upload to
891     @type file_name: str
892     @param file_name: the filename to upload
893     @type address_list: list or None
894     @keyword address_list: an optional list of node addresses, in order
895         to optimize the RPC speed
896
897     """
898     file_contents = utils.ReadFile(file_name)
899     data = cls._Compress(file_contents)
900     st = os.stat(file_name)
901     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
902               st.st_atime, st.st_mtime]
903     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
904                                     address_list=address_list)
905
906   @classmethod
907   def call_write_ssconf_files(cls, node_list, values):
908     """Write ssconf files.
909
910     This is a multi-node call.
911
912     """
913     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
914
915   def call_os_diagnose(self, node_list):
916     """Request a diagnose of OS definitions.
917
918     This is a multi-node call.
919
920     """
921     return self._MultiNodeCall(node_list, "os_diagnose", [])
922
923   def call_os_get(self, node, name):
924     """Returns an OS definition.
925
926     This is a single-node call.
927
928     """
929     result = self._SingleNodeCall(node, "os_get", [name])
930     if not result.fail_msg and isinstance(result.payload, dict):
931       result.payload = objects.OS.FromDict(result.payload)
932     return result
933
934   def call_hooks_runner(self, node_list, hpath, phase, env):
935     """Call the hooks runner.
936
937     Args:
938       - op: the OpCode instance
939       - env: a dictionary with the environment
940
941     This is a multi-node call.
942
943     """
944     params = [hpath, phase, env]
945     return self._MultiNodeCall(node_list, "hooks_runner", params)
946
947   def call_iallocator_runner(self, node, name, idata):
948     """Call an iallocator on a remote node
949
950     Args:
951       - name: the iallocator name
952       - input: the json-encoded input string
953
954     This is a single-node call.
955
956     """
957     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
958
959   def call_blockdev_grow(self, node, cf_bdev, amount):
960     """Request a snapshot of the given block device.
961
962     This is a single-node call.
963
964     """
965     return self._SingleNodeCall(node, "blockdev_grow",
966                                 [cf_bdev.ToDict(), amount])
967
968   def call_blockdev_export(self, node, cf_bdev,
969                            dest_node, dest_path, cluster_name):
970     """Export a given disk to another node.
971
972     This is a single-node call.
973
974     """
975     return self._SingleNodeCall(node, "blockdev_export",
976                                 [cf_bdev.ToDict(), dest_node, dest_path,
977                                  cluster_name])
978
979   def call_blockdev_snapshot(self, node, cf_bdev):
980     """Request a snapshot of the given block device.
981
982     This is a single-node call.
983
984     """
985     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
986
987   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
988                            cluster_name, idx):
989     """Request the export of a given snapshot.
990
991     This is a single-node call.
992
993     """
994     return self._SingleNodeCall(node, "snapshot_export",
995                                 [snap_bdev.ToDict(), dest_node,
996                                  self._InstDict(instance), cluster_name, idx])
997
998   def call_finalize_export(self, node, instance, snap_disks):
999     """Request the completion of an export operation.
1000
1001     This writes the export config file, etc.
1002
1003     This is a single-node call.
1004
1005     """
1006     flat_disks = []
1007     for disk in snap_disks:
1008       if isinstance(disk, bool):
1009         flat_disks.append(disk)
1010       else:
1011         flat_disks.append(disk.ToDict())
1012
1013     return self._SingleNodeCall(node, "finalize_export",
1014                                 [self._InstDict(instance), flat_disks])
1015
1016   def call_export_info(self, node, path):
1017     """Queries the export information in a given path.
1018
1019     This is a single-node call.
1020
1021     """
1022     return self._SingleNodeCall(node, "export_info", [path])
1023
1024   def call_instance_os_import(self, node, inst, src_node, src_images,
1025                               cluster_name):
1026     """Request the import of a backup into an instance.
1027
1028     This is a single-node call.
1029
1030     """
1031     return self._SingleNodeCall(node, "instance_os_import",
1032                                 [self._InstDict(inst), src_node, src_images,
1033                                  cluster_name])
1034
1035   def call_export_list(self, node_list):
1036     """Gets the stored exports list.
1037
1038     This is a multi-node call.
1039
1040     """
1041     return self._MultiNodeCall(node_list, "export_list", [])
1042
1043   def call_export_remove(self, node, export):
1044     """Requests removal of a given export.
1045
1046     This is a single-node call.
1047
1048     """
1049     return self._SingleNodeCall(node, "export_remove", [export])
1050
1051   @classmethod
1052   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1053     """Requests a node to clean the cluster information it has.
1054
1055     This will remove the configuration information from the ganeti data
1056     dir.
1057
1058     This is a single-node call.
1059
1060     """
1061     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1062                                      [modify_ssh_setup])
1063
1064   def call_node_volumes(self, node_list):
1065     """Gets all volumes on node(s).
1066
1067     This is a multi-node call.
1068
1069     """
1070     return self._MultiNodeCall(node_list, "node_volumes", [])
1071
1072   def call_node_demote_from_mc(self, node):
1073     """Demote a node from the master candidate role.
1074
1075     This is a single-node call.
1076
1077     """
1078     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1079
1080
1081   def call_node_powercycle(self, node, hypervisor):
1082     """Tries to powercycle a node.
1083
1084     This is a single-node call.
1085
1086     """
1087     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1088
1089
1090   def call_test_delay(self, node_list, duration):
1091     """Sleep for a fixed time on given node(s).
1092
1093     This is a multi-node call.
1094
1095     """
1096     return self._MultiNodeCall(node_list, "test_delay", [duration])
1097
1098   def call_file_storage_dir_create(self, node, file_storage_dir):
1099     """Create the given file storage directory.
1100
1101     This is a single-node call.
1102
1103     """
1104     return self._SingleNodeCall(node, "file_storage_dir_create",
1105                                 [file_storage_dir])
1106
1107   def call_file_storage_dir_remove(self, node, file_storage_dir):
1108     """Remove the given file storage directory.
1109
1110     This is a single-node call.
1111
1112     """
1113     return self._SingleNodeCall(node, "file_storage_dir_remove",
1114                                 [file_storage_dir])
1115
1116   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1117                                    new_file_storage_dir):
1118     """Rename file storage directory.
1119
1120     This is a single-node call.
1121
1122     """
1123     return self._SingleNodeCall(node, "file_storage_dir_rename",
1124                                 [old_file_storage_dir, new_file_storage_dir])
1125
1126   @classmethod
1127   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1128     """Update job queue.
1129
1130     This is a multi-node call.
1131
1132     """
1133     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1134                                     [file_name, cls._Compress(content)],
1135                                     address_list=address_list)
1136
1137   @classmethod
1138   def call_jobqueue_purge(cls, node):
1139     """Purge job queue.
1140
1141     This is a single-node call.
1142
1143     """
1144     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1145
1146   @classmethod
1147   def call_jobqueue_rename(cls, node_list, address_list, rename):
1148     """Rename a job queue file.
1149
1150     This is a multi-node call.
1151
1152     """
1153     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1154                                     address_list=address_list)
1155
1156   @classmethod
1157   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1158     """Set the drain flag on the queue.
1159
1160     This is a multi-node call.
1161
1162     @type node_list: list
1163     @param node_list: the list of nodes to query
1164     @type drain_flag: bool
1165     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1166
1167     """
1168     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1169                                     [drain_flag])
1170
1171   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1172     """Validate the hypervisor params.
1173
1174     This is a multi-node call.
1175
1176     @type node_list: list
1177     @param node_list: the list of nodes to query
1178     @type hvname: string
1179     @param hvname: the hypervisor name
1180     @type hvparams: dict
1181     @param hvparams: the hypervisor parameters to be validated
1182
1183     """
1184     cluster = self._cfg.GetClusterInfo()
1185     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1186     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1187                                [hvname, hv_full])