Simplify handling of regular fields in LUQuery*
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 import ganeti.http.client
46
47
48 # Module level variable
49 _http_manager = None
50
51
52 def Init():
53   """Initializes the module-global HTTP client manager.
54
55   Must be called before using any RPC function.
56
57   """
58   global _http_manager
59
60   assert not _http_manager, "RPC module initialized more than once"
61
62   _http_manager = http.client.HttpClientManager()
63
64
65 def Shutdown():
66   """Stops the module-global HTTP client manager.
67
68   Must be called before quitting the program.
69
70   """
71   global _http_manager
72
73   if _http_manager:
74     _http_manager.Shutdown()
75     _http_manager = None
76
77
78 class RpcResult(object):
79   """RPC Result class.
80
81   This class holds an RPC result. It is needed since in multi-node
82   calls we can't raise an exception just because one one out of many
83   failed, and therefore we use this class to encapsulate the result.
84
85   @ivar data: the data payload, for successful results, or None
86   @ivar call: the name of the RPC call
87   @ivar node: the name of the node to which we made the call
88   @ivar offline: whether the operation failed because the node was
89       offline, as opposed to actual failure; offline=True will always
90       imply failed=True, in order to allow simpler checking if
91       the user doesn't care about the exact failure mode
92   @ivar fail_msg: the error message if the call failed
93
94   """
95   def __init__(self, data=None, failed=False, offline=False,
96                call=None, node=None):
97     self.offline = offline
98     self.call = call
99     self.node = node
100     if offline:
101       self.fail_msg = "Node is marked offline"
102       self.data = self.payload = None
103     elif failed:
104       self.fail_msg = self._EnsureErr(data)
105       self.data = self.payload = None
106     else:
107       self.data = data
108       if not isinstance(self.data, (tuple, list)):
109         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
110                          type(self.data))
111       elif len(data) != 2:
112         self.fail_msg = ("RPC layer error: invalid result length (%d), "
113                          "expected 2" % len(self.data))
114       elif not self.data[0]:
115         self.fail_msg = self._EnsureErr(self.data[1])
116       else:
117         # finally success
118         self.fail_msg = None
119         self.payload = data[1]
120
121   @staticmethod
122   def _EnsureErr(val):
123     """Helper to ensure we return a 'True' value for error."""
124     if val:
125       return val
126     else:
127       return "No error information"
128
129   def Raise(self, msg, prereq=False):
130     """If the result has failed, raise an OpExecError.
131
132     This is used so that LU code doesn't have to check for each
133     result, but instead can call this function.
134
135     """
136     if not self.fail_msg:
137       return
138
139     if not msg: # one could pass None for default message
140       msg = ("Call '%s' to node '%s' has failed: %s" %
141              (self.call, self.node, self.fail_msg))
142     else:
143       msg = "%s: %s" % (msg, self.fail_msg)
144     if prereq:
145       ec = errors.OpPrereqError
146     else:
147       ec = errors.OpExecError
148     raise ec(msg)
149
150
151 class Client:
152   """RPC Client class.
153
154   This class, given a (remote) method name, a list of parameters and a
155   list of nodes, will contact (in parallel) all nodes, and return a
156   dict of results (key: node name, value: result).
157
158   One current bug is that generic failure is still signaled by
159   'False' result, which is not good. This overloading of values can
160   cause bugs.
161
162   """
163   def __init__(self, procedure, body, port):
164     self.procedure = procedure
165     self.body = body
166     self.port = port
167     self.nc = {}
168
169     self._ssl_params = \
170       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
171                          ssl_cert_path=constants.SSL_CERT_FILE)
172
173   def ConnectList(self, node_list, address_list=None):
174     """Add a list of nodes to the target nodes.
175
176     @type node_list: list
177     @param node_list: the list of node names to connect
178     @type address_list: list or None
179     @keyword address_list: either None or a list with node addresses,
180         which must have the same length as the node list
181
182     """
183     if address_list is None:
184       address_list = [None for _ in node_list]
185     else:
186       assert len(node_list) == len(address_list), \
187              "Name and address lists should have the same length"
188     for node, address in zip(node_list, address_list):
189       self.ConnectNode(node, address)
190
191   def ConnectNode(self, name, address=None):
192     """Add a node to the target list.
193
194     @type name: str
195     @param name: the node name
196     @type address: str
197     @keyword address: the node address, if known
198
199     """
200     if address is None:
201       address = name
202
203     self.nc[name] = \
204       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
205                                     "/%s" % self.procedure,
206                                     post_data=self.body,
207                                     ssl_params=self._ssl_params,
208                                     ssl_verify_peer=True)
209
210   def GetResults(self):
211     """Call nodes and return results.
212
213     @rtype: list
214     @return: List of RPC results
215
216     """
217     assert _http_manager, "RPC module not initialized"
218
219     _http_manager.ExecRequests(self.nc.values())
220
221     results = {}
222
223     for name, req in self.nc.iteritems():
224       if req.success and req.resp_status_code == http.HTTP_OK:
225         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
226                                   node=name, call=self.procedure)
227         continue
228
229       # TODO: Better error reporting
230       if req.error:
231         msg = req.error
232       else:
233         msg = req.resp_body
234
235       logging.error("RPC error in %s from node %s: %s",
236                     self.procedure, name, msg)
237       results[name] = RpcResult(data=msg, failed=True, node=name,
238                                 call=self.procedure)
239
240     return results
241
242
243 class RpcRunner(object):
244   """RPC runner class"""
245
246   def __init__(self, cfg):
247     """Initialized the rpc runner.
248
249     @type cfg:  C{config.ConfigWriter}
250     @param cfg: the configuration object that will be used to get data
251                 about the cluster
252
253     """
254     self._cfg = cfg
255     self.port = utils.GetDaemonPort(constants.NODED)
256
257   def _InstDict(self, instance, hvp=None, bep=None):
258     """Convert the given instance to a dict.
259
260     This is done via the instance's ToDict() method and additionally
261     we fill the hvparams with the cluster defaults.
262
263     @type instance: L{objects.Instance}
264     @param instance: an Instance object
265     @type hvp: dict or None
266     @param hvp: a dictionary with overridden hypervisor parameters
267     @type bep: dict or None
268     @param bep: a dictionary with overridden backend parameters
269     @rtype: dict
270     @return: the instance dict, with the hvparams filled with the
271         cluster defaults
272
273     """
274     idict = instance.ToDict()
275     cluster = self._cfg.GetClusterInfo()
276     idict["hvparams"] = cluster.FillHV(instance)
277     if hvp is not None:
278       idict["hvparams"].update(hvp)
279     idict["beparams"] = cluster.FillBE(instance)
280     if bep is not None:
281       idict["beparams"].update(bep)
282     for nic in idict["nics"]:
283       nic['nicparams'] = objects.FillDict(
284         cluster.nicparams[constants.PP_DEFAULT],
285         nic['nicparams'])
286     return idict
287
288   def _ConnectList(self, client, node_list, call):
289     """Helper for computing node addresses.
290
291     @type client: L{ganeti.rpc.Client}
292     @param client: a C{Client} instance
293     @type node_list: list
294     @param node_list: the node list we should connect
295     @type call: string
296     @param call: the name of the remote procedure call, for filling in
297         correctly any eventual offline nodes' results
298
299     """
300     all_nodes = self._cfg.GetAllNodesInfo()
301     name_list = []
302     addr_list = []
303     skip_dict = {}
304     for node in node_list:
305       if node in all_nodes:
306         if all_nodes[node].offline:
307           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
308           continue
309         val = all_nodes[node].primary_ip
310       else:
311         val = None
312       addr_list.append(val)
313       name_list.append(node)
314     if name_list:
315       client.ConnectList(name_list, address_list=addr_list)
316     return skip_dict
317
318   def _ConnectNode(self, client, node, call):
319     """Helper for computing one node's address.
320
321     @type client: L{ganeti.rpc.Client}
322     @param client: a C{Client} instance
323     @type node: str
324     @param node: the node we should connect
325     @type call: string
326     @param call: the name of the remote procedure call, for filling in
327         correctly any eventual offline nodes' results
328
329     """
330     node_info = self._cfg.GetNodeInfo(node)
331     if node_info is not None:
332       if node_info.offline:
333         return RpcResult(node=node, offline=True, call=call)
334       addr = node_info.primary_ip
335     else:
336       addr = None
337     client.ConnectNode(node, address=addr)
338
339   def _MultiNodeCall(self, node_list, procedure, args):
340     """Helper for making a multi-node call
341
342     """
343     body = serializer.DumpJson(args, indent=False)
344     c = Client(procedure, body, self.port)
345     skip_dict = self._ConnectList(c, node_list, procedure)
346     skip_dict.update(c.GetResults())
347     return skip_dict
348
349   @classmethod
350   def _StaticMultiNodeCall(cls, node_list, procedure, args,
351                            address_list=None):
352     """Helper for making a multi-node static call
353
354     """
355     body = serializer.DumpJson(args, indent=False)
356     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
357     c.ConnectList(node_list, address_list=address_list)
358     return c.GetResults()
359
360   def _SingleNodeCall(self, node, procedure, args):
361     """Helper for making a single-node call
362
363     """
364     body = serializer.DumpJson(args, indent=False)
365     c = Client(procedure, body, self.port)
366     result = self._ConnectNode(c, node, procedure)
367     if result is None:
368       # we did connect, node is not offline
369       result = c.GetResults()[node]
370     return result
371
372   @classmethod
373   def _StaticSingleNodeCall(cls, node, procedure, args):
374     """Helper for making a single-node static call
375
376     """
377     body = serializer.DumpJson(args, indent=False)
378     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
379     c.ConnectNode(node)
380     return c.GetResults()[node]
381
382   @staticmethod
383   def _Compress(data):
384     """Compresses a string for transport over RPC.
385
386     Small amounts of data are not compressed.
387
388     @type data: str
389     @param data: Data
390     @rtype: tuple
391     @return: Encoded data to send
392
393     """
394     # Small amounts of data are not compressed
395     if len(data) < 512:
396       return (constants.RPC_ENCODING_NONE, data)
397
398     # Compress with zlib and encode in base64
399     return (constants.RPC_ENCODING_ZLIB_BASE64,
400             base64.b64encode(zlib.compress(data, 3)))
401
402   #
403   # Begin RPC calls
404   #
405
406   def call_lv_list(self, node_list, vg_name):
407     """Gets the logical volumes present in a given volume group.
408
409     This is a multi-node call.
410
411     """
412     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
413
414   def call_vg_list(self, node_list):
415     """Gets the volume group list.
416
417     This is a multi-node call.
418
419     """
420     return self._MultiNodeCall(node_list, "vg_list", [])
421
422   def call_storage_list(self, node_list, su_name, su_args, name, fields):
423     """Get list of storage units.
424
425     This is a multi-node call.
426
427     """
428     return self._MultiNodeCall(node_list, "storage_list",
429                                [su_name, su_args, name, fields])
430
431   def call_storage_modify(self, node, su_name, su_args, name, changes):
432     """Modify a storage unit.
433
434     This is a single-node call.
435
436     """
437     return self._SingleNodeCall(node, "storage_modify",
438                                 [su_name, su_args, name, changes])
439
440   def call_storage_execute(self, node, su_name, su_args, name, op):
441     """Executes an operation on a storage unit.
442
443     This is a single-node call.
444
445     """
446     return self._SingleNodeCall(node, "storage_execute",
447                                 [su_name, su_args, name, op])
448
449   def call_bridges_exist(self, node, bridges_list):
450     """Checks if a node has all the bridges given.
451
452     This method checks if all bridges given in the bridges_list are
453     present on the remote node, so that an instance that uses interfaces
454     on those bridges can be started.
455
456     This is a single-node call.
457
458     """
459     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
460
461   def call_instance_start(self, node, instance, hvp, bep):
462     """Starts an instance.
463
464     This is a single-node call.
465
466     """
467     idict = self._InstDict(instance, hvp=hvp, bep=bep)
468     return self._SingleNodeCall(node, "instance_start", [idict])
469
470   def call_instance_shutdown(self, node, instance):
471     """Stops an instance.
472
473     This is a single-node call.
474
475     """
476     return self._SingleNodeCall(node, "instance_shutdown",
477                                 [self._InstDict(instance)])
478
479   def call_migration_info(self, node, instance):
480     """Gather the information necessary to prepare an instance migration.
481
482     This is a single-node call.
483
484     @type node: string
485     @param node: the node on which the instance is currently running
486     @type instance: C{objects.Instance}
487     @param instance: the instance definition
488
489     """
490     return self._SingleNodeCall(node, "migration_info",
491                                 [self._InstDict(instance)])
492
493   def call_accept_instance(self, node, instance, info, target):
494     """Prepare a node to accept an instance.
495
496     This is a single-node call.
497
498     @type node: string
499     @param node: the target node for the migration
500     @type instance: C{objects.Instance}
501     @param instance: the instance definition
502     @type info: opaque/hypervisor specific (string/data)
503     @param info: result for the call_migration_info call
504     @type target: string
505     @param target: target hostname (usually ip address) (on the node itself)
506
507     """
508     return self._SingleNodeCall(node, "accept_instance",
509                                 [self._InstDict(instance), info, target])
510
511   def call_finalize_migration(self, node, instance, info, success):
512     """Finalize any target-node migration specific operation.
513
514     This is called both in case of a successful migration and in case of error
515     (in which case it should abort the migration).
516
517     This is a single-node call.
518
519     @type node: string
520     @param node: the target node for the migration
521     @type instance: C{objects.Instance}
522     @param instance: the instance definition
523     @type info: opaque/hypervisor specific (string/data)
524     @param info: result for the call_migration_info call
525     @type success: boolean
526     @param success: whether the migration was a success or a failure
527
528     """
529     return self._SingleNodeCall(node, "finalize_migration",
530                                 [self._InstDict(instance), info, success])
531
532   def call_instance_migrate(self, node, instance, target, live):
533     """Migrate an instance.
534
535     This is a single-node call.
536
537     @type node: string
538     @param node: the node on which the instance is currently running
539     @type instance: C{objects.Instance}
540     @param instance: the instance definition
541     @type target: string
542     @param target: the target node name
543     @type live: boolean
544     @param live: whether the migration should be done live or not (the
545         interpretation of this parameter is left to the hypervisor)
546
547     """
548     return self._SingleNodeCall(node, "instance_migrate",
549                                 [self._InstDict(instance), target, live])
550
551   def call_instance_reboot(self, node, instance, reboot_type):
552     """Reboots an instance.
553
554     This is a single-node call.
555
556     """
557     return self._SingleNodeCall(node, "instance_reboot",
558                                 [self._InstDict(instance), reboot_type])
559
560   def call_instance_os_add(self, node, inst, reinstall):
561     """Installs an OS on the given instance.
562
563     This is a single-node call.
564
565     """
566     return self._SingleNodeCall(node, "instance_os_add",
567                                 [self._InstDict(inst), reinstall])
568
569   def call_instance_run_rename(self, node, inst, old_name):
570     """Run the OS rename script for an instance.
571
572     This is a single-node call.
573
574     """
575     return self._SingleNodeCall(node, "instance_run_rename",
576                                 [self._InstDict(inst), old_name])
577
578   def call_instance_info(self, node, instance, hname):
579     """Returns information about a single instance.
580
581     This is a single-node call.
582
583     @type node: list
584     @param node: the list of nodes to query
585     @type instance: string
586     @param instance: the instance name
587     @type hname: string
588     @param hname: the hypervisor type of the instance
589
590     """
591     return self._SingleNodeCall(node, "instance_info", [instance, hname])
592
593   def call_instance_migratable(self, node, instance):
594     """Checks whether the given instance can be migrated.
595
596     This is a single-node call.
597
598     @param node: the node to query
599     @type instance: L{objects.Instance}
600     @param instance: the instance to check
601
602
603     """
604     return self._SingleNodeCall(node, "instance_migratable",
605                                 [self._InstDict(instance)])
606
607   def call_all_instances_info(self, node_list, hypervisor_list):
608     """Returns information about all instances on the given nodes.
609
610     This is a multi-node call.
611
612     @type node_list: list
613     @param node_list: the list of nodes to query
614     @type hypervisor_list: list
615     @param hypervisor_list: the hypervisors to query for instances
616
617     """
618     return self._MultiNodeCall(node_list, "all_instances_info",
619                                [hypervisor_list])
620
621   def call_instance_list(self, node_list, hypervisor_list):
622     """Returns the list of running instances on a given node.
623
624     This is a multi-node call.
625
626     @type node_list: list
627     @param node_list: the list of nodes to query
628     @type hypervisor_list: list
629     @param hypervisor_list: the hypervisors to query for instances
630
631     """
632     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
633
634   def call_node_tcp_ping(self, node, source, target, port, timeout,
635                          live_port_needed):
636     """Do a TcpPing on the remote node
637
638     This is a single-node call.
639
640     """
641     return self._SingleNodeCall(node, "node_tcp_ping",
642                                 [source, target, port, timeout,
643                                  live_port_needed])
644
645   def call_node_has_ip_address(self, node, address):
646     """Checks if a node has the given IP address.
647
648     This is a single-node call.
649
650     """
651     return self._SingleNodeCall(node, "node_has_ip_address", [address])
652
653   def call_node_info(self, node_list, vg_name, hypervisor_type):
654     """Return node information.
655
656     This will return memory information and volume group size and free
657     space.
658
659     This is a multi-node call.
660
661     @type node_list: list
662     @param node_list: the list of nodes to query
663     @type vg_name: C{string}
664     @param vg_name: the name of the volume group to ask for disk space
665         information
666     @type hypervisor_type: C{str}
667     @param hypervisor_type: the name of the hypervisor to ask for
668         memory information
669
670     """
671     return self._MultiNodeCall(node_list, "node_info",
672                                [vg_name, hypervisor_type])
673
674   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
675     """Add a node to the cluster.
676
677     This is a single-node call.
678
679     """
680     return self._SingleNodeCall(node, "node_add",
681                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
682
683   def call_node_verify(self, node_list, checkdict, cluster_name):
684     """Request verification of given parameters.
685
686     This is a multi-node call.
687
688     """
689     return self._MultiNodeCall(node_list, "node_verify",
690                                [checkdict, cluster_name])
691
692   @classmethod
693   def call_node_start_master(cls, node, start_daemons, no_voting):
694     """Tells a node to activate itself as a master.
695
696     This is a single-node call.
697
698     """
699     return cls._StaticSingleNodeCall(node, "node_start_master",
700                                      [start_daemons, no_voting])
701
702   @classmethod
703   def call_node_stop_master(cls, node, stop_daemons):
704     """Tells a node to demote itself from master status.
705
706     This is a single-node call.
707
708     """
709     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
710
711   @classmethod
712   def call_master_info(cls, node_list):
713     """Query master info.
714
715     This is a multi-node call.
716
717     """
718     # TODO: should this method query down nodes?
719     return cls._StaticMultiNodeCall(node_list, "master_info", [])
720
721   def call_version(self, node_list):
722     """Query node version.
723
724     This is a multi-node call.
725
726     """
727     return self._MultiNodeCall(node_list, "version", [])
728
729   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
730     """Request creation of a given block device.
731
732     This is a single-node call.
733
734     """
735     return self._SingleNodeCall(node, "blockdev_create",
736                                 [bdev.ToDict(), size, owner, on_primary, info])
737
738   def call_blockdev_remove(self, node, bdev):
739     """Request removal of a given block device.
740
741     This is a single-node call.
742
743     """
744     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
745
746   def call_blockdev_rename(self, node, devlist):
747     """Request rename of the given block devices.
748
749     This is a single-node call.
750
751     """
752     return self._SingleNodeCall(node, "blockdev_rename",
753                                 [(d.ToDict(), uid) for d, uid in devlist])
754
755   def call_blockdev_assemble(self, node, disk, owner, on_primary):
756     """Request assembling of a given block device.
757
758     This is a single-node call.
759
760     """
761     return self._SingleNodeCall(node, "blockdev_assemble",
762                                 [disk.ToDict(), owner, on_primary])
763
764   def call_blockdev_shutdown(self, node, disk):
765     """Request shutdown of a given block device.
766
767     This is a single-node call.
768
769     """
770     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
771
772   def call_blockdev_addchildren(self, node, bdev, ndevs):
773     """Request adding a list of children to a (mirroring) device.
774
775     This is a single-node call.
776
777     """
778     return self._SingleNodeCall(node, "blockdev_addchildren",
779                                 [bdev.ToDict(),
780                                  [disk.ToDict() for disk in ndevs]])
781
782   def call_blockdev_removechildren(self, node, bdev, ndevs):
783     """Request removing a list of children from a (mirroring) device.
784
785     This is a single-node call.
786
787     """
788     return self._SingleNodeCall(node, "blockdev_removechildren",
789                                 [bdev.ToDict(),
790                                  [disk.ToDict() for disk in ndevs]])
791
792   def call_blockdev_getmirrorstatus(self, node, disks):
793     """Request status of a (mirroring) device.
794
795     This is a single-node call.
796
797     """
798     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
799                                   [dsk.ToDict() for dsk in disks])
800     if not result.fail_msg:
801       result.payload = [objects.BlockDevStatus.FromDict(i)
802                         for i in result.payload]
803     return result
804
805   def call_blockdev_find(self, node, disk):
806     """Request identification of a given block device.
807
808     This is a single-node call.
809
810     """
811     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
812     if not result.fail_msg and result.payload is not None:
813       result.payload = objects.BlockDevStatus.FromDict(result.payload)
814     return result
815
816   def call_blockdev_close(self, node, instance_name, disks):
817     """Closes the given block devices.
818
819     This is a single-node call.
820
821     """
822     params = [instance_name, [cf.ToDict() for cf in disks]]
823     return self._SingleNodeCall(node, "blockdev_close", params)
824
825   def call_blockdev_getsizes(self, node, disks):
826     """Returns the size of the given disks.
827
828     This is a single-node call.
829
830     """
831     params = [[cf.ToDict() for cf in disks]]
832     return self._SingleNodeCall(node, "blockdev_getsize", params)
833
834   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
835     """Disconnects the network of the given drbd devices.
836
837     This is a multi-node call.
838
839     """
840     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
841                                [nodes_ip, [cf.ToDict() for cf in disks]])
842
843   def call_drbd_attach_net(self, node_list, nodes_ip,
844                            disks, instance_name, multimaster):
845     """Disconnects the given drbd devices.
846
847     This is a multi-node call.
848
849     """
850     return self._MultiNodeCall(node_list, "drbd_attach_net",
851                                [nodes_ip, [cf.ToDict() for cf in disks],
852                                 instance_name, multimaster])
853
854   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
855     """Waits for the synchronization of drbd devices is complete.
856
857     This is a multi-node call.
858
859     """
860     return self._MultiNodeCall(node_list, "drbd_wait_sync",
861                                [nodes_ip, [cf.ToDict() for cf in disks]])
862
863   @classmethod
864   def call_upload_file(cls, node_list, file_name, address_list=None):
865     """Upload a file.
866
867     The node will refuse the operation in case the file is not on the
868     approved file list.
869
870     This is a multi-node call.
871
872     @type node_list: list
873     @param node_list: the list of node names to upload to
874     @type file_name: str
875     @param file_name: the filename to upload
876     @type address_list: list or None
877     @keyword address_list: an optional list of node addresses, in order
878         to optimize the RPC speed
879
880     """
881     file_contents = utils.ReadFile(file_name)
882     data = cls._Compress(file_contents)
883     st = os.stat(file_name)
884     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
885               st.st_atime, st.st_mtime]
886     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
887                                     address_list=address_list)
888
889   @classmethod
890   def call_write_ssconf_files(cls, node_list, values):
891     """Write ssconf files.
892
893     This is a multi-node call.
894
895     """
896     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
897
898   def call_os_diagnose(self, node_list):
899     """Request a diagnose of OS definitions.
900
901     This is a multi-node call.
902
903     """
904     return self._MultiNodeCall(node_list, "os_diagnose", [])
905
906   def call_os_get(self, node, name):
907     """Returns an OS definition.
908
909     This is a single-node call.
910
911     """
912     result = self._SingleNodeCall(node, "os_get", [name])
913     if not result.fail_msg and isinstance(result.data, dict):
914       result.data = objects.OS.FromDict(result.data)
915     return result
916
917   def call_hooks_runner(self, node_list, hpath, phase, env):
918     """Call the hooks runner.
919
920     Args:
921       - op: the OpCode instance
922       - env: a dictionary with the environment
923
924     This is a multi-node call.
925
926     """
927     params = [hpath, phase, env]
928     return self._MultiNodeCall(node_list, "hooks_runner", params)
929
930   def call_iallocator_runner(self, node, name, idata):
931     """Call an iallocator on a remote node
932
933     Args:
934       - name: the iallocator name
935       - input: the json-encoded input string
936
937     This is a single-node call.
938
939     """
940     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
941
942   def call_blockdev_grow(self, node, cf_bdev, amount):
943     """Request a snapshot of the given block device.
944
945     This is a single-node call.
946
947     """
948     return self._SingleNodeCall(node, "blockdev_grow",
949                                 [cf_bdev.ToDict(), amount])
950
951   def call_blockdev_export(self, node, cf_bdev,
952                            dest_node, dest_path, cluster_name):
953     """Export a given disk to another node.
954
955     This is a single-node call.
956
957     """
958     return self._SingleNodeCall(node, "blockdev_export",
959                                 [cf_bdev.ToDict(), dest_node, dest_path,
960                                  cluster_name])
961
962   def call_blockdev_snapshot(self, node, cf_bdev):
963     """Request a snapshot of the given block device.
964
965     This is a single-node call.
966
967     """
968     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
969
970   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
971                            cluster_name, idx):
972     """Request the export of a given snapshot.
973
974     This is a single-node call.
975
976     """
977     return self._SingleNodeCall(node, "snapshot_export",
978                                 [snap_bdev.ToDict(), dest_node,
979                                  self._InstDict(instance), cluster_name, idx])
980
981   def call_finalize_export(self, node, instance, snap_disks):
982     """Request the completion of an export operation.
983
984     This writes the export config file, etc.
985
986     This is a single-node call.
987
988     """
989     flat_disks = []
990     for disk in snap_disks:
991       if isinstance(disk, bool):
992         flat_disks.append(disk)
993       else:
994         flat_disks.append(disk.ToDict())
995
996     return self._SingleNodeCall(node, "finalize_export",
997                                 [self._InstDict(instance), flat_disks])
998
999   def call_export_info(self, node, path):
1000     """Queries the export information in a given path.
1001
1002     This is a single-node call.
1003
1004     """
1005     return self._SingleNodeCall(node, "export_info", [path])
1006
1007   def call_instance_os_import(self, node, inst, src_node, src_images,
1008                               cluster_name):
1009     """Request the import of a backup into an instance.
1010
1011     This is a single-node call.
1012
1013     """
1014     return self._SingleNodeCall(node, "instance_os_import",
1015                                 [self._InstDict(inst), src_node, src_images,
1016                                  cluster_name])
1017
1018   def call_export_list(self, node_list):
1019     """Gets the stored exports list.
1020
1021     This is a multi-node call.
1022
1023     """
1024     return self._MultiNodeCall(node_list, "export_list", [])
1025
1026   def call_export_remove(self, node, export):
1027     """Requests removal of a given export.
1028
1029     This is a single-node call.
1030
1031     """
1032     return self._SingleNodeCall(node, "export_remove", [export])
1033
1034   @classmethod
1035   def call_node_leave_cluster(cls, node):
1036     """Requests a node to clean the cluster information it has.
1037
1038     This will remove the configuration information from the ganeti data
1039     dir.
1040
1041     This is a single-node call.
1042
1043     """
1044     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1045
1046   def call_node_volumes(self, node_list):
1047     """Gets all volumes on node(s).
1048
1049     This is a multi-node call.
1050
1051     """
1052     return self._MultiNodeCall(node_list, "node_volumes", [])
1053
1054   def call_node_demote_from_mc(self, node):
1055     """Demote a node from the master candidate role.
1056
1057     This is a single-node call.
1058
1059     """
1060     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1061
1062
1063   def call_node_powercycle(self, node, hypervisor):
1064     """Tries to powercycle a node.
1065
1066     This is a single-node call.
1067
1068     """
1069     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1070
1071
1072   def call_test_delay(self, node_list, duration):
1073     """Sleep for a fixed time on given node(s).
1074
1075     This is a multi-node call.
1076
1077     """
1078     return self._MultiNodeCall(node_list, "test_delay", [duration])
1079
1080   def call_file_storage_dir_create(self, node, file_storage_dir):
1081     """Create the given file storage directory.
1082
1083     This is a single-node call.
1084
1085     """
1086     return self._SingleNodeCall(node, "file_storage_dir_create",
1087                                 [file_storage_dir])
1088
1089   def call_file_storage_dir_remove(self, node, file_storage_dir):
1090     """Remove the given file storage directory.
1091
1092     This is a single-node call.
1093
1094     """
1095     return self._SingleNodeCall(node, "file_storage_dir_remove",
1096                                 [file_storage_dir])
1097
1098   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1099                                    new_file_storage_dir):
1100     """Rename file storage directory.
1101
1102     This is a single-node call.
1103
1104     """
1105     return self._SingleNodeCall(node, "file_storage_dir_rename",
1106                                 [old_file_storage_dir, new_file_storage_dir])
1107
1108   @classmethod
1109   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1110     """Update job queue.
1111
1112     This is a multi-node call.
1113
1114     """
1115     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1116                                     [file_name, cls._Compress(content)],
1117                                     address_list=address_list)
1118
1119   @classmethod
1120   def call_jobqueue_purge(cls, node):
1121     """Purge job queue.
1122
1123     This is a single-node call.
1124
1125     """
1126     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1127
1128   @classmethod
1129   def call_jobqueue_rename(cls, node_list, address_list, rename):
1130     """Rename a job queue file.
1131
1132     This is a multi-node call.
1133
1134     """
1135     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1136                                     address_list=address_list)
1137
1138   @classmethod
1139   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1140     """Set the drain flag on the queue.
1141
1142     This is a multi-node call.
1143
1144     @type node_list: list
1145     @param node_list: the list of nodes to query
1146     @type drain_flag: bool
1147     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1148
1149     """
1150     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1151                                     [drain_flag])
1152
1153   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1154     """Validate the hypervisor params.
1155
1156     This is a multi-node call.
1157
1158     @type node_list: list
1159     @param node_list: the list of nodes to query
1160     @type hvname: string
1161     @param hvname: the hypervisor name
1162     @type hvparams: dict
1163     @param hvparams: the hypervisor parameters to be validated
1164
1165     """
1166     cluster = self._cfg.GetClusterInfo()
1167     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1168     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1169                                [hvname, hv_full])