4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45 from ganeti import netutils
46 from ganeti import ssconf
47 from ganeti import runtime
48 from ganeti import compat
49 from ganeti import rpc_defs
50 from ganeti import pathutils
51 from ganeti import vcluster
53 # Special module generated at build time
54 from ganeti import _generated_rpc
56 # pylint has a bug here, doesn't see this import
57 import ganeti.http.client # pylint: disable=W0611
60 _RPC_CLIENT_HEADERS = [
61 "Content-type: %s" % http.HTTP_APP_JSON,
65 #: Special value to describe an offline host
70 """Initializes the module-global HTTP client manager.
72 Must be called before using any RPC function and while exactly one thread is
76 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
77 # one thread running. This check is just a safety measure -- it doesn't
79 assert threading.activeCount() == 1, \
80 "Found more than one active thread when initializing pycURL"
82 logging.info("Using PycURL %s", pycurl.version)
84 pycurl.global_init(pycurl.GLOBAL_ALL)
88 """Stops the module-global HTTP client manager.
90 Must be called before quitting the program and while exactly one thread is
94 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
    """Applies the RPC client settings to a cURL handle.

    The node daemon certificate file serves as CA bundle, client
    certificate and client key alike. The peer certificate is verified,
    the host name check is switched off and redirects are not followed.

    @param curl: object providing C{setopt}; presumably a C{pycurl.Curl}
      handle

    """
    noded_cert = str(pathutils.NODED_CERT_FILE)

    curl.setopt(pycurl.FOLLOWLOCATION, False)
    curl.setopt(pycurl.CAINFO, noded_cert)
    curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    curl.setopt(pycurl.SSL_VERIFYPEER, True)
    curl.setopt(pycurl.SSLCERTTYPE, "PEM")
    curl.setopt(pycurl.SSLCERT, noded_cert)
    curl.setopt(pycurl.SSLKEYTYPE, "PEM")
    curl.setopt(pycurl.SSLKEY, noded_cert)
    curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
112 """RPC-wrapper decorator.
114 When applied to a function, it runs it with the RPC system
115 initialized, and it shuts down the system afterwards. This means the
116 function must be called without RPC being initialized.
119 def wrapper(*args, **kwargs):
122 return fn(*args, **kwargs)
129 """Compresses a string for transport over RPC.
131 Small amounts of data are not compressed.
136 @return: Encoded data to send
139 # Small amounts of data are not compressed
141 return (constants.RPC_ENCODING_NONE, data)
143 # Compress with zlib and encode in base64
144 return (constants.RPC_ENCODING_ZLIB_BASE64,
145 base64.b64encode(zlib.compress(data, 3)))
148 class RpcResult(object):
151 This class holds an RPC result. It is needed since in multi-node
152 calls we can't raise an exception just because one out of many
153 failed, and therefore we use this class to encapsulate the result.
155 @ivar data: the data payload, for successful results, or None
156 @ivar call: the name of the RPC call
157 @ivar node: the name of the node to which we made the call
158 @ivar offline: whether the operation failed because the node was
159 offline, as opposed to actual failure; offline=True will always
160 imply failed=True, in order to allow simpler checking if
161 the user doesn't care about the exact failure mode
162 @ivar fail_msg: the error message if the call failed
165 def __init__(self, data=None, failed=False, offline=False,
166 call=None, node=None):
167 self.offline = offline
172 self.fail_msg = "Node is marked offline"
173 self.data = self.payload = None
175 self.fail_msg = self._EnsureErr(data)
176 self.data = self.payload = None
179 if not isinstance(self.data, (tuple, list)):
180 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
184 self.fail_msg = ("RPC layer error: invalid result length (%d), "
185 "expected 2" % len(self.data))
187 elif not self.data[0]:
188 self.fail_msg = self._EnsureErr(self.data[1])
193 self.payload = data[1]
195 for attr_name in ["call", "data", "fail_msg",
196 "node", "offline", "payload"]:
197 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
201 """Helper to ensure we return a 'True' value for error."""
205 return "No error information"
207 def Raise(self, msg, prereq=False, ecode=None):
208 """If the result has failed, raise an OpExecError.
210 This is used so that LU code doesn't have to check for each
211 result, but instead can call this function.
214 if not self.fail_msg:
217 if not msg: # one could pass None for default message
218 msg = ("Call '%s' to node '%s' has failed: %s" %
219 (self.call, self.node, self.fail_msg))
221 msg = "%s: %s" % (msg, self.fail_msg)
223 ec = errors.OpPrereqError
225 ec = errors.OpExecError
226 if ecode is not None:
230 raise ec(*args) # pylint: disable=W0142
233 def _SsconfResolver(ssconf_ips, node_list, _,
234 ssc=ssconf.SimpleStore,
235 nslookup_fn=netutils.Hostname.GetIP):
236 """Return addresses for given node names.
238 @type ssconf_ips: bool
239 @param ssconf_ips: Use the ssconf IPs
240 @type node_list: list
241 @param node_list: List of node names
243 @param ssc: SimpleStore class that is used to obtain node->ip mappings
244 @type nslookup_fn: callable
245 @param nslookup_fn: function used to do NS lookup
246 @rtype: list of tuple; (string, string)
247 @return: List of tuples containing node name and IP address
251 family = ss.GetPrimaryIPFamily()
254 iplist = ss.GetNodePrimaryIPList()
255 ipmap = dict(entry.split() for entry in iplist)
260 for node in node_list:
263 ip = nslookup_fn(node, family=family)
264 result.append((node, ip))
class _StaticResolver(object):
    """Resolver handing out a fixed, caller-supplied list of addresses.

    """
    def __init__(self, addresses):
        """Initializes this class.

        @param addresses: sequence of addresses; the entry at position C{i}
          is paired with the C{i}-th host passed when the instance is called

        """
        self._addresses = addresses

    def __call__(self, hosts, _):
        """Returns static addresses for hosts.

        @param hosts: sequence of host names; must be as long as the
          address list given to the constructor
        @param _: resolver options, ignored
        @rtype: list of tuple
        @return: (host, address) pairs in the order of C{hosts}

        """
        assert len(hosts) == len(self._addresses)
        # Explicit list: zip() only returns a lazy iterator on Python 3
        return list(zip(hosts, self._addresses))
284 def _CheckConfigNode(name, node, accept_offline_node):
285 """Checks if a node is online.
288 @param name: Node name
289 @type node: L{objects.Node} or None
290 @param node: Node object
294 # Depend on DNS for name resolution
296 elif node.offline and not accept_offline_node:
303 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
304 """Calculate node addresses using configuration.
307 accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
309 assert accept_offline_node or opts is None, "Unknown option"
311 # Special case for single-host lookups
314 return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
316 all_nodes = all_nodes_fn()
317 return [_CheckConfigNode(name, all_nodes.get(name, None),
323 def __init__(self, resolver, port, lock_monitor_cb=None):
324 """Initializes this class.
326 @param resolver: callable accepting a list of hostnames, returning a list
327 of tuples containing name and IP address (IP address can be the name or
328 the special value L{_OFFLINE} to mark offline machines)
330 @param port: TCP port
331 @param lock_monitor_cb: Callable for registering with lock monitor
334 self._resolver = resolver
336 self._lock_monitor_cb = lock_monitor_cb
339 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
340 """Prepares requests by sorting offline hosts into separate list.
343 @param body: a dictionary with per-host body data
349 assert isinstance(body, dict)
350 assert len(body) == len(hosts)
351 assert compat.all(isinstance(v, str) for v in body.values())
352 assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
353 "%s != %s" % (hosts, body.keys())
355 for (name, ip) in hosts:
357 # Node is marked as offline
358 results[name] = RpcResult(node=name, offline=True, call=procedure)
361 http.client.HttpClientRequest(str(ip), port,
362 http.HTTP_POST, str("/%s" % procedure),
363 headers=_RPC_CLIENT_HEADERS,
364 post_data=body[name],
365 read_timeout=read_timeout,
366 nicename="%s/%s" % (name, procedure),
367 curl_config_fn=_ConfigRpcCurl)
369 return (results, requests)
372 def _CombineResults(results, requests, procedure):
373 """Combines pre-computed results for offline hosts with actual call results.
376 for name, req in requests.items():
377 if req.success and req.resp_status_code == http.HTTP_OK:
378 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
379 node=name, call=procedure)
381 # TODO: Better error reporting
387 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
388 host_result = RpcResult(data=msg, failed=True, node=name,
391 results[name] = host_result
def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
             _req_process_fn=None):
    """Makes an RPC request to a number of nodes.

    @type hosts: sequence
    @param hosts: Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: dictionary
    @param body: dictionary with request bodies per host
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request; C{None} is rejected by
      an assertion
    @param resolver_opts: handed to the resolver as its second argument
    @param _req_process_fn: callable executing the prepared requests,
      defaults to C{http.client.ProcessRequests}
    @return: the value of C{self._CombineResults} for the pre-computed
      results and the processed requests

    """
    assert read_timeout is not None, \
        "Missing RPC read timeout for procedure '%s'" % procedure

    if _req_process_fn is None:
        _req_process_fn = http.client.ProcessRequests

    addresses = self._resolver(hosts, resolver_opts)
    (results, requests) = self._PrepareRequests(addresses, self._port,
                                                procedure, body,
                                                read_timeout)

    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    # A host must not have both a pre-computed result and a request
    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
426 class _RpcClientBase:
def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
             _req_process_fn=None):
    """Initializes this class.

    @param resolver: resolver handed to the L{_RpcProcessor}
    @param encoder_fn: callable bound as first argument of
      C{self._EncodeArg}
    @param lock_monitor_cb: passed on to the L{_RpcProcessor}
    @param _req_process_fn: bound as keyword argument of every call made
      through C{self._proc}

    """
    proc = _RpcProcessor(resolver,
                         netutils.GetDaemonPort(constants.NODED),
                         lock_monitor_cb=lock_monitor_cb)
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
439 def _EncodeArg(encoder_fn, (argkind, value)):
446 return encoder_fn(argkind)(value)
448 def _Call(self, cdef, node_list, args):
449 """Entry point for automatically generated RPC wrappers.
452 (procedure, _, resolver_opts, timeout, argdefs,
453 prep_fn, postproc_fn, _) = cdef
455 if callable(timeout):
456 read_timeout = timeout(args)
458 read_timeout = timeout
460 if callable(resolver_opts):
461 req_resolver_opts = resolver_opts(args)
463 req_resolver_opts = resolver_opts
465 if len(args) != len(argdefs):
466 raise errors.ProgrammerError("Number of passed arguments doesn't match")
468 enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
470 # for a no-op prep_fn, we serialise the body once, and then we
471 # reuse it in the dictionary values
472 body = serializer.DumpJson(enc_args)
473 pnbody = dict((n, body) for n in node_list)
475 # for a custom prep_fn, we pass the encoded arguments and the
476 # node name to the prep_fn, and we serialise its return value
477 assert callable(prep_fn)
478 pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
481 result = self._proc(node_list, procedure, pnbody, read_timeout,
485 return dict(map(lambda (key, value): (key, postproc_fn(value)),
def _ObjectToDict(value):
    """Converts an object to a dictionary.

    @note: See L{objects}.
    @param value: object providing a C{ToDict} method
    @return: the result of C{value.ToDict()}

    """
    return value.ToDict()
def _ObjectListToDict(value):
    """Converts a list of L{objects} to dictionaries.

    @param value: iterable of objects providing a C{ToDict} method
    @rtype: list
    @return: the C{ToDict()} result of every object, in input order

    """
    # Built eagerly: map() only returns a lazy iterator on Python 3
    return [obj.ToDict() for obj in value]
def _EncodeNodeToDiskDict(value):
    """Encodes a dictionary with node name as key and disk objects as values.

    @type value: dict
    @param value: maps a node name to an iterable of objects providing a
      C{ToDict} method
    @rtype: dict
    @return: same keys, each mapped to the list of C{ToDict()} results

    """
    # The per-node values are real lists, also on Python 3
    return dict((name, [disk.ToDict() for disk in disks])
                for (name, disks) in value.items())
515 def _PrepareFileUpload(getents_fn, filename):
516 """Loads a file and prepares it for an upload to nodes.
519 statcb = utils.FileStatHelper()
520 data = _Compress(utils.ReadFile(filename, preread=statcb))
523 if getents_fn is None:
524 getents_fn = runtime.GetEnts
526 getents = getents_fn()
528 virt_filename = vcluster.MakeVirtualPath(filename)
530 return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
531 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
def _PrepareFinalizeExportDisks(snap_disks):
    """Encodes disks for finalizing export.

    @param snap_disks: iterable whose entries are either booleans or
      objects providing a C{ToDict} method
    @rtype: list
    @return: booleans are kept as they are, every other entry is replaced
      by its C{ToDict()} result; input order is preserved

    """
    flat_disks = []

    for disk in snap_disks:
        if isinstance(disk, bool):
            flat_disks.append(disk)
        else:
            flat_disks.append(disk.ToDict())

    return flat_disks
def _EncodeImportExportIO(ieinfo):
    """Encodes import/export I/O information.

    @type ieinfo: tuple
    @param ieinfo: pair of I/O kind (C{ieio}) and its arguments
      (C{ieioargs})
    @rtype: tuple
    @return: C{(ieio, args)}; for C{IEIO_RAW_DISK} the single argument and
      for C{IEIO_SCRIPT} the first of two arguments is replaced by its
      C{ToDict()} result, any other kind is returned unchanged

    """
    # Unpacked in the body, as tuple parameters are Python 2-only syntax
    (ieio, ieioargs) = ieinfo

    if ieio == constants.IEIO_RAW_DISK:
        assert len(ieioargs) == 1
        return (ieio, (ieioargs[0].ToDict(), ))

    if ieio == constants.IEIO_SCRIPT:
        assert len(ieioargs) == 2
        return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))

    return (ieio, ieioargs)
def _EncodeBlockdevRename(value):
    """Encodes information for renaming block devices.

    @param value: iterable of pairs whose first item provides a C{ToDict}
      method
    @rtype: list of tuple
    @return: the same pairs with the first item replaced by its
      C{ToDict()} result; the second item is passed through

    """
    return [(dev.ToDict(), uid) for (dev, uid) in value]
571 def MakeLegacyNodeInfo(data):
572 """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
574 Converts the data into a single dictionary. This is fine for most use cases,
575 but some require information from more than one volume group or hypervisor.
578 (bootid, (vg_info, ), (hv_info, )) = data
580 return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
def _AnnotateDParamsDRBD(disk, ld_params):
    """Annotates just DRBD disks layouts.

    @param disk: disk of type C{LD_DRBD8} with exactly two children; the
      disk and both children are modified in place
    @type ld_params: tuple
    @param ld_params: C{(drbd_params, data_params, meta_params)}, merged
      via C{objects.FillDict} into the parameters of the disk itself, its
      first child and its second child respectively
    @return: the annotated C{disk}

    """
    # Unpacked in the body, as tuple parameters are Python 2-only syntax
    (drbd_params, data_params, meta_params) = ld_params

    assert disk.dev_type == constants.LD_DRBD8

    disk.params = objects.FillDict(drbd_params, disk.params)
    (dev_data, dev_meta) = disk.children
    dev_data.params = objects.FillDict(data_params, dev_data.params)
    dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

    return disk
def _AnnotateDParamsGeneric(disk, ld_params):
    """Generic disk parameter annotation routine.

    @param disk: disk of any type but C{LD_DRBD8}; modified in place
    @type ld_params: tuple
    @param ld_params: one-element tuple holding the parameters merged via
      C{objects.FillDict} into C{disk.params}
    @return: the annotated C{disk}

    """
    # Unpacked in the body, as tuple parameters are Python 2-only syntax
    (params, ) = ld_params

    assert disk.dev_type != constants.LD_DRBD8

    disk.params = objects.FillDict(params, disk.params)

    return disk
def AnnotateDiskParams(template, disks, disk_params):
    """Annotates the disk objects with the disk parameters.

    @param template: The disk template used
    @param disks: The list of disks objects to annotate
    @param disk_params: The disk parameters for annotation
    @return: A list of disk objects annotated; the annotation is applied
      to C{disk.Copy()} of every entry, not to the passed object itself

    """
    ld_params = objects.Disk.ComputeLDParams(template, disk_params)

    if template == constants.DT_DRBD8:
        annotation_fn = _AnnotateDParamsDRBD
    elif template == constants.DT_DISKLESS:
        # Nothing to annotate, the copy is returned as it is
        annotation_fn = lambda disk, _: disk
    else:
        annotation_fn = _AnnotateDParamsGeneric

    return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
633 rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
634 rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
635 rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
636 rpc_defs.ED_COMPRESS: _Compress,
637 rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
638 rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
639 rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
643 class RpcRunner(_RpcClientBase,
644 _generated_rpc.RpcClientDefault,
645 _generated_rpc.RpcClientBootstrap,
646 _generated_rpc.RpcClientDnsOnly,
647 _generated_rpc.RpcClientConfig):
651 def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
652 """Initializes the RPC runner.
654 @type cfg: L{config.ConfigWriter}
655 @param cfg: Configuration
656 @type lock_monitor_cb: callable
657 @param lock_monitor_cb: Lock monitor callback
662 encoders = _ENCODERS.copy()
665 # Encoders requiring configuration object
666 rpc_defs.ED_INST_DICT: self._InstDict,
667 rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
668 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
670 # Encoders annotating disk parameters
671 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
672 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
674 # Encoders with special requirements
675 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
678 # Resolver using configuration
679 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
682 # Pylint doesn't recognize multiple inheritance properly, see
683 # <http://www.logilab.org/ticket/36586> and
684 # <http://www.logilab.org/ticket/35642>
685 # pylint: disable=W0233
686 _RpcClientBase.__init__(self, resolver, encoders.get,
687 lock_monitor_cb=lock_monitor_cb,
688 _req_process_fn=_req_process_fn)
689 _generated_rpc.RpcClientConfig.__init__(self)
690 _generated_rpc.RpcClientBootstrap.__init__(self)
691 _generated_rpc.RpcClientDnsOnly.__init__(self)
692 _generated_rpc.RpcClientDefault.__init__(self)
694 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
695 """Convert the given instance to a dict.
697 This is done via the instance's ToDict() method and additionally
698 we fill the hvparams with the cluster defaults.
700 @type instance: L{objects.Instance}
701 @param instance: an Instance object
702 @type hvp: dict or None
703 @param hvp: a dictionary with overridden hypervisor parameters
704 @type bep: dict or None
705 @param bep: a dictionary with overridden backend parameters
706 @type osp: dict or None
707 @param osp: a dictionary with overridden os parameters
709 @return: the instance dict, with the hvparams filled with the
713 idict = instance.ToDict()
714 cluster = self._cfg.GetClusterInfo()
715 idict["hvparams"] = cluster.FillHV(instance)
717 idict["hvparams"].update(hvp)
718 idict["beparams"] = cluster.FillBE(instance)
720 idict["beparams"].update(bep)
721 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
723 idict["osparams"].update(osp)
724 for nic in idict["nics"]:
725 nic["nicparams"] = objects.FillDict(
726 cluster.nicparams[constants.PP_DEFAULT],
def _InstDictHvpBep(self, args):
    """Wrapper for L{_InstDict}.

    @type args: tuple
    @param args: C{(instance, hvp, bep)}; the last two are forwarded as
      the C{hvp} and C{bep} keyword arguments
    @return: the value returned by L{_InstDict}

    """
    # Unpacked in the body, as tuple parameters are Python 2-only syntax
    (instance, hvp, bep) = args
    return self._InstDict(instance, hvp=hvp, bep=bep)
def _InstDictOspDp(self, args):
    """Wrapper for L{_InstDict}.

    @type args: tuple
    @param args: C{(instance, osparams)}; C{osparams} is forwarded as the
      C{osp} keyword argument
    @return: the dictionary returned by L{_InstDict}, whose C{"disks"}
      entry is replaced by the result of L{_DisksDictDP} for the
      instance's disks

    """
    # Unpacked in the body, as tuple parameters are Python 2-only syntax
    (instance, osparams) = args

    updated_inst = self._InstDict(instance, osp=osparams)
    updated_inst["disks"] = self._DisksDictDP((instance.disks, instance))

    return updated_inst
744 def _DisksDictDP(self, (disks, instance)):
745 """Wrapper for L{AnnotateDiskParams}.
748 diskparams = self._cfg.GetInstanceDiskParams(instance)
749 return [disk.ToDict()
750 for disk in AnnotateDiskParams(instance.disk_template,
def _SingleDiskDictDP(self, args):
    """Wrapper for L{AnnotateDiskParams}.

    @type args: tuple
    @param args: C{(disk, instance)}
    @return: the only element of the list L{_DisksDictDP} returns for the
      one-element disk list C{[disk]}

    """
    # Unpacked in the body, as tuple parameters are Python 2-only syntax
    (disk, instance) = args

    (anno_disk,) = self._DisksDictDP(([disk], instance))

    return anno_disk
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
    """RPC wrappers for job queue.

    """
    def __init__(self, context, address_list):
        """Initializes this class.

        @param context: object whose C{glm.AddToLockMonitor} is registered
          as lock monitor callback
        @param address_list: static addresses handed to
          L{_StaticResolver}, or C{None} to resolve through
          L{_SsconfResolver} using the ssconf IPs

        """
        if address_list is None:
            resolver = compat.partial(_SsconfResolver, True)
        else:
            # Caller provided an address list
            resolver = _StaticResolver(address_list)

        _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                                lock_monitor_cb=context.glm.AddToLockMonitor)
        _generated_rpc.RpcClientJobQueue.__init__(self)
780 class BootstrapRunner(_RpcClientBase,
781 _generated_rpc.RpcClientBootstrap,
782 _generated_rpc.RpcClientDnsOnly):
783 """RPC wrappers for bootstrapping.
787 """Initializes this class.
790 # Pylint doesn't recognize multiple inheritance properly, see
791 # <http://www.logilab.org/ticket/36586> and
792 # <http://www.logilab.org/ticket/35642>
793 # pylint: disable=W0233
794 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
796 _generated_rpc.RpcClientBootstrap.__init__(self)
797 _generated_rpc.RpcClientDnsOnly.__init__(self)
800 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
801 """RPC wrappers for calls using only DNS.
805 """Initialize this class.
808 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
810 _generated_rpc.RpcClientDnsOnly.__init__(self)
813 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
814 """RPC wrappers for L{config}.
817 def __init__(self, context, address_list, _req_process_fn=None,
819 """Initializes this class.
823 lock_monitor_cb = context.glm.AddToLockMonitor
825 lock_monitor_cb = None
827 if address_list is None:
828 resolver = compat.partial(_SsconfResolver, True)
830 # Caller provided an address list
831 resolver = _StaticResolver(address_list)
833 encoders = _ENCODERS.copy()
836 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
839 _RpcClientBase.__init__(self, resolver, encoders.get,
840 lock_monitor_cb=lock_monitor_cb,
841 _req_process_fn=_req_process_fn)
842 _generated_rpc.RpcClientConfig.__init__(self)