http client: support per-request read timeout
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37
38 from ganeti import utils
39 from ganeti import objects
40 from ganeti import http
41 from ganeti import serializer
42 from ganeti import constants
43 from ganeti import errors
44
45 # pylint has a bug here, doesn't see this import
46 import ganeti.http.client  # pylint: disable-msg=W0611
47
48
49 # Module level variable
50 _http_manager = None
51
52
53 def Init():
54   """Initializes the module-global HTTP client manager.
55
56   Must be called before using any RPC function.
57
58   """
59   global _http_manager # pylint: disable-msg=W0603
60
61   assert not _http_manager, "RPC module initialized more than once"
62
63   http.InitSsl()
64
65   _http_manager = http.client.HttpClientManager()
66
67
68 def Shutdown():
69   """Stops the module-global HTTP client manager.
70
71   Must be called before quitting the program.
72
73   """
74   global _http_manager # pylint: disable-msg=W0603
75
76   if _http_manager:
77     _http_manager.Shutdown()
78     _http_manager = None
79
80
81 class RpcResult(object):
82   """RPC Result class.
83
84   This class holds an RPC result. It is needed since in multi-node
85   calls we can't raise an exception just because one one out of many
86   failed, and therefore we use this class to encapsulate the result.
87
88   @ivar data: the data payload, for successful results, or None
89   @ivar call: the name of the RPC call
90   @ivar node: the name of the node to which we made the call
91   @ivar offline: whether the operation failed because the node was
92       offline, as opposed to actual failure; offline=True will always
93       imply failed=True, in order to allow simpler checking if
94       the user doesn't care about the exact failure mode
95   @ivar fail_msg: the error message if the call failed
96
97   """
98   def __init__(self, data=None, failed=False, offline=False,
99                call=None, node=None):
100     self.offline = offline
101     self.call = call
102     self.node = node
103
104     if offline:
105       self.fail_msg = "Node is marked offline"
106       self.data = self.payload = None
107     elif failed:
108       self.fail_msg = self._EnsureErr(data)
109       self.data = self.payload = None
110     else:
111       self.data = data
112       if not isinstance(self.data, (tuple, list)):
113         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
114                          type(self.data))
115         self.payload = None
116       elif len(data) != 2:
117         self.fail_msg = ("RPC layer error: invalid result length (%d), "
118                          "expected 2" % len(self.data))
119         self.payload = None
120       elif not self.data[0]:
121         self.fail_msg = self._EnsureErr(self.data[1])
122         self.payload = None
123       else:
124         # finally success
125         self.fail_msg = None
126         self.payload = data[1]
127
128     assert hasattr(self, "call")
129     assert hasattr(self, "data")
130     assert hasattr(self, "fail_msg")
131     assert hasattr(self, "node")
132     assert hasattr(self, "offline")
133     assert hasattr(self, "payload")
134
135   @staticmethod
136   def _EnsureErr(val):
137     """Helper to ensure we return a 'True' value for error."""
138     if val:
139       return val
140     else:
141       return "No error information"
142
143   def Raise(self, msg, prereq=False, ecode=None):
144     """If the result has failed, raise an OpExecError.
145
146     This is used so that LU code doesn't have to check for each
147     result, but instead can call this function.
148
149     """
150     if not self.fail_msg:
151       return
152
153     if not msg: # one could pass None for default message
154       msg = ("Call '%s' to node '%s' has failed: %s" %
155              (self.call, self.node, self.fail_msg))
156     else:
157       msg = "%s: %s" % (msg, self.fail_msg)
158     if prereq:
159       ec = errors.OpPrereqError
160     else:
161       ec = errors.OpExecError
162     if ecode is not None:
163       args = (msg, prereq)
164     else:
165       args = (msg, )
166     raise ec(*args) # pylint: disable-msg=W0142
167
168
169 class Client:
170   """RPC Client class.
171
172   This class, given a (remote) method name, a list of parameters and a
173   list of nodes, will contact (in parallel) all nodes, and return a
174   dict of results (key: node name, value: result).
175
176   One current bug is that generic failure is still signaled by
177   'False' result, which is not good. This overloading of values can
178   cause bugs.
179
180   """
181   def __init__(self, procedure, body, port):
182     self.procedure = procedure
183     self.body = body
184     self.port = port
185     self.nc = {}
186
187     self._ssl_params = \
188       http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
189                          ssl_cert_path=constants.NODED_CERT_FILE)
190
191   def ConnectList(self, node_list, address_list=None, read_timeout=None):
192     """Add a list of nodes to the target nodes.
193
194     @type node_list: list
195     @param node_list: the list of node names to connect
196     @type address_list: list or None
197     @keyword address_list: either None or a list with node addresses,
198         which must have the same length as the node list
199     @type read_timeout: int
200     @param read_timeout: overwrites the default read timeout for the
201         given operation
202
203     """
204     if address_list is None:
205       address_list = [None for _ in node_list]
206     else:
207       assert len(node_list) == len(address_list), \
208              "Name and address lists should have the same length"
209     for node, address in zip(node_list, address_list):
210       self.ConnectNode(node, address, read_timeout=read_timeout)
211
212   def ConnectNode(self, name, address=None, read_timeout=None):
213     """Add a node to the target list.
214
215     @type name: str
216     @param name: the node name
217     @type address: str
218     @keyword address: the node address, if known
219
220     """
221     if address is None:
222       address = name
223
224     self.nc[name] = \
225       http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
226                                     "/%s" % self.procedure,
227                                     post_data=self.body,
228                                     ssl_params=self._ssl_params,
229                                     ssl_verify_peer=True,
230                                     read_timeout=read_timeout)
231
232   def GetResults(self):
233     """Call nodes and return results.
234
235     @rtype: list
236     @return: List of RPC results
237
238     """
239     assert _http_manager, "RPC module not initialized"
240
241     _http_manager.ExecRequests(self.nc.values())
242
243     results = {}
244
245     for name, req in self.nc.iteritems():
246       if req.success and req.resp_status_code == http.HTTP_OK:
247         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
248                                   node=name, call=self.procedure)
249         continue
250
251       # TODO: Better error reporting
252       if req.error:
253         msg = req.error
254       else:
255         msg = req.resp_body
256
257       logging.error("RPC error in %s from node %s: %s",
258                     self.procedure, name, msg)
259       results[name] = RpcResult(data=msg, failed=True, node=name,
260                                 call=self.procedure)
261
262     return results
263
264
265 def _EncodeImportExportIO(ieio, ieioargs):
266   """Encodes import/export I/O information.
267
268   """
269   if ieio == constants.IEIO_RAW_DISK:
270     assert len(ieioargs) == 1
271     return (ieioargs[0].ToDict(), )
272
273   if ieio == constants.IEIO_SCRIPT:
274     assert len(ieioargs) == 2
275     return (ieioargs[0].ToDict(), ieioargs[1])
276
277   return ieioargs
278
279
280 class RpcRunner(object):
281   """RPC runner class"""
282
283   def __init__(self, cfg):
284     """Initialized the rpc runner.
285
286     @type cfg:  C{config.ConfigWriter}
287     @param cfg: the configuration object that will be used to get data
288                 about the cluster
289
290     """
291     self._cfg = cfg
292     self.port = utils.GetDaemonPort(constants.NODED)
293
294   def _InstDict(self, instance, hvp=None, bep=None):
295     """Convert the given instance to a dict.
296
297     This is done via the instance's ToDict() method and additionally
298     we fill the hvparams with the cluster defaults.
299
300     @type instance: L{objects.Instance}
301     @param instance: an Instance object
302     @type hvp: dict or None
303     @param hvp: a dictionary with overridden hypervisor parameters
304     @type bep: dict or None
305     @param bep: a dictionary with overridden backend parameters
306     @rtype: dict
307     @return: the instance dict, with the hvparams filled with the
308         cluster defaults
309
310     """
311     idict = instance.ToDict()
312     cluster = self._cfg.GetClusterInfo()
313     idict["hvparams"] = cluster.FillHV(instance)
314     if hvp is not None:
315       idict["hvparams"].update(hvp)
316     idict["beparams"] = cluster.FillBE(instance)
317     if bep is not None:
318       idict["beparams"].update(bep)
319     for nic in idict["nics"]:
320       nic['nicparams'] = objects.FillDict(
321         cluster.nicparams[constants.PP_DEFAULT],
322         nic['nicparams'])
323     return idict
324
325   def _ConnectList(self, client, node_list, call, read_timeout=None):
326     """Helper for computing node addresses.
327
328     @type client: L{ganeti.rpc.Client}
329     @param client: a C{Client} instance
330     @type node_list: list
331     @param node_list: the node list we should connect
332     @type call: string
333     @param call: the name of the remote procedure call, for filling in
334         correctly any eventual offline nodes' results
335     @type read_timeout: int
336     @param read_timeout: overwrites the default read timeout for the
337         given operation
338
339     """
340     all_nodes = self._cfg.GetAllNodesInfo()
341     name_list = []
342     addr_list = []
343     skip_dict = {}
344     for node in node_list:
345       if node in all_nodes:
346         if all_nodes[node].offline:
347           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
348           continue
349         val = all_nodes[node].primary_ip
350       else:
351         val = None
352       addr_list.append(val)
353       name_list.append(node)
354     if name_list:
355       client.ConnectList(name_list, address_list=addr_list,
356                          read_timeout=read_timeout)
357     return skip_dict
358
359   def _ConnectNode(self, client, node, call, read_timeout=None):
360     """Helper for computing one node's address.
361
362     @type client: L{ganeti.rpc.Client}
363     @param client: a C{Client} instance
364     @type node: str
365     @param node: the node we should connect
366     @type call: string
367     @param call: the name of the remote procedure call, for filling in
368         correctly any eventual offline nodes' results
369     @type read_timeout: int
370     @param read_timeout: overwrites the default read timeout for the
371         given operation
372
373     """
374     node_info = self._cfg.GetNodeInfo(node)
375     if node_info is not None:
376       if node_info.offline:
377         return RpcResult(node=node, offline=True, call=call)
378       addr = node_info.primary_ip
379     else:
380       addr = None
381     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
382
383   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
384     """Helper for making a multi-node call
385
386     """
387     body = serializer.DumpJson(args, indent=False)
388     c = Client(procedure, body, self.port)
389     skip_dict = self._ConnectList(c, node_list, procedure,
390                                   read_timeout=read_timeout)
391     skip_dict.update(c.GetResults())
392     return skip_dict
393
394   @classmethod
395   def _StaticMultiNodeCall(cls, node_list, procedure, args,
396                            address_list=None, read_timeout=None):
397     """Helper for making a multi-node static call
398
399     """
400     body = serializer.DumpJson(args, indent=False)
401     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
402     c.ConnectList(node_list, address_list=address_list,
403                   read_timeout=read_timeout)
404     return c.GetResults()
405
406   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
407     """Helper for making a single-node call
408
409     """
410     body = serializer.DumpJson(args, indent=False)
411     c = Client(procedure, body, self.port)
412     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
413     if result is None:
414       # we did connect, node is not offline
415       result = c.GetResults()[node]
416     return result
417
418   @classmethod
419   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
420     """Helper for making a single-node static call
421
422     """
423     body = serializer.DumpJson(args, indent=False)
424     c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
425     c.ConnectNode(node, read_timeout=read_timeout)
426     return c.GetResults()[node]
427
428   @staticmethod
429   def _Compress(data):
430     """Compresses a string for transport over RPC.
431
432     Small amounts of data are not compressed.
433
434     @type data: str
435     @param data: Data
436     @rtype: tuple
437     @return: Encoded data to send
438
439     """
440     # Small amounts of data are not compressed
441     if len(data) < 512:
442       return (constants.RPC_ENCODING_NONE, data)
443
444     # Compress with zlib and encode in base64
445     return (constants.RPC_ENCODING_ZLIB_BASE64,
446             base64.b64encode(zlib.compress(data, 3)))
447
448   #
449   # Begin RPC calls
450   #
451
452   def call_lv_list(self, node_list, vg_name):
453     """Gets the logical volumes present in a given volume group.
454
455     This is a multi-node call.
456
457     """
458     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
459
460   def call_vg_list(self, node_list):
461     """Gets the volume group list.
462
463     This is a multi-node call.
464
465     """
466     return self._MultiNodeCall(node_list, "vg_list", [])
467
468   def call_storage_list(self, node_list, su_name, su_args, name, fields):
469     """Get list of storage units.
470
471     This is a multi-node call.
472
473     """
474     return self._MultiNodeCall(node_list, "storage_list",
475                                [su_name, su_args, name, fields])
476
477   def call_storage_modify(self, node, su_name, su_args, name, changes):
478     """Modify a storage unit.
479
480     This is a single-node call.
481
482     """
483     return self._SingleNodeCall(node, "storage_modify",
484                                 [su_name, su_args, name, changes])
485
486   def call_storage_execute(self, node, su_name, su_args, name, op):
487     """Executes an operation on a storage unit.
488
489     This is a single-node call.
490
491     """
492     return self._SingleNodeCall(node, "storage_execute",
493                                 [su_name, su_args, name, op])
494
495   def call_bridges_exist(self, node, bridges_list):
496     """Checks if a node has all the bridges given.
497
498     This method checks if all bridges given in the bridges_list are
499     present on the remote node, so that an instance that uses interfaces
500     on those bridges can be started.
501
502     This is a single-node call.
503
504     """
505     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
506
507   def call_instance_start(self, node, instance, hvp, bep):
508     """Starts an instance.
509
510     This is a single-node call.
511
512     """
513     idict = self._InstDict(instance, hvp=hvp, bep=bep)
514     return self._SingleNodeCall(node, "instance_start", [idict])
515
516   def call_instance_shutdown(self, node, instance, timeout):
517     """Stops an instance.
518
519     This is a single-node call.
520
521     """
522     return self._SingleNodeCall(node, "instance_shutdown",
523                                 [self._InstDict(instance), timeout])
524
525   def call_migration_info(self, node, instance):
526     """Gather the information necessary to prepare an instance migration.
527
528     This is a single-node call.
529
530     @type node: string
531     @param node: the node on which the instance is currently running
532     @type instance: C{objects.Instance}
533     @param instance: the instance definition
534
535     """
536     return self._SingleNodeCall(node, "migration_info",
537                                 [self._InstDict(instance)])
538
539   def call_accept_instance(self, node, instance, info, target):
540     """Prepare a node to accept an instance.
541
542     This is a single-node call.
543
544     @type node: string
545     @param node: the target node for the migration
546     @type instance: C{objects.Instance}
547     @param instance: the instance definition
548     @type info: opaque/hypervisor specific (string/data)
549     @param info: result for the call_migration_info call
550     @type target: string
551     @param target: target hostname (usually ip address) (on the node itself)
552
553     """
554     return self._SingleNodeCall(node, "accept_instance",
555                                 [self._InstDict(instance), info, target])
556
557   def call_finalize_migration(self, node, instance, info, success):
558     """Finalize any target-node migration specific operation.
559
560     This is called both in case of a successful migration and in case of error
561     (in which case it should abort the migration).
562
563     This is a single-node call.
564
565     @type node: string
566     @param node: the target node for the migration
567     @type instance: C{objects.Instance}
568     @param instance: the instance definition
569     @type info: opaque/hypervisor specific (string/data)
570     @param info: result for the call_migration_info call
571     @type success: boolean
572     @param success: whether the migration was a success or a failure
573
574     """
575     return self._SingleNodeCall(node, "finalize_migration",
576                                 [self._InstDict(instance), info, success])
577
578   def call_instance_migrate(self, node, instance, target, live):
579     """Migrate an instance.
580
581     This is a single-node call.
582
583     @type node: string
584     @param node: the node on which the instance is currently running
585     @type instance: C{objects.Instance}
586     @param instance: the instance definition
587     @type target: string
588     @param target: the target node name
589     @type live: boolean
590     @param live: whether the migration should be done live or not (the
591         interpretation of this parameter is left to the hypervisor)
592
593     """
594     return self._SingleNodeCall(node, "instance_migrate",
595                                 [self._InstDict(instance), target, live])
596
597   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
598     """Reboots an instance.
599
600     This is a single-node call.
601
602     """
603     return self._SingleNodeCall(node, "instance_reboot",
604                                 [self._InstDict(inst), reboot_type,
605                                  shutdown_timeout])
606
607   def call_instance_os_add(self, node, inst, reinstall, debug):
608     """Installs an OS on the given instance.
609
610     This is a single-node call.
611
612     """
613     return self._SingleNodeCall(node, "instance_os_add",
614                                 [self._InstDict(inst), reinstall, debug])
615
616   def call_instance_run_rename(self, node, inst, old_name, debug):
617     """Run the OS rename script for an instance.
618
619     This is a single-node call.
620
621     """
622     return self._SingleNodeCall(node, "instance_run_rename",
623                                 [self._InstDict(inst), old_name, debug])
624
625   def call_instance_info(self, node, instance, hname):
626     """Returns information about a single instance.
627
628     This is a single-node call.
629
630     @type node: list
631     @param node: the list of nodes to query
632     @type instance: string
633     @param instance: the instance name
634     @type hname: string
635     @param hname: the hypervisor type of the instance
636
637     """
638     return self._SingleNodeCall(node, "instance_info", [instance, hname])
639
640   def call_instance_migratable(self, node, instance):
641     """Checks whether the given instance can be migrated.
642
643     This is a single-node call.
644
645     @param node: the node to query
646     @type instance: L{objects.Instance}
647     @param instance: the instance to check
648
649
650     """
651     return self._SingleNodeCall(node, "instance_migratable",
652                                 [self._InstDict(instance)])
653
654   def call_all_instances_info(self, node_list, hypervisor_list):
655     """Returns information about all instances on the given nodes.
656
657     This is a multi-node call.
658
659     @type node_list: list
660     @param node_list: the list of nodes to query
661     @type hypervisor_list: list
662     @param hypervisor_list: the hypervisors to query for instances
663
664     """
665     return self._MultiNodeCall(node_list, "all_instances_info",
666                                [hypervisor_list])
667
668   def call_instance_list(self, node_list, hypervisor_list):
669     """Returns the list of running instances on a given node.
670
671     This is a multi-node call.
672
673     @type node_list: list
674     @param node_list: the list of nodes to query
675     @type hypervisor_list: list
676     @param hypervisor_list: the hypervisors to query for instances
677
678     """
679     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
680
681   def call_node_tcp_ping(self, node, source, target, port, timeout,
682                          live_port_needed):
683     """Do a TcpPing on the remote node
684
685     This is a single-node call.
686
687     """
688     return self._SingleNodeCall(node, "node_tcp_ping",
689                                 [source, target, port, timeout,
690                                  live_port_needed])
691
692   def call_node_has_ip_address(self, node, address):
693     """Checks if a node has the given IP address.
694
695     This is a single-node call.
696
697     """
698     return self._SingleNodeCall(node, "node_has_ip_address", [address])
699
700   def call_node_info(self, node_list, vg_name, hypervisor_type):
701     """Return node information.
702
703     This will return memory information and volume group size and free
704     space.
705
706     This is a multi-node call.
707
708     @type node_list: list
709     @param node_list: the list of nodes to query
710     @type vg_name: C{string}
711     @param vg_name: the name of the volume group to ask for disk space
712         information
713     @type hypervisor_type: C{str}
714     @param hypervisor_type: the name of the hypervisor to ask for
715         memory information
716
717     """
718     return self._MultiNodeCall(node_list, "node_info",
719                                [vg_name, hypervisor_type])
720
721   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
722     """Add a node to the cluster.
723
724     This is a single-node call.
725
726     """
727     return self._SingleNodeCall(node, "node_add",
728                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
729
730   def call_node_verify(self, node_list, checkdict, cluster_name):
731     """Request verification of given parameters.
732
733     This is a multi-node call.
734
735     """
736     return self._MultiNodeCall(node_list, "node_verify",
737                                [checkdict, cluster_name])
738
739   @classmethod
740   def call_node_start_master(cls, node, start_daemons, no_voting):
741     """Tells a node to activate itself as a master.
742
743     This is a single-node call.
744
745     """
746     return cls._StaticSingleNodeCall(node, "node_start_master",
747                                      [start_daemons, no_voting])
748
749   @classmethod
750   def call_node_stop_master(cls, node, stop_daemons):
751     """Tells a node to demote itself from master status.
752
753     This is a single-node call.
754
755     """
756     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
757
758   @classmethod
759   def call_master_info(cls, node_list):
760     """Query master info.
761
762     This is a multi-node call.
763
764     """
765     # TODO: should this method query down nodes?
766     return cls._StaticMultiNodeCall(node_list, "master_info", [])
767
768   @classmethod
769   def call_version(cls, node_list):
770     """Query node version.
771
772     This is a multi-node call.
773
774     """
775     return cls._StaticMultiNodeCall(node_list, "version", [])
776
777   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
778     """Request creation of a given block device.
779
780     This is a single-node call.
781
782     """
783     return self._SingleNodeCall(node, "blockdev_create",
784                                 [bdev.ToDict(), size, owner, on_primary, info])
785
786   def call_blockdev_remove(self, node, bdev):
787     """Request removal of a given block device.
788
789     This is a single-node call.
790
791     """
792     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
793
794   def call_blockdev_rename(self, node, devlist):
795     """Request rename of the given block devices.
796
797     This is a single-node call.
798
799     """
800     return self._SingleNodeCall(node, "blockdev_rename",
801                                 [(d.ToDict(), uid) for d, uid in devlist])
802
803   def call_blockdev_assemble(self, node, disk, owner, on_primary):
804     """Request assembling of a given block device.
805
806     This is a single-node call.
807
808     """
809     return self._SingleNodeCall(node, "blockdev_assemble",
810                                 [disk.ToDict(), owner, on_primary])
811
812   def call_blockdev_shutdown(self, node, disk):
813     """Request shutdown of a given block device.
814
815     This is a single-node call.
816
817     """
818     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
819
820   def call_blockdev_addchildren(self, node, bdev, ndevs):
821     """Request adding a list of children to a (mirroring) device.
822
823     This is a single-node call.
824
825     """
826     return self._SingleNodeCall(node, "blockdev_addchildren",
827                                 [bdev.ToDict(),
828                                  [disk.ToDict() for disk in ndevs]])
829
830   def call_blockdev_removechildren(self, node, bdev, ndevs):
831     """Request removing a list of children from a (mirroring) device.
832
833     This is a single-node call.
834
835     """
836     return self._SingleNodeCall(node, "blockdev_removechildren",
837                                 [bdev.ToDict(),
838                                  [disk.ToDict() for disk in ndevs]])
839
840   def call_blockdev_getmirrorstatus(self, node, disks):
841     """Request status of a (mirroring) device.
842
843     This is a single-node call.
844
845     """
846     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
847                                   [dsk.ToDict() for dsk in disks])
848     if not result.fail_msg:
849       result.payload = [objects.BlockDevStatus.FromDict(i)
850                         for i in result.payload]
851     return result
852
853   def call_blockdev_find(self, node, disk):
854     """Request identification of a given block device.
855
856     This is a single-node call.
857
858     """
859     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
860     if not result.fail_msg and result.payload is not None:
861       result.payload = objects.BlockDevStatus.FromDict(result.payload)
862     return result
863
864   def call_blockdev_close(self, node, instance_name, disks):
865     """Closes the given block devices.
866
867     This is a single-node call.
868
869     """
870     params = [instance_name, [cf.ToDict() for cf in disks]]
871     return self._SingleNodeCall(node, "blockdev_close", params)
872
873   def call_blockdev_getsizes(self, node, disks):
874     """Returns the size of the given disks.
875
876     This is a single-node call.
877
878     """
879     params = [[cf.ToDict() for cf in disks]]
880     return self._SingleNodeCall(node, "blockdev_getsize", params)
881
882   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
883     """Disconnects the network of the given drbd devices.
884
885     This is a multi-node call.
886
887     """
888     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
889                                [nodes_ip, [cf.ToDict() for cf in disks]])
890
891   def call_drbd_attach_net(self, node_list, nodes_ip,
892                            disks, instance_name, multimaster):
893     """Disconnects the given drbd devices.
894
895     This is a multi-node call.
896
897     """
898     return self._MultiNodeCall(node_list, "drbd_attach_net",
899                                [nodes_ip, [cf.ToDict() for cf in disks],
900                                 instance_name, multimaster])
901
902   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
903     """Waits for the synchronization of drbd devices is complete.
904
905     This is a multi-node call.
906
907     """
908     return self._MultiNodeCall(node_list, "drbd_wait_sync",
909                                [nodes_ip, [cf.ToDict() for cf in disks]])
910
911   @classmethod
912   def call_upload_file(cls, node_list, file_name, address_list=None):
913     """Upload a file.
914
915     The node will refuse the operation in case the file is not on the
916     approved file list.
917
918     This is a multi-node call.
919
920     @type node_list: list
921     @param node_list: the list of node names to upload to
922     @type file_name: str
923     @param file_name: the filename to upload
924     @type address_list: list or None
925     @keyword address_list: an optional list of node addresses, in order
926         to optimize the RPC speed
927
928     """
929     file_contents = utils.ReadFile(file_name)
930     data = cls._Compress(file_contents)
931     st = os.stat(file_name)
932     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
933               st.st_atime, st.st_mtime]
934     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
935                                     address_list=address_list)
936
937   @classmethod
938   def call_write_ssconf_files(cls, node_list, values):
939     """Write ssconf files.
940
941     This is a multi-node call.
942
943     """
944     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
945
946   def call_os_diagnose(self, node_list):
947     """Request a diagnose of OS definitions.
948
949     This is a multi-node call.
950
951     """
952     return self._MultiNodeCall(node_list, "os_diagnose", [])
953
954   def call_os_get(self, node, name):
955     """Returns an OS definition.
956
957     This is a single-node call.
958
959     """
960     result = self._SingleNodeCall(node, "os_get", [name])
961     if not result.fail_msg and isinstance(result.payload, dict):
962       result.payload = objects.OS.FromDict(result.payload)
963     return result
964
965   def call_hooks_runner(self, node_list, hpath, phase, env):
966     """Call the hooks runner.
967
968     Args:
969       - op: the OpCode instance
970       - env: a dictionary with the environment
971
972     This is a multi-node call.
973
974     """
975     params = [hpath, phase, env]
976     return self._MultiNodeCall(node_list, "hooks_runner", params)
977
978   def call_iallocator_runner(self, node, name, idata):
979     """Call an iallocator on a remote node
980
981     Args:
982       - name: the iallocator name
983       - input: the json-encoded input string
984
985     This is a single-node call.
986
987     """
988     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
989
990   def call_blockdev_grow(self, node, cf_bdev, amount):
991     """Request a snapshot of the given block device.
992
993     This is a single-node call.
994
995     """
996     return self._SingleNodeCall(node, "blockdev_grow",
997                                 [cf_bdev.ToDict(), amount])
998
999   def call_blockdev_export(self, node, cf_bdev,
1000                            dest_node, dest_path, cluster_name):
1001     """Export a given disk to another node.
1002
1003     This is a single-node call.
1004
1005     """
1006     return self._SingleNodeCall(node, "blockdev_export",
1007                                 [cf_bdev.ToDict(), dest_node, dest_path,
1008                                  cluster_name])
1009
1010   def call_blockdev_snapshot(self, node, cf_bdev):
1011     """Request a snapshot of the given block device.
1012
1013     This is a single-node call.
1014
1015     """
1016     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1017
1018   def call_finalize_export(self, node, instance, snap_disks):
1019     """Request the completion of an export operation.
1020
1021     This writes the export config file, etc.
1022
1023     This is a single-node call.
1024
1025     """
1026     flat_disks = []
1027     for disk in snap_disks:
1028       if isinstance(disk, bool):
1029         flat_disks.append(disk)
1030       else:
1031         flat_disks.append(disk.ToDict())
1032
1033     return self._SingleNodeCall(node, "finalize_export",
1034                                 [self._InstDict(instance), flat_disks])
1035
1036   def call_export_info(self, node, path):
1037     """Queries the export information in a given path.
1038
1039     This is a single-node call.
1040
1041     """
1042     return self._SingleNodeCall(node, "export_info", [path])
1043
1044   def call_export_list(self, node_list):
1045     """Gets the stored exports list.
1046
1047     This is a multi-node call.
1048
1049     """
1050     return self._MultiNodeCall(node_list, "export_list", [])
1051
1052   def call_export_remove(self, node, export):
1053     """Requests removal of a given export.
1054
1055     This is a single-node call.
1056
1057     """
1058     return self._SingleNodeCall(node, "export_remove", [export])
1059
1060   @classmethod
1061   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1062     """Requests a node to clean the cluster information it has.
1063
1064     This will remove the configuration information from the ganeti data
1065     dir.
1066
1067     This is a single-node call.
1068
1069     """
1070     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1071                                      [modify_ssh_setup])
1072
1073   def call_node_volumes(self, node_list):
1074     """Gets all volumes on node(s).
1075
1076     This is a multi-node call.
1077
1078     """
1079     return self._MultiNodeCall(node_list, "node_volumes", [])
1080
1081   def call_node_demote_from_mc(self, node):
1082     """Demote a node from the master candidate role.
1083
1084     This is a single-node call.
1085
1086     """
1087     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1088
1089   def call_node_powercycle(self, node, hypervisor):
1090     """Tries to powercycle a node.
1091
1092     This is a single-node call.
1093
1094     """
1095     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1096
1097   def call_test_delay(self, node_list, duration):
1098     """Sleep for a fixed time on given node(s).
1099
1100     This is a multi-node call.
1101
1102     """
1103     return self._MultiNodeCall(node_list, "test_delay", [duration])
1104
1105   def call_file_storage_dir_create(self, node, file_storage_dir):
1106     """Create the given file storage directory.
1107
1108     This is a single-node call.
1109
1110     """
1111     return self._SingleNodeCall(node, "file_storage_dir_create",
1112                                 [file_storage_dir])
1113
1114   def call_file_storage_dir_remove(self, node, file_storage_dir):
1115     """Remove the given file storage directory.
1116
1117     This is a single-node call.
1118
1119     """
1120     return self._SingleNodeCall(node, "file_storage_dir_remove",
1121                                 [file_storage_dir])
1122
1123   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1124                                    new_file_storage_dir):
1125     """Rename file storage directory.
1126
1127     This is a single-node call.
1128
1129     """
1130     return self._SingleNodeCall(node, "file_storage_dir_rename",
1131                                 [old_file_storage_dir, new_file_storage_dir])
1132
1133   @classmethod
1134   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1135     """Update job queue.
1136
1137     This is a multi-node call.
1138
1139     """
1140     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1141                                     [file_name, cls._Compress(content)],
1142                                     address_list=address_list)
1143
1144   @classmethod
1145   def call_jobqueue_purge(cls, node):
1146     """Purge job queue.
1147
1148     This is a single-node call.
1149
1150     """
1151     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1152
1153   @classmethod
1154   def call_jobqueue_rename(cls, node_list, address_list, rename):
1155     """Rename a job queue file.
1156
1157     This is a multi-node call.
1158
1159     """
1160     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1161                                     address_list=address_list)
1162
1163   @classmethod
1164   def call_jobqueue_set_drain(cls, node_list, drain_flag):
1165     """Set the drain flag on the queue.
1166
1167     This is a multi-node call.
1168
1169     @type node_list: list
1170     @param node_list: the list of nodes to query
1171     @type drain_flag: bool
1172     @param drain_flag: if True, will set the drain flag, otherwise reset it.
1173
1174     """
1175     return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1176                                     [drain_flag])
1177
1178   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1179     """Validate the hypervisor params.
1180
1181     This is a multi-node call.
1182
1183     @type node_list: list
1184     @param node_list: the list of nodes to query
1185     @type hvname: string
1186     @param hvname: the hypervisor name
1187     @type hvparams: dict
1188     @param hvparams: the hypervisor parameters to be validated
1189
1190     """
1191     cluster = self._cfg.GetClusterInfo()
1192     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1193     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1194                                [hvname, hv_full])
1195
1196   def call_x509_cert_create(self, node, validity):
1197     """Creates a new X509 certificate for SSL/TLS.
1198
1199     This is a single-node call.
1200
1201     @type validity: int
1202     @param validity: Validity in seconds
1203
1204     """
1205     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1206
1207   def call_x509_cert_remove(self, node, name):
1208     """Removes a X509 certificate.
1209
1210     This is a single-node call.
1211
1212     @type name: string
1213     @param name: Certificate name
1214
1215     """
1216     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1217
1218   def call_import_start(self, node, opts, instance, dest, dest_args):
1219     """Starts a listener for an import.
1220
1221     This is a single-node call.
1222
1223     @type node: string
1224     @param node: Node name
1225     @type instance: C{objects.Instance}
1226     @param instance: Instance object
1227
1228     """
1229     return self._SingleNodeCall(node, "import_start",
1230                                 [opts.ToDict(),
1231                                  self._InstDict(instance), dest,
1232                                  _EncodeImportExportIO(dest, dest_args)])
1233
1234   def call_export_start(self, node, opts, host, port,
1235                         instance, source, source_args):
1236     """Starts an export daemon.
1237
1238     This is a single-node call.
1239
1240     @type node: string
1241     @param node: Node name
1242     @type instance: C{objects.Instance}
1243     @param instance: Instance object
1244
1245     """
1246     return self._SingleNodeCall(node, "export_start",
1247                                 [opts.ToDict(), host, port,
1248                                  self._InstDict(instance), source,
1249                                  _EncodeImportExportIO(source, source_args)])
1250
1251   def call_impexp_status(self, node, names):
1252     """Gets the status of an import or export.
1253
1254     This is a single-node call.
1255
1256     @type node: string
1257     @param node: Node name
1258     @type names: List of strings
1259     @param names: Import/export names
1260     @rtype: List of L{objects.ImportExportStatus} instances
1261     @return: Returns a list of the state of each named import/export or None if
1262              a status couldn't be retrieved
1263
1264     """
1265     result = self._SingleNodeCall(node, "impexp_status", [names])
1266
1267     if not result.fail_msg:
1268       decoded = []
1269
1270       for i in result.payload:
1271         if i is None:
1272           decoded.append(None)
1273           continue
1274         decoded.append(objects.ImportExportStatus.FromDict(i))
1275
1276       result.payload = decoded
1277
1278     return result
1279
1280   def call_impexp_abort(self, node, name):
1281     """Aborts an import or export.
1282
1283     This is a single-node call.
1284
1285     @type node: string
1286     @param node: Node name
1287     @type name: string
1288     @param name: Import/export name
1289
1290     """
1291     return self._SingleNodeCall(node, "impexp_abort", [name])
1292
1293   def call_impexp_cleanup(self, node, name):
1294     """Cleans up after an import or export.
1295
1296     This is a single-node call.
1297
1298     @type node: string
1299     @param node: Node name
1300     @type name: string
1301     @param name: Import/export name
1302
1303     """
1304     return self._SingleNodeCall(node, "impexp_cleanup", [name])