Confd: add primary IPs queries
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 import ganeti.http.client
46
47
48 # Module level variable
49 _http_manager = None
50
51
52 def Init():
53   """Initializes the module-global HTTP client manager.
54
55   Must be called before using any RPC function.
56
57   """
58   global _http_manager
59
60   assert not _http_manager, "RPC module initialized more than once"
61
62   _http_manager = http.client.HttpClientManager()
63
64
65 def Shutdown():
66   """Stops the module-global HTTP client manager.
67
68   Must be called before quitting the program.
69
70   """
71   global _http_manager
72
73   if _http_manager:
74     _http_manager.Shutdown()
75     _http_manager = None
76
77
78 class RpcResult(object):
79   """RPC Result class.
80
81   This class holds an RPC result. It is needed since in multi-node
82   calls we can't raise an exception just because one one out of many
83   failed, and therefore we use this class to encapsulate the result.
84
85   @ivar data: the data payload, for successful results, or None
86   @type failed: boolean
87   @ivar failed: whether the operation failed at transport level (not
88       application level on the remote node)
89   @ivar call: the name of the RPC call
90   @ivar node: the name of the node to which we made the call
91   @ivar offline: whether the operation failed because the node was
92       offline, as opposed to actual failure; offline=True will always
93       imply failed=True, in order to allow simpler checking if
94       the user doesn't care about the exact failure mode
95   @ivar fail_msg: the error message if the call failed
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.fail_msg = "Node is marked offline"
107       self.data = self.payload = None
108     elif failed:
109       self.fail_msg = self._EnsureErr(data)
110       self.data = self.payload = None
111     else:
112       self.data = data
113       if not isinstance(self.data, (tuple, list)):
114         self.failed = True
115         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
116                          type(self.data))
117       elif len(data) != 2:
118         self.failed = True
119         self.fail_msg = ("RPC layer error: invalid result length (%d), "
120                          "expected 2" % len(self.data))
121       elif not self.data[0]:
122         self.failed = True
123         self.fail_msg = self._EnsureErr(self.data[1])
124       else:
125         # finally success
126         self.fail_msg = None
127         self.payload = data[1]
128
129   @staticmethod
130   def _EnsureErr(val):
131     """Helper to ensure we return a 'True' value for error."""
132     if val:
133       return val
134     else:
135       return "No error information"
136
137   def Raise(self, msg, prereq=False):
138     """If the result has failed, raise an OpExecError.
139
140     This is used so that LU code doesn't have to check for each
141     result, but instead can call this function.
142
143     """
144     if not self.fail_msg:
145       return
146
147     if not msg: # one could pass None for default message
148       msg = ("Call '%s' to node '%s' has failed: %s" %
149              (self.call, self.node, self.fail_msg))
150     else:
151       msg = "%s: %s" % (msg, self.fail_msg)
152     if prereq:
153       ec = errors.OpPrereqError
154     else:
155       ec = errors.OpExecError
156     raise ec(msg)
157
158   def RemoteFailMsg(self):
159     """Check if the remote procedure failed.
160
161     @return: the fail_msg attribute
162
163     """
164     return self.fail_msg
165
166
167 class Client:
168   """RPC Client class.
169
170   This class, given a (remote) method name, a list of parameters and a
171   list of nodes, will contact (in parallel) all nodes, and return a
172   dict of results (key: node name, value: result).
173
174   One current bug is that generic failure is still signaled by
175   'False' result, which is not good. This overloading of values can
176   cause bugs.
177
178   """
179   def __init__(self, procedure, body, port):
180     self.procedure = procedure
181     self.body = body
182     self.port = port
183     self.nc = {}
184
185     self._ssl_params = \
186       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
187                          ssl_cert_path=constants.SSL_CERT_FILE)
188
189   def ConnectList(self, node_list, address_list=None):
190     """Add a list of nodes to the target nodes.
191
192     @type node_list: list
193     @param node_list: the list of node names to connect
194     @type address_list: list or None
195     @keyword address_list: either None or a list with node addresses,
196         which must have the same length as the node list
197
198     """
199     if address_list is None:
200       address_list = [None for _ in node_list]
201     else:
202       assert len(node_list) == len(address_list), \
203              "Name and address lists should have the same length"
204     for node, address in zip(node_list, address_list):
205       self.ConnectNode(node, address)
206
207   def ConnectNode(self, name, address=None):
208     """Add a node to the target list.
209
210     @type name: str
211     @param name: the node name
212     @type address: str
213     @keyword address: the node address, if known
214
215     """
216     if address is None:
217       address = name
218
219     self.nc[name] = \
220       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
221                                     "/%s" % self.procedure,
222                                     post_data=self.body,
223                                     ssl_params=self._ssl_params,
224                                     ssl_verify_peer=True)
225
226   def GetResults(self):
227     """Call nodes and return results.
228
229     @rtype: list
230     @return: List of RPC results
231
232     """
233     assert _http_manager, "RPC module not initialized"
234
235     _http_manager.ExecRequests(self.nc.values())
236
237     results = {}
238
239     for name, req in self.nc.iteritems():
240       if req.success and req.resp_status_code == http.HTTP_OK:
241         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
242                                   node=name, call=self.procedure)
243         continue
244
245       # TODO: Better error reporting
246       if req.error:
247         msg = req.error
248       else:
249         msg = req.resp_body
250
251       logging.error("RPC error in %s from node %s: %s",
252                     self.procedure, name, msg)
253       results[name] = RpcResult(data=msg, failed=True, node=name,
254                                 call=self.procedure)
255
256     return results
257
258
259 class RpcRunner(object):
260   """RPC runner class"""
261
262   def __init__(self, cfg):
263     """Initialized the rpc runner.
264
265     @type cfg:  C{config.ConfigWriter}
266     @param cfg: the configuration object that will be used to get data
267                 about the cluster
268
269     """
270     self._cfg = cfg
271     self.port = utils.GetDaemonPort(constants.NODED)
272
273   def _InstDict(self, instance, hvp=None, bep=None):
274     """Convert the given instance to a dict.
275
276     This is done via the instance's ToDict() method and additionally
277     we fill the hvparams with the cluster defaults.
278
279     @type instance: L{objects.Instance}
280     @param instance: an Instance object
281     @type hvp: dict or None
282     @param hvp: a dictionary with overridden hypervisor parameters
283     @type bep: dict or None
284     @param bep: a dictionary with overridden backend parameters
285     @rtype: dict
286     @return: the instance dict, with the hvparams filled with the
287         cluster defaults
288
289     """
290     idict = instance.ToDict()
291     cluster = self._cfg.GetClusterInfo()
292     idict["hvparams"] = cluster.FillHV(instance)
293     if hvp is not None:
294       idict["hvparams"].update(hvp)
295     idict["beparams"] = cluster.FillBE(instance)
296     if bep is not None:
297       idict["beparams"].update(bep)
298     for nic in idict["nics"]:
299       nic['nicparams'] = objects.FillDict(
300         cluster.nicparams[constants.PP_DEFAULT],
301         nic['nicparams'])
302     return idict
303
304   def _ConnectList(self, client, node_list, call):
305     """Helper for computing node addresses.
306
307     @type client: L{ganeti.rpc.Client}
308     @param client: a C{Client} instance
309     @type node_list: list
310     @param node_list: the node list we should connect
311     @type call: string
312     @param call: the name of the remote procedure call, for filling in
313         correctly any eventual offline nodes' results
314
315     """
316     all_nodes = self._cfg.GetAllNodesInfo()
317     name_list = []
318     addr_list = []
319     skip_dict = {}
320     for node in node_list:
321       if node in all_nodes:
322         if all_nodes[node].offline:
323           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
324           continue
325         val = all_nodes[node].primary_ip
326       else:
327         val = None
328       addr_list.append(val)
329       name_list.append(node)
330     if name_list:
331       client.ConnectList(name_list, address_list=addr_list)
332     return skip_dict
333
334   def _ConnectNode(self, client, node, call):
335     """Helper for computing one node's address.
336
337     @type client: L{ganeti.rpc.Client}
338     @param client: a C{Client} instance
339     @type node: str
340     @param node: the node we should connect
341     @type call: string
342     @param call: the name of the remote procedure call, for filling in
343         correctly any eventual offline nodes' results
344
345     """
346     node_info = self._cfg.GetNodeInfo(node)
347     if node_info is not None:
348       if node_info.offline:
349         return RpcResult(node=node, offline=True, call=call)
350       addr = node_info.primary_ip
351     else:
352       addr = None
353     client.ConnectNode(node, address=addr)
354
355   def _MultiNodeCall(self, node_list, procedure, args):
356     """Helper for making a multi-node call
357
358     """
359     body = serializer.DumpJson(args, indent=False)
360     c = Client(procedure, body, self.port)
361     skip_dict = self._ConnectList(c, node_list, procedure)
362     skip_dict.update(c.GetResults())
363     return skip_dict
364
365   @classmethod
366   def _StaticMultiNodeCall(cls, node_list, procedure, args,
367                            address_list=None):
368     """Helper for making a multi-node static call
369
370     """
371     body = serializer.DumpJson(args, indent=False)
372     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
373     c.ConnectList(node_list, address_list=address_list)
374     return c.GetResults()
375
376   def _SingleNodeCall(self, node, procedure, args):
377     """Helper for making a single-node call
378
379     """
380     body = serializer.DumpJson(args, indent=False)
381     c = Client(procedure, body, self.port)
382     result = self._ConnectNode(c, node, procedure)
383     if result is None:
384       # we did connect, node is not offline
385       result = c.GetResults()[node]
386     return result
387
388   @classmethod
389   def _StaticSingleNodeCall(cls, node, procedure, args):
390     """Helper for making a single-node static call
391
392     """
393     body = serializer.DumpJson(args, indent=False)
394     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
395     c.ConnectNode(node)
396     return c.GetResults()[node]
397
398   @staticmethod
399   def _Compress(data):
400     """Compresses a string for transport over RPC.
401
402     Small amounts of data are not compressed.
403
404     @type data: str
405     @param data: Data
406     @rtype: tuple
407     @return: Encoded data to send
408
409     """
410     # Small amounts of data are not compressed
411     if len(data) < 512:
412       return (constants.RPC_ENCODING_NONE, data)
413
414     # Compress with zlib and encode in base64
415     return (constants.RPC_ENCODING_ZLIB_BASE64,
416             base64.b64encode(zlib.compress(data, 3)))
417
418   #
419   # Begin RPC calls
420   #
421
422   def call_lv_list(self, node_list, vg_name):
423     """Gets the logical volumes present in a given volume group.
424
425     This is a multi-node call.
426
427     """
428     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
429
430   def call_vg_list(self, node_list):
431     """Gets the volume group list.
432
433     This is a multi-node call.
434
435     """
436     return self._MultiNodeCall(node_list, "vg_list", [])
437
438   def call_storage_list(self, node_list, su_name, su_args, name, fields):
439     """Get list of storage units.
440
441     This is a multi-node call.
442
443     """
444     return self._MultiNodeCall(node_list, "storage_list",
445                                [su_name, su_args, name, fields])
446
447   def call_storage_modify(self, node, su_name, su_args, name, changes):
448     """Modify a storage unit.
449
450     This is a single-node call.
451
452     """
453     return self._SingleNodeCall(node, "storage_modify",
454                                 [su_name, su_args, name, changes])
455
456   def call_storage_execute(self, node, su_name, su_args, name, op):
457     """Executes an operation on a storage unit.
458
459     This is a single-node call.
460
461     """
462     return self._SingleNodeCall(node, "storage_execute",
463                                 [su_name, su_args, name, op])
464
465   def call_bridges_exist(self, node, bridges_list):
466     """Checks if a node has all the bridges given.
467
468     This method checks if all bridges given in the bridges_list are
469     present on the remote node, so that an instance that uses interfaces
470     on those bridges can be started.
471
472     This is a single-node call.
473
474     """
475     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
476
477   def call_instance_start(self, node, instance, hvp, bep):
478     """Starts an instance.
479
480     This is a single-node call.
481
482     """
483     idict = self._InstDict(instance, hvp=hvp, bep=bep)
484     return self._SingleNodeCall(node, "instance_start", [idict])
485
486   def call_instance_shutdown(self, node, instance):
487     """Stops an instance.
488
489     This is a single-node call.
490
491     """
492     return self._SingleNodeCall(node, "instance_shutdown",
493                                 [self._InstDict(instance)])
494
495   def call_migration_info(self, node, instance):
496     """Gather the information necessary to prepare an instance migration.
497
498     This is a single-node call.
499
500     @type node: string
501     @param node: the node on which the instance is currently running
502     @type instance: C{objects.Instance}
503     @param instance: the instance definition
504
505     """
506     return self._SingleNodeCall(node, "migration_info",
507                                 [self._InstDict(instance)])
508
509   def call_accept_instance(self, node, instance, info, target):
510     """Prepare a node to accept an instance.
511
512     This is a single-node call.
513
514     @type node: string
515     @param node: the target node for the migration
516     @type instance: C{objects.Instance}
517     @param instance: the instance definition
518     @type info: opaque/hypervisor specific (string/data)
519     @param info: result for the call_migration_info call
520     @type target: string
521     @param target: target hostname (usually ip address) (on the node itself)
522
523     """
524     return self._SingleNodeCall(node, "accept_instance",
525                                 [self._InstDict(instance), info, target])
526
527   def call_finalize_migration(self, node, instance, info, success):
528     """Finalize any target-node migration specific operation.
529
530     This is called both in case of a successful migration and in case of error
531     (in which case it should abort the migration).
532
533     This is a single-node call.
534
535     @type node: string
536     @param node: the target node for the migration
537     @type instance: C{objects.Instance}
538     @param instance: the instance definition
539     @type info: opaque/hypervisor specific (string/data)
540     @param info: result for the call_migration_info call
541     @type success: boolean
542     @param success: whether the migration was a success or a failure
543
544     """
545     return self._SingleNodeCall(node, "finalize_migration",
546                                 [self._InstDict(instance), info, success])
547
548   def call_instance_migrate(self, node, instance, target, live):
549     """Migrate an instance.
550
551     This is a single-node call.
552
553     @type node: string
554     @param node: the node on which the instance is currently running
555     @type instance: C{objects.Instance}
556     @param instance: the instance definition
557     @type target: string
558     @param target: the target node name
559     @type live: boolean
560     @param live: whether the migration should be done live or not (the
561         interpretation of this parameter is left to the hypervisor)
562
563     """
564     return self._SingleNodeCall(node, "instance_migrate",
565                                 [self._InstDict(instance), target, live])
566
567   def call_instance_reboot(self, node, instance, reboot_type):
568     """Reboots an instance.
569
570     This is a single-node call.
571
572     """
573     return self._SingleNodeCall(node, "instance_reboot",
574                                 [self._InstDict(instance), reboot_type])
575
576   def call_instance_os_add(self, node, inst, reinstall):
577     """Installs an OS on the given instance.
578
579     This is a single-node call.
580
581     """
582     return self._SingleNodeCall(node, "instance_os_add",
583                                 [self._InstDict(inst), reinstall])
584
585   def call_instance_run_rename(self, node, inst, old_name):
586     """Run the OS rename script for an instance.
587
588     This is a single-node call.
589
590     """
591     return self._SingleNodeCall(node, "instance_run_rename",
592                                 [self._InstDict(inst), old_name])
593
594   def call_instance_info(self, node, instance, hname):
595     """Returns information about a single instance.
596
597     This is a single-node call.
598
599     @type node: list
600     @param node: the list of nodes to query
601     @type instance: string
602     @param instance: the instance name
603     @type hname: string
604     @param hname: the hypervisor type of the instance
605
606     """
607     return self._SingleNodeCall(node, "instance_info", [instance, hname])
608
609   def call_instance_migratable(self, node, instance):
610     """Checks whether the given instance can be migrated.
611
612     This is a single-node call.
613
614     @param node: the node to query
615     @type instance: L{objects.Instance}
616     @param instance: the instance to check
617
618
619     """
620     return self._SingleNodeCall(node, "instance_migratable",
621                                 [self._InstDict(instance)])
622
623   def call_all_instances_info(self, node_list, hypervisor_list):
624     """Returns information about all instances on the given nodes.
625
626     This is a multi-node call.
627
628     @type node_list: list
629     @param node_list: the list of nodes to query
630     @type hypervisor_list: list
631     @param hypervisor_list: the hypervisors to query for instances
632
633     """
634     return self._MultiNodeCall(node_list, "all_instances_info",
635                                [hypervisor_list])
636
637   def call_instance_list(self, node_list, hypervisor_list):
638     """Returns the list of running instances on a given node.
639
640     This is a multi-node call.
641
642     @type node_list: list
643     @param node_list: the list of nodes to query
644     @type hypervisor_list: list
645     @param hypervisor_list: the hypervisors to query for instances
646
647     """
648     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
649
650   def call_node_tcp_ping(self, node, source, target, port, timeout,
651                          live_port_needed):
652     """Do a TcpPing on the remote node
653
654     This is a single-node call.
655
656     """
657     return self._SingleNodeCall(node, "node_tcp_ping",
658                                 [source, target, port, timeout,
659                                  live_port_needed])
660
661   def call_node_has_ip_address(self, node, address):
662     """Checks if a node has the given IP address.
663
664     This is a single-node call.
665
666     """
667     return self._SingleNodeCall(node, "node_has_ip_address", [address])
668
669   def call_node_info(self, node_list, vg_name, hypervisor_type):
670     """Return node information.
671
672     This will return memory information and volume group size and free
673     space.
674
675     This is a multi-node call.
676
677     @type node_list: list
678     @param node_list: the list of nodes to query
679     @type vg_name: C{string}
680     @param vg_name: the name of the volume group to ask for disk space
681         information
682     @type hypervisor_type: C{str}
683     @param hypervisor_type: the name of the hypervisor to ask for
684         memory information
685
686     """
687     return self._MultiNodeCall(node_list, "node_info",
688                                [vg_name, hypervisor_type])
689
690   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
691     """Add a node to the cluster.
692
693     This is a single-node call.
694
695     """
696     return self._SingleNodeCall(node, "node_add",
697                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
698
699   def call_node_verify(self, node_list, checkdict, cluster_name):
700     """Request verification of given parameters.
701
702     This is a multi-node call.
703
704     """
705     return self._MultiNodeCall(node_list, "node_verify",
706                                [checkdict, cluster_name])
707
708   @classmethod
709   def call_node_start_master(cls, node, start_daemons, no_voting):
710     """Tells a node to activate itself as a master.
711
712     This is a single-node call.
713
714     """
715     return cls._StaticSingleNodeCall(node, "node_start_master",
716                                      [start_daemons, no_voting])
717
718   @classmethod
719   def call_node_stop_master(cls, node, stop_daemons):
720     """Tells a node to demote itself from master status.
721
722     This is a single-node call.
723
724     """
725     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
726
727   @classmethod
728   def call_master_info(cls, node_list):
729     """Query master info.
730
731     This is a multi-node call.
732
733     """
734     # TODO: should this method query down nodes?
735     return cls._StaticMultiNodeCall(node_list, "master_info", [])
736
737   def call_version(self, node_list):
738     """Query node version.
739
740     This is a multi-node call.
741
742     """
743     return self._MultiNodeCall(node_list, "version", [])
744
745   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
746     """Request creation of a given block device.
747
748     This is a single-node call.
749
750     """
751     return self._SingleNodeCall(node, "blockdev_create",
752                                 [bdev.ToDict(), size, owner, on_primary, info])
753
754   def call_blockdev_remove(self, node, bdev):
755     """Request removal of a given block device.
756
757     This is a single-node call.
758
759     """
760     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
761
762   def call_blockdev_rename(self, node, devlist):
763     """Request rename of the given block devices.
764
765     This is a single-node call.
766
767     """
768     return self._SingleNodeCall(node, "blockdev_rename",
769                                 [(d.ToDict(), uid) for d, uid in devlist])
770
771   def call_blockdev_assemble(self, node, disk, owner, on_primary):
772     """Request assembling of a given block device.
773
774     This is a single-node call.
775
776     """
777     return self._SingleNodeCall(node, "blockdev_assemble",
778                                 [disk.ToDict(), owner, on_primary])
779
780   def call_blockdev_shutdown(self, node, disk):
781     """Request shutdown of a given block device.
782
783     This is a single-node call.
784
785     """
786     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
787
788   def call_blockdev_addchildren(self, node, bdev, ndevs):
789     """Request adding a list of children to a (mirroring) device.
790
791     This is a single-node call.
792
793     """
794     return self._SingleNodeCall(node, "blockdev_addchildren",
795                                 [bdev.ToDict(),
796                                  [disk.ToDict() for disk in ndevs]])
797
798   def call_blockdev_removechildren(self, node, bdev, ndevs):
799     """Request removing a list of children from a (mirroring) device.
800
801     This is a single-node call.
802
803     """
804     return self._SingleNodeCall(node, "blockdev_removechildren",
805                                 [bdev.ToDict(),
806                                  [disk.ToDict() for disk in ndevs]])
807
808   def call_blockdev_getmirrorstatus(self, node, disks):
809     """Request status of a (mirroring) device.
810
811     This is a single-node call.
812
813     """
814     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
815                                   [dsk.ToDict() for dsk in disks])
816     if not (result.failed or result.fail_msg):
817       result.payload = [objects.BlockDevStatus.FromDict(i)
818                         for i in result.payload]
819     return result
820
821   def call_blockdev_find(self, node, disk):
822     """Request identification of a given block device.
823
824     This is a single-node call.
825
826     """
827     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
828     if not result.failed and result.payload is not None:
829       result.payload = objects.BlockDevStatus.FromDict(result.payload)
830     return result
831
832   def call_blockdev_close(self, node, instance_name, disks):
833     """Closes the given block devices.
834
835     This is a single-node call.
836
837     """
838     params = [instance_name, [cf.ToDict() for cf in disks]]
839     return self._SingleNodeCall(node, "blockdev_close", params)
840
841   def call_blockdev_getsizes(self, node, disks):
842     """Returns the size of the given disks.
843
844     This is a single-node call.
845
846     """
847     params = [[cf.ToDict() for cf in disks]]
848     return self._SingleNodeCall(node, "blockdev_getsize", params)
849
850   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
851     """Disconnects the network of the given drbd devices.
852
853     This is a multi-node call.
854
855     """
856     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
857                                [nodes_ip, [cf.ToDict() for cf in disks]])
858
859   def call_drbd_attach_net(self, node_list, nodes_ip,
860                            disks, instance_name, multimaster):
861     """Disconnects the given drbd devices.
862
863     This is a multi-node call.
864
865     """
866     return self._MultiNodeCall(node_list, "drbd_attach_net",
867                                [nodes_ip, [cf.ToDict() for cf in disks],
868                                 instance_name, multimaster])
869
870   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
871     """Waits for the synchronization of drbd devices is complete.
872
873     This is a multi-node call.
874
875     """
876     return self._MultiNodeCall(node_list, "drbd_wait_sync",
877                                [nodes_ip, [cf.ToDict() for cf in disks]])
878
879   @classmethod
880   def call_upload_file(cls, node_list, file_name, address_list=None):
881     """Upload a file.
882
883     The node will refuse the operation in case the file is not on the
884     approved file list.
885
886     This is a multi-node call.
887
888     @type node_list: list
889     @param node_list: the list of node names to upload to
890     @type file_name: str
891     @param file_name: the filename to upload
892     @type address_list: list or None
893     @keyword address_list: an optional list of node addresses, in order
894         to optimize the RPC speed
895
896     """
897     file_contents = utils.ReadFile(file_name)
898     data = cls._Compress(file_contents)
899     st = os.stat(file_name)
900     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
901               st.st_atime, st.st_mtime]
902     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
903                                     address_list=address_list)
904
905   @classmethod
906   def call_write_ssconf_files(cls, node_list, values):
907     """Write ssconf files.
908
909     This is a multi-node call.
910
911     """
912     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
913
914   def call_os_diagnose(self, node_list):
915     """Request a diagnose of OS definitions.
916
917     This is a multi-node call.
918
919     """
920     return self._MultiNodeCall(node_list, "os_diagnose", [])
921
922   def call_os_get(self, node, name):
923     """Returns an OS definition.
924
925     This is a single-node call.
926
927     """
928     result = self._SingleNodeCall(node, "os_get", [name])
929     if not result.failed and isinstance(result.data, dict):
930       result.data = objects.OS.FromDict(result.data)
931     return result
932
933   def call_hooks_runner(self, node_list, hpath, phase, env):
934     """Call the hooks runner.
935
936     Args:
937       - op: the OpCode instance
938       - env: a dictionary with the environment
939
940     This is a multi-node call.
941
942     """
943     params = [hpath, phase, env]
944     return self._MultiNodeCall(node_list, "hooks_runner", params)
945
946   def call_iallocator_runner(self, node, name, idata):
947     """Call an iallocator on a remote node
948
949     Args:
950       - name: the iallocator name
951       - input: the json-encoded input string
952
953     This is a single-node call.
954
955     """
956     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
957
958   def call_blockdev_grow(self, node, cf_bdev, amount):
959     """Request a snapshot of the given block device.
960
961     This is a single-node call.
962
963     """
964     return self._SingleNodeCall(node, "blockdev_grow",
965                                 [cf_bdev.ToDict(), amount])
966
967   def call_blockdev_export(self, node, cf_bdev,
968                            dest_node, dest_path, cluster_name):
969     """Export a given disk to another node.
970
971     This is a single-node call.
972
973     """
974     return self._SingleNodeCall(node, "blockdev_export",
975                                 [cf_bdev.ToDict(), dest_node, dest_path,
976                                  cluster_name])
977
978   def call_blockdev_snapshot(self, node, cf_bdev):
979     """Request a snapshot of the given block device.
980
981     This is a single-node call.
982
983     """
984     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
985
986   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
987                            cluster_name, idx):
988     """Request the export of a given snapshot.
989
990     This is a single-node call.
991
992     """
993     return self._SingleNodeCall(node, "snapshot_export",
994                                 [snap_bdev.ToDict(), dest_node,
995                                  self._InstDict(instance), cluster_name, idx])
996
997   def call_finalize_export(self, node, instance, snap_disks):
998     """Request the completion of an export operation.
999
1000     This writes the export config file, etc.
1001
1002     This is a single-node call.
1003
1004     """
1005     flat_disks = []
1006     for disk in snap_disks:
1007       if isinstance(disk, bool):
1008         flat_disks.append(disk)
1009       else:
1010         flat_disks.append(disk.ToDict())
1011
1012     return self._SingleNodeCall(node, "finalize_export",
1013                                 [self._InstDict(instance), flat_disks])
1014
1015   def call_export_info(self, node, path):
1016     """Queries the export information in a given path.
1017
1018     This is a single-node call.
1019
1020     """
1021     return self._SingleNodeCall(node, "export_info", [path])
1022
1023   def call_instance_os_import(self, node, inst, src_node, src_images,
1024                               cluster_name):
1025     """Request the import of a backup into an instance.
1026
1027     This is a single-node call.
1028
1029     """
1030     return self._SingleNodeCall(node, "instance_os_import",
1031                                 [self._InstDict(inst), src_node, src_images,
1032                                  cluster_name])
1033
1034   def call_export_list(self, node_list):
1035     """Gets the stored exports list.
1036
1037     This is a multi-node call.
1038
1039     """
1040     return self._MultiNodeCall(node_list, "export_list", [])
1041
1042   def call_export_remove(self, node, export):
1043     """Requests removal of a given export.
1044
1045     This is a single-node call.
1046
1047     """
1048     return self._SingleNodeCall(node, "export_remove", [export])
1049
1050   @classmethod
1051   def call_node_leave_cluster(cls, node):
1052     """Requests a node to clean the cluster information it has.
1053
1054     This will remove the configuration information from the ganeti data
1055     dir.
1056
1057     This is a single-node call.
1058
1059     """
1060     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1061
1062   def call_node_volumes(self, node_list):
1063     """Gets all volumes on node(s).
1064
1065     This is a multi-node call.
1066
1067     """
1068     return self._MultiNodeCall(node_list, "node_volumes", [])
1069
1070   def call_node_demote_from_mc(self, node):
1071     """Demote a node from the master candidate role.
1072
1073     This is a single-node call.
1074
1075     """
1076     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1077
1078
1079   def call_node_powercycle(self, node, hypervisor):
1080     """Tries to powercycle a node.
1081
1082     This is a single-node call.
1083
1084     """
1085     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1086
1087
1088   def call_test_delay(self, node_list, duration):
1089     """Sleep for a fixed time on given node(s).
1090
1091     This is a multi-node call.
1092
1093     """
1094     return self._MultiNodeCall(node_list, "test_delay", [duration])
1095
1096   def call_file_storage_dir_create(self, node, file_storage_dir):
1097     """Create the given file storage directory.
1098
1099     This is a single-node call.
1100
1101     """
1102     return self._SingleNodeCall(node, "file_storage_dir_create",
1103                                 [file_storage_dir])
1104
1105   def call_file_storage_dir_remove(self, node, file_storage_dir):
1106     """Remove the given file storage directory.
1107
1108     This is a single-node call.
1109
1110     """
1111     return self._SingleNodeCall(node, "file_storage_dir_remove",
1112                                 [file_storage_dir])
1113
1114   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1115                                    new_file_storage_dir):
1116     """Rename file storage directory.
1117
1118     This is a single-node call.
1119
1120     """
1121     return self._SingleNodeCall(node, "file_storage_dir_rename",
1122                                 [old_file_storage_dir, new_file_storage_dir])
1123
1124   @classmethod
1125   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1126     """Update job queue.
1127
1128     This is a multi-node call.
1129
1130     """
1131     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1132                                     [file_name, cls._Compress(content)],
1133                                     address_list=address_list)
1134
1135   @classmethod
1136   def call_jobqueue_purge(cls, node):
1137     """Purge job queue.
1138
1139     This is a single-node call.
1140
1141     """
1142     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1143
1144   @classmethod
1145   def call_jobqueue_rename(cls, node_list, address_list, rename):
1146     """Rename a job queue file.
1147
1148     This is a multi-node call.
1149
1150     """
1151     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1152                                     address_list=address_list)
1153
1154   @classmethod
1155   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1156     """Set the drain flag on the queue.
1157
1158     This is a multi-node call.
1159
1160     @type node_list: list
1161     @param node_list: the list of nodes to query
1162     @type drain_flag: bool
1163     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1164
1165     """
1166     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1167                                     [drain_flag])
1168
1169   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1170     """Validate the hypervisor params.
1171
1172     This is a multi-node call.
1173
1174     @type node_list: list
1175     @param node_list: the list of nodes to query
1176     @type hvname: string
1177     @param hvname: the hypervisor name
1178     @type hvparams: dict
1179     @param hvparams: the hypervisor parameters to be validated
1180
1181     """
1182     cluster = self._cfg.GetClusterInfo()
1183     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1184     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1185                                [hvname, hv_full])