Implement BuildHooksEnv for NoHooksLU
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client  # pylint: disable-msg=W0611
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager # pylint: disable-msg=W0603
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   _http_manager = http.client.HttpClientManager()
64
65
66 def Shutdown():
67   """Stops the module-global HTTP client manager.
68
69   Must be called before quitting the program.
70
71   """
72   global _http_manager # pylint: disable-msg=W0603
73
74   if _http_manager:
75     _http_manager.Shutdown()
76     _http_manager = None
77
78
79 class RpcResult(object):
80   """RPC Result class.
81
82   This class holds an RPC result. It is needed since in multi-node
83   calls we can't raise an exception just because one one out of many
84   failed, and therefore we use this class to encapsulate the result.
85
86   @ivar data: the data payload, for successful results, or None
87   @type failed: boolean
88   @ivar failed: whether the operation failed at RPC level (not
89       application level on the remote node)
90   @ivar call: the name of the RPC call
91   @ivar node: the name of the node to which we made the call
92   @ivar offline: whether the operation failed because the node was
93       offline, as opposed to actual failure; offline=True will always
94       imply failed=True, in order to allow simpler checking if
95       the user doesn't care about the exact failure mode
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.error = "Node is marked offline"
107       self.data = self.payload = None
108     elif failed:
109       self.error = data
110       self.data = self.payload = None
111     else:
112       self.data = data
113       self.error = None
114       if isinstance(data, (tuple, list)) and len(data) == 2:
115         self.payload = data[1]
116       else:
117         self.payload = None
118
119   def Raise(self):
120     """If the result has failed, raise an OpExecError.
121
122     This is used so that LU code doesn't have to check for each
123     result, but instead can call this function.
124
125     """
126     if self.failed:
127       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
128                                (self.call, self.node, self.error))
129
130   def RemoteFailMsg(self):
131     """Check if the remote procedure failed.
132
133     This is valid only for RPC calls which return result of the form
134     (status, data | error_msg).
135
136     @return: empty string for succcess, otherwise an error message
137
138     """
139     def _EnsureErr(val):
140       """Helper to ensure we return a 'True' value for error."""
141       if val:
142         return val
143       else:
144         return "No error information"
145
146     if self.failed:
147       return _EnsureErr(self.error)
148     if not isinstance(self.data, (tuple, list)):
149       return "Invalid result type (%s)" % type(self.data)
150     if len(self.data) != 2:
151       return "Invalid result length (%d), expected 2" % len(self.data)
152     if not self.data[0]:
153       return _EnsureErr(self.data[1])
154     return ""
155
156
157 class Client:
158   """RPC Client class.
159
160   This class, given a (remote) method name, a list of parameters and a
161   list of nodes, will contact (in parallel) all nodes, and return a
162   dict of results (key: node name, value: result).
163
164   One current bug is that generic failure is still signaled by
165   'False' result, which is not good. This overloading of values can
166   cause bugs.
167
168   """
169   def __init__(self, procedure, body, port):
170     self.procedure = procedure
171     self.body = body
172     self.port = port
173     self.nc = {}
174
175     self._ssl_params = \
176       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
177                          ssl_cert_path=constants.SSL_CERT_FILE)
178
179   def ConnectList(self, node_list, address_list=None):
180     """Add a list of nodes to the target nodes.
181
182     @type node_list: list
183     @param node_list: the list of node names to connect
184     @type address_list: list or None
185     @keyword address_list: either None or a list with node addresses,
186         which must have the same length as the node list
187
188     """
189     if address_list is None:
190       address_list = [None for _ in node_list]
191     else:
192       assert len(node_list) == len(address_list), \
193              "Name and address lists should have the same length"
194     for node, address in zip(node_list, address_list):
195       self.ConnectNode(node, address)
196
197   def ConnectNode(self, name, address=None):
198     """Add a node to the target list.
199
200     @type name: str
201     @param name: the node name
202     @type address: str
203     @keyword address: the node address, if known
204
205     """
206     if address is None:
207       address = name
208
209     self.nc[name] = \
210       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
211                                     "/%s" % self.procedure,
212                                     post_data=self.body,
213                                     ssl_params=self._ssl_params,
214                                     ssl_verify_peer=True)
215
216   def GetResults(self):
217     """Call nodes and return results.
218
219     @rtype: list
220     @return: List of RPC results
221
222     """
223     assert _http_manager, "RPC module not initialized"
224
225     _http_manager.ExecRequests(self.nc.values())
226
227     results = {}
228
229     for name, req in self.nc.iteritems():
230       if req.success and req.resp_status_code == http.HTTP_OK:
231         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
232                                   node=name, call=self.procedure)
233         continue
234
235       # TODO: Better error reporting
236       if req.error:
237         msg = req.error
238       else:
239         msg = req.resp_body
240
241       logging.error("RPC error in %s from node %s: %s",
242                     self.procedure, name, msg)
243       results[name] = RpcResult(data=msg, failed=True, node=name,
244                                 call=self.procedure)
245
246     return results
247
248
249 class RpcRunner(object):
250   """RPC runner class"""
251
252   def __init__(self, cfg):
253     """Initialized the rpc runner.
254
255     @type cfg:  C{config.ConfigWriter}
256     @param cfg: the configuration object that will be used to get data
257                 about the cluster
258
259     """
260     self._cfg = cfg
261     self.port = utils.GetNodeDaemonPort()
262
263   def _InstDict(self, instance, hvp=None, bep=None):
264     """Convert the given instance to a dict.
265
266     This is done via the instance's ToDict() method and additionally
267     we fill the hvparams with the cluster defaults.
268
269     @type instance: L{objects.Instance}
270     @param instance: an Instance object
271     @type hvp: dict or None
272     @param hvp: a dictionary with overridden hypervisor parameters
273     @type bep: dict or None
274     @param bep: a dictionary with overridden backend parameters
275     @rtype: dict
276     @return: the instance dict, with the hvparams filled with the
277         cluster defaults
278
279     """
280     idict = instance.ToDict()
281     cluster = self._cfg.GetClusterInfo()
282     idict["hvparams"] = cluster.FillHV(instance)
283     if hvp is not None:
284       idict["hvparams"].update(hvp)
285     idict["beparams"] = cluster.FillBE(instance)
286     if bep is not None:
287       idict["beparams"].update(bep)
288     return idict
289
290   def _ConnectList(self, client, node_list, call):
291     """Helper for computing node addresses.
292
293     @type client: L{ganeti.rpc.Client}
294     @param client: a C{Client} instance
295     @type node_list: list
296     @param node_list: the node list we should connect
297     @type call: string
298     @param call: the name of the remote procedure call, for filling in
299         correctly any eventual offline nodes' results
300
301     """
302     all_nodes = self._cfg.GetAllNodesInfo()
303     name_list = []
304     addr_list = []
305     skip_dict = {}
306     for node in node_list:
307       if node in all_nodes:
308         if all_nodes[node].offline:
309           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
310           continue
311         val = all_nodes[node].primary_ip
312       else:
313         val = None
314       addr_list.append(val)
315       name_list.append(node)
316     if name_list:
317       client.ConnectList(name_list, address_list=addr_list)
318     return skip_dict
319
320   def _ConnectNode(self, client, node, call):
321     """Helper for computing one node's address.
322
323     @type client: L{ganeti.rpc.Client}
324     @param client: a C{Client} instance
325     @type node: str
326     @param node: the node we should connect
327     @type call: string
328     @param call: the name of the remote procedure call, for filling in
329         correctly any eventual offline nodes' results
330
331     """
332     node_info = self._cfg.GetNodeInfo(node)
333     if node_info is not None:
334       if node_info.offline:
335         return RpcResult(node=node, offline=True, call=call)
336       addr = node_info.primary_ip
337     else:
338       addr = None
339     client.ConnectNode(node, address=addr)
340
341   def _MultiNodeCall(self, node_list, procedure, args):
342     """Helper for making a multi-node call
343
344     """
345     body = serializer.DumpJson(args, indent=False)
346     c = Client(procedure, body, self.port)
347     skip_dict = self._ConnectList(c, node_list, procedure)
348     skip_dict.update(c.GetResults())
349     return skip_dict
350
351   @classmethod
352   def _StaticMultiNodeCall(cls, node_list, procedure, args,
353                            address_list=None):
354     """Helper for making a multi-node static call
355
356     """
357     body = serializer.DumpJson(args, indent=False)
358     c = Client(procedure, body, utils.GetNodeDaemonPort())
359     c.ConnectList(node_list, address_list=address_list)
360     return c.GetResults()
361
362   def _SingleNodeCall(self, node, procedure, args):
363     """Helper for making a single-node call
364
365     """
366     body = serializer.DumpJson(args, indent=False)
367     c = Client(procedure, body, self.port)
368     result = self._ConnectNode(c, node, procedure)
369     if result is None:
370       # we did connect, node is not offline
371       result = c.GetResults()[node]
372     return result
373
374   @classmethod
375   def _StaticSingleNodeCall(cls, node, procedure, args):
376     """Helper for making a single-node static call
377
378     """
379     body = serializer.DumpJson(args, indent=False)
380     c = Client(procedure, body, utils.GetNodeDaemonPort())
381     c.ConnectNode(node)
382     return c.GetResults()[node]
383
384   @staticmethod
385   def _Compress(data):
386     """Compresses a string for transport over RPC.
387
388     Small amounts of data are not compressed.
389
390     @type data: str
391     @param data: Data
392     @rtype: tuple
393     @return: Encoded data to send
394
395     """
396     # Small amounts of data are not compressed
397     if len(data) < 512:
398       return (constants.RPC_ENCODING_NONE, data)
399
400     # Compress with zlib and encode in base64
401     return (constants.RPC_ENCODING_ZLIB_BASE64,
402             base64.b64encode(zlib.compress(data, 3)))
403
404   #
405   # Begin RPC calls
406   #
407
408   def call_volume_list(self, node_list, vg_name):
409     """Gets the logical volumes present in a given volume group.
410
411     This is a multi-node call.
412
413     """
414     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
415
416   def call_vg_list(self, node_list):
417     """Gets the volume group list.
418
419     This is a multi-node call.
420
421     """
422     return self._MultiNodeCall(node_list, "vg_list", [])
423
424   def call_bridges_exist(self, node, bridges_list):
425     """Checks if a node has all the bridges given.
426
427     This method checks if all bridges given in the bridges_list are
428     present on the remote node, so that an instance that uses interfaces
429     on those bridges can be started.
430
431     This is a single-node call.
432
433     """
434     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
435
436   def call_instance_start(self, node, instance, hvp, bep):
437     """Starts an instance.
438
439     This is a single-node call.
440
441     """
442     idict = self._InstDict(instance, hvp=hvp, bep=bep)
443     return self._SingleNodeCall(node, "instance_start", [idict])
444
445   def call_instance_shutdown(self, node, instance):
446     """Stops an instance.
447
448     This is a single-node call.
449
450     """
451     return self._SingleNodeCall(node, "instance_shutdown",
452                                 [self._InstDict(instance)])
453
454   def call_migration_info(self, node, instance):
455     """Gather the information necessary to prepare an instance migration.
456
457     This is a single-node call.
458
459     @type node: string
460     @param node: the node on which the instance is currently running
461     @type instance: C{objects.Instance}
462     @param instance: the instance definition
463
464     """
465     return self._SingleNodeCall(node, "migration_info",
466                                 [self._InstDict(instance)])
467
468   def call_accept_instance(self, node, instance, info, target):
469     """Prepare a node to accept an instance.
470
471     This is a single-node call.
472
473     @type node: string
474     @param node: the target node for the migration
475     @type instance: C{objects.Instance}
476     @param instance: the instance definition
477     @type info: opaque/hypervisor specific (string/data)
478     @param info: result for the call_migration_info call
479     @type target: string
480     @param target: target hostname (usually ip address) (on the node itself)
481
482     """
483     return self._SingleNodeCall(node, "accept_instance",
484                                 [self._InstDict(instance), info, target])
485
486   def call_finalize_migration(self, node, instance, info, success):
487     """Finalize any target-node migration specific operation.
488
489     This is called both in case of a successful migration and in case of error
490     (in which case it should abort the migration).
491
492     This is a single-node call.
493
494     @type node: string
495     @param node: the target node for the migration
496     @type instance: C{objects.Instance}
497     @param instance: the instance definition
498     @type info: opaque/hypervisor specific (string/data)
499     @param info: result for the call_migration_info call
500     @type success: boolean
501     @param success: whether the migration was a success or a failure
502
503     """
504     return self._SingleNodeCall(node, "finalize_migration",
505                                 [self._InstDict(instance), info, success])
506
507   def call_instance_migrate(self, node, instance, target, live):
508     """Migrate an instance.
509
510     This is a single-node call.
511
512     @type node: string
513     @param node: the node on which the instance is currently running
514     @type instance: C{objects.Instance}
515     @param instance: the instance definition
516     @type target: string
517     @param target: the target node name
518     @type live: boolean
519     @param live: whether the migration should be done live or not (the
520         interpretation of this parameter is left to the hypervisor)
521
522     """
523     return self._SingleNodeCall(node, "instance_migrate",
524                                 [self._InstDict(instance), target, live])
525
526   def call_instance_reboot(self, node, instance, reboot_type):
527     """Reboots an instance.
528
529     This is a single-node call.
530
531     """
532     return self._SingleNodeCall(node, "instance_reboot",
533                                 [self._InstDict(instance), reboot_type])
534
535   def call_instance_os_add(self, node, inst):
536     """Installs an OS on the given instance.
537
538     This is a single-node call.
539
540     """
541     return self._SingleNodeCall(node, "instance_os_add",
542                                 [self._InstDict(inst)])
543
544   def call_instance_run_rename(self, node, inst, old_name):
545     """Run the OS rename script for an instance.
546
547     This is a single-node call.
548
549     """
550     return self._SingleNodeCall(node, "instance_run_rename",
551                                 [self._InstDict(inst), old_name])
552
553   def call_instance_info(self, node, instance, hname):
554     """Returns information about a single instance.
555
556     This is a single-node call.
557
558     @type node: list
559     @param node: the list of nodes to query
560     @type instance: string
561     @param instance: the instance name
562     @type hname: string
563     @param hname: the hypervisor type of the instance
564
565     """
566     return self._SingleNodeCall(node, "instance_info", [instance, hname])
567
568   def call_instance_migratable(self, node, instance):
569     """Checks whether the given instance can be migrated.
570
571     This is a single-node call.
572
573     @param node: the node to query
574     @type instance: L{objects.Instance}
575     @param instance: the instance to check
576
577
578     """
579     return self._SingleNodeCall(node, "instance_migratable",
580                                 [self._InstDict(instance)])
581
582   def call_all_instances_info(self, node_list, hypervisor_list):
583     """Returns information about all instances on the given nodes.
584
585     This is a multi-node call.
586
587     @type node_list: list
588     @param node_list: the list of nodes to query
589     @type hypervisor_list: list
590     @param hypervisor_list: the hypervisors to query for instances
591
592     """
593     return self._MultiNodeCall(node_list, "all_instances_info",
594                                [hypervisor_list])
595
596   def call_instance_list(self, node_list, hypervisor_list):
597     """Returns the list of running instances on a given node.
598
599     This is a multi-node call.
600
601     @type node_list: list
602     @param node_list: the list of nodes to query
603     @type hypervisor_list: list
604     @param hypervisor_list: the hypervisors to query for instances
605
606     """
607     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
608
609   def call_node_tcp_ping(self, node, source, target, port, timeout,
610                          live_port_needed):
611     """Do a TcpPing on the remote node
612
613     This is a single-node call.
614
615     """
616     return self._SingleNodeCall(node, "node_tcp_ping",
617                                 [source, target, port, timeout,
618                                  live_port_needed])
619
620   def call_node_has_ip_address(self, node, address):
621     """Checks if a node has the given IP address.
622
623     This is a single-node call.
624
625     """
626     return self._SingleNodeCall(node, "node_has_ip_address", [address])
627
628   def call_node_info(self, node_list, vg_name, hypervisor_type):
629     """Return node information.
630
631     This will return memory information and volume group size and free
632     space.
633
634     This is a multi-node call.
635
636     @type node_list: list
637     @param node_list: the list of nodes to query
638     @type vg_name: C{string}
639     @param vg_name: the name of the volume group to ask for disk space
640         information
641     @type hypervisor_type: C{str}
642     @param hypervisor_type: the name of the hypervisor to ask for
643         memory information
644
645     """
646     retux = self._MultiNodeCall(node_list, "node_info",
647                                 [vg_name, hypervisor_type])
648
649     for result in retux.itervalues():
650       if result.failed or not isinstance(result.data, dict):
651         result.data = {}
652       if result.offline:
653         log_name = None
654       else:
655         log_name = "call_node_info"
656
657       utils.CheckDict(result.data, {
658         'memory_total' : '-',
659         'memory_dom0' : '-',
660         'memory_free' : '-',
661         'vg_size' : 'node_unreachable',
662         'vg_free' : '-',
663         }, log_name)
664     return retux
665
666   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
667     """Add a node to the cluster.
668
669     This is a single-node call.
670
671     """
672     return self._SingleNodeCall(node, "node_add",
673                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
674
675   def call_node_verify(self, node_list, checkdict, cluster_name):
676     """Request verification of given parameters.
677
678     This is a multi-node call.
679
680     """
681     return self._MultiNodeCall(node_list, "node_verify",
682                                [checkdict, cluster_name])
683
684   @classmethod
685   def call_node_start_master(cls, node, start_daemons, no_voting):
686     """Tells a node to activate itself as a master.
687
688     This is a single-node call.
689
690     """
691     return cls._StaticSingleNodeCall(node, "node_start_master",
692                                      [start_daemons, no_voting])
693
694   @classmethod
695   def call_node_stop_master(cls, node, stop_daemons):
696     """Tells a node to demote itself from master status.
697
698     This is a single-node call.
699
700     """
701     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
702
703   @classmethod
704   def call_master_info(cls, node_list):
705     """Query master info.
706
707     This is a multi-node call.
708
709     """
710     # TODO: should this method query down nodes?
711     return cls._StaticMultiNodeCall(node_list, "master_info", [])
712
713   def call_version(self, node_list):
714     """Query node version.
715
716     This is a multi-node call.
717
718     """
719     return self._MultiNodeCall(node_list, "version", [])
720
721   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
722     """Request creation of a given block device.
723
724     This is a single-node call.
725
726     """
727     return self._SingleNodeCall(node, "blockdev_create",
728                                 [bdev.ToDict(), size, owner, on_primary, info])
729
730   def call_blockdev_remove(self, node, bdev):
731     """Request removal of a given block device.
732
733     This is a single-node call.
734
735     """
736     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
737
738   def call_blockdev_rename(self, node, devlist):
739     """Request rename of the given block devices.
740
741     This is a single-node call.
742
743     """
744     return self._SingleNodeCall(node, "blockdev_rename",
745                                 [(d.ToDict(), uid) for d, uid in devlist])
746
747   def call_blockdev_assemble(self, node, disk, owner, on_primary):
748     """Request assembling of a given block device.
749
750     This is a single-node call.
751
752     """
753     return self._SingleNodeCall(node, "blockdev_assemble",
754                                 [disk.ToDict(), owner, on_primary])
755
756   def call_blockdev_shutdown(self, node, disk):
757     """Request shutdown of a given block device.
758
759     This is a single-node call.
760
761     """
762     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
763
764   def call_blockdev_addchildren(self, node, bdev, ndevs):
765     """Request adding a list of children to a (mirroring) device.
766
767     This is a single-node call.
768
769     """
770     return self._SingleNodeCall(node, "blockdev_addchildren",
771                                 [bdev.ToDict(),
772                                  [disk.ToDict() for disk in ndevs]])
773
774   def call_blockdev_removechildren(self, node, bdev, ndevs):
775     """Request removing a list of children from a (mirroring) device.
776
777     This is a single-node call.
778
779     """
780     return self._SingleNodeCall(node, "blockdev_removechildren",
781                                 [bdev.ToDict(),
782                                  [disk.ToDict() for disk in ndevs]])
783
784   def call_blockdev_getmirrorstatus(self, node, disks):
785     """Request status of a (mirroring) device.
786
787     This is a single-node call.
788
789     """
790     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
791                                 [dsk.ToDict() for dsk in disks])
792
793   def call_blockdev_find(self, node, disk):
794     """Request identification of a given block device.
795
796     This is a single-node call.
797
798     """
799     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
800
801   def call_blockdev_close(self, node, instance_name, disks):
802     """Closes the given block devices.
803
804     This is a single-node call.
805
806     """
807     params = [instance_name, [cf.ToDict() for cf in disks]]
808     return self._SingleNodeCall(node, "blockdev_close", params)
809
810   def call_blockdev_getsizes(self, node, disks):
811     """Returns the size of the given disks.
812
813     This is a single-node call.
814
815     """
816     params = [[cf.ToDict() for cf in disks]]
817     return self._SingleNodeCall(node, "blockdev_getsize", params)
818
819   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
820     """Disconnects the network of the given drbd devices.
821
822     This is a multi-node call.
823
824     """
825     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
826                                [nodes_ip, [cf.ToDict() for cf in disks]])
827
828   def call_drbd_attach_net(self, node_list, nodes_ip,
829                            disks, instance_name, multimaster):
830     """Disconnects the given drbd devices.
831
832     This is a multi-node call.
833
834     """
835     return self._MultiNodeCall(node_list, "drbd_attach_net",
836                                [nodes_ip, [cf.ToDict() for cf in disks],
837                                 instance_name, multimaster])
838
839   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
840     """Waits for the synchronization of drbd devices is complete.
841
842     This is a multi-node call.
843
844     """
845     return self._MultiNodeCall(node_list, "drbd_wait_sync",
846                                [nodes_ip, [cf.ToDict() for cf in disks]])
847
848   @classmethod
849   def call_upload_file(cls, node_list, file_name, address_list=None):
850     """Upload a file.
851
852     The node will refuse the operation in case the file is not on the
853     approved file list.
854
855     This is a multi-node call.
856
857     @type node_list: list
858     @param node_list: the list of node names to upload to
859     @type file_name: str
860     @param file_name: the filename to upload
861     @type address_list: list or None
862     @keyword address_list: an optional list of node addresses, in order
863         to optimize the RPC speed
864
865     """
866     file_contents = utils.ReadFile(file_name)
867     data = cls._Compress(file_contents)
868     st = os.stat(file_name)
869     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
870               st.st_atime, st.st_mtime]
871     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
872                                     address_list=address_list)
873
874   @classmethod
875   def call_write_ssconf_files(cls, node_list, values):
876     """Write ssconf files.
877
878     This is a multi-node call.
879
880     """
881     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
882
883   def call_os_diagnose(self, node_list):
884     """Request a diagnose of OS definitions.
885
886     This is a multi-node call.
887
888     """
889     result = self._MultiNodeCall(node_list, "os_diagnose", [])
890
891     for node_result in result.values():
892       if not node_result.failed and node_result.data:
893         node_result.data = [objects.OS.FromDict(oss)
894                             for oss in node_result.data]
895     return result
896
897   def call_os_get(self, node, name):
898     """Returns an OS definition.
899
900     This is a single-node call.
901
902     """
903     result = self._SingleNodeCall(node, "os_get", [name])
904     if not result.failed and isinstance(result.data, dict):
905       result.data = objects.OS.FromDict(result.data)
906     return result
907
908   def call_hooks_runner(self, node_list, hpath, phase, env):
909     """Call the hooks runner.
910
911     Args:
912       - op: the OpCode instance
913       - env: a dictionary with the environment
914
915     This is a multi-node call.
916
917     """
918     params = [hpath, phase, env]
919     return self._MultiNodeCall(node_list, "hooks_runner", params)
920
921   def call_iallocator_runner(self, node, name, idata):
922     """Call an iallocator on a remote node
923
924     Args:
925       - name: the iallocator name
926       - input: the json-encoded input string
927
928     This is a single-node call.
929
930     """
931     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
932
933   def call_blockdev_grow(self, node, cf_bdev, amount):
934     """Request a snapshot of the given block device.
935
936     This is a single-node call.
937
938     """
939     return self._SingleNodeCall(node, "blockdev_grow",
940                                 [cf_bdev.ToDict(), amount])
941
942   def call_blockdev_snapshot(self, node, cf_bdev):
943     """Request a snapshot of the given block device.
944
945     This is a single-node call.
946
947     """
948     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
949
950   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
951                            cluster_name, idx):
952     """Request the export of a given snapshot.
953
954     This is a single-node call.
955
956     """
957     return self._SingleNodeCall(node, "snapshot_export",
958                                 [snap_bdev.ToDict(), dest_node,
959                                  self._InstDict(instance), cluster_name, idx])
960
961   def call_finalize_export(self, node, instance, snap_disks):
962     """Request the completion of an export operation.
963
964     This writes the export config file, etc.
965
966     This is a single-node call.
967
968     """
969     flat_disks = []
970     for disk in snap_disks:
971       if isinstance(disk, bool):
972         flat_disks.append(disk)
973       else:
974         flat_disks.append(disk.ToDict())
975
976     return self._SingleNodeCall(node, "finalize_export",
977                                 [self._InstDict(instance), flat_disks])
978
979   def call_export_info(self, node, path):
980     """Queries the export information in a given path.
981
982     This is a single-node call.
983
984     """
985     result = self._SingleNodeCall(node, "export_info", [path])
986     if not result.failed and result.data:
987       result.data = objects.SerializableConfigParser.Loads(str(result.data))
988     return result
989
990   def call_instance_os_import(self, node, inst, src_node, src_images,
991                               cluster_name):
992     """Request the import of a backup into an instance.
993
994     This is a single-node call.
995
996     """
997     return self._SingleNodeCall(node, "instance_os_import",
998                                 [self._InstDict(inst), src_node, src_images,
999                                  cluster_name])
1000
1001   def call_export_list(self, node_list):
1002     """Gets the stored exports list.
1003
1004     This is a multi-node call.
1005
1006     """
1007     return self._MultiNodeCall(node_list, "export_list", [])
1008
1009   def call_export_remove(self, node, export):
1010     """Requests removal of a given export.
1011
1012     This is a single-node call.
1013
1014     """
1015     return self._SingleNodeCall(node, "export_remove", [export])
1016
1017   @classmethod
1018   def call_node_leave_cluster(cls, node):
1019     """Requests a node to clean the cluster information it has.
1020
1021     This will remove the configuration information from the ganeti data
1022     dir.
1023
1024     This is a single-node call.
1025
1026     """
1027     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1028
1029   def call_node_volumes(self, node_list):
1030     """Gets all volumes on node(s).
1031
1032     This is a multi-node call.
1033
1034     """
1035     return self._MultiNodeCall(node_list, "node_volumes", [])
1036
1037   def call_node_demote_from_mc(self, node):
1038     """Demote a node from the master candidate role.
1039
1040     This is a single-node call.
1041
1042     """
1043     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1044
1045   def call_test_delay(self, node_list, duration):
1046     """Sleep for a fixed time on given node(s).
1047
1048     This is a multi-node call.
1049
1050     """
1051     return self._MultiNodeCall(node_list, "test_delay", [duration])
1052
1053   def call_file_storage_dir_create(self, node, file_storage_dir):
1054     """Create the given file storage directory.
1055
1056     This is a single-node call.
1057
1058     """
1059     return self._SingleNodeCall(node, "file_storage_dir_create",
1060                                 [file_storage_dir])
1061
1062   def call_file_storage_dir_remove(self, node, file_storage_dir):
1063     """Remove the given file storage directory.
1064
1065     This is a single-node call.
1066
1067     """
1068     return self._SingleNodeCall(node, "file_storage_dir_remove",
1069                                 [file_storage_dir])
1070
1071   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1072                                    new_file_storage_dir):
1073     """Rename file storage directory.
1074
1075     This is a single-node call.
1076
1077     """
1078     return self._SingleNodeCall(node, "file_storage_dir_rename",
1079                                 [old_file_storage_dir, new_file_storage_dir])
1080
1081   @classmethod
1082   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1083     """Update job queue.
1084
1085     This is a multi-node call.
1086
1087     """
1088     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1089                                     [file_name, cls._Compress(content)],
1090                                     address_list=address_list)
1091
1092   @classmethod
1093   def call_jobqueue_purge(cls, node):
1094     """Purge job queue.
1095
1096     This is a single-node call.
1097
1098     """
1099     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1100
1101   @classmethod
1102   def call_jobqueue_rename(cls, node_list, address_list, rename):
1103     """Rename a job queue file.
1104
1105     This is a multi-node call.
1106
1107     """
1108     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1109                                     address_list=address_list)
1110
1111   @classmethod
1112   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1113     """Set the drain flag on the queue.
1114
1115     This is a multi-node call.
1116
1117     @type node_list: list
1118     @param node_list: the list of nodes to query
1119     @type drain_flag: bool
1120     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1121
1122     """
1123     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1124                                     [drain_flag])
1125
1126   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1127     """Validate the hypervisor params.
1128
1129     This is a multi-node call.
1130
1131     @type node_list: list
1132     @param node_list: the list of nodes to query
1133     @type hvname: string
1134     @param hvname: the hypervisor name
1135     @type hvparams: dict
1136     @param hvparams: the hypervisor parameters to be validated
1137
1138     """
1139     cluster = self._cfg.GetClusterInfo()
1140     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1141     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1142                                [hvname, hv_full])