GetShellCommand: get hvparams and beparams
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import logging
36 import zlib
37 import base64
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45
46 import ganeti.http.client
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   _http_manager = http.client.HttpClientManager()
64
65
66 def Shutdown():
67   """Stops the module-global HTTP client manager.
68
69   Must be called before quitting the program.
70
71   """
72   global _http_manager
73
74   if _http_manager:
75     _http_manager.Shutdown()
76     _http_manager = None
77
78
79 class RpcResult(object):
80   """RPC Result class.
81
82   This class holds an RPC result. It is needed since in multi-node
83   calls we can't raise an exception just because one one out of many
84   failed, and therefore we use this class to encapsulate the result.
85
86   @ivar data: the data payload, for successfull results, or None
87   @type failed: boolean
88   @ivar failed: whether the operation failed at RPC level (not
89       application level on the remote node)
90   @ivar call: the name of the RPC call
91   @ivar node: the name of the node to which we made the call
92   @ivar offline: whether the operation failed because the node was
93       offline, as opposed to actual failure; offline=True will always
94       imply failed=True, in order to allow simpler checking if
95       the user doesn't care about the exact failure mode
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.failed = failed
101     self.offline = offline
102     self.call = call
103     self.node = node
104     if offline:
105       self.failed = True
106       self.error = "Node is marked offline"
107       self.data = None
108     elif failed:
109       self.error = data
110       self.data = None
111     else:
112       self.data = data
113       self.error = None
114
115   def Raise(self):
116     """If the result has failed, raise an OpExecError.
117
118     This is used so that LU code doesn't have to check for each
119     result, but instead can call this function.
120
121     """
122     if self.failed:
123       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
124                                (self.call, self.node, self.error))
125
126   def RemoteFailMsg(self):
127     """Check if the remote procedure failed.
128
129     This is valid only for RPC calls which return result of the form
130     (status, data | error_msg).
131
132     @return: empty string for succcess, otherwise an error message
133
134     """
135     def _EnsureErr(val):
136       """Helper to ensure we return a 'True' value for error."""
137       if val:
138         return val
139       else:
140         return "No error information"
141
142     if self.failed:
143       return _EnsureErr(self.error)
144     if not isinstance(self.data, (tuple, list)):
145       return "Invalid result type (%s)" % type(self.data)
146     if len(self.data) != 2:
147       return "Invalid result length (%d), expected 2" % len(self.data)
148     if not self.data[0]:
149       return _EnsureErr(self.data[1])
150     return ""
151
152
153 class Client:
154   """RPC Client class.
155
156   This class, given a (remote) method name, a list of parameters and a
157   list of nodes, will contact (in parallel) all nodes, and return a
158   dict of results (key: node name, value: result).
159
160   One current bug is that generic failure is still signalled by
161   'False' result, which is not good. This overloading of values can
162   cause bugs.
163
164   """
165   def __init__(self, procedure, body, port):
166     self.procedure = procedure
167     self.body = body
168     self.port = port
169     self.nc = {}
170
171     self._ssl_params = \
172       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
173                          ssl_cert_path=constants.SSL_CERT_FILE)
174
175   def ConnectList(self, node_list, address_list=None):
176     """Add a list of nodes to the target nodes.
177
178     @type node_list: list
179     @param node_list: the list of node names to connect
180     @type address_list: list or None
181     @keyword address_list: either None or a list with node addresses,
182         which must have the same length as the node list
183
184     """
185     if address_list is None:
186       address_list = [None for _ in node_list]
187     else:
188       assert len(node_list) == len(address_list), \
189              "Name and address lists should have the same length"
190     for node, address in zip(node_list, address_list):
191       self.ConnectNode(node, address)
192
193   def ConnectNode(self, name, address=None):
194     """Add a node to the target list.
195
196     @type name: str
197     @param name: the node name
198     @type address: str
199     @keyword address: the node address, if known
200
201     """
202     if address is None:
203       address = name
204
205     self.nc[name] = \
206       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
207                                     "/%s" % self.procedure,
208                                     post_data=self.body,
209                                     ssl_params=self._ssl_params,
210                                     ssl_verify_peer=True)
211
212   def GetResults(self):
213     """Call nodes and return results.
214
215     @rtype: list
216     @returns: List of RPC results
217
218     """
219     assert _http_manager, "RPC module not intialized"
220
221     _http_manager.ExecRequests(self.nc.values())
222
223     results = {}
224
225     for name, req in self.nc.iteritems():
226       if req.success and req.resp_status_code == http.HTTP_OK:
227         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
228                                   node=name, call=self.procedure)
229         continue
230
231       # TODO: Better error reporting
232       if req.error:
233         msg = req.error
234       else:
235         msg = req.resp_body
236
237       logging.error("RPC error in %s from node %s: %s",
238                     self.procedure, name, msg)
239       results[name] = RpcResult(data=msg, failed=True, node=name,
240                                 call=self.procedure)
241
242     return results
243
244
245 class RpcRunner(object):
246   """RPC runner class"""
247
248   def __init__(self, cfg):
249     """Initialized the rpc runner.
250
251     @type cfg:  C{config.ConfigWriter}
252     @param cfg: the configuration object that will be used to get data
253                 about the cluster
254
255     """
256     self._cfg = cfg
257     self.port = utils.GetNodeDaemonPort()
258
259   def _InstDict(self, instance):
260     """Convert the given instance to a dict.
261
262     This is done via the instance's ToDict() method and additionally
263     we fill the hvparams with the cluster defaults.
264
265     @type instance: L{objects.Instance}
266     @param instance: an Instance object
267     @rtype: dict
268     @return: the instance dict, with the hvparams filled with the
269         cluster defaults
270
271     """
272     idict = instance.ToDict()
273     cluster = self._cfg.GetClusterInfo()
274     idict["hvparams"] = cluster.FillHV(instance)
275     idict["beparams"] = cluster.FillBE(instance)
276     return idict
277
278   def _ConnectList(self, client, node_list, call):
279     """Helper for computing node addresses.
280
281     @type client: L{Client}
282     @param client: a C{Client} instance
283     @type node_list: list
284     @param node_list: the node list we should connect
285     @type call: string
286     @param call: the name of the remote procedure call, for filling in
287         correctly any eventual offline nodes' results
288
289     """
290     all_nodes = self._cfg.GetAllNodesInfo()
291     name_list = []
292     addr_list = []
293     skip_dict = {}
294     for node in node_list:
295       if node in all_nodes:
296         if all_nodes[node].offline:
297           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
298           continue
299         val = all_nodes[node].primary_ip
300       else:
301         val = None
302       addr_list.append(val)
303       name_list.append(node)
304     if name_list:
305       client.ConnectList(name_list, address_list=addr_list)
306     return skip_dict
307
308   def _ConnectNode(self, client, node, call):
309     """Helper for computing one node's address.
310
311     @type client: L{Client}
312     @param client: a C{Client} instance
313     @type node: str
314     @param node: the node we should connect
315     @type call: string
316     @param call: the name of the remote procedure call, for filling in
317         correctly any eventual offline nodes' results
318
319     """
320     node_info = self._cfg.GetNodeInfo(node)
321     if node_info is not None:
322       if node_info.offline:
323         return RpcResult(node=node, offline=True, call=call)
324       addr = node_info.primary_ip
325     else:
326       addr = None
327     client.ConnectNode(node, address=addr)
328
329   def _MultiNodeCall(self, node_list, procedure, args):
330     """Helper for making a multi-node call
331
332     """
333     body = serializer.DumpJson(args, indent=False)
334     c = Client(procedure, body, self.port)
335     skip_dict = self._ConnectList(c, node_list, procedure)
336     skip_dict.update(c.GetResults())
337     return skip_dict
338
339   @classmethod
340   def _StaticMultiNodeCall(cls, node_list, procedure, args,
341                            address_list=None):
342     """Helper for making a multi-node static call
343
344     """
345     body = serializer.DumpJson(args, indent=False)
346     c = Client(procedure, body, utils.GetNodeDaemonPort())
347     c.ConnectList(node_list, address_list=address_list)
348     return c.GetResults()
349
350   def _SingleNodeCall(self, node, procedure, args):
351     """Helper for making a single-node call
352
353     """
354     body = serializer.DumpJson(args, indent=False)
355     c = Client(procedure, body, self.port)
356     result = self._ConnectNode(c, node, procedure)
357     if result is None:
358       # we did connect, node is not offline
359       result = c.GetResults()[node]
360     return result
361
362   @classmethod
363   def _StaticSingleNodeCall(cls, node, procedure, args):
364     """Helper for making a single-node static call
365
366     """
367     body = serializer.DumpJson(args, indent=False)
368     c = Client(procedure, body, utils.GetNodeDaemonPort())
369     c.ConnectNode(node)
370     return c.GetResults()[node]
371
372   @staticmethod
373   def _Compress(data):
374     """Compresses a string for transport over RPC.
375
376     Small amounts of data are not compressed.
377
378     @type data: str
379     @param data: Data
380     @rtype: tuple
381     @return: Encoded data to send
382
383     """
384     # Small amounts of data are not compressed
385     if len(data) < 512:
386       return (constants.RPC_ENCODING_NONE, data)
387
388     # Compress with zlib and encode in base64
389     return (constants.RPC_ENCODING_ZLIB_BASE64,
390             base64.b64encode(zlib.compress(data, 3)))
391
392   #
393   # Begin RPC calls
394   #
395
396   def call_volume_list(self, node_list, vg_name):
397     """Gets the logical volumes present in a given volume group.
398
399     This is a multi-node call.
400
401     """
402     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
403
404   def call_vg_list(self, node_list):
405     """Gets the volume group list.
406
407     This is a multi-node call.
408
409     """
410     return self._MultiNodeCall(node_list, "vg_list", [])
411
412   def call_bridges_exist(self, node, bridges_list):
413     """Checks if a node has all the bridges given.
414
415     This method checks if all bridges given in the bridges_list are
416     present on the remote node, so that an instance that uses interfaces
417     on those bridges can be started.
418
419     This is a single-node call.
420
421     """
422     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
423
424   def call_instance_start(self, node, instance, extra_args):
425     """Starts an instance.
426
427     This is a single-node call.
428
429     """
430     return self._SingleNodeCall(node, "instance_start",
431                                 [self._InstDict(instance), extra_args])
432
433   def call_instance_shutdown(self, node, instance):
434     """Stops an instance.
435
436     This is a single-node call.
437
438     """
439     return self._SingleNodeCall(node, "instance_shutdown",
440                                 [self._InstDict(instance)])
441
442   def call_migration_info(self, node, instance):
443     """Gather the information necessary to prepare an instance migration.
444
445     This is a single-node call.
446
447     @type node: string
448     @param node: the node on which the instance is currently running
449     @type instance: C{objects.Instance}
450     @param instance: the instance definition
451
452     """
453     return self._SingleNodeCall(node, "migration_info",
454                                 [self._InstDict(instance)])
455
456   def call_accept_instance(self, node, instance, info, target):
457     """Prepare a node to accept an instance.
458
459     This is a single-node call.
460
461     @type node: string
462     @param node: the target node for the migration
463     @type instance: C{objects.Instance}
464     @param instance: the instance definition
465     @type info: opaque/hypervisor specific (string/data)
466     @param info: result for the call_migration_info call
467     @type target: string
468     @param target: target hostname (usually ip address) (on the node itself)
469
470     """
471     return self._SingleNodeCall(node, "accept_instance",
472                                 [self._InstDict(instance), info, target])
473
474   def call_finalize_migration(self, node, instance, info, success):
475     """Finalize any target-node migration specific operation.
476
477     This is called both in case of a successful migration and in case of error
478     (in which case it should abort the migration).
479
480     This is a single-node call.
481
482     @type node: string
483     @param node: the target node for the migration
484     @type instance: C{objects.Instance}
485     @param instance: the instance definition
486     @type info: opaque/hypervisor specific (string/data)
487     @param info: result for the call_migration_info call
488     @type success: boolean
489     @param success: whether the migration was a success or a failure
490
491     """
492     return self._SingleNodeCall(node, "finalize_migration",
493                                 [self._InstDict(instance), info, success])
494
495   def call_instance_migrate(self, node, instance, target, live):
496     """Migrate an instance.
497
498     This is a single-node call.
499
500     @type node: string
501     @param node: the node on which the instance is currently running
502     @type instance: C{objects.Instance}
503     @param instance: the instance definition
504     @type target: string
505     @param target: the target node name
506     @type live: boolean
507     @param live: whether the migration should be done live or not (the
508         interpretation of this parameter is left to the hypervisor)
509
510     """
511     return self._SingleNodeCall(node, "instance_migrate",
512                                 [self._InstDict(instance), target, live])
513
514   def call_instance_reboot(self, node, instance, reboot_type, extra_args):
515     """Reboots an instance.
516
517     This is a single-node call.
518
519     """
520     return self._SingleNodeCall(node, "instance_reboot",
521                                 [self._InstDict(instance), reboot_type,
522                                  extra_args])
523
524   def call_instance_os_add(self, node, inst):
525     """Installs an OS on the given instance.
526
527     This is a single-node call.
528
529     """
530     return self._SingleNodeCall(node, "instance_os_add",
531                                 [self._InstDict(inst)])
532
533   def call_instance_run_rename(self, node, inst, old_name):
534     """Run the OS rename script for an instance.
535
536     This is a single-node call.
537
538     """
539     return self._SingleNodeCall(node, "instance_run_rename",
540                                 [self._InstDict(inst), old_name])
541
542   def call_instance_info(self, node, instance, hname):
543     """Returns information about a single instance.
544
545     This is a single-node call.
546
547     @type node: list
548     @param node: the list of nodes to query
549     @type instance: string
550     @param instance: the instance name
551     @type hname: string
552     @param hname: the hypervisor type of the instance
553
554     """
555     return self._SingleNodeCall(node, "instance_info", [instance, hname])
556
557   def call_instance_migratable(self, node, instance):
558     """Checks whether the given instance can be migrated.
559
560     This is a single-node call.
561
562     @param node: the node to query
563     @type instance: L{objects.Instance}
564     @param instance: the instance to check
565
566
567     """
568     return self._SingleNodeCall(node, "instance_migratable",
569                                 [self._InstDict(instance)])
570
571   def call_all_instances_info(self, node_list, hypervisor_list):
572     """Returns information about all instances on the given nodes.
573
574     This is a multi-node call.
575
576     @type node_list: list
577     @param node_list: the list of nodes to query
578     @type hypervisor_list: list
579     @param hypervisor_list: the hypervisors to query for instances
580
581     """
582     return self._MultiNodeCall(node_list, "all_instances_info",
583                                [hypervisor_list])
584
585   def call_instance_list(self, node_list, hypervisor_list):
586     """Returns the list of running instances on a given node.
587
588     This is a multi-node call.
589
590     @type node_list: list
591     @param node_list: the list of nodes to query
592     @type hypervisor_list: list
593     @param hypervisor_list: the hypervisors to query for instances
594
595     """
596     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
597
598   def call_node_tcp_ping(self, node, source, target, port, timeout,
599                          live_port_needed):
600     """Do a TcpPing on the remote node
601
602     This is a single-node call.
603
604     """
605     return self._SingleNodeCall(node, "node_tcp_ping",
606                                 [source, target, port, timeout,
607                                  live_port_needed])
608
609   def call_node_has_ip_address(self, node, address):
610     """Checks if a node has the given IP address.
611
612     This is a single-node call.
613
614     """
615     return self._SingleNodeCall(node, "node_has_ip_address", [address])
616
617   def call_node_info(self, node_list, vg_name, hypervisor_type):
618     """Return node information.
619
620     This will return memory information and volume group size and free
621     space.
622
623     This is a multi-node call.
624
625     @type node_list: list
626     @param node_list: the list of nodes to query
627     @type vg_name: C{string}
628     @param vg_name: the name of the volume group to ask for disk space
629         information
630     @type hypervisor_type: C{str}
631     @param hypervisor_type: the name of the hypervisor to ask for
632         memory information
633
634     """
635     retux = self._MultiNodeCall(node_list, "node_info",
636                                 [vg_name, hypervisor_type])
637
638     for result in retux.itervalues():
639       if result.failed or not isinstance(result.data, dict):
640         result.data = {}
641       if result.offline:
642         log_name = None
643       else:
644         log_name = "call_node_info"
645
646       utils.CheckDict(result.data, {
647         'memory_total' : '-',
648         'memory_dom0' : '-',
649         'memory_free' : '-',
650         'vg_size' : 'node_unreachable',
651         'vg_free' : '-',
652         }, log_name)
653     return retux
654
655   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
656     """Add a node to the cluster.
657
658     This is a single-node call.
659
660     """
661     return self._SingleNodeCall(node, "node_add",
662                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
663
664   def call_node_verify(self, node_list, checkdict, cluster_name):
665     """Request verification of given parameters.
666
667     This is a multi-node call.
668
669     """
670     return self._MultiNodeCall(node_list, "node_verify",
671                                [checkdict, cluster_name])
672
673   @classmethod
674   def call_node_start_master(cls, node, start_daemons):
675     """Tells a node to activate itself as a master.
676
677     This is a single-node call.
678
679     """
680     return cls._StaticSingleNodeCall(node, "node_start_master",
681                                      [start_daemons])
682
683   @classmethod
684   def call_node_stop_master(cls, node, stop_daemons):
685     """Tells a node to demote itself from master status.
686
687     This is a single-node call.
688
689     """
690     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
691
692   @classmethod
693   def call_master_info(cls, node_list):
694     """Query master info.
695
696     This is a multi-node call.
697
698     """
699     # TODO: should this method query down nodes?
700     return cls._StaticMultiNodeCall(node_list, "master_info", [])
701
702   def call_version(self, node_list):
703     """Query node version.
704
705     This is a multi-node call.
706
707     """
708     return self._MultiNodeCall(node_list, "version", [])
709
710   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
711     """Request creation of a given block device.
712
713     This is a single-node call.
714
715     """
716     return self._SingleNodeCall(node, "blockdev_create",
717                                 [bdev.ToDict(), size, owner, on_primary, info])
718
719   def call_blockdev_remove(self, node, bdev):
720     """Request removal of a given block device.
721
722     This is a single-node call.
723
724     """
725     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
726
727   def call_blockdev_rename(self, node, devlist):
728     """Request rename of the given block devices.
729
730     This is a single-node call.
731
732     """
733     return self._SingleNodeCall(node, "blockdev_rename",
734                                 [(d.ToDict(), uid) for d, uid in devlist])
735
736   def call_blockdev_assemble(self, node, disk, owner, on_primary):
737     """Request assembling of a given block device.
738
739     This is a single-node call.
740
741     """
742     return self._SingleNodeCall(node, "blockdev_assemble",
743                                 [disk.ToDict(), owner, on_primary])
744
745   def call_blockdev_shutdown(self, node, disk):
746     """Request shutdown of a given block device.
747
748     This is a single-node call.
749
750     """
751     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
752
753   def call_blockdev_addchildren(self, node, bdev, ndevs):
754     """Request adding a list of children to a (mirroring) device.
755
756     This is a single-node call.
757
758     """
759     return self._SingleNodeCall(node, "blockdev_addchildren",
760                                 [bdev.ToDict(),
761                                  [disk.ToDict() for disk in ndevs]])
762
763   def call_blockdev_removechildren(self, node, bdev, ndevs):
764     """Request removing a list of children from a (mirroring) device.
765
766     This is a single-node call.
767
768     """
769     return self._SingleNodeCall(node, "blockdev_removechildren",
770                                 [bdev.ToDict(),
771                                  [disk.ToDict() for disk in ndevs]])
772
773   def call_blockdev_getmirrorstatus(self, node, disks):
774     """Request status of a (mirroring) device.
775
776     This is a single-node call.
777
778     """
779     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
780                                 [dsk.ToDict() for dsk in disks])
781
782   def call_blockdev_find(self, node, disk):
783     """Request identification of a given block device.
784
785     This is a single-node call.
786
787     """
788     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
789
790   def call_blockdev_close(self, node, instance_name, disks):
791     """Closes the given block devices.
792
793     This is a single-node call.
794
795     """
796     params = [instance_name, [cf.ToDict() for cf in disks]]
797     return self._SingleNodeCall(node, "blockdev_close", params)
798
799   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
800     """Disconnects the network of the given drbd devices.
801
802     This is a multi-node call.
803
804     """
805     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
806                                [nodes_ip, [cf.ToDict() for cf in disks]])
807
808   def call_drbd_attach_net(self, node_list, nodes_ip,
809                            disks, instance_name, multimaster):
810     """Disconnects the given drbd devices.
811
812     This is a multi-node call.
813
814     """
815     return self._MultiNodeCall(node_list, "drbd_attach_net",
816                                [nodes_ip, [cf.ToDict() for cf in disks],
817                                 instance_name, multimaster])
818
819   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
820     """Waits for the synchronization of drbd devices is complete.
821
822     This is a multi-node call.
823
824     """
825     return self._MultiNodeCall(node_list, "drbd_wait_sync",
826                                [nodes_ip, [cf.ToDict() for cf in disks]])
827
828   @classmethod
829   def call_upload_file(cls, node_list, file_name, address_list=None):
830     """Upload a file.
831
832     The node will refuse the operation in case the file is not on the
833     approved file list.
834
835     This is a multi-node call.
836
837     @type node_list: list
838     @param node_list: the list of node names to upload to
839     @type file_name: str
840     @param file_name: the filename to upload
841     @type address_list: list or None
842     @keyword address_list: an optional list of node addresses, in order
843         to optimize the RPC speed
844
845     """
846     file_contents = utils.ReadFile(file_name)
847     data = cls._Compress(file_contents)
848     st = os.stat(file_name)
849     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
850               st.st_atime, st.st_mtime]
851     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
852                                     address_list=address_list)
853
854   @classmethod
855   def call_write_ssconf_files(cls, node_list, values):
856     """Write ssconf files.
857
858     This is a multi-node call.
859
860     """
861     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
862
863   def call_os_diagnose(self, node_list):
864     """Request a diagnose of OS definitions.
865
866     This is a multi-node call.
867
868     """
869     result = self._MultiNodeCall(node_list, "os_diagnose", [])
870
871     for node_result in result.values():
872       if not node_result.failed and node_result.data:
873         node_result.data = [objects.OS.FromDict(oss)
874                             for oss in node_result.data]
875     return result
876
877   def call_os_get(self, node, name):
878     """Returns an OS definition.
879
880     This is a single-node call.
881
882     """
883     result = self._SingleNodeCall(node, "os_get", [name])
884     if not result.failed and isinstance(result.data, dict):
885       result.data = objects.OS.FromDict(result.data)
886     return result
887
888   def call_hooks_runner(self, node_list, hpath, phase, env):
889     """Call the hooks runner.
890
891     Args:
892       - op: the OpCode instance
893       - env: a dictionary with the environment
894
895     This is a multi-node call.
896
897     """
898     params = [hpath, phase, env]
899     return self._MultiNodeCall(node_list, "hooks_runner", params)
900
901   def call_iallocator_runner(self, node, name, idata):
902     """Call an iallocator on a remote node
903
904     Args:
905       - name: the iallocator name
906       - input: the json-encoded input string
907
908     This is a single-node call.
909
910     """
911     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
912
913   def call_blockdev_grow(self, node, cf_bdev, amount):
914     """Request a snapshot of the given block device.
915
916     This is a single-node call.
917
918     """
919     return self._SingleNodeCall(node, "blockdev_grow",
920                                 [cf_bdev.ToDict(), amount])
921
922   def call_blockdev_snapshot(self, node, cf_bdev):
923     """Request a snapshot of the given block device.
924
925     This is a single-node call.
926
927     """
928     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
929
930   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
931                            cluster_name, idx):
932     """Request the export of a given snapshot.
933
934     This is a single-node call.
935
936     """
937     return self._SingleNodeCall(node, "snapshot_export",
938                                 [snap_bdev.ToDict(), dest_node,
939                                  self._InstDict(instance), cluster_name, idx])
940
941   def call_finalize_export(self, node, instance, snap_disks):
942     """Request the completion of an export operation.
943
944     This writes the export config file, etc.
945
946     This is a single-node call.
947
948     """
949     flat_disks = []
950     for disk in snap_disks:
951       flat_disks.append(disk.ToDict())
952
953     return self._SingleNodeCall(node, "finalize_export",
954                                 [self._InstDict(instance), flat_disks])
955
956   def call_export_info(self, node, path):
957     """Queries the export information in a given path.
958
959     This is a single-node call.
960
961     """
962     result = self._SingleNodeCall(node, "export_info", [path])
963     if not result.failed and result.data:
964       result.data = objects.SerializableConfigParser.Loads(str(result.data))
965     return result
966
967   def call_instance_os_import(self, node, inst, src_node, src_images,
968                               cluster_name):
969     """Request the import of a backup into an instance.
970
971     This is a single-node call.
972
973     """
974     return self._SingleNodeCall(node, "instance_os_import",
975                                 [self._InstDict(inst), src_node, src_images,
976                                  cluster_name])
977
978   def call_export_list(self, node_list):
979     """Gets the stored exports list.
980
981     This is a multi-node call.
982
983     """
984     return self._MultiNodeCall(node_list, "export_list", [])
985
986   def call_export_remove(self, node, export):
987     """Requests removal of a given export.
988
989     This is a single-node call.
990
991     """
992     return self._SingleNodeCall(node, "export_remove", [export])
993
994   @classmethod
995   def call_node_leave_cluster(cls, node):
996     """Requests a node to clean the cluster information it has.
997
998     This will remove the configuration information from the ganeti data
999     dir.
1000
1001     This is a single-node call.
1002
1003     """
1004     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1005
1006   def call_node_volumes(self, node_list):
1007     """Gets all volumes on node(s).
1008
1009     This is a multi-node call.
1010
1011     """
1012     return self._MultiNodeCall(node_list, "node_volumes", [])
1013
1014   def call_node_demote_from_mc(self, node):
1015     """Demote a node from the master candidate role.
1016
1017     This is a single-node call.
1018
1019     """
1020     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1021
1022   def call_test_delay(self, node_list, duration):
1023     """Sleep for a fixed time on given node(s).
1024
1025     This is a multi-node call.
1026
1027     """
1028     return self._MultiNodeCall(node_list, "test_delay", [duration])
1029
1030   def call_file_storage_dir_create(self, node, file_storage_dir):
1031     """Create the given file storage directory.
1032
1033     This is a single-node call.
1034
1035     """
1036     return self._SingleNodeCall(node, "file_storage_dir_create",
1037                                 [file_storage_dir])
1038
1039   def call_file_storage_dir_remove(self, node, file_storage_dir):
1040     """Remove the given file storage directory.
1041
1042     This is a single-node call.
1043
1044     """
1045     return self._SingleNodeCall(node, "file_storage_dir_remove",
1046                                 [file_storage_dir])
1047
1048   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1049                                    new_file_storage_dir):
1050     """Rename file storage directory.
1051
1052     This is a single-node call.
1053
1054     """
1055     return self._SingleNodeCall(node, "file_storage_dir_rename",
1056                                 [old_file_storage_dir, new_file_storage_dir])
1057
1058   @classmethod
1059   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1060     """Update job queue.
1061
1062     This is a multi-node call.
1063
1064     """
1065     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1066                                     [file_name, cls._Compress(content)],
1067                                     address_list=address_list)
1068
1069   @classmethod
1070   def call_jobqueue_purge(cls, node):
1071     """Purge job queue.
1072
1073     This is a single-node call.
1074
1075     """
1076     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1077
1078   @classmethod
1079   def call_jobqueue_rename(cls, node_list, address_list, rename):
1080     """Rename a job queue file.
1081
1082     This is a multi-node call.
1083
1084     """
1085     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1086                                     address_list=address_list)
1087
1088   @classmethod
1089   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1090     """Set the drain flag on the queue.
1091
1092     This is a multi-node call.
1093
1094     @type node_list: list
1095     @param node_list: the list of nodes to query
1096     @type drain_flag: bool
1097     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1098
1099     """
1100     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1101                                     [drain_flag])
1102
1103   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1104     """Validate the hypervisor params.
1105
1106     This is a multi-node call.
1107
1108     @type node_list: list
1109     @param node_list: the list of nodes to query
1110     @type hvname: string
1111     @param hvname: the hypervisor name
1112     @type hvparams: dict
1113     @param hvparams: the hypervisor parameters to be validated
1114
1115     """
1116     cluster = self._cfg.GetClusterInfo()
1117     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1118     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1119                                [hvname, hv_full])