Add RPC calls to modify storage fields
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 import ganeti.http.client
46
47
48 # Module level variable
49 _http_manager = None
50
51
52 def Init():
53   """Initializes the module-global HTTP client manager.
54
55   Must be called before using any RPC function.
56
57   """
58   global _http_manager
59
60   assert not _http_manager, "RPC module initialized more than once"
61
62   _http_manager = http.client.HttpClientManager()
63
64
65 def Shutdown():
66   """Stops the module-global HTTP client manager.
67
68   Must be called before quitting the program.
69
70   """
71   global _http_manager
72
73   if _http_manager:
74     _http_manager.Shutdown()
75     _http_manager = None
76
77
78 class RpcResult(object):
79   """RPC Result class.
80
81   This class holds an RPC result. It is needed since in multi-node
82   calls we can't raise an exception just because one one out of many
83   failed, and therefore we use this class to encapsulate the result.
84
85   @ivar data: the data payload, for successful results, or None
86   @type failed: boolean
87   @ivar failed: whether the operation failed at transport level (not
88       application level on the remote node)
89   @ivar call: the name of the RPC call
90   @ivar node: the name of the node to which we made the call
91   @ivar offline: whether the operation failed because the node was
92       offline, as opposed to actual failure; offline=True will always
93       imply failed=True, in order to allow simpler checking if
94       the user doesn't care about the exact failure mode
95   @ivar fail_msg: the error message if the call failed
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.fail_msg = "Node is marked offline"
107       self.data = self.payload = None
108     elif failed:
109       self.fail_msg = self._EnsureErr(data)
110       self.data = self.payload = None
111     else:
112       self.data = data
113       if not isinstance(self.data, (tuple, list)):
114         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
115                          type(self.data))
116       elif len(data) != 2:
117         self.fail_msg = ("RPC layer error: invalid result length (%d), "
118                          "expected 2" % len(self.data))
119       elif not self.data[0]:
120         self.fail_msg = self._EnsureErr(self.data[1])
121       else:
122         # finally success
123         self.fail_msg = None
124         self.payload = data[1]
125
126   @staticmethod
127   def _EnsureErr(val):
128     """Helper to ensure we return a 'True' value for error."""
129     if val:
130       return val
131     else:
132       return "No error information"
133
134   def Raise(self, msg, prereq=False):
135     """If the result has failed, raise an OpExecError.
136
137     This is used so that LU code doesn't have to check for each
138     result, but instead can call this function.
139
140     """
141     if not self.fail_msg:
142       return
143
144     if not msg: # one could pass None for default message
145       msg = ("Call '%s' to node '%s' has failed: %s" %
146              (self.call, self.node, self.fail_msg))
147     else:
148       msg = "%s: %s" % (msg, self.fail_msg)
149     if prereq:
150       ec = errors.OpPrereqError
151     else:
152       ec = errors.OpExecError
153     raise ec(msg)
154
155   def RemoteFailMsg(self):
156     """Check if the remote procedure failed.
157
158     @return: the fail_msg attribute
159
160     """
161     return self.fail_msg
162
163
164 class Client:
165   """RPC Client class.
166
167   This class, given a (remote) method name, a list of parameters and a
168   list of nodes, will contact (in parallel) all nodes, and return a
169   dict of results (key: node name, value: result).
170
171   One current bug is that generic failure is still signaled by
172   'False' result, which is not good. This overloading of values can
173   cause bugs.
174
175   """
176   def __init__(self, procedure, body, port):
177     self.procedure = procedure
178     self.body = body
179     self.port = port
180     self.nc = {}
181
182     self._ssl_params = \
183       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
184                          ssl_cert_path=constants.SSL_CERT_FILE)
185
186   def ConnectList(self, node_list, address_list=None):
187     """Add a list of nodes to the target nodes.
188
189     @type node_list: list
190     @param node_list: the list of node names to connect
191     @type address_list: list or None
192     @keyword address_list: either None or a list with node addresses,
193         which must have the same length as the node list
194
195     """
196     if address_list is None:
197       address_list = [None for _ in node_list]
198     else:
199       assert len(node_list) == len(address_list), \
200              "Name and address lists should have the same length"
201     for node, address in zip(node_list, address_list):
202       self.ConnectNode(node, address)
203
204   def ConnectNode(self, name, address=None):
205     """Add a node to the target list.
206
207     @type name: str
208     @param name: the node name
209     @type address: str
210     @keyword address: the node address, if known
211
212     """
213     if address is None:
214       address = name
215
216     self.nc[name] = \
217       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
218                                     "/%s" % self.procedure,
219                                     post_data=self.body,
220                                     ssl_params=self._ssl_params,
221                                     ssl_verify_peer=True)
222
223   def GetResults(self):
224     """Call nodes and return results.
225
226     @rtype: list
227     @return: List of RPC results
228
229     """
230     assert _http_manager, "RPC module not initialized"
231
232     _http_manager.ExecRequests(self.nc.values())
233
234     results = {}
235
236     for name, req in self.nc.iteritems():
237       if req.success and req.resp_status_code == http.HTTP_OK:
238         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
239                                   node=name, call=self.procedure)
240         continue
241
242       # TODO: Better error reporting
243       if req.error:
244         msg = req.error
245       else:
246         msg = req.resp_body
247
248       logging.error("RPC error in %s from node %s: %s",
249                     self.procedure, name, msg)
250       results[name] = RpcResult(data=msg, failed=True, node=name,
251                                 call=self.procedure)
252
253     return results
254
255
256 class RpcRunner(object):
257   """RPC runner class"""
258
259   def __init__(self, cfg):
260     """Initialized the rpc runner.
261
262     @type cfg:  C{config.ConfigWriter}
263     @param cfg: the configuration object that will be used to get data
264                 about the cluster
265
266     """
267     self._cfg = cfg
268     self.port = utils.GetDaemonPort(constants.NODED)
269
270   def _InstDict(self, instance, hvp=None, bep=None):
271     """Convert the given instance to a dict.
272
273     This is done via the instance's ToDict() method and additionally
274     we fill the hvparams with the cluster defaults.
275
276     @type instance: L{objects.Instance}
277     @param instance: an Instance object
278     @type hvp: dict or None
279     @param hvp: a dictionary with overridden hypervisor parameters
280     @type bep: dict or None
281     @param bep: a dictionary with overridden backend parameters
282     @rtype: dict
283     @return: the instance dict, with the hvparams filled with the
284         cluster defaults
285
286     """
287     idict = instance.ToDict()
288     cluster = self._cfg.GetClusterInfo()
289     idict["hvparams"] = cluster.FillHV(instance)
290     if hvp is not None:
291       idict["hvparams"].update(hvp)
292     idict["beparams"] = cluster.FillBE(instance)
293     if bep is not None:
294       idict["beparams"].update(bep)
295     for nic in idict["nics"]:
296       nic['nicparams'] = objects.FillDict(
297         cluster.nicparams[constants.PP_DEFAULT],
298         nic['nicparams'])
299     return idict
300
301   def _ConnectList(self, client, node_list, call):
302     """Helper for computing node addresses.
303
304     @type client: L{ganeti.rpc.Client}
305     @param client: a C{Client} instance
306     @type node_list: list
307     @param node_list: the node list we should connect
308     @type call: string
309     @param call: the name of the remote procedure call, for filling in
310         correctly any eventual offline nodes' results
311
312     """
313     all_nodes = self._cfg.GetAllNodesInfo()
314     name_list = []
315     addr_list = []
316     skip_dict = {}
317     for node in node_list:
318       if node in all_nodes:
319         if all_nodes[node].offline:
320           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
321           continue
322         val = all_nodes[node].primary_ip
323       else:
324         val = None
325       addr_list.append(val)
326       name_list.append(node)
327     if name_list:
328       client.ConnectList(name_list, address_list=addr_list)
329     return skip_dict
330
331   def _ConnectNode(self, client, node, call):
332     """Helper for computing one node's address.
333
334     @type client: L{ganeti.rpc.Client}
335     @param client: a C{Client} instance
336     @type node: str
337     @param node: the node we should connect
338     @type call: string
339     @param call: the name of the remote procedure call, for filling in
340         correctly any eventual offline nodes' results
341
342     """
343     node_info = self._cfg.GetNodeInfo(node)
344     if node_info is not None:
345       if node_info.offline:
346         return RpcResult(node=node, offline=True, call=call)
347       addr = node_info.primary_ip
348     else:
349       addr = None
350     client.ConnectNode(node, address=addr)
351
352   def _MultiNodeCall(self, node_list, procedure, args):
353     """Helper for making a multi-node call
354
355     """
356     body = serializer.DumpJson(args, indent=False)
357     c = Client(procedure, body, self.port)
358     skip_dict = self._ConnectList(c, node_list, procedure)
359     skip_dict.update(c.GetResults())
360     return skip_dict
361
362   @classmethod
363   def _StaticMultiNodeCall(cls, node_list, procedure, args,
364                            address_list=None):
365     """Helper for making a multi-node static call
366
367     """
368     body = serializer.DumpJson(args, indent=False)
369     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
370     c.ConnectList(node_list, address_list=address_list)
371     return c.GetResults()
372
373   def _SingleNodeCall(self, node, procedure, args):
374     """Helper for making a single-node call
375
376     """
377     body = serializer.DumpJson(args, indent=False)
378     c = Client(procedure, body, self.port)
379     result = self._ConnectNode(c, node, procedure)
380     if result is None:
381       # we did connect, node is not offline
382       result = c.GetResults()[node]
383     return result
384
385   @classmethod
386   def _StaticSingleNodeCall(cls, node, procedure, args):
387     """Helper for making a single-node static call
388
389     """
390     body = serializer.DumpJson(args, indent=False)
391     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
392     c.ConnectNode(node)
393     return c.GetResults()[node]
394
395   @staticmethod
396   def _Compress(data):
397     """Compresses a string for transport over RPC.
398
399     Small amounts of data are not compressed.
400
401     @type data: str
402     @param data: Data
403     @rtype: tuple
404     @return: Encoded data to send
405
406     """
407     # Small amounts of data are not compressed
408     if len(data) < 512:
409       return (constants.RPC_ENCODING_NONE, data)
410
411     # Compress with zlib and encode in base64
412     return (constants.RPC_ENCODING_ZLIB_BASE64,
413             base64.b64encode(zlib.compress(data, 3)))
414
415   #
416   # Begin RPC calls
417   #
418
419   def call_lv_list(self, node_list, vg_name):
420     """Gets the logical volumes present in a given volume group.
421
422     This is a multi-node call.
423
424     """
425     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
426
427   def call_vg_list(self, node_list):
428     """Gets the volume group list.
429
430     This is a multi-node call.
431
432     """
433     return self._MultiNodeCall(node_list, "vg_list", [])
434
435   def call_storage_list(self, node_list, su_name, su_args, name, fields):
436     """Get list of storage units.
437
438     This is a multi-node call.
439
440     """
441     return self._MultiNodeCall(node_list, "storage_list",
442                                [su_name, su_args, name, fields])
443
444   def call_storage_modify(self, node, su_name, su_args, name, changes):
445     """Modify a storage unit.
446
447     This is a single-node call.
448
449     """
450     return self._SingleNodeCall(node, "storage_modify",
451                                 [su_name, su_args, name, changes])
452
453   def call_bridges_exist(self, node, bridges_list):
454     """Checks if a node has all the bridges given.
455
456     This method checks if all bridges given in the bridges_list are
457     present on the remote node, so that an instance that uses interfaces
458     on those bridges can be started.
459
460     This is a single-node call.
461
462     """
463     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
464
465   def call_instance_start(self, node, instance, hvp, bep):
466     """Starts an instance.
467
468     This is a single-node call.
469
470     """
471     idict = self._InstDict(instance, hvp=hvp, bep=bep)
472     return self._SingleNodeCall(node, "instance_start", [idict])
473
474   def call_instance_shutdown(self, node, instance):
475     """Stops an instance.
476
477     This is a single-node call.
478
479     """
480     return self._SingleNodeCall(node, "instance_shutdown",
481                                 [self._InstDict(instance)])
482
483   def call_migration_info(self, node, instance):
484     """Gather the information necessary to prepare an instance migration.
485
486     This is a single-node call.
487
488     @type node: string
489     @param node: the node on which the instance is currently running
490     @type instance: C{objects.Instance}
491     @param instance: the instance definition
492
493     """
494     return self._SingleNodeCall(node, "migration_info",
495                                 [self._InstDict(instance)])
496
497   def call_accept_instance(self, node, instance, info, target):
498     """Prepare a node to accept an instance.
499
500     This is a single-node call.
501
502     @type node: string
503     @param node: the target node for the migration
504     @type instance: C{objects.Instance}
505     @param instance: the instance definition
506     @type info: opaque/hypervisor specific (string/data)
507     @param info: result for the call_migration_info call
508     @type target: string
509     @param target: target hostname (usually ip address) (on the node itself)
510
511     """
512     return self._SingleNodeCall(node, "accept_instance",
513                                 [self._InstDict(instance), info, target])
514
515   def call_finalize_migration(self, node, instance, info, success):
516     """Finalize any target-node migration specific operation.
517
518     This is called both in case of a successful migration and in case of error
519     (in which case it should abort the migration).
520
521     This is a single-node call.
522
523     @type node: string
524     @param node: the target node for the migration
525     @type instance: C{objects.Instance}
526     @param instance: the instance definition
527     @type info: opaque/hypervisor specific (string/data)
528     @param info: result for the call_migration_info call
529     @type success: boolean
530     @param success: whether the migration was a success or a failure
531
532     """
533     return self._SingleNodeCall(node, "finalize_migration",
534                                 [self._InstDict(instance), info, success])
535
536   def call_instance_migrate(self, node, instance, target, live):
537     """Migrate an instance.
538
539     This is a single-node call.
540
541     @type node: string
542     @param node: the node on which the instance is currently running
543     @type instance: C{objects.Instance}
544     @param instance: the instance definition
545     @type target: string
546     @param target: the target node name
547     @type live: boolean
548     @param live: whether the migration should be done live or not (the
549         interpretation of this parameter is left to the hypervisor)
550
551     """
552     return self._SingleNodeCall(node, "instance_migrate",
553                                 [self._InstDict(instance), target, live])
554
555   def call_instance_reboot(self, node, instance, reboot_type):
556     """Reboots an instance.
557
558     This is a single-node call.
559
560     """
561     return self._SingleNodeCall(node, "instance_reboot",
562                                 [self._InstDict(instance), reboot_type])
563
564   def call_instance_os_add(self, node, inst, reinstall):
565     """Installs an OS on the given instance.
566
567     This is a single-node call.
568
569     """
570     return self._SingleNodeCall(node, "instance_os_add",
571                                 [self._InstDict(inst), reinstall])
572
573   def call_instance_run_rename(self, node, inst, old_name):
574     """Run the OS rename script for an instance.
575
576     This is a single-node call.
577
578     """
579     return self._SingleNodeCall(node, "instance_run_rename",
580                                 [self._InstDict(inst), old_name])
581
582   def call_instance_info(self, node, instance, hname):
583     """Returns information about a single instance.
584
585     This is a single-node call.
586
587     @type node: list
588     @param node: the list of nodes to query
589     @type instance: string
590     @param instance: the instance name
591     @type hname: string
592     @param hname: the hypervisor type of the instance
593
594     """
595     return self._SingleNodeCall(node, "instance_info", [instance, hname])
596
597   def call_instance_migratable(self, node, instance):
598     """Checks whether the given instance can be migrated.
599
600     This is a single-node call.
601
602     @param node: the node to query
603     @type instance: L{objects.Instance}
604     @param instance: the instance to check
605
606
607     """
608     return self._SingleNodeCall(node, "instance_migratable",
609                                 [self._InstDict(instance)])
610
611   def call_all_instances_info(self, node_list, hypervisor_list):
612     """Returns information about all instances on the given nodes.
613
614     This is a multi-node call.
615
616     @type node_list: list
617     @param node_list: the list of nodes to query
618     @type hypervisor_list: list
619     @param hypervisor_list: the hypervisors to query for instances
620
621     """
622     return self._MultiNodeCall(node_list, "all_instances_info",
623                                [hypervisor_list])
624
625   def call_instance_list(self, node_list, hypervisor_list):
626     """Returns the list of running instances on a given node.
627
628     This is a multi-node call.
629
630     @type node_list: list
631     @param node_list: the list of nodes to query
632     @type hypervisor_list: list
633     @param hypervisor_list: the hypervisors to query for instances
634
635     """
636     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
637
638   def call_node_tcp_ping(self, node, source, target, port, timeout,
639                          live_port_needed):
640     """Do a TcpPing on the remote node
641
642     This is a single-node call.
643
644     """
645     return self._SingleNodeCall(node, "node_tcp_ping",
646                                 [source, target, port, timeout,
647                                  live_port_needed])
648
649   def call_node_has_ip_address(self, node, address):
650     """Checks if a node has the given IP address.
651
652     This is a single-node call.
653
654     """
655     return self._SingleNodeCall(node, "node_has_ip_address", [address])
656
657   def call_node_info(self, node_list, vg_name, hypervisor_type):
658     """Return node information.
659
660     This will return memory information and volume group size and free
661     space.
662
663     This is a multi-node call.
664
665     @type node_list: list
666     @param node_list: the list of nodes to query
667     @type vg_name: C{string}
668     @param vg_name: the name of the volume group to ask for disk space
669         information
670     @type hypervisor_type: C{str}
671     @param hypervisor_type: the name of the hypervisor to ask for
672         memory information
673
674     """
675     return self._MultiNodeCall(node_list, "node_info",
676                                [vg_name, hypervisor_type])
677
678   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
679     """Add a node to the cluster.
680
681     This is a single-node call.
682
683     """
684     return self._SingleNodeCall(node, "node_add",
685                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
686
687   def call_node_verify(self, node_list, checkdict, cluster_name):
688     """Request verification of given parameters.
689
690     This is a multi-node call.
691
692     """
693     return self._MultiNodeCall(node_list, "node_verify",
694                                [checkdict, cluster_name])
695
696   @classmethod
697   def call_node_start_master(cls, node, start_daemons, no_voting):
698     """Tells a node to activate itself as a master.
699
700     This is a single-node call.
701
702     """
703     return cls._StaticSingleNodeCall(node, "node_start_master",
704                                      [start_daemons, no_voting])
705
706   @classmethod
707   def call_node_stop_master(cls, node, stop_daemons):
708     """Tells a node to demote itself from master status.
709
710     This is a single-node call.
711
712     """
713     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
714
715   @classmethod
716   def call_master_info(cls, node_list):
717     """Query master info.
718
719     This is a multi-node call.
720
721     """
722     # TODO: should this method query down nodes?
723     return cls._StaticMultiNodeCall(node_list, "master_info", [])
724
725   def call_version(self, node_list):
726     """Query node version.
727
728     This is a multi-node call.
729
730     """
731     return self._MultiNodeCall(node_list, "version", [])
732
733   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
734     """Request creation of a given block device.
735
736     This is a single-node call.
737
738     """
739     return self._SingleNodeCall(node, "blockdev_create",
740                                 [bdev.ToDict(), size, owner, on_primary, info])
741
742   def call_blockdev_remove(self, node, bdev):
743     """Request removal of a given block device.
744
745     This is a single-node call.
746
747     """
748     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
749
750   def call_blockdev_rename(self, node, devlist):
751     """Request rename of the given block devices.
752
753     This is a single-node call.
754
755     """
756     return self._SingleNodeCall(node, "blockdev_rename",
757                                 [(d.ToDict(), uid) for d, uid in devlist])
758
759   def call_blockdev_assemble(self, node, disk, owner, on_primary):
760     """Request assembling of a given block device.
761
762     This is a single-node call.
763
764     """
765     return self._SingleNodeCall(node, "blockdev_assemble",
766                                 [disk.ToDict(), owner, on_primary])
767
768   def call_blockdev_shutdown(self, node, disk):
769     """Request shutdown of a given block device.
770
771     This is a single-node call.
772
773     """
774     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
775
776   def call_blockdev_addchildren(self, node, bdev, ndevs):
777     """Request adding a list of children to a (mirroring) device.
778
779     This is a single-node call.
780
781     """
782     return self._SingleNodeCall(node, "blockdev_addchildren",
783                                 [bdev.ToDict(),
784                                  [disk.ToDict() for disk in ndevs]])
785
786   def call_blockdev_removechildren(self, node, bdev, ndevs):
787     """Request removing a list of children from a (mirroring) device.
788
789     This is a single-node call.
790
791     """
792     return self._SingleNodeCall(node, "blockdev_removechildren",
793                                 [bdev.ToDict(),
794                                  [disk.ToDict() for disk in ndevs]])
795
796   def call_blockdev_getmirrorstatus(self, node, disks):
797     """Request status of a (mirroring) device.
798
799     This is a single-node call.
800
801     """
802     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
803                                 [dsk.ToDict() for dsk in disks])
804
805   def call_blockdev_find(self, node, disk):
806     """Request identification of a given block device.
807
808     This is a single-node call.
809
810     """
811     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
812
813   def call_blockdev_close(self, node, instance_name, disks):
814     """Closes the given block devices.
815
816     This is a single-node call.
817
818     """
819     params = [instance_name, [cf.ToDict() for cf in disks]]
820     return self._SingleNodeCall(node, "blockdev_close", params)
821
822   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
823     """Disconnects the network of the given drbd devices.
824
825     This is a multi-node call.
826
827     """
828     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
829                                [nodes_ip, [cf.ToDict() for cf in disks]])
830
831   def call_drbd_attach_net(self, node_list, nodes_ip,
832                            disks, instance_name, multimaster):
833     """Disconnects the given drbd devices.
834
835     This is a multi-node call.
836
837     """
838     return self._MultiNodeCall(node_list, "drbd_attach_net",
839                                [nodes_ip, [cf.ToDict() for cf in disks],
840                                 instance_name, multimaster])
841
842   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
843     """Waits for the synchronization of drbd devices is complete.
844
845     This is a multi-node call.
846
847     """
848     return self._MultiNodeCall(node_list, "drbd_wait_sync",
849                                [nodes_ip, [cf.ToDict() for cf in disks]])
850
851   @classmethod
852   def call_upload_file(cls, node_list, file_name, address_list=None):
853     """Upload a file.
854
855     The node will refuse the operation in case the file is not on the
856     approved file list.
857
858     This is a multi-node call.
859
860     @type node_list: list
861     @param node_list: the list of node names to upload to
862     @type file_name: str
863     @param file_name: the filename to upload
864     @type address_list: list or None
865     @keyword address_list: an optional list of node addresses, in order
866         to optimize the RPC speed
867
868     """
869     file_contents = utils.ReadFile(file_name)
870     data = cls._Compress(file_contents)
871     st = os.stat(file_name)
872     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
873               st.st_atime, st.st_mtime]
874     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
875                                     address_list=address_list)
876
877   @classmethod
878   def call_write_ssconf_files(cls, node_list, values):
879     """Write ssconf files.
880
881     This is a multi-node call.
882
883     """
884     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
885
886   def call_os_diagnose(self, node_list):
887     """Request a diagnose of OS definitions.
888
889     This is a multi-node call.
890
891     """
892     return self._MultiNodeCall(node_list, "os_diagnose", [])
893
894   def call_os_get(self, node, name):
895     """Returns an OS definition.
896
897     This is a single-node call.
898
899     """
900     result = self._SingleNodeCall(node, "os_get", [name])
901     if not result.failed and isinstance(result.data, dict):
902       result.data = objects.OS.FromDict(result.data)
903     return result
904
905   def call_hooks_runner(self, node_list, hpath, phase, env):
906     """Call the hooks runner.
907
908     Args:
909       - op: the OpCode instance
910       - env: a dictionary with the environment
911
912     This is a multi-node call.
913
914     """
915     params = [hpath, phase, env]
916     return self._MultiNodeCall(node_list, "hooks_runner", params)
917
918   def call_iallocator_runner(self, node, name, idata):
919     """Call an iallocator on a remote node
920
921     Args:
922       - name: the iallocator name
923       - input: the json-encoded input string
924
925     This is a single-node call.
926
927     """
928     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
929
930   def call_blockdev_grow(self, node, cf_bdev, amount):
931     """Request a snapshot of the given block device.
932
933     This is a single-node call.
934
935     """
936     return self._SingleNodeCall(node, "blockdev_grow",
937                                 [cf_bdev.ToDict(), amount])
938
939   def call_blockdev_snapshot(self, node, cf_bdev):
940     """Request a snapshot of the given block device.
941
942     This is a single-node call.
943
944     """
945     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
946
947   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
948                            cluster_name, idx):
949     """Request the export of a given snapshot.
950
951     This is a single-node call.
952
953     """
954     return self._SingleNodeCall(node, "snapshot_export",
955                                 [snap_bdev.ToDict(), dest_node,
956                                  self._InstDict(instance), cluster_name, idx])
957
958   def call_finalize_export(self, node, instance, snap_disks):
959     """Request the completion of an export operation.
960
961     This writes the export config file, etc.
962
963     This is a single-node call.
964
965     """
966     flat_disks = []
967     for disk in snap_disks:
968       if isinstance(disk, bool):
969         flat_disks.append(disk)
970       else:
971         flat_disks.append(disk.ToDict())
972
973     return self._SingleNodeCall(node, "finalize_export",
974                                 [self._InstDict(instance), flat_disks])
975
976   def call_export_info(self, node, path):
977     """Queries the export information in a given path.
978
979     This is a single-node call.
980
981     """
982     return self._SingleNodeCall(node, "export_info", [path])
983
984   def call_instance_os_import(self, node, inst, src_node, src_images,
985                               cluster_name):
986     """Request the import of a backup into an instance.
987
988     This is a single-node call.
989
990     """
991     return self._SingleNodeCall(node, "instance_os_import",
992                                 [self._InstDict(inst), src_node, src_images,
993                                  cluster_name])
994
995   def call_export_list(self, node_list):
996     """Gets the stored exports list.
997
998     This is a multi-node call.
999
1000     """
1001     return self._MultiNodeCall(node_list, "export_list", [])
1002
1003   def call_export_remove(self, node, export):
1004     """Requests removal of a given export.
1005
1006     This is a single-node call.
1007
1008     """
1009     return self._SingleNodeCall(node, "export_remove", [export])
1010
1011   @classmethod
1012   def call_node_leave_cluster(cls, node):
1013     """Requests a node to clean the cluster information it has.
1014
1015     This will remove the configuration information from the ganeti data
1016     dir.
1017
1018     This is a single-node call.
1019
1020     """
1021     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1022
1023   def call_node_volumes(self, node_list):
1024     """Gets all volumes on node(s).
1025
1026     This is a multi-node call.
1027
1028     """
1029     return self._MultiNodeCall(node_list, "node_volumes", [])
1030
1031   def call_node_demote_from_mc(self, node):
1032     """Demote a node from the master candidate role.
1033
1034     This is a single-node call.
1035
1036     """
1037     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1038
1039
1040   def call_node_powercycle(self, node, hypervisor):
1041     """Tries to powercycle a node.
1042
1043     This is a single-node call.
1044
1045     """
1046     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1047
1048
1049   def call_test_delay(self, node_list, duration):
1050     """Sleep for a fixed time on given node(s).
1051
1052     This is a multi-node call.
1053
1054     """
1055     return self._MultiNodeCall(node_list, "test_delay", [duration])
1056
1057   def call_file_storage_dir_create(self, node, file_storage_dir):
1058     """Create the given file storage directory.
1059
1060     This is a single-node call.
1061
1062     """
1063     return self._SingleNodeCall(node, "file_storage_dir_create",
1064                                 [file_storage_dir])
1065
1066   def call_file_storage_dir_remove(self, node, file_storage_dir):
1067     """Remove the given file storage directory.
1068
1069     This is a single-node call.
1070
1071     """
1072     return self._SingleNodeCall(node, "file_storage_dir_remove",
1073                                 [file_storage_dir])
1074
1075   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1076                                    new_file_storage_dir):
1077     """Rename file storage directory.
1078
1079     This is a single-node call.
1080
1081     """
1082     return self._SingleNodeCall(node, "file_storage_dir_rename",
1083                                 [old_file_storage_dir, new_file_storage_dir])
1084
1085   @classmethod
1086   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1087     """Update job queue.
1088
1089     This is a multi-node call.
1090
1091     """
1092     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1093                                     [file_name, cls._Compress(content)],
1094                                     address_list=address_list)
1095
1096   @classmethod
1097   def call_jobqueue_purge(cls, node):
1098     """Purge job queue.
1099
1100     This is a single-node call.
1101
1102     """
1103     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1104
1105   @classmethod
1106   def call_jobqueue_rename(cls, node_list, address_list, rename):
1107     """Rename a job queue file.
1108
1109     This is a multi-node call.
1110
1111     """
1112     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1113                                     address_list=address_list)
1114
1115   @classmethod
1116   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1117     """Set the drain flag on the queue.
1118
1119     This is a multi-node call.
1120
1121     @type node_list: list
1122     @param node_list: the list of nodes to query
1123     @type drain_flag: bool
1124     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1125
1126     """
1127     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1128                                     [drain_flag])
1129
1130   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1131     """Validate the hypervisor params.
1132
1133     This is a multi-node call.
1134
1135     @type node_list: list
1136     @param node_list: the list of nodes to query
1137     @type hvname: string
1138     @param hvname: the hypervisor name
1139     @type hvparams: dict
1140     @param hvparams: the hypervisor parameters to be validated
1141
1142     """
1143     cluster = self._cfg.GetClusterInfo()
1144     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1145     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1146                                [hvname, hv_full])