Generate a shared HMAC key at cluster init time
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 import ganeti.http.client
46
47
48 # Module level variable
49 _http_manager = None
50
51
52 def Init():
53   """Initializes the module-global HTTP client manager.
54
55   Must be called before using any RPC function.
56
57   """
58   global _http_manager
59
60   assert not _http_manager, "RPC module initialized more than once"
61
62   _http_manager = http.client.HttpClientManager()
63
64
65 def Shutdown():
66   """Stops the module-global HTTP client manager.
67
68   Must be called before quitting the program.
69
70   """
71   global _http_manager
72
73   if _http_manager:
74     _http_manager.Shutdown()
75     _http_manager = None
76
77
78 class RpcResult(object):
79   """RPC Result class.
80
81   This class holds an RPC result. It is needed since in multi-node
82   calls we can't raise an exception just because one one out of many
83   failed, and therefore we use this class to encapsulate the result.
84
85   @ivar data: the data payload, for successful results, or None
86   @type failed: boolean
87   @ivar failed: whether the operation failed at transport level (not
88       application level on the remote node)
89   @ivar call: the name of the RPC call
90   @ivar node: the name of the node to which we made the call
91   @ivar offline: whether the operation failed because the node was
92       offline, as opposed to actual failure; offline=True will always
93       imply failed=True, in order to allow simpler checking if
94       the user doesn't care about the exact failure mode
95   @ivar fail_msg: the error message if the call failed
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.fail_msg = "Node is marked offline"
107       self.data = self.payload = None
108     elif failed:
109       self.fail_msg = self._EnsureErr(data)
110       self.data = self.payload = None
111     else:
112       self.data = data
113       if not isinstance(self.data, (tuple, list)):
114         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
115                          type(self.data))
116       elif len(data) != 2:
117         self.fail_msg = ("RPC layer error: invalid result length (%d), "
118                          "expected 2" % len(self.data))
119       elif not self.data[0]:
120         self.fail_msg = self._EnsureErr(self.data[1])
121       else:
122         # finally success
123         self.fail_msg = None
124         self.payload = data[1]
125
126   @staticmethod
127   def _EnsureErr(val):
128     """Helper to ensure we return a 'True' value for error."""
129     if val:
130       return val
131     else:
132       return "No error information"
133
134   def Raise(self, msg, prereq=False):
135     """If the result has failed, raise an OpExecError.
136
137     This is used so that LU code doesn't have to check for each
138     result, but instead can call this function.
139
140     """
141     if not self.fail_msg:
142       return
143
144     if not msg: # one could pass None for default message
145       msg = ("Call '%s' to node '%s' has failed: %s" %
146              (self.call, self.node, self.fail_msg))
147     else:
148       msg = "%s: %s" % (msg, self.fail_msg)
149     if prereq:
150       ec = errors.OpPrereqError
151     else:
152       ec = errors.OpExecError
153     raise ec(msg)
154
155   def RemoteFailMsg(self):
156     """Check if the remote procedure failed.
157
158     @return: the fail_msg attribute
159
160     """
161     return self.fail_msg
162
163
164 class Client:
165   """RPC Client class.
166
167   This class, given a (remote) method name, a list of parameters and a
168   list of nodes, will contact (in parallel) all nodes, and return a
169   dict of results (key: node name, value: result).
170
171   One current bug is that generic failure is still signaled by
172   'False' result, which is not good. This overloading of values can
173   cause bugs.
174
175   """
176   def __init__(self, procedure, body, port):
177     self.procedure = procedure
178     self.body = body
179     self.port = port
180     self.nc = {}
181
182     self._ssl_params = \
183       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
184                          ssl_cert_path=constants.SSL_CERT_FILE)
185
186   def ConnectList(self, node_list, address_list=None):
187     """Add a list of nodes to the target nodes.
188
189     @type node_list: list
190     @param node_list: the list of node names to connect
191     @type address_list: list or None
192     @keyword address_list: either None or a list with node addresses,
193         which must have the same length as the node list
194
195     """
196     if address_list is None:
197       address_list = [None for _ in node_list]
198     else:
199       assert len(node_list) == len(address_list), \
200              "Name and address lists should have the same length"
201     for node, address in zip(node_list, address_list):
202       self.ConnectNode(node, address)
203
204   def ConnectNode(self, name, address=None):
205     """Add a node to the target list.
206
207     @type name: str
208     @param name: the node name
209     @type address: str
210     @keyword address: the node address, if known
211
212     """
213     if address is None:
214       address = name
215
216     self.nc[name] = \
217       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
218                                     "/%s" % self.procedure,
219                                     post_data=self.body,
220                                     ssl_params=self._ssl_params,
221                                     ssl_verify_peer=True)
222
223   def GetResults(self):
224     """Call nodes and return results.
225
226     @rtype: list
227     @return: List of RPC results
228
229     """
230     assert _http_manager, "RPC module not initialized"
231
232     _http_manager.ExecRequests(self.nc.values())
233
234     results = {}
235
236     for name, req in self.nc.iteritems():
237       if req.success and req.resp_status_code == http.HTTP_OK:
238         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
239                                   node=name, call=self.procedure)
240         continue
241
242       # TODO: Better error reporting
243       if req.error:
244         msg = req.error
245       else:
246         msg = req.resp_body
247
248       logging.error("RPC error in %s from node %s: %s",
249                     self.procedure, name, msg)
250       results[name] = RpcResult(data=msg, failed=True, node=name,
251                                 call=self.procedure)
252
253     return results
254
255
256 class RpcRunner(object):
257   """RPC runner class"""
258
259   def __init__(self, cfg):
260     """Initialized the rpc runner.
261
262     @type cfg:  C{config.ConfigWriter}
263     @param cfg: the configuration object that will be used to get data
264                 about the cluster
265
266     """
267     self._cfg = cfg
268     self.port = utils.GetNodeDaemonPort()
269
270   def _InstDict(self, instance, hvp=None, bep=None):
271     """Convert the given instance to a dict.
272
273     This is done via the instance's ToDict() method and additionally
274     we fill the hvparams with the cluster defaults.
275
276     @type instance: L{objects.Instance}
277     @param instance: an Instance object
278     @type hvp: dict or None
279     @param hvp: a dictionary with overridden hypervisor parameters
280     @type bep: dict or None
281     @param bep: a dictionary with overridden backend parameters
282     @rtype: dict
283     @return: the instance dict, with the hvparams filled with the
284         cluster defaults
285
286     """
287     idict = instance.ToDict()
288     cluster = self._cfg.GetClusterInfo()
289     idict["hvparams"] = cluster.FillHV(instance)
290     if hvp is not None:
291       idict["hvparams"].update(hvp)
292     idict["beparams"] = cluster.FillBE(instance)
293     if bep is not None:
294       idict["beparams"].update(bep)
295     for nic in idict["nics"]:
296       nic['nicparams'] = objects.FillDict(
297         cluster.nicparams[constants.PP_DEFAULT],
298         nic['nicparams'])
299     return idict
300
301   def _ConnectList(self, client, node_list, call):
302     """Helper for computing node addresses.
303
304     @type client: L{ganeti.rpc.Client}
305     @param client: a C{Client} instance
306     @type node_list: list
307     @param node_list: the node list we should connect
308     @type call: string
309     @param call: the name of the remote procedure call, for filling in
310         correctly any eventual offline nodes' results
311
312     """
313     all_nodes = self._cfg.GetAllNodesInfo()
314     name_list = []
315     addr_list = []
316     skip_dict = {}
317     for node in node_list:
318       if node in all_nodes:
319         if all_nodes[node].offline:
320           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
321           continue
322         val = all_nodes[node].primary_ip
323       else:
324         val = None
325       addr_list.append(val)
326       name_list.append(node)
327     if name_list:
328       client.ConnectList(name_list, address_list=addr_list)
329     return skip_dict
330
331   def _ConnectNode(self, client, node, call):
332     """Helper for computing one node's address.
333
334     @type client: L{ganeti.rpc.Client}
335     @param client: a C{Client} instance
336     @type node: str
337     @param node: the node we should connect
338     @type call: string
339     @param call: the name of the remote procedure call, for filling in
340         correctly any eventual offline nodes' results
341
342     """
343     node_info = self._cfg.GetNodeInfo(node)
344     if node_info is not None:
345       if node_info.offline:
346         return RpcResult(node=node, offline=True, call=call)
347       addr = node_info.primary_ip
348     else:
349       addr = None
350     client.ConnectNode(node, address=addr)
351
352   def _MultiNodeCall(self, node_list, procedure, args):
353     """Helper for making a multi-node call
354
355     """
356     body = serializer.DumpJson(args, indent=False)
357     c = Client(procedure, body, self.port)
358     skip_dict = self._ConnectList(c, node_list, procedure)
359     skip_dict.update(c.GetResults())
360     return skip_dict
361
362   @classmethod
363   def _StaticMultiNodeCall(cls, node_list, procedure, args,
364                            address_list=None):
365     """Helper for making a multi-node static call
366
367     """
368     body = serializer.DumpJson(args, indent=False)
369     c = Client(procedure, body, utils.GetNodeDaemonPort())
370     c.ConnectList(node_list, address_list=address_list)
371     return c.GetResults()
372
373   def _SingleNodeCall(self, node, procedure, args):
374     """Helper for making a single-node call
375
376     """
377     body = serializer.DumpJson(args, indent=False)
378     c = Client(procedure, body, self.port)
379     result = self._ConnectNode(c, node, procedure)
380     if result is None:
381       # we did connect, node is not offline
382       result = c.GetResults()[node]
383     return result
384
385   @classmethod
386   def _StaticSingleNodeCall(cls, node, procedure, args):
387     """Helper for making a single-node static call
388
389     """
390     body = serializer.DumpJson(args, indent=False)
391     c = Client(procedure, body, utils.GetNodeDaemonPort())
392     c.ConnectNode(node)
393     return c.GetResults()[node]
394
395   @staticmethod
396   def _Compress(data):
397     """Compresses a string for transport over RPC.
398
399     Small amounts of data are not compressed.
400
401     @type data: str
402     @param data: Data
403     @rtype: tuple
404     @return: Encoded data to send
405
406     """
407     # Small amounts of data are not compressed
408     if len(data) < 512:
409       return (constants.RPC_ENCODING_NONE, data)
410
411     # Compress with zlib and encode in base64
412     return (constants.RPC_ENCODING_ZLIB_BASE64,
413             base64.b64encode(zlib.compress(data, 3)))
414
415   #
416   # Begin RPC calls
417   #
418
419   def call_lv_list(self, node_list, vg_name):
420     """Gets the logical volumes present in a given volume group.
421
422     This is a multi-node call.
423
424     """
425     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
426
427   def call_vg_list(self, node_list):
428     """Gets the volume group list.
429
430     This is a multi-node call.
431
432     """
433     return self._MultiNodeCall(node_list, "vg_list", [])
434
435   def call_bridges_exist(self, node, bridges_list):
436     """Checks if a node has all the bridges given.
437
438     This method checks if all bridges given in the bridges_list are
439     present on the remote node, so that an instance that uses interfaces
440     on those bridges can be started.
441
442     This is a single-node call.
443
444     """
445     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
446
447   def call_instance_start(self, node, instance, hvp, bep):
448     """Starts an instance.
449
450     This is a single-node call.
451
452     """
453     idict = self._InstDict(instance, hvp=hvp, bep=bep)
454     return self._SingleNodeCall(node, "instance_start", [idict])
455
456   def call_instance_shutdown(self, node, instance):
457     """Stops an instance.
458
459     This is a single-node call.
460
461     """
462     return self._SingleNodeCall(node, "instance_shutdown",
463                                 [self._InstDict(instance)])
464
465   def call_migration_info(self, node, instance):
466     """Gather the information necessary to prepare an instance migration.
467
468     This is a single-node call.
469
470     @type node: string
471     @param node: the node on which the instance is currently running
472     @type instance: C{objects.Instance}
473     @param instance: the instance definition
474
475     """
476     return self._SingleNodeCall(node, "migration_info",
477                                 [self._InstDict(instance)])
478
479   def call_accept_instance(self, node, instance, info, target):
480     """Prepare a node to accept an instance.
481
482     This is a single-node call.
483
484     @type node: string
485     @param node: the target node for the migration
486     @type instance: C{objects.Instance}
487     @param instance: the instance definition
488     @type info: opaque/hypervisor specific (string/data)
489     @param info: result for the call_migration_info call
490     @type target: string
491     @param target: target hostname (usually ip address) (on the node itself)
492
493     """
494     return self._SingleNodeCall(node, "accept_instance",
495                                 [self._InstDict(instance), info, target])
496
497   def call_finalize_migration(self, node, instance, info, success):
498     """Finalize any target-node migration specific operation.
499
500     This is called both in case of a successful migration and in case of error
501     (in which case it should abort the migration).
502
503     This is a single-node call.
504
505     @type node: string
506     @param node: the target node for the migration
507     @type instance: C{objects.Instance}
508     @param instance: the instance definition
509     @type info: opaque/hypervisor specific (string/data)
510     @param info: result for the call_migration_info call
511     @type success: boolean
512     @param success: whether the migration was a success or a failure
513
514     """
515     return self._SingleNodeCall(node, "finalize_migration",
516                                 [self._InstDict(instance), info, success])
517
518   def call_instance_migrate(self, node, instance, target, live):
519     """Migrate an instance.
520
521     This is a single-node call.
522
523     @type node: string
524     @param node: the node on which the instance is currently running
525     @type instance: C{objects.Instance}
526     @param instance: the instance definition
527     @type target: string
528     @param target: the target node name
529     @type live: boolean
530     @param live: whether the migration should be done live or not (the
531         interpretation of this parameter is left to the hypervisor)
532
533     """
534     return self._SingleNodeCall(node, "instance_migrate",
535                                 [self._InstDict(instance), target, live])
536
537   def call_instance_reboot(self, node, instance, reboot_type):
538     """Reboots an instance.
539
540     This is a single-node call.
541
542     """
543     return self._SingleNodeCall(node, "instance_reboot",
544                                 [self._InstDict(instance), reboot_type])
545
546   def call_instance_os_add(self, node, inst, reinstall):
547     """Installs an OS on the given instance.
548
549     This is a single-node call.
550
551     """
552     return self._SingleNodeCall(node, "instance_os_add",
553                                 [self._InstDict(inst), reinstall])
554
555   def call_instance_run_rename(self, node, inst, old_name):
556     """Run the OS rename script for an instance.
557
558     This is a single-node call.
559
560     """
561     return self._SingleNodeCall(node, "instance_run_rename",
562                                 [self._InstDict(inst), old_name])
563
564   def call_instance_info(self, node, instance, hname):
565     """Returns information about a single instance.
566
567     This is a single-node call.
568
569     @type node: list
570     @param node: the list of nodes to query
571     @type instance: string
572     @param instance: the instance name
573     @type hname: string
574     @param hname: the hypervisor type of the instance
575
576     """
577     return self._SingleNodeCall(node, "instance_info", [instance, hname])
578
579   def call_instance_migratable(self, node, instance):
580     """Checks whether the given instance can be migrated.
581
582     This is a single-node call.
583
584     @param node: the node to query
585     @type instance: L{objects.Instance}
586     @param instance: the instance to check
587
588
589     """
590     return self._SingleNodeCall(node, "instance_migratable",
591                                 [self._InstDict(instance)])
592
593   def call_all_instances_info(self, node_list, hypervisor_list):
594     """Returns information about all instances on the given nodes.
595
596     This is a multi-node call.
597
598     @type node_list: list
599     @param node_list: the list of nodes to query
600     @type hypervisor_list: list
601     @param hypervisor_list: the hypervisors to query for instances
602
603     """
604     return self._MultiNodeCall(node_list, "all_instances_info",
605                                [hypervisor_list])
606
607   def call_instance_list(self, node_list, hypervisor_list):
608     """Returns the list of running instances on a given node.
609
610     This is a multi-node call.
611
612     @type node_list: list
613     @param node_list: the list of nodes to query
614     @type hypervisor_list: list
615     @param hypervisor_list: the hypervisors to query for instances
616
617     """
618     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
619
620   def call_node_tcp_ping(self, node, source, target, port, timeout,
621                          live_port_needed):
622     """Do a TcpPing on the remote node
623
624     This is a single-node call.
625
626     """
627     return self._SingleNodeCall(node, "node_tcp_ping",
628                                 [source, target, port, timeout,
629                                  live_port_needed])
630
631   def call_node_has_ip_address(self, node, address):
632     """Checks if a node has the given IP address.
633
634     This is a single-node call.
635
636     """
637     return self._SingleNodeCall(node, "node_has_ip_address", [address])
638
639   def call_node_info(self, node_list, vg_name, hypervisor_type):
640     """Return node information.
641
642     This will return memory information and volume group size and free
643     space.
644
645     This is a multi-node call.
646
647     @type node_list: list
648     @param node_list: the list of nodes to query
649     @type vg_name: C{string}
650     @param vg_name: the name of the volume group to ask for disk space
651         information
652     @type hypervisor_type: C{str}
653     @param hypervisor_type: the name of the hypervisor to ask for
654         memory information
655
656     """
657     return self._MultiNodeCall(node_list, "node_info",
658                                [vg_name, hypervisor_type])
659
660   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
661     """Add a node to the cluster.
662
663     This is a single-node call.
664
665     """
666     return self._SingleNodeCall(node, "node_add",
667                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
668
669   def call_node_verify(self, node_list, checkdict, cluster_name):
670     """Request verification of given parameters.
671
672     This is a multi-node call.
673
674     """
675     return self._MultiNodeCall(node_list, "node_verify",
676                                [checkdict, cluster_name])
677
678   @classmethod
679   def call_node_start_master(cls, node, start_daemons, no_voting):
680     """Tells a node to activate itself as a master.
681
682     This is a single-node call.
683
684     """
685     return cls._StaticSingleNodeCall(node, "node_start_master",
686                                      [start_daemons, no_voting])
687
688   @classmethod
689   def call_node_stop_master(cls, node, stop_daemons):
690     """Tells a node to demote itself from master status.
691
692     This is a single-node call.
693
694     """
695     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
696
697   @classmethod
698   def call_master_info(cls, node_list):
699     """Query master info.
700
701     This is a multi-node call.
702
703     """
704     # TODO: should this method query down nodes?
705     return cls._StaticMultiNodeCall(node_list, "master_info", [])
706
707   def call_version(self, node_list):
708     """Query node version.
709
710     This is a multi-node call.
711
712     """
713     return self._MultiNodeCall(node_list, "version", [])
714
715   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
716     """Request creation of a given block device.
717
718     This is a single-node call.
719
720     """
721     return self._SingleNodeCall(node, "blockdev_create",
722                                 [bdev.ToDict(), size, owner, on_primary, info])
723
724   def call_blockdev_remove(self, node, bdev):
725     """Request removal of a given block device.
726
727     This is a single-node call.
728
729     """
730     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
731
732   def call_blockdev_rename(self, node, devlist):
733     """Request rename of the given block devices.
734
735     This is a single-node call.
736
737     """
738     return self._SingleNodeCall(node, "blockdev_rename",
739                                 [(d.ToDict(), uid) for d, uid in devlist])
740
741   def call_blockdev_assemble(self, node, disk, owner, on_primary):
742     """Request assembling of a given block device.
743
744     This is a single-node call.
745
746     """
747     return self._SingleNodeCall(node, "blockdev_assemble",
748                                 [disk.ToDict(), owner, on_primary])
749
750   def call_blockdev_shutdown(self, node, disk):
751     """Request shutdown of a given block device.
752
753     This is a single-node call.
754
755     """
756     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
757
758   def call_blockdev_addchildren(self, node, bdev, ndevs):
759     """Request adding a list of children to a (mirroring) device.
760
761     This is a single-node call.
762
763     """
764     return self._SingleNodeCall(node, "blockdev_addchildren",
765                                 [bdev.ToDict(),
766                                  [disk.ToDict() for disk in ndevs]])
767
768   def call_blockdev_removechildren(self, node, bdev, ndevs):
769     """Request removing a list of children from a (mirroring) device.
770
771     This is a single-node call.
772
773     """
774     return self._SingleNodeCall(node, "blockdev_removechildren",
775                                 [bdev.ToDict(),
776                                  [disk.ToDict() for disk in ndevs]])
777
778   def call_blockdev_getmirrorstatus(self, node, disks):
779     """Request status of a (mirroring) device.
780
781     This is a single-node call.
782
783     """
784     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
785                                 [dsk.ToDict() for dsk in disks])
786
787   def call_blockdev_find(self, node, disk):
788     """Request identification of a given block device.
789
790     This is a single-node call.
791
792     """
793     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
794
795   def call_blockdev_close(self, node, instance_name, disks):
796     """Closes the given block devices.
797
798     This is a single-node call.
799
800     """
801     params = [instance_name, [cf.ToDict() for cf in disks]]
802     return self._SingleNodeCall(node, "blockdev_close", params)
803
804   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
805     """Disconnects the network of the given drbd devices.
806
807     This is a multi-node call.
808
809     """
810     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
811                                [nodes_ip, [cf.ToDict() for cf in disks]])
812
813   def call_drbd_attach_net(self, node_list, nodes_ip,
814                            disks, instance_name, multimaster):
815     """Disconnects the given drbd devices.
816
817     This is a multi-node call.
818
819     """
820     return self._MultiNodeCall(node_list, "drbd_attach_net",
821                                [nodes_ip, [cf.ToDict() for cf in disks],
822                                 instance_name, multimaster])
823
824   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
825     """Waits for the synchronization of drbd devices is complete.
826
827     This is a multi-node call.
828
829     """
830     return self._MultiNodeCall(node_list, "drbd_wait_sync",
831                                [nodes_ip, [cf.ToDict() for cf in disks]])
832
833   @classmethod
834   def call_upload_file(cls, node_list, file_name, address_list=None):
835     """Upload a file.
836
837     The node will refuse the operation in case the file is not on the
838     approved file list.
839
840     This is a multi-node call.
841
842     @type node_list: list
843     @param node_list: the list of node names to upload to
844     @type file_name: str
845     @param file_name: the filename to upload
846     @type address_list: list or None
847     @keyword address_list: an optional list of node addresses, in order
848         to optimize the RPC speed
849
850     """
851     file_contents = utils.ReadFile(file_name)
852     data = cls._Compress(file_contents)
853     st = os.stat(file_name)
854     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
855               st.st_atime, st.st_mtime]
856     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
857                                     address_list=address_list)
858
859   @classmethod
860   def call_write_ssconf_files(cls, node_list, values):
861     """Write ssconf files.
862
863     This is a multi-node call.
864
865     """
866     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
867
868   def call_os_diagnose(self, node_list):
869     """Request a diagnose of OS definitions.
870
871     This is a multi-node call.
872
873     """
874     return self._MultiNodeCall(node_list, "os_diagnose", [])
875
876   def call_os_get(self, node, name):
877     """Returns an OS definition.
878
879     This is a single-node call.
880
881     """
882     result = self._SingleNodeCall(node, "os_get", [name])
883     if not result.failed and isinstance(result.data, dict):
884       result.data = objects.OS.FromDict(result.data)
885     return result
886
887   def call_hooks_runner(self, node_list, hpath, phase, env):
888     """Call the hooks runner.
889
890     Args:
891       - op: the OpCode instance
892       - env: a dictionary with the environment
893
894     This is a multi-node call.
895
896     """
897     params = [hpath, phase, env]
898     return self._MultiNodeCall(node_list, "hooks_runner", params)
899
900   def call_iallocator_runner(self, node, name, idata):
901     """Call an iallocator on a remote node
902
903     Args:
904       - name: the iallocator name
905       - input: the json-encoded input string
906
907     This is a single-node call.
908
909     """
910     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
911
912   def call_blockdev_grow(self, node, cf_bdev, amount):
913     """Request a snapshot of the given block device.
914
915     This is a single-node call.
916
917     """
918     return self._SingleNodeCall(node, "blockdev_grow",
919                                 [cf_bdev.ToDict(), amount])
920
921   def call_blockdev_snapshot(self, node, cf_bdev):
922     """Request a snapshot of the given block device.
923
924     This is a single-node call.
925
926     """
927     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
928
929   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
930                            cluster_name, idx):
931     """Request the export of a given snapshot.
932
933     This is a single-node call.
934
935     """
936     return self._SingleNodeCall(node, "snapshot_export",
937                                 [snap_bdev.ToDict(), dest_node,
938                                  self._InstDict(instance), cluster_name, idx])
939
940   def call_finalize_export(self, node, instance, snap_disks):
941     """Request the completion of an export operation.
942
943     This writes the export config file, etc.
944
945     This is a single-node call.
946
947     """
948     flat_disks = []
949     for disk in snap_disks:
950       if isinstance(disk, bool):
951         flat_disks.append(disk)
952       else:
953         flat_disks.append(disk.ToDict())
954
955     return self._SingleNodeCall(node, "finalize_export",
956                                 [self._InstDict(instance), flat_disks])
957
958   def call_export_info(self, node, path):
959     """Queries the export information in a given path.
960
961     This is a single-node call.
962
963     """
964     return self._SingleNodeCall(node, "export_info", [path])
965
966   def call_instance_os_import(self, node, inst, src_node, src_images,
967                               cluster_name):
968     """Request the import of a backup into an instance.
969
970     This is a single-node call.
971
972     """
973     return self._SingleNodeCall(node, "instance_os_import",
974                                 [self._InstDict(inst), src_node, src_images,
975                                  cluster_name])
976
977   def call_export_list(self, node_list):
978     """Gets the stored exports list.
979
980     This is a multi-node call.
981
982     """
983     return self._MultiNodeCall(node_list, "export_list", [])
984
985   def call_export_remove(self, node, export):
986     """Requests removal of a given export.
987
988     This is a single-node call.
989
990     """
991     return self._SingleNodeCall(node, "export_remove", [export])
992
993   @classmethod
994   def call_node_leave_cluster(cls, node):
995     """Requests a node to clean the cluster information it has.
996
997     This will remove the configuration information from the ganeti data
998     dir.
999
1000     This is a single-node call.
1001
1002     """
1003     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1004
1005   def call_node_volumes(self, node_list):
1006     """Gets all volumes on node(s).
1007
1008     This is a multi-node call.
1009
1010     """
1011     return self._MultiNodeCall(node_list, "node_volumes", [])
1012
1013   def call_node_demote_from_mc(self, node):
1014     """Demote a node from the master candidate role.
1015
1016     This is a single-node call.
1017
1018     """
1019     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1020
1021
1022   def call_node_powercycle(self, node, hypervisor):
1023     """Tries to powercycle a node.
1024
1025     This is a single-node call.
1026
1027     """
1028     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1029
1030
1031   def call_test_delay(self, node_list, duration):
1032     """Sleep for a fixed time on given node(s).
1033
1034     This is a multi-node call.
1035
1036     """
1037     return self._MultiNodeCall(node_list, "test_delay", [duration])
1038
1039   def call_file_storage_dir_create(self, node, file_storage_dir):
1040     """Create the given file storage directory.
1041
1042     This is a single-node call.
1043
1044     """
1045     return self._SingleNodeCall(node, "file_storage_dir_create",
1046                                 [file_storage_dir])
1047
1048   def call_file_storage_dir_remove(self, node, file_storage_dir):
1049     """Remove the given file storage directory.
1050
1051     This is a single-node call.
1052
1053     """
1054     return self._SingleNodeCall(node, "file_storage_dir_remove",
1055                                 [file_storage_dir])
1056
1057   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1058                                    new_file_storage_dir):
1059     """Rename file storage directory.
1060
1061     This is a single-node call.
1062
1063     """
1064     return self._SingleNodeCall(node, "file_storage_dir_rename",
1065                                 [old_file_storage_dir, new_file_storage_dir])
1066
1067   @classmethod
1068   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1069     """Update job queue.
1070
1071     This is a multi-node call.
1072
1073     """
1074     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1075                                     [file_name, cls._Compress(content)],
1076                                     address_list=address_list)
1077
1078   @classmethod
1079   def call_jobqueue_purge(cls, node):
1080     """Purge job queue.
1081
1082     This is a single-node call.
1083
1084     """
1085     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1086
1087   @classmethod
1088   def call_jobqueue_rename(cls, node_list, address_list, rename):
1089     """Rename a job queue file.
1090
1091     This is a multi-node call.
1092
1093     """
1094     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1095                                     address_list=address_list)
1096
1097   @classmethod
1098   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1099     """Set the drain flag on the queue.
1100
1101     This is a multi-node call.
1102
1103     @type node_list: list
1104     @param node_list: the list of nodes to query
1105     @type drain_flag: bool
1106     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1107
1108     """
1109     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1110                                     [drain_flag])
1111
1112   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1113     """Validate the hypervisor params.
1114
1115     This is a multi-node call.
1116
1117     @type node_list: list
1118     @param node_list: the list of nodes to query
1119     @type hvname: string
1120     @param hvname: the hypervisor name
1121     @type hvparams: dict
1122     @param hvparams: the hypervisor parameters to be validated
1123
1124     """
1125     cluster = self._cfg.GetClusterInfo()
1126     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1127     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1128                                [hvname, hv_full])