Merge branch 'devel-2.2'
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client  # pylint: disable-msg=W0611
51
52
53 # Timeout for connecting to nodes (seconds)
54 _RPC_CONNECT_TIMEOUT = 5
55
56 _RPC_CLIENT_HEADERS = [
57   "Content-type: %s" % http.HTTP_APP_JSON,
58   "Expect:",
59   ]
60
61 # Various time constants for the timeout table
62 _TMO_URGENT = 60 # one minute
63 _TMO_FAST = 5 * 60 # five minutes
64 _TMO_NORMAL = 15 * 60 # 15 minutes
65 _TMO_SLOW = 3600 # one hour
66 _TMO_4HRS = 4 * 3600
67 _TMO_1DAY = 86400
68
69 # Timeout table that will be built later by decorators
70 # Guidelines for choosing timeouts:
71 # - call used during watcher: timeout -> 1min, _TMO_URGENT
72 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73 # - other calls: 15 min, _TMO_NORMAL
74 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75
76 _TIMEOUTS = {
77 }
78
79
80 def Init():
81   """Initializes the module-global HTTP client manager.
82
83   Must be called before using any RPC function and while exactly one thread is
84   running.
85
86   """
87   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88   # one thread running. This check is just a safety measure -- it doesn't
89   # cover all cases.
90   assert threading.activeCount() == 1, \
91          "Found more than one active thread when initializing pycURL"
92
93   logging.info("Using PycURL %s", pycurl.version)
94
95   pycurl.global_init(pycurl.GLOBAL_ALL)
96
97
98 def Shutdown():
99   """Stops the module-global HTTP client manager.
100
101   Must be called before quitting the program and while exactly one thread is
102   running.
103
104   """
105   pycurl.global_cleanup()
106
107
108 def _ConfigRpcCurl(curl):
109   noded_cert = str(constants.NODED_CERT_FILE)
110
111   curl.setopt(pycurl.FOLLOWLOCATION, False)
112   curl.setopt(pycurl.CAINFO, noded_cert)
113   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114   curl.setopt(pycurl.SSL_VERIFYPEER, True)
115   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116   curl.setopt(pycurl.SSLCERT, noded_cert)
117   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118   curl.setopt(pycurl.SSLKEY, noded_cert)
119   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120
121
122 class _RpcThreadLocal(threading.local):
123   def GetHttpClientPool(self):
124     """Returns a per-thread HTTP client pool.
125
126     @rtype: L{http.client.HttpClientPool}
127
128     """
129     try:
130       pool = self.hcp
131     except AttributeError:
132       pool = http.client.HttpClientPool(_ConfigRpcCurl)
133       self.hcp = pool
134
135     return pool
136
137
138 _thread_local = _RpcThreadLocal()
139
140
141 def _RpcTimeout(secs):
142   """Timeout decorator.
143
144   When applied to a rpc call_* function, it updates the global timeout
145   table with the given function/timeout.
146
147   """
148   def decorator(f):
149     name = f.__name__
150     assert name.startswith("call_")
151     _TIMEOUTS[name[len("call_"):]] = secs
152     return f
153   return decorator
154
155
156 def RunWithRPC(fn):
157   """RPC-wrapper decorator.
158
159   When applied to a function, it runs it with the RPC system
160   initialized, and it shutsdown the system afterwards. This means the
161   function must be called without RPC being initialized.
162
163   """
164   def wrapper(*args, **kwargs):
165     Init()
166     try:
167       return fn(*args, **kwargs)
168     finally:
169       Shutdown()
170   return wrapper
171
172
173 class RpcResult(object):
174   """RPC Result class.
175
176   This class holds an RPC result. It is needed since in multi-node
177   calls we can't raise an exception just because one one out of many
178   failed, and therefore we use this class to encapsulate the result.
179
180   @ivar data: the data payload, for successful results, or None
181   @ivar call: the name of the RPC call
182   @ivar node: the name of the node to which we made the call
183   @ivar offline: whether the operation failed because the node was
184       offline, as opposed to actual failure; offline=True will always
185       imply failed=True, in order to allow simpler checking if
186       the user doesn't care about the exact failure mode
187   @ivar fail_msg: the error message if the call failed
188
189   """
190   def __init__(self, data=None, failed=False, offline=False,
191                call=None, node=None):
192     self.offline = offline
193     self.call = call
194     self.node = node
195
196     if offline:
197       self.fail_msg = "Node is marked offline"
198       self.data = self.payload = None
199     elif failed:
200       self.fail_msg = self._EnsureErr(data)
201       self.data = self.payload = None
202     else:
203       self.data = data
204       if not isinstance(self.data, (tuple, list)):
205         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
206                          type(self.data))
207         self.payload = None
208       elif len(data) != 2:
209         self.fail_msg = ("RPC layer error: invalid result length (%d), "
210                          "expected 2" % len(self.data))
211         self.payload = None
212       elif not self.data[0]:
213         self.fail_msg = self._EnsureErr(self.data[1])
214         self.payload = None
215       else:
216         # finally success
217         self.fail_msg = None
218         self.payload = data[1]
219
220     assert hasattr(self, "call")
221     assert hasattr(self, "data")
222     assert hasattr(self, "fail_msg")
223     assert hasattr(self, "node")
224     assert hasattr(self, "offline")
225     assert hasattr(self, "payload")
226
227   @staticmethod
228   def _EnsureErr(val):
229     """Helper to ensure we return a 'True' value for error."""
230     if val:
231       return val
232     else:
233       return "No error information"
234
235   def Raise(self, msg, prereq=False, ecode=None):
236     """If the result has failed, raise an OpExecError.
237
238     This is used so that LU code doesn't have to check for each
239     result, but instead can call this function.
240
241     """
242     if not self.fail_msg:
243       return
244
245     if not msg: # one could pass None for default message
246       msg = ("Call '%s' to node '%s' has failed: %s" %
247              (self.call, self.node, self.fail_msg))
248     else:
249       msg = "%s: %s" % (msg, self.fail_msg)
250     if prereq:
251       ec = errors.OpPrereqError
252     else:
253       ec = errors.OpExecError
254     if ecode is not None:
255       args = (msg, ecode)
256     else:
257       args = (msg, )
258     raise ec(*args) # pylint: disable-msg=W0142
259
260
261 def _AddressLookup(node_list,
262                    ssc=ssconf.SimpleStore,
263                    nslookup_fn=netutils.Hostname.GetIP):
264   """Return addresses for given node names.
265
266   @type node_list: list
267   @param node_list: List of node names
268   @type ssc: class
269   @param ssc: SimpleStore class that is used to obtain node->ip mappings
270   @type nslookup_fn: callable
271   @param nslookup_fn: function use to do NS lookup
272   @rtype: list of addresses and/or None's
273   @returns: List of corresponding addresses, if found
274
275   """
276   ss = ssc()
277   iplist = ss.GetNodePrimaryIPList()
278   family = ss.GetPrimaryIPFamily()
279   addresses = []
280   ipmap = dict(entry.split() for entry in iplist)
281   for node in node_list:
282     address = ipmap.get(node)
283     if address is None:
284       address = nslookup_fn(node, family=family)
285     addresses.append(address)
286
287   return addresses
288
289
290 class Client:
291   """RPC Client class.
292
293   This class, given a (remote) method name, a list of parameters and a
294   list of nodes, will contact (in parallel) all nodes, and return a
295   dict of results (key: node name, value: result).
296
297   One current bug is that generic failure is still signaled by
298   'False' result, which is not good. This overloading of values can
299   cause bugs.
300
301   """
302   def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
303     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
304                                     " timeouts table")
305     self.procedure = procedure
306     self.body = body
307     self.port = port
308     self._request = {}
309     self._address_lookup_fn = address_lookup_fn
310
311   def ConnectList(self, node_list, address_list=None, read_timeout=None):
312     """Add a list of nodes to the target nodes.
313
314     @type node_list: list
315     @param node_list: the list of node names to connect
316     @type address_list: list or None
317     @keyword address_list: either None or a list with node addresses,
318         which must have the same length as the node list
319     @type read_timeout: int
320     @param read_timeout: overwrites default timeout for operation
321
322     """
323     if address_list is None:
324       # Always use IP address instead of node name
325       address_list = self._address_lookup_fn(node_list)
326
327     assert len(node_list) == len(address_list), \
328            "Name and address lists must have the same length"
329
330     for node, address in zip(node_list, address_list):
331       self.ConnectNode(node, address, read_timeout=read_timeout)
332
333   def ConnectNode(self, name, address=None, read_timeout=None):
334     """Add a node to the target list.
335
336     @type name: str
337     @param name: the node name
338     @type address: str
339     @param address: the node address, if known
340     @type read_timeout: int
341     @param read_timeout: overwrites default timeout for operation
342
343     """
344     if address is None:
345       # Always use IP address instead of node name
346       address = self._address_lookup_fn([name])[0]
347
348     assert(address is not None)
349
350     if read_timeout is None:
351       read_timeout = _TIMEOUTS[self.procedure]
352
353     self._request[name] = \
354       http.client.HttpClientRequest(str(address), self.port,
355                                     http.HTTP_PUT, str("/%s" % self.procedure),
356                                     headers=_RPC_CLIENT_HEADERS,
357                                     post_data=str(self.body),
358                                     read_timeout=read_timeout)
359
360   def GetResults(self, http_pool=None):
361     """Call nodes and return results.
362
363     @rtype: list
364     @return: List of RPC results
365
366     """
367     if not http_pool:
368       http_pool = _thread_local.GetHttpClientPool()
369
370     http_pool.ProcessRequests(self._request.values())
371
372     results = {}
373
374     for name, req in self._request.iteritems():
375       if req.success and req.resp_status_code == http.HTTP_OK:
376         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
377                                   node=name, call=self.procedure)
378         continue
379
380       # TODO: Better error reporting
381       if req.error:
382         msg = req.error
383       else:
384         msg = req.resp_body
385
386       logging.error("RPC error in %s from node %s: %s",
387                     self.procedure, name, msg)
388       results[name] = RpcResult(data=msg, failed=True, node=name,
389                                 call=self.procedure)
390
391     return results
392
393
394 def _EncodeImportExportIO(ieio, ieioargs):
395   """Encodes import/export I/O information.
396
397   """
398   if ieio == constants.IEIO_RAW_DISK:
399     assert len(ieioargs) == 1
400     return (ieioargs[0].ToDict(), )
401
402   if ieio == constants.IEIO_SCRIPT:
403     assert len(ieioargs) == 2
404     return (ieioargs[0].ToDict(), ieioargs[1])
405
406   return ieioargs
407
408
409 class RpcRunner(object):
410   """RPC runner class"""
411
412   def __init__(self, cfg):
413     """Initialized the rpc runner.
414
415     @type cfg:  C{config.ConfigWriter}
416     @param cfg: the configuration object that will be used to get data
417                 about the cluster
418
419     """
420     self._cfg = cfg
421     self.port = netutils.GetDaemonPort(constants.NODED)
422
423   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
424     """Convert the given instance to a dict.
425
426     This is done via the instance's ToDict() method and additionally
427     we fill the hvparams with the cluster defaults.
428
429     @type instance: L{objects.Instance}
430     @param instance: an Instance object
431     @type hvp: dict or None
432     @param hvp: a dictionary with overridden hypervisor parameters
433     @type bep: dict or None
434     @param bep: a dictionary with overridden backend parameters
435     @type osp: dict or None
436     @param osp: a dictionary with overriden os parameters
437     @rtype: dict
438     @return: the instance dict, with the hvparams filled with the
439         cluster defaults
440
441     """
442     idict = instance.ToDict()
443     cluster = self._cfg.GetClusterInfo()
444     idict["hvparams"] = cluster.FillHV(instance)
445     if hvp is not None:
446       idict["hvparams"].update(hvp)
447     idict["beparams"] = cluster.FillBE(instance)
448     if bep is not None:
449       idict["beparams"].update(bep)
450     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
451     if osp is not None:
452       idict["osparams"].update(osp)
453     for nic in idict["nics"]:
454       nic['nicparams'] = objects.FillDict(
455         cluster.nicparams[constants.PP_DEFAULT],
456         nic['nicparams'])
457     return idict
458
459   def _ConnectList(self, client, node_list, call, read_timeout=None):
460     """Helper for computing node addresses.
461
462     @type client: L{ganeti.rpc.Client}
463     @param client: a C{Client} instance
464     @type node_list: list
465     @param node_list: the node list we should connect
466     @type call: string
467     @param call: the name of the remote procedure call, for filling in
468         correctly any eventual offline nodes' results
469     @type read_timeout: int
470     @param read_timeout: overwrites the default read timeout for the
471         given operation
472
473     """
474     all_nodes = self._cfg.GetAllNodesInfo()
475     name_list = []
476     addr_list = []
477     skip_dict = {}
478     for node in node_list:
479       if node in all_nodes:
480         if all_nodes[node].offline:
481           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
482           continue
483         val = all_nodes[node].primary_ip
484       else:
485         val = None
486       addr_list.append(val)
487       name_list.append(node)
488     if name_list:
489       client.ConnectList(name_list, address_list=addr_list,
490                          read_timeout=read_timeout)
491     return skip_dict
492
493   def _ConnectNode(self, client, node, call, read_timeout=None):
494     """Helper for computing one node's address.
495
496     @type client: L{ganeti.rpc.Client}
497     @param client: a C{Client} instance
498     @type node: str
499     @param node: the node we should connect
500     @type call: string
501     @param call: the name of the remote procedure call, for filling in
502         correctly any eventual offline nodes' results
503     @type read_timeout: int
504     @param read_timeout: overwrites the default read timeout for the
505         given operation
506
507     """
508     node_info = self._cfg.GetNodeInfo(node)
509     if node_info is not None:
510       if node_info.offline:
511         return RpcResult(node=node, offline=True, call=call)
512       addr = node_info.primary_ip
513     else:
514       addr = None
515     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
516
517   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
518     """Helper for making a multi-node call
519
520     """
521     body = serializer.DumpJson(args, indent=False)
522     c = Client(procedure, body, self.port)
523     skip_dict = self._ConnectList(c, node_list, procedure,
524                                   read_timeout=read_timeout)
525     skip_dict.update(c.GetResults())
526     return skip_dict
527
528   @classmethod
529   def _StaticMultiNodeCall(cls, node_list, procedure, args,
530                            address_list=None, read_timeout=None):
531     """Helper for making a multi-node static call
532
533     """
534     body = serializer.DumpJson(args, indent=False)
535     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
536     c.ConnectList(node_list, address_list=address_list,
537                   read_timeout=read_timeout)
538     return c.GetResults()
539
540   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
541     """Helper for making a single-node call
542
543     """
544     body = serializer.DumpJson(args, indent=False)
545     c = Client(procedure, body, self.port)
546     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
547     if result is None:
548       # we did connect, node is not offline
549       result = c.GetResults()[node]
550     return result
551
552   @classmethod
553   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
554     """Helper for making a single-node static call
555
556     """
557     body = serializer.DumpJson(args, indent=False)
558     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
559     c.ConnectNode(node, read_timeout=read_timeout)
560     return c.GetResults()[node]
561
562   @staticmethod
563   def _Compress(data):
564     """Compresses a string for transport over RPC.
565
566     Small amounts of data are not compressed.
567
568     @type data: str
569     @param data: Data
570     @rtype: tuple
571     @return: Encoded data to send
572
573     """
574     # Small amounts of data are not compressed
575     if len(data) < 512:
576       return (constants.RPC_ENCODING_NONE, data)
577
578     # Compress with zlib and encode in base64
579     return (constants.RPC_ENCODING_ZLIB_BASE64,
580             base64.b64encode(zlib.compress(data, 3)))
581
582   #
583   # Begin RPC calls
584   #
585
586   @_RpcTimeout(_TMO_URGENT)
587   def call_lv_list(self, node_list, vg_name):
588     """Gets the logical volumes present in a given volume group.
589
590     This is a multi-node call.
591
592     """
593     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
594
595   @_RpcTimeout(_TMO_URGENT)
596   def call_vg_list(self, node_list):
597     """Gets the volume group list.
598
599     This is a multi-node call.
600
601     """
602     return self._MultiNodeCall(node_list, "vg_list", [])
603
604   @_RpcTimeout(_TMO_NORMAL)
605   def call_storage_list(self, node_list, su_name, su_args, name, fields):
606     """Get list of storage units.
607
608     This is a multi-node call.
609
610     """
611     return self._MultiNodeCall(node_list, "storage_list",
612                                [su_name, su_args, name, fields])
613
614   @_RpcTimeout(_TMO_NORMAL)
615   def call_storage_modify(self, node, su_name, su_args, name, changes):
616     """Modify a storage unit.
617
618     This is a single-node call.
619
620     """
621     return self._SingleNodeCall(node, "storage_modify",
622                                 [su_name, su_args, name, changes])
623
624   @_RpcTimeout(_TMO_NORMAL)
625   def call_storage_execute(self, node, su_name, su_args, name, op):
626     """Executes an operation on a storage unit.
627
628     This is a single-node call.
629
630     """
631     return self._SingleNodeCall(node, "storage_execute",
632                                 [su_name, su_args, name, op])
633
634   @_RpcTimeout(_TMO_URGENT)
635   def call_bridges_exist(self, node, bridges_list):
636     """Checks if a node has all the bridges given.
637
638     This method checks if all bridges given in the bridges_list are
639     present on the remote node, so that an instance that uses interfaces
640     on those bridges can be started.
641
642     This is a single-node call.
643
644     """
645     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
646
647   @_RpcTimeout(_TMO_NORMAL)
648   def call_instance_start(self, node, instance, hvp, bep):
649     """Starts an instance.
650
651     This is a single-node call.
652
653     """
654     idict = self._InstDict(instance, hvp=hvp, bep=bep)
655     return self._SingleNodeCall(node, "instance_start", [idict])
656
657   @_RpcTimeout(_TMO_NORMAL)
658   def call_instance_shutdown(self, node, instance, timeout):
659     """Stops an instance.
660
661     This is a single-node call.
662
663     """
664     return self._SingleNodeCall(node, "instance_shutdown",
665                                 [self._InstDict(instance), timeout])
666
667   @_RpcTimeout(_TMO_NORMAL)
668   def call_migration_info(self, node, instance):
669     """Gather the information necessary to prepare an instance migration.
670
671     This is a single-node call.
672
673     @type node: string
674     @param node: the node on which the instance is currently running
675     @type instance: C{objects.Instance}
676     @param instance: the instance definition
677
678     """
679     return self._SingleNodeCall(node, "migration_info",
680                                 [self._InstDict(instance)])
681
682   @_RpcTimeout(_TMO_NORMAL)
683   def call_accept_instance(self, node, instance, info, target):
684     """Prepare a node to accept an instance.
685
686     This is a single-node call.
687
688     @type node: string
689     @param node: the target node for the migration
690     @type instance: C{objects.Instance}
691     @param instance: the instance definition
692     @type info: opaque/hypervisor specific (string/data)
693     @param info: result for the call_migration_info call
694     @type target: string
695     @param target: target hostname (usually ip address) (on the node itself)
696
697     """
698     return self._SingleNodeCall(node, "accept_instance",
699                                 [self._InstDict(instance), info, target])
700
701   @_RpcTimeout(_TMO_NORMAL)
702   def call_finalize_migration(self, node, instance, info, success):
703     """Finalize any target-node migration specific operation.
704
705     This is called both in case of a successful migration and in case of error
706     (in which case it should abort the migration).
707
708     This is a single-node call.
709
710     @type node: string
711     @param node: the target node for the migration
712     @type instance: C{objects.Instance}
713     @param instance: the instance definition
714     @type info: opaque/hypervisor specific (string/data)
715     @param info: result for the call_migration_info call
716     @type success: boolean
717     @param success: whether the migration was a success or a failure
718
719     """
720     return self._SingleNodeCall(node, "finalize_migration",
721                                 [self._InstDict(instance), info, success])
722
723   @_RpcTimeout(_TMO_SLOW)
724   def call_instance_migrate(self, node, instance, target, live):
725     """Migrate an instance.
726
727     This is a single-node call.
728
729     @type node: string
730     @param node: the node on which the instance is currently running
731     @type instance: C{objects.Instance}
732     @param instance: the instance definition
733     @type target: string
734     @param target: the target node name
735     @type live: boolean
736     @param live: whether the migration should be done live or not (the
737         interpretation of this parameter is left to the hypervisor)
738
739     """
740     return self._SingleNodeCall(node, "instance_migrate",
741                                 [self._InstDict(instance), target, live])
742
743   @_RpcTimeout(_TMO_NORMAL)
744   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
745     """Reboots an instance.
746
747     This is a single-node call.
748
749     """
750     return self._SingleNodeCall(node, "instance_reboot",
751                                 [self._InstDict(inst), reboot_type,
752                                  shutdown_timeout])
753
754   @_RpcTimeout(_TMO_1DAY)
755   def call_instance_os_add(self, node, inst, reinstall, debug):
756     """Installs an OS on the given instance.
757
758     This is a single-node call.
759
760     """
761     return self._SingleNodeCall(node, "instance_os_add",
762                                 [self._InstDict(inst), reinstall, debug])
763
764   @_RpcTimeout(_TMO_SLOW)
765   def call_instance_run_rename(self, node, inst, old_name, debug):
766     """Run the OS rename script for an instance.
767
768     This is a single-node call.
769
770     """
771     return self._SingleNodeCall(node, "instance_run_rename",
772                                 [self._InstDict(inst), old_name, debug])
773
774   @_RpcTimeout(_TMO_URGENT)
775   def call_instance_info(self, node, instance, hname):
776     """Returns information about a single instance.
777
778     This is a single-node call.
779
780     @type node: list
781     @param node: the list of nodes to query
782     @type instance: string
783     @param instance: the instance name
784     @type hname: string
785     @param hname: the hypervisor type of the instance
786
787     """
788     return self._SingleNodeCall(node, "instance_info", [instance, hname])
789
790   @_RpcTimeout(_TMO_NORMAL)
791   def call_instance_migratable(self, node, instance):
792     """Checks whether the given instance can be migrated.
793
794     This is a single-node call.
795
796     @param node: the node to query
797     @type instance: L{objects.Instance}
798     @param instance: the instance to check
799
800
801     """
802     return self._SingleNodeCall(node, "instance_migratable",
803                                 [self._InstDict(instance)])
804
805   @_RpcTimeout(_TMO_URGENT)
806   def call_all_instances_info(self, node_list, hypervisor_list):
807     """Returns information about all instances on the given nodes.
808
809     This is a multi-node call.
810
811     @type node_list: list
812     @param node_list: the list of nodes to query
813     @type hypervisor_list: list
814     @param hypervisor_list: the hypervisors to query for instances
815
816     """
817     return self._MultiNodeCall(node_list, "all_instances_info",
818                                [hypervisor_list])
819
820   @_RpcTimeout(_TMO_URGENT)
821   def call_instance_list(self, node_list, hypervisor_list):
822     """Returns the list of running instances on a given node.
823
824     This is a multi-node call.
825
826     @type node_list: list
827     @param node_list: the list of nodes to query
828     @type hypervisor_list: list
829     @param hypervisor_list: the hypervisors to query for instances
830
831     """
832     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
833
834   @_RpcTimeout(_TMO_FAST)
835   def call_node_tcp_ping(self, node, source, target, port, timeout,
836                          live_port_needed):
837     """Do a TcpPing on the remote node
838
839     This is a single-node call.
840
841     """
842     return self._SingleNodeCall(node, "node_tcp_ping",
843                                 [source, target, port, timeout,
844                                  live_port_needed])
845
846   @_RpcTimeout(_TMO_FAST)
847   def call_node_has_ip_address(self, node, address):
848     """Checks if a node has the given IP address.
849
850     This is a single-node call.
851
852     """
853     return self._SingleNodeCall(node, "node_has_ip_address", [address])
854
855   @_RpcTimeout(_TMO_URGENT)
856   def call_node_info(self, node_list, vg_name, hypervisor_type):
857     """Return node information.
858
859     This will return memory information and volume group size and free
860     space.
861
862     This is a multi-node call.
863
864     @type node_list: list
865     @param node_list: the list of nodes to query
866     @type vg_name: C{string}
867     @param vg_name: the name of the volume group to ask for disk space
868         information
869     @type hypervisor_type: C{str}
870     @param hypervisor_type: the name of the hypervisor to ask for
871         memory information
872
873     """
874     return self._MultiNodeCall(node_list, "node_info",
875                                [vg_name, hypervisor_type])
876
877   @_RpcTimeout(_TMO_NORMAL)
878   def call_etc_hosts_modify(self, node, mode, name, ip):
879     """Modify hosts file with name
880
881     @type node: string
882     @param node: The node to call
883     @type mode: string
884     @param mode: The mode to operate. Currently "add" or "remove"
885     @type name: string
886     @param name: The host name to be modified
887     @type ip: string
888     @param ip: The ip of the entry (just valid if mode is "add")
889
890     """
891     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
892
893   @_RpcTimeout(_TMO_NORMAL)
894   def call_node_verify(self, node_list, checkdict, cluster_name):
895     """Request verification of given parameters.
896
897     This is a multi-node call.
898
899     """
900     return self._MultiNodeCall(node_list, "node_verify",
901                                [checkdict, cluster_name])
902
903   @classmethod
904   @_RpcTimeout(_TMO_FAST)
905   def call_node_start_master(cls, node, start_daemons, no_voting):
906     """Tells a node to activate itself as a master.
907
908     This is a single-node call.
909
910     """
911     return cls._StaticSingleNodeCall(node, "node_start_master",
912                                      [start_daemons, no_voting])
913
914   @classmethod
915   @_RpcTimeout(_TMO_FAST)
916   def call_node_stop_master(cls, node, stop_daemons):
917     """Tells a node to demote itself from master status.
918
919     This is a single-node call.
920
921     """
922     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
923
924   @classmethod
925   @_RpcTimeout(_TMO_URGENT)
926   def call_master_info(cls, node_list):
927     """Query master info.
928
929     This is a multi-node call.
930
931     """
932     # TODO: should this method query down nodes?
933     return cls._StaticMultiNodeCall(node_list, "master_info", [])
934
935   @classmethod
936   @_RpcTimeout(_TMO_URGENT)
937   def call_version(cls, node_list):
938     """Query node version.
939
940     This is a multi-node call.
941
942     """
943     return cls._StaticMultiNodeCall(node_list, "version", [])
944
945   @_RpcTimeout(_TMO_NORMAL)
946   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
947     """Request creation of a given block device.
948
949     This is a single-node call.
950
951     """
952     return self._SingleNodeCall(node, "blockdev_create",
953                                 [bdev.ToDict(), size, owner, on_primary, info])
954
955   @_RpcTimeout(_TMO_NORMAL)
956   def call_blockdev_remove(self, node, bdev):
957     """Request removal of a given block device.
958
959     This is a single-node call.
960
961     """
962     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
963
964   @_RpcTimeout(_TMO_NORMAL)
965   def call_blockdev_rename(self, node, devlist):
966     """Request rename of the given block devices.
967
968     This is a single-node call.
969
970     """
971     return self._SingleNodeCall(node, "blockdev_rename",
972                                 [(d.ToDict(), uid) for d, uid in devlist])
973
974   @_RpcTimeout(_TMO_NORMAL)
975   def call_blockdev_assemble(self, node, disk, owner, on_primary):
976     """Request assembling of a given block device.
977
978     This is a single-node call.
979
980     """
981     return self._SingleNodeCall(node, "blockdev_assemble",
982                                 [disk.ToDict(), owner, on_primary])
983
984   @_RpcTimeout(_TMO_NORMAL)
985   def call_blockdev_shutdown(self, node, disk):
986     """Request shutdown of a given block device.
987
988     This is a single-node call.
989
990     """
991     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
992
993   @_RpcTimeout(_TMO_NORMAL)
994   def call_blockdev_addchildren(self, node, bdev, ndevs):
995     """Request adding a list of children to a (mirroring) device.
996
997     This is a single-node call.
998
999     """
1000     return self._SingleNodeCall(node, "blockdev_addchildren",
1001                                 [bdev.ToDict(),
1002                                  [disk.ToDict() for disk in ndevs]])
1003
1004   @_RpcTimeout(_TMO_NORMAL)
1005   def call_blockdev_removechildren(self, node, bdev, ndevs):
1006     """Request removing a list of children from a (mirroring) device.
1007
1008     This is a single-node call.
1009
1010     """
1011     return self._SingleNodeCall(node, "blockdev_removechildren",
1012                                 [bdev.ToDict(),
1013                                  [disk.ToDict() for disk in ndevs]])
1014
1015   @_RpcTimeout(_TMO_NORMAL)
1016   def call_blockdev_getmirrorstatus(self, node, disks):
1017     """Request status of a (mirroring) device.
1018
1019     This is a single-node call.
1020
1021     """
1022     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1023                                   [dsk.ToDict() for dsk in disks])
1024     if not result.fail_msg:
1025       result.payload = [objects.BlockDevStatus.FromDict(i)
1026                         for i in result.payload]
1027     return result
1028
1029   @_RpcTimeout(_TMO_NORMAL)
1030   def call_blockdev_find(self, node, disk):
1031     """Request identification of a given block device.
1032
1033     This is a single-node call.
1034
1035     """
1036     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1037     if not result.fail_msg and result.payload is not None:
1038       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1039     return result
1040
1041   @_RpcTimeout(_TMO_NORMAL)
1042   def call_blockdev_close(self, node, instance_name, disks):
1043     """Closes the given block devices.
1044
1045     This is a single-node call.
1046
1047     """
1048     params = [instance_name, [cf.ToDict() for cf in disks]]
1049     return self._SingleNodeCall(node, "blockdev_close", params)
1050
1051   @_RpcTimeout(_TMO_NORMAL)
1052   def call_blockdev_getsizes(self, node, disks):
1053     """Returns the size of the given disks.
1054
1055     This is a single-node call.
1056
1057     """
1058     params = [[cf.ToDict() for cf in disks]]
1059     return self._SingleNodeCall(node, "blockdev_getsize", params)
1060
1061   @_RpcTimeout(_TMO_NORMAL)
1062   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1063     """Disconnects the network of the given drbd devices.
1064
1065     This is a multi-node call.
1066
1067     """
1068     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1069                                [nodes_ip, [cf.ToDict() for cf in disks]])
1070
1071   @_RpcTimeout(_TMO_NORMAL)
1072   def call_drbd_attach_net(self, node_list, nodes_ip,
1073                            disks, instance_name, multimaster):
1074     """Disconnects the given drbd devices.
1075
1076     This is a multi-node call.
1077
1078     """
1079     return self._MultiNodeCall(node_list, "drbd_attach_net",
1080                                [nodes_ip, [cf.ToDict() for cf in disks],
1081                                 instance_name, multimaster])
1082
1083   @_RpcTimeout(_TMO_SLOW)
1084   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1085     """Waits for the synchronization of drbd devices is complete.
1086
1087     This is a multi-node call.
1088
1089     """
1090     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1091                                [nodes_ip, [cf.ToDict() for cf in disks]])
1092
1093   @_RpcTimeout(_TMO_URGENT)
1094   def call_drbd_helper(self, node_list):
1095     """Gets drbd helper.
1096
1097     This is a multi-node call.
1098
1099     """
1100     return self._MultiNodeCall(node_list, "drbd_helper", [])
1101
1102   @classmethod
1103   @_RpcTimeout(_TMO_NORMAL)
1104   def call_upload_file(cls, node_list, file_name, address_list=None):
1105     """Upload a file.
1106
1107     The node will refuse the operation in case the file is not on the
1108     approved file list.
1109
1110     This is a multi-node call.
1111
1112     @type node_list: list
1113     @param node_list: the list of node names to upload to
1114     @type file_name: str
1115     @param file_name: the filename to upload
1116     @type address_list: list or None
1117     @keyword address_list: an optional list of node addresses, in order
1118         to optimize the RPC speed
1119
1120     """
1121     file_contents = utils.ReadFile(file_name)
1122     data = cls._Compress(file_contents)
1123     st = os.stat(file_name)
1124     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1125               st.st_atime, st.st_mtime]
1126     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1127                                     address_list=address_list)
1128
1129   @classmethod
1130   @_RpcTimeout(_TMO_NORMAL)
1131   def call_write_ssconf_files(cls, node_list, values):
1132     """Write ssconf files.
1133
1134     This is a multi-node call.
1135
1136     """
1137     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1138
1139   @_RpcTimeout(_TMO_FAST)
1140   def call_os_diagnose(self, node_list):
1141     """Request a diagnose of OS definitions.
1142
1143     This is a multi-node call.
1144
1145     """
1146     return self._MultiNodeCall(node_list, "os_diagnose", [])
1147
1148   @_RpcTimeout(_TMO_FAST)
1149   def call_os_get(self, node, name):
1150     """Returns an OS definition.
1151
1152     This is a single-node call.
1153
1154     """
1155     result = self._SingleNodeCall(node, "os_get", [name])
1156     if not result.fail_msg and isinstance(result.payload, dict):
1157       result.payload = objects.OS.FromDict(result.payload)
1158     return result
1159
1160   @_RpcTimeout(_TMO_FAST)
1161   def call_os_validate(self, required, nodes, name, checks, params):
1162     """Run a validation routine for a given OS.
1163
1164     This is a multi-node call.
1165
1166     """
1167     return self._MultiNodeCall(nodes, "os_validate",
1168                                [required, name, checks, params])
1169
1170   @_RpcTimeout(_TMO_NORMAL)
1171   def call_hooks_runner(self, node_list, hpath, phase, env):
1172     """Call the hooks runner.
1173
1174     Args:
1175       - op: the OpCode instance
1176       - env: a dictionary with the environment
1177
1178     This is a multi-node call.
1179
1180     """
1181     params = [hpath, phase, env]
1182     return self._MultiNodeCall(node_list, "hooks_runner", params)
1183
1184   @_RpcTimeout(_TMO_NORMAL)
1185   def call_iallocator_runner(self, node, name, idata):
1186     """Call an iallocator on a remote node
1187
1188     Args:
1189       - name: the iallocator name
1190       - input: the json-encoded input string
1191
1192     This is a single-node call.
1193
1194     """
1195     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1196
1197   @_RpcTimeout(_TMO_NORMAL)
1198   def call_blockdev_grow(self, node, cf_bdev, amount):
1199     """Request a snapshot of the given block device.
1200
1201     This is a single-node call.
1202
1203     """
1204     return self._SingleNodeCall(node, "blockdev_grow",
1205                                 [cf_bdev.ToDict(), amount])
1206
1207   @_RpcTimeout(_TMO_1DAY)
1208   def call_blockdev_export(self, node, cf_bdev,
1209                            dest_node, dest_path, cluster_name):
1210     """Export a given disk to another node.
1211
1212     This is a single-node call.
1213
1214     """
1215     return self._SingleNodeCall(node, "blockdev_export",
1216                                 [cf_bdev.ToDict(), dest_node, dest_path,
1217                                  cluster_name])
1218
1219   @_RpcTimeout(_TMO_NORMAL)
1220   def call_blockdev_snapshot(self, node, cf_bdev):
1221     """Request a snapshot of the given block device.
1222
1223     This is a single-node call.
1224
1225     """
1226     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1227
1228   @_RpcTimeout(_TMO_NORMAL)
1229   def call_finalize_export(self, node, instance, snap_disks):
1230     """Request the completion of an export operation.
1231
1232     This writes the export config file, etc.
1233
1234     This is a single-node call.
1235
1236     """
1237     flat_disks = []
1238     for disk in snap_disks:
1239       if isinstance(disk, bool):
1240         flat_disks.append(disk)
1241       else:
1242         flat_disks.append(disk.ToDict())
1243
1244     return self._SingleNodeCall(node, "finalize_export",
1245                                 [self._InstDict(instance), flat_disks])
1246
1247   @_RpcTimeout(_TMO_FAST)
1248   def call_export_info(self, node, path):
1249     """Queries the export information in a given path.
1250
1251     This is a single-node call.
1252
1253     """
1254     return self._SingleNodeCall(node, "export_info", [path])
1255
1256   @_RpcTimeout(_TMO_FAST)
1257   def call_export_list(self, node_list):
1258     """Gets the stored exports list.
1259
1260     This is a multi-node call.
1261
1262     """
1263     return self._MultiNodeCall(node_list, "export_list", [])
1264
1265   @_RpcTimeout(_TMO_FAST)
1266   def call_export_remove(self, node, export):
1267     """Requests removal of a given export.
1268
1269     This is a single-node call.
1270
1271     """
1272     return self._SingleNodeCall(node, "export_remove", [export])
1273
1274   @classmethod
1275   @_RpcTimeout(_TMO_NORMAL)
1276   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1277     """Requests a node to clean the cluster information it has.
1278
1279     This will remove the configuration information from the ganeti data
1280     dir.
1281
1282     This is a single-node call.
1283
1284     """
1285     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1286                                      [modify_ssh_setup])
1287
1288   @_RpcTimeout(_TMO_FAST)
1289   def call_node_volumes(self, node_list):
1290     """Gets all volumes on node(s).
1291
1292     This is a multi-node call.
1293
1294     """
1295     return self._MultiNodeCall(node_list, "node_volumes", [])
1296
1297   @_RpcTimeout(_TMO_FAST)
1298   def call_node_demote_from_mc(self, node):
1299     """Demote a node from the master candidate role.
1300
1301     This is a single-node call.
1302
1303     """
1304     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1305
1306   @_RpcTimeout(_TMO_NORMAL)
1307   def call_node_powercycle(self, node, hypervisor):
1308     """Tries to powercycle a node.
1309
1310     This is a single-node call.
1311
1312     """
1313     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1314
1315   @_RpcTimeout(None)
1316   def call_test_delay(self, node_list, duration):
1317     """Sleep for a fixed time on given node(s).
1318
1319     This is a multi-node call.
1320
1321     """
1322     return self._MultiNodeCall(node_list, "test_delay", [duration],
1323                                read_timeout=int(duration + 5))
1324
1325   @_RpcTimeout(_TMO_FAST)
1326   def call_file_storage_dir_create(self, node, file_storage_dir):
1327     """Create the given file storage directory.
1328
1329     This is a single-node call.
1330
1331     """
1332     return self._SingleNodeCall(node, "file_storage_dir_create",
1333                                 [file_storage_dir])
1334
1335   @_RpcTimeout(_TMO_FAST)
1336   def call_file_storage_dir_remove(self, node, file_storage_dir):
1337     """Remove the given file storage directory.
1338
1339     This is a single-node call.
1340
1341     """
1342     return self._SingleNodeCall(node, "file_storage_dir_remove",
1343                                 [file_storage_dir])
1344
1345   @_RpcTimeout(_TMO_FAST)
1346   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1347                                    new_file_storage_dir):
1348     """Rename file storage directory.
1349
1350     This is a single-node call.
1351
1352     """
1353     return self._SingleNodeCall(node, "file_storage_dir_rename",
1354                                 [old_file_storage_dir, new_file_storage_dir])
1355
1356   @classmethod
1357   @_RpcTimeout(_TMO_FAST)
1358   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1359     """Update job queue.
1360
1361     This is a multi-node call.
1362
1363     """
1364     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1365                                     [file_name, cls._Compress(content)],
1366                                     address_list=address_list)
1367
1368   @classmethod
1369   @_RpcTimeout(_TMO_NORMAL)
1370   def call_jobqueue_purge(cls, node):
1371     """Purge job queue.
1372
1373     This is a single-node call.
1374
1375     """
1376     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1377
1378   @classmethod
1379   @_RpcTimeout(_TMO_FAST)
1380   def call_jobqueue_rename(cls, node_list, address_list, rename):
1381     """Rename a job queue file.
1382
1383     This is a multi-node call.
1384
1385     """
1386     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1387                                     address_list=address_list)
1388
1389   @_RpcTimeout(_TMO_NORMAL)
1390   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1391     """Validate the hypervisor params.
1392
1393     This is a multi-node call.
1394
1395     @type node_list: list
1396     @param node_list: the list of nodes to query
1397     @type hvname: string
1398     @param hvname: the hypervisor name
1399     @type hvparams: dict
1400     @param hvparams: the hypervisor parameters to be validated
1401
1402     """
1403     cluster = self._cfg.GetClusterInfo()
1404     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1405     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1406                                [hvname, hv_full])
1407
1408   @_RpcTimeout(_TMO_NORMAL)
1409   def call_x509_cert_create(self, node, validity):
1410     """Creates a new X509 certificate for SSL/TLS.
1411
1412     This is a single-node call.
1413
1414     @type validity: int
1415     @param validity: Validity in seconds
1416
1417     """
1418     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1419
1420   @_RpcTimeout(_TMO_NORMAL)
1421   def call_x509_cert_remove(self, node, name):
1422     """Removes a X509 certificate.
1423
1424     This is a single-node call.
1425
1426     @type name: string
1427     @param name: Certificate name
1428
1429     """
1430     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1431
1432   @_RpcTimeout(_TMO_NORMAL)
1433   def call_import_start(self, node, opts, instance, dest, dest_args):
1434     """Starts a listener for an import.
1435
1436     This is a single-node call.
1437
1438     @type node: string
1439     @param node: Node name
1440     @type instance: C{objects.Instance}
1441     @param instance: Instance object
1442
1443     """
1444     return self._SingleNodeCall(node, "import_start",
1445                                 [opts.ToDict(),
1446                                  self._InstDict(instance), dest,
1447                                  _EncodeImportExportIO(dest, dest_args)])
1448
1449   @_RpcTimeout(_TMO_NORMAL)
1450   def call_export_start(self, node, opts, host, port,
1451                         instance, source, source_args):
1452     """Starts an export daemon.
1453
1454     This is a single-node call.
1455
1456     @type node: string
1457     @param node: Node name
1458     @type instance: C{objects.Instance}
1459     @param instance: Instance object
1460
1461     """
1462     return self._SingleNodeCall(node, "export_start",
1463                                 [opts.ToDict(), host, port,
1464                                  self._InstDict(instance), source,
1465                                  _EncodeImportExportIO(source, source_args)])
1466
1467   @_RpcTimeout(_TMO_FAST)
1468   def call_impexp_status(self, node, names):
1469     """Gets the status of an import or export.
1470
1471     This is a single-node call.
1472
1473     @type node: string
1474     @param node: Node name
1475     @type names: List of strings
1476     @param names: Import/export names
1477     @rtype: List of L{objects.ImportExportStatus} instances
1478     @return: Returns a list of the state of each named import/export or None if
1479              a status couldn't be retrieved
1480
1481     """
1482     result = self._SingleNodeCall(node, "impexp_status", [names])
1483
1484     if not result.fail_msg:
1485       decoded = []
1486
1487       for i in result.payload:
1488         if i is None:
1489           decoded.append(None)
1490           continue
1491         decoded.append(objects.ImportExportStatus.FromDict(i))
1492
1493       result.payload = decoded
1494
1495     return result
1496
1497   @_RpcTimeout(_TMO_NORMAL)
1498   def call_impexp_abort(self, node, name):
1499     """Aborts an import or export.
1500
1501     This is a single-node call.
1502
1503     @type node: string
1504     @param node: Node name
1505     @type name: string
1506     @param name: Import/export name
1507
1508     """
1509     return self._SingleNodeCall(node, "impexp_abort", [name])
1510
1511   @_RpcTimeout(_TMO_NORMAL)
1512   def call_impexp_cleanup(self, node, name):
1513     """Cleans up after an import or export.
1514
1515     This is a single-node call.
1516
1517     @type node: string
1518     @param node: Node name
1519     @type name: string
1520     @param name: Import/export name
1521
1522     """
1523     return self._SingleNodeCall(node, "impexp_cleanup", [name])