# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Inter-node RPC library.

"""

# pylint: disable=C0103,R0201,R0904
# C0103: Invalid name, since call_ are not valid
# R0201: Method could be a function, we keep all rpcs instance methods
# as not to change them back and forth between static/instance methods
# if they need to start using instance attributes
# R0904: Too many public methods
import logging
import zlib
import base64
import threading

import pycurl

from ganeti import utils
from ganeti import objects
from ganeti import http
from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import netutils
from ganeti import ssconf
from ganeti import runtime
from ganeti import compat
from ganeti import rpc_defs

# Special module generated at build time
from ganeti import _generated_rpc

# pylint has a bug here, doesn't see this import
import ganeti.http.client  # pylint: disable=W0611
# Timeout for connecting to nodes (seconds)
_RPC_CONNECT_TIMEOUT = 5

# HTTP headers sent with every RPC request
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]

# Various time constants for the timeout table
_TMO_URGENT = 60 # one minute
_TMO_FAST = 5 * 60 # five minutes
_TMO_NORMAL = 15 * 60 # 15 minutes
_TMO_SLOW = 3600 # one hour

#: Special value to describe an offline host
_OFFLINE = object()
def Init():
  """Initializes the module-global HTTP client manager.

  Must be called before using any RPC function and while exactly one thread is
  running.

  """
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
  # one thread running. This check is just a safety measure -- it doesn't
  # cover all cases.
  assert threading.activeCount() == 1, \
       "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)

  pycurl.global_init(pycurl.GLOBAL_ALL)
def Shutdown():
  """Stops the module-global HTTP client manager.

  Must be called before quitting the program and while exactly one thread is
  running.

  """
  pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Configures a pycURL handle for node RPC over SSL.

  The noded certificate file is used as CA, client certificate and
  client key alike, since all nodes share the same self-signed
  certificate.

  @param curl: pycURL handle to configure

  """
  noded_cert = str(constants.NODED_CERT_FILE)

  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  # Peer certificate is verified against the CA, but the hostname inside the
  # certificate is not checked (value 0)
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
def RunWithRPC(fn):
  """RPC-wrapper decorator.

  When applied to a function, it runs it with the RPC system
  initialized, and it shutsdown the system afterwards. This means the
  function must be called without RPC being initialized.

  """
  def wrapper(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      Shutdown()
  return wrapper
def _Compress(data):
  """Compresses a string for transport over RPC.

  Small amounts of data are not compressed.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  # Small amounts of data are not compressed
  if len(data) < 512:
    return (constants.RPC_ENCODING_NONE, data)

  # Compress with zlib and encode in base64
  return (constants.RPC_ENCODING_ZLIB_BASE64,
          base64.b64encode(zlib.compress(data, 3)))
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        self.fail_msg = None
        self.payload = data[1]

    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    """
    if not self.fail_msg:
      return

    if not msg: # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args) # pylint: disable=W0142
def _SsconfResolver(node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  ss = ssc()
  iplist = ss.GetNodePrimaryIPList()
  family = ss.GetPrimaryIPFamily()
  # ssconf stores "name ip" pairs, one per line
  ipmap = dict(entry.split() for entry in iplist)

  result = []
  for node in node_list:
    ip = ipmap.get(node)
    if ip is None:
      # Fall back to DNS for nodes not in ssconf
      ip = nslookup_fn(node, family=family)
    result.append((node, ip))

  return result
272 class _StaticResolver:
273 def __init__(self, addresses):
274 """Initializes this class.
277 self._addresses = addresses
279 def __call__(self, hosts, _):
280 """Returns static addresses for hosts.
283 assert len(hosts) == len(self._addresses)
284 return zip(hosts, self._addresses)
287 def _CheckConfigNode(name, node, accept_offline_node):
288 """Checks if a node is online.
291 @param name: Node name
292 @type node: L{objects.Node} or None
293 @param node: Node object
297 # Depend on DNS for name resolution
299 elif node.offline and not accept_offline_node:
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
  """Calculate node addresses using configuration.

  @param single_node_fn: Function returning one node object by name
  @param all_nodes_fn: Function returning a dict of all node objects
  @param hosts: List of node names
  @param opts: Either None or L{rpc_defs.ACCEPT_OFFLINE_NODE}

  """
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline_node or opts is None, "Unknown option"

  # Special case for single-host lookups
  if len(hosts) == 1:
    (name, ) = hosts
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
  else:
    all_nodes = all_nodes_fn()
    return [_CheckConfigNode(name, all_nodes.get(name, None),
                             accept_offline_node)
            for name in hosts]
class _RpcProcessor:
  """Processes RPC requests for a set of nodes.

  """
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of hostnames, returning a list
      of tuples containing name and IP address (IP address can be the name or
      the special value L{_OFFLINE} to mark offline machines)
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @type body: dict
    @param body: a dictionary with per-host body data

    """
    results = {}
    requests = {}

    assert isinstance(body, dict)
    assert len(body) == len(hosts)
    assert compat.all(isinstance(v, str) for v in body.values())
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
        "%s != %s" % (hosts, body.keys())

    for (name, ip) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline
        results[name] = RpcResult(node=name, offline=True, call=procedure)
      else:
        requests[name] = \
          http.client.HttpClientRequest(str(ip), port,
                                        http.HTTP_POST, str("/%s" % procedure),
                                        headers=_RPC_CLIENT_HEADERS,
                                        post_data=body[name],
                                        read_timeout=read_timeout,
                                        nicename="%s/%s" % (name, procedure),
                                        curl_config_fn=_ConfigRpcCurl)

    return (results, requests)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    """
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)

      results[name] = host_result

    return results

  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
               _req_process_fn=None):
    """Makes an RPC request to a number of nodes.

    @type hosts: sequence
    @param hosts: Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: dictionary
    @param body: dictionary with request bodies per host
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request

    """
    assert read_timeout is not None, \
      "Missing RPC read timeout for procedure '%s'" % procedure

    if _req_process_fn is None:
      _req_process_fn = http.client.ProcessRequests

    (results, requests) = \
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
                            procedure, body, read_timeout)

    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
429 class _RpcClientBase:
430 def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
431 _req_process_fn=None):
432 """Initializes this class.
435 proc = _RpcProcessor(resolver,
436 netutils.GetDaemonPort(constants.NODED),
437 lock_monitor_cb=lock_monitor_cb)
438 self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
439 self._encoder = compat.partial(self._EncodeArg, encoder_fn)
442 def _EncodeArg(encoder_fn, (argkind, value)):
449 return encoder_fn(argkind)(value)
451 def _Call(self, cdef, node_list, args):
452 """Entry point for automatically generated RPC wrappers.
455 (procedure, _, resolver_opts, timeout, argdefs,
456 prep_fn, postproc_fn, _) = cdef
458 if callable(timeout):
459 read_timeout = timeout(args)
461 read_timeout = timeout
463 if callable(resolver_opts):
464 req_resolver_opts = resolver_opts(args)
466 req_resolver_opts = resolver_opts
468 if len(args) != len(argdefs):
469 raise errors.ProgrammerError("Number of passed arguments doesn't match")
471 enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
473 # for a no-op prep_fn, we serialise the body once, and then we
474 # reuse it in the dictionary values
475 body = serializer.DumpJson(enc_args)
476 pnbody = dict((n, body) for n in node_list)
478 # for a custom prep_fn, we pass the encoded arguments and the
479 # node name to the prep_fn, and we serialise its return value
480 assert callable(prep_fn)
481 pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
484 result = self._proc(node_list, procedure, pnbody, read_timeout,
488 return dict(map(lambda (key, value): (key, postproc_fn(value)),
494 def _ObjectToDict(value):
495 """Converts an object to a dictionary.
497 @note: See L{objects}.
500 return value.ToDict()
def _ObjectListToDict(value):
  """Converts a list of L{objects} to dictionaries.

  """
  return map(_ObjectToDict, value)
def _EncodeNodeToDiskDict(value):
  """Encodes a dictionary with node name as key and disk objects as values.

  """
  return dict((name, _ObjectListToDict(disks))
              for name, disks in value.items())
def _PrepareFileUpload(getents_fn, filename):
  """Loads a file and prepares it for an upload to nodes.

  @param getents_fn: Function returning a runtime entity resolver, or None to
    use the default (L{runtime.GetEnts})
  @type filename: string
  @param filename: Path to the file to upload

  """
  statcb = utils.FileStatHelper()
  data = _Compress(utils.ReadFile(filename, preread=statcb))
  st = statcb.st

  if getents_fn is None:
    getents_fn = runtime.GetEnts

  getents = getents_fn()

  # File contents plus ownership/mode/timestamps, so the receiving node can
  # recreate the file exactly
  return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
535 def _PrepareFinalizeExportDisks(snap_disks):
536 """Encodes disks for finalizing export.
541 for disk in snap_disks:
542 if isinstance(disk, bool):
543 flat_disks.append(disk)
545 flat_disks.append(disk.ToDict())
550 def _EncodeImportExportIO((ieio, ieioargs)):
551 """Encodes import/export I/O information.
554 if ieio == constants.IEIO_RAW_DISK:
555 assert len(ieioargs) == 1
556 return (ieio, (ieioargs[0].ToDict(), ))
558 if ieio == constants.IEIO_SCRIPT:
559 assert len(ieioargs) == 2
560 return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
562 return (ieio, ieioargs)
565 def _EncodeBlockdevRename(value):
566 """Encodes information for renaming block devices.
569 return [(d.ToDict(), uid) for d, uid in value]
#: Generic encoders usable without a configuration object
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }
584 class RpcRunner(_RpcClientBase,
585 _generated_rpc.RpcClientDefault,
586 _generated_rpc.RpcClientBootstrap,
587 _generated_rpc.RpcClientConfig):
591 def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
592 """Initialized the RPC runner.
594 @type cfg: L{config.ConfigWriter}
595 @param cfg: Configuration
596 @type lock_monitor_cb: callable
597 @param lock_monitor_cb: Lock monitor callback
602 encoders = _ENCODERS.copy()
605 # Encoders requiring configuration object
606 rpc_defs.ED_INST_DICT: self._InstDict,
607 rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
608 rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
610 # Encoders with special requirements
611 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
614 # Resolver using configuration
615 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
618 # Pylint doesn't recognize multiple inheritance properly, see
619 # <http://www.logilab.org/ticket/36586> and
620 # <http://www.logilab.org/ticket/35642>
621 # pylint: disable=W0233
622 _RpcClientBase.__init__(self, resolver, encoders.get,
623 lock_monitor_cb=lock_monitor_cb,
624 _req_process_fn=_req_process_fn)
625 _generated_rpc.RpcClientConfig.__init__(self)
626 _generated_rpc.RpcClientBootstrap.__init__(self)
627 _generated_rpc.RpcClientDefault.__init__(self)
629 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
630 """Convert the given instance to a dict.
632 This is done via the instance's ToDict() method and additionally
633 we fill the hvparams with the cluster defaults.
635 @type instance: L{objects.Instance}
636 @param instance: an Instance object
637 @type hvp: dict or None
638 @param hvp: a dictionary with overridden hypervisor parameters
639 @type bep: dict or None
640 @param bep: a dictionary with overridden backend parameters
641 @type osp: dict or None
642 @param osp: a dictionary with overridden os parameters
644 @return: the instance dict, with the hvparams filled with the
648 idict = instance.ToDict()
649 cluster = self._cfg.GetClusterInfo()
650 idict["hvparams"] = cluster.FillHV(instance)
652 idict["hvparams"].update(hvp)
653 idict["beparams"] = cluster.FillBE(instance)
655 idict["beparams"].update(bep)
656 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
658 idict["osparams"].update(osp)
659 for nic in idict["nics"]:
660 nic['nicparams'] = objects.FillDict(
661 cluster.nicparams[constants.PP_DEFAULT],
665 def _InstDictHvpBep(self, (instance, hvp, bep)):
666 """Wrapper for L{_InstDict}.
669 return self._InstDict(instance, hvp=hvp, bep=bep)
671 def _InstDictOsp(self, (instance, osparams)):
672 """Wrapper for L{_InstDict}.
675 return self._InstDict(instance, osp=osparams)
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    """
    if address_list is None:
      resolver = _SsconfResolver
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    """
    _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  def __init__(self, context, address_list, _req_process_fn=None,
               _getents=None):
    """Initializes this class.

    """
    if context:
      lock_monitor_cb = context.glm.AddToLockMonitor
    else:
      lock_monitor_cb = None

    if address_list is None:
      resolver = _SsconfResolver
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    encoders = _ENCODERS.copy()

    encoders.update({
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
      })

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)