rpc: Convert misc calls
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50
51 # Special module generated at build time
52 from ganeti import _generated_rpc
53
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client  # pylint: disable=W0611
56
57
58 # Timeout for connecting to nodes (seconds)
59 _RPC_CONNECT_TIMEOUT = 5
60
61 _RPC_CLIENT_HEADERS = [
62   "Content-type: %s" % http.HTTP_APP_JSON,
63   "Expect:",
64   ]
65
66 # Various time constants for the timeout table
67 _TMO_URGENT = 60 # one minute
68 _TMO_FAST = 5 * 60 # five minutes
69 _TMO_NORMAL = 15 * 60 # 15 minutes
70 _TMO_SLOW = 3600 # one hour
71 _TMO_4HRS = 4 * 3600
72 _TMO_1DAY = 86400
73
74 # Timeout table that will be built later by decorators
75 # Guidelines for choosing timeouts:
76 # - call used during watcher: timeout -> 1min, _TMO_URGENT
77 # - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
78 # - other calls: 15 min, _TMO_NORMAL
79 # - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
80
81 _TIMEOUTS = {
82 }
83
84 #: Special value to describe an offline host
85 _OFFLINE = object()
86
87
88 def Init():
89   """Initializes the module-global HTTP client manager.
90
91   Must be called before using any RPC function and while exactly one thread is
92   running.
93
94   """
95   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
96   # one thread running. This check is just a safety measure -- it doesn't
97   # cover all cases.
98   assert threading.activeCount() == 1, \
99          "Found more than one active thread when initializing pycURL"
100
101   logging.info("Using PycURL %s", pycurl.version)
102
103   pycurl.global_init(pycurl.GLOBAL_ALL)
104
105
106 def Shutdown():
107   """Stops the module-global HTTP client manager.
108
109   Must be called before quitting the program and while exactly one thread is
110   running.
111
112   """
113   pycurl.global_cleanup()
114
115
116 def _ConfigRpcCurl(curl):
117   noded_cert = str(constants.NODED_CERT_FILE)
118
119   curl.setopt(pycurl.FOLLOWLOCATION, False)
120   curl.setopt(pycurl.CAINFO, noded_cert)
121   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
122   curl.setopt(pycurl.SSL_VERIFYPEER, True)
123   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
124   curl.setopt(pycurl.SSLCERT, noded_cert)
125   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
126   curl.setopt(pycurl.SSLKEY, noded_cert)
127   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
128
129
130 def _RpcTimeout(secs):
131   """Timeout decorator.
132
133   When applied to a rpc call_* function, it updates the global timeout
134   table with the given function/timeout.
135
136   """
137   def decorator(f):
138     name = f.__name__
139     assert name.startswith("call_")
140     _TIMEOUTS[name[len("call_"):]] = secs
141     return f
142   return decorator
143
144
145 def RunWithRPC(fn):
146   """RPC-wrapper decorator.
147
148   When applied to a function, it runs it with the RPC system
149   initialized, and it shutsdown the system afterwards. This means the
150   function must be called without RPC being initialized.
151
152   """
153   def wrapper(*args, **kwargs):
154     Init()
155     try:
156       return fn(*args, **kwargs)
157     finally:
158       Shutdown()
159   return wrapper
160
161
162 def _Compress(data):
163   """Compresses a string for transport over RPC.
164
165   Small amounts of data are not compressed.
166
167   @type data: str
168   @param data: Data
169   @rtype: tuple
170   @return: Encoded data to send
171
172   """
173   # Small amounts of data are not compressed
174   if len(data) < 512:
175     return (constants.RPC_ENCODING_NONE, data)
176
177   # Compress with zlib and encode in base64
178   return (constants.RPC_ENCODING_ZLIB_BASE64,
179           base64.b64encode(zlib.compress(data, 3)))
180
181
182 class RpcResult(object):
183   """RPC Result class.
184
185   This class holds an RPC result. It is needed since in multi-node
186   calls we can't raise an exception just because one one out of many
187   failed, and therefore we use this class to encapsulate the result.
188
189   @ivar data: the data payload, for successful results, or None
190   @ivar call: the name of the RPC call
191   @ivar node: the name of the node to which we made the call
192   @ivar offline: whether the operation failed because the node was
193       offline, as opposed to actual failure; offline=True will always
194       imply failed=True, in order to allow simpler checking if
195       the user doesn't care about the exact failure mode
196   @ivar fail_msg: the error message if the call failed
197
198   """
199   def __init__(self, data=None, failed=False, offline=False,
200                call=None, node=None):
201     self.offline = offline
202     self.call = call
203     self.node = node
204
205     if offline:
206       self.fail_msg = "Node is marked offline"
207       self.data = self.payload = None
208     elif failed:
209       self.fail_msg = self._EnsureErr(data)
210       self.data = self.payload = None
211     else:
212       self.data = data
213       if not isinstance(self.data, (tuple, list)):
214         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215                          type(self.data))
216         self.payload = None
217       elif len(data) != 2:
218         self.fail_msg = ("RPC layer error: invalid result length (%d), "
219                          "expected 2" % len(self.data))
220         self.payload = None
221       elif not self.data[0]:
222         self.fail_msg = self._EnsureErr(self.data[1])
223         self.payload = None
224       else:
225         # finally success
226         self.fail_msg = None
227         self.payload = data[1]
228
229     for attr_name in ["call", "data", "fail_msg",
230                       "node", "offline", "payload"]:
231       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
232
233   @staticmethod
234   def _EnsureErr(val):
235     """Helper to ensure we return a 'True' value for error."""
236     if val:
237       return val
238     else:
239       return "No error information"
240
241   def Raise(self, msg, prereq=False, ecode=None):
242     """If the result has failed, raise an OpExecError.
243
244     This is used so that LU code doesn't have to check for each
245     result, but instead can call this function.
246
247     """
248     if not self.fail_msg:
249       return
250
251     if not msg: # one could pass None for default message
252       msg = ("Call '%s' to node '%s' has failed: %s" %
253              (self.call, self.node, self.fail_msg))
254     else:
255       msg = "%s: %s" % (msg, self.fail_msg)
256     if prereq:
257       ec = errors.OpPrereqError
258     else:
259       ec = errors.OpExecError
260     if ecode is not None:
261       args = (msg, ecode)
262     else:
263       args = (msg, )
264     raise ec(*args) # pylint: disable=W0142
265
266
267 def _SsconfResolver(node_list,
268                     ssc=ssconf.SimpleStore,
269                     nslookup_fn=netutils.Hostname.GetIP):
270   """Return addresses for given node names.
271
272   @type node_list: list
273   @param node_list: List of node names
274   @type ssc: class
275   @param ssc: SimpleStore class that is used to obtain node->ip mappings
276   @type nslookup_fn: callable
277   @param nslookup_fn: function use to do NS lookup
278   @rtype: list of tuple; (string, string)
279   @return: List of tuples containing node name and IP address
280
281   """
282   ss = ssc()
283   iplist = ss.GetNodePrimaryIPList()
284   family = ss.GetPrimaryIPFamily()
285   ipmap = dict(entry.split() for entry in iplist)
286
287   result = []
288   for node in node_list:
289     ip = ipmap.get(node)
290     if ip is None:
291       ip = nslookup_fn(node, family=family)
292     result.append((node, ip))
293
294   return result
295
296
297 class _StaticResolver:
298   def __init__(self, addresses):
299     """Initializes this class.
300
301     """
302     self._addresses = addresses
303
304   def __call__(self, hosts):
305     """Returns static addresses for hosts.
306
307     """
308     assert len(hosts) == len(self._addresses)
309     return zip(hosts, self._addresses)
310
311
312 def _CheckConfigNode(name, node):
313   """Checks if a node is online.
314
315   @type name: string
316   @param name: Node name
317   @type node: L{objects.Node} or None
318   @param node: Node object
319
320   """
321   if node is None:
322     # Depend on DNS for name resolution
323     ip = name
324   elif node.offline:
325     ip = _OFFLINE
326   else:
327     ip = node.primary_ip
328   return (name, ip)
329
330
331 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
332   """Calculate node addresses using configuration.
333
334   """
335   # Special case for single-host lookups
336   if len(hosts) == 1:
337     (name, ) = hosts
338     return [_CheckConfigNode(name, single_node_fn(name))]
339   else:
340     all_nodes = all_nodes_fn()
341     return [_CheckConfigNode(name, all_nodes.get(name, None))
342             for name in hosts]
343
344
345 class _RpcProcessor:
346   def __init__(self, resolver, port, lock_monitor_cb=None):
347     """Initializes this class.
348
349     @param resolver: callable accepting a list of hostnames, returning a list
350       of tuples containing name and IP address (IP address can be the name or
351       the special value L{_OFFLINE} to mark offline machines)
352     @type port: int
353     @param port: TCP port
354     @param lock_monitor_cb: Callable for registering with lock monitor
355
356     """
357     self._resolver = resolver
358     self._port = port
359     self._lock_monitor_cb = lock_monitor_cb
360
361   @staticmethod
362   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
363     """Prepares requests by sorting offline hosts into separate list.
364
365     """
366     results = {}
367     requests = {}
368
369     for (name, ip) in hosts:
370       if ip is _OFFLINE:
371         # Node is marked as offline
372         results[name] = RpcResult(node=name, offline=True, call=procedure)
373       else:
374         requests[name] = \
375           http.client.HttpClientRequest(str(ip), port,
376                                         http.HTTP_PUT, str("/%s" % procedure),
377                                         headers=_RPC_CLIENT_HEADERS,
378                                         post_data=body,
379                                         read_timeout=read_timeout,
380                                         nicename="%s/%s" % (name, procedure),
381                                         curl_config_fn=_ConfigRpcCurl)
382
383     return (results, requests)
384
385   @staticmethod
386   def _CombineResults(results, requests, procedure):
387     """Combines pre-computed results for offline hosts with actual call results.
388
389     """
390     for name, req in requests.items():
391       if req.success and req.resp_status_code == http.HTTP_OK:
392         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
393                                 node=name, call=procedure)
394       else:
395         # TODO: Better error reporting
396         if req.error:
397           msg = req.error
398         else:
399           msg = req.resp_body
400
401         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
402         host_result = RpcResult(data=msg, failed=True, node=name,
403                                 call=procedure)
404
405       results[name] = host_result
406
407     return results
408
409   def __call__(self, hosts, procedure, body, read_timeout=None,
410                _req_process_fn=http.client.ProcessRequests):
411     """Makes an RPC request to a number of nodes.
412
413     @type hosts: sequence
414     @param hosts: Hostnames
415     @type procedure: string
416     @param procedure: Request path
417     @type body: string
418     @param body: Request body
419     @type read_timeout: int or None
420     @param read_timeout: Read timeout for request
421
422     """
423     if read_timeout is None:
424       read_timeout = _TIMEOUTS.get(procedure, None)
425
426     assert read_timeout is not None, \
427       "Missing RPC read timeout for procedure '%s'" % procedure
428
429     (results, requests) = \
430       self._PrepareRequests(self._resolver(hosts), self._port, procedure,
431                             str(body), read_timeout)
432
433     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
434
435     assert not frozenset(results).intersection(requests)
436
437     return self._CombineResults(results, requests, procedure)
438
439
440 def _EncodeImportExportIO(ieio, ieioargs):
441   """Encodes import/export I/O information.
442
443   """
444   if ieio == constants.IEIO_RAW_DISK:
445     assert len(ieioargs) == 1
446     return (ieioargs[0].ToDict(), )
447
448   if ieio == constants.IEIO_SCRIPT:
449     assert len(ieioargs) == 2
450     return (ieioargs[0].ToDict(), ieioargs[1])
451
452   return ieioargs
453
454
455 class RpcRunner(_generated_rpc.RpcClientDefault):
456   """RPC runner class.
457
458   """
459   def __init__(self, context):
460     """Initialized the RPC runner.
461
462     @type context: C{masterd.GanetiContext}
463     @param context: Ganeti context
464
465     """
466     _generated_rpc.RpcClientDefault.__init__(self)
467
468     self._cfg = context.cfg
469     self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
470                                               self._cfg.GetNodeInfo,
471                                               self._cfg.GetAllNodesInfo),
472                                netutils.GetDaemonPort(constants.NODED),
473                                lock_monitor_cb=context.glm.AddToLockMonitor)
474
475   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
476     """Convert the given instance to a dict.
477
478     This is done via the instance's ToDict() method and additionally
479     we fill the hvparams with the cluster defaults.
480
481     @type instance: L{objects.Instance}
482     @param instance: an Instance object
483     @type hvp: dict or None
484     @param hvp: a dictionary with overridden hypervisor parameters
485     @type bep: dict or None
486     @param bep: a dictionary with overridden backend parameters
487     @type osp: dict or None
488     @param osp: a dictionary with overridden os parameters
489     @rtype: dict
490     @return: the instance dict, with the hvparams filled with the
491         cluster defaults
492
493     """
494     idict = instance.ToDict()
495     cluster = self._cfg.GetClusterInfo()
496     idict["hvparams"] = cluster.FillHV(instance)
497     if hvp is not None:
498       idict["hvparams"].update(hvp)
499     idict["beparams"] = cluster.FillBE(instance)
500     if bep is not None:
501       idict["beparams"].update(bep)
502     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
503     if osp is not None:
504       idict["osparams"].update(osp)
505     for nic in idict["nics"]:
506       nic['nicparams'] = objects.FillDict(
507         cluster.nicparams[constants.PP_DEFAULT],
508         nic['nicparams'])
509     return idict
510
511   def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
512     """Helper for making a multi-node call
513
514     """
515     body = serializer.DumpJson(args, indent=False)
516     return self._proc(node_list, procedure, body, read_timeout=read_timeout)
517
518   def _Call(self, node_list, procedure, timeout, args):
519     """Entry point for automatically generated RPC wrappers.
520
521     """
522     return self._MultiNodeCall(node_list, procedure, args, read_timeout=timeout)
523
524   @staticmethod
525   def _StaticMultiNodeCall(node_list, procedure, args,
526                            address_list=None, read_timeout=None):
527     """Helper for making a multi-node static call
528
529     """
530     body = serializer.DumpJson(args, indent=False)
531
532     if address_list is None:
533       resolver = _SsconfResolver
534     else:
535       # Caller provided an address list
536       resolver = _StaticResolver(address_list)
537
538     proc = _RpcProcessor(resolver,
539                          netutils.GetDaemonPort(constants.NODED))
540     return proc(node_list, procedure, body, read_timeout=read_timeout)
541
542   def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
543     """Helper for making a single-node call
544
545     """
546     body = serializer.DumpJson(args, indent=False)
547     return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
548
549   @classmethod
550   def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
551     """Helper for making a single-node static call
552
553     """
554     body = serializer.DumpJson(args, indent=False)
555     proc = _RpcProcessor(_SsconfResolver,
556                          netutils.GetDaemonPort(constants.NODED))
557     return proc([node], procedure, body, read_timeout=read_timeout)[node]
558
559   @staticmethod
560   def _BlockdevFindPostProc(result):
561     if not result.fail_msg and result.payload is not None:
562       result.payload = objects.BlockDevStatus.FromDict(result.payload)
563     return result
564
565   @staticmethod
566   def _BlockdevGetMirrorStatusPostProc(result):
567     if not result.fail_msg:
568       result.payload = [objects.BlockDevStatus.FromDict(i)
569                         for i in result.payload]
570     return result
571
572   @staticmethod
573   def _BlockdevGetMirrorStatusMultiPostProc(result):
574     for nres in result.values():
575       if nres.fail_msg:
576         continue
577
578       for idx, (success, status) in enumerate(nres.payload):
579         if success:
580           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
581
582     return result
583
584   @staticmethod
585   def _OsGetPostProc(result):
586     if not result.fail_msg and isinstance(result.payload, dict):
587       result.payload = objects.OS.FromDict(result.payload)
588     return result
589
590   @staticmethod
591   def _PrepareFinalizeExportDisks(snap_disks):
592     flat_disks = []
593
594     for disk in snap_disks:
595       if isinstance(disk, bool):
596         flat_disks.append(disk)
597       else:
598         flat_disks.append(disk.ToDict())
599
600     return flat_disks
601
602   @staticmethod
603   def _ImpExpStatusPostProc(result):
604     """Post-processor for import/export status.
605
606     @rtype: Payload containing list of L{objects.ImportExportStatus} instances
607     @return: Returns a list of the state of each named import/export or None if
608              a status couldn't be retrieved
609
610     """
611     if not result.fail_msg:
612       decoded = []
613
614       for i in result.payload:
615         if i is None:
616           decoded.append(None)
617           continue
618         decoded.append(objects.ImportExportStatus.FromDict(i))
619
620       result.payload = decoded
621
622     return result
623
624   #
625   # Begin RPC calls
626   #
627
628   @_RpcTimeout(_TMO_URGENT)
629   def call_bdev_sizes(self, node_list, devices):
630     """Gets the sizes of requested block devices present on a node
631
632     This is a multi-node call.
633
634     """
635     return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
636
637   @_RpcTimeout(_TMO_NORMAL)
638   def call_storage_list(self, node_list, su_name, su_args, name, fields):
639     """Get list of storage units.
640
641     This is a multi-node call.
642
643     """
644     return self._MultiNodeCall(node_list, "storage_list",
645                                [su_name, su_args, name, fields])
646
647   @_RpcTimeout(_TMO_NORMAL)
648   def call_storage_modify(self, node, su_name, su_args, name, changes):
649     """Modify a storage unit.
650
651     This is a single-node call.
652
653     """
654     return self._SingleNodeCall(node, "storage_modify",
655                                 [su_name, su_args, name, changes])
656
657   @_RpcTimeout(_TMO_NORMAL)
658   def call_storage_execute(self, node, su_name, su_args, name, op):
659     """Executes an operation on a storage unit.
660
661     This is a single-node call.
662
663     """
664     return self._SingleNodeCall(node, "storage_execute",
665                                 [su_name, su_args, name, op])
666
667   @_RpcTimeout(_TMO_NORMAL)
668   def call_instance_start(self, node, instance, hvp, bep, startup_paused):
669     """Starts an instance.
670
671     This is a single-node call.
672
673     """
674     idict = self._InstDict(instance, hvp=hvp, bep=bep)
675     return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
676
677   @_RpcTimeout(_TMO_NORMAL)
678   def call_instance_shutdown(self, node, instance, timeout):
679     """Stops an instance.
680
681     This is a single-node call.
682
683     """
684     return self._SingleNodeCall(node, "instance_shutdown",
685                                 [self._InstDict(instance), timeout])
686
687   @_RpcTimeout(_TMO_NORMAL)
688   def call_migration_info(self, node, instance):
689     """Gather the information necessary to prepare an instance migration.
690
691     This is a single-node call.
692
693     @type node: string
694     @param node: the node on which the instance is currently running
695     @type instance: C{objects.Instance}
696     @param instance: the instance definition
697
698     """
699     return self._SingleNodeCall(node, "migration_info",
700                                 [self._InstDict(instance)])
701
702   @_RpcTimeout(_TMO_NORMAL)
703   def call_accept_instance(self, node, instance, info, target):
704     """Prepare a node to accept an instance.
705
706     This is a single-node call.
707
708     @type node: string
709     @param node: the target node for the migration
710     @type instance: C{objects.Instance}
711     @param instance: the instance definition
712     @type info: opaque/hypervisor specific (string/data)
713     @param info: result for the call_migration_info call
714     @type target: string
715     @param target: target hostname (usually ip address) (on the node itself)
716
717     """
718     return self._SingleNodeCall(node, "accept_instance",
719                                 [self._InstDict(instance), info, target])
720
721   @_RpcTimeout(_TMO_NORMAL)
722   def call_instance_finalize_migration_dst(self, node, instance, info, success):
723     """Finalize any target-node migration specific operation.
724
725     This is called both in case of a successful migration and in case of error
726     (in which case it should abort the migration).
727
728     This is a single-node call.
729
730     @type node: string
731     @param node: the target node for the migration
732     @type instance: C{objects.Instance}
733     @param instance: the instance definition
734     @type info: opaque/hypervisor specific (string/data)
735     @param info: result for the call_migration_info call
736     @type success: boolean
737     @param success: whether the migration was a success or a failure
738
739     """
740     return self._SingleNodeCall(node, "instance_finalize_migration_dst",
741                                 [self._InstDict(instance), info, success])
742
743   @_RpcTimeout(_TMO_SLOW)
744   def call_instance_migrate(self, node, instance, target, live):
745     """Migrate an instance.
746
747     This is a single-node call.
748
749     @type node: string
750     @param node: the node on which the instance is currently running
751     @type instance: C{objects.Instance}
752     @param instance: the instance definition
753     @type target: string
754     @param target: the target node name
755     @type live: boolean
756     @param live: whether the migration should be done live or not (the
757         interpretation of this parameter is left to the hypervisor)
758
759     """
760     return self._SingleNodeCall(node, "instance_migrate",
761                                 [self._InstDict(instance), target, live])
762
763   @_RpcTimeout(_TMO_SLOW)
764   def call_instance_finalize_migration_src(self, node, instance, success, live):
765     """Finalize the instance migration on the source node.
766
767     This is a single-node call.
768
769     @type instance: L{objects.Instance}
770     @param instance: the instance that was migrated
771     @type success: bool
772     @param success: whether the migration succeeded or not
773     @type live: bool
774     @param live: whether the user requested a live migration or not
775
776     """
777     return self._SingleNodeCall(node, "instance_finalize_migration_src",
778                                 [self._InstDict(instance), success, live])
779
780   @_RpcTimeout(_TMO_SLOW)
781   def call_instance_get_migration_status(self, node, instance):
782     """Report migration status.
783
784     This is a single-node call that must be executed on the source node.
785
786     @type instance: L{objects.Instance}
787     @param instance: the instance that is being migrated
788     @rtype: L{objects.MigrationStatus}
789     @return: the status of the current migration (one of
790              L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
791              progress info that can be retrieved from the hypervisor
792
793     """
794     result = self._SingleNodeCall(node, "instance_get_migration_status",
795                                   [self._InstDict(instance)])
796     if not result.fail_msg and result.payload is not None:
797       result.payload = objects.MigrationStatus.FromDict(result.payload)
798     return result
799
800   @_RpcTimeout(_TMO_NORMAL)
801   def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
802     """Reboots an instance.
803
804     This is a single-node call.
805
806     """
807     return self._SingleNodeCall(node, "instance_reboot",
808                                 [self._InstDict(inst), reboot_type,
809                                  shutdown_timeout])
810
811   @_RpcTimeout(_TMO_1DAY)
812   def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
813     """Installs an OS on the given instance.
814
815     This is a single-node call.
816
817     """
818     return self._SingleNodeCall(node, "instance_os_add",
819                                 [self._InstDict(inst, osp=osparams),
820                                  reinstall, debug])
821
822   @_RpcTimeout(_TMO_SLOW)
823   def call_instance_run_rename(self, node, inst, old_name, debug):
824     """Run the OS rename script for an instance.
825
826     This is a single-node call.
827
828     """
829     return self._SingleNodeCall(node, "instance_run_rename",
830                                 [self._InstDict(inst), old_name, debug])
831
832   @_RpcTimeout(_TMO_URGENT)
833   def call_instance_info(self, node, instance, hname):
834     """Returns information about a single instance.
835
836     This is a single-node call.
837
838     @type node: list
839     @param node: the list of nodes to query
840     @type instance: string
841     @param instance: the instance name
842     @type hname: string
843     @param hname: the hypervisor type of the instance
844
845     """
846     return self._SingleNodeCall(node, "instance_info", [instance, hname])
847
848   @_RpcTimeout(_TMO_NORMAL)
849   def call_instance_migratable(self, node, instance):
850     """Checks whether the given instance can be migrated.
851
852     This is a single-node call.
853
854     @param node: the node to query
855     @type instance: L{objects.Instance}
856     @param instance: the instance to check
857
858
859     """
860     return self._SingleNodeCall(node, "instance_migratable",
861                                 [self._InstDict(instance)])
862
863   @_RpcTimeout(_TMO_URGENT)
864   def call_all_instances_info(self, node_list, hypervisor_list):
865     """Returns information about all instances on the given nodes.
866
867     This is a multi-node call.
868
869     @type node_list: list
870     @param node_list: the list of nodes to query
871     @type hypervisor_list: list
872     @param hypervisor_list: the hypervisors to query for instances
873
874     """
875     return self._MultiNodeCall(node_list, "all_instances_info",
876                                [hypervisor_list])
877
878   @_RpcTimeout(_TMO_URGENT)
879   def call_instance_list(self, node_list, hypervisor_list):
880     """Returns the list of running instances on a given node.
881
882     This is a multi-node call.
883
884     @type node_list: list
885     @param node_list: the list of nodes to query
886     @type hypervisor_list: list
887     @param hypervisor_list: the hypervisors to query for instances
888
889     """
890     return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
891
892   @classmethod
893   @_RpcTimeout(_TMO_FAST)
894   def call_node_start_master_daemons(cls, node, no_voting):
895     """Starts master daemons on a node.
896
897     This is a single-node call.
898
899     """
900     return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
901                                      [no_voting])
902
903   @classmethod
904   @_RpcTimeout(_TMO_FAST)
905   def call_node_activate_master_ip(cls, node):
906     """Activates master IP on a node.
907
908     This is a single-node call.
909
910     """
911     return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
912
913   @classmethod
914   @_RpcTimeout(_TMO_FAST)
915   def call_node_stop_master(cls, node):
916     """Deactivates master IP and stops master daemons on a node.
917
918     This is a single-node call.
919
920     """
921     return cls._StaticSingleNodeCall(node, "node_stop_master", [])
922
923   @classmethod
924   @_RpcTimeout(_TMO_FAST)
925   def call_node_deactivate_master_ip(cls, node):
926     """Deactivates master IP on a node.
927
928     This is a single-node call.
929
930     """
931     return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
932
933   @classmethod
934   @_RpcTimeout(_TMO_FAST)
935   def call_node_change_master_netmask(cls, node, netmask):
936     """Change master IP netmask.
937
938     This is a single-node call.
939
940     """
941     return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
942                   [netmask])
943
944   @classmethod
945   @_RpcTimeout(_TMO_URGENT)
946   def call_master_info(cls, node_list):
947     """Query master info.
948
949     This is a multi-node call.
950
951     """
952     # TODO: should this method query down nodes?
953     return cls._StaticMultiNodeCall(node_list, "master_info", [])
954
955   @classmethod
956   @_RpcTimeout(_TMO_URGENT)
957   def call_version(cls, node_list):
958     """Query node version.
959
960     This is a multi-node call.
961
962     """
963     return cls._StaticMultiNodeCall(node_list, "version", [])
964
965   @_RpcTimeout(_TMO_NORMAL)
966   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
967     """Request creation of a given block device.
968
969     This is a single-node call.
970
971     """
972     return self._SingleNodeCall(node, "blockdev_create",
973                                 [bdev.ToDict(), size, owner, on_primary, info])
974
975   @_RpcTimeout(_TMO_SLOW)
976   def call_blockdev_wipe(self, node, bdev, offset, size):
977     """Request wipe at given offset with given size of a block device.
978
979     This is a single-node call.
980
981     """
982     return self._SingleNodeCall(node, "blockdev_wipe",
983                                 [bdev.ToDict(), offset, size])
984
985   @_RpcTimeout(_TMO_NORMAL)
986   def call_blockdev_remove(self, node, bdev):
987     """Request removal of a given block device.
988
989     This is a single-node call.
990
991     """
992     return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
993
994   @_RpcTimeout(_TMO_NORMAL)
995   def call_blockdev_rename(self, node, devlist):
996     """Request rename of the given block devices.
997
998     This is a single-node call.
999
1000     """
1001     return self._SingleNodeCall(node, "blockdev_rename",
1002                                 [[(d.ToDict(), uid) for d, uid in devlist]])
1003
1004   @_RpcTimeout(_TMO_NORMAL)
1005   def call_blockdev_pause_resume_sync(self, node, disks, pause):
1006     """Request a pause/resume of given block device.
1007
1008     This is a single-node call.
1009
1010     """
1011     return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1012                                 [[bdev.ToDict() for bdev in disks], pause])
1013
1014   @_RpcTimeout(_TMO_NORMAL)
1015   def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1016     """Request assembling of a given block device.
1017
1018     This is a single-node call.
1019
1020     """
1021     return self._SingleNodeCall(node, "blockdev_assemble",
1022                                 [disk.ToDict(), owner, on_primary, idx])
1023
1024   @_RpcTimeout(_TMO_NORMAL)
1025   def call_blockdev_shutdown(self, node, disk):
1026     """Request shutdown of a given block device.
1027
1028     This is a single-node call.
1029
1030     """
1031     return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1032
1033   @_RpcTimeout(_TMO_NORMAL)
1034   def call_blockdev_addchildren(self, node, bdev, ndevs):
1035     """Request adding a list of children to a (mirroring) device.
1036
1037     This is a single-node call.
1038
1039     """
1040     return self._SingleNodeCall(node, "blockdev_addchildren",
1041                                 [bdev.ToDict(),
1042                                  [disk.ToDict() for disk in ndevs]])
1043
1044   @_RpcTimeout(_TMO_NORMAL)
1045   def call_blockdev_removechildren(self, node, bdev, ndevs):
1046     """Request removing a list of children from a (mirroring) device.
1047
1048     This is a single-node call.
1049
1050     """
1051     return self._SingleNodeCall(node, "blockdev_removechildren",
1052                                 [bdev.ToDict(),
1053                                  [disk.ToDict() for disk in ndevs]])
1054
1055   @_RpcTimeout(_TMO_NORMAL)
1056   def call_blockdev_getmirrorstatus(self, node, disks):
1057     """Request status of a (mirroring) device.
1058
1059     This is a single-node call.
1060
1061     """
1062     result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1063                                   [dsk.ToDict() for dsk in disks])
1064     if not result.fail_msg:
1065       result.payload = [objects.BlockDevStatus.FromDict(i)
1066                         for i in result.payload]
1067     return result
1068
1069   @_RpcTimeout(_TMO_NORMAL)
1070   def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1071     """Request status of (mirroring) devices from multiple nodes.
1072
1073     This is a multi-node call.
1074
1075     """
1076     result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1077                                  [dict((name, [dsk.ToDict() for dsk in disks])
1078                                        for name, disks in node_disks.items())])
1079     for nres in result.values():
1080       if nres.fail_msg:
1081         continue
1082
1083       for idx, (success, status) in enumerate(nres.payload):
1084         if success:
1085           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1086
1087     return result
1088
1089   @_RpcTimeout(_TMO_NORMAL)
1090   def call_blockdev_find(self, node, disk):
1091     """Request identification of a given block device.
1092
1093     This is a single-node call.
1094
1095     """
1096     result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1097     if not result.fail_msg and result.payload is not None:
1098       result.payload = objects.BlockDevStatus.FromDict(result.payload)
1099     return result
1100
1101   @_RpcTimeout(_TMO_NORMAL)
1102   def call_blockdev_close(self, node, instance_name, disks):
1103     """Closes the given block devices.
1104
1105     This is a single-node call.
1106
1107     """
1108     params = [instance_name, [cf.ToDict() for cf in disks]]
1109     return self._SingleNodeCall(node, "blockdev_close", params)
1110
1111   @_RpcTimeout(_TMO_NORMAL)
1112   def call_blockdev_getsize(self, node, disks):
1113     """Returns the size of the given disks.
1114
1115     This is a single-node call.
1116
1117     """
1118     params = [[cf.ToDict() for cf in disks]]
1119     return self._SingleNodeCall(node, "blockdev_getsize", params)
1120
1121   @_RpcTimeout(_TMO_NORMAL)
1122   def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1123     """Disconnects the network of the given drbd devices.
1124
1125     This is a multi-node call.
1126
1127     """
1128     return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1129                                [nodes_ip, [cf.ToDict() for cf in disks]])
1130
1131   @_RpcTimeout(_TMO_NORMAL)
1132   def call_drbd_attach_net(self, node_list, nodes_ip,
1133                            disks, instance_name, multimaster):
1134     """Disconnects the given drbd devices.
1135
1136     This is a multi-node call.
1137
1138     """
1139     return self._MultiNodeCall(node_list, "drbd_attach_net",
1140                                [nodes_ip, [cf.ToDict() for cf in disks],
1141                                 instance_name, multimaster])
1142
1143   @_RpcTimeout(_TMO_SLOW)
1144   def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1145     """Waits for the synchronization of drbd devices is complete.
1146
1147     This is a multi-node call.
1148
1149     """
1150     return self._MultiNodeCall(node_list, "drbd_wait_sync",
1151                                [nodes_ip, [cf.ToDict() for cf in disks]])
1152
1153   @_RpcTimeout(_TMO_URGENT)
1154   def call_drbd_helper(self, node_list):
1155     """Gets drbd helper.
1156
1157     This is a multi-node call.
1158
1159     """
1160     return self._MultiNodeCall(node_list, "drbd_helper", [])
1161
1162   @classmethod
1163   @_RpcTimeout(_TMO_NORMAL)
1164   def call_upload_file(cls, node_list, file_name, address_list=None):
1165     """Upload a file.
1166
1167     The node will refuse the operation in case the file is not on the
1168     approved file list.
1169
1170     This is a multi-node call.
1171
1172     @type node_list: list
1173     @param node_list: the list of node names to upload to
1174     @type file_name: str
1175     @param file_name: the filename to upload
1176     @type address_list: list or None
1177     @keyword address_list: an optional list of node addresses, in order
1178         to optimize the RPC speed
1179
1180     """
1181     file_contents = utils.ReadFile(file_name)
1182     data = _Compress(file_contents)
1183     st = os.stat(file_name)
1184     getents = runtime.GetEnts()
1185     params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1186               getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1187     return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1188                                     address_list=address_list)
1189
1190   @classmethod
1191   @_RpcTimeout(_TMO_NORMAL)
1192   def call_write_ssconf_files(cls, node_list, values):
1193     """Write ssconf files.
1194
1195     This is a multi-node call.
1196
1197     """
1198     return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1199
1200   @_RpcTimeout(_TMO_NORMAL)
1201   def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1202     """Request a snapshot of the given block device.
1203
1204     This is a single-node call.
1205
1206     """
1207     return self._SingleNodeCall(node, "blockdev_grow",
1208                                 [cf_bdev.ToDict(), amount, dryrun])
1209
1210   @_RpcTimeout(_TMO_1DAY)
1211   def call_blockdev_export(self, node, cf_bdev,
1212                            dest_node, dest_path, cluster_name):
1213     """Export a given disk to another node.
1214
1215     This is a single-node call.
1216
1217     """
1218     return self._SingleNodeCall(node, "blockdev_export",
1219                                 [cf_bdev.ToDict(), dest_node, dest_path,
1220                                  cluster_name])
1221
1222   @_RpcTimeout(_TMO_NORMAL)
1223   def call_blockdev_snapshot(self, node, cf_bdev):
1224     """Request a snapshot of the given block device.
1225
1226     This is a single-node call.
1227
1228     """
1229     return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1230
1231   @classmethod
1232   @_RpcTimeout(_TMO_NORMAL)
1233   def call_node_leave_cluster(cls, node, modify_ssh_setup):
1234     """Requests a node to clean the cluster information it has.
1235
1236     This will remove the configuration information from the ganeti data
1237     dir.
1238
1239     This is a single-node call.
1240
1241     """
1242     return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1243                                      [modify_ssh_setup])
1244
1245   def call_test_delay(self, node_list, duration, read_timeout=None):
1246     """Sleep for a fixed time on given node(s).
1247
1248     This is a multi-node call.
1249
1250     """
1251     assert read_timeout is None
1252     return self.call_test_delay(node_list, duration,
1253                                 read_timeout=int(duration + 5))
1254
1255   @classmethod
1256   @_RpcTimeout(_TMO_URGENT)
1257   def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1258     """Update job queue.
1259
1260     This is a multi-node call.
1261
1262     """
1263     return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1264                                     [file_name, _Compress(content)],
1265                                     address_list=address_list)
1266
1267   @classmethod
1268   @_RpcTimeout(_TMO_NORMAL)
1269   def call_jobqueue_purge(cls, node):
1270     """Purge job queue.
1271
1272     This is a single-node call.
1273
1274     """
1275     return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1276
1277   @classmethod
1278   @_RpcTimeout(_TMO_URGENT)
1279   def call_jobqueue_rename(cls, node_list, address_list, rename):
1280     """Rename a job queue file.
1281
1282     This is a multi-node call.
1283
1284     """
1285     return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1286                                     address_list=address_list)
1287
1288   @_RpcTimeout(_TMO_NORMAL)
1289   def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1290     """Validate the hypervisor params.
1291
1292     This is a multi-node call.
1293
1294     @type node_list: list
1295     @param node_list: the list of nodes to query
1296     @type hvname: string
1297     @param hvname: the hypervisor name
1298     @type hvparams: dict
1299     @param hvparams: the hypervisor parameters to be validated
1300
1301     """
1302     cluster = self._cfg.GetClusterInfo()
1303     hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1304     return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1305                                [hvname, hv_full])
1306
1307   @_RpcTimeout(_TMO_NORMAL)
1308   def call_import_start(self, node, opts, instance, component,
1309                         dest, dest_args):
1310     """Starts a listener for an import.
1311
1312     This is a single-node call.
1313
1314     @type node: string
1315     @param node: Node name
1316     @type instance: C{objects.Instance}
1317     @param instance: Instance object
1318     @type component: string
1319     @param component: which part of the instance is being imported
1320
1321     """
1322     return self._SingleNodeCall(node, "import_start",
1323                                 [opts.ToDict(),
1324                                  self._InstDict(instance), component, dest,
1325                                  _EncodeImportExportIO(dest, dest_args)])
1326
1327   @_RpcTimeout(_TMO_NORMAL)
1328   def call_export_start(self, node, opts, host, port,
1329                         instance, component, source, source_args):
1330     """Starts an export daemon.
1331
1332     This is a single-node call.
1333
1334     @type node: string
1335     @param node: Node name
1336     @type instance: C{objects.Instance}
1337     @param instance: Instance object
1338     @type component: string
1339     @param component: which part of the instance is being imported
1340
1341     """
1342     return self._SingleNodeCall(node, "export_start",
1343                                 [opts.ToDict(), host, port,
1344                                  self._InstDict(instance),
1345                                  component, source,
1346                                  _EncodeImportExportIO(source, source_args)])