Automatically cleanup _temporary_ids at save
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 import ganeti.http.client
46
47
48 # Module level variable
49 _http_manager = None
50
51
52 def Init():
53   """Initializes the module-global HTTP client manager.
54
55   Must be called before using any RPC function.
56
57   """
58   global _http_manager
59
60   assert not _http_manager, "RPC module initialized more than once"
61
62   _http_manager = http.client.HttpClientManager()
63
64
65 def Shutdown():
66   """Stops the module-global HTTP client manager.
67
68   Must be called before quitting the program.
69
70   """
71   global _http_manager
72
73   if _http_manager:
74     _http_manager.Shutdown()
75     _http_manager = None
76
77
78 class RpcResult(object):
79   """RPC Result class.
80
81   This class holds an RPC result. It is needed since in multi-node
82   calls we can't raise an exception just because one one out of many
83   failed, and therefore we use this class to encapsulate the result.
84
85   @ivar data: the data payload, for successful results, or None
86   @type failed: boolean
87   @ivar failed: whether the operation failed at RPC level (not
88       application level on the remote node)
89   @ivar call: the name of the RPC call
90   @ivar node: the name of the node to which we made the call
91   @ivar offline: whether the operation failed because the node was
92       offline, as opposed to actual failure; offline=True will always
93       imply failed=True, in order to allow simpler checking if
94       the user doesn't care about the exact failure mode
95
96   """
97   def __init__(self, data=None, failed=False, offline=False,
98                call=None, node=None):
99     self.failed = failed
100     self.offline = offline
101     self.call = call
102     self.node = node
103     if offline:
104       self.failed = True
105       self.error = "Node is marked offline"
106       self.data = self.payload = None
107     elif failed:
108       self.error = data
109       self.data = self.payload = None
110     else:
111       self.data = data
112       self.error = None
113       if isinstance(data, (tuple, list)) and len(data) == 2:
114         self.payload = data[1]
115       else:
116         self.payload = None
117
118   def Raise(self):
119     """If the result has failed, raise an OpExecError.
120
121     This is used so that LU code doesn't have to check for each
122     result, but instead can call this function.
123
124     """
125     if self.failed:
126       raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
127                                (self.call, self.node, self.error))
128
129   def RemoteFailMsg(self):
130     """Check if the remote procedure failed.
131
132     This is valid only for RPC calls which return result of the form
133     (status, data | error_msg).
134
135     @return: empty string for succcess, otherwise an error message
136
137     """
138     def _EnsureErr(val):
139       """Helper to ensure we return a 'True' value for error."""
140       if val:
141         return val
142       else:
143         return "No error information"
144
145     if self.failed:
146       return _EnsureErr(self.error)
147     if not isinstance(self.data, (tuple, list)):
148       return "Invalid result type (%s)" % type(self.data)
149     if len(self.data) != 2:
150       return "Invalid result length (%d), expected 2" % len(self.data)
151     if not self.data[0]:
152       return _EnsureErr(self.data[1])
153     return ""
154
155
156 class Client:
157   """RPC Client class.
158
159   This class, given a (remote) method name, a list of parameters and a
160   list of nodes, will contact (in parallel) all nodes, and return a
161   dict of results (key: node name, value: result).
162
163   One current bug is that generic failure is still signaled by
164   'False' result, which is not good. This overloading of values can
165   cause bugs.
166
167   """
168   def __init__(self, procedure, body, port):
169     self.procedure = procedure
170     self.body = body
171     self.port = port
172     self.nc = {}
173
174     self._ssl_params = \
175       http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
176                          ssl_cert_path=constants.SSL_CERT_FILE)
177
178   def ConnectList(self, node_list, address_list=None):
179     """Add a list of nodes to the target nodes.
180
181     @type node_list: list
182     @param node_list: the list of node names to connect
183     @type address_list: list or None
184     @keyword address_list: either None or a list with node addresses,
185         which must have the same length as the node list
186
187     """
188     if address_list is None:
189       address_list = [None for _ in node_list]
190     else:
191       assert len(node_list) == len(address_list), \
192              "Name and address lists should have the same length"
193     for node, address in zip(node_list, address_list):
194       self.ConnectNode(node, address)
195
196   def ConnectNode(self, name, address=None):
197     """Add a node to the target list.
198
199     @type name: str
200     @param name: the node name
201     @type address: str
202     @keyword address: the node address, if known
203
204     """
205     if address is None:
206       address = name
207
208     self.nc[name] = \
209       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
210                                     "/%s" % self.procedure,
211                                     post_data=self.body,
212                                     ssl_params=self._ssl_params,
213                                     ssl_verify_peer=True)
214
215   def GetResults(self):
216     """Call nodes and return results.
217
218     @rtype: list
219     @return: List of RPC results
220
221     """
222     assert _http_manager, "RPC module not initialized"
223
224     _http_manager.ExecRequests(self.nc.values())
225
226     results = {}
227
228     for name, req in self.nc.iteritems():
229       if req.success and req.resp_status_code == http.HTTP_OK:
230         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
231                                   node=name, call=self.procedure)
232         continue
233
234       # TODO: Better error reporting
235       if req.error:
236         msg = req.error
237       else:
238         msg = req.resp_body
239
240       logging.error("RPC error in %s from node %s: %s",
241                     self.procedure, name, msg)
242       results[name] = RpcResult(data=msg, failed=True, node=name,
243                                 call=self.procedure)
244
245     return results
246
247
248 class RpcRunner(object):
249   """RPC runner class"""
250
251   def __init__(self, cfg):
252     """Initialized the rpc runner.
253
254     @type cfg:  C{config.ConfigWriter}
255     @param cfg: the configuration object that will be used to get data
256                 about the cluster
257
258     """
259     self._cfg = cfg
260     self.port = utils.GetNodeDaemonPort()
261
262   def _InstDict(self, instance, hvp=None, bep=None):
263     """Convert the given instance to a dict.
264
265     This is done via the instance's ToDict() method and additionally
266     we fill the hvparams with the cluster defaults.
267
268     @type instance: L{objects.Instance}
269     @param instance: an Instance object
270     @type hvp: dict or None
271     @param hvp: a dictionary with overridden hypervisor parameters
272     @type bep: dict or None
273     @param bep: a dictionary with overridden backend parameters
274     @rtype: dict
275     @return: the instance dict, with the hvparams filled with the
276         cluster defaults
277
278     """
279     idict = instance.ToDict()
280     cluster = self._cfg.GetClusterInfo()
281     idict["hvparams"] = cluster.FillHV(instance)
282     if hvp is not None:
283       idict["hvparams"].update(hvp)
284     idict["beparams"] = cluster.FillBE(instance)
285     if bep is not None:
286       idict["beparams"].update(bep)
287     return idict
288
289   def _ConnectList(self, client, node_list, call):
290     """Helper for computing node addresses.
291
292     @type client: L{ganeti.rpc.Client}
293     @param client: a C{Client} instance
294     @type node_list: list
295     @param node_list: the node list we should connect
296     @type call: string
297     @param call: the name of the remote procedure call, for filling in
298         correctly any eventual offline nodes' results
299
300     """
301     all_nodes = self._cfg.GetAllNodesInfo()
302     name_list = []
303     addr_list = []
304     skip_dict = {}
305     for node in node_list:
306       if node in all_nodes:
307         if all_nodes[node].offline:
308           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
309           continue
310         val = all_nodes[node].primary_ip
311       else:
312         val = None
313       addr_list.append(val)
314       name_list.append(node)
315     if name_list:
316       client.ConnectList(name_list, address_list=addr_list)
317     return skip_dict
318
319   def _ConnectNode(self, client, node, call):
320     """Helper for computing one node's address.
321
322     @type client: L{ganeti.rpc.Client}
323     @param client: a C{Client} instance
324     @type node: str
325     @param node: the node we should connect
326     @type call: string
327     @param call: the name of the remote procedure call, for filling in
328         correctly any eventual offline nodes' results
329
330     """
331     node_info = self._cfg.GetNodeInfo(node)
332     if node_info is not None:
333       if node_info.offline:
334         return RpcResult(node=node, offline=True, call=call)
335       addr = node_info.primary_ip
336     else:
337       addr = None
338     client.ConnectNode(node, address=addr)
339
340   def _MultiNodeCall(self, node_list, procedure, args):
341     """Helper for making a multi-node call
342
343     """
344     body = serializer.DumpJson(args, indent=False)
345     c = Client(procedure, body, self.port)
346     skip_dict = self._ConnectList(c, node_list, procedure)
347     skip_dict.update(c.GetResults())
348     return skip_dict
349
350   @classmethod
351   def _StaticMultiNodeCall(cls, node_list, procedure, args,
352                            address_list=None):
353     """Helper for making a multi-node static call
354
355     """
356     body = serializer.DumpJson(args, indent=False)
357     c = Client(procedure, body, utils.GetNodeDaemonPort())
358     c.ConnectList(node_list, address_list=address_list)
359     return c.GetResults()
360
361   def _SingleNodeCall(self, node, procedure, args):
362     """Helper for making a single-node call
363
364     """
365     body = serializer.DumpJson(args, indent=False)
366     c = Client(procedure, body, self.port)
367     result = self._ConnectNode(c, node, procedure)
368     if result is None:
369       # we did connect, node is not offline
370       result = c.GetResults()[node]
371     return result
372
373   @classmethod
374   def _StaticSingleNodeCall(cls, node, procedure, args):
375     """Helper for making a single-node static call
376
377     """
378     body = serializer.DumpJson(args, indent=False)
379     c = Client(procedure, body, utils.GetNodeDaemonPort())
380     c.ConnectNode(node)
381     return c.GetResults()[node]
382
383   @staticmethod
384   def _Compress(data):
385     """Compresses a string for transport over RPC.
386
387     Small amounts of data are not compressed.
388
389     @type data: str
390     @param data: Data
391     @rtype: tuple
392     @return: Encoded data to send
393
394     """
395     # Small amounts of data are not compressed
396     if len(data) < 512:
397       return (constants.RPC_ENCODING_NONE, data)
398
399     # Compress with zlib and encode in base64
400     return (constants.RPC_ENCODING_ZLIB_BASE64,
401             base64.b64encode(zlib.compress(data, 3)))
402
403   #
404   # Begin RPC calls
405   #
406
407   def call_volume_list(self, node_list, vg_name):
408     """Gets the logical volumes present in a given volume group.
409
410     This is a multi-node call.
411
412     """
413     return self._MultiNodeCall(node_list, "volume_list", [vg_name])
414
415   def call_vg_list(self, node_list):
416     """Gets the volume group list.
417
418     This is a multi-node call.
419
420     """
421     return self._MultiNodeCall(node_list, "vg_list", [])
422
423   def call_bridges_exist(self, node, bridges_list):
424     """Checks if a node has all the bridges given.
425
426     This method checks if all bridges given in the bridges_list are
427     present on the remote node, so that an instance that uses interfaces
428     on those bridges can be started.
429
430     This is a single-node call.
431
432     """
433     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
434
435   def call_instance_start(self, node, instance, hvp, bep):
436     """Starts an instance.
437
438     This is a single-node call.
439
440     """
441     idict = self._InstDict(instance, hvp=hvp, bep=bep)
442     return self._SingleNodeCall(node, "instance_start", [idict])
443
444   def call_instance_shutdown(self, node, instance):
445     """Stops an instance.
446
447     This is a single-node call.
448
449     """
450     return self._SingleNodeCall(node, "instance_shutdown",
451                                 [self._InstDict(instance)])
452
453   def call_migration_info(self, node, instance):
454     """Gather the information necessary to prepare an instance migration.
455
456     This is a single-node call.
457
458     @type node: string
459     @param node: the node on which the instance is currently running
460     @type instance: C{objects.Instance}
461     @param instance: the instance definition
462
463     """
464     return self._SingleNodeCall(node, "migration_info",
465                                 [self._InstDict(instance)])
466
467   def call_accept_instance(self, node, instance, info, target):
468     """Prepare a node to accept an instance.
469
470     This is a single-node call.
471
472     @type node: string
473     @param node: the target node for the migration
474     @type instance: C{objects.Instance}
475     @param instance: the instance definition
476     @type info: opaque/hypervisor specific (string/data)
477     @param info: result for the call_migration_info call
478     @type target: string
479     @param target: target hostname (usually ip address) (on the node itself)
480
481     """
482     return self._SingleNodeCall(node, "accept_instance",
483                                 [self._InstDict(instance), info, target])
484
485   def call_finalize_migration(self, node, instance, info, success):
486     """Finalize any target-node migration specific operation.
487
488     This is called both in case of a successful migration and in case of error
489     (in which case it should abort the migration).
490
491     This is a single-node call.
492
493     @type node: string
494     @param node: the target node for the migration
495     @type instance: C{objects.Instance}
496     @param instance: the instance definition
497     @type info: opaque/hypervisor specific (string/data)
498     @param info: result for the call_migration_info call
499     @type success: boolean
500     @param success: whether the migration was a success or a failure
501
502     """
503     return self._SingleNodeCall(node, "finalize_migration",
504                                 [self._InstDict(instance), info, success])
505
506   def call_instance_migrate(self, node, instance, target, live):
507     """Migrate an instance.
508
509     This is a single-node call.
510
511     @type node: string
512     @param node: the node on which the instance is currently running
513     @type instance: C{objects.Instance}
514     @param instance: the instance definition
515     @type target: string
516     @param target: the target node name
517     @type live: boolean
518     @param live: whether the migration should be done live or not (the
519         interpretation of this parameter is left to the hypervisor)
520
521     """
522     return self._SingleNodeCall(node, "instance_migrate",
523                                 [self._InstDict(instance), target, live])
524
525   def call_instance_reboot(self, node, instance, reboot_type):
526     """Reboots an instance.
527
528     This is a single-node call.
529
530     """
531     return self._SingleNodeCall(node, "instance_reboot",
532                                 [self._InstDict(instance), reboot_type])
533
534   def call_instance_os_add(self, node, inst):
535     """Installs an OS on the given instance.
536
537     This is a single-node call.
538
539     """
540     return self._SingleNodeCall(node, "instance_os_add",
541                                 [self._InstDict(inst)])
542
543   def call_instance_run_rename(self, node, inst, old_name):
544     """Run the OS rename script for an instance.
545
546     This is a single-node call.
547
548     """
549     return self._SingleNodeCall(node, "instance_run_rename",
550                                 [self._InstDict(inst), old_name])
551
552   def call_instance_info(self, node, instance, hname):
553     """Returns information about a single instance.
554
555     This is a single-node call.
556
557     @type node: list
558     @param node: the list of nodes to query
559     @type instance: string
560     @param instance: the instance name
561     @type hname: string
562     @param hname: the hypervisor type of the instance
563
564     """
565     return self._SingleNodeCall(node, "instance_info", [instance, hname])
566
567   def call_instance_migratable(self, node, instance):
568     """Checks whether the given instance can be migrated.
569
570     This is a single-node call.
571
572     @param node: the node to query
573     @type instance: L{objects.Instance}
574     @param instance: the instance to check
575
576
577     """
578     return self._SingleNodeCall(node, "instance_migratable",
579                                 [self._InstDict(instance)])
580
581   def call_all_instances_info(self, node_list, hypervisor_list):
582     """Returns information about all instances on the given nodes.
583
584     This is a multi-node call.
585
586     @type node_list: list
587     @param node_list: the list of nodes to query
588     @type hypervisor_list: list
589     @param hypervisor_list: the hypervisors to query for instances
590
591     """
592     return self._MultiNodeCall(node_list, "all_instances_info",
593                                [hypervisor_list])
594
595   def call_instance_list(self, node_list, hypervisor_list):
596     """Returns the list of running instances on a given node.
597
598     This is a multi-node call.
599
600     @type node_list: list
601     @param node_list: the list of nodes to query
602     @type hypervisor_list: list
603     @param hypervisor_list: the hypervisors to query for instances
604
605     """
606     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
607
608   def call_node_tcp_ping(self, node, source, target, port, timeout,
609                          live_port_needed):
610     """Do a TcpPing on the remote node
611
612     This is a single-node call.
613
614     """
615     return self._SingleNodeCall(node, "node_tcp_ping",
616                                 [source, target, port, timeout,
617                                  live_port_needed])
618
619   def call_node_has_ip_address(self, node, address):
620     """Checks if a node has the given IP address.
621
622     This is a single-node call.
623
624     """
625     return self._SingleNodeCall(node, "node_has_ip_address", [address])
626
627   def call_node_info(self, node_list, vg_name, hypervisor_type):
628     """Return node information.
629
630     This will return memory information and volume group size and free
631     space.
632
633     This is a multi-node call.
634
635     @type node_list: list
636     @param node_list: the list of nodes to query
637     @type vg_name: C{string}
638     @param vg_name: the name of the volume group to ask for disk space
639         information
640     @type hypervisor_type: C{str}
641     @param hypervisor_type: the name of the hypervisor to ask for
642         memory information
643
644     """
645     retux = self._MultiNodeCall(node_list, "node_info",
646                                 [vg_name, hypervisor_type])
647
648     for result in retux.itervalues():
649       if result.failed or not isinstance(result.data, dict):
650         result.data = {}
651       if result.offline:
652         log_name = None
653       else:
654         log_name = "call_node_info"
655
656       utils.CheckDict(result.data, {
657         'memory_total' : '-',
658         'memory_dom0' : '-',
659         'memory_free' : '-',
660         'vg_size' : 'node_unreachable',
661         'vg_free' : '-',
662         }, log_name)
663     return retux
664
665   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
666     """Add a node to the cluster.
667
668     This is a single-node call.
669
670     """
671     return self._SingleNodeCall(node, "node_add",
672                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
673
674   def call_node_verify(self, node_list, checkdict, cluster_name):
675     """Request verification of given parameters.
676
677     This is a multi-node call.
678
679     """
680     return self._MultiNodeCall(node_list, "node_verify",
681                                [checkdict, cluster_name])
682
683   @classmethod
684   def call_node_start_master(cls, node, start_daemons, no_voting):
685     """Tells a node to activate itself as a master.
686
687     This is a single-node call.
688
689     """
690     return cls._StaticSingleNodeCall(node, "node_start_master",
691                                      [start_daemons, no_voting])
692
693   @classmethod
694   def call_node_stop_master(cls, node, stop_daemons):
695     """Tells a node to demote itself from master status.
696
697     This is a single-node call.
698
699     """
700     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
701
702   @classmethod
703   def call_master_info(cls, node_list):
704     """Query master info.
705
706     This is a multi-node call.
707
708     """
709     # TODO: should this method query down nodes?
710     return cls._StaticMultiNodeCall(node_list, "master_info", [])
711
712   def call_version(self, node_list):
713     """Query node version.
714
715     This is a multi-node call.
716
717     """
718     return self._MultiNodeCall(node_list, "version", [])
719
720   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
721     """Request creation of a given block device.
722
723     This is a single-node call.
724
725     """
726     return self._SingleNodeCall(node, "blockdev_create",
727                                 [bdev.ToDict(), size, owner, on_primary, info])
728
729   def call_blockdev_remove(self, node, bdev):
730     """Request removal of a given block device.
731
732     This is a single-node call.
733
734     """
735     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
736
737   def call_blockdev_rename(self, node, devlist):
738     """Request rename of the given block devices.
739
740     This is a single-node call.
741
742     """
743     return self._SingleNodeCall(node, "blockdev_rename",
744                                 [(d.ToDict(), uid) for d, uid in devlist])
745
746   def call_blockdev_assemble(self, node, disk, owner, on_primary):
747     """Request assembling of a given block device.
748
749     This is a single-node call.
750
751     """
752     return self._SingleNodeCall(node, "blockdev_assemble",
753                                 [disk.ToDict(), owner, on_primary])
754
755   def call_blockdev_shutdown(self, node, disk):
756     """Request shutdown of a given block device.
757
758     This is a single-node call.
759
760     """
761     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
762
763   def call_blockdev_addchildren(self, node, bdev, ndevs):
764     """Request adding a list of children to a (mirroring) device.
765
766     This is a single-node call.
767
768     """
769     return self._SingleNodeCall(node, "blockdev_addchildren",
770                                 [bdev.ToDict(),
771                                  [disk.ToDict() for disk in ndevs]])
772
773   def call_blockdev_removechildren(self, node, bdev, ndevs):
774     """Request removing a list of children from a (mirroring) device.
775
776     This is a single-node call.
777
778     """
779     return self._SingleNodeCall(node, "blockdev_removechildren",
780                                 [bdev.ToDict(),
781                                  [disk.ToDict() for disk in ndevs]])
782
783   def call_blockdev_getmirrorstatus(self, node, disks):
784     """Request status of a (mirroring) device.
785
786     This is a single-node call.
787
788     """
789     return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
790                                 [dsk.ToDict() for dsk in disks])
791
792   def call_blockdev_find(self, node, disk):
793     """Request identification of a given block device.
794
795     This is a single-node call.
796
797     """
798     return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
799
800   def call_blockdev_close(self, node, instance_name, disks):
801     """Closes the given block devices.
802
803     This is a single-node call.
804
805     """
806     params = [instance_name, [cf.ToDict() for cf in disks]]
807     return self._SingleNodeCall(node, "blockdev_close", params)
808
809   def call_blockdev_getsizes(self, node, disks):
810     """Returns the size of the given disks.
811
812     This is a single-node call.
813
814     """
815     params = [[cf.ToDict() for cf in disks]]
816     return self._SingleNodeCall(node, "blockdev_getsize", params)
817
818   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
819     """Disconnects the network of the given drbd devices.
820
821     This is a multi-node call.
822
823     """
824     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
825                                [nodes_ip, [cf.ToDict() for cf in disks]])
826
827   def call_drbd_attach_net(self, node_list, nodes_ip,
828                            disks, instance_name, multimaster):
829     """Disconnects the given drbd devices.
830
831     This is a multi-node call.
832
833     """
834     return self._MultiNodeCall(node_list, "drbd_attach_net",
835                                [nodes_ip, [cf.ToDict() for cf in disks],
836                                 instance_name, multimaster])
837
838   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
839     """Waits for the synchronization of drbd devices is complete.
840
841     This is a multi-node call.
842
843     """
844     return self._MultiNodeCall(node_list, "drbd_wait_sync",
845                                [nodes_ip, [cf.ToDict() for cf in disks]])
846
847   @classmethod
848   def call_upload_file(cls, node_list, file_name, address_list=None):
849     """Upload a file.
850
851     The node will refuse the operation in case the file is not on the
852     approved file list.
853
854     This is a multi-node call.
855
856     @type node_list: list
857     @param node_list: the list of node names to upload to
858     @type file_name: str
859     @param file_name: the filename to upload
860     @type address_list: list or None
861     @keyword address_list: an optional list of node addresses, in order
862         to optimize the RPC speed
863
864     """
865     file_contents = utils.ReadFile(file_name)
866     data = cls._Compress(file_contents)
867     st = os.stat(file_name)
868     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
869               st.st_atime, st.st_mtime]
870     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
871                                     address_list=address_list)
872
873   @classmethod
874   def call_write_ssconf_files(cls, node_list, values):
875     """Write ssconf files.
876
877     This is a multi-node call.
878
879     """
880     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
881
882   def call_os_diagnose(self, node_list):
883     """Request a diagnose of OS definitions.
884
885     This is a multi-node call.
886
887     """
888     result = self._MultiNodeCall(node_list, "os_diagnose", [])
889
890     for node_result in result.values():
891       if not node_result.failed and node_result.data:
892         node_result.data = [objects.OS.FromDict(oss)
893                             for oss in node_result.data]
894     return result
895
896   def call_os_get(self, node, name):
897     """Returns an OS definition.
898
899     This is a single-node call.
900
901     """
902     result = self._SingleNodeCall(node, "os_get", [name])
903     if not result.failed and isinstance(result.data, dict):
904       result.data = objects.OS.FromDict(result.data)
905     return result
906
907   def call_hooks_runner(self, node_list, hpath, phase, env):
908     """Call the hooks runner.
909
910     Args:
911       - op: the OpCode instance
912       - env: a dictionary with the environment
913
914     This is a multi-node call.
915
916     """
917     params = [hpath, phase, env]
918     return self._MultiNodeCall(node_list, "hooks_runner", params)
919
920   def call_iallocator_runner(self, node, name, idata):
921     """Call an iallocator on a remote node
922
923     Args:
924       - name: the iallocator name
925       - input: the json-encoded input string
926
927     This is a single-node call.
928
929     """
930     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
931
932   def call_blockdev_grow(self, node, cf_bdev, amount):
933     """Request a snapshot of the given block device.
934
935     This is a single-node call.
936
937     """
938     return self._SingleNodeCall(node, "blockdev_grow",
939                                 [cf_bdev.ToDict(), amount])
940
941   def call_blockdev_snapshot(self, node, cf_bdev):
942     """Request a snapshot of the given block device.
943
944     This is a single-node call.
945
946     """
947     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
948
949   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
950                            cluster_name, idx):
951     """Request the export of a given snapshot.
952
953     This is a single-node call.
954
955     """
956     return self._SingleNodeCall(node, "snapshot_export",
957                                 [snap_bdev.ToDict(), dest_node,
958                                  self._InstDict(instance), cluster_name, idx])
959
960   def call_finalize_export(self, node, instance, snap_disks):
961     """Request the completion of an export operation.
962
963     This writes the export config file, etc.
964
965     This is a single-node call.
966
967     """
968     flat_disks = []
969     for disk in snap_disks:
970       if isinstance(disk, bool):
971         flat_disks.append(disk)
972       else:
973         flat_disks.append(disk.ToDict())
974
975     return self._SingleNodeCall(node, "finalize_export",
976                                 [self._InstDict(instance), flat_disks])
977
978   def call_export_info(self, node, path):
979     """Queries the export information in a given path.
980
981     This is a single-node call.
982
983     """
984     result = self._SingleNodeCall(node, "export_info", [path])
985     if not result.failed and result.data:
986       result.data = objects.SerializableConfigParser.Loads(str(result.data))
987     return result
988
989   def call_instance_os_import(self, node, inst, src_node, src_images,
990                               cluster_name):
991     """Request the import of a backup into an instance.
992
993     This is a single-node call.
994
995     """
996     return self._SingleNodeCall(node, "instance_os_import",
997                                 [self._InstDict(inst), src_node, src_images,
998                                  cluster_name])
999
1000   def call_export_list(self, node_list):
1001     """Gets the stored exports list.
1002
1003     This is a multi-node call.
1004
1005     """
1006     return self._MultiNodeCall(node_list, "export_list", [])
1007
1008   def call_export_remove(self, node, export):
1009     """Requests removal of a given export.
1010
1011     This is a single-node call.
1012
1013     """
1014     return self._SingleNodeCall(node, "export_remove", [export])
1015
1016   @classmethod
1017   def call_node_leave_cluster(cls, node):
1018     """Requests a node to clean the cluster information it has.
1019
1020     This will remove the configuration information from the ganeti data
1021     dir.
1022
1023     This is a single-node call.
1024
1025     """
1026     return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1027
1028   def call_node_volumes(self, node_list):
1029     """Gets all volumes on node(s).
1030
1031     This is a multi-node call.
1032
1033     """
1034     return self._MultiNodeCall(node_list, "node_volumes", [])
1035
1036   def call_node_demote_from_mc(self, node):
1037     """Demote a node from the master candidate role.
1038
1039     This is a single-node call.
1040
1041     """
1042     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1043
1044   def call_test_delay(self, node_list, duration):
1045     """Sleep for a fixed time on given node(s).
1046
1047     This is a multi-node call.
1048
1049     """
1050     return self._MultiNodeCall(node_list, "test_delay", [duration])
1051
1052   def call_file_storage_dir_create(self, node, file_storage_dir):
1053     """Create the given file storage directory.
1054
1055     This is a single-node call.
1056
1057     """
1058     return self._SingleNodeCall(node, "file_storage_dir_create",
1059                                 [file_storage_dir])
1060
1061   def call_file_storage_dir_remove(self, node, file_storage_dir):
1062     """Remove the given file storage directory.
1063
1064     This is a single-node call.
1065
1066     """
1067     return self._SingleNodeCall(node, "file_storage_dir_remove",
1068                                 [file_storage_dir])
1069
1070   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1071                                    new_file_storage_dir):
1072     """Rename file storage directory.
1073
1074     This is a single-node call.
1075
1076     """
1077     return self._SingleNodeCall(node, "file_storage_dir_rename",
1078                                 [old_file_storage_dir, new_file_storage_dir])
1079
1080   @classmethod
1081   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1082     """Update job queue.
1083
1084     This is a multi-node call.
1085
1086     """
1087     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1088                                     [file_name, cls._Compress(content)],
1089                                     address_list=address_list)
1090
1091   @classmethod
1092   def call_jobqueue_purge(cls, node):
1093     """Purge job queue.
1094
1095     This is a single-node call.
1096
1097     """
1098     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1099
1100   @classmethod
1101   def call_jobqueue_rename(cls, node_list, address_list, rename):
1102     """Rename a job queue file.
1103
1104     This is a multi-node call.
1105
1106     """
1107     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1108                                     address_list=address_list)
1109
1110   @classmethod
1111   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1112     """Set the drain flag on the queue.
1113
1114     This is a multi-node call.
1115
1116     @type node_list: list
1117     @param node_list: the list of nodes to query
1118     @type drain_flag: bool
1119     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1120
1121     """
1122     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1123                                     [drain_flag])
1124
1125   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1126     """Validate the hypervisor params.
1127
1128     This is a multi-node call.
1129
1130     @type node_list: list
1131     @param node_list: the list of nodes to query
1132     @type hvname: string
1133     @param hvname: the hypervisor name
1134     @type hvparams: dict
1135     @param hvparams: the hypervisor parameters to be validated
1136
1137     """
1138     cluster = self._cfg.GetClusterInfo()
1139     hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1140     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1141                                [hvname, hv_full])