4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
# C0103: Invalid name, since the call_* method names do not follow
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
52 # Special module generated at build time
53 from ganeti import _generated_rpc
55 # pylint has a bug here, doesn't see this import
56 import ganeti.http.client # pylint: disable=W0611
59 # Timeout for connecting to nodes (seconds)
60 _RPC_CONNECT_TIMEOUT = 5
62 _RPC_CLIENT_HEADERS = [
63 "Content-type: %s" % http.HTTP_APP_JSON,
67 # Various time constants for the timeout table
68 _TMO_URGENT = 60 # one minute
69 _TMO_FAST = 5 * 60 # five minutes
70 _TMO_NORMAL = 15 * 60 # 15 minutes
71 _TMO_SLOW = 3600 # one hour
75 #: Special value to describe an offline host
80 """Initializes the module-global HTTP client manager.
82 Must be called before using any RPC function and while exactly one thread is
86 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
87 # one thread running. This check is just a safety measure -- it doesn't
89 assert threading.activeCount() == 1, \
90 "Found more than one active thread when initializing pycURL"
92 logging.info("Using PycURL %s", pycurl.version)
94 pycurl.global_init(pycurl.GLOBAL_ALL)
98 """Stops the module-global HTTP client manager.
100 Must be called before quitting the program and while exactly one thread is
104 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Configures a pycURL handle for node daemon RPC.

  The node daemon certificate file is used as CA bundle, client
  certificate and client key at the same time.

  @param curl: pycURL object to configure

  """
  noded_cert = str(constants.NODED_CERT_FILE)

  # Options are applied in the same order as before; setopt() calls for
  # distinct options are independent of each other.
  curl_settings = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, noded_cert),
    # Peer verification stays enabled while host name verification is
    # off -- NOTE(review): presumably because the shared node
    # certificate carries no per-host name; confirm rationale
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, noded_cert),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, noded_cert),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]

  for (option, value) in curl_settings:
    curl.setopt(option, value)
122 """RPC-wrapper decorator.
124 When applied to a function, it runs it with the RPC system
125 initialized, and it shutsdown the system afterwards. This means the
126 function must be called without RPC being initialized.
129 def wrapper(*args, **kwargs):
132 return fn(*args, **kwargs)
139 """Compresses a string for transport over RPC.
141 Small amounts of data are not compressed.
146 @return: Encoded data to send
149 # Small amounts of data are not compressed
151 return (constants.RPC_ENCODING_NONE, data)
153 # Compress with zlib and encode in base64
154 return (constants.RPC_ENCODING_ZLIB_BASE64,
155 base64.b64encode(zlib.compress(data, 3)))
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
    offline, as opposed to actual failure; offline=True will always
    imply failed=True, in order to allow simpler checking if
    the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    # NOTE(review): several statements of this constructor (storing
    # call/node and the if/elif structure selecting between the offline,
    # failed and success paths) are not visible in this excerpt; the
    # comments below annotate only the visible statements.
    self.offline = offline
    # Offline nodes get a fixed error message and no payload
    self.fail_msg = "Node is marked offline"
    self.data = self.payload = None
    # Explicit failure: normalize whatever error information was passed
    self.fail_msg = self._EnsureErr(data)
    self.data = self.payload = None
    # Successful transport: the decoded body must be a two-element
    # (status, payload) sequence
    if not isinstance(self.data, (tuple, list)):
      self.fail_msg = ("RPC layer error: invalid result type (%s)" %
      self.fail_msg = ("RPC layer error: invalid result length (%d), "
                       "expected 2" % len(self.data))
    elif not self.data[0]:
      # First element is the remote status flag; False means the remote
      # side reported a failure and the second element carries the error
      self.fail_msg = self._EnsureErr(self.data[1])
      self.payload = data[1]

    # Sanity check: all public attributes must have been set by now
    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  # NOTE(review): the definition line of "_EnsureErr" (and its
  # decorator, if any) is not visible in this excerpt; the docstring and
  # fallback return below belong to it.
    """Helper to ensure we return a 'True' value for error."""
    return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @param msg: Error message, or falsy to build a default one
    @param prereq: Whether to raise an OpPrereqError instead of
      OpExecError
    @param ecode: Optional error code to attach

    """
    # NOTE(review): the early "return" for the success case and several
    # control-flow lines (if/else around message building, prereq
    # selection and the "args" construction) are not visible in this
    # excerpt.
    if not self.fail_msg:
      if not msg: # one could pass None for default message
        msg = ("Call '%s' to node '%s' has failed: %s" %
               (self.call, self.node, self.fail_msg))
      msg = "%s: %s" % (msg, self.fail_msg)
    ec = errors.OpPrereqError
    ec = errors.OpExecError
    if ecode is not None:
    raise ec(*args) # pylint: disable=W0142
def _SsconfResolver(node_list,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  # The visible excerpt used "ss" without binding it and never built or
  # returned a result list; restore the straightforward implementation.
  ss = ssc()
  iplist = ss.GetNodePrimaryIPList()
  family = ss.GetPrimaryIPFamily()
  # ssconf stores "name address" pairs, one per line
  ipmap = dict(entry.split() for entry in iplist)

  result = []
  for node in node_list:
    # Prefer the address recorded in ssconf; fall back to name resolution
    ip = ipmap.get(node, None)
    if ip is None:
      ip = nslookup_fn(node, family=family)
    result.append((node, ip))

  return result
273 class _StaticResolver:
274 def __init__(self, addresses):
275 """Initializes this class.
278 self._addresses = addresses
280 def __call__(self, hosts):
281 """Returns static addresses for hosts.
284 assert len(hosts) == len(self._addresses)
285 return zip(hosts, self._addresses)
def _CheckConfigNode(name, node):
  """Checks if a node is online.

  @type name: string
  @param name: Node name
  @type node: L{objects.Node} or None
  @param node: Node object

  """
  # NOTE(review): the body of this function is not visible in this
  # excerpt; only the comment below, presumably from the branch handling
  # node objects missing from the configuration, remains.
  # Depend on DNS for name resolution
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
  """Calculate node addresses using configuration.

  @param single_node_fn: Callable returning the node object for a single
    node name (used for the one-host fast path)
  @param all_nodes_fn: Callable returning a dict mapping all node names
    to their node objects
  @param hosts: List of node names to resolve
  @return: List of (name, ip) pairs as produced by L{_CheckConfigNode}

  """
  # The visible excerpt referenced "name" without binding it and lacked
  # the single-host guard; restore both.
  # Special case for single-host lookups
  if len(hosts) == 1:
    (name, ) = hosts
    return [_CheckConfigNode(name, single_node_fn(name))]
  else:
    all_nodes = all_nodes_fn()
    return [_CheckConfigNode(name, all_nodes.get(name, None))
            for name in hosts]
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of hostnames, returning a list
      of tuples containing name and IP address (IP address can be the name or
      the special value L{_OFFLINE} to mark offline machines)
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    # NOTE(review): the statement storing "port" (presumably
    # "self._port = port") is not visible in this excerpt, although
    # self._port is read by __call__ below.
    self._lock_monitor_cb = lock_monitor_cb
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @param hosts: Pairs of (name, ip) as produced by the resolver
    @param port: TCP port to contact
    @type procedure: string
    @param procedure: Request path
    @param body: Request body
    @param read_timeout: Read timeout for request
    @return: Tuple of (results, requests): pre-computed L{RpcResult}
      objects for offline hosts, and HTTP requests still to be issued

    """
    # NOTE(review): the initialization of "results"/"requests" and the
    # offline/online branching lines are not visible in this excerpt;
    # the visible lines show the offline pre-computed result and the
    # request constructed for online hosts.
    for (name, ip) in hosts:
      # Node is marked as offline
      results[name] = RpcResult(node=name, offline=True, call=procedure)
      http.client.HttpClientRequest(str(ip), port,
                                    http.HTTP_PUT, str("/%s" % procedure),
                                    headers=_RPC_CLIENT_HEADERS,
                                    read_timeout=read_timeout,
                                    nicename="%s/%s" % (name, procedure),
                                    curl_config_fn=_ConfigRpcCurl)

    return (results, requests)
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    @param results: Pre-computed results for offline hosts
    @param requests: Completed HTTP request objects, keyed by host name
    @type procedure: string
    @param procedure: Request path (for result annotation and logging)

    """
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        # Successful HTTP call: decode the serialized response body
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      # TODO: Better error reporting
      # NOTE(review): the failure branch computing "msg" and the closing
      # arguments of the RpcResult() call below, as well as the final
      # return statement, are not visible in this excerpt.
      logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
      host_result = RpcResult(data=msg, failed=True, node=name,
      results[name] = host_result
  def __call__(self, hosts, procedure, body, read_timeout=None,
               _req_process_fn=http.client.ProcessRequests):
    """Makes an RPC request to a number of nodes.

    @type hosts: sequence
    @param hosts: Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: string
    @param body: Request body
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request

    """
    assert read_timeout is not None, \
      "Missing RPC read timeout for procedure '%s'" % procedure

    # Resolve host names and split them into pre-computed offline
    # results and HTTP requests still to be made
    (results, requests) = \
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
                            str(body), read_timeout)

    # Issue all requests, registering with the lock monitor if a
    # callback was supplied at construction time
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    # A host must never appear in both the offline results and the
    # request set
    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
class _RpcClientBase:
  """Base class for the generated RPC client mixins."""

  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: Callable resolving host names to addresses, passed
      on to L{_RpcProcessor}
    @param encoder_fn: Callable mapping an argument-kind constant to the
      encoder for that kind (see L{_EncodeArg})
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._proc = _RpcProcessor(resolver,
                               netutils.GetDaemonPort(constants.NODED),
                               lock_monitor_cb=lock_monitor_cb)
    # Partially apply the encoder lookup; self._encoder then takes only
    # the (argkind, value) pair
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)

  # NOTE(review): a decorator line (if any) for _EncodeArg is not
  # visible in this excerpt.
  def _EncodeArg(encoder_fn, (argkind, value)):
    """Encodes a single argument for transmission.

    Looks up the encoder for the argument's declared kind and applies it
    to the value (Python 2 tuple-parameter syntax).

    """
    return encoder_fn(argkind)(value)

  def _Call(self, cdef, node_list, timeout, args):
    """Entry point for automatically generated RPC wrappers.

    @param cdef: Call definition tuple; only the procedure name and the
      argument definitions are used here
    @param node_list: Nodes to contact
    @param timeout: Read timeout for the request
    @param args: Call arguments, in the same order as the definitions

    """
    (procedure, _, _, argdefs, _, _) = cdef

    # Encode each argument according to its declared kind, then
    # serialize the whole argument list as the request body.
    # NOTE(review): the closing arguments of the DumpJson() call below
    # are not visible in this excerpt.
    body = serializer.DumpJson(map(self._encoder,
                                   zip(map(compat.snd, argdefs), args)),

    return self._proc(node_list, procedure, body, read_timeout=timeout)
446 def _ObjectToDict(value):
447 """Converts an object to a dictionary.
449 @note: See L{objects}.
452 return value.ToDict()
def _ObjectListToDict(value):
  """Converts a list of L{objects} to dictionaries.

  @param value: Sequence of objects providing a C{ToDict} method
  @return: List of dictionaries, one per input object

  """
  # map() returns a list under Python 2, matching the declared contract
  return map(_ObjectToDict, value)
def _EncodeNodeToDiskDict(value):
  """Encodes a dictionary with node name as key and disk objects as values.

  @param value: Mapping of node name to a list of disk objects
  @return: Mapping of node name to the dict-encoded disk list

  """
  return dict((name, _ObjectListToDict(disks))
              for name, disks in value.items())
def _PrepareFileUpload(filename):
  """Loads a file and prepares it for an upload to nodes.

  @type filename: string
  @param filename: Path of the file to read
  @return: List of [filename, compressed contents, mode, owner name,
    group name, atime, mtime]

  """
  # Read and (potentially) compress the file contents first
  data = _Compress(utils.ReadFile(filename))

  # Collect mode, ownership and timestamps; numeric IDs are translated
  # to names via the runtime entity lookup
  st = os.stat(filename)
  getents = runtime.GetEnts()
  return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
481 def _PrepareFinalizeExportDisks(snap_disks):
482 """Encodes disks for finalizing export.
487 for disk in snap_disks:
488 if isinstance(disk, bool):
489 flat_disks.append(disk)
491 flat_disks.append(disk.ToDict())
496 def _EncodeImportExportIO((ieio, ieioargs)):
497 """Encodes import/export I/O information.
500 if ieio == constants.IEIO_RAW_DISK:
501 assert len(ieioargs) == 1
502 return (ieio, (ieioargs[0].ToDict(), ))
504 if ieio == constants.IEIO_SCRIPT:
505 assert len(ieioargs) == 2
506 return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
508 return (ieio, ieioargs)
511 def _EncodeBlockdevRename(value):
512 """Encodes information for renaming block devices.
515 return [(d.ToDict(), uid) for d, uid in value]
520 rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
521 rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
522 rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
523 rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload,
524 rpc_defs.ED_COMPRESS: _Compress,
525 rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
526 rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
527 rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
class RpcRunner(_RpcClientBase,
                _generated_rpc.RpcClientDefault,
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientConfig):
  """RPC runner class, combining the generated RPC client mixins.

  NOTE(review): several interior lines of this class (blank lines,
  decorators, some control-flow statements and call continuations) are
  not visible in this excerpt; hedged notes mark the affected spots.

  """
  def __init__(self, context):
    """Initializes the RPC runner.

    @type context: C{masterd.GanetiContext}
    @param context: Ganeti context

    """
    self._cfg = context.cfg

    # Start from the shared encoder table
    encoders = _ENCODERS.copy()

    # Add encoders requiring configuration object
    # NOTE(review): the "encoders.update({" line introducing the entries
    # below is not visible in this excerpt.
      rpc_defs.ED_INST_DICT: self._InstDict,
      rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
      rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,

    # Resolver using configuration
    resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo,
                              self._cfg.GetAllNodesInfo)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @return: the instance dict, with the hvparams filled with the
      cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()
    # Cluster defaults first, then the per-call overrides
    # NOTE(review): the guards checking hvp/bep/osp for None before the
    # update() calls are not visible in this excerpt.
    idict["hvparams"] = cluster.FillHV(instance)
      idict["hvparams"].update(hvp)
    idict["beparams"] = cluster.FillBE(instance)
      idict["beparams"].update(bep)
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
      idict["osparams"].update(osp)
    # Fill NIC parameters with the cluster-level defaults
    # NOTE(review): the closing arguments of the FillDict() call and the
    # method's return statement are not visible in this excerpt.
    for nic in idict["nics"]:
      nic['nicparams'] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],

  def _InstDictHvpBep(self, (instance, hvp, bep)):
    """Wrapper for L{_InstDict}, unpacking (instance, hvp, bep).

    """
    return self._InstDict(instance, hvp=hvp, bep=bep)

  def _InstDictOsp(self, (instance, osparams)):
    """Wrapper for L{_InstDict}, unpacking (instance, osparams).

    """
    return self._InstDict(instance, osp=osparams)

  # NOTE(review): decorator lines (if any) for the post-processors below
  # are not visible in this excerpt.
  def _MigrationStatusPostProc(result):
    # On success, decode the payload into a MigrationStatus object
    if not result.fail_msg and result.payload is not None:
      result.payload = objects.MigrationStatus.FromDict(result.payload)

  def _BlockdevFindPostProc(result):
    # On success, decode the payload into a BlockDevStatus object
    if not result.fail_msg and result.payload is not None:
      result.payload = objects.BlockDevStatus.FromDict(result.payload)

  def _BlockdevGetMirrorStatusPostProc(result):
    # On success, decode each payload entry into a BlockDevStatus object
    if not result.fail_msg:
      result.payload = [objects.BlockDevStatus.FromDict(i)
                        for i in result.payload]

  def _BlockdevGetMirrorStatusMultiPostProc(result):
    # NOTE(review): the per-node guard lines inside this loop are not
    # visible in this excerpt.
    for nres in result.values():
      for idx, (success, status) in enumerate(nres.payload):
        nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))

  def _OsGetPostProc(result):
    # On success, decode a dict payload into an OS object
    if not result.fail_msg and isinstance(result.payload, dict):
      result.payload = objects.OS.FromDict(result.payload)

  def _ImpExpStatusPostProc(result):
    """Post-processor for import/export status.

    @rtype: Payload containing list of L{objects.ImportExportStatus} instances
    @return: Returns a list of the state of each named import/export or None if
      a status couldn't be retrieved

    """
    # NOTE(review): the initialization of "decoded" and the per-entry
    # guard inside the loop are not visible in this excerpt.
    if not result.fail_msg:
      for i in result.payload:
        decoded.append(objects.ImportExportStatus.FromDict(i))
      result.payload = decoded

  def call_test_delay(self, node_list, duration): # pylint: disable=W0221
    """Sleep for a fixed time on given node(s).

    This is a multi-node call.

    """
    # TODO: Use callable timeout calculation
    return _generated_rpc.RpcClientDefault.call_test_delay(self,
      node_list, duration, read_timeout=int(duration + 5))
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    @param context: Ganeti context (supplies the lock monitor callback)
    @param address_list: List of node addresses, or None to resolve node
      names via ssconf

    """
    # The visible excerpt lacked the "else:" line, leaving the static
    # resolver branch orphaned; restore the two-way selection.
    if address_list is None:
      resolver = _SsconfResolver
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    Node addresses are always resolved via ssconf.

    """
    # The visible excerpt lacked the "def __init__(self):" line around
    # this constructor body; restore it.
    _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    @param context: Ganeti context, or None to skip lock monitoring
    @param address_list: List of node addresses, or None to resolve node
      names via ssconf

    """
    # The visible excerpt lacked the "if context:" and "else:" lines,
    # leaving both selection blocks orphaned; restore them.
    if context:
      lock_monitor_cb = context.glm.AddToLockMonitor
    else:
      lock_monitor_cb = None

    if address_list is None:
      resolver = _SsconfResolver
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=lock_monitor_cb)
    _generated_rpc.RpcClientConfig.__init__(self)