locking: Convert pipe condition to new timeout class
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 import ganeti.http.client
46
47
48 # Module level variable
49 _http_manager = None
50
51
52 def Init():
53   """Initializes the module-global HTTP client manager.
54
55   Must be called before using any RPC function.
56
57   """
58   global _http_manager
59
60   assert not _http_manager, "RPC module initialized more than once"
61
62   _http_manager = http.client.HttpClientManager()
63
64
65 def Shutdown():
66   """Stops the module-global HTTP client manager.
67
68   Must be called before quitting the program.
69
70   """
71   global _http_manager
72
73   if _http_manager:
74     _http_manager.Shutdown()
75     _http_manager = None
76
77
78 class RpcResult(object):
79   """RPC Result class.
80
81   This class holds an RPC result. It is needed since in multi-node
82   calls we can't raise an exception just because one one out of many
83   failed, and therefore we use this class to encapsulate the result.
84
85   @ivar data: the data payload, for successful results, or None
86   @ivar call: the name of the RPC call
87   @ivar node: the name of the node to which we made the call
88   @ivar offline: whether the operation failed because the node was
89       offline, as opposed to actual failure; offline=True will always
90       imply failed=True, in order to allow simpler checking if
91       the user doesn't care about the exact failure mode
92   @ivar fail_msg: the error message if the call failed
93
94   """
95   def __init__(self, data=None, failed=False, offline=False,
96                call=None, node=None):
97     self.offline = offline
98     self.call = call
99     self.node = node
100     if offline:
101       self.fail_msg = "Node is marked offline"
102       self.data = self.payload = None
103     elif failed:
104       self.fail_msg = self._EnsureErr(data)
105       self.data = self.payload = None
106     else:
107       self.data = data
108       if not isinstance(self.data, (tuple, list)):
109         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
110                          type(self.data))
111       elif len(data) != 2:
112         self.fail_msg = ("RPC layer error: invalid result length (%d), "
113                          "expected 2" % len(self.data))
114       elif not self.data[0]:
115         self.fail_msg = self._EnsureErr(self.data[1])
116       else:
117         # finally success
118         self.fail_msg = None
119         self.payload = data[1]
120
121   @staticmethod
122   def _EnsureErr(val):
123     """Helper to ensure we return a 'True' value for error."""
124     if val:
125       return val
126     else:
127       return "No error information"
128
129   def Raise(self, msg, prereq=False):
130     """If the result has failed, raise an OpExecError.
131
132     This is used so that LU code doesn't have to check for each
133     result, but instead can call this function.
134
135     """
136     if not self.fail_msg:
137       return
138
139     if not msg: # one could pass None for default message
140       msg = ("Call '%s' to node '%s' has failed: %s" %
141              (self.call, self.node, self.fail_msg))
142     else:
143       msg = "%s: %s" % (msg, self.fail_msg)
144     if prereq:
145       ec = errors.OpPrereqError
146     else:
147       ec = errors.OpExecError
148     raise ec(msg)
149
150
151 class Client:
152   """RPC Client class.
153
154   This class, given a (remote) method name, a list of parameters and a
155   list of nodes, will contact (in parallel) all nodes, and return a
156   dict of results (key: node name, value: result).
157
158   One current bug is that generic failure is still signaled by
159   'False' result, which is not good. This overloading of values can
160   cause bugs.
161
162   """
163   def __init__(self, procedure, body, port):
164     self.procedure = procedure
165     self.body = body
166     self.port = port
167     self.nc = {}
168
169     self._ssl_params = \
170       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
171                          ssl_cert_path=constants.SSL_CERT_FILE)
172
173   def ConnectList(self, node_list, address_list=None):
174     """Add a list of nodes to the target nodes.
175
176     @type node_list: list
177     @param node_list: the list of node names to connect
178     @type address_list: list or None
179     @keyword address_list: either None or a list with node addresses,
180         which must have the same length as the node list
181
182     """
183     if address_list is None:
184       address_list = [None for _ in node_list]
185     else:
186       assert len(node_list) == len(address_list), \
187              "Name and address lists should have the same length"
188     for node, address in zip(node_list, address_list):
189       self.ConnectNode(node, address)
190
191   def ConnectNode(self, name, address=None):
192     """Add a node to the target list.
193
194     @type name: str
195     @param name: the node name
196     @type address: str
197     @keyword address: the node address, if known
198
199     """
200     if address is None:
201       address = name
202
203     self.nc[name] = \
204       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
205                                     "/%s" % self.procedure,
206                                     post_data=self.body,
207                                     ssl_params=self._ssl_params,
208                                     ssl_verify_peer=True)
209
210   def GetResults(self):
211     """Call nodes and return results.
212
213     @rtype: list
214     @return: List of RPC results
215
216     """
217     assert _http_manager, "RPC module not initialized"
218
219     _http_manager.ExecRequests(self.nc.values())
220
221     results = {}
222
223     for name, req in self.nc.iteritems():
224       if req.success and req.resp_status_code == http.HTTP_OK:
225         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
226                                   node=name, call=self.procedure)
227         continue
228
229       # TODO: Better error reporting
230       if req.error:
231         msg = req.error
232       else:
233         msg = req.resp_body
234
235       logging.error("RPC error in %s from node %s: %s",
236                     self.procedure, name, msg)
237       results[name] = RpcResult(data=msg, failed=True, node=name,
238                                 call=self.procedure)
239
240     return results
241
242
243 class RpcRunner(object):
244   """RPC runner class"""
245
246   def __init__(self, cfg):
247     """Initialized the rpc runner.
248
249     @type cfg:  C{config.ConfigWriter}
250     @param cfg: the configuration object that will be used to get data
251                 about the cluster
252
253     """
254     self._cfg = cfg
255     self.port = utils.GetDaemonPort(constants.NODED)
256
257   def _InstDict(self, instance, hvp=None, bep=None):
258     """Convert the given instance to a dict.
259
260     This is done via the instance's ToDict() method and additionally
261     we fill the hvparams with the cluster defaults.
262
263     @type instance: L{objects.Instance}
264     @param instance: an Instance object
265     @type hvp: dict or None
266     @param hvp: a dictionary with overridden hypervisor parameters
267     @type bep: dict or None
268     @param bep: a dictionary with overridden backend parameters
269     @rtype: dict
270     @return: the instance dict, with the hvparams filled with the
271         cluster defaults
272
273     """
274     idict = instance.ToDict()
275     cluster = self._cfg.GetClusterInfo()
276     idict["hvparams"] = cluster.FillHV(instance)
277     if hvp is not None:
278       idict["hvparams"].update(hvp)
279     idict["beparams"] = cluster.FillBE(instance)
280     if bep is not None:
281       idict["beparams"].update(bep)
282     for nic in idict["nics"]:
283       nic['nicparams'] = objects.FillDict(
284         cluster.nicparams[constants.PP_DEFAULT],
285         nic['nicparams'])
286     return idict
287
288   def _ConnectList(self, client, node_list, call):
289     """Helper for computing node addresses.
290
291     @type client: L{ganeti.rpc.Client}
292     @param client: a C{Client} instance
293     @type node_list: list
294     @param node_list: the node list we should connect
295     @type call: string
296     @param call: the name of the remote procedure call, for filling in
297         correctly any eventual offline nodes' results
298
299     """
300     all_nodes = self._cfg.GetAllNodesInfo()
301     name_list = []
302     addr_list = []
303     skip_dict = {}
304     for node in node_list:
305       if node in all_nodes:
306         if all_nodes[node].offline:
307           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
308           continue
309         val = all_nodes[node].primary_ip
310       else:
311         val = None
312       addr_list.append(val)
313       name_list.append(node)
314     if name_list:
315       client.ConnectList(name_list, address_list=addr_list)
316     return skip_dict
317
318   def _ConnectNode(self, client, node, call):
319     """Helper for computing one node's address.
320
321     @type client: L{ganeti.rpc.Client}
322     @param client: a C{Client} instance
323     @type node: str
324     @param node: the node we should connect
325     @type call: string
326     @param call: the name of the remote procedure call, for filling in
327         correctly any eventual offline nodes' results
328
329     """
330     node_info = self._cfg.GetNodeInfo(node)
331     if node_info is not None:
332       if node_info.offline:
333         return RpcResult(node=node, offline=True, call=call)
334       addr = node_info.primary_ip
335     else:
336       addr = None
337     client.ConnectNode(node, address=addr)
338
339   def _MultiNodeCall(self, node_list, procedure, args):
340     """Helper for making a multi-node call
341
342     """
343     body = serializer.DumpJson(args, indent=False)
344     c = Client(procedure, body, self.port)
345     skip_dict = self._ConnectList(c, node_list, procedure)
346     skip_dict.update(c.GetResults())
347     return skip_dict
348
349   @classmethod
350   def _StaticMultiNodeCall(cls, node_list, procedure, args,
351                            address_list=None):
352     """Helper for making a multi-node static call
353
354     """
355     body = serializer.DumpJson(args, indent=False)
356     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
357     c.ConnectList(node_list, address_list=address_list)
358     return c.GetResults()
359
360   def _SingleNodeCall(self, node, procedure, args):
361     """Helper for making a single-node call
362
363     """
364     body = serializer.DumpJson(args, indent=False)
365     c = Client(procedure, body, self.port)
366     result = self._ConnectNode(c, node, procedure)
367     if result is None:
368       # we did connect, node is not offline
369       result = c.GetResults()[node]
370     return result
371
372   @classmethod
373   def _StaticSingleNodeCall(cls, node, procedure, args):
374     """Helper for making a single-node static call
375
376     """
377     body = serializer.DumpJson(args, indent=False)
378     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
379     c.ConnectNode(node)
380     return c.GetResults()[node]
381
382   @staticmethod
383   def _Compress(data):
384     """Compresses a string for transport over RPC.
385
386     Small amounts of data are not compressed.
387
388     @type data: str
389     @param data: Data
390     @rtype: tuple
391     @return: Encoded data to send
392
393     """
394     # Small amounts of data are not compressed
395     if len(data) < 512:
396       return (constants.RPC_ENCODING_NONE, data)
397
398     # Compress with zlib and encode in base64
399     return (constants.RPC_ENCODING_ZLIB_BASE64,
400             base64.b64encode(zlib.compress(data, 3)))
401
402   #
403   # Begin RPC calls
404   #
405
406   def call_lv_list(self, node_list, vg_name):
407     """Gets the logical volumes present in a given volume group.
408
409     This is a multi-node call.
410
411     """
412     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
413
414   def call_vg_list(self, node_list):
415     """Gets the volume group list.
416
417     This is a multi-node call.
418
419     """
420     return self._MultiNodeCall(node_list, "vg_list", [])
421
422   def call_storage_list(self, node_list, su_name, su_args, name, fields):
423     """Get list of storage units.
424
425     This is a multi-node call.
426
427     """
428     return self._MultiNodeCall(node_list, "storage_list",
429                                [su_name, su_args, name, fields])
430
431   def call_storage_modify(self, node, su_name, su_args, name, changes):
432     """Modify a storage unit.
433
434     This is a single-node call.
435
436     """
437     return self._SingleNodeCall(node, "storage_modify",
438                                 [su_name, su_args, name, changes])
439
440   def call_storage_execute(self, node, su_name, su_args, name, op):
441     """Executes an operation on a storage unit.
442
443     This is a single-node call.
444
445     """
446     return self._SingleNodeCall(node, "storage_execute",
447                                 [su_name, su_args, name, op])
448
449   def call_bridges_exist(self, node, bridges_list):
450     """Checks if a node has all the bridges given.
451
452     This method checks if all bridges given in the bridges_list are
453     present on the remote node, so that an instance that uses interfaces
454     on those bridges can be started.
455
456     This is a single-node call.
457
458     """
459     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
460
461   def call_instance_start(self, node, instance, hvp, bep):
462     """Starts an instance.
463
464     This is a single-node call.
465
466     """
467     idict = self._InstDict(instance, hvp=hvp, bep=bep)
468     return self._SingleNodeCall(node, "instance_start", [idict])
469
470   def call_instance_shutdown(self, node, instance, timeout):
471     """Stops an instance.
472
473     This is a single-node call.
474
475     """
476     return self._SingleNodeCall(node, "instance_shutdown",
477                                 [self._InstDict(instance), timeout])
478
479   def call_migration_info(self, node, instance):
480     """Gather the information necessary to prepare an instance migration.
481
482     This is a single-node call.
483
484     @type node: string
485     @param node: the node on which the instance is currently running
486     @type instance: C{objects.Instance}
487     @param instance: the instance definition
488
489     """
490     return self._SingleNodeCall(node, "migration_info",
491                                 [self._InstDict(instance)])
492
493   def call_accept_instance(self, node, instance, info, target):
494     """Prepare a node to accept an instance.
495
496     This is a single-node call.
497
498     @type node: string
499     @param node: the target node for the migration
500     @type instance: C{objects.Instance}
501     @param instance: the instance definition
502     @type info: opaque/hypervisor specific (string/data)
503     @param info: result for the call_migration_info call
504     @type target: string
505     @param target: target hostname (usually ip address) (on the node itself)
506
507     """
508     return self._SingleNodeCall(node, "accept_instance",
509                                 [self._InstDict(instance), info, target])
510
511   def call_finalize_migration(self, node, instance, info, success):
512     """Finalize any target-node migration specific operation.
513
514     This is called both in case of a successful migration and in case of error
515     (in which case it should abort the migration).
516
517     This is a single-node call.
518
519     @type node: string
520     @param node: the target node for the migration
521     @type instance: C{objects.Instance}
522     @param instance: the instance definition
523     @type info: opaque/hypervisor specific (string/data)
524     @param info: result for the call_migration_info call
525     @type success: boolean
526     @param success: whether the migration was a success or a failure
527
528     """
529     return self._SingleNodeCall(node, "finalize_migration",
530                                 [self._InstDict(instance), info, success])
531
532   def call_instance_migrate(self, node, instance, target, live):
533     """Migrate an instance.
534
535     This is a single-node call.
536
537     @type node: string
538     @param node: the node on which the instance is currently running
539     @type instance: C{objects.Instance}
540     @param instance: the instance definition
541     @type target: string
542     @param target: the target node name
543     @type live: boolean
544     @param live: whether the migration should be done live or not (the
545         interpretation of this parameter is left to the hypervisor)
546
547     """
548     return self._SingleNodeCall(node, "instance_migrate",
549                                 [self._InstDict(instance), target, live])
550
551   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
552     """Reboots an instance.
553
554     This is a single-node call.
555
556     """
557     return self._SingleNodeCall(node, "instance_reboot",
558                                 [self._InstDict(inst), reboot_type,
559                                  shutdown_timeout])
560
561   def call_instance_os_add(self, node, inst, reinstall):
562     """Installs an OS on the given instance.
563
564     This is a single-node call.
565
566     """
567     return self._SingleNodeCall(node, "instance_os_add",
568                                 [self._InstDict(inst), reinstall])
569
570   def call_instance_run_rename(self, node, inst, old_name):
571     """Run the OS rename script for an instance.
572
573     This is a single-node call.
574
575     """
576     return self._SingleNodeCall(node, "instance_run_rename",
577                                 [self._InstDict(inst), old_name])
578
579   def call_instance_info(self, node, instance, hname):
580     """Returns information about a single instance.
581
582     This is a single-node call.
583
584     @type node: list
585     @param node: the list of nodes to query
586     @type instance: string
587     @param instance: the instance name
588     @type hname: string
589     @param hname: the hypervisor type of the instance
590
591     """
592     return self._SingleNodeCall(node, "instance_info", [instance, hname])
593
594   def call_instance_migratable(self, node, instance):
595     """Checks whether the given instance can be migrated.
596
597     This is a single-node call.
598
599     @param node: the node to query
600     @type instance: L{objects.Instance}
601     @param instance: the instance to check
602
603
604     """
605     return self._SingleNodeCall(node, "instance_migratable",
606                                 [self._InstDict(instance)])
607
608   def call_all_instances_info(self, node_list, hypervisor_list):
609     """Returns information about all instances on the given nodes.
610
611     This is a multi-node call.
612
613     @type node_list: list
614     @param node_list: the list of nodes to query
615     @type hypervisor_list: list
616     @param hypervisor_list: the hypervisors to query for instances
617
618     """
619     return self._MultiNodeCall(node_list, "all_instances_info",
620                                [hypervisor_list])
621
622   def call_instance_list(self, node_list, hypervisor_list):
623     """Returns the list of running instances on a given node.
624
625     This is a multi-node call.
626
627     @type node_list: list
628     @param node_list: the list of nodes to query
629     @type hypervisor_list: list
630     @param hypervisor_list: the hypervisors to query for instances
631
632     """
633     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
634
635   def call_node_tcp_ping(self, node, source, target, port, timeout,
636                          live_port_needed):
637     """Do a TcpPing on the remote node
638
639     This is a single-node call.
640
641     """
642     return self._SingleNodeCall(node, "node_tcp_ping",
643                                 [source, target, port, timeout,
644                                  live_port_needed])
645
646   def call_node_has_ip_address(self, node, address):
647     """Checks if a node has the given IP address.
648
649     This is a single-node call.
650
651     """
652     return self._SingleNodeCall(node, "node_has_ip_address", [address])
653
654   def call_node_info(self, node_list, vg_name, hypervisor_type):
655     """Return node information.
656
657     This will return memory information and volume group size and free
658     space.
659
660     This is a multi-node call.
661
662     @type node_list: list
663     @param node_list: the list of nodes to query
664     @type vg_name: C{string}
665     @param vg_name: the name of the volume group to ask for disk space
666         information
667     @type hypervisor_type: C{str}
668     @param hypervisor_type: the name of the hypervisor to ask for
669         memory information
670
671     """
672     return self._MultiNodeCall(node_list, "node_info",
673                                [vg_name, hypervisor_type])
674
675   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
676     """Add a node to the cluster.
677
678     This is a single-node call.
679
680     """
681     return self._SingleNodeCall(node, "node_add",
682                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
683
684   def call_node_verify(self, node_list, checkdict, cluster_name):
685     """Request verification of given parameters.
686
687     This is a multi-node call.
688
689     """
690     return self._MultiNodeCall(node_list, "node_verify",
691                                [checkdict, cluster_name])
692
693   @classmethod
694   def call_node_start_master(cls, node, start_daemons, no_voting):
695     """Tells a node to activate itself as a master.
696
697     This is a single-node call.
698
699     """
700     return cls._StaticSingleNodeCall(node, "node_start_master",
701                                      [start_daemons, no_voting])
702
703   @classmethod
704   def call_node_stop_master(cls, node, stop_daemons):
705     """Tells a node to demote itself from master status.
706
707     This is a single-node call.
708
709     """
710     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
711
712   @classmethod
713   def call_master_info(cls, node_list):
714     """Query master info.
715
716     This is a multi-node call.
717
718     """
719     # TODO: should this method query down nodes?
720     return cls._StaticMultiNodeCall(node_list, "master_info", [])
721
722   def call_version(self, node_list):
723     """Query node version.
724
725     This is a multi-node call.
726
727     """
728     return self._MultiNodeCall(node_list, "version", [])
729
730   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
731     """Request creation of a given block device.
732
733     This is a single-node call.
734
735     """
736     return self._SingleNodeCall(node, "blockdev_create",
737                                 [bdev.ToDict(), size, owner, on_primary, info])
738
739   def call_blockdev_remove(self, node, bdev):
740     """Request removal of a given block device.
741
742     This is a single-node call.
743
744     """
745     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
746
747   def call_blockdev_rename(self, node, devlist):
748     """Request rename of the given block devices.
749
750     This is a single-node call.
751
752     """
753     return self._SingleNodeCall(node, "blockdev_rename",
754                                 [(d.ToDict(), uid) for d, uid in devlist])
755
756   def call_blockdev_assemble(self, node, disk, owner, on_primary):
757     """Request assembling of a given block device.
758
759     This is a single-node call.
760
761     """
762     return self._SingleNodeCall(node, "blockdev_assemble",
763                                 [disk.ToDict(), owner, on_primary])
764
765   def call_blockdev_shutdown(self, node, disk):
766     """Request shutdown of a given block device.
767
768     This is a single-node call.
769
770     """
771     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
772
773   def call_blockdev_addchildren(self, node, bdev, ndevs):
774     """Request adding a list of children to a (mirroring) device.
775
776     This is a single-node call.
777
778     """
779     return self._SingleNodeCall(node, "blockdev_addchildren",
780                                 [bdev.ToDict(),
781                                  [disk.ToDict() for disk in ndevs]])
782
783   def call_blockdev_removechildren(self, node, bdev, ndevs):
784     """Request removing a list of children from a (mirroring) device.
785
786     This is a single-node call.
787
788     """
789     return self._SingleNodeCall(node, "blockdev_removechildren",
790                                 [bdev.ToDict(),
791                                  [disk.ToDict() for disk in ndevs]])
792
793   def call_blockdev_getmirrorstatus(self, node, disks):
794     """Request status of a (mirroring) device.
795
796     This is a single-node call.
797
798     """
799     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
800                                   [dsk.ToDict() for dsk in disks])
801     if not result.fail_msg:
802       result.payload = [objects.BlockDevStatus.FromDict(i)
803                         for i in result.payload]
804     return result
805
806   def call_blockdev_find(self, node, disk):
807     """Request identification of a given block device.
808
809     This is a single-node call.
810
811     """
812     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
813     if not result.fail_msg and result.payload is not None:
814       result.payload = objects.BlockDevStatus.FromDict(result.payload)
815     return result
816
817   def call_blockdev_close(self, node, instance_name, disks):
818     """Closes the given block devices.
819
820     This is a single-node call.
821
822     """
823     params = [instance_name, [cf.ToDict() for cf in disks]]
824     return self._SingleNodeCall(node, "blockdev_close", params)
825
826   def call_blockdev_getsizes(self, node, disks):
827     """Returns the size of the given disks.
828
829     This is a single-node call.
830
831     """
832     params = [[cf.ToDict() for cf in disks]]
833     return self._SingleNodeCall(node, "blockdev_getsize", params)
834
835   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
836     """Disconnects the network of the given drbd devices.
837
838     This is a multi-node call.
839
840     """
841     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
842                                [nodes_ip, [cf.ToDict() for cf in disks]])
843
844   def call_drbd_attach_net(self, node_list, nodes_ip,
845                            disks, instance_name, multimaster):
846     """Disconnects the given drbd devices.
847
848     This is a multi-node call.
849
850     """
851     return self._MultiNodeCall(node_list, "drbd_attach_net",
852                                [nodes_ip, [cf.ToDict() for cf in disks],
853                                 instance_name, multimaster])
854
855   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
856     """Waits for the synchronization of drbd devices is complete.
857
858     This is a multi-node call.
859
860     """
861     return self._MultiNodeCall(node_list, "drbd_wait_sync",
862                                [nodes_ip, [cf.ToDict() for cf in disks]])
863
864   @classmethod
865   def call_upload_file(cls, node_list, file_name, address_list=None):
866     """Upload a file.
867
868     The node will refuse the operation in case the file is not on the
869     approved file list.
870
871     This is a multi-node call.
872
873     @type node_list: list
874     @param node_list: the list of node names to upload to
875     @type file_name: str
876     @param file_name: the filename to upload
877     @type address_list: list or None
878     @keyword address_list: an optional list of node addresses, in order
879         to optimize the RPC speed
880
881     """
882     file_contents = utils.ReadFile(file_name)
883     data = cls._Compress(file_contents)
884     st = os.stat(file_name)
885     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
886               st.st_atime, st.st_mtime]
887     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
888                                     address_list=address_list)
889
890   @classmethod
891   def call_write_ssconf_files(cls, node_list, values):
892     """Write ssconf files.
893
894     This is a multi-node call.
895
896     """
897     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
898
899   def call_os_diagnose(self, node_list):
900     """Request a diagnose of OS definitions.
901
902     This is a multi-node call.
903
904     """
905     return self._MultiNodeCall(node_list, "os_diagnose", [])
906
907   def call_os_get(self, node, name):
908     """Returns an OS definition.
909
910     This is a single-node call.
911
912     """
913     result = self._SingleNodeCall(node, "os_get", [name])
914     if not result.fail_msg and isinstance(result.payload, dict):
915       result.payload = objects.OS.FromDict(result.payload)
916     return result
917
918   def call_hooks_runner(self, node_list, hpath, phase, env):
919     """Call the hooks runner.
920
921     Args:
922       - op: the OpCode instance
923       - env: a dictionary with the environment
924
925     This is a multi-node call.
926
927     """
928     params = [hpath, phase, env]
929     return self._MultiNodeCall(node_list, "hooks_runner", params)
930
931   def call_iallocator_runner(self, node, name, idata):
932     """Call an iallocator on a remote node
933
934     Args:
935       - name: the iallocator name
936       - input: the json-encoded input string
937
938     This is a single-node call.
939
940     """
941     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
942
943   def call_blockdev_grow(self, node, cf_bdev, amount):
944     """Request a snapshot of the given block device.
945
946     This is a single-node call.
947
948     """
949     return self._SingleNodeCall(node, "blockdev_grow",
950                                 [cf_bdev.ToDict(), amount])
951
952   def call_blockdev_export(self, node, cf_bdev,
953                            dest_node, dest_path, cluster_name):
954     """Export a given disk to another node.
955
956     This is a single-node call.
957
958     """
959     return self._SingleNodeCall(node, "blockdev_export",
960                                 [cf_bdev.ToDict(), dest_node, dest_path,
961                                  cluster_name])
962
963   def call_blockdev_snapshot(self, node, cf_bdev):
964     """Request a snapshot of the given block device.
965
966     This is a single-node call.
967
968     """
969     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
970
971   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
972                            cluster_name, idx):
973     """Request the export of a given snapshot.
974
975     This is a single-node call.
976
977     """
978     return self._SingleNodeCall(node, "snapshot_export",
979                                 [snap_bdev.ToDict(), dest_node,
980                                  self._InstDict(instance), cluster_name, idx])
981
982   def call_finalize_export(self, node, instance, snap_disks):
983     """Request the completion of an export operation.
984
985     This writes the export config file, etc.
986
987     This is a single-node call.
988
989     """
990     flat_disks = []
991     for disk in snap_disks:
992       if isinstance(disk, bool):
993         flat_disks.append(disk)
994       else:
995         flat_disks.append(disk.ToDict())
996
997     return self._SingleNodeCall(node, "finalize_export",
998                                 [self._InstDict(instance), flat_disks])
999
1000   def call_export_info(self, node, path):
1001     """Queries the export information in a given path.
1002
1003     This is a single-node call.
1004
1005     """
1006     return self._SingleNodeCall(node, "export_info", [path])
1007
1008   def call_instance_os_import(self, node, inst, src_node, src_images,
1009                               cluster_name):
1010     """Request the import of a backup into an instance.
1011
1012     This is a single-node call.
1013
1014     """
1015     return self._SingleNodeCall(node, "instance_os_import",
1016                                 [self._InstDict(inst), src_node, src_images,
1017                                  cluster_name])
1018
1019   def call_export_list(self, node_list):
1020     """Gets the stored exports list.
1021
1022     This is a multi-node call.
1023
1024     """
1025     return self._MultiNodeCall(node_list, "export_list", [])
1026
1027   def call_export_remove(self, node, export):
1028     """Requests removal of a given export.
1029
1030     This is a single-node call.
1031
1032     """
1033     return self._SingleNodeCall(node, "export_remove", [export])
1034
1035   @classmethod
1036   def call_node_leave_cluster(cls, node):
1037     """Requests a node to clean the cluster information it has.
1038
1039     This will remove the configuration information from the ganeti data
1040     dir.
1041
1042     This is a single-node call.
1043
1044     """
1045     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1046
1047   def call_node_volumes(self, node_list):
1048     """Gets all volumes on node(s).
1049
1050     This is a multi-node call.
1051
1052     """
1053     return self._MultiNodeCall(node_list, "node_volumes", [])
1054
1055   def call_node_demote_from_mc(self, node):
1056     """Demote a node from the master candidate role.
1057
1058     This is a single-node call.
1059
1060     """
1061     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1062
1063
1064   def call_node_powercycle(self, node, hypervisor):
1065     """Tries to powercycle a node.
1066
1067     This is a single-node call.
1068
1069     """
1070     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1071
1072
1073   def call_test_delay(self, node_list, duration):
1074     """Sleep for a fixed time on given node(s).
1075
1076     This is a multi-node call.
1077
1078     """
1079     return self._MultiNodeCall(node_list, "test_delay", [duration])
1080
1081   def call_file_storage_dir_create(self, node, file_storage_dir):
1082     """Create the given file storage directory.
1083
1084     This is a single-node call.
1085
1086     """
1087     return self._SingleNodeCall(node, "file_storage_dir_create",
1088                                 [file_storage_dir])
1089
1090   def call_file_storage_dir_remove(self, node, file_storage_dir):
1091     """Remove the given file storage directory.
1092
1093     This is a single-node call.
1094
1095     """
1096     return self._SingleNodeCall(node, "file_storage_dir_remove",
1097                                 [file_storage_dir])
1098
1099   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1100                                    new_file_storage_dir):
1101     """Rename file storage directory.
1102
1103     This is a single-node call.
1104
1105     """
1106     return self._SingleNodeCall(node, "file_storage_dir_rename",
1107                                 [old_file_storage_dir, new_file_storage_dir])
1108
1109   @classmethod
1110   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1111     """Update job queue.
1112
1113     This is a multi-node call.
1114
1115     """
1116     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1117                                     [file_name, cls._Compress(content)],
1118                                     address_list=address_list)
1119
1120   @classmethod
1121   def call_jobqueue_purge(cls, node):
1122     """Purge job queue.
1123
1124     This is a single-node call.
1125
1126     """
1127     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1128
1129   @classmethod
1130   def call_jobqueue_rename(cls, node_list, address_list, rename):
1131     """Rename a job queue file.
1132
1133     This is a multi-node call.
1134
1135     """
1136     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1137                                     address_list=address_list)
1138
1139   @classmethod
1140   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1141     """Set the drain flag on the queue.
1142
1143     This is a multi-node call.
1144
1145     @type node_list: list
1146     @param node_list: the list of nodes to query
1147     @type drain_flag: bool
1148     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1149
1150     """
1151     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1152                                     [drain_flag])
1153
1154   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1155     """Validate the hypervisor params.
1156
1157     This is a multi-node call.
1158
1159     @type node_list: list
1160     @param node_list: the list of nodes to query
1161     @type hvname: string
1162     @param hvname: the hypervisor name
1163     @type hvparams: dict
1164     @param hvparams: the hypervisor parameters to be validated
1165
1166     """
1167     cluster = self._cfg.GetClusterInfo()
1168     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1169     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1170                                [hvname, hv_full])