Remove RpcResult.failed attribute
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 import ganeti.http.client
46
47
48 # Module level variable
49 _http_manager = None
50
51
52 def Init():
53   """Initializes the module-global HTTP client manager.
54
55   Must be called before using any RPC function.
56
57   """
58   global _http_manager
59
60   assert not _http_manager, "RPC module initialized more than once"
61
62   _http_manager = http.client.HttpClientManager()
63
64
65 def Shutdown():
66   """Stops the module-global HTTP client manager.
67
68   Must be called before quitting the program.
69
70   """
71   global _http_manager
72
73   if _http_manager:
74     _http_manager.Shutdown()
75     _http_manager = None
76
77
78 class RpcResult(object):
79   """RPC Result class.
80
81   This class holds an RPC result. It is needed since in multi-node
82   calls we can't raise an exception just because one one out of many
83   failed, and therefore we use this class to encapsulate the result.
84
85   @ivar data: the data payload, for successful results, or None
86   @ivar call: the name of the RPC call
87   @ivar node: the name of the node to which we made the call
88   @ivar offline: whether the operation failed because the node was
89       offline, as opposed to actual failure; offline=True will always
90       imply failed=True, in order to allow simpler checking if
91       the user doesn't care about the exact failure mode
92   @ivar fail_msg: the error message if the call failed
93
94   """
95   def __init__(self, data=None, failed=False, offline=False,
96                call=None, node=None):
97     self.offline = offline
98     self.call = call
99     self.node = node
100     if offline:
101       self.fail_msg = "Node is marked offline"
102       self.data = self.payload = None
103     elif failed:
104       self.fail_msg = self._EnsureErr(data)
105       self.data = self.payload = None
106     else:
107       self.data = data
108       if not isinstance(self.data, (tuple, list)):
109         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
110                          type(self.data))
111       elif len(data) != 2:
112         self.fail_msg = ("RPC layer error: invalid result length (%d), "
113                          "expected 2" % len(self.data))
114       elif not self.data[0]:
115         self.fail_msg = self._EnsureErr(self.data[1])
116       else:
117         # finally success
118         self.fail_msg = None
119         self.payload = data[1]
120
121   @staticmethod
122   def _EnsureErr(val):
123     """Helper to ensure we return a 'True' value for error."""
124     if val:
125       return val
126     else:
127       return "No error information"
128
129   def Raise(self, msg, prereq=False):
130     """If the result has failed, raise an OpExecError.
131
132     This is used so that LU code doesn't have to check for each
133     result, but instead can call this function.
134
135     """
136     if not self.fail_msg:
137       return
138
139     if not msg: # one could pass None for default message
140       msg = ("Call '%s' to node '%s' has failed: %s" %
141              (self.call, self.node, self.fail_msg))
142     else:
143       msg = "%s: %s" % (msg, self.fail_msg)
144     if prereq:
145       ec = errors.OpPrereqError
146     else:
147       ec = errors.OpExecError
148     raise ec(msg)
149
150   def RemoteFailMsg(self):
151     """Check if the remote procedure failed.
152
153     @return: the fail_msg attribute
154
155     """
156     return self.fail_msg
157
158
159 class Client:
160   """RPC Client class.
161
162   This class, given a (remote) method name, a list of parameters and a
163   list of nodes, will contact (in parallel) all nodes, and return a
164   dict of results (key: node name, value: result).
165
166   One current bug is that generic failure is still signaled by
167   'False' result, which is not good. This overloading of values can
168   cause bugs.
169
170   """
171   def __init__(self, procedure, body, port):
172     self.procedure = procedure
173     self.body = body
174     self.port = port
175     self.nc = {}
176
177     self._ssl_params = \
178       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
179                          ssl_cert_path=constants.SSL_CERT_FILE)
180
181   def ConnectList(self, node_list, address_list=None):
182     """Add a list of nodes to the target nodes.
183
184     @type node_list: list
185     @param node_list: the list of node names to connect
186     @type address_list: list or None
187     @keyword address_list: either None or a list with node addresses,
188         which must have the same length as the node list
189
190     """
191     if address_list is None:
192       address_list = [None for _ in node_list]
193     else:
194       assert len(node_list) == len(address_list), \
195              "Name and address lists should have the same length"
196     for node, address in zip(node_list, address_list):
197       self.ConnectNode(node, address)
198
199   def ConnectNode(self, name, address=None):
200     """Add a node to the target list.
201
202     @type name: str
203     @param name: the node name
204     @type address: str
205     @keyword address: the node address, if known
206
207     """
208     if address is None:
209       address = name
210
211     self.nc[name] = \
212       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
213                                     "/%s" % self.procedure,
214                                     post_data=self.body,
215                                     ssl_params=self._ssl_params,
216                                     ssl_verify_peer=True)
217
218   def GetResults(self):
219     """Call nodes and return results.
220
221     @rtype: list
222     @return: List of RPC results
223
224     """
225     assert _http_manager, "RPC module not initialized"
226
227     _http_manager.ExecRequests(self.nc.values())
228
229     results = {}
230
231     for name, req in self.nc.iteritems():
232       if req.success and req.resp_status_code == http.HTTP_OK:
233         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
234                                   node=name, call=self.procedure)
235         continue
236
237       # TODO: Better error reporting
238       if req.error:
239         msg = req.error
240       else:
241         msg = req.resp_body
242
243       logging.error("RPC error in %s from node %s: %s",
244                     self.procedure, name, msg)
245       results[name] = RpcResult(data=msg, failed=True, node=name,
246                                 call=self.procedure)
247
248     return results
249
250
251 class RpcRunner(object):
252   """RPC runner class"""
253
254   def __init__(self, cfg):
255     """Initialized the rpc runner.
256
257     @type cfg:  C{config.ConfigWriter}
258     @param cfg: the configuration object that will be used to get data
259                 about the cluster
260
261     """
262     self._cfg = cfg
263     self.port = utils.GetDaemonPort(constants.NODED)
264
265   def _InstDict(self, instance, hvp=None, bep=None):
266     """Convert the given instance to a dict.
267
268     This is done via the instance's ToDict() method and additionally
269     we fill the hvparams with the cluster defaults.
270
271     @type instance: L{objects.Instance}
272     @param instance: an Instance object
273     @type hvp: dict or None
274     @param hvp: a dictionary with overridden hypervisor parameters
275     @type bep: dict or None
276     @param bep: a dictionary with overridden backend parameters
277     @rtype: dict
278     @return: the instance dict, with the hvparams filled with the
279         cluster defaults
280
281     """
282     idict = instance.ToDict()
283     cluster = self._cfg.GetClusterInfo()
284     idict["hvparams"] = cluster.FillHV(instance)
285     if hvp is not None:
286       idict["hvparams"].update(hvp)
287     idict["beparams"] = cluster.FillBE(instance)
288     if bep is not None:
289       idict["beparams"].update(bep)
290     for nic in idict["nics"]:
291       nic['nicparams'] = objects.FillDict(
292         cluster.nicparams[constants.PP_DEFAULT],
293         nic['nicparams'])
294     return idict
295
296   def _ConnectList(self, client, node_list, call):
297     """Helper for computing node addresses.
298
299     @type client: L{ganeti.rpc.Client}
300     @param client: a C{Client} instance
301     @type node_list: list
302     @param node_list: the node list we should connect
303     @type call: string
304     @param call: the name of the remote procedure call, for filling in
305         correctly any eventual offline nodes' results
306
307     """
308     all_nodes = self._cfg.GetAllNodesInfo()
309     name_list = []
310     addr_list = []
311     skip_dict = {}
312     for node in node_list:
313       if node in all_nodes:
314         if all_nodes[node].offline:
315           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
316           continue
317         val = all_nodes[node].primary_ip
318       else:
319         val = None
320       addr_list.append(val)
321       name_list.append(node)
322     if name_list:
323       client.ConnectList(name_list, address_list=addr_list)
324     return skip_dict
325
326   def _ConnectNode(self, client, node, call):
327     """Helper for computing one node's address.
328
329     @type client: L{ganeti.rpc.Client}
330     @param client: a C{Client} instance
331     @type node: str
332     @param node: the node we should connect
333     @type call: string
334     @param call: the name of the remote procedure call, for filling in
335         correctly any eventual offline nodes' results
336
337     """
338     node_info = self._cfg.GetNodeInfo(node)
339     if node_info is not None:
340       if node_info.offline:
341         return RpcResult(node=node, offline=True, call=call)
342       addr = node_info.primary_ip
343     else:
344       addr = None
345     client.ConnectNode(node, address=addr)
346
347   def _MultiNodeCall(self, node_list, procedure, args):
348     """Helper for making a multi-node call
349
350     """
351     body = serializer.DumpJson(args, indent=False)
352     c = Client(procedure, body, self.port)
353     skip_dict = self._ConnectList(c, node_list, procedure)
354     skip_dict.update(c.GetResults())
355     return skip_dict
356
357   @classmethod
358   def _StaticMultiNodeCall(cls, node_list, procedure, args,
359                            address_list=None):
360     """Helper for making a multi-node static call
361
362     """
363     body = serializer.DumpJson(args, indent=False)
364     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
365     c.ConnectList(node_list, address_list=address_list)
366     return c.GetResults()
367
368   def _SingleNodeCall(self, node, procedure, args):
369     """Helper for making a single-node call
370
371     """
372     body = serializer.DumpJson(args, indent=False)
373     c = Client(procedure, body, self.port)
374     result = self._ConnectNode(c, node, procedure)
375     if result is None:
376       # we did connect, node is not offline
377       result = c.GetResults()[node]
378     return result
379
380   @classmethod
381   def _StaticSingleNodeCall(cls, node, procedure, args):
382     """Helper for making a single-node static call
383
384     """
385     body = serializer.DumpJson(args, indent=False)
386     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
387     c.ConnectNode(node)
388     return c.GetResults()[node]
389
390   @staticmethod
391   def _Compress(data):
392     """Compresses a string for transport over RPC.
393
394     Small amounts of data are not compressed.
395
396     @type data: str
397     @param data: Data
398     @rtype: tuple
399     @return: Encoded data to send
400
401     """
402     # Small amounts of data are not compressed
403     if len(data) < 512:
404       return (constants.RPC_ENCODING_NONE, data)
405
406     # Compress with zlib and encode in base64
407     return (constants.RPC_ENCODING_ZLIB_BASE64,
408             base64.b64encode(zlib.compress(data, 3)))
409
410   #
411   # Begin RPC calls
412   #
413
414   def call_lv_list(self, node_list, vg_name):
415     """Gets the logical volumes present in a given volume group.
416
417     This is a multi-node call.
418
419     """
420     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
421
422   def call_vg_list(self, node_list):
423     """Gets the volume group list.
424
425     This is a multi-node call.
426
427     """
428     return self._MultiNodeCall(node_list, "vg_list", [])
429
430   def call_storage_list(self, node_list, su_name, su_args, name, fields):
431     """Get list of storage units.
432
433     This is a multi-node call.
434
435     """
436     return self._MultiNodeCall(node_list, "storage_list",
437                                [su_name, su_args, name, fields])
438
439   def call_storage_modify(self, node, su_name, su_args, name, changes):
440     """Modify a storage unit.
441
442     This is a single-node call.
443
444     """
445     return self._SingleNodeCall(node, "storage_modify",
446                                 [su_name, su_args, name, changes])
447
448   def call_storage_execute(self, node, su_name, su_args, name, op):
449     """Executes an operation on a storage unit.
450
451     This is a single-node call.
452
453     """
454     return self._SingleNodeCall(node, "storage_execute",
455                                 [su_name, su_args, name, op])
456
457   def call_bridges_exist(self, node, bridges_list):
458     """Checks if a node has all the bridges given.
459
460     This method checks if all bridges given in the bridges_list are
461     present on the remote node, so that an instance that uses interfaces
462     on those bridges can be started.
463
464     This is a single-node call.
465
466     """
467     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
468
469   def call_instance_start(self, node, instance, hvp, bep):
470     """Starts an instance.
471
472     This is a single-node call.
473
474     """
475     idict = self._InstDict(instance, hvp=hvp, bep=bep)
476     return self._SingleNodeCall(node, "instance_start", [idict])
477
478   def call_instance_shutdown(self, node, instance):
479     """Stops an instance.
480
481     This is a single-node call.
482
483     """
484     return self._SingleNodeCall(node, "instance_shutdown",
485                                 [self._InstDict(instance)])
486
487   def call_migration_info(self, node, instance):
488     """Gather the information necessary to prepare an instance migration.
489
490     This is a single-node call.
491
492     @type node: string
493     @param node: the node on which the instance is currently running
494     @type instance: C{objects.Instance}
495     @param instance: the instance definition
496
497     """
498     return self._SingleNodeCall(node, "migration_info",
499                                 [self._InstDict(instance)])
500
501   def call_accept_instance(self, node, instance, info, target):
502     """Prepare a node to accept an instance.
503
504     This is a single-node call.
505
506     @type node: string
507     @param node: the target node for the migration
508     @type instance: C{objects.Instance}
509     @param instance: the instance definition
510     @type info: opaque/hypervisor specific (string/data)
511     @param info: result for the call_migration_info call
512     @type target: string
513     @param target: target hostname (usually ip address) (on the node itself)
514
515     """
516     return self._SingleNodeCall(node, "accept_instance",
517                                 [self._InstDict(instance), info, target])
518
519   def call_finalize_migration(self, node, instance, info, success):
520     """Finalize any target-node migration specific operation.
521
522     This is called both in case of a successful migration and in case of error
523     (in which case it should abort the migration).
524
525     This is a single-node call.
526
527     @type node: string
528     @param node: the target node for the migration
529     @type instance: C{objects.Instance}
530     @param instance: the instance definition
531     @type info: opaque/hypervisor specific (string/data)
532     @param info: result for the call_migration_info call
533     @type success: boolean
534     @param success: whether the migration was a success or a failure
535
536     """
537     return self._SingleNodeCall(node, "finalize_migration",
538                                 [self._InstDict(instance), info, success])
539
540   def call_instance_migrate(self, node, instance, target, live):
541     """Migrate an instance.
542
543     This is a single-node call.
544
545     @type node: string
546     @param node: the node on which the instance is currently running
547     @type instance: C{objects.Instance}
548     @param instance: the instance definition
549     @type target: string
550     @param target: the target node name
551     @type live: boolean
552     @param live: whether the migration should be done live or not (the
553         interpretation of this parameter is left to the hypervisor)
554
555     """
556     return self._SingleNodeCall(node, "instance_migrate",
557                                 [self._InstDict(instance), target, live])
558
559   def call_instance_reboot(self, node, instance, reboot_type):
560     """Reboots an instance.
561
562     This is a single-node call.
563
564     """
565     return self._SingleNodeCall(node, "instance_reboot",
566                                 [self._InstDict(instance), reboot_type])
567
568   def call_instance_os_add(self, node, inst, reinstall):
569     """Installs an OS on the given instance.
570
571     This is a single-node call.
572
573     """
574     return self._SingleNodeCall(node, "instance_os_add",
575                                 [self._InstDict(inst), reinstall])
576
577   def call_instance_run_rename(self, node, inst, old_name):
578     """Run the OS rename script for an instance.
579
580     This is a single-node call.
581
582     """
583     return self._SingleNodeCall(node, "instance_run_rename",
584                                 [self._InstDict(inst), old_name])
585
586   def call_instance_info(self, node, instance, hname):
587     """Returns information about a single instance.
588
589     This is a single-node call.
590
591     @type node: list
592     @param node: the list of nodes to query
593     @type instance: string
594     @param instance: the instance name
595     @type hname: string
596     @param hname: the hypervisor type of the instance
597
598     """
599     return self._SingleNodeCall(node, "instance_info", [instance, hname])
600
601   def call_instance_migratable(self, node, instance):
602     """Checks whether the given instance can be migrated.
603
604     This is a single-node call.
605
606     @param node: the node to query
607     @type instance: L{objects.Instance}
608     @param instance: the instance to check
609
610
611     """
612     return self._SingleNodeCall(node, "instance_migratable",
613                                 [self._InstDict(instance)])
614
615   def call_all_instances_info(self, node_list, hypervisor_list):
616     """Returns information about all instances on the given nodes.
617
618     This is a multi-node call.
619
620     @type node_list: list
621     @param node_list: the list of nodes to query
622     @type hypervisor_list: list
623     @param hypervisor_list: the hypervisors to query for instances
624
625     """
626     return self._MultiNodeCall(node_list, "all_instances_info",
627                                [hypervisor_list])
628
629   def call_instance_list(self, node_list, hypervisor_list):
630     """Returns the list of running instances on a given node.
631
632     This is a multi-node call.
633
634     @type node_list: list
635     @param node_list: the list of nodes to query
636     @type hypervisor_list: list
637     @param hypervisor_list: the hypervisors to query for instances
638
639     """
640     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
641
642   def call_node_tcp_ping(self, node, source, target, port, timeout,
643                          live_port_needed):
644     """Do a TcpPing on the remote node
645
646     This is a single-node call.
647
648     """
649     return self._SingleNodeCall(node, "node_tcp_ping",
650                                 [source, target, port, timeout,
651                                  live_port_needed])
652
653   def call_node_has_ip_address(self, node, address):
654     """Checks if a node has the given IP address.
655
656     This is a single-node call.
657
658     """
659     return self._SingleNodeCall(node, "node_has_ip_address", [address])
660
661   def call_node_info(self, node_list, vg_name, hypervisor_type):
662     """Return node information.
663
664     This will return memory information and volume group size and free
665     space.
666
667     This is a multi-node call.
668
669     @type node_list: list
670     @param node_list: the list of nodes to query
671     @type vg_name: C{string}
672     @param vg_name: the name of the volume group to ask for disk space
673         information
674     @type hypervisor_type: C{str}
675     @param hypervisor_type: the name of the hypervisor to ask for
676         memory information
677
678     """
679     return self._MultiNodeCall(node_list, "node_info",
680                                [vg_name, hypervisor_type])
681
682   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
683     """Add a node to the cluster.
684
685     This is a single-node call.
686
687     """
688     return self._SingleNodeCall(node, "node_add",
689                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
690
691   def call_node_verify(self, node_list, checkdict, cluster_name):
692     """Request verification of given parameters.
693
694     This is a multi-node call.
695
696     """
697     return self._MultiNodeCall(node_list, "node_verify",
698                                [checkdict, cluster_name])
699
700   @classmethod
701   def call_node_start_master(cls, node, start_daemons, no_voting):
702     """Tells a node to activate itself as a master.
703
704     This is a single-node call.
705
706     """
707     return cls._StaticSingleNodeCall(node, "node_start_master",
708                                      [start_daemons, no_voting])
709
710   @classmethod
711   def call_node_stop_master(cls, node, stop_daemons):
712     """Tells a node to demote itself from master status.
713
714     This is a single-node call.
715
716     """
717     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
718
719   @classmethod
720   def call_master_info(cls, node_list):
721     """Query master info.
722
723     This is a multi-node call.
724
725     """
726     # TODO: should this method query down nodes?
727     return cls._StaticMultiNodeCall(node_list, "master_info", [])
728
729   def call_version(self, node_list):
730     """Query node version.
731
732     This is a multi-node call.
733
734     """
735     return self._MultiNodeCall(node_list, "version", [])
736
737   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
738     """Request creation of a given block device.
739
740     This is a single-node call.
741
742     """
743     return self._SingleNodeCall(node, "blockdev_create",
744                                 [bdev.ToDict(), size, owner, on_primary, info])
745
746   def call_blockdev_remove(self, node, bdev):
747     """Request removal of a given block device.
748
749     This is a single-node call.
750
751     """
752     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
753
754   def call_blockdev_rename(self, node, devlist):
755     """Request rename of the given block devices.
756
757     This is a single-node call.
758
759     """
760     return self._SingleNodeCall(node, "blockdev_rename",
761                                 [(d.ToDict(), uid) for d, uid in devlist])
762
763   def call_blockdev_assemble(self, node, disk, owner, on_primary):
764     """Request assembling of a given block device.
765
766     This is a single-node call.
767
768     """
769     return self._SingleNodeCall(node, "blockdev_assemble",
770                                 [disk.ToDict(), owner, on_primary])
771
772   def call_blockdev_shutdown(self, node, disk):
773     """Request shutdown of a given block device.
774
775     This is a single-node call.
776
777     """
778     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
779
780   def call_blockdev_addchildren(self, node, bdev, ndevs):
781     """Request adding a list of children to a (mirroring) device.
782
783     This is a single-node call.
784
785     """
786     return self._SingleNodeCall(node, "blockdev_addchildren",
787                                 [bdev.ToDict(),
788                                  [disk.ToDict() for disk in ndevs]])
789
790   def call_blockdev_removechildren(self, node, bdev, ndevs):
791     """Request removing a list of children from a (mirroring) device.
792
793     This is a single-node call.
794
795     """
796     return self._SingleNodeCall(node, "blockdev_removechildren",
797                                 [bdev.ToDict(),
798                                  [disk.ToDict() for disk in ndevs]])
799
800   def call_blockdev_getmirrorstatus(self, node, disks):
801     """Request status of a (mirroring) device.
802
803     This is a single-node call.
804
805     """
806     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
807                                   [dsk.ToDict() for dsk in disks])
808     if not result.fail_msg:
809       result.payload = [objects.BlockDevStatus.FromDict(i)
810                         for i in result.payload]
811     return result
812
813   def call_blockdev_find(self, node, disk):
814     """Request identification of a given block device.
815
816     This is a single-node call.
817
818     """
819     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
820     if not result.fail_msg and result.payload is not None:
821       result.payload = objects.BlockDevStatus.FromDict(result.payload)
822     return result
823
824   def call_blockdev_close(self, node, instance_name, disks):
825     """Closes the given block devices.
826
827     This is a single-node call.
828
829     """
830     params = [instance_name, [cf.ToDict() for cf in disks]]
831     return self._SingleNodeCall(node, "blockdev_close", params)
832
833   def call_blockdev_getsizes(self, node, disks):
834     """Returns the size of the given disks.
835
836     This is a single-node call.
837
838     """
839     params = [[cf.ToDict() for cf in disks]]
840     return self._SingleNodeCall(node, "blockdev_getsize", params)
841
842   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
843     """Disconnects the network of the given drbd devices.
844
845     This is a multi-node call.
846
847     """
848     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
849                                [nodes_ip, [cf.ToDict() for cf in disks]])
850
851   def call_drbd_attach_net(self, node_list, nodes_ip,
852                            disks, instance_name, multimaster):
853     """Disconnects the given drbd devices.
854
855     This is a multi-node call.
856
857     """
858     return self._MultiNodeCall(node_list, "drbd_attach_net",
859                                [nodes_ip, [cf.ToDict() for cf in disks],
860                                 instance_name, multimaster])
861
862   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
863     """Waits for the synchronization of drbd devices is complete.
864
865     This is a multi-node call.
866
867     """
868     return self._MultiNodeCall(node_list, "drbd_wait_sync",
869                                [nodes_ip, [cf.ToDict() for cf in disks]])
870
871   @classmethod
872   def call_upload_file(cls, node_list, file_name, address_list=None):
873     """Upload a file.
874
875     The node will refuse the operation in case the file is not on the
876     approved file list.
877
878     This is a multi-node call.
879
880     @type node_list: list
881     @param node_list: the list of node names to upload to
882     @type file_name: str
883     @param file_name: the filename to upload
884     @type address_list: list or None
885     @keyword address_list: an optional list of node addresses, in order
886         to optimize the RPC speed
887
888     """
889     file_contents = utils.ReadFile(file_name)
890     data = cls._Compress(file_contents)
891     st = os.stat(file_name)
892     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
893               st.st_atime, st.st_mtime]
894     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
895                                     address_list=address_list)
896
897   @classmethod
898   def call_write_ssconf_files(cls, node_list, values):
899     """Write ssconf files.
900
901     This is a multi-node call.
902
903     """
904     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
905
906   def call_os_diagnose(self, node_list):
907     """Request a diagnose of OS definitions.
908
909     This is a multi-node call.
910
911     """
912     return self._MultiNodeCall(node_list, "os_diagnose", [])
913
914   def call_os_get(self, node, name):
915     """Returns an OS definition.
916
917     This is a single-node call.
918
919     """
920     result = self._SingleNodeCall(node, "os_get", [name])
921     if not result.fail_msg and isinstance(result.data, dict):
922       result.data = objects.OS.FromDict(result.data)
923     return result
924
925   def call_hooks_runner(self, node_list, hpath, phase, env):
926     """Call the hooks runner.
927
928     Args:
929       - op: the OpCode instance
930       - env: a dictionary with the environment
931
932     This is a multi-node call.
933
934     """
935     params = [hpath, phase, env]
936     return self._MultiNodeCall(node_list, "hooks_runner", params)
937
938   def call_iallocator_runner(self, node, name, idata):
939     """Call an iallocator on a remote node
940
941     Args:
942       - name: the iallocator name
943       - input: the json-encoded input string
944
945     This is a single-node call.
946
947     """
948     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
949
950   def call_blockdev_grow(self, node, cf_bdev, amount):
951     """Request a snapshot of the given block device.
952
953     This is a single-node call.
954
955     """
956     return self._SingleNodeCall(node, "blockdev_grow",
957                                 [cf_bdev.ToDict(), amount])
958
959   def call_blockdev_export(self, node, cf_bdev,
960                            dest_node, dest_path, cluster_name):
961     """Export a given disk to another node.
962
963     This is a single-node call.
964
965     """
966     return self._SingleNodeCall(node, "blockdev_export",
967                                 [cf_bdev.ToDict(), dest_node, dest_path,
968                                  cluster_name])
969
970   def call_blockdev_snapshot(self, node, cf_bdev):
971     """Request a snapshot of the given block device.
972
973     This is a single-node call.
974
975     """
976     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
977
978   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
979                            cluster_name, idx):
980     """Request the export of a given snapshot.
981
982     This is a single-node call.
983
984     """
985     return self._SingleNodeCall(node, "snapshot_export",
986                                 [snap_bdev.ToDict(), dest_node,
987                                  self._InstDict(instance), cluster_name, idx])
988
989   def call_finalize_export(self, node, instance, snap_disks):
990     """Request the completion of an export operation.
991
992     This writes the export config file, etc.
993
994     This is a single-node call.
995
996     """
997     flat_disks = []
998     for disk in snap_disks:
999       if isinstance(disk, bool):
1000         flat_disks.append(disk)
1001       else:
1002         flat_disks.append(disk.ToDict())
1003
1004     return self._SingleNodeCall(node, "finalize_export",
1005                                 [self._InstDict(instance), flat_disks])
1006
1007   def call_export_info(self, node, path):
1008     """Queries the export information in a given path.
1009
1010     This is a single-node call.
1011
1012     """
1013     return self._SingleNodeCall(node, "export_info", [path])
1014
1015   def call_instance_os_import(self, node, inst, src_node, src_images,
1016                               cluster_name):
1017     """Request the import of a backup into an instance.
1018
1019     This is a single-node call.
1020
1021     """
1022     return self._SingleNodeCall(node, "instance_os_import",
1023                                 [self._InstDict(inst), src_node, src_images,
1024                                  cluster_name])
1025
1026   def call_export_list(self, node_list):
1027     """Gets the stored exports list.
1028
1029     This is a multi-node call.
1030
1031     """
1032     return self._MultiNodeCall(node_list, "export_list", [])
1033
1034   def call_export_remove(self, node, export):
1035     """Requests removal of a given export.
1036
1037     This is a single-node call.
1038
1039     """
1040     return self._SingleNodeCall(node, "export_remove", [export])
1041
1042   @classmethod
1043   def call_node_leave_cluster(cls, node):
1044     """Requests a node to clean the cluster information it has.
1045
1046     This will remove the configuration information from the ganeti data
1047     dir.
1048
1049     This is a single-node call.
1050
1051     """
1052     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1053
1054   def call_node_volumes(self, node_list):
1055     """Gets all volumes on node(s).
1056
1057     This is a multi-node call.
1058
1059     """
1060     return self._MultiNodeCall(node_list, "node_volumes", [])
1061
1062   def call_node_demote_from_mc(self, node):
1063     """Demote a node from the master candidate role.
1064
1065     This is a single-node call.
1066
1067     """
1068     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1069
1070
1071   def call_node_powercycle(self, node, hypervisor):
1072     """Tries to powercycle a node.
1073
1074     This is a single-node call.
1075
1076     """
1077     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1078
1079
1080   def call_test_delay(self, node_list, duration):
1081     """Sleep for a fixed time on given node(s).
1082
1083     This is a multi-node call.
1084
1085     """
1086     return self._MultiNodeCall(node_list, "test_delay", [duration])
1087
1088   def call_file_storage_dir_create(self, node, file_storage_dir):
1089     """Create the given file storage directory.
1090
1091     This is a single-node call.
1092
1093     """
1094     return self._SingleNodeCall(node, "file_storage_dir_create",
1095                                 [file_storage_dir])
1096
1097   def call_file_storage_dir_remove(self, node, file_storage_dir):
1098     """Remove the given file storage directory.
1099
1100     This is a single-node call.
1101
1102     """
1103     return self._SingleNodeCall(node, "file_storage_dir_remove",
1104                                 [file_storage_dir])
1105
1106   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1107                                    new_file_storage_dir):
1108     """Rename file storage directory.
1109
1110     This is a single-node call.
1111
1112     """
1113     return self._SingleNodeCall(node, "file_storage_dir_rename",
1114                                 [old_file_storage_dir, new_file_storage_dir])
1115
1116   @classmethod
1117   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1118     """Update job queue.
1119
1120     This is a multi-node call.
1121
1122     """
1123     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1124                                     [file_name, cls._Compress(content)],
1125                                     address_list=address_list)
1126
1127   @classmethod
1128   def call_jobqueue_purge(cls, node):
1129     """Purge job queue.
1130
1131     This is a single-node call.
1132
1133     """
1134     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1135
1136   @classmethod
1137   def call_jobqueue_rename(cls, node_list, address_list, rename):
1138     """Rename a job queue file.
1139
1140     This is a multi-node call.
1141
1142     """
1143     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1144                                     address_list=address_list)
1145
1146   @classmethod
1147   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1148     """Set the drain flag on the queue.
1149
1150     This is a multi-node call.
1151
1152     @type node_list: list
1153     @param node_list: the list of nodes to query
1154     @type drain_flag: bool
1155     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1156
1157     """
1158     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1159                                     [drain_flag])
1160
1161   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1162     """Validate the hypervisor params.
1163
1164     This is a multi-node call.
1165
1166     @type node_list: list
1167     @param node_list: the list of nodes to query
1168     @type hvname: string
1169     @param hvname: the hypervisor name
1170     @type hvparams: dict
1171     @param hvparams: the hypervisor parameters to be validated
1172
1173     """
1174     cluster = self._cfg.GetClusterInfo()
1175     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1176     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1177                                [hvname, hv_full])