Restrict blacklisted OSes in instance installation
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47
48 # pylint has a bug here, doesn't see this import
49 import ganeti.http.client  # pylint: disable-msg=W0611
50
51
52 # Timeout for connecting to nodes (seconds)
53 _RPC_CONNECT_TIMEOUT = 5
54
55 _RPC_CLIENT_HEADERS = [
56   "Content-type: %s" % http.HTTP_APP_JSON,
57   ]
58
59 # Various time constants for the timeout table
60 _TMO_URGENT = 60 # one minute
61 _TMO_FAST = 5 * 60 # five minutes
62 _TMO_NORMAL = 15 * 60 # 15 minutes
63 _TMO_SLOW = 3600 # one hour
64 _TMO_4HRS = 4 * 3600
65 _TMO_1DAY = 86400
66
67 # Timeout table that will be built later by decorators
68 # Guidelines for choosing timeouts:
69 # - call used during watcher: timeout -> 1min, _TMO_URGENT
70 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
71 # - other calls: 15 min, _TMO_NORMAL
72 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
73
74 _TIMEOUTS = {
75 }
76
77
78 def Init():
79   """Initializes the module-global HTTP client manager.
80
81   Must be called before using any RPC function and while exactly one thread is
82   running.
83
84   """
85   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86   # one thread running. This check is just a safety measure -- it doesn't
87   # cover all cases.
88   assert threading.activeCount() == 1, \
89          "Found more than one active thread when initializing pycURL"
90
91   logging.info("Using PycURL %s", pycurl.version)
92
93   pycurl.global_init(pycurl.GLOBAL_ALL)
94
95
96 def Shutdown():
97   """Stops the module-global HTTP client manager.
98
99   Must be called before quitting the program and while exactly one thread is
100   running.
101
102   """
103   pycurl.global_cleanup()
104
105
106 def _ConfigRpcCurl(curl):
107   noded_cert = str(constants.NODED_CERT_FILE)
108
109   curl.setopt(pycurl.FOLLOWLOCATION, False)
110   curl.setopt(pycurl.CAINFO, noded_cert)
111   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112   curl.setopt(pycurl.SSL_VERIFYPEER, True)
113   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114   curl.setopt(pycurl.SSLCERT, noded_cert)
115   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116   curl.setopt(pycurl.SSLKEY, noded_cert)
117   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118
119
120 class _RpcThreadLocal(threading.local):
121   def GetHttpClientPool(self):
122     """Returns a per-thread HTTP client pool.
123
124     @rtype: L{http.client.HttpClientPool}
125
126     """
127     try:
128       pool = self.hcp
129     except AttributeError:
130       pool = http.client.HttpClientPool(_ConfigRpcCurl)
131       self.hcp = pool
132
133     return pool
134
135
136 _thread_local = _RpcThreadLocal()
137
138
139 def _RpcTimeout(secs):
140   """Timeout decorator.
141
142   When applied to a rpc call_* function, it updates the global timeout
143   table with the given function/timeout.
144
145   """
146   def decorator(f):
147     name = f.__name__
148     assert name.startswith("call_")
149     _TIMEOUTS[name[len("call_"):]] = secs
150     return f
151   return decorator
152
153
154 def RunWithRPC(fn):
155   """RPC-wrapper decorator.
156
157   When applied to a function, it runs it with the RPC system
158   initialized, and it shutsdown the system afterwards. This means the
159   function must be called without RPC being initialized.
160
161   """
162   def wrapper(*args, **kwargs):
163     Init()
164     try:
165       return fn(*args, **kwargs)
166     finally:
167       Shutdown()
168   return wrapper
169
170
171 class RpcResult(object):
172   """RPC Result class.
173
174   This class holds an RPC result. It is needed since in multi-node
175   calls we can't raise an exception just because one one out of many
176   failed, and therefore we use this class to encapsulate the result.
177
178   @ivar data: the data payload, for successful results, or None
179   @ivar call: the name of the RPC call
180   @ivar node: the name of the node to which we made the call
181   @ivar offline: whether the operation failed because the node was
182       offline, as opposed to actual failure; offline=True will always
183       imply failed=True, in order to allow simpler checking if
184       the user doesn't care about the exact failure mode
185   @ivar fail_msg: the error message if the call failed
186
187   """
188   def __init__(self, data=None, failed=False, offline=False,
189                call=None, node=None):
190     self.offline = offline
191     self.call = call
192     self.node = node
193
194     if offline:
195       self.fail_msg = "Node is marked offline"
196       self.data = self.payload = None
197     elif failed:
198       self.fail_msg = self._EnsureErr(data)
199       self.data = self.payload = None
200     else:
201       self.data = data
202       if not isinstance(self.data, (tuple, list)):
203         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
204                          type(self.data))
205         self.payload = None
206       elif len(data) != 2:
207         self.fail_msg = ("RPC layer error: invalid result length (%d), "
208                          "expected 2" % len(self.data))
209         self.payload = None
210       elif not self.data[0]:
211         self.fail_msg = self._EnsureErr(self.data[1])
212         self.payload = None
213       else:
214         # finally success
215         self.fail_msg = None
216         self.payload = data[1]
217
218     assert hasattr(self, "call")
219     assert hasattr(self, "data")
220     assert hasattr(self, "fail_msg")
221     assert hasattr(self, "node")
222     assert hasattr(self, "offline")
223     assert hasattr(self, "payload")
224
225   @staticmethod
226   def _EnsureErr(val):
227     """Helper to ensure we return a 'True' value for error."""
228     if val:
229       return val
230     else:
231       return "No error information"
232
233   def Raise(self, msg, prereq=False, ecode=None):
234     """If the result has failed, raise an OpExecError.
235
236     This is used so that LU code doesn't have to check for each
237     result, but instead can call this function.
238
239     """
240     if not self.fail_msg:
241       return
242
243     if not msg: # one could pass None for default message
244       msg = ("Call '%s' to node '%s' has failed: %s" %
245              (self.call, self.node, self.fail_msg))
246     else:
247       msg = "%s: %s" % (msg, self.fail_msg)
248     if prereq:
249       ec = errors.OpPrereqError
250     else:
251       ec = errors.OpExecError
252     if ecode is not None:
253       args = (msg, ecode)
254     else:
255       args = (msg, )
256     raise ec(*args) # pylint: disable-msg=W0142
257
258
259 class Client:
260   """RPC Client class.
261
262   This class, given a (remote) method name, a list of parameters and a
263   list of nodes, will contact (in parallel) all nodes, and return a
264   dict of results (key: node name, value: result).
265
266   One current bug is that generic failure is still signaled by
267   'False' result, which is not good. This overloading of values can
268   cause bugs.
269
270   """
271   def __init__(self, procedure, body, port):
272     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
273                                     " timeouts table")
274     self.procedure = procedure
275     self.body = body
276     self.port = port
277     self._request = {}
278
279   def ConnectList(self, node_list, address_list=None, read_timeout=None):
280     """Add a list of nodes to the target nodes.
281
282     @type node_list: list
283     @param node_list: the list of node names to connect
284     @type address_list: list or None
285     @keyword address_list: either None or a list with node addresses,
286         which must have the same length as the node list
287     @type read_timeout: int
288     @param read_timeout: overwrites the default read timeout for the
289         given operation
290
291     """
292     if address_list is None:
293       address_list = [None for _ in node_list]
294     else:
295       assert len(node_list) == len(address_list), \
296              "Name and address lists should have the same length"
297     for node, address in zip(node_list, address_list):
298       self.ConnectNode(node, address, read_timeout=read_timeout)
299
300   def ConnectNode(self, name, address=None, read_timeout=None):
301     """Add a node to the target list.
302
303     @type name: str
304     @param name: the node name
305     @type address: str
306     @keyword address: the node address, if known
307
308     """
309     if address is None:
310       address = name
311
312     if read_timeout is None:
313       read_timeout = _TIMEOUTS[self.procedure]
314
315     self._request[name] = \
316       http.client.HttpClientRequest(str(address), self.port,
317                                     http.HTTP_PUT, str("/%s" % self.procedure),
318                                     headers=_RPC_CLIENT_HEADERS,
319                                     post_data=str(self.body),
320                                     read_timeout=read_timeout)
321
322   def GetResults(self, http_pool=None):
323     """Call nodes and return results.
324
325     @rtype: list
326     @return: List of RPC results
327
328     """
329     if not http_pool:
330       http_pool = _thread_local.GetHttpClientPool()
331
332     http_pool.ProcessRequests(self._request.values())
333
334     results = {}
335
336     for name, req in self._request.iteritems():
337       if req.success and req.resp_status_code == http.HTTP_OK:
338         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
339                                   node=name, call=self.procedure)
340         continue
341
342       # TODO: Better error reporting
343       if req.error:
344         msg = req.error
345       else:
346         msg = req.resp_body
347
348       logging.error("RPC error in %s from node %s: %s",
349                     self.procedure, name, msg)
350       results[name] = RpcResult(data=msg, failed=True, node=name,
351                                 call=self.procedure)
352
353     return results
354
355
356 def _EncodeImportExportIO(ieio, ieioargs):
357   """Encodes import/export I/O information.
358
359   """
360   if ieio == constants.IEIO_RAW_DISK:
361     assert len(ieioargs) == 1
362     return (ieioargs[0].ToDict(), )
363
364   if ieio == constants.IEIO_SCRIPT:
365     assert len(ieioargs) == 2
366     return (ieioargs[0].ToDict(), ieioargs[1])
367
368   return ieioargs
369
370
371 class RpcRunner(object):
372   """RPC runner class"""
373
374   def __init__(self, cfg):
375     """Initialized the rpc runner.
376
377     @type cfg:  C{config.ConfigWriter}
378     @param cfg: the configuration object that will be used to get data
379                 about the cluster
380
381     """
382     self._cfg = cfg
383     self.port = netutils.GetDaemonPort(constants.NODED)
384
385   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
386     """Convert the given instance to a dict.
387
388     This is done via the instance's ToDict() method and additionally
389     we fill the hvparams with the cluster defaults.
390
391     @type instance: L{objects.Instance}
392     @param instance: an Instance object
393     @type hvp: dict or None
394     @param hvp: a dictionary with overridden hypervisor parameters
395     @type bep: dict or None
396     @param bep: a dictionary with overridden backend parameters
397     @type osp: dict or None
398     @param osp: a dictionary with overriden os parameters
399     @rtype: dict
400     @return: the instance dict, with the hvparams filled with the
401         cluster defaults
402
403     """
404     idict = instance.ToDict()
405     cluster = self._cfg.GetClusterInfo()
406     idict["hvparams"] = cluster.FillHV(instance)
407     if hvp is not None:
408       idict["hvparams"].update(hvp)
409     idict["beparams"] = cluster.FillBE(instance)
410     if bep is not None:
411       idict["beparams"].update(bep)
412     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
413     if osp is not None:
414       idict["osparams"].update(osp)
415     for nic in idict["nics"]:
416       nic['nicparams'] = objects.FillDict(
417         cluster.nicparams[constants.PP_DEFAULT],
418         nic['nicparams'])
419     return idict
420
421   def _ConnectList(self, client, node_list, call, read_timeout=None):
422     """Helper for computing node addresses.
423
424     @type client: L{ganeti.rpc.Client}
425     @param client: a C{Client} instance
426     @type node_list: list
427     @param node_list: the node list we should connect
428     @type call: string
429     @param call: the name of the remote procedure call, for filling in
430         correctly any eventual offline nodes' results
431     @type read_timeout: int
432     @param read_timeout: overwrites the default read timeout for the
433         given operation
434
435     """
436     all_nodes = self._cfg.GetAllNodesInfo()
437     name_list = []
438     addr_list = []
439     skip_dict = {}
440     for node in node_list:
441       if node in all_nodes:
442         if all_nodes[node].offline:
443           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
444           continue
445         val = all_nodes[node].primary_ip
446       else:
447         val = None
448       addr_list.append(val)
449       name_list.append(node)
450     if name_list:
451       client.ConnectList(name_list, address_list=addr_list,
452                          read_timeout=read_timeout)
453     return skip_dict
454
455   def _ConnectNode(self, client, node, call, read_timeout=None):
456     """Helper for computing one node's address.
457
458     @type client: L{ganeti.rpc.Client}
459     @param client: a C{Client} instance
460     @type node: str
461     @param node: the node we should connect
462     @type call: string
463     @param call: the name of the remote procedure call, for filling in
464         correctly any eventual offline nodes' results
465     @type read_timeout: int
466     @param read_timeout: overwrites the default read timeout for the
467         given operation
468
469     """
470     node_info = self._cfg.GetNodeInfo(node)
471     if node_info is not None:
472       if node_info.offline:
473         return RpcResult(node=node, offline=True, call=call)
474       addr = node_info.primary_ip
475     else:
476       addr = None
477     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
478
479   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
480     """Helper for making a multi-node call
481
482     """
483     body = serializer.DumpJson(args, indent=False)
484     c = Client(procedure, body, self.port)
485     skip_dict = self._ConnectList(c, node_list, procedure,
486                                   read_timeout=read_timeout)
487     skip_dict.update(c.GetResults())
488     return skip_dict
489
490   @classmethod
491   def _StaticMultiNodeCall(cls, node_list, procedure, args,
492                            address_list=None, read_timeout=None):
493     """Helper for making a multi-node static call
494
495     """
496     body = serializer.DumpJson(args, indent=False)
497     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
498     c.ConnectList(node_list, address_list=address_list,
499                   read_timeout=read_timeout)
500     return c.GetResults()
501
502   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
503     """Helper for making a single-node call
504
505     """
506     body = serializer.DumpJson(args, indent=False)
507     c = Client(procedure, body, self.port)
508     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
509     if result is None:
510       # we did connect, node is not offline
511       result = c.GetResults()[node]
512     return result
513
514   @classmethod
515   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
516     """Helper for making a single-node static call
517
518     """
519     body = serializer.DumpJson(args, indent=False)
520     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
521     c.ConnectNode(node, read_timeout=read_timeout)
522     return c.GetResults()[node]
523
524   @staticmethod
525   def _Compress(data):
526     """Compresses a string for transport over RPC.
527
528     Small amounts of data are not compressed.
529
530     @type data: str
531     @param data: Data
532     @rtype: tuple
533     @return: Encoded data to send
534
535     """
536     # Small amounts of data are not compressed
537     if len(data) < 512:
538       return (constants.RPC_ENCODING_NONE, data)
539
540     # Compress with zlib and encode in base64
541     return (constants.RPC_ENCODING_ZLIB_BASE64,
542             base64.b64encode(zlib.compress(data, 3)))
543
544   #
545   # Begin RPC calls
546   #
547
548   @_RpcTimeout(_TMO_URGENT)
549   def call_lv_list(self, node_list, vg_name):
550     """Gets the logical volumes present in a given volume group.
551
552     This is a multi-node call.
553
554     """
555     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
556
557   @_RpcTimeout(_TMO_URGENT)
558   def call_vg_list(self, node_list):
559     """Gets the volume group list.
560
561     This is a multi-node call.
562
563     """
564     return self._MultiNodeCall(node_list, "vg_list", [])
565
566   @_RpcTimeout(_TMO_NORMAL)
567   def call_storage_list(self, node_list, su_name, su_args, name, fields):
568     """Get list of storage units.
569
570     This is a multi-node call.
571
572     """
573     return self._MultiNodeCall(node_list, "storage_list",
574                                [su_name, su_args, name, fields])
575
576   @_RpcTimeout(_TMO_NORMAL)
577   def call_storage_modify(self, node, su_name, su_args, name, changes):
578     """Modify a storage unit.
579
580     This is a single-node call.
581
582     """
583     return self._SingleNodeCall(node, "storage_modify",
584                                 [su_name, su_args, name, changes])
585
586   @_RpcTimeout(_TMO_NORMAL)
587   def call_storage_execute(self, node, su_name, su_args, name, op):
588     """Executes an operation on a storage unit.
589
590     This is a single-node call.
591
592     """
593     return self._SingleNodeCall(node, "storage_execute",
594                                 [su_name, su_args, name, op])
595
596   @_RpcTimeout(_TMO_URGENT)
597   def call_bridges_exist(self, node, bridges_list):
598     """Checks if a node has all the bridges given.
599
600     This method checks if all bridges given in the bridges_list are
601     present on the remote node, so that an instance that uses interfaces
602     on those bridges can be started.
603
604     This is a single-node call.
605
606     """
607     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
608
609   @_RpcTimeout(_TMO_NORMAL)
610   def call_instance_start(self, node, instance, hvp, bep):
611     """Starts an instance.
612
613     This is a single-node call.
614
615     """
616     idict = self._InstDict(instance, hvp=hvp, bep=bep)
617     return self._SingleNodeCall(node, "instance_start", [idict])
618
619   @_RpcTimeout(_TMO_NORMAL)
620   def call_instance_shutdown(self, node, instance, timeout):
621     """Stops an instance.
622
623     This is a single-node call.
624
625     """
626     return self._SingleNodeCall(node, "instance_shutdown",
627                                 [self._InstDict(instance), timeout])
628
629   @_RpcTimeout(_TMO_NORMAL)
630   def call_migration_info(self, node, instance):
631     """Gather the information necessary to prepare an instance migration.
632
633     This is a single-node call.
634
635     @type node: string
636     @param node: the node on which the instance is currently running
637     @type instance: C{objects.Instance}
638     @param instance: the instance definition
639
640     """
641     return self._SingleNodeCall(node, "migration_info",
642                                 [self._InstDict(instance)])
643
644   @_RpcTimeout(_TMO_NORMAL)
645   def call_accept_instance(self, node, instance, info, target):
646     """Prepare a node to accept an instance.
647
648     This is a single-node call.
649
650     @type node: string
651     @param node: the target node for the migration
652     @type instance: C{objects.Instance}
653     @param instance: the instance definition
654     @type info: opaque/hypervisor specific (string/data)
655     @param info: result for the call_migration_info call
656     @type target: string
657     @param target: target hostname (usually ip address) (on the node itself)
658
659     """
660     return self._SingleNodeCall(node, "accept_instance",
661                                 [self._InstDict(instance), info, target])
662
663   @_RpcTimeout(_TMO_NORMAL)
664   def call_finalize_migration(self, node, instance, info, success):
665     """Finalize any target-node migration specific operation.
666
667     This is called both in case of a successful migration and in case of error
668     (in which case it should abort the migration).
669
670     This is a single-node call.
671
672     @type node: string
673     @param node: the target node for the migration
674     @type instance: C{objects.Instance}
675     @param instance: the instance definition
676     @type info: opaque/hypervisor specific (string/data)
677     @param info: result for the call_migration_info call
678     @type success: boolean
679     @param success: whether the migration was a success or a failure
680
681     """
682     return self._SingleNodeCall(node, "finalize_migration",
683                                 [self._InstDict(instance), info, success])
684
685   @_RpcTimeout(_TMO_SLOW)
686   def call_instance_migrate(self, node, instance, target, live):
687     """Migrate an instance.
688
689     This is a single-node call.
690
691     @type node: string
692     @param node: the node on which the instance is currently running
693     @type instance: C{objects.Instance}
694     @param instance: the instance definition
695     @type target: string
696     @param target: the target node name
697     @type live: boolean
698     @param live: whether the migration should be done live or not (the
699         interpretation of this parameter is left to the hypervisor)
700
701     """
702     return self._SingleNodeCall(node, "instance_migrate",
703                                 [self._InstDict(instance), target, live])
704
705   @_RpcTimeout(_TMO_NORMAL)
706   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
707     """Reboots an instance.
708
709     This is a single-node call.
710
711     """
712     return self._SingleNodeCall(node, "instance_reboot",
713                                 [self._InstDict(inst), reboot_type,
714                                  shutdown_timeout])
715
716   @_RpcTimeout(_TMO_1DAY)
717   def call_instance_os_add(self, node, inst, reinstall, debug):
718     """Installs an OS on the given instance.
719
720     This is a single-node call.
721
722     """
723     return self._SingleNodeCall(node, "instance_os_add",
724                                 [self._InstDict(inst), reinstall, debug])
725
726   @_RpcTimeout(_TMO_SLOW)
727   def call_instance_run_rename(self, node, inst, old_name, debug):
728     """Run the OS rename script for an instance.
729
730     This is a single-node call.
731
732     """
733     return self._SingleNodeCall(node, "instance_run_rename",
734                                 [self._InstDict(inst), old_name, debug])
735
736   @_RpcTimeout(_TMO_URGENT)
737   def call_instance_info(self, node, instance, hname):
738     """Returns information about a single instance.
739
740     This is a single-node call.
741
742     @type node: list
743     @param node: the list of nodes to query
744     @type instance: string
745     @param instance: the instance name
746     @type hname: string
747     @param hname: the hypervisor type of the instance
748
749     """
750     return self._SingleNodeCall(node, "instance_info", [instance, hname])
751
752   @_RpcTimeout(_TMO_NORMAL)
753   def call_instance_migratable(self, node, instance):
754     """Checks whether the given instance can be migrated.
755
756     This is a single-node call.
757
758     @param node: the node to query
759     @type instance: L{objects.Instance}
760     @param instance: the instance to check
761
762
763     """
764     return self._SingleNodeCall(node, "instance_migratable",
765                                 [self._InstDict(instance)])
766
767   @_RpcTimeout(_TMO_URGENT)
768   def call_all_instances_info(self, node_list, hypervisor_list):
769     """Returns information about all instances on the given nodes.
770
771     This is a multi-node call.
772
773     @type node_list: list
774     @param node_list: the list of nodes to query
775     @type hypervisor_list: list
776     @param hypervisor_list: the hypervisors to query for instances
777
778     """
779     return self._MultiNodeCall(node_list, "all_instances_info",
780                                [hypervisor_list])
781
782   @_RpcTimeout(_TMO_URGENT)
783   def call_instance_list(self, node_list, hypervisor_list):
784     """Returns the list of running instances on a given node.
785
786     This is a multi-node call.
787
788     @type node_list: list
789     @param node_list: the list of nodes to query
790     @type hypervisor_list: list
791     @param hypervisor_list: the hypervisors to query for instances
792
793     """
794     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
795
796   @_RpcTimeout(_TMO_FAST)
797   def call_node_tcp_ping(self, node, source, target, port, timeout,
798                          live_port_needed):
799     """Do a TcpPing on the remote node
800
801     This is a single-node call.
802
803     """
804     return self._SingleNodeCall(node, "node_tcp_ping",
805                                 [source, target, port, timeout,
806                                  live_port_needed])
807
808   @_RpcTimeout(_TMO_FAST)
809   def call_node_has_ip_address(self, node, address):
810     """Checks if a node has the given IP address.
811
812     This is a single-node call.
813
814     """
815     return self._SingleNodeCall(node, "node_has_ip_address", [address])
816
817   @_RpcTimeout(_TMO_URGENT)
818   def call_node_info(self, node_list, vg_name, hypervisor_type):
819     """Return node information.
820
821     This will return memory information and volume group size and free
822     space.
823
824     This is a multi-node call.
825
826     @type node_list: list
827     @param node_list: the list of nodes to query
828     @type vg_name: C{string}
829     @param vg_name: the name of the volume group to ask for disk space
830         information
831     @type hypervisor_type: C{str}
832     @param hypervisor_type: the name of the hypervisor to ask for
833         memory information
834
835     """
836     return self._MultiNodeCall(node_list, "node_info",
837                                [vg_name, hypervisor_type])
838
839   @_RpcTimeout(_TMO_NORMAL)
840   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
841     """Add a node to the cluster.
842
843     This is a single-node call.
844
845     """
846     return self._SingleNodeCall(node, "node_add",
847                                 [dsa, dsapub, rsa, rsapub, ssh, sshpub])
848
849   @_RpcTimeout(_TMO_NORMAL)
850   def call_node_verify(self, node_list, checkdict, cluster_name):
851     """Request verification of given parameters.
852
853     This is a multi-node call.
854
855     """
856     return self._MultiNodeCall(node_list, "node_verify",
857                                [checkdict, cluster_name])
858
859   @classmethod
860   @_RpcTimeout(_TMO_FAST)
861   def call_node_start_master(cls, node, start_daemons, no_voting):
862     """Tells a node to activate itself as a master.
863
864     This is a single-node call.
865
866     """
867     return cls._StaticSingleNodeCall(node, "node_start_master",
868                                      [start_daemons, no_voting])
869
870   @classmethod
871   @_RpcTimeout(_TMO_FAST)
872   def call_node_stop_master(cls, node, stop_daemons):
873     """Tells a node to demote itself from master status.
874
875     This is a single-node call.
876
877     """
878     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
879
880   @classmethod
881   @_RpcTimeout(_TMO_URGENT)
882   def call_master_info(cls, node_list):
883     """Query master info.
884
885     This is a multi-node call.
886
887     """
888     # TODO: should this method query down nodes?
889     return cls._StaticMultiNodeCall(node_list, "master_info", [])
890
891   @classmethod
892   @_RpcTimeout(_TMO_URGENT)
893   def call_version(cls, node_list):
894     """Query node version.
895
896     This is a multi-node call.
897
898     """
899     return cls._StaticMultiNodeCall(node_list, "version", [])
900
901   @_RpcTimeout(_TMO_NORMAL)
902   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
903     """Request creation of a given block device.
904
905     This is a single-node call.
906
907     """
908     return self._SingleNodeCall(node, "blockdev_create",
909                                 [bdev.ToDict(), size, owner, on_primary, info])
910
911   @_RpcTimeout(_TMO_NORMAL)
912   def call_blockdev_remove(self, node, bdev):
913     """Request removal of a given block device.
914
915     This is a single-node call.
916
917     """
918     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
919
920   @_RpcTimeout(_TMO_NORMAL)
921   def call_blockdev_rename(self, node, devlist):
922     """Request rename of the given block devices.
923
924     This is a single-node call.
925
926     """
927     return self._SingleNodeCall(node, "blockdev_rename",
928                                 [(d.ToDict(), uid) for d, uid in devlist])
929
930   @_RpcTimeout(_TMO_NORMAL)
931   def call_blockdev_assemble(self, node, disk, owner, on_primary):
932     """Request assembling of a given block device.
933
934     This is a single-node call.
935
936     """
937     return self._SingleNodeCall(node, "blockdev_assemble",
938                                 [disk.ToDict(), owner, on_primary])
939
940   @_RpcTimeout(_TMO_NORMAL)
941   def call_blockdev_shutdown(self, node, disk):
942     """Request shutdown of a given block device.
943
944     This is a single-node call.
945
946     """
947     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
948
949   @_RpcTimeout(_TMO_NORMAL)
950   def call_blockdev_addchildren(self, node, bdev, ndevs):
951     """Request adding a list of children to a (mirroring) device.
952
953     This is a single-node call.
954
955     """
956     return self._SingleNodeCall(node, "blockdev_addchildren",
957                                 [bdev.ToDict(),
958                                  [disk.ToDict() for disk in ndevs]])
959
960   @_RpcTimeout(_TMO_NORMAL)
961   def call_blockdev_removechildren(self, node, bdev, ndevs):
962     """Request removing a list of children from a (mirroring) device.
963
964     This is a single-node call.
965
966     """
967     return self._SingleNodeCall(node, "blockdev_removechildren",
968                                 [bdev.ToDict(),
969                                  [disk.ToDict() for disk in ndevs]])
970
971   @_RpcTimeout(_TMO_NORMAL)
972   def call_blockdev_getmirrorstatus(self, node, disks):
973     """Request status of a (mirroring) device.
974
975     This is a single-node call.
976
977     """
978     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
979                                   [dsk.ToDict() for dsk in disks])
980     if not result.fail_msg:
981       result.payload = [objects.BlockDevStatus.FromDict(i)
982                         for i in result.payload]
983     return result
984
985   @_RpcTimeout(_TMO_NORMAL)
986   def call_blockdev_find(self, node, disk):
987     """Request identification of a given block device.
988
989     This is a single-node call.
990
991     """
992     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
993     if not result.fail_msg and result.payload is not None:
994       result.payload = objects.BlockDevStatus.FromDict(result.payload)
995     return result
996
997   @_RpcTimeout(_TMO_NORMAL)
998   def call_blockdev_close(self, node, instance_name, disks):
999     """Closes the given block devices.
1000
1001     This is a single-node call.
1002
1003     """
1004     params = [instance_name, [cf.ToDict() for cf in disks]]
1005     return self._SingleNodeCall(node, "blockdev_close", params)
1006
1007   @_RpcTimeout(_TMO_NORMAL)
1008   def call_blockdev_getsizes(self, node, disks):
1009     """Returns the size of the given disks.
1010
1011     This is a single-node call.
1012
1013     """
1014     params = [[cf.ToDict() for cf in disks]]
1015     return self._SingleNodeCall(node, "blockdev_getsize", params)
1016
1017   @_RpcTimeout(_TMO_NORMAL)
1018   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1019     """Disconnects the network of the given drbd devices.
1020
1021     This is a multi-node call.
1022
1023     """
1024     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1025                                [nodes_ip, [cf.ToDict() for cf in disks]])
1026
1027   @_RpcTimeout(_TMO_NORMAL)
1028   def call_drbd_attach_net(self, node_list, nodes_ip,
1029                            disks, instance_name, multimaster):
1030     """Disconnects the given drbd devices.
1031
1032     This is a multi-node call.
1033
1034     """
1035     return self._MultiNodeCall(node_list, "drbd_attach_net",
1036                                [nodes_ip, [cf.ToDict() for cf in disks],
1037                                 instance_name, multimaster])
1038
1039   @_RpcTimeout(_TMO_SLOW)
1040   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1041     """Waits for the synchronization of drbd devices is complete.
1042
1043     This is a multi-node call.
1044
1045     """
1046     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1047                                [nodes_ip, [cf.ToDict() for cf in disks]])
1048
1049   @_RpcTimeout(_TMO_URGENT)
1050   def call_drbd_helper(self, node_list):
1051     """Gets drbd helper.
1052
1053     This is a multi-node call.
1054
1055     """
1056     return self._MultiNodeCall(node_list, "drbd_helper", [])
1057
1058   @classmethod
1059   @_RpcTimeout(_TMO_NORMAL)
1060   def call_upload_file(cls, node_list, file_name, address_list=None):
1061     """Upload a file.
1062
1063     The node will refuse the operation in case the file is not on the
1064     approved file list.
1065
1066     This is a multi-node call.
1067
1068     @type node_list: list
1069     @param node_list: the list of node names to upload to
1070     @type file_name: str
1071     @param file_name: the filename to upload
1072     @type address_list: list or None
1073     @keyword address_list: an optional list of node addresses, in order
1074         to optimize the RPC speed
1075
1076     """
1077     file_contents = utils.ReadFile(file_name)
1078     data = cls._Compress(file_contents)
1079     st = os.stat(file_name)
1080     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1081               st.st_atime, st.st_mtime]
1082     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1083                                     address_list=address_list)
1084
1085   @classmethod
1086   @_RpcTimeout(_TMO_NORMAL)
1087   def call_write_ssconf_files(cls, node_list, values):
1088     """Write ssconf files.
1089
1090     This is a multi-node call.
1091
1092     """
1093     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1094
1095   @_RpcTimeout(_TMO_FAST)
1096   def call_os_diagnose(self, node_list):
1097     """Request a diagnose of OS definitions.
1098
1099     This is a multi-node call.
1100
1101     """
1102     return self._MultiNodeCall(node_list, "os_diagnose", [])
1103
1104   @_RpcTimeout(_TMO_FAST)
1105   def call_os_get(self, node, name):
1106     """Returns an OS definition.
1107
1108     This is a single-node call.
1109
1110     """
1111     result = self._SingleNodeCall(node, "os_get", [name])
1112     if not result.fail_msg and isinstance(result.payload, dict):
1113       result.payload = objects.OS.FromDict(result.payload)
1114     return result
1115
1116   @_RpcTimeout(_TMO_FAST)
1117   def call_os_validate(self, required, nodes, name, checks, params):
1118     """Run a validation routine for a given OS.
1119
1120     This is a multi-node call.
1121
1122     """
1123     return self._MultiNodeCall(nodes, "os_validate",
1124                                [required, name, checks, params])
1125
1126   @_RpcTimeout(_TMO_NORMAL)
1127   def call_hooks_runner(self, node_list, hpath, phase, env):
1128     """Call the hooks runner.
1129
1130     Args:
1131       - op: the OpCode instance
1132       - env: a dictionary with the environment
1133
1134     This is a multi-node call.
1135
1136     """
1137     params = [hpath, phase, env]
1138     return self._MultiNodeCall(node_list, "hooks_runner", params)
1139
1140   @_RpcTimeout(_TMO_NORMAL)
1141   def call_iallocator_runner(self, node, name, idata):
1142     """Call an iallocator on a remote node
1143
1144     Args:
1145       - name: the iallocator name
1146       - input: the json-encoded input string
1147
1148     This is a single-node call.
1149
1150     """
1151     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1152
1153   @_RpcTimeout(_TMO_NORMAL)
1154   def call_blockdev_grow(self, node, cf_bdev, amount):
1155     """Request a snapshot of the given block device.
1156
1157     This is a single-node call.
1158
1159     """
1160     return self._SingleNodeCall(node, "blockdev_grow",
1161                                 [cf_bdev.ToDict(), amount])
1162
1163   @_RpcTimeout(_TMO_1DAY)
1164   def call_blockdev_export(self, node, cf_bdev,
1165                            dest_node, dest_path, cluster_name):
1166     """Export a given disk to another node.
1167
1168     This is a single-node call.
1169
1170     """
1171     return self._SingleNodeCall(node, "blockdev_export",
1172                                 [cf_bdev.ToDict(), dest_node, dest_path,
1173                                  cluster_name])
1174
1175   @_RpcTimeout(_TMO_NORMAL)
1176   def call_blockdev_snapshot(self, node, cf_bdev):
1177     """Request a snapshot of the given block device.
1178
1179     This is a single-node call.
1180
1181     """
1182     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1183
1184   @_RpcTimeout(_TMO_NORMAL)
1185   def call_finalize_export(self, node, instance, snap_disks):
1186     """Request the completion of an export operation.
1187
1188     This writes the export config file, etc.
1189
1190     This is a single-node call.
1191
1192     """
1193     flat_disks = []
1194     for disk in snap_disks:
1195       if isinstance(disk, bool):
1196         flat_disks.append(disk)
1197       else:
1198         flat_disks.append(disk.ToDict())
1199
1200     return self._SingleNodeCall(node, "finalize_export",
1201                                 [self._InstDict(instance), flat_disks])
1202
1203   @_RpcTimeout(_TMO_FAST)
1204   def call_export_info(self, node, path):
1205     """Queries the export information in a given path.
1206
1207     This is a single-node call.
1208
1209     """
1210     return self._SingleNodeCall(node, "export_info", [path])
1211
1212   @_RpcTimeout(_TMO_FAST)
1213   def call_export_list(self, node_list):
1214     """Gets the stored exports list.
1215
1216     This is a multi-node call.
1217
1218     """
1219     return self._MultiNodeCall(node_list, "export_list", [])
1220
1221   @_RpcTimeout(_TMO_FAST)
1222   def call_export_remove(self, node, export):
1223     """Requests removal of a given export.
1224
1225     This is a single-node call.
1226
1227     """
1228     return self._SingleNodeCall(node, "export_remove", [export])
1229
1230   @classmethod
1231   @_RpcTimeout(_TMO_NORMAL)
1232   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1233     """Requests a node to clean the cluster information it has.
1234
1235     This will remove the configuration information from the ganeti data
1236     dir.
1237
1238     This is a single-node call.
1239
1240     """
1241     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1242                                      [modify_ssh_setup])
1243
1244   @_RpcTimeout(_TMO_FAST)
1245   def call_node_volumes(self, node_list):
1246     """Gets all volumes on node(s).
1247
1248     This is a multi-node call.
1249
1250     """
1251     return self._MultiNodeCall(node_list, "node_volumes", [])
1252
1253   @_RpcTimeout(_TMO_FAST)
1254   def call_node_demote_from_mc(self, node):
1255     """Demote a node from the master candidate role.
1256
1257     This is a single-node call.
1258
1259     """
1260     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1261
1262   @_RpcTimeout(_TMO_NORMAL)
1263   def call_node_powercycle(self, node, hypervisor):
1264     """Tries to powercycle a node.
1265
1266     This is a single-node call.
1267
1268     """
1269     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1270
1271   @_RpcTimeout(None)
1272   def call_test_delay(self, node_list, duration):
1273     """Sleep for a fixed time on given node(s).
1274
1275     This is a multi-node call.
1276
1277     """
1278     return self._MultiNodeCall(node_list, "test_delay", [duration],
1279                                read_timeout=int(duration + 5))
1280
1281   @_RpcTimeout(_TMO_FAST)
1282   def call_file_storage_dir_create(self, node, file_storage_dir):
1283     """Create the given file storage directory.
1284
1285     This is a single-node call.
1286
1287     """
1288     return self._SingleNodeCall(node, "file_storage_dir_create",
1289                                 [file_storage_dir])
1290
1291   @_RpcTimeout(_TMO_FAST)
1292   def call_file_storage_dir_remove(self, node, file_storage_dir):
1293     """Remove the given file storage directory.
1294
1295     This is a single-node call.
1296
1297     """
1298     return self._SingleNodeCall(node, "file_storage_dir_remove",
1299                                 [file_storage_dir])
1300
1301   @_RpcTimeout(_TMO_FAST)
1302   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1303                                    new_file_storage_dir):
1304     """Rename file storage directory.
1305
1306     This is a single-node call.
1307
1308     """
1309     return self._SingleNodeCall(node, "file_storage_dir_rename",
1310                                 [old_file_storage_dir, new_file_storage_dir])
1311
1312   @classmethod
1313   @_RpcTimeout(_TMO_FAST)
1314   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1315     """Update job queue.
1316
1317     This is a multi-node call.
1318
1319     """
1320     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1321                                     [file_name, cls._Compress(content)],
1322                                     address_list=address_list)
1323
1324   @classmethod
1325   @_RpcTimeout(_TMO_NORMAL)
1326   def call_jobqueue_purge(cls, node):
1327     """Purge job queue.
1328
1329     This is a single-node call.
1330
1331     """
1332     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1333
1334   @classmethod
1335   @_RpcTimeout(_TMO_FAST)
1336   def call_jobqueue_rename(cls, node_list, address_list, rename):
1337     """Rename a job queue file.
1338
1339     This is a multi-node call.
1340
1341     """
1342     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1343                                     address_list=address_list)
1344
1345   @_RpcTimeout(_TMO_NORMAL)
1346   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1347     """Validate the hypervisor params.
1348
1349     This is a multi-node call.
1350
1351     @type node_list: list
1352     @param node_list: the list of nodes to query
1353     @type hvname: string
1354     @param hvname: the hypervisor name
1355     @type hvparams: dict
1356     @param hvparams: the hypervisor parameters to be validated
1357
1358     """
1359     cluster = self._cfg.GetClusterInfo()
1360     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1361     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1362                                [hvname, hv_full])
1363
1364   @_RpcTimeout(_TMO_NORMAL)
1365   def call_x509_cert_create(self, node, validity):
1366     """Creates a new X509 certificate for SSL/TLS.
1367
1368     This is a single-node call.
1369
1370     @type validity: int
1371     @param validity: Validity in seconds
1372
1373     """
1374     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1375
1376   @_RpcTimeout(_TMO_NORMAL)
1377   def call_x509_cert_remove(self, node, name):
1378     """Removes a X509 certificate.
1379
1380     This is a single-node call.
1381
1382     @type name: string
1383     @param name: Certificate name
1384
1385     """
1386     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1387
1388   @_RpcTimeout(_TMO_NORMAL)
1389   def call_import_start(self, node, opts, instance, dest, dest_args):
1390     """Starts a listener for an import.
1391
1392     This is a single-node call.
1393
1394     @type node: string
1395     @param node: Node name
1396     @type instance: C{objects.Instance}
1397     @param instance: Instance object
1398
1399     """
1400     return self._SingleNodeCall(node, "import_start",
1401                                 [opts.ToDict(),
1402                                  self._InstDict(instance), dest,
1403                                  _EncodeImportExportIO(dest, dest_args)])
1404
1405   @_RpcTimeout(_TMO_NORMAL)
1406   def call_export_start(self, node, opts, host, port,
1407                         instance, source, source_args):
1408     """Starts an export daemon.
1409
1410     This is a single-node call.
1411
1412     @type node: string
1413     @param node: Node name
1414     @type instance: C{objects.Instance}
1415     @param instance: Instance object
1416
1417     """
1418     return self._SingleNodeCall(node, "export_start",
1419                                 [opts.ToDict(), host, port,
1420                                  self._InstDict(instance), source,
1421                                  _EncodeImportExportIO(source, source_args)])
1422
1423   @_RpcTimeout(_TMO_FAST)
1424   def call_impexp_status(self, node, names):
1425     """Gets the status of an import or export.
1426
1427     This is a single-node call.
1428
1429     @type node: string
1430     @param node: Node name
1431     @type names: List of strings
1432     @param names: Import/export names
1433     @rtype: List of L{objects.ImportExportStatus} instances
1434     @return: Returns a list of the state of each named import/export or None if
1435              a status couldn't be retrieved
1436
1437     """
1438     result = self._SingleNodeCall(node, "impexp_status", [names])
1439
1440     if not result.fail_msg:
1441       decoded = []
1442
1443       for i in result.payload:
1444         if i is None:
1445           decoded.append(None)
1446           continue
1447         decoded.append(objects.ImportExportStatus.FromDict(i))
1448
1449       result.payload = decoded
1450
1451     return result
1452
1453   @_RpcTimeout(_TMO_NORMAL)
1454   def call_impexp_abort(self, node, name):
1455     """Aborts an import or export.
1456
1457     This is a single-node call.
1458
1459     @type node: string
1460     @param node: Node name
1461     @type name: string
1462     @param name: Import/export name
1463
1464     """
1465     return self._SingleNodeCall(node, "impexp_abort", [name])
1466
1467   @_RpcTimeout(_TMO_NORMAL)
1468   def call_impexp_cleanup(self, node, name):
1469     """Cleans up after an import or export.
1470
1471     This is a single-node call.
1472
1473     @type node: string
1474     @param node: Node name
1475     @type name: string
1476     @param name: Import/export name
1477
1478     """
1479     return self._SingleNodeCall(node, "impexp_cleanup", [name])