Pre-compute error status in RpcResult
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import logging
36 import zlib
37 import base64
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45
46 import ganeti.http.client
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   _http_manager = http.client.HttpClientManager()
64
65
66 def Shutdown():
67   """Stops the module-global HTTP client manager.
68
69   Must be called before quitting the program.
70
71   """
72   global _http_manager
73
74   if _http_manager:
75     _http_manager.Shutdown()
76     _http_manager = None
77
78
79 class RpcResult(object):
80   """RPC Result class.
81
82   This class holds an RPC result. It is needed since in multi-node
83   calls we can't raise an exception just because one one out of many
84   failed, and therefore we use this class to encapsulate the result.
85
86   @ivar data: the data payload, for successfull results, or None
87   @type failed: boolean
88   @ivar failed: whether the operation failed at transport level (not
89       application level on the remote node)
90   @ivar call: the name of the RPC call
91   @ivar node: the name of the node to which we made the call
92   @ivar offline: whether the operation failed because the node was
93       offline, as opposed to actual failure; offline=True will always
94       imply failed=True, in order to allow simpler checking if
95       the user doesn't care about the exact failure mode
96   @ivar error: the error message if the call failed
97
98   """
99   def __init__(self, data=None, failed=False, offline=False,
100                call=None, node=None):
101     self.failed = failed
102     self.offline = offline
103     self.call = call
104     self.node = node
105     if offline:
106       self.failed = True
107       self.error = "Node is marked offline"
108       self.data = self.payload = None
109     elif failed:
110       self.error = self._EnsureErr(data)
111       self.data = self.payload = None
112     else:
113       self.data = data
114       if not isinstance(self.data, (tuple, list)):
115         self.error = ("RPC layer error: invalid result type (%s)" %
116                       type(self.data))
117       elif len(data) != 2:
118         self.error = ("RPC layer error: invalid result length (%d), "
119                       "expected 2" % len(self.data))
120       elif not self.data[0]:
121         self.error = self._EnsureErr(self.data[1])
122       else:
123         # finally success
124         self.error = None
125         self.payload = data[1]
126
127   @staticmethod
128   def _EnsureErr(val):
129     """Helper to ensure we return a 'True' value for error."""
130     if val:
131       return val
132     else:
133       return "No error information"
134
135   def Raise(self):
136     """If the result has failed, raise an OpExecError.
137
138     This is used so that LU code doesn't have to check for each
139     result, but instead can call this function.
140
141     """
142     if self.failed:
143       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
144                                (self.call, self.node, self.error))
145
146   def RemoteFailMsg(self):
147     """Check if the remote procedure failed.
148
149     @return: the fail_msg attribute
150
151     """
152     return self.error
153
154
155 class Client:
156   """RPC Client class.
157
158   This class, given a (remote) method name, a list of parameters and a
159   list of nodes, will contact (in parallel) all nodes, and return a
160   dict of results (key: node name, value: result).
161
162   One current bug is that generic failure is still signalled by
163   'False' result, which is not good. This overloading of values can
164   cause bugs.
165
166   """
167   def __init__(self, procedure, body, port):
168     self.procedure = procedure
169     self.body = body
170     self.port = port
171     self.nc = {}
172
173     self._ssl_params = \
174       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
175                          ssl_cert_path=constants.SSL_CERT_FILE)
176
177   def ConnectList(self, node_list, address_list=None):
178     """Add a list of nodes to the target nodes.
179
180     @type node_list: list
181     @param node_list: the list of node names to connect
182     @type address_list: list or None
183     @keyword address_list: either None or a list with node addresses,
184         which must have the same length as the node list
185
186     """
187     if address_list is None:
188       address_list = [None for _ in node_list]
189     else:
190       assert len(node_list) == len(address_list), \
191              "Name and address lists should have the same length"
192     for node, address in zip(node_list, address_list):
193       self.ConnectNode(node, address)
194
195   def ConnectNode(self, name, address=None):
196     """Add a node to the target list.
197
198     @type name: str
199     @param name: the node name
200     @type address: str
201     @keyword address: the node address, if known
202
203     """
204     if address is None:
205       address = name
206
207     self.nc[name] = \
208       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
209                                     "/%s" % self.procedure,
210                                     post_data=self.body,
211                                     ssl_params=self._ssl_params,
212                                     ssl_verify_peer=True)
213
214   def GetResults(self):
215     """Call nodes and return results.
216
217     @rtype: list
218     @return: List of RPC results
219
220     """
221     assert _http_manager, "RPC module not intialized"
222
223     _http_manager.ExecRequests(self.nc.values())
224
225     results = {}
226
227     for name, req in self.nc.iteritems():
228       if req.success and req.resp_status_code == http.HTTP_OK:
229         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
230                                   node=name, call=self.procedure)
231         continue
232
233       # TODO: Better error reporting
234       if req.error:
235         msg = req.error
236       else:
237         msg = req.resp_body
238
239       logging.error("RPC error in %s from node %s: %s",
240                     self.procedure, name, msg)
241       results[name] = RpcResult(data=msg, failed=True, node=name,
242                                 call=self.procedure)
243
244     return results
245
246
247 class RpcRunner(object):
248   """RPC runner class"""
249
250   def __init__(self, cfg):
251     """Initialized the rpc runner.
252
253     @type cfg:  C{config.ConfigWriter}
254     @param cfg: the configuration object that will be used to get data
255                 about the cluster
256
257     """
258     self._cfg = cfg
259     self.port = utils.GetNodeDaemonPort()
260
261   def _InstDict(self, instance, hvp=None, bep=None):
262     """Convert the given instance to a dict.
263
264     This is done via the instance's ToDict() method and additionally
265     we fill the hvparams with the cluster defaults.
266
267     @type instance: L{objects.Instance}
268     @param instance: an Instance object
269     @type hvp: dict or None
270     @param hvp: a dictionary with overriden hypervisor parameters
271     @type bep: dict or None
272     @param bep: a dictionary with overriden backend parameters
273     @rtype: dict
274     @return: the instance dict, with the hvparams filled with the
275         cluster defaults
276
277     """
278     idict = instance.ToDict()
279     cluster = self._cfg.GetClusterInfo()
280     idict["hvparams"] = cluster.FillHV(instance)
281     if hvp is not None:
282       idict["hvparams"].update(hvp)
283     idict["beparams"] = cluster.FillBE(instance)
284     if bep is not None:
285       idict["beparams"].update(bep)
286     for nic in idict["nics"]:
287       nic['nicparams'] = objects.FillDict(
288         cluster.nicparams[constants.PP_DEFAULT],
289         nic['nicparams'])
290     return idict
291
292   def _ConnectList(self, client, node_list, call):
293     """Helper for computing node addresses.
294
295     @type client: L{Client}
296     @param client: a C{Client} instance
297     @type node_list: list
298     @param node_list: the node list we should connect
299     @type call: string
300     @param call: the name of the remote procedure call, for filling in
301         correctly any eventual offline nodes' results
302
303     """
304     all_nodes = self._cfg.GetAllNodesInfo()
305     name_list = []
306     addr_list = []
307     skip_dict = {}
308     for node in node_list:
309       if node in all_nodes:
310         if all_nodes[node].offline:
311           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
312           continue
313         val = all_nodes[node].primary_ip
314       else:
315         val = None
316       addr_list.append(val)
317       name_list.append(node)
318     if name_list:
319       client.ConnectList(name_list, address_list=addr_list)
320     return skip_dict
321
322   def _ConnectNode(self, client, node, call):
323     """Helper for computing one node's address.
324
325     @type client: L{Client}
326     @param client: a C{Client} instance
327     @type node: str
328     @param node: the node we should connect
329     @type call: string
330     @param call: the name of the remote procedure call, for filling in
331         correctly any eventual offline nodes' results
332
333     """
334     node_info = self._cfg.GetNodeInfo(node)
335     if node_info is not None:
336       if node_info.offline:
337         return RpcResult(node=node, offline=True, call=call)
338       addr = node_info.primary_ip
339     else:
340       addr = None
341     client.ConnectNode(node, address=addr)
342
343   def _MultiNodeCall(self, node_list, procedure, args):
344     """Helper for making a multi-node call
345
346     """
347     body = serializer.DumpJson(args, indent=False)
348     c = Client(procedure, body, self.port)
349     skip_dict = self._ConnectList(c, node_list, procedure)
350     skip_dict.update(c.GetResults())
351     return skip_dict
352
353   @classmethod
354   def _StaticMultiNodeCall(cls, node_list, procedure, args,
355                            address_list=None):
356     """Helper for making a multi-node static call
357
358     """
359     body = serializer.DumpJson(args, indent=False)
360     c = Client(procedure, body, utils.GetNodeDaemonPort())
361     c.ConnectList(node_list, address_list=address_list)
362     return c.GetResults()
363
364   def _SingleNodeCall(self, node, procedure, args):
365     """Helper for making a single-node call
366
367     """
368     body = serializer.DumpJson(args, indent=False)
369     c = Client(procedure, body, self.port)
370     result = self._ConnectNode(c, node, procedure)
371     if result is None:
372       # we did connect, node is not offline
373       result = c.GetResults()[node]
374     return result
375
376   @classmethod
377   def _StaticSingleNodeCall(cls, node, procedure, args):
378     """Helper for making a single-node static call
379
380     """
381     body = serializer.DumpJson(args, indent=False)
382     c = Client(procedure, body, utils.GetNodeDaemonPort())
383     c.ConnectNode(node)
384     return c.GetResults()[node]
385
386   @staticmethod
387   def _Compress(data):
388     """Compresses a string for transport over RPC.
389
390     Small amounts of data are not compressed.
391
392     @type data: str
393     @param data: Data
394     @rtype: tuple
395     @return: Encoded data to send
396
397     """
398     # Small amounts of data are not compressed
399     if len(data) < 512:
400       return (constants.RPC_ENCODING_NONE, data)
401
402     # Compress with zlib and encode in base64
403     return (constants.RPC_ENCODING_ZLIB_BASE64,
404             base64.b64encode(zlib.compress(data, 3)))
405
406   #
407   # Begin RPC calls
408   #
409
410   def call_volume_list(self, node_list, vg_name):
411     """Gets the logical volumes present in a given volume group.
412
413     This is a multi-node call.
414
415     """
416     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
417
418   def call_vg_list(self, node_list):
419     """Gets the volume group list.
420
421     This is a multi-node call.
422
423     """
424     return self._MultiNodeCall(node_list, "vg_list", [])
425
426   def call_bridges_exist(self, node, bridges_list):
427     """Checks if a node has all the bridges given.
428
429     This method checks if all bridges given in the bridges_list are
430     present on the remote node, so that an instance that uses interfaces
431     on those bridges can be started.
432
433     This is a single-node call.
434
435     """
436     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
437
438   def call_instance_start(self, node, instance, hvp, bep):
439     """Starts an instance.
440
441     This is a single-node call.
442
443     """
444     idict = self._InstDict(instance, hvp=hvp, bep=bep)
445     return self._SingleNodeCall(node, "instance_start", [idict])
446
447   def call_instance_shutdown(self, node, instance):
448     """Stops an instance.
449
450     This is a single-node call.
451
452     """
453     return self._SingleNodeCall(node, "instance_shutdown",
454                                 [self._InstDict(instance)])
455
456   def call_migration_info(self, node, instance):
457     """Gather the information necessary to prepare an instance migration.
458
459     This is a single-node call.
460
461     @type node: string
462     @param node: the node on which the instance is currently running
463     @type instance: C{objects.Instance}
464     @param instance: the instance definition
465
466     """
467     return self._SingleNodeCall(node, "migration_info",
468                                 [self._InstDict(instance)])
469
470   def call_accept_instance(self, node, instance, info, target):
471     """Prepare a node to accept an instance.
472
473     This is a single-node call.
474
475     @type node: string
476     @param node: the target node for the migration
477     @type instance: C{objects.Instance}
478     @param instance: the instance definition
479     @type info: opaque/hypervisor specific (string/data)
480     @param info: result for the call_migration_info call
481     @type target: string
482     @param target: target hostname (usually ip address) (on the node itself)
483
484     """
485     return self._SingleNodeCall(node, "accept_instance",
486                                 [self._InstDict(instance), info, target])
487
488   def call_finalize_migration(self, node, instance, info, success):
489     """Finalize any target-node migration specific operation.
490
491     This is called both in case of a successful migration and in case of error
492     (in which case it should abort the migration).
493
494     This is a single-node call.
495
496     @type node: string
497     @param node: the target node for the migration
498     @type instance: C{objects.Instance}
499     @param instance: the instance definition
500     @type info: opaque/hypervisor specific (string/data)
501     @param info: result for the call_migration_info call
502     @type success: boolean
503     @param success: whether the migration was a success or a failure
504
505     """
506     return self._SingleNodeCall(node, "finalize_migration",
507                                 [self._InstDict(instance), info, success])
508
509   def call_instance_migrate(self, node, instance, target, live):
510     """Migrate an instance.
511
512     This is a single-node call.
513
514     @type node: string
515     @param node: the node on which the instance is currently running
516     @type instance: C{objects.Instance}
517     @param instance: the instance definition
518     @type target: string
519     @param target: the target node name
520     @type live: boolean
521     @param live: whether the migration should be done live or not (the
522         interpretation of this parameter is left to the hypervisor)
523
524     """
525     return self._SingleNodeCall(node, "instance_migrate",
526                                 [self._InstDict(instance), target, live])
527
528   def call_instance_reboot(self, node, instance, reboot_type):
529     """Reboots an instance.
530
531     This is a single-node call.
532
533     """
534     return self._SingleNodeCall(node, "instance_reboot",
535                                 [self._InstDict(instance), reboot_type])
536
537   def call_instance_os_add(self, node, inst, reinstall):
538     """Installs an OS on the given instance.
539
540     This is a single-node call.
541
542     """
543     return self._SingleNodeCall(node, "instance_os_add",
544                                 [self._InstDict(inst), reinstall])
545
546   def call_instance_run_rename(self, node, inst, old_name):
547     """Run the OS rename script for an instance.
548
549     This is a single-node call.
550
551     """
552     return self._SingleNodeCall(node, "instance_run_rename",
553                                 [self._InstDict(inst), old_name])
554
555   def call_instance_info(self, node, instance, hname):
556     """Returns information about a single instance.
557
558     This is a single-node call.
559
560     @type node: list
561     @param node: the list of nodes to query
562     @type instance: string
563     @param instance: the instance name
564     @type hname: string
565     @param hname: the hypervisor type of the instance
566
567     """
568     return self._SingleNodeCall(node, "instance_info", [instance, hname])
569
570   def call_instance_migratable(self, node, instance):
571     """Checks whether the given instance can be migrated.
572
573     This is a single-node call.
574
575     @param node: the node to query
576     @type instance: L{objects.Instance}
577     @param instance: the instance to check
578
579
580     """
581     return self._SingleNodeCall(node, "instance_migratable",
582                                 [self._InstDict(instance)])
583
584   def call_all_instances_info(self, node_list, hypervisor_list):
585     """Returns information about all instances on the given nodes.
586
587     This is a multi-node call.
588
589     @type node_list: list
590     @param node_list: the list of nodes to query
591     @type hypervisor_list: list
592     @param hypervisor_list: the hypervisors to query for instances
593
594     """
595     return self._MultiNodeCall(node_list, "all_instances_info",
596                                [hypervisor_list])
597
598   def call_instance_list(self, node_list, hypervisor_list):
599     """Returns the list of running instances on a given node.
600
601     This is a multi-node call.
602
603     @type node_list: list
604     @param node_list: the list of nodes to query
605     @type hypervisor_list: list
606     @param hypervisor_list: the hypervisors to query for instances
607
608     """
609     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
610
611   def call_node_tcp_ping(self, node, source, target, port, timeout,
612                          live_port_needed):
613     """Do a TcpPing on the remote node
614
615     This is a single-node call.
616
617     """
618     return self._SingleNodeCall(node, "node_tcp_ping",
619                                 [source, target, port, timeout,
620                                  live_port_needed])
621
622   def call_node_has_ip_address(self, node, address):
623     """Checks if a node has the given IP address.
624
625     This is a single-node call.
626
627     """
628     return self._SingleNodeCall(node, "node_has_ip_address", [address])
629
630   def call_node_info(self, node_list, vg_name, hypervisor_type):
631     """Return node information.
632
633     This will return memory information and volume group size and free
634     space.
635
636     This is a multi-node call.
637
638     @type node_list: list
639     @param node_list: the list of nodes to query
640     @type vg_name: C{string}
641     @param vg_name: the name of the volume group to ask for disk space
642         information
643     @type hypervisor_type: C{str}
644     @param hypervisor_type: the name of the hypervisor to ask for
645         memory information
646
647     """
648     return self._MultiNodeCall(node_list, "node_info",
649                                [vg_name, hypervisor_type])
650
651   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
652     """Add a node to the cluster.
653
654     This is a single-node call.
655
656     """
657     return self._SingleNodeCall(node, "node_add",
658                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
659
660   def call_node_verify(self, node_list, checkdict, cluster_name):
661     """Request verification of given parameters.
662
663     This is a multi-node call.
664
665     """
666     return self._MultiNodeCall(node_list, "node_verify",
667                                [checkdict, cluster_name])
668
669   @classmethod
670   def call_node_start_master(cls, node, start_daemons):
671     """Tells a node to activate itself as a master.
672
673     This is a single-node call.
674
675     """
676     return cls._StaticSingleNodeCall(node, "node_start_master",
677                                      [start_daemons])
678
679   @classmethod
680   def call_node_stop_master(cls, node, stop_daemons):
681     """Tells a node to demote itself from master status.
682
683     This is a single-node call.
684
685     """
686     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
687
688   @classmethod
689   def call_master_info(cls, node_list):
690     """Query master info.
691
692     This is a multi-node call.
693
694     """
695     # TODO: should this method query down nodes?
696     return cls._StaticMultiNodeCall(node_list, "master_info", [])
697
698   def call_version(self, node_list):
699     """Query node version.
700
701     This is a multi-node call.
702
703     """
704     return self._MultiNodeCall(node_list, "version", [])
705
706   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
707     """Request creation of a given block device.
708
709     This is a single-node call.
710
711     """
712     return self._SingleNodeCall(node, "blockdev_create",
713                                 [bdev.ToDict(), size, owner, on_primary, info])
714
715   def call_blockdev_remove(self, node, bdev):
716     """Request removal of a given block device.
717
718     This is a single-node call.
719
720     """
721     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
722
723   def call_blockdev_rename(self, node, devlist):
724     """Request rename of the given block devices.
725
726     This is a single-node call.
727
728     """
729     return self._SingleNodeCall(node, "blockdev_rename",
730                                 [(d.ToDict(), uid) for d, uid in devlist])
731
732   def call_blockdev_assemble(self, node, disk, owner, on_primary):
733     """Request assembling of a given block device.
734
735     This is a single-node call.
736
737     """
738     return self._SingleNodeCall(node, "blockdev_assemble",
739                                 [disk.ToDict(), owner, on_primary])
740
741   def call_blockdev_shutdown(self, node, disk):
742     """Request shutdown of a given block device.
743
744     This is a single-node call.
745
746     """
747     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
748
749   def call_blockdev_addchildren(self, node, bdev, ndevs):
750     """Request adding a list of children to a (mirroring) device.
751
752     This is a single-node call.
753
754     """
755     return self._SingleNodeCall(node, "blockdev_addchildren",
756                                 [bdev.ToDict(),
757                                  [disk.ToDict() for disk in ndevs]])
758
759   def call_blockdev_removechildren(self, node, bdev, ndevs):
760     """Request removing a list of children from a (mirroring) device.
761
762     This is a single-node call.
763
764     """
765     return self._SingleNodeCall(node, "blockdev_removechildren",
766                                 [bdev.ToDict(),
767                                  [disk.ToDict() for disk in ndevs]])
768
769   def call_blockdev_getmirrorstatus(self, node, disks):
770     """Request status of a (mirroring) device.
771
772     This is a single-node call.
773
774     """
775     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
776                                 [dsk.ToDict() for dsk in disks])
777
778   def call_blockdev_find(self, node, disk):
779     """Request identification of a given block device.
780
781     This is a single-node call.
782
783     """
784     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
785
786   def call_blockdev_close(self, node, instance_name, disks):
787     """Closes the given block devices.
788
789     This is a single-node call.
790
791     """
792     params = [instance_name, [cf.ToDict() for cf in disks]]
793     return self._SingleNodeCall(node, "blockdev_close", params)
794
795   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
796     """Disconnects the network of the given drbd devices.
797
798     This is a multi-node call.
799
800     """
801     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
802                                [nodes_ip, [cf.ToDict() for cf in disks]])
803
804   def call_drbd_attach_net(self, node_list, nodes_ip,
805                            disks, instance_name, multimaster):
806     """Disconnects the given drbd devices.
807
808     This is a multi-node call.
809
810     """
811     return self._MultiNodeCall(node_list, "drbd_attach_net",
812                                [nodes_ip, [cf.ToDict() for cf in disks],
813                                 instance_name, multimaster])
814
815   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
816     """Waits for the synchronization of drbd devices is complete.
817
818     This is a multi-node call.
819
820     """
821     return self._MultiNodeCall(node_list, "drbd_wait_sync",
822                                [nodes_ip, [cf.ToDict() for cf in disks]])
823
824   @classmethod
825   def call_upload_file(cls, node_list, file_name, address_list=None):
826     """Upload a file.
827
828     The node will refuse the operation in case the file is not on the
829     approved file list.
830
831     This is a multi-node call.
832
833     @type node_list: list
834     @param node_list: the list of node names to upload to
835     @type file_name: str
836     @param file_name: the filename to upload
837     @type address_list: list or None
838     @keyword address_list: an optional list of node addresses, in order
839         to optimize the RPC speed
840
841     """
842     file_contents = utils.ReadFile(file_name)
843     data = cls._Compress(file_contents)
844     st = os.stat(file_name)
845     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
846               st.st_atime, st.st_mtime]
847     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
848                                     address_list=address_list)
849
850   @classmethod
851   def call_write_ssconf_files(cls, node_list, values):
852     """Write ssconf files.
853
854     This is a multi-node call.
855
856     """
857     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
858
859   def call_os_diagnose(self, node_list):
860     """Request a diagnose of OS definitions.
861
862     This is a multi-node call.
863
864     """
865     return self._MultiNodeCall(node_list, "os_diagnose", [])
866
867   def call_os_get(self, node, name):
868     """Returns an OS definition.
869
870     This is a single-node call.
871
872     """
873     result = self._SingleNodeCall(node, "os_get", [name])
874     if not result.failed and isinstance(result.data, dict):
875       result.data = objects.OS.FromDict(result.data)
876     return result
877
878   def call_hooks_runner(self, node_list, hpath, phase, env):
879     """Call the hooks runner.
880
881     Args:
882       - op: the OpCode instance
883       - env: a dictionary with the environment
884
885     This is a multi-node call.
886
887     """
888     params = [hpath, phase, env]
889     return self._MultiNodeCall(node_list, "hooks_runner", params)
890
891   def call_iallocator_runner(self, node, name, idata):
892     """Call an iallocator on a remote node
893
894     Args:
895       - name: the iallocator name
896       - input: the json-encoded input string
897
898     This is a single-node call.
899
900     """
901     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
902
903   def call_blockdev_grow(self, node, cf_bdev, amount):
904     """Request a snapshot of the given block device.
905
906     This is a single-node call.
907
908     """
909     return self._SingleNodeCall(node, "blockdev_grow",
910                                 [cf_bdev.ToDict(), amount])
911
912   def call_blockdev_snapshot(self, node, cf_bdev):
913     """Request a snapshot of the given block device.
914
915     This is a single-node call.
916
917     """
918     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
919
920   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
921                            cluster_name, idx):
922     """Request the export of a given snapshot.
923
924     This is a single-node call.
925
926     """
927     return self._SingleNodeCall(node, "snapshot_export",
928                                 [snap_bdev.ToDict(), dest_node,
929                                  self._InstDict(instance), cluster_name, idx])
930
931   def call_finalize_export(self, node, instance, snap_disks):
932     """Request the completion of an export operation.
933
934     This writes the export config file, etc.
935
936     This is a single-node call.
937
938     """
939     flat_disks = []
940     for disk in snap_disks:
941       flat_disks.append(disk.ToDict())
942
943     return self._SingleNodeCall(node, "finalize_export",
944                                 [self._InstDict(instance), flat_disks])
945
946   def call_export_info(self, node, path):
947     """Queries the export information in a given path.
948
949     This is a single-node call.
950
951     """
952     return self._SingleNodeCall(node, "export_info", [path])
953
954   def call_instance_os_import(self, node, inst, src_node, src_images,
955                               cluster_name):
956     """Request the import of a backup into an instance.
957
958     This is a single-node call.
959
960     """
961     return self._SingleNodeCall(node, "instance_os_import",
962                                 [self._InstDict(inst), src_node, src_images,
963                                  cluster_name])
964
965   def call_export_list(self, node_list):
966     """Gets the stored exports list.
967
968     This is a multi-node call.
969
970     """
971     return self._MultiNodeCall(node_list, "export_list", [])
972
973   def call_export_remove(self, node, export):
974     """Requests removal of a given export.
975
976     This is a single-node call.
977
978     """
979     return self._SingleNodeCall(node, "export_remove", [export])
980
981   @classmethod
982   def call_node_leave_cluster(cls, node):
983     """Requests a node to clean the cluster information it has.
984
985     This will remove the configuration information from the ganeti data
986     dir.
987
988     This is a single-node call.
989
990     """
991     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
992
993   def call_node_volumes(self, node_list):
994     """Gets all volumes on node(s).
995
996     This is a multi-node call.
997
998     """
999     return self._MultiNodeCall(node_list, "node_volumes", [])
1000
1001   def call_node_demote_from_mc(self, node):
1002     """Demote a node from the master candidate role.
1003
1004     This is a single-node call.
1005
1006     """
1007     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1008
1009
1010   def call_node_powercycle(self, node, hypervisor):
1011     """Tries to powercycle a node.
1012
1013     This is a single-node call.
1014
1015     """
1016     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1017
1018
1019   def call_test_delay(self, node_list, duration):
1020     """Sleep for a fixed time on given node(s).
1021
1022     This is a multi-node call.
1023
1024     """
1025     return self._MultiNodeCall(node_list, "test_delay", [duration])
1026
1027   def call_file_storage_dir_create(self, node, file_storage_dir):
1028     """Create the given file storage directory.
1029
1030     This is a single-node call.
1031
1032     """
1033     return self._SingleNodeCall(node, "file_storage_dir_create",
1034                                 [file_storage_dir])
1035
1036   def call_file_storage_dir_remove(self, node, file_storage_dir):
1037     """Remove the given file storage directory.
1038
1039     This is a single-node call.
1040
1041     """
1042     return self._SingleNodeCall(node, "file_storage_dir_remove",
1043                                 [file_storage_dir])
1044
1045   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1046                                    new_file_storage_dir):
1047     """Rename file storage directory.
1048
1049     This is a single-node call.
1050
1051     """
1052     return self._SingleNodeCall(node, "file_storage_dir_rename",
1053                                 [old_file_storage_dir, new_file_storage_dir])
1054
1055   @classmethod
1056   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1057     """Update job queue.
1058
1059     This is a multi-node call.
1060
1061     """
1062     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1063                                     [file_name, cls._Compress(content)],
1064                                     address_list=address_list)
1065
1066   @classmethod
1067   def call_jobqueue_purge(cls, node):
1068     """Purge job queue.
1069
1070     This is a single-node call.
1071
1072     """
1073     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1074
1075   @classmethod
1076   def call_jobqueue_rename(cls, node_list, address_list, rename):
1077     """Rename a job queue file.
1078
1079     This is a multi-node call.
1080
1081     """
1082     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1083                                     address_list=address_list)
1084
1085   @classmethod
1086   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1087     """Set the drain flag on the queue.
1088
1089     This is a multi-node call.
1090
1091     @type node_list: list
1092     @param node_list: the list of nodes to query
1093     @type drain_flag: bool
1094     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1095
1096     """
1097     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1098                                     [drain_flag])
1099
1100   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1101     """Validate the hypervisor params.
1102
1103     This is a multi-node call.
1104
1105     @type node_list: list
1106     @param node_list: the list of nodes to query
1107     @type hvname: string
1108     @param hvname: the hypervisor name
1109     @type hvparams: dict
1110     @param hvparams: the hypervisor parameters to be validated
1111
1112     """
1113     cluster = self._cfg.GetClusterInfo()
1114     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1115     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1116                                [hvname, hv_full])