cmdlib: Change tasklet logging to debug level
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 import ganeti.http.client
46
47
48 # Module level variable
49 _http_manager = None
50
51
52 def Init():
53   """Initializes the module-global HTTP client manager.
54
55   Must be called before using any RPC function.
56
57   """
58   global _http_manager
59
60   assert not _http_manager, "RPC module initialized more than once"
61
62   _http_manager = http.client.HttpClientManager()
63
64
65 def Shutdown():
66   """Stops the module-global HTTP client manager.
67
68   Must be called before quitting the program.
69
70   """
71   global _http_manager
72
73   if _http_manager:
74     _http_manager.Shutdown()
75     _http_manager = None
76
77
78 class RpcResult(object):
79   """RPC Result class.
80
81   This class holds an RPC result. It is needed since in multi-node
82   calls we can't raise an exception just because one one out of many
83   failed, and therefore we use this class to encapsulate the result.
84
85   @ivar data: the data payload, for successful results, or None
86   @type failed: boolean
87   @ivar failed: whether the operation failed at transport level (not
88       application level on the remote node)
89   @ivar call: the name of the RPC call
90   @ivar node: the name of the node to which we made the call
91   @ivar offline: whether the operation failed because the node was
92       offline, as opposed to actual failure; offline=True will always
93       imply failed=True, in order to allow simpler checking if
94       the user doesn't care about the exact failure mode
95   @ivar fail_msg: the error message if the call failed
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.fail_msg = "Node is marked offline"
107       self.data = self.payload = None
108     elif failed:
109       self.fail_msg = self._EnsureErr(data)
110       self.data = self.payload = None
111     else:
112       self.data = data
113       if not isinstance(self.data, (tuple, list)):
114         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
115                          type(self.data))
116       elif len(data) != 2:
117         self.fail_msg = ("RPC layer error: invalid result length (%d), "
118                          "expected 2" % len(self.data))
119       elif not self.data[0]:
120         self.fail_msg = self._EnsureErr(self.data[1])
121       else:
122         # finally success
123         self.fail_msg = None
124         self.payload = data[1]
125
126   @staticmethod
127   def _EnsureErr(val):
128     """Helper to ensure we return a 'True' value for error."""
129     if val:
130       return val
131     else:
132       return "No error information"
133
134   def Raise(self, msg, prereq=False):
135     """If the result has failed, raise an OpExecError.
136
137     This is used so that LU code doesn't have to check for each
138     result, but instead can call this function.
139
140     """
141     if not self.fail_msg:
142       return
143
144     if not msg: # one could pass None for default message
145       msg = ("Call '%s' to node '%s' has failed: %s" %
146              (self.call, self.node, self.fail_msg))
147     else:
148       msg = "%s: %s" % (msg, self.fail_msg)
149     if prereq:
150       ec = errors.OpPrereqError
151     else:
152       ec = errors.OpExecError
153     raise ec(msg)
154
155   def RemoteFailMsg(self):
156     """Check if the remote procedure failed.
157
158     @return: the fail_msg attribute
159
160     """
161     return self.fail_msg
162
163
164 class Client:
165   """RPC Client class.
166
167   This class, given a (remote) method name, a list of parameters and a
168   list of nodes, will contact (in parallel) all nodes, and return a
169   dict of results (key: node name, value: result).
170
171   One current bug is that generic failure is still signaled by
172   'False' result, which is not good. This overloading of values can
173   cause bugs.
174
175   """
176   def __init__(self, procedure, body, port):
177     self.procedure = procedure
178     self.body = body
179     self.port = port
180     self.nc = {}
181
182     self._ssl_params = \
183       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
184                          ssl_cert_path=constants.SSL_CERT_FILE)
185
186   def ConnectList(self, node_list, address_list=None):
187     """Add a list of nodes to the target nodes.
188
189     @type node_list: list
190     @param node_list: the list of node names to connect
191     @type address_list: list or None
192     @keyword address_list: either None or a list with node addresses,
193         which must have the same length as the node list
194
195     """
196     if address_list is None:
197       address_list = [None for _ in node_list]
198     else:
199       assert len(node_list) == len(address_list), \
200              "Name and address lists should have the same length"
201     for node, address in zip(node_list, address_list):
202       self.ConnectNode(node, address)
203
204   def ConnectNode(self, name, address=None):
205     """Add a node to the target list.
206
207     @type name: str
208     @param name: the node name
209     @type address: str
210     @keyword address: the node address, if known
211
212     """
213     if address is None:
214       address = name
215
216     self.nc[name] = \
217       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
218                                     "/%s" % self.procedure,
219                                     post_data=self.body,
220                                     ssl_params=self._ssl_params,
221                                     ssl_verify_peer=True)
222
223   def GetResults(self):
224     """Call nodes and return results.
225
226     @rtype: list
227     @return: List of RPC results
228
229     """
230     assert _http_manager, "RPC module not initialized"
231
232     _http_manager.ExecRequests(self.nc.values())
233
234     results = {}
235
236     for name, req in self.nc.iteritems():
237       if req.success and req.resp_status_code == http.HTTP_OK:
238         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
239                                   node=name, call=self.procedure)
240         continue
241
242       # TODO: Better error reporting
243       if req.error:
244         msg = req.error
245       else:
246         msg = req.resp_body
247
248       logging.error("RPC error in %s from node %s: %s",
249                     self.procedure, name, msg)
250       results[name] = RpcResult(data=msg, failed=True, node=name,
251                                 call=self.procedure)
252
253     return results
254
255
256 class RpcRunner(object):
257   """RPC runner class"""
258
259   def __init__(self, cfg):
260     """Initialized the rpc runner.
261
262     @type cfg:  C{config.ConfigWriter}
263     @param cfg: the configuration object that will be used to get data
264                 about the cluster
265
266     """
267     self._cfg = cfg
268     self.port = utils.GetDaemonPort(constants.NODED)
269
270   def _InstDict(self, instance, hvp=None, bep=None):
271     """Convert the given instance to a dict.
272
273     This is done via the instance's ToDict() method and additionally
274     we fill the hvparams with the cluster defaults.
275
276     @type instance: L{objects.Instance}
277     @param instance: an Instance object
278     @type hvp: dict or None
279     @param hvp: a dictionary with overridden hypervisor parameters
280     @type bep: dict or None
281     @param bep: a dictionary with overridden backend parameters
282     @rtype: dict
283     @return: the instance dict, with the hvparams filled with the
284         cluster defaults
285
286     """
287     idict = instance.ToDict()
288     cluster = self._cfg.GetClusterInfo()
289     idict["hvparams"] = cluster.FillHV(instance)
290     if hvp is not None:
291       idict["hvparams"].update(hvp)
292     idict["beparams"] = cluster.FillBE(instance)
293     if bep is not None:
294       idict["beparams"].update(bep)
295     for nic in idict["nics"]:
296       nic['nicparams'] = objects.FillDict(
297         cluster.nicparams[constants.PP_DEFAULT],
298         nic['nicparams'])
299     return idict
300
301   def _ConnectList(self, client, node_list, call):
302     """Helper for computing node addresses.
303
304     @type client: L{ganeti.rpc.Client}
305     @param client: a C{Client} instance
306     @type node_list: list
307     @param node_list: the node list we should connect
308     @type call: string
309     @param call: the name of the remote procedure call, for filling in
310         correctly any eventual offline nodes' results
311
312     """
313     all_nodes = self._cfg.GetAllNodesInfo()
314     name_list = []
315     addr_list = []
316     skip_dict = {}
317     for node in node_list:
318       if node in all_nodes:
319         if all_nodes[node].offline:
320           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
321           continue
322         val = all_nodes[node].primary_ip
323       else:
324         val = None
325       addr_list.append(val)
326       name_list.append(node)
327     if name_list:
328       client.ConnectList(name_list, address_list=addr_list)
329     return skip_dict
330
331   def _ConnectNode(self, client, node, call):
332     """Helper for computing one node's address.
333
334     @type client: L{ganeti.rpc.Client}
335     @param client: a C{Client} instance
336     @type node: str
337     @param node: the node we should connect
338     @type call: string
339     @param call: the name of the remote procedure call, for filling in
340         correctly any eventual offline nodes' results
341
342     """
343     node_info = self._cfg.GetNodeInfo(node)
344     if node_info is not None:
345       if node_info.offline:
346         return RpcResult(node=node, offline=True, call=call)
347       addr = node_info.primary_ip
348     else:
349       addr = None
350     client.ConnectNode(node, address=addr)
351
352   def _MultiNodeCall(self, node_list, procedure, args):
353     """Helper for making a multi-node call
354
355     """
356     body = serializer.DumpJson(args, indent=False)
357     c = Client(procedure, body, self.port)
358     skip_dict = self._ConnectList(c, node_list, procedure)
359     skip_dict.update(c.GetResults())
360     return skip_dict
361
362   @classmethod
363   def _StaticMultiNodeCall(cls, node_list, procedure, args,
364                            address_list=None):
365     """Helper for making a multi-node static call
366
367     """
368     body = serializer.DumpJson(args, indent=False)
369     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
370     c.ConnectList(node_list, address_list=address_list)
371     return c.GetResults()
372
373   def _SingleNodeCall(self, node, procedure, args):
374     """Helper for making a single-node call
375
376     """
377     body = serializer.DumpJson(args, indent=False)
378     c = Client(procedure, body, self.port)
379     result = self._ConnectNode(c, node, procedure)
380     if result is None:
381       # we did connect, node is not offline
382       result = c.GetResults()[node]
383     return result
384
385   @classmethod
386   def _StaticSingleNodeCall(cls, node, procedure, args):
387     """Helper for making a single-node static call
388
389     """
390     body = serializer.DumpJson(args, indent=False)
391     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
392     c.ConnectNode(node)
393     return c.GetResults()[node]
394
395   @staticmethod
396   def _Compress(data):
397     """Compresses a string for transport over RPC.
398
399     Small amounts of data are not compressed.
400
401     @type data: str
402     @param data: Data
403     @rtype: tuple
404     @return: Encoded data to send
405
406     """
407     # Small amounts of data are not compressed
408     if len(data) < 512:
409       return (constants.RPC_ENCODING_NONE, data)
410
411     # Compress with zlib and encode in base64
412     return (constants.RPC_ENCODING_ZLIB_BASE64,
413             base64.b64encode(zlib.compress(data, 3)))
414
415   #
416   # Begin RPC calls
417   #
418
419   def call_lv_list(self, node_list, vg_name):
420     """Gets the logical volumes present in a given volume group.
421
422     This is a multi-node call.
423
424     """
425     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
426
427   def call_vg_list(self, node_list):
428     """Gets the volume group list.
429
430     This is a multi-node call.
431
432     """
433     return self._MultiNodeCall(node_list, "vg_list", [])
434
435   def call_storage_list(self, node_list, su_name, su_args, name, fields):
436     """Get list of storage units..
437
438     This is a multi-node call.
439
440     """
441     return self._MultiNodeCall(node_list, "storage_list",
442                                [su_name, su_args, name, fields])
443
444   def call_bridges_exist(self, node, bridges_list):
445     """Checks if a node has all the bridges given.
446
447     This method checks if all bridges given in the bridges_list are
448     present on the remote node, so that an instance that uses interfaces
449     on those bridges can be started.
450
451     This is a single-node call.
452
453     """
454     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
455
456   def call_instance_start(self, node, instance, hvp, bep):
457     """Starts an instance.
458
459     This is a single-node call.
460
461     """
462     idict = self._InstDict(instance, hvp=hvp, bep=bep)
463     return self._SingleNodeCall(node, "instance_start", [idict])
464
465   def call_instance_shutdown(self, node, instance):
466     """Stops an instance.
467
468     This is a single-node call.
469
470     """
471     return self._SingleNodeCall(node, "instance_shutdown",
472                                 [self._InstDict(instance)])
473
474   def call_migration_info(self, node, instance):
475     """Gather the information necessary to prepare an instance migration.
476
477     This is a single-node call.
478
479     @type node: string
480     @param node: the node on which the instance is currently running
481     @type instance: C{objects.Instance}
482     @param instance: the instance definition
483
484     """
485     return self._SingleNodeCall(node, "migration_info",
486                                 [self._InstDict(instance)])
487
488   def call_accept_instance(self, node, instance, info, target):
489     """Prepare a node to accept an instance.
490
491     This is a single-node call.
492
493     @type node: string
494     @param node: the target node for the migration
495     @type instance: C{objects.Instance}
496     @param instance: the instance definition
497     @type info: opaque/hypervisor specific (string/data)
498     @param info: result for the call_migration_info call
499     @type target: string
500     @param target: target hostname (usually ip address) (on the node itself)
501
502     """
503     return self._SingleNodeCall(node, "accept_instance",
504                                 [self._InstDict(instance), info, target])
505
506   def call_finalize_migration(self, node, instance, info, success):
507     """Finalize any target-node migration specific operation.
508
509     This is called both in case of a successful migration and in case of error
510     (in which case it should abort the migration).
511
512     This is a single-node call.
513
514     @type node: string
515     @param node: the target node for the migration
516     @type instance: C{objects.Instance}
517     @param instance: the instance definition
518     @type info: opaque/hypervisor specific (string/data)
519     @param info: result for the call_migration_info call
520     @type success: boolean
521     @param success: whether the migration was a success or a failure
522
523     """
524     return self._SingleNodeCall(node, "finalize_migration",
525                                 [self._InstDict(instance), info, success])
526
527   def call_instance_migrate(self, node, instance, target, live):
528     """Migrate an instance.
529
530     This is a single-node call.
531
532     @type node: string
533     @param node: the node on which the instance is currently running
534     @type instance: C{objects.Instance}
535     @param instance: the instance definition
536     @type target: string
537     @param target: the target node name
538     @type live: boolean
539     @param live: whether the migration should be done live or not (the
540         interpretation of this parameter is left to the hypervisor)
541
542     """
543     return self._SingleNodeCall(node, "instance_migrate",
544                                 [self._InstDict(instance), target, live])
545
546   def call_instance_reboot(self, node, instance, reboot_type):
547     """Reboots an instance.
548
549     This is a single-node call.
550
551     """
552     return self._SingleNodeCall(node, "instance_reboot",
553                                 [self._InstDict(instance), reboot_type])
554
555   def call_instance_os_add(self, node, inst, reinstall):
556     """Installs an OS on the given instance.
557
558     This is a single-node call.
559
560     """
561     return self._SingleNodeCall(node, "instance_os_add",
562                                 [self._InstDict(inst), reinstall])
563
564   def call_instance_run_rename(self, node, inst, old_name):
565     """Run the OS rename script for an instance.
566
567     This is a single-node call.
568
569     """
570     return self._SingleNodeCall(node, "instance_run_rename",
571                                 [self._InstDict(inst), old_name])
572
573   def call_instance_info(self, node, instance, hname):
574     """Returns information about a single instance.
575
576     This is a single-node call.
577
578     @type node: list
579     @param node: the list of nodes to query
580     @type instance: string
581     @param instance: the instance name
582     @type hname: string
583     @param hname: the hypervisor type of the instance
584
585     """
586     return self._SingleNodeCall(node, "instance_info", [instance, hname])
587
588   def call_instance_migratable(self, node, instance):
589     """Checks whether the given instance can be migrated.
590
591     This is a single-node call.
592
593     @param node: the node to query
594     @type instance: L{objects.Instance}
595     @param instance: the instance to check
596
597
598     """
599     return self._SingleNodeCall(node, "instance_migratable",
600                                 [self._InstDict(instance)])
601
602   def call_all_instances_info(self, node_list, hypervisor_list):
603     """Returns information about all instances on the given nodes.
604
605     This is a multi-node call.
606
607     @type node_list: list
608     @param node_list: the list of nodes to query
609     @type hypervisor_list: list
610     @param hypervisor_list: the hypervisors to query for instances
611
612     """
613     return self._MultiNodeCall(node_list, "all_instances_info",
614                                [hypervisor_list])
615
616   def call_instance_list(self, node_list, hypervisor_list):
617     """Returns the list of running instances on a given node.
618
619     This is a multi-node call.
620
621     @type node_list: list
622     @param node_list: the list of nodes to query
623     @type hypervisor_list: list
624     @param hypervisor_list: the hypervisors to query for instances
625
626     """
627     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
628
629   def call_node_tcp_ping(self, node, source, target, port, timeout,
630                          live_port_needed):
631     """Do a TcpPing on the remote node
632
633     This is a single-node call.
634
635     """
636     return self._SingleNodeCall(node, "node_tcp_ping",
637                                 [source, target, port, timeout,
638                                  live_port_needed])
639
640   def call_node_has_ip_address(self, node, address):
641     """Checks if a node has the given IP address.
642
643     This is a single-node call.
644
645     """
646     return self._SingleNodeCall(node, "node_has_ip_address", [address])
647
648   def call_node_info(self, node_list, vg_name, hypervisor_type):
649     """Return node information.
650
651     This will return memory information and volume group size and free
652     space.
653
654     This is a multi-node call.
655
656     @type node_list: list
657     @param node_list: the list of nodes to query
658     @type vg_name: C{string}
659     @param vg_name: the name of the volume group to ask for disk space
660         information
661     @type hypervisor_type: C{str}
662     @param hypervisor_type: the name of the hypervisor to ask for
663         memory information
664
665     """
666     return self._MultiNodeCall(node_list, "node_info",
667                                [vg_name, hypervisor_type])
668
669   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
670     """Add a node to the cluster.
671
672     This is a single-node call.
673
674     """
675     return self._SingleNodeCall(node, "node_add",
676                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
677
678   def call_node_verify(self, node_list, checkdict, cluster_name):
679     """Request verification of given parameters.
680
681     This is a multi-node call.
682
683     """
684     return self._MultiNodeCall(node_list, "node_verify",
685                                [checkdict, cluster_name])
686
687   @classmethod
688   def call_node_start_master(cls, node, start_daemons, no_voting):
689     """Tells a node to activate itself as a master.
690
691     This is a single-node call.
692
693     """
694     return cls._StaticSingleNodeCall(node, "node_start_master",
695                                      [start_daemons, no_voting])
696
697   @classmethod
698   def call_node_stop_master(cls, node, stop_daemons):
699     """Tells a node to demote itself from master status.
700
701     This is a single-node call.
702
703     """
704     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
705
706   @classmethod
707   def call_master_info(cls, node_list):
708     """Query master info.
709
710     This is a multi-node call.
711
712     """
713     # TODO: should this method query down nodes?
714     return cls._StaticMultiNodeCall(node_list, "master_info", [])
715
716   def call_version(self, node_list):
717     """Query node version.
718
719     This is a multi-node call.
720
721     """
722     return self._MultiNodeCall(node_list, "version", [])
723
724   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
725     """Request creation of a given block device.
726
727     This is a single-node call.
728
729     """
730     return self._SingleNodeCall(node, "blockdev_create",
731                                 [bdev.ToDict(), size, owner, on_primary, info])
732
733   def call_blockdev_remove(self, node, bdev):
734     """Request removal of a given block device.
735
736     This is a single-node call.
737
738     """
739     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
740
741   def call_blockdev_rename(self, node, devlist):
742     """Request rename of the given block devices.
743
744     This is a single-node call.
745
746     """
747     return self._SingleNodeCall(node, "blockdev_rename",
748                                 [(d.ToDict(), uid) for d, uid in devlist])
749
750   def call_blockdev_assemble(self, node, disk, owner, on_primary):
751     """Request assembling of a given block device.
752
753     This is a single-node call.
754
755     """
756     return self._SingleNodeCall(node, "blockdev_assemble",
757                                 [disk.ToDict(), owner, on_primary])
758
759   def call_blockdev_shutdown(self, node, disk):
760     """Request shutdown of a given block device.
761
762     This is a single-node call.
763
764     """
765     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
766
767   def call_blockdev_addchildren(self, node, bdev, ndevs):
768     """Request adding a list of children to a (mirroring) device.
769
770     This is a single-node call.
771
772     """
773     return self._SingleNodeCall(node, "blockdev_addchildren",
774                                 [bdev.ToDict(),
775                                  [disk.ToDict() for disk in ndevs]])
776
777   def call_blockdev_removechildren(self, node, bdev, ndevs):
778     """Request removing a list of children from a (mirroring) device.
779
780     This is a single-node call.
781
782     """
783     return self._SingleNodeCall(node, "blockdev_removechildren",
784                                 [bdev.ToDict(),
785                                  [disk.ToDict() for disk in ndevs]])
786
787   def call_blockdev_getmirrorstatus(self, node, disks):
788     """Request status of a (mirroring) device.
789
790     This is a single-node call.
791
792     """
793     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
794                                 [dsk.ToDict() for dsk in disks])
795
796   def call_blockdev_find(self, node, disk):
797     """Request identification of a given block device.
798
799     This is a single-node call.
800
801     """
802     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
803
804   def call_blockdev_close(self, node, instance_name, disks):
805     """Closes the given block devices.
806
807     This is a single-node call.
808
809     """
810     params = [instance_name, [cf.ToDict() for cf in disks]]
811     return self._SingleNodeCall(node, "blockdev_close", params)
812
813   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
814     """Disconnects the network of the given drbd devices.
815
816     This is a multi-node call.
817
818     """
819     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
820                                [nodes_ip, [cf.ToDict() for cf in disks]])
821
822   def call_drbd_attach_net(self, node_list, nodes_ip,
823                            disks, instance_name, multimaster):
824     """Disconnects the given drbd devices.
825
826     This is a multi-node call.
827
828     """
829     return self._MultiNodeCall(node_list, "drbd_attach_net",
830                                [nodes_ip, [cf.ToDict() for cf in disks],
831                                 instance_name, multimaster])
832
833   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
834     """Waits for the synchronization of drbd devices is complete.
835
836     This is a multi-node call.
837
838     """
839     return self._MultiNodeCall(node_list, "drbd_wait_sync",
840                                [nodes_ip, [cf.ToDict() for cf in disks]])
841
842   @classmethod
843   def call_upload_file(cls, node_list, file_name, address_list=None):
844     """Upload a file.
845
846     The node will refuse the operation in case the file is not on the
847     approved file list.
848
849     This is a multi-node call.
850
851     @type node_list: list
852     @param node_list: the list of node names to upload to
853     @type file_name: str
854     @param file_name: the filename to upload
855     @type address_list: list or None
856     @keyword address_list: an optional list of node addresses, in order
857         to optimize the RPC speed
858
859     """
860     file_contents = utils.ReadFile(file_name)
861     data = cls._Compress(file_contents)
862     st = os.stat(file_name)
863     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
864               st.st_atime, st.st_mtime]
865     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
866                                     address_list=address_list)
867
868   @classmethod
869   def call_write_ssconf_files(cls, node_list, values):
870     """Write ssconf files.
871
872     This is a multi-node call.
873
874     """
875     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
876
877   def call_os_diagnose(self, node_list):
878     """Request a diagnose of OS definitions.
879
880     This is a multi-node call.
881
882     """
883     return self._MultiNodeCall(node_list, "os_diagnose", [])
884
885   def call_os_get(self, node, name):
886     """Returns an OS definition.
887
888     This is a single-node call.
889
890     """
891     result = self._SingleNodeCall(node, "os_get", [name])
892     if not result.failed and isinstance(result.data, dict):
893       result.data = objects.OS.FromDict(result.data)
894     return result
895
896   def call_hooks_runner(self, node_list, hpath, phase, env):
897     """Call the hooks runner.
898
899     Args:
900       - op: the OpCode instance
901       - env: a dictionary with the environment
902
903     This is a multi-node call.
904
905     """
906     params = [hpath, phase, env]
907     return self._MultiNodeCall(node_list, "hooks_runner", params)
908
909   def call_iallocator_runner(self, node, name, idata):
910     """Call an iallocator on a remote node
911
912     Args:
913       - name: the iallocator name
914       - input: the json-encoded input string
915
916     This is a single-node call.
917
918     """
919     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
920
921   def call_blockdev_grow(self, node, cf_bdev, amount):
922     """Request a snapshot of the given block device.
923
924     This is a single-node call.
925
926     """
927     return self._SingleNodeCall(node, "blockdev_grow",
928                                 [cf_bdev.ToDict(), amount])
929
930   def call_blockdev_snapshot(self, node, cf_bdev):
931     """Request a snapshot of the given block device.
932
933     This is a single-node call.
934
935     """
936     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
937
938   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
939                            cluster_name, idx):
940     """Request the export of a given snapshot.
941
942     This is a single-node call.
943
944     """
945     return self._SingleNodeCall(node, "snapshot_export",
946                                 [snap_bdev.ToDict(), dest_node,
947                                  self._InstDict(instance), cluster_name, idx])
948
949   def call_finalize_export(self, node, instance, snap_disks):
950     """Request the completion of an export operation.
951
952     This writes the export config file, etc.
953
954     This is a single-node call.
955
956     """
957     flat_disks = []
958     for disk in snap_disks:
959       if isinstance(disk, bool):
960         flat_disks.append(disk)
961       else:
962         flat_disks.append(disk.ToDict())
963
964     return self._SingleNodeCall(node, "finalize_export",
965                                 [self._InstDict(instance), flat_disks])
966
967   def call_export_info(self, node, path):
968     """Queries the export information in a given path.
969
970     This is a single-node call.
971
972     """
973     return self._SingleNodeCall(node, "export_info", [path])
974
975   def call_instance_os_import(self, node, inst, src_node, src_images,
976                               cluster_name):
977     """Request the import of a backup into an instance.
978
979     This is a single-node call.
980
981     """
982     return self._SingleNodeCall(node, "instance_os_import",
983                                 [self._InstDict(inst), src_node, src_images,
984                                  cluster_name])
985
986   def call_export_list(self, node_list):
987     """Gets the stored exports list.
988
989     This is a multi-node call.
990
991     """
992     return self._MultiNodeCall(node_list, "export_list", [])
993
994   def call_export_remove(self, node, export):
995     """Requests removal of a given export.
996
997     This is a single-node call.
998
999     """
1000     return self._SingleNodeCall(node, "export_remove", [export])
1001
1002   @classmethod
1003   def call_node_leave_cluster(cls, node):
1004     """Requests a node to clean the cluster information it has.
1005
1006     This will remove the configuration information from the ganeti data
1007     dir.
1008
1009     This is a single-node call.
1010
1011     """
1012     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1013
1014   def call_node_volumes(self, node_list):
1015     """Gets all volumes on node(s).
1016
1017     This is a multi-node call.
1018
1019     """
1020     return self._MultiNodeCall(node_list, "node_volumes", [])
1021
1022   def call_node_demote_from_mc(self, node):
1023     """Demote a node from the master candidate role.
1024
1025     This is a single-node call.
1026
1027     """
1028     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1029
1030
1031   def call_node_powercycle(self, node, hypervisor):
1032     """Tries to powercycle a node.
1033
1034     This is a single-node call.
1035
1036     """
1037     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1038
1039
1040   def call_test_delay(self, node_list, duration):
1041     """Sleep for a fixed time on given node(s).
1042
1043     This is a multi-node call.
1044
1045     """
1046     return self._MultiNodeCall(node_list, "test_delay", [duration])
1047
1048   def call_file_storage_dir_create(self, node, file_storage_dir):
1049     """Create the given file storage directory.
1050
1051     This is a single-node call.
1052
1053     """
1054     return self._SingleNodeCall(node, "file_storage_dir_create",
1055                                 [file_storage_dir])
1056
1057   def call_file_storage_dir_remove(self, node, file_storage_dir):
1058     """Remove the given file storage directory.
1059
1060     This is a single-node call.
1061
1062     """
1063     return self._SingleNodeCall(node, "file_storage_dir_remove",
1064                                 [file_storage_dir])
1065
1066   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1067                                    new_file_storage_dir):
1068     """Rename file storage directory.
1069
1070     This is a single-node call.
1071
1072     """
1073     return self._SingleNodeCall(node, "file_storage_dir_rename",
1074                                 [old_file_storage_dir, new_file_storage_dir])
1075
1076   @classmethod
1077   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1078     """Update job queue.
1079
1080     This is a multi-node call.
1081
1082     """
1083     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1084                                     [file_name, cls._Compress(content)],
1085                                     address_list=address_list)
1086
1087   @classmethod
1088   def call_jobqueue_purge(cls, node):
1089     """Purge job queue.
1090
1091     This is a single-node call.
1092
1093     """
1094     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1095
1096   @classmethod
1097   def call_jobqueue_rename(cls, node_list, address_list, rename):
1098     """Rename a job queue file.
1099
1100     This is a multi-node call.
1101
1102     """
1103     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1104                                     address_list=address_list)
1105
1106   @classmethod
1107   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1108     """Set the drain flag on the queue.
1109
1110     This is a multi-node call.
1111
1112     @type node_list: list
1113     @param node_list: the list of nodes to query
1114     @type drain_flag: bool
1115     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1116
1117     """
1118     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1119                                     [drain_flag])
1120
1121   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1122     """Validate the hypervisor params.
1123
1124     This is a multi-node call.
1125
1126     @type node_list: list
1127     @param node_list: the list of nodes to query
1128     @type hvname: string
1129     @param hvname: the hypervisor name
1130     @type hvparams: dict
1131     @param hvparams: the hypervisor parameters to be validated
1132
1133     """
1134     cluster = self._cfg.GetClusterInfo()
1135     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1136     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1137                                [hvname, hv_full])