jqueue: Use priority for worker pool
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48
49 # pylint has a bug here, doesn't see this import
50 import ganeti.http.client  # pylint: disable-msg=W0611
51
52
53 # Timeout for connecting to nodes (seconds)
54 _RPC_CONNECT_TIMEOUT = 5
55
56 _RPC_CLIENT_HEADERS = [
57   "Content-type: %s" % http.HTTP_APP_JSON,
58   ]
59
60 # Various time constants for the timeout table
61 _TMO_URGENT = 60 # one minute
62 _TMO_FAST = 5 * 60 # five minutes
63 _TMO_NORMAL = 15 * 60 # 15 minutes
64 _TMO_SLOW = 3600 # one hour
65 _TMO_4HRS = 4 * 3600
66 _TMO_1DAY = 86400
67
68 # Timeout table that will be built later by decorators
69 # Guidelines for choosing timeouts:
70 # - call used during watcher: timeout -> 1min, _TMO_URGENT
71 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
72 # - other calls: 15 min, _TMO_NORMAL
73 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
74
75 _TIMEOUTS = {
76 }
77
78
79 def Init():
80   """Initializes the module-global HTTP client manager.
81
82   Must be called before using any RPC function and while exactly one thread is
83   running.
84
85   """
86   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
87   # one thread running. This check is just a safety measure -- it doesn't
88   # cover all cases.
89   assert threading.activeCount() == 1, \
90          "Found more than one active thread when initializing pycURL"
91
92   logging.info("Using PycURL %s", pycurl.version)
93
94   pycurl.global_init(pycurl.GLOBAL_ALL)
95
96
97 def Shutdown():
98   """Stops the module-global HTTP client manager.
99
100   Must be called before quitting the program and while exactly one thread is
101   running.
102
103   """
104   pycurl.global_cleanup()
105
106
107 def _ConfigRpcCurl(curl):
108   noded_cert = str(constants.NODED_CERT_FILE)
109
110   curl.setopt(pycurl.FOLLOWLOCATION, False)
111   curl.setopt(pycurl.CAINFO, noded_cert)
112   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
113   curl.setopt(pycurl.SSL_VERIFYPEER, True)
114   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
115   curl.setopt(pycurl.SSLCERT, noded_cert)
116   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
117   curl.setopt(pycurl.SSLKEY, noded_cert)
118   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
119
120
121 class _RpcThreadLocal(threading.local):
122   def GetHttpClientPool(self):
123     """Returns a per-thread HTTP client pool.
124
125     @rtype: L{http.client.HttpClientPool}
126
127     """
128     try:
129       pool = self.hcp
130     except AttributeError:
131       pool = http.client.HttpClientPool(_ConfigRpcCurl)
132       self.hcp = pool
133
134     return pool
135
136
137 _thread_local = _RpcThreadLocal()
138
139
140 def _RpcTimeout(secs):
141   """Timeout decorator.
142
143   When applied to a rpc call_* function, it updates the global timeout
144   table with the given function/timeout.
145
146   """
147   def decorator(f):
148     name = f.__name__
149     assert name.startswith("call_")
150     _TIMEOUTS[name[len("call_"):]] = secs
151     return f
152   return decorator
153
154
155 def RunWithRPC(fn):
156   """RPC-wrapper decorator.
157
158   When applied to a function, it runs it with the RPC system
159   initialized, and it shutsdown the system afterwards. This means the
160   function must be called without RPC being initialized.
161
162   """
163   def wrapper(*args, **kwargs):
164     Init()
165     try:
166       return fn(*args, **kwargs)
167     finally:
168       Shutdown()
169   return wrapper
170
171
172 class RpcResult(object):
173   """RPC Result class.
174
175   This class holds an RPC result. It is needed since in multi-node
176   calls we can't raise an exception just because one one out of many
177   failed, and therefore we use this class to encapsulate the result.
178
179   @ivar data: the data payload, for successful results, or None
180   @ivar call: the name of the RPC call
181   @ivar node: the name of the node to which we made the call
182   @ivar offline: whether the operation failed because the node was
183       offline, as opposed to actual failure; offline=True will always
184       imply failed=True, in order to allow simpler checking if
185       the user doesn't care about the exact failure mode
186   @ivar fail_msg: the error message if the call failed
187
188   """
189   def __init__(self, data=None, failed=False, offline=False,
190                call=None, node=None):
191     self.offline = offline
192     self.call = call
193     self.node = node
194
195     if offline:
196       self.fail_msg = "Node is marked offline"
197       self.data = self.payload = None
198     elif failed:
199       self.fail_msg = self._EnsureErr(data)
200       self.data = self.payload = None
201     else:
202       self.data = data
203       if not isinstance(self.data, (tuple, list)):
204         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
205                          type(self.data))
206         self.payload = None
207       elif len(data) != 2:
208         self.fail_msg = ("RPC layer error: invalid result length (%d), "
209                          "expected 2" % len(self.data))
210         self.payload = None
211       elif not self.data[0]:
212         self.fail_msg = self._EnsureErr(self.data[1])
213         self.payload = None
214       else:
215         # finally success
216         self.fail_msg = None
217         self.payload = data[1]
218
219     assert hasattr(self, "call")
220     assert hasattr(self, "data")
221     assert hasattr(self, "fail_msg")
222     assert hasattr(self, "node")
223     assert hasattr(self, "offline")
224     assert hasattr(self, "payload")
225
226   @staticmethod
227   def _EnsureErr(val):
228     """Helper to ensure we return a 'True' value for error."""
229     if val:
230       return val
231     else:
232       return "No error information"
233
234   def Raise(self, msg, prereq=False, ecode=None):
235     """If the result has failed, raise an OpExecError.
236
237     This is used so that LU code doesn't have to check for each
238     result, but instead can call this function.
239
240     """
241     if not self.fail_msg:
242       return
243
244     if not msg: # one could pass None for default message
245       msg = ("Call '%s' to node '%s' has failed: %s" %
246              (self.call, self.node, self.fail_msg))
247     else:
248       msg = "%s: %s" % (msg, self.fail_msg)
249     if prereq:
250       ec = errors.OpPrereqError
251     else:
252       ec = errors.OpExecError
253     if ecode is not None:
254       args = (msg, ecode)
255     else:
256       args = (msg, )
257     raise ec(*args) # pylint: disable-msg=W0142
258
259
260 def _AddressLookup(node_list,
261                    ssc=ssconf.SimpleStore,
262                    nslookup_fn=netutils.Hostname.GetIP):
263   """Return addresses for given node names.
264
265   @type node_list: list
266   @param node_list: List of node names
267   @type ssc: class
268   @param ssc: SimpleStore class that is used to obtain node->ip mappings
269   @type nslookup_fn: callable
270   @param nslookup_fn: function use to do NS lookup
271   @rtype: list of addresses and/or None's
272   @returns: List of corresponding addresses, if found
273
274   """
275   ss = ssc()
276   iplist = ss.GetNodePrimaryIPList()
277   family = ss.GetPrimaryIPFamily()
278   addresses = []
279   ipmap = dict(entry.split() for entry in iplist)
280   for node in node_list:
281     address = ipmap.get(node)
282     if address is None:
283       address = nslookup_fn(node, family=family)
284     addresses.append(address)
285
286   return addresses
287
288
289 class Client:
290   """RPC Client class.
291
292   This class, given a (remote) method name, a list of parameters and a
293   list of nodes, will contact (in parallel) all nodes, and return a
294   dict of results (key: node name, value: result).
295
296   One current bug is that generic failure is still signaled by
297   'False' result, which is not good. This overloading of values can
298   cause bugs.
299
300   """
301   def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
302     assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
303                                     " timeouts table")
304     self.procedure = procedure
305     self.body = body
306     self.port = port
307     self._request = {}
308     self._address_lookup_fn = address_lookup_fn
309
310   def ConnectList(self, node_list, address_list=None, read_timeout=None):
311     """Add a list of nodes to the target nodes.
312
313     @type node_list: list
314     @param node_list: the list of node names to connect
315     @type address_list: list or None
316     @keyword address_list: either None or a list with node addresses,
317         which must have the same length as the node list
318     @type read_timeout: int
319     @param read_timeout: overwrites default timeout for operation
320
321     """
322     if address_list is None:
323       # Always use IP address instead of node name
324       address_list = self._address_lookup_fn(node_list)
325
326     assert len(node_list) == len(address_list), \
327            "Name and address lists must have the same length"
328
329     for node, address in zip(node_list, address_list):
330       self.ConnectNode(node, address, read_timeout=read_timeout)
331
332   def ConnectNode(self, name, address=None, read_timeout=None):
333     """Add a node to the target list.
334
335     @type name: str
336     @param name: the node name
337     @type address: str
338     @param address: the node address, if known
339     @type read_timeout: int
340     @param read_timeout: overwrites default timeout for operation
341
342     """
343     if address is None:
344       # Always use IP address instead of node name
345       address = self._address_lookup_fn([name])[0]
346
347     assert(address is not None)
348
349     if read_timeout is None:
350       read_timeout = _TIMEOUTS[self.procedure]
351
352     self._request[name] = \
353       http.client.HttpClientRequest(str(address), self.port,
354                                     http.HTTP_PUT, str("/%s" % self.procedure),
355                                     headers=_RPC_CLIENT_HEADERS,
356                                     post_data=str(self.body),
357                                     read_timeout=read_timeout)
358
359   def GetResults(self, http_pool=None):
360     """Call nodes and return results.
361
362     @rtype: list
363     @return: List of RPC results
364
365     """
366     if not http_pool:
367       http_pool = _thread_local.GetHttpClientPool()
368
369     http_pool.ProcessRequests(self._request.values())
370
371     results = {}
372
373     for name, req in self._request.iteritems():
374       if req.success and req.resp_status_code == http.HTTP_OK:
375         results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
376                                   node=name, call=self.procedure)
377         continue
378
379       # TODO: Better error reporting
380       if req.error:
381         msg = req.error
382       else:
383         msg = req.resp_body
384
385       logging.error("RPC error in %s from node %s: %s",
386                     self.procedure, name, msg)
387       results[name] = RpcResult(data=msg, failed=True, node=name,
388                                 call=self.procedure)
389
390     return results
391
392
393 def _EncodeImportExportIO(ieio, ieioargs):
394   """Encodes import/export I/O information.
395
396   """
397   if ieio == constants.IEIO_RAW_DISK:
398     assert len(ieioargs) == 1
399     return (ieioargs[0].ToDict(), )
400
401   if ieio == constants.IEIO_SCRIPT:
402     assert len(ieioargs) == 2
403     return (ieioargs[0].ToDict(), ieioargs[1])
404
405   return ieioargs
406
407
408 class RpcRunner(object):
409   """RPC runner class"""
410
411   def __init__(self, cfg):
412     """Initialized the rpc runner.
413
414     @type cfg:  C{config.ConfigWriter}
415     @param cfg: the configuration object that will be used to get data
416                 about the cluster
417
418     """
419     self._cfg = cfg
420     self.port = netutils.GetDaemonPort(constants.NODED)
421
422   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
423     """Convert the given instance to a dict.
424
425     This is done via the instance's ToDict() method and additionally
426     we fill the hvparams with the cluster defaults.
427
428     @type instance: L{objects.Instance}
429     @param instance: an Instance object
430     @type hvp: dict or None
431     @param hvp: a dictionary with overridden hypervisor parameters
432     @type bep: dict or None
433     @param bep: a dictionary with overridden backend parameters
434     @type osp: dict or None
435     @param osp: a dictionary with overriden os parameters
436     @rtype: dict
437     @return: the instance dict, with the hvparams filled with the
438         cluster defaults
439
440     """
441     idict = instance.ToDict()
442     cluster = self._cfg.GetClusterInfo()
443     idict["hvparams"] = cluster.FillHV(instance)
444     if hvp is not None:
445       idict["hvparams"].update(hvp)
446     idict["beparams"] = cluster.FillBE(instance)
447     if bep is not None:
448       idict["beparams"].update(bep)
449     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
450     if osp is not None:
451       idict["osparams"].update(osp)
452     for nic in idict["nics"]:
453       nic['nicparams'] = objects.FillDict(
454         cluster.nicparams[constants.PP_DEFAULT],
455         nic['nicparams'])
456     return idict
457
458   def _ConnectList(self, client, node_list, call, read_timeout=None):
459     """Helper for computing node addresses.
460
461     @type client: L{ganeti.rpc.Client}
462     @param client: a C{Client} instance
463     @type node_list: list
464     @param node_list: the node list we should connect
465     @type call: string
466     @param call: the name of the remote procedure call, for filling in
467         correctly any eventual offline nodes' results
468     @type read_timeout: int
469     @param read_timeout: overwrites the default read timeout for the
470         given operation
471
472     """
473     all_nodes = self._cfg.GetAllNodesInfo()
474     name_list = []
475     addr_list = []
476     skip_dict = {}
477     for node in node_list:
478       if node in all_nodes:
479         if all_nodes[node].offline:
480           skip_dict[node] = RpcResult(node=node, offline=True, call=call)
481           continue
482         val = all_nodes[node].primary_ip
483       else:
484         val = None
485       addr_list.append(val)
486       name_list.append(node)
487     if name_list:
488       client.ConnectList(name_list, address_list=addr_list,
489                          read_timeout=read_timeout)
490     return skip_dict
491
492   def _ConnectNode(self, client, node, call, read_timeout=None):
493     """Helper for computing one node's address.
494
495     @type client: L{ganeti.rpc.Client}
496     @param client: a C{Client} instance
497     @type node: str
498     @param node: the node we should connect
499     @type call: string
500     @param call: the name of the remote procedure call, for filling in
501         correctly any eventual offline nodes' results
502     @type read_timeout: int
503     @param read_timeout: overwrites the default read timeout for the
504         given operation
505
506     """
507     node_info = self._cfg.GetNodeInfo(node)
508     if node_info is not None:
509       if node_info.offline:
510         return RpcResult(node=node, offline=True, call=call)
511       addr = node_info.primary_ip
512     else:
513       addr = None
514     client.ConnectNode(node, address=addr, read_timeout=read_timeout)
515
516   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
517     """Helper for making a multi-node call
518
519     """
520     body = serializer.DumpJson(args, indent=False)
521     c = Client(procedure, body, self.port)
522     skip_dict = self._ConnectList(c, node_list, procedure,
523                                   read_timeout=read_timeout)
524     skip_dict.update(c.GetResults())
525     return skip_dict
526
527   @classmethod
528   def _StaticMultiNodeCall(cls, node_list, procedure, args,
529                            address_list=None, read_timeout=None):
530     """Helper for making a multi-node static call
531
532     """
533     body = serializer.DumpJson(args, indent=False)
534     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
535     c.ConnectList(node_list, address_list=address_list,
536                   read_timeout=read_timeout)
537     return c.GetResults()
538
539   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
540     """Helper for making a single-node call
541
542     """
543     body = serializer.DumpJson(args, indent=False)
544     c = Client(procedure, body, self.port)
545     result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
546     if result is None:
547       # we did connect, node is not offline
548       result = c.GetResults()[node]
549     return result
550
551   @classmethod
552   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
553     """Helper for making a single-node static call
554
555     """
556     body = serializer.DumpJson(args, indent=False)
557     c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
558     c.ConnectNode(node, read_timeout=read_timeout)
559     return c.GetResults()[node]
560
561   @staticmethod
562   def _Compress(data):
563     """Compresses a string for transport over RPC.
564
565     Small amounts of data are not compressed.
566
567     @type data: str
568     @param data: Data
569     @rtype: tuple
570     @return: Encoded data to send
571
572     """
573     # Small amounts of data are not compressed
574     if len(data) < 512:
575       return (constants.RPC_ENCODING_NONE, data)
576
577     # Compress with zlib and encode in base64
578     return (constants.RPC_ENCODING_ZLIB_BASE64,
579             base64.b64encode(zlib.compress(data, 3)))
580
581   #
582   # Begin RPC calls
583   #
584
585   @_RpcTimeout(_TMO_URGENT)
586   def call_lv_list(self, node_list, vg_name):
587     """Gets the logical volumes present in a given volume group.
588
589     This is a multi-node call.
590
591     """
592     return self._MultiNodeCall(node_list, "lv_list", [vg_name])
593
594   @_RpcTimeout(_TMO_URGENT)
595   def call_vg_list(self, node_list):
596     """Gets the volume group list.
597
598     This is a multi-node call.
599
600     """
601     return self._MultiNodeCall(node_list, "vg_list", [])
602
603   @_RpcTimeout(_TMO_NORMAL)
604   def call_storage_list(self, node_list, su_name, su_args, name, fields):
605     """Get list of storage units.
606
607     This is a multi-node call.
608
609     """
610     return self._MultiNodeCall(node_list, "storage_list",
611                                [su_name, su_args, name, fields])
612
613   @_RpcTimeout(_TMO_NORMAL)
614   def call_storage_modify(self, node, su_name, su_args, name, changes):
615     """Modify a storage unit.
616
617     This is a single-node call.
618
619     """
620     return self._SingleNodeCall(node, "storage_modify",
621                                 [su_name, su_args, name, changes])
622
623   @_RpcTimeout(_TMO_NORMAL)
624   def call_storage_execute(self, node, su_name, su_args, name, op):
625     """Executes an operation on a storage unit.
626
627     This is a single-node call.
628
629     """
630     return self._SingleNodeCall(node, "storage_execute",
631                                 [su_name, su_args, name, op])
632
633   @_RpcTimeout(_TMO_URGENT)
634   def call_bridges_exist(self, node, bridges_list):
635     """Checks if a node has all the bridges given.
636
637     This method checks if all bridges given in the bridges_list are
638     present on the remote node, so that an instance that uses interfaces
639     on those bridges can be started.
640
641     This is a single-node call.
642
643     """
644     return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
645
646   @_RpcTimeout(_TMO_NORMAL)
647   def call_instance_start(self, node, instance, hvp, bep):
648     """Starts an instance.
649
650     This is a single-node call.
651
652     """
653     idict = self._InstDict(instance, hvp=hvp, bep=bep)
654     return self._SingleNodeCall(node, "instance_start", [idict])
655
656   @_RpcTimeout(_TMO_NORMAL)
657   def call_instance_shutdown(self, node, instance, timeout):
658     """Stops an instance.
659
660     This is a single-node call.
661
662     """
663     return self._SingleNodeCall(node, "instance_shutdown",
664                                 [self._InstDict(instance), timeout])
665
666   @_RpcTimeout(_TMO_NORMAL)
667   def call_migration_info(self, node, instance):
668     """Gather the information necessary to prepare an instance migration.
669
670     This is a single-node call.
671
672     @type node: string
673     @param node: the node on which the instance is currently running
674     @type instance: C{objects.Instance}
675     @param instance: the instance definition
676
677     """
678     return self._SingleNodeCall(node, "migration_info",
679                                 [self._InstDict(instance)])
680
681   @_RpcTimeout(_TMO_NORMAL)
682   def call_accept_instance(self, node, instance, info, target):
683     """Prepare a node to accept an instance.
684
685     This is a single-node call.
686
687     @type node: string
688     @param node: the target node for the migration
689     @type instance: C{objects.Instance}
690     @param instance: the instance definition
691     @type info: opaque/hypervisor specific (string/data)
692     @param info: result for the call_migration_info call
693     @type target: string
694     @param target: target hostname (usually ip address) (on the node itself)
695
696     """
697     return self._SingleNodeCall(node, "accept_instance",
698                                 [self._InstDict(instance), info, target])
699
700   @_RpcTimeout(_TMO_NORMAL)
701   def call_finalize_migration(self, node, instance, info, success):
702     """Finalize any target-node migration specific operation.
703
704     This is called both in case of a successful migration and in case of error
705     (in which case it should abort the migration).
706
707     This is a single-node call.
708
709     @type node: string
710     @param node: the target node for the migration
711     @type instance: C{objects.Instance}
712     @param instance: the instance definition
713     @type info: opaque/hypervisor specific (string/data)
714     @param info: result for the call_migration_info call
715     @type success: boolean
716     @param success: whether the migration was a success or a failure
717
718     """
719     return self._SingleNodeCall(node, "finalize_migration",
720                                 [self._InstDict(instance), info, success])
721
722   @_RpcTimeout(_TMO_SLOW)
723   def call_instance_migrate(self, node, instance, target, live):
724     """Migrate an instance.
725
726     This is a single-node call.
727
728     @type node: string
729     @param node: the node on which the instance is currently running
730     @type instance: C{objects.Instance}
731     @param instance: the instance definition
732     @type target: string
733     @param target: the target node name
734     @type live: boolean
735     @param live: whether the migration should be done live or not (the
736         interpretation of this parameter is left to the hypervisor)
737
738     """
739     return self._SingleNodeCall(node, "instance_migrate",
740                                 [self._InstDict(instance), target, live])
741
742   @_RpcTimeout(_TMO_NORMAL)
743   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
744     """Reboots an instance.
745
746     This is a single-node call.
747
748     """
749     return self._SingleNodeCall(node, "instance_reboot",
750                                 [self._InstDict(inst), reboot_type,
751                                  shutdown_timeout])
752
753   @_RpcTimeout(_TMO_1DAY)
754   def call_instance_os_add(self, node, inst, reinstall, debug):
755     """Installs an OS on the given instance.
756
757     This is a single-node call.
758
759     """
760     return self._SingleNodeCall(node, "instance_os_add",
761                                 [self._InstDict(inst), reinstall, debug])
762
763   @_RpcTimeout(_TMO_SLOW)
764   def call_instance_run_rename(self, node, inst, old_name, debug):
765     """Run the OS rename script for an instance.
766
767     This is a single-node call.
768
769     """
770     return self._SingleNodeCall(node, "instance_run_rename",
771                                 [self._InstDict(inst), old_name, debug])
772
773   @_RpcTimeout(_TMO_URGENT)
774   def call_instance_info(self, node, instance, hname):
775     """Returns information about a single instance.
776
777     This is a single-node call.
778
779     @type node: list
780     @param node: the list of nodes to query
781     @type instance: string
782     @param instance: the instance name
783     @type hname: string
784     @param hname: the hypervisor type of the instance
785
786     """
787     return self._SingleNodeCall(node, "instance_info", [instance, hname])
788
789   @_RpcTimeout(_TMO_NORMAL)
790   def call_instance_migratable(self, node, instance):
791     """Checks whether the given instance can be migrated.
792
793     This is a single-node call.
794
795     @param node: the node to query
796     @type instance: L{objects.Instance}
797     @param instance: the instance to check
798
799
800     """
801     return self._SingleNodeCall(node, "instance_migratable",
802                                 [self._InstDict(instance)])
803
804   @_RpcTimeout(_TMO_URGENT)
805   def call_all_instances_info(self, node_list, hypervisor_list):
806     """Returns information about all instances on the given nodes.
807
808     This is a multi-node call.
809
810     @type node_list: list
811     @param node_list: the list of nodes to query
812     @type hypervisor_list: list
813     @param hypervisor_list: the hypervisors to query for instances
814
815     """
816     return self._MultiNodeCall(node_list, "all_instances_info",
817                                [hypervisor_list])
818
819   @_RpcTimeout(_TMO_URGENT)
820   def call_instance_list(self, node_list, hypervisor_list):
821     """Returns the list of running instances on a given node.
822
823     This is a multi-node call.
824
825     @type node_list: list
826     @param node_list: the list of nodes to query
827     @type hypervisor_list: list
828     @param hypervisor_list: the hypervisors to query for instances
829
830     """
831     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
832
833   @_RpcTimeout(_TMO_FAST)
834   def call_node_tcp_ping(self, node, source, target, port, timeout,
835                          live_port_needed):
836     """Do a TcpPing on the remote node
837
838     This is a single-node call.
839
840     """
841     return self._SingleNodeCall(node, "node_tcp_ping",
842                                 [source, target, port, timeout,
843                                  live_port_needed])
844
845   @_RpcTimeout(_TMO_FAST)
846   def call_node_has_ip_address(self, node, address):
847     """Checks if a node has the given IP address.
848
849     This is a single-node call.
850
851     """
852     return self._SingleNodeCall(node, "node_has_ip_address", [address])
853
854   @_RpcTimeout(_TMO_URGENT)
855   def call_node_info(self, node_list, vg_name, hypervisor_type):
856     """Return node information.
857
858     This will return memory information and volume group size and free
859     space.
860
861     This is a multi-node call.
862
863     @type node_list: list
864     @param node_list: the list of nodes to query
865     @type vg_name: C{string}
866     @param vg_name: the name of the volume group to ask for disk space
867         information
868     @type hypervisor_type: C{str}
869     @param hypervisor_type: the name of the hypervisor to ask for
870         memory information
871
872     """
873     return self._MultiNodeCall(node_list, "node_info",
874                                [vg_name, hypervisor_type])
875
876   @_RpcTimeout(_TMO_NORMAL)
877   def call_etc_hosts_modify(self, node, mode, name, ip):
878     """Modify hosts file with name
879
880     @type node: string
881     @param node: The node to call
882     @type mode: string
883     @param mode: The mode to operate. Currently "add" or "remove"
884     @type name: string
885     @param name: The host name to be modified
886     @type ip: string
887     @param ip: The ip of the entry (just valid if mode is "add")
888
889     """
890     return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
891
892   @_RpcTimeout(_TMO_NORMAL)
893   def call_node_verify(self, node_list, checkdict, cluster_name):
894     """Request verification of given parameters.
895
896     This is a multi-node call.
897
898     """
899     return self._MultiNodeCall(node_list, "node_verify",
900                                [checkdict, cluster_name])
901
902   @classmethod
903   @_RpcTimeout(_TMO_FAST)
904   def call_node_start_master(cls, node, start_daemons, no_voting):
905     """Tells a node to activate itself as a master.
906
907     This is a single-node call.
908
909     """
910     return cls._StaticSingleNodeCall(node, "node_start_master",
911                                      [start_daemons, no_voting])
912
913   @classmethod
914   @_RpcTimeout(_TMO_FAST)
915   def call_node_stop_master(cls, node, stop_daemons):
916     """Tells a node to demote itself from master status.
917
918     This is a single-node call.
919
920     """
921     return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
922
923   @classmethod
924   @_RpcTimeout(_TMO_URGENT)
925   def call_master_info(cls, node_list):
926     """Query master info.
927
928     This is a multi-node call.
929
930     """
931     # TODO: should this method query down nodes?
932     return cls._StaticMultiNodeCall(node_list, "master_info", [])
933
934   @classmethod
935   @_RpcTimeout(_TMO_URGENT)
936   def call_version(cls, node_list):
937     """Query node version.
938
939     This is a multi-node call.
940
941     """
942     return cls._StaticMultiNodeCall(node_list, "version", [])
943
944   @_RpcTimeout(_TMO_NORMAL)
945   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
946     """Request creation of a given block device.
947
948     This is a single-node call.
949
950     """
951     return self._SingleNodeCall(node, "blockdev_create",
952                                 [bdev.ToDict(), size, owner, on_primary, info])
953
954   @_RpcTimeout(_TMO_NORMAL)
955   def call_blockdev_remove(self, node, bdev):
956     """Request removal of a given block device.
957
958     This is a single-node call.
959
960     """
961     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
962
963   @_RpcTimeout(_TMO_NORMAL)
964   def call_blockdev_rename(self, node, devlist):
965     """Request rename of the given block devices.
966
967     This is a single-node call.
968
969     """
970     return self._SingleNodeCall(node, "blockdev_rename",
971                                 [(d.ToDict(), uid) for d, uid in devlist])
972
973   @_RpcTimeout(_TMO_NORMAL)
974   def call_blockdev_assemble(self, node, disk, owner, on_primary):
975     """Request assembling of a given block device.
976
977     This is a single-node call.
978
979     """
980     return self._SingleNodeCall(node, "blockdev_assemble",
981                                 [disk.ToDict(), owner, on_primary])
982
983   @_RpcTimeout(_TMO_NORMAL)
984   def call_blockdev_shutdown(self, node, disk):
985     """Request shutdown of a given block device.
986
987     This is a single-node call.
988
989     """
990     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
991
992   @_RpcTimeout(_TMO_NORMAL)
993   def call_blockdev_addchildren(self, node, bdev, ndevs):
994     """Request adding a list of children to a (mirroring) device.
995
996     This is a single-node call.
997
998     """
999     return self._SingleNodeCall(node, "blockdev_addchildren",
1000                                 [bdev.ToDict(),
1001                                  [disk.ToDict() for disk in ndevs]])
1002
1003   @_RpcTimeout(_TMO_NORMAL)
1004   def call_blockdev_removechildren(self, node, bdev, ndevs):
1005     """Request removing a list of children from a (mirroring) device.
1006
1007     This is a single-node call.
1008
1009     """
1010     return self._SingleNodeCall(node, "blockdev_removechildren",
1011                                 [bdev.ToDict(),
1012                                  [disk.ToDict() for disk in ndevs]])
1013
1014   @_RpcTimeout(_TMO_NORMAL)
1015   def call_blockdev_getmirrorstatus(self, node, disks):
1016     """Request status of a (mirroring) device.
1017
1018     This is a single-node call.
1019
1020     """
1021     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1022                                   [dsk.ToDict() for dsk in disks])
1023     if not result.fail_msg:
1024       result.payload = [objects.BlockDevStatus.FromDict(i)
1025                         for i in result.payload]
1026     return result
1027
1028   @_RpcTimeout(_TMO_NORMAL)
1029   def call_blockdev_find(self, node, disk):
1030     """Request identification of a given block device.
1031
1032     This is a single-node call.
1033
1034     """
1035     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1036     if not result.fail_msg and result.payload is not None:
1037       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1038     return result
1039
1040   @_RpcTimeout(_TMO_NORMAL)
1041   def call_blockdev_close(self, node, instance_name, disks):
1042     """Closes the given block devices.
1043
1044     This is a single-node call.
1045
1046     """
1047     params = [instance_name, [cf.ToDict() for cf in disks]]
1048     return self._SingleNodeCall(node, "blockdev_close", params)
1049
1050   @_RpcTimeout(_TMO_NORMAL)
1051   def call_blockdev_getsizes(self, node, disks):
1052     """Returns the size of the given disks.
1053
1054     This is a single-node call.
1055
1056     """
1057     params = [[cf.ToDict() for cf in disks]]
1058     return self._SingleNodeCall(node, "blockdev_getsize", params)
1059
1060   @_RpcTimeout(_TMO_NORMAL)
1061   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1062     """Disconnects the network of the given drbd devices.
1063
1064     This is a multi-node call.
1065
1066     """
1067     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1068                                [nodes_ip, [cf.ToDict() for cf in disks]])
1069
1070   @_RpcTimeout(_TMO_NORMAL)
1071   def call_drbd_attach_net(self, node_list, nodes_ip,
1072                            disks, instance_name, multimaster):
1073     """Disconnects the given drbd devices.
1074
1075     This is a multi-node call.
1076
1077     """
1078     return self._MultiNodeCall(node_list, "drbd_attach_net",
1079                                [nodes_ip, [cf.ToDict() for cf in disks],
1080                                 instance_name, multimaster])
1081
1082   @_RpcTimeout(_TMO_SLOW)
1083   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1084     """Waits for the synchronization of drbd devices is complete.
1085
1086     This is a multi-node call.
1087
1088     """
1089     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1090                                [nodes_ip, [cf.ToDict() for cf in disks]])
1091
1092   @_RpcTimeout(_TMO_URGENT)
1093   def call_drbd_helper(self, node_list):
1094     """Gets drbd helper.
1095
1096     This is a multi-node call.
1097
1098     """
1099     return self._MultiNodeCall(node_list, "drbd_helper", [])
1100
1101   @classmethod
1102   @_RpcTimeout(_TMO_NORMAL)
1103   def call_upload_file(cls, node_list, file_name, address_list=None):
1104     """Upload a file.
1105
1106     The node will refuse the operation in case the file is not on the
1107     approved file list.
1108
1109     This is a multi-node call.
1110
1111     @type node_list: list
1112     @param node_list: the list of node names to upload to
1113     @type file_name: str
1114     @param file_name: the filename to upload
1115     @type address_list: list or None
1116     @keyword address_list: an optional list of node addresses, in order
1117         to optimize the RPC speed
1118
1119     """
1120     file_contents = utils.ReadFile(file_name)
1121     data = cls._Compress(file_contents)
1122     st = os.stat(file_name)
1123     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1124               st.st_atime, st.st_mtime]
1125     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1126                                     address_list=address_list)
1127
1128   @classmethod
1129   @_RpcTimeout(_TMO_NORMAL)
1130   def call_write_ssconf_files(cls, node_list, values):
1131     """Write ssconf files.
1132
1133     This is a multi-node call.
1134
1135     """
1136     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1137
1138   @_RpcTimeout(_TMO_FAST)
1139   def call_os_diagnose(self, node_list):
1140     """Request a diagnose of OS definitions.
1141
1142     This is a multi-node call.
1143
1144     """
1145     return self._MultiNodeCall(node_list, "os_diagnose", [])
1146
1147   @_RpcTimeout(_TMO_FAST)
1148   def call_os_get(self, node, name):
1149     """Returns an OS definition.
1150
1151     This is a single-node call.
1152
1153     """
1154     result = self._SingleNodeCall(node, "os_get", [name])
1155     if not result.fail_msg and isinstance(result.payload, dict):
1156       result.payload = objects.OS.FromDict(result.payload)
1157     return result
1158
1159   @_RpcTimeout(_TMO_FAST)
1160   def call_os_validate(self, required, nodes, name, checks, params):
1161     """Run a validation routine for a given OS.
1162
1163     This is a multi-node call.
1164
1165     """
1166     return self._MultiNodeCall(nodes, "os_validate",
1167                                [required, name, checks, params])
1168
1169   @_RpcTimeout(_TMO_NORMAL)
1170   def call_hooks_runner(self, node_list, hpath, phase, env):
1171     """Call the hooks runner.
1172
1173     Args:
1174       - op: the OpCode instance
1175       - env: a dictionary with the environment
1176
1177     This is a multi-node call.
1178
1179     """
1180     params = [hpath, phase, env]
1181     return self._MultiNodeCall(node_list, "hooks_runner", params)
1182
1183   @_RpcTimeout(_TMO_NORMAL)
1184   def call_iallocator_runner(self, node, name, idata):
1185     """Call an iallocator on a remote node
1186
1187     Args:
1188       - name: the iallocator name
1189       - input: the json-encoded input string
1190
1191     This is a single-node call.
1192
1193     """
1194     return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1195
1196   @_RpcTimeout(_TMO_NORMAL)
1197   def call_blockdev_grow(self, node, cf_bdev, amount):
1198     """Request a snapshot of the given block device.
1199
1200     This is a single-node call.
1201
1202     """
1203     return self._SingleNodeCall(node, "blockdev_grow",
1204                                 [cf_bdev.ToDict(), amount])
1205
1206   @_RpcTimeout(_TMO_1DAY)
1207   def call_blockdev_export(self, node, cf_bdev,
1208                            dest_node, dest_path, cluster_name):
1209     """Export a given disk to another node.
1210
1211     This is a single-node call.
1212
1213     """
1214     return self._SingleNodeCall(node, "blockdev_export",
1215                                 [cf_bdev.ToDict(), dest_node, dest_path,
1216                                  cluster_name])
1217
1218   @_RpcTimeout(_TMO_NORMAL)
1219   def call_blockdev_snapshot(self, node, cf_bdev):
1220     """Request a snapshot of the given block device.
1221
1222     This is a single-node call.
1223
1224     """
1225     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1226
1227   @_RpcTimeout(_TMO_NORMAL)
1228   def call_finalize_export(self, node, instance, snap_disks):
1229     """Request the completion of an export operation.
1230
1231     This writes the export config file, etc.
1232
1233     This is a single-node call.
1234
1235     """
1236     flat_disks = []
1237     for disk in snap_disks:
1238       if isinstance(disk, bool):
1239         flat_disks.append(disk)
1240       else:
1241         flat_disks.append(disk.ToDict())
1242
1243     return self._SingleNodeCall(node, "finalize_export",
1244                                 [self._InstDict(instance), flat_disks])
1245
1246   @_RpcTimeout(_TMO_FAST)
1247   def call_export_info(self, node, path):
1248     """Queries the export information in a given path.
1249
1250     This is a single-node call.
1251
1252     """
1253     return self._SingleNodeCall(node, "export_info", [path])
1254
1255   @_RpcTimeout(_TMO_FAST)
1256   def call_export_list(self, node_list):
1257     """Gets the stored exports list.
1258
1259     This is a multi-node call.
1260
1261     """
1262     return self._MultiNodeCall(node_list, "export_list", [])
1263
1264   @_RpcTimeout(_TMO_FAST)
1265   def call_export_remove(self, node, export):
1266     """Requests removal of a given export.
1267
1268     This is a single-node call.
1269
1270     """
1271     return self._SingleNodeCall(node, "export_remove", [export])
1272
1273   @classmethod
1274   @_RpcTimeout(_TMO_NORMAL)
1275   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1276     """Requests a node to clean the cluster information it has.
1277
1278     This will remove the configuration information from the ganeti data
1279     dir.
1280
1281     This is a single-node call.
1282
1283     """
1284     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1285                                      [modify_ssh_setup])
1286
1287   @_RpcTimeout(_TMO_FAST)
1288   def call_node_volumes(self, node_list):
1289     """Gets all volumes on node(s).
1290
1291     This is a multi-node call.
1292
1293     """
1294     return self._MultiNodeCall(node_list, "node_volumes", [])
1295
1296   @_RpcTimeout(_TMO_FAST)
1297   def call_node_demote_from_mc(self, node):
1298     """Demote a node from the master candidate role.
1299
1300     This is a single-node call.
1301
1302     """
1303     return self._SingleNodeCall(node, "node_demote_from_mc", [])
1304
1305   @_RpcTimeout(_TMO_NORMAL)
1306   def call_node_powercycle(self, node, hypervisor):
1307     """Tries to powercycle a node.
1308
1309     This is a single-node call.
1310
1311     """
1312     return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1313
1314   @_RpcTimeout(None)
1315   def call_test_delay(self, node_list, duration):
1316     """Sleep for a fixed time on given node(s).
1317
1318     This is a multi-node call.
1319
1320     """
1321     return self._MultiNodeCall(node_list, "test_delay", [duration],
1322                                read_timeout=int(duration + 5))
1323
1324   @_RpcTimeout(_TMO_FAST)
1325   def call_file_storage_dir_create(self, node, file_storage_dir):
1326     """Create the given file storage directory.
1327
1328     This is a single-node call.
1329
1330     """
1331     return self._SingleNodeCall(node, "file_storage_dir_create",
1332                                 [file_storage_dir])
1333
1334   @_RpcTimeout(_TMO_FAST)
1335   def call_file_storage_dir_remove(self, node, file_storage_dir):
1336     """Remove the given file storage directory.
1337
1338     This is a single-node call.
1339
1340     """
1341     return self._SingleNodeCall(node, "file_storage_dir_remove",
1342                                 [file_storage_dir])
1343
1344   @_RpcTimeout(_TMO_FAST)
1345   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1346                                    new_file_storage_dir):
1347     """Rename file storage directory.
1348
1349     This is a single-node call.
1350
1351     """
1352     return self._SingleNodeCall(node, "file_storage_dir_rename",
1353                                 [old_file_storage_dir, new_file_storage_dir])
1354
1355   @classmethod
1356   @_RpcTimeout(_TMO_FAST)
1357   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1358     """Update job queue.
1359
1360     This is a multi-node call.
1361
1362     """
1363     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1364                                     [file_name, cls._Compress(content)],
1365                                     address_list=address_list)
1366
1367   @classmethod
1368   @_RpcTimeout(_TMO_NORMAL)
1369   def call_jobqueue_purge(cls, node):
1370     """Purge job queue.
1371
1372     This is a single-node call.
1373
1374     """
1375     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1376
1377   @classmethod
1378   @_RpcTimeout(_TMO_FAST)
1379   def call_jobqueue_rename(cls, node_list, address_list, rename):
1380     """Rename a job queue file.
1381
1382     This is a multi-node call.
1383
1384     """
1385     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1386                                     address_list=address_list)
1387
1388   @_RpcTimeout(_TMO_NORMAL)
1389   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1390     """Validate the hypervisor params.
1391
1392     This is a multi-node call.
1393
1394     @type node_list: list
1395     @param node_list: the list of nodes to query
1396     @type hvname: string
1397     @param hvname: the hypervisor name
1398     @type hvparams: dict
1399     @param hvparams: the hypervisor parameters to be validated
1400
1401     """
1402     cluster = self._cfg.GetClusterInfo()
1403     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1404     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1405                                [hvname, hv_full])
1406
1407   @_RpcTimeout(_TMO_NORMAL)
1408   def call_x509_cert_create(self, node, validity):
1409     """Creates a new X509 certificate for SSL/TLS.
1410
1411     This is a single-node call.
1412
1413     @type validity: int
1414     @param validity: Validity in seconds
1415
1416     """
1417     return self._SingleNodeCall(node, "x509_cert_create", [validity])
1418
1419   @_RpcTimeout(_TMO_NORMAL)
1420   def call_x509_cert_remove(self, node, name):
1421     """Removes a X509 certificate.
1422
1423     This is a single-node call.
1424
1425     @type name: string
1426     @param name: Certificate name
1427
1428     """
1429     return self._SingleNodeCall(node, "x509_cert_remove", [name])
1430
1431   @_RpcTimeout(_TMO_NORMAL)
1432   def call_import_start(self, node, opts, instance, dest, dest_args):
1433     """Starts a listener for an import.
1434
1435     This is a single-node call.
1436
1437     @type node: string
1438     @param node: Node name
1439     @type instance: C{objects.Instance}
1440     @param instance: Instance object
1441
1442     """
1443     return self._SingleNodeCall(node, "import_start",
1444                                 [opts.ToDict(),
1445                                  self._InstDict(instance), dest,
1446                                  _EncodeImportExportIO(dest, dest_args)])
1447
1448   @_RpcTimeout(_TMO_NORMAL)
1449   def call_export_start(self, node, opts, host, port,
1450                         instance, source, source_args):
1451     """Starts an export daemon.
1452
1453     This is a single-node call.
1454
1455     @type node: string
1456     @param node: Node name
1457     @type instance: C{objects.Instance}
1458     @param instance: Instance object
1459
1460     """
1461     return self._SingleNodeCall(node, "export_start",
1462                                 [opts.ToDict(), host, port,
1463                                  self._InstDict(instance), source,
1464                                  _EncodeImportExportIO(source, source_args)])
1465
1466   @_RpcTimeout(_TMO_FAST)
1467   def call_impexp_status(self, node, names):
1468     """Gets the status of an import or export.
1469
1470     This is a single-node call.
1471
1472     @type node: string
1473     @param node: Node name
1474     @type names: List of strings
1475     @param names: Import/export names
1476     @rtype: List of L{objects.ImportExportStatus} instances
1477     @return: Returns a list of the state of each named import/export or None if
1478              a status couldn't be retrieved
1479
1480     """
1481     result = self._SingleNodeCall(node, "impexp_status", [names])
1482
1483     if not result.fail_msg:
1484       decoded = []
1485
1486       for i in result.payload:
1487         if i is None:
1488           decoded.append(None)
1489           continue
1490         decoded.append(objects.ImportExportStatus.FromDict(i))
1491
1492       result.payload = decoded
1493
1494     return result
1495
1496   @_RpcTimeout(_TMO_NORMAL)
1497   def call_impexp_abort(self, node, name):
1498     """Aborts an import or export.
1499
1500     This is a single-node call.
1501
1502     @type node: string
1503     @param node: Node name
1504     @type name: string
1505     @param name: Import/export name
1506
1507     """
1508     return self._SingleNodeCall(node, "impexp_abort", [name])
1509
1510   @_RpcTimeout(_TMO_NORMAL)
1511   def call_impexp_cleanup(self, node, name):
1512     """Cleans up after an import or export.
1513
1514     This is a single-node call.
1515
1516     @type node: string
1517     @param node: Node name
1518     @type name: string
1519     @param name: Import/export name
1520
1521     """
1522     return self._SingleNodeCall(node, "impexp_cleanup", [name])