serializer.DumpSignedJson
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 import ganeti.http.client
46
47
48 # Module level variable
49 _http_manager = None
50
51
52 def Init():
53   """Initializes the module-global HTTP client manager.
54
55   Must be called before using any RPC function.
56
57   """
58   global _http_manager
59
60   assert not _http_manager, "RPC module initialized more than once"
61
62   _http_manager = http.client.HttpClientManager()
63
64
65 def Shutdown():
66   """Stops the module-global HTTP client manager.
67
68   Must be called before quitting the program.
69
70   """
71   global _http_manager
72
73   if _http_manager:
74     _http_manager.Shutdown()
75     _http_manager = None
76
77
78 class RpcResult(object):
79   """RPC Result class.
80
81   This class holds an RPC result. It is needed since in multi-node
82   calls we can't raise an exception just because one one out of many
83   failed, and therefore we use this class to encapsulate the result.
84
85   @ivar data: the data payload, for successful results, or None
86   @type failed: boolean
87   @ivar failed: whether the operation failed at transport level (not
88       application level on the remote node)
89   @ivar call: the name of the RPC call
90   @ivar node: the name of the node to which we made the call
91   @ivar offline: whether the operation failed because the node was
92       offline, as opposed to actual failure; offline=True will always
93       imply failed=True, in order to allow simpler checking if
94       the user doesn't care about the exact failure mode
95   @ivar fail_msg: the error message if the call failed
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.fail_msg = "Node is marked offline"
107       self.data = self.payload = None
108     elif failed:
109       self.fail_msg = self._EnsureErr(data)
110       self.data = self.payload = None
111     else:
112       self.data = data
113       if not isinstance(self.data, (tuple, list)):
114         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
115                          type(self.data))
116       elif len(data) != 2:
117         self.fail_msg = ("RPC layer error: invalid result length (%d), "
118                          "expected 2" % len(self.data))
119       elif not self.data[0]:
120         self.fail_msg = self._EnsureErr(self.data[1])
121       else:
122         # finally success
123         self.fail_msg = None
124         self.payload = data[1]
125
126   @staticmethod
127   def _EnsureErr(val):
128     """Helper to ensure we return a 'True' value for error."""
129     if val:
130       return val
131     else:
132       return "No error information"
133
134   def Raise(self, msg, prereq=False):
135     """If the result has failed, raise an OpExecError.
136
137     This is used so that LU code doesn't have to check for each
138     result, but instead can call this function.
139
140     """
141     if not self.fail_msg:
142       return
143
144     if not msg: # one could pass None for default message
145       msg = ("Call '%s' to node '%s' has failed: %s" %
146              (self.call, self.node, self.fail_msg))
147     else:
148       msg = "%s: %s" % (msg, self.fail_msg)
149     if prereq:
150       ec = errors.OpPrereqError
151     else:
152       ec = errors.OpExecError
153     raise ec(msg)
154
155   def RemoteFailMsg(self):
156     """Check if the remote procedure failed.
157
158     @return: the fail_msg attribute
159
160     """
161     return self.fail_msg
162
163
164 class Client:
165   """RPC Client class.
166
167   This class, given a (remote) method name, a list of parameters and a
168   list of nodes, will contact (in parallel) all nodes, and return a
169   dict of results (key: node name, value: result).
170
171   One current bug is that generic failure is still signaled by
172   'False' result, which is not good. This overloading of values can
173   cause bugs.
174
175   """
176   def __init__(self, procedure, body, port):
177     self.procedure = procedure
178     self.body = body
179     self.port = port
180     self.nc = {}
181
182     self._ssl_params = \
183       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
184                          ssl_cert_path=constants.SSL_CERT_FILE)
185
186   def ConnectList(self, node_list, address_list=None):
187     """Add a list of nodes to the target nodes.
188
189     @type node_list: list
190     @param node_list: the list of node names to connect
191     @type address_list: list or None
192     @keyword address_list: either None or a list with node addresses,
193         which must have the same length as the node list
194
195     """
196     if address_list is None:
197       address_list = [None for _ in node_list]
198     else:
199       assert len(node_list) == len(address_list), \
200              "Name and address lists should have the same length"
201     for node, address in zip(node_list, address_list):
202       self.ConnectNode(node, address)
203
204   def ConnectNode(self, name, address=None):
205     """Add a node to the target list.
206
207     @type name: str
208     @param name: the node name
209     @type address: str
210     @keyword address: the node address, if known
211
212     """
213     if address is None:
214       address = name
215
216     self.nc[name] = \
217       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
218                                     "/%s" % self.procedure,
219                                     post_data=self.body,
220                                     ssl_params=self._ssl_params,
221                                     ssl_verify_peer=True)
222
223   def GetResults(self):
224     """Call nodes and return results.
225
226     @rtype: list
227     @return: List of RPC results
228
229     """
230     assert _http_manager, "RPC module not initialized"
231
232     _http_manager.ExecRequests(self.nc.values())
233
234     results = {}
235
236     for name, req in self.nc.iteritems():
237       if req.success and req.resp_status_code == http.HTTP_OK:
238         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
239                                   node=name, call=self.procedure)
240         continue
241
242       # TODO: Better error reporting
243       if req.error:
244         msg = req.error
245       else:
246         msg = req.resp_body
247
248       logging.error("RPC error in %s from node %s: %s",
249                     self.procedure, name, msg)
250       results[name] = RpcResult(data=msg, failed=True, node=name,
251                                 call=self.procedure)
252
253     return results
254
255
256 class RpcRunner(object):
257   """RPC runner class"""
258
259   def __init__(self, cfg):
260     """Initialized the rpc runner.
261
262     @type cfg:  C{config.ConfigWriter}
263     @param cfg: the configuration object that will be used to get data
264                 about the cluster
265
266     """
267     self._cfg = cfg
268     self.port = utils.GetDaemonPort(constants.NODED)
269
270   def _InstDict(self, instance, hvp=None, bep=None):
271     """Convert the given instance to a dict.
272
273     This is done via the instance's ToDict() method and additionally
274     we fill the hvparams with the cluster defaults.
275
276     @type instance: L{objects.Instance}
277     @param instance: an Instance object
278     @type hvp: dict or None
279     @param hvp: a dictionary with overridden hypervisor parameters
280     @type bep: dict or None
281     @param bep: a dictionary with overridden backend parameters
282     @rtype: dict
283     @return: the instance dict, with the hvparams filled with the
284         cluster defaults
285
286     """
287     idict = instance.ToDict()
288     cluster = self._cfg.GetClusterInfo()
289     idict["hvparams"] = cluster.FillHV(instance)
290     if hvp is not None:
291       idict["hvparams"].update(hvp)
292     idict["beparams"] = cluster.FillBE(instance)
293     if bep is not None:
294       idict["beparams"].update(bep)
295     for nic in idict["nics"]:
296       nic['nicparams'] = objects.FillDict(
297         cluster.nicparams[constants.PP_DEFAULT],
298         nic['nicparams'])
299     return idict
300
301   def _ConnectList(self, client, node_list, call):
302     """Helper for computing node addresses.
303
304     @type client: L{ganeti.rpc.Client}
305     @param client: a C{Client} instance
306     @type node_list: list
307     @param node_list: the node list we should connect
308     @type call: string
309     @param call: the name of the remote procedure call, for filling in
310         correctly any eventual offline nodes' results
311
312     """
313     all_nodes = self._cfg.GetAllNodesInfo()
314     name_list = []
315     addr_list = []
316     skip_dict = {}
317     for node in node_list:
318       if node in all_nodes:
319         if all_nodes[node].offline:
320           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
321           continue
322         val = all_nodes[node].primary_ip
323       else:
324         val = None
325       addr_list.append(val)
326       name_list.append(node)
327     if name_list:
328       client.ConnectList(name_list, address_list=addr_list)
329     return skip_dict
330
331   def _ConnectNode(self, client, node, call):
332     """Helper for computing one node's address.
333
334     @type client: L{ganeti.rpc.Client}
335     @param client: a C{Client} instance
336     @type node: str
337     @param node: the node we should connect
338     @type call: string
339     @param call: the name of the remote procedure call, for filling in
340         correctly any eventual offline nodes' results
341
342     """
343     node_info = self._cfg.GetNodeInfo(node)
344     if node_info is not None:
345       if node_info.offline:
346         return RpcResult(node=node, offline=True, call=call)
347       addr = node_info.primary_ip
348     else:
349       addr = None
350     client.ConnectNode(node, address=addr)
351
352   def _MultiNodeCall(self, node_list, procedure, args):
353     """Helper for making a multi-node call
354
355     """
356     body = serializer.DumpJson(args, indent=False)
357     c = Client(procedure, body, self.port)
358     skip_dict = self._ConnectList(c, node_list, procedure)
359     skip_dict.update(c.GetResults())
360     return skip_dict
361
362   @classmethod
363   def _StaticMultiNodeCall(cls, node_list, procedure, args,
364                            address_list=None):
365     """Helper for making a multi-node static call
366
367     """
368     body = serializer.DumpJson(args, indent=False)
369     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
370     c.ConnectList(node_list, address_list=address_list)
371     return c.GetResults()
372
373   def _SingleNodeCall(self, node, procedure, args):
374     """Helper for making a single-node call
375
376     """
377     body = serializer.DumpJson(args, indent=False)
378     c = Client(procedure, body, self.port)
379     result = self._ConnectNode(c, node, procedure)
380     if result is None:
381       # we did connect, node is not offline
382       result = c.GetResults()[node]
383     return result
384
385   @classmethod
386   def _StaticSingleNodeCall(cls, node, procedure, args):
387     """Helper for making a single-node static call
388
389     """
390     body = serializer.DumpJson(args, indent=False)
391     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
392     c.ConnectNode(node)
393     return c.GetResults()[node]
394
395   @staticmethod
396   def _Compress(data):
397     """Compresses a string for transport over RPC.
398
399     Small amounts of data are not compressed.
400
401     @type data: str
402     @param data: Data
403     @rtype: tuple
404     @return: Encoded data to send
405
406     """
407     # Small amounts of data are not compressed
408     if len(data) < 512:
409       return (constants.RPC_ENCODING_NONE, data)
410
411     # Compress with zlib and encode in base64
412     return (constants.RPC_ENCODING_ZLIB_BASE64,
413             base64.b64encode(zlib.compress(data, 3)))
414
415   #
416   # Begin RPC calls
417   #
418
419   def call_lv_list(self, node_list, vg_name):
420     """Gets the logical volumes present in a given volume group.
421
422     This is a multi-node call.
423
424     """
425     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
426
427   def call_vg_list(self, node_list):
428     """Gets the volume group list.
429
430     This is a multi-node call.
431
432     """
433     return self._MultiNodeCall(node_list, "vg_list", [])
434
435   def call_storage_list(self, node_list, su_name, su_args, name, fields):
436     """Get list of storage units.
437
438     This is a multi-node call.
439
440     """
441     return self._MultiNodeCall(node_list, "storage_list",
442                                [su_name, su_args, name, fields])
443
444   def call_storage_modify(self, node, su_name, su_args, name, changes):
445     """Modify a storage unit.
446
447     This is a single-node call.
448
449     """
450     return self._SingleNodeCall(node, "storage_modify",
451                                 [su_name, su_args, name, changes])
452
453   def call_bridges_exist(self, node, bridges_list):
454     """Checks if a node has all the bridges given.
455
456     This method checks if all bridges given in the bridges_list are
457     present on the remote node, so that an instance that uses interfaces
458     on those bridges can be started.
459
460     This is a single-node call.
461
462     """
463     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
464
465   def call_instance_start(self, node, instance, hvp, bep):
466     """Starts an instance.
467
468     This is a single-node call.
469
470     """
471     idict = self._InstDict(instance, hvp=hvp, bep=bep)
472     return self._SingleNodeCall(node, "instance_start", [idict])
473
474   def call_instance_shutdown(self, node, instance):
475     """Stops an instance.
476
477     This is a single-node call.
478
479     """
480     return self._SingleNodeCall(node, "instance_shutdown",
481                                 [self._InstDict(instance)])
482
483   def call_migration_info(self, node, instance):
484     """Gather the information necessary to prepare an instance migration.
485
486     This is a single-node call.
487
488     @type node: string
489     @param node: the node on which the instance is currently running
490     @type instance: C{objects.Instance}
491     @param instance: the instance definition
492
493     """
494     return self._SingleNodeCall(node, "migration_info",
495                                 [self._InstDict(instance)])
496
497   def call_accept_instance(self, node, instance, info, target):
498     """Prepare a node to accept an instance.
499
500     This is a single-node call.
501
502     @type node: string
503     @param node: the target node for the migration
504     @type instance: C{objects.Instance}
505     @param instance: the instance definition
506     @type info: opaque/hypervisor specific (string/data)
507     @param info: result for the call_migration_info call
508     @type target: string
509     @param target: target hostname (usually ip address) (on the node itself)
510
511     """
512     return self._SingleNodeCall(node, "accept_instance",
513                                 [self._InstDict(instance), info, target])
514
515   def call_finalize_migration(self, node, instance, info, success):
516     """Finalize any target-node migration specific operation.
517
518     This is called both in case of a successful migration and in case of error
519     (in which case it should abort the migration).
520
521     This is a single-node call.
522
523     @type node: string
524     @param node: the target node for the migration
525     @type instance: C{objects.Instance}
526     @param instance: the instance definition
527     @type info: opaque/hypervisor specific (string/data)
528     @param info: result for the call_migration_info call
529     @type success: boolean
530     @param success: whether the migration was a success or a failure
531
532     """
533     return self._SingleNodeCall(node, "finalize_migration",
534                                 [self._InstDict(instance), info, success])
535
536   def call_instance_migrate(self, node, instance, target, live):
537     """Migrate an instance.
538
539     This is a single-node call.
540
541     @type node: string
542     @param node: the node on which the instance is currently running
543     @type instance: C{objects.Instance}
544     @param instance: the instance definition
545     @type target: string
546     @param target: the target node name
547     @type live: boolean
548     @param live: whether the migration should be done live or not (the
549         interpretation of this parameter is left to the hypervisor)
550
551     """
552     return self._SingleNodeCall(node, "instance_migrate",
553                                 [self._InstDict(instance), target, live])
554
555   def call_instance_reboot(self, node, instance, reboot_type):
556     """Reboots an instance.
557
558     This is a single-node call.
559
560     """
561     return self._SingleNodeCall(node, "instance_reboot",
562                                 [self._InstDict(instance), reboot_type])
563
564   def call_instance_os_add(self, node, inst, reinstall):
565     """Installs an OS on the given instance.
566
567     This is a single-node call.
568
569     """
570     return self._SingleNodeCall(node, "instance_os_add",
571                                 [self._InstDict(inst), reinstall])
572
573   def call_instance_run_rename(self, node, inst, old_name):
574     """Run the OS rename script for an instance.
575
576     This is a single-node call.
577
578     """
579     return self._SingleNodeCall(node, "instance_run_rename",
580                                 [self._InstDict(inst), old_name])
581
582   def call_instance_info(self, node, instance, hname):
583     """Returns information about a single instance.
584
585     This is a single-node call.
586
587     @type node: list
588     @param node: the list of nodes to query
589     @type instance: string
590     @param instance: the instance name
591     @type hname: string
592     @param hname: the hypervisor type of the instance
593
594     """
595     return self._SingleNodeCall(node, "instance_info", [instance, hname])
596
597   def call_instance_migratable(self, node, instance):
598     """Checks whether the given instance can be migrated.
599
600     This is a single-node call.
601
602     @param node: the node to query
603     @type instance: L{objects.Instance}
604     @param instance: the instance to check
605
606
607     """
608     return self._SingleNodeCall(node, "instance_migratable",
609                                 [self._InstDict(instance)])
610
611   def call_all_instances_info(self, node_list, hypervisor_list):
612     """Returns information about all instances on the given nodes.
613
614     This is a multi-node call.
615
616     @type node_list: list
617     @param node_list: the list of nodes to query
618     @type hypervisor_list: list
619     @param hypervisor_list: the hypervisors to query for instances
620
621     """
622     return self._MultiNodeCall(node_list, "all_instances_info",
623                                [hypervisor_list])
624
625   def call_instance_list(self, node_list, hypervisor_list):
626     """Returns the list of running instances on a given node.
627
628     This is a multi-node call.
629
630     @type node_list: list
631     @param node_list: the list of nodes to query
632     @type hypervisor_list: list
633     @param hypervisor_list: the hypervisors to query for instances
634
635     """
636     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
637
638   def call_node_tcp_ping(self, node, source, target, port, timeout,
639                          live_port_needed):
640     """Do a TcpPing on the remote node
641
642     This is a single-node call.
643
644     """
645     return self._SingleNodeCall(node, "node_tcp_ping",
646                                 [source, target, port, timeout,
647                                  live_port_needed])
648
649   def call_node_has_ip_address(self, node, address):
650     """Checks if a node has the given IP address.
651
652     This is a single-node call.
653
654     """
655     return self._SingleNodeCall(node, "node_has_ip_address", [address])
656
657   def call_node_info(self, node_list, vg_name, hypervisor_type):
658     """Return node information.
659
660     This will return memory information and volume group size and free
661     space.
662
663     This is a multi-node call.
664
665     @type node_list: list
666     @param node_list: the list of nodes to query
667     @type vg_name: C{string}
668     @param vg_name: the name of the volume group to ask for disk space
669         information
670     @type hypervisor_type: C{str}
671     @param hypervisor_type: the name of the hypervisor to ask for
672         memory information
673
674     """
675     return self._MultiNodeCall(node_list, "node_info",
676                                [vg_name, hypervisor_type])
677
678   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
679     """Add a node to the cluster.
680
681     This is a single-node call.
682
683     """
684     return self._SingleNodeCall(node, "node_add",
685                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
686
687   def call_node_verify(self, node_list, checkdict, cluster_name):
688     """Request verification of given parameters.
689
690     This is a multi-node call.
691
692     """
693     return self._MultiNodeCall(node_list, "node_verify",
694                                [checkdict, cluster_name])
695
696   @classmethod
697   def call_node_start_master(cls, node, start_daemons, no_voting):
698     """Tells a node to activate itself as a master.
699
700     This is a single-node call.
701
702     """
703     return cls._StaticSingleNodeCall(node, "node_start_master",
704                                      [start_daemons, no_voting])
705
706   @classmethod
707   def call_node_stop_master(cls, node, stop_daemons):
708     """Tells a node to demote itself from master status.
709
710     This is a single-node call.
711
712     """
713     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
714
715   @classmethod
716   def call_master_info(cls, node_list):
717     """Query master info.
718
719     This is a multi-node call.
720
721     """
722     # TODO: should this method query down nodes?
723     return cls._StaticMultiNodeCall(node_list, "master_info", [])
724
725   def call_version(self, node_list):
726     """Query node version.
727
728     This is a multi-node call.
729
730     """
731     return self._MultiNodeCall(node_list, "version", [])
732
733   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
734     """Request creation of a given block device.
735
736     This is a single-node call.
737
738     """
739     return self._SingleNodeCall(node, "blockdev_create",
740                                 [bdev.ToDict(), size, owner, on_primary, info])
741
742   def call_blockdev_remove(self, node, bdev):
743     """Request removal of a given block device.
744
745     This is a single-node call.
746
747     """
748     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
749
750   def call_blockdev_rename(self, node, devlist):
751     """Request rename of the given block devices.
752
753     This is a single-node call.
754
755     """
756     return self._SingleNodeCall(node, "blockdev_rename",
757                                 [(d.ToDict(), uid) for d, uid in devlist])
758
759   def call_blockdev_assemble(self, node, disk, owner, on_primary):
760     """Request assembling of a given block device.
761
762     This is a single-node call.
763
764     """
765     return self._SingleNodeCall(node, "blockdev_assemble",
766                                 [disk.ToDict(), owner, on_primary])
767
768   def call_blockdev_shutdown(self, node, disk):
769     """Request shutdown of a given block device.
770
771     This is a single-node call.
772
773     """
774     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
775
776   def call_blockdev_addchildren(self, node, bdev, ndevs):
777     """Request adding a list of children to a (mirroring) device.
778
779     This is a single-node call.
780
781     """
782     return self._SingleNodeCall(node, "blockdev_addchildren",
783                                 [bdev.ToDict(),
784                                  [disk.ToDict() for disk in ndevs]])
785
786   def call_blockdev_removechildren(self, node, bdev, ndevs):
787     """Request removing a list of children from a (mirroring) device.
788
789     This is a single-node call.
790
791     """
792     return self._SingleNodeCall(node, "blockdev_removechildren",
793                                 [bdev.ToDict(),
794                                  [disk.ToDict() for disk in ndevs]])
795
796   def call_blockdev_getmirrorstatus(self, node, disks):
797     """Request status of a (mirroring) device.
798
799     This is a single-node call.
800
801     """
802     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
803                                   [dsk.ToDict() for dsk in disks])
804     if not result.failed:
805       result.payload = [objects.BlockDevStatus.FromDict(i)
806                         for i in result.payload]
807     return result
808
809   def call_blockdev_find(self, node, disk):
810     """Request identification of a given block device.
811
812     This is a single-node call.
813
814     """
815     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
816     if not result.failed and result.payload is not None:
817       result.payload = objects.BlockDevStatus.FromDict(result.payload)
818     return result
819
820   def call_blockdev_close(self, node, instance_name, disks):
821     """Closes the given block devices.
822
823     This is a single-node call.
824
825     """
826     params = [instance_name, [cf.ToDict() for cf in disks]]
827     return self._SingleNodeCall(node, "blockdev_close", params)
828
829   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
830     """Disconnects the network of the given drbd devices.
831
832     This is a multi-node call.
833
834     """
835     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
836                                [nodes_ip, [cf.ToDict() for cf in disks]])
837
838   def call_drbd_attach_net(self, node_list, nodes_ip,
839                            disks, instance_name, multimaster):
840     """Disconnects the given drbd devices.
841
842     This is a multi-node call.
843
844     """
845     return self._MultiNodeCall(node_list, "drbd_attach_net",
846                                [nodes_ip, [cf.ToDict() for cf in disks],
847                                 instance_name, multimaster])
848
849   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
850     """Waits for the synchronization of drbd devices is complete.
851
852     This is a multi-node call.
853
854     """
855     return self._MultiNodeCall(node_list, "drbd_wait_sync",
856                                [nodes_ip, [cf.ToDict() for cf in disks]])
857
858   @classmethod
859   def call_upload_file(cls, node_list, file_name, address_list=None):
860     """Upload a file.
861
862     The node will refuse the operation in case the file is not on the
863     approved file list.
864
865     This is a multi-node call.
866
867     @type node_list: list
868     @param node_list: the list of node names to upload to
869     @type file_name: str
870     @param file_name: the filename to upload
871     @type address_list: list or None
872     @keyword address_list: an optional list of node addresses, in order
873         to optimize the RPC speed
874
875     """
876     file_contents = utils.ReadFile(file_name)
877     data = cls._Compress(file_contents)
878     st = os.stat(file_name)
879     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
880               st.st_atime, st.st_mtime]
881     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
882                                     address_list=address_list)
883
884   @classmethod
885   def call_write_ssconf_files(cls, node_list, values):
886     """Write ssconf files.
887
888     This is a multi-node call.
889
890     """
891     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
892
893   def call_os_diagnose(self, node_list):
894     """Request a diagnose of OS definitions.
895
896     This is a multi-node call.
897
898     """
899     return self._MultiNodeCall(node_list, "os_diagnose", [])
900
901   def call_os_get(self, node, name):
902     """Returns an OS definition.
903
904     This is a single-node call.
905
906     """
907     result = self._SingleNodeCall(node, "os_get", [name])
908     if not result.failed and isinstance(result.data, dict):
909       result.data = objects.OS.FromDict(result.data)
910     return result
911
912   def call_hooks_runner(self, node_list, hpath, phase, env):
913     """Call the hooks runner.
914
915     Args:
916       - op: the OpCode instance
917       - env: a dictionary with the environment
918
919     This is a multi-node call.
920
921     """
922     params = [hpath, phase, env]
923     return self._MultiNodeCall(node_list, "hooks_runner", params)
924
925   def call_iallocator_runner(self, node, name, idata):
926     """Call an iallocator on a remote node
927
928     Args:
929       - name: the iallocator name
930       - input: the json-encoded input string
931
932     This is a single-node call.
933
934     """
935     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
936
937   def call_blockdev_grow(self, node, cf_bdev, amount):
938     """Request a snapshot of the given block device.
939
940     This is a single-node call.
941
942     """
943     return self._SingleNodeCall(node, "blockdev_grow",
944                                 [cf_bdev.ToDict(), amount])
945
946   def call_blockdev_snapshot(self, node, cf_bdev):
947     """Request a snapshot of the given block device.
948
949     This is a single-node call.
950
951     """
952     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
953
954   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
955                            cluster_name, idx):
956     """Request the export of a given snapshot.
957
958     This is a single-node call.
959
960     """
961     return self._SingleNodeCall(node, "snapshot_export",
962                                 [snap_bdev.ToDict(), dest_node,
963                                  self._InstDict(instance), cluster_name, idx])
964
965   def call_finalize_export(self, node, instance, snap_disks):
966     """Request the completion of an export operation.
967
968     This writes the export config file, etc.
969
970     This is a single-node call.
971
972     """
973     flat_disks = []
974     for disk in snap_disks:
975       if isinstance(disk, bool):
976         flat_disks.append(disk)
977       else:
978         flat_disks.append(disk.ToDict())
979
980     return self._SingleNodeCall(node, "finalize_export",
981                                 [self._InstDict(instance), flat_disks])
982
983   def call_export_info(self, node, path):
984     """Queries the export information in a given path.
985
986     This is a single-node call.
987
988     """
989     return self._SingleNodeCall(node, "export_info", [path])
990
991   def call_instance_os_import(self, node, inst, src_node, src_images,
992                               cluster_name):
993     """Request the import of a backup into an instance.
994
995     This is a single-node call.
996
997     """
998     return self._SingleNodeCall(node, "instance_os_import",
999                                 [self._InstDict(inst), src_node, src_images,
1000                                  cluster_name])
1001
1002   def call_export_list(self, node_list):
1003     """Gets the stored exports list.
1004
1005     This is a multi-node call.
1006
1007     """
1008     return self._MultiNodeCall(node_list, "export_list", [])
1009
1010   def call_export_remove(self, node, export):
1011     """Requests removal of a given export.
1012
1013     This is a single-node call.
1014
1015     """
1016     return self._SingleNodeCall(node, "export_remove", [export])
1017
1018   @classmethod
1019   def call_node_leave_cluster(cls, node):
1020     """Requests a node to clean the cluster information it has.
1021
1022     This will remove the configuration information from the ganeti data
1023     dir.
1024
1025     This is a single-node call.
1026
1027     """
1028     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1029
1030   def call_node_volumes(self, node_list):
1031     """Gets all volumes on node(s).
1032
1033     This is a multi-node call.
1034
1035     """
1036     return self._MultiNodeCall(node_list, "node_volumes", [])
1037
1038   def call_node_demote_from_mc(self, node):
1039     """Demote a node from the master candidate role.
1040
1041     This is a single-node call.
1042
1043     """
1044     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1045
1046
1047   def call_node_powercycle(self, node, hypervisor):
1048     """Tries to powercycle a node.
1049
1050     This is a single-node call.
1051
1052     """
1053     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1054
1055
1056   def call_test_delay(self, node_list, duration):
1057     """Sleep for a fixed time on given node(s).
1058
1059     This is a multi-node call.
1060
1061     """
1062     return self._MultiNodeCall(node_list, "test_delay", [duration])
1063
1064   def call_file_storage_dir_create(self, node, file_storage_dir):
1065     """Create the given file storage directory.
1066
1067     This is a single-node call.
1068
1069     """
1070     return self._SingleNodeCall(node, "file_storage_dir_create",
1071                                 [file_storage_dir])
1072
1073   def call_file_storage_dir_remove(self, node, file_storage_dir):
1074     """Remove the given file storage directory.
1075
1076     This is a single-node call.
1077
1078     """
1079     return self._SingleNodeCall(node, "file_storage_dir_remove",
1080                                 [file_storage_dir])
1081
1082   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1083                                    new_file_storage_dir):
1084     """Rename file storage directory.
1085
1086     This is a single-node call.
1087
1088     """
1089     return self._SingleNodeCall(node, "file_storage_dir_rename",
1090                                 [old_file_storage_dir, new_file_storage_dir])
1091
1092   @classmethod
1093   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1094     """Update job queue.
1095
1096     This is a multi-node call.
1097
1098     """
1099     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1100                                     [file_name, cls._Compress(content)],
1101                                     address_list=address_list)
1102
1103   @classmethod
1104   def call_jobqueue_purge(cls, node):
1105     """Purge job queue.
1106
1107     This is a single-node call.
1108
1109     """
1110     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1111
1112   @classmethod
1113   def call_jobqueue_rename(cls, node_list, address_list, rename):
1114     """Rename a job queue file.
1115
1116     This is a multi-node call.
1117
1118     """
1119     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1120                                     address_list=address_list)
1121
1122   @classmethod
1123   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1124     """Set the drain flag on the queue.
1125
1126     This is a multi-node call.
1127
1128     @type node_list: list
1129     @param node_list: the list of nodes to query
1130     @type drain_flag: bool
1131     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1132
1133     """
1134     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1135                                     [drain_flag])
1136
1137   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1138     """Validate the hypervisor params.
1139
1140     This is a multi-node call.
1141
1142     @type node_list: list
1143     @param node_list: the list of nodes to query
1144     @type hvname: string
1145     @param hvname: the hypervisor name
1146     @type hvparams: dict
1147     @param hvparams: the hypervisor parameters to be validated
1148
1149     """
1150     cluster = self._cfg.GetClusterInfo()
1151     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1152     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1153                                [hvname, hv_full])