4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45 from ganeti import netutils
46 from ganeti import ssconf
47 from ganeti import runtime
48 from ganeti import compat
49 from ganeti import rpc_defs
51 # Special module generated at build time
52 from ganeti import _generated_rpc
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client # pylint: disable=W0611
58 _RPC_CLIENT_HEADERS = [
59 "Content-type: %s" % http.HTTP_APP_JSON,
63 #: Special value to describe an offline host
68 """Initializes the module-global HTTP client manager.
70 Must be called before using any RPC function and while exactly one thread is
74 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
75 # one thread running. This check is just a safety measure -- it doesn't
77 assert threading.activeCount() == 1, \
78 "Found more than one active thread when initializing pycURL"
80 logging.info("Using PycURL %s", pycurl.version)
82 pycurl.global_init(pycurl.GLOBAL_ALL)
86 """Stops the module-global HTTP client manager.
88 Must be called before quitting the program and while exactly one thread is
92 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
    """Applies the RPC client settings to a cURL handle.

    The file named by L{constants.NODED_CERT_FILE} is used as CA bundle,
    client certificate and client key alike.

    @param curl: cURL handle to configure; only its C{setopt} method is used

    """
    noded_cert = str(constants.NODED_CERT_FILE)

    curl.setopt(pycurl.FOLLOWLOCATION, False)
    curl.setopt(pycurl.CAINFO, noded_cert)
    # The peer certificate is verified against the CA file above, while host
    # name verification is switched off
    curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    curl.setopt(pycurl.SSL_VERIFYPEER, True)
    curl.setopt(pycurl.SSLCERTTYPE, "PEM")
    curl.setopt(pycurl.SSLCERT, noded_cert)
    curl.setopt(pycurl.SSLKEYTYPE, "PEM")
    curl.setopt(pycurl.SSLKEY, noded_cert)
    curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110 """RPC-wrapper decorator.
112 When applied to a function, it runs it with the RPC system
113 initialized, and it shuts down the system afterwards. This means the
114 function must be called without RPC being initialized.
117 def wrapper(*args, **kwargs):
120 return fn(*args, **kwargs)
127 """Compresses a string for transport over RPC.
129 Small amounts of data are not compressed.
134 @return: Encoded data to send
137 # Small amounts of data are not compressed
139 return (constants.RPC_ENCODING_NONE, data)
141 # Compress with zlib and encode in base64
142 return (constants.RPC_ENCODING_ZLIB_BASE64,
143 base64.b64encode(zlib.compress(data, 3)))
146 class RpcResult(object):
149 This class holds an RPC result. It is needed since in multi-node
150 calls we can't raise an exception just because one out of many
151 failed, and therefore we use this class to encapsulate the result.
153 @ivar data: the data payload, for successful results, or None
154 @ivar call: the name of the RPC call
155 @ivar node: the name of the node to which we made the call
156 @ivar offline: whether the operation failed because the node was
157 offline, as opposed to actual failure; offline=True will always
158 imply failed=True, in order to allow simpler checking if
159 the user doesn't care about the exact failure mode
160 @ivar fail_msg: the error message if the call failed
163 def __init__(self, data=None, failed=False, offline=False,
164 call=None, node=None):
165 self.offline = offline
170 self.fail_msg = "Node is marked offline"
171 self.data = self.payload = None
173 self.fail_msg = self._EnsureErr(data)
174 self.data = self.payload = None
177 if not isinstance(self.data, (tuple, list)):
178 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182 self.fail_msg = ("RPC layer error: invalid result length (%d), "
183 "expected 2" % len(self.data))
185 elif not self.data[0]:
186 self.fail_msg = self._EnsureErr(self.data[1])
191 self.payload = data[1]
193 for attr_name in ["call", "data", "fail_msg",
194 "node", "offline", "payload"]:
195 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199 """Helper to ensure we return a 'True' value for error."""
203 return "No error information"
205 def Raise(self, msg, prereq=False, ecode=None):
206 """If the result has failed, raise an OpExecError.
208 This is used so that LU code doesn't have to check for each
209 result, but instead can call this function.
212 if not self.fail_msg:
215 if not msg: # one could pass None for default message
216 msg = ("Call '%s' to node '%s' has failed: %s" %
217 (self.call, self.node, self.fail_msg))
219 msg = "%s: %s" % (msg, self.fail_msg)
221 ec = errors.OpPrereqError
223 ec = errors.OpExecError
224 if ecode is not None:
228 raise ec(*args) # pylint: disable=W0142
def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
    """Return addresses for given node names.

    @type ssconf_ips: bool
    @param ssconf_ips: Use the ssconf IPs
    @type node_list: list
    @param node_list: List of node names
    @param ssc: SimpleStore class that is used to obtain node->ip mappings
    @type nslookup_fn: callable
    @param nslookup_fn: function used to do NS lookup
    @rtype: list of tuple; (string, string)
    @return: List of tuples containing node name and IP address

    """
    # NOTE(review): the statements creating "ss", selecting on "ssconf_ips"
    # and falling back to "nslookup_fn" were not visible in the reviewed
    # listing; they are reconstructed from the surrounding code -- confirm
    ss = ssc()
    family = ss.GetPrimaryIPFamily()

    if ssconf_ips:
        # Every entry is expected to split into exactly (node name, address)
        iplist = ss.GetNodePrimaryIPList()
        ipmap = dict(entry.split() for entry in iplist)
    else:
        ipmap = {}

    result = []
    for node in node_list:
        ip = ipmap.get(node)
        if ip is None:
            # Unknown to ssconf (or ssconf not consulted): do a name lookup
            ip = nslookup_fn(node, family=family)
        result.append((node, ip))

    return result
267 class _StaticResolver:
268 def __init__(self, addresses):
269 """Initializes this class.
272 self._addresses = addresses
274 def __call__(self, hosts, _):
275 """Returns static addresses for hosts.
278 assert len(hosts) == len(self._addresses)
279 return zip(hosts, self._addresses)
282 def _CheckConfigNode(name, node, accept_offline_node):
283 """Checks if a node is online.
286 @param name: Node name
287 @type node: L{objects.Node} or None
288 @param node: Node object
292 # Depend on DNS for name resolution
294 elif node.offline and not accept_offline_node:
301 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
302 """Calculate node addresses using configuration.
305 accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
307 assert accept_offline_node or opts is None, "Unknown option"
309 # Special case for single-host lookups
312 return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
314 all_nodes = all_nodes_fn()
315 return [_CheckConfigNode(name, all_nodes.get(name, None),
321 def __init__(self, resolver, port, lock_monitor_cb=None):
322 """Initializes this class.
324 @param resolver: callable accepting a list of hostnames, returning a list
325 of tuples containing name and IP address (IP address can be the name or
326 the special value L{_OFFLINE} to mark offline machines)
328 @param port: TCP port
329 @param lock_monitor_cb: Callable for registering with lock monitor
332 self._resolver = resolver
334 self._lock_monitor_cb = lock_monitor_cb
337 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
338 """Prepares requests by sorting offline hosts into separate list.
341 @param body: a dictionary with per-host body data
347 assert isinstance(body, dict)
348 assert len(body) == len(hosts)
349 assert compat.all(isinstance(v, str) for v in body.values())
350 assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
351 "%s != %s" % (hosts, body.keys())
353 for (name, ip) in hosts:
355 # Node is marked as offline
356 results[name] = RpcResult(node=name, offline=True, call=procedure)
359 http.client.HttpClientRequest(str(ip), port,
360 http.HTTP_POST, str("/%s" % procedure),
361 headers=_RPC_CLIENT_HEADERS,
362 post_data=body[name],
363 read_timeout=read_timeout,
364 nicename="%s/%s" % (name, procedure),
365 curl_config_fn=_ConfigRpcCurl)
367 return (results, requests)
370 def _CombineResults(results, requests, procedure):
371 """Combines pre-computed results for offline hosts with actual call results.
374 for name, req in requests.items():
375 if req.success and req.resp_status_code == http.HTTP_OK:
376 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
377 node=name, call=procedure)
379 # TODO: Better error reporting
385 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
386 host_result = RpcResult(data=msg, failed=True, node=name,
389 results[name] = host_result
393 def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
394 _req_process_fn=None):
395 """Makes an RPC request to a number of nodes.
397 @type hosts: sequence
398 @param hosts: Hostnames
399 @type procedure: string
400 @param procedure: Request path
401 @type body: dictionary
402 @param body: dictionary with request bodies per host
403 @type read_timeout: int or None
404 @param read_timeout: Read timeout for request
407 assert read_timeout is not None, \
408 "Missing RPC read timeout for procedure '%s'" % procedure
410 if _req_process_fn is None:
411 _req_process_fn = http.client.ProcessRequests
413 (results, requests) = \
414 self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
415 procedure, body, read_timeout)
417 _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
419 assert not frozenset(results).intersection(requests)
421 return self._CombineResults(results, requests, procedure)
424 class _RpcClientBase:
425 def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
426 _req_process_fn=None):
427 """Initializes this class.
430 proc = _RpcProcessor(resolver,
431 netutils.GetDaemonPort(constants.NODED),
432 lock_monitor_cb=lock_monitor_cb)
433 self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
434 self._encoder = compat.partial(self._EncodeArg, encoder_fn)
437 def _EncodeArg(encoder_fn, (argkind, value)):
444 return encoder_fn(argkind)(value)
446 def _Call(self, cdef, node_list, args):
447 """Entry point for automatically generated RPC wrappers.
450 (procedure, _, resolver_opts, timeout, argdefs,
451 prep_fn, postproc_fn, _) = cdef
453 if callable(timeout):
454 read_timeout = timeout(args)
456 read_timeout = timeout
458 if callable(resolver_opts):
459 req_resolver_opts = resolver_opts(args)
461 req_resolver_opts = resolver_opts
463 if len(args) != len(argdefs):
464 raise errors.ProgrammerError("Number of passed arguments doesn't match")
466 enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
468 # for a no-op prep_fn, we serialise the body once, and then we
469 # reuse it in the dictionary values
470 body = serializer.DumpJson(enc_args)
471 pnbody = dict((n, body) for n in node_list)
473 # for a custom prep_fn, we pass the encoded arguments and the
474 # node name to the prep_fn, and we serialise its return value
475 assert callable(prep_fn)
476 pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
479 result = self._proc(node_list, procedure, pnbody, read_timeout,
483 return dict(map(lambda (key, value): (key, postproc_fn(value)),
489 def _ObjectToDict(value):
490 """Converts an object to a dictionary.
492 @note: See L{objects}.
495 return value.ToDict()
498 def _ObjectListToDict(value):
499 """Converts a list of L{objects} to dictionaries.
502 return map(_ObjectToDict, value)
505 def _EncodeNodeToDiskDict(value):
506 """Encodes a dictionary with node name as key and disk objects as values.
509 return dict((name, _ObjectListToDict(disks))
510 for name, disks in value.items())
def _PrepareFileUpload(getents_fn, filename):
    """Loads a file and prepares it for an upload to nodes.

    @param getents_fn: Callable returning an object with C{LookupUid} and
      C{LookupGid} methods, or None to use L{runtime.GetEnts}
    @param filename: Name of the file to read
    @rtype: list
    @return: File name, encoded contents (see L{_Compress}), mode, looked-up
      owner and group, access time and modification time

    """
    statcb = utils.FileStatHelper()
    data = _Compress(utils.ReadFile(filename, preread=statcb))
    # NOTE(review): the binding of "st" was not visible in the reviewed
    # listing; it is assumed to be the stat result collected by the helper
    # passed as "preread" above -- confirm against utils.FileStatHelper
    st = statcb.st

    if getents_fn is None:
        getents_fn = runtime.GetEnts

    getents = getents_fn()

    return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
            getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
530 def _PrepareFinalizeExportDisks(snap_disks):
531 """Encodes disks for finalizing export.
536 for disk in snap_disks:
537 if isinstance(disk, bool):
538 flat_disks.append(disk)
540 flat_disks.append(disk.ToDict())
def _EncodeImportExportIO(ieio_info):
    """Encodes import/export I/O information.

    @type ieio_info: tuple
    @param ieio_info: Tuple of (I/O type, I/O arguments)
    @rtype: tuple
    @return: Tuple of (I/O type, encoded arguments); for
      L{constants.IEIO_RAW_DISK} and L{constants.IEIO_SCRIPT} the first
      argument is replaced by its C{ToDict()} result, any other type is
      returned unchanged

    """
    # Unpack in the body: tuple parameters in a signature are Python 2 only
    (ieio, ieioargs) = ieio_info

    if ieio == constants.IEIO_RAW_DISK:
        assert len(ieioargs) == 1
        return (ieio, (ieioargs[0].ToDict(), ))

    if ieio == constants.IEIO_SCRIPT:
        assert len(ieioargs) == 2
        return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))

    return (ieio, ieioargs)
560 def _EncodeBlockdevRename(value):
561 """Encodes information for renaming block devices.
564 return [(d.ToDict(), uid) for d, uid in value]
567 def MakeLegacyNodeInfo(data):
568 """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
570 Converts the data into a single dictionary. This is fine for most use cases,
571 but some require information from more than one volume group or hypervisor.
574 (bootid, (vg_info, ), (hv_info, )) = data
576 return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
def _AnnotateDParamsDRBD(disk, ld_params):
    """Annotates just DRBD disks layouts.

    @param disk: DRBD disk object with exactly two children (data and meta
      device); it is modified in place
    @type ld_params: tuple
    @param ld_params: Tuple of (DRBD parameters, data device parameters,
      meta device parameters)
    @return: The disk object passed in

    """
    # Unpack in the body: tuple parameters in a signature are Python 2 only
    (drbd_params, data_params, meta_params) = ld_params

    assert disk.dev_type == constants.LD_DRBD8

    disk.params = objects.FillDict(drbd_params, disk.params)
    (dev_data, dev_meta) = disk.children
    dev_data.params = objects.FillDict(data_params, dev_data.params)
    dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

    return disk
def _AnnotateDParamsGeneric(disk, ld_params):
    """Generic disk parameter annotation routine.

    @param disk: Non-DRBD disk object; it is modified in place
    @type ld_params: tuple
    @param ld_params: One-element tuple holding the disk parameters
    @return: The disk object passed in

    """
    # Unpack in the body: tuple parameters in a signature are Python 2 only
    (params, ) = ld_params

    assert disk.dev_type != constants.LD_DRBD8

    disk.params = objects.FillDict(params, disk.params)

    return disk
def AnnotateDiskParams(template, disks, disk_params):
    """Annotates the disk objects with the disk parameters.

    @param template: The disk template used
    @param disks: The list of disks objects to annotate
    @param disk_params: The disk parameters for annotation
    @returns: A list of disk objects annotated

    """
    ld_params = objects.Disk.ComputeLDParams(template, disk_params)

    if template == constants.DT_DRBD8:
        annotation_fn = _AnnotateDParamsDRBD
    elif template == constants.DT_DISKLESS:
        # Nothing to annotate, hand the disk back as it is
        annotation_fn = lambda disk, _: disk
    else:
        annotation_fn = _AnnotateDParamsGeneric

    # The annotation functions receive the result of disk.Copy() rather than
    # the objects passed in by the caller
    return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
#: Encoders shared by the RPC clients, keyed by argument kind
_ENCODERS = {
    rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
    rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
    rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
    rpc_defs.ED_COMPRESS: _Compress,
    rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
    rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
    rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
    }
639 class RpcRunner(_RpcClientBase,
640 _generated_rpc.RpcClientDefault,
641 _generated_rpc.RpcClientBootstrap,
642 _generated_rpc.RpcClientDnsOnly,
643 _generated_rpc.RpcClientConfig):
647 def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
648 """Initializes the RPC runner.
650 @type cfg: L{config.ConfigWriter}
651 @param cfg: Configuration
652 @type lock_monitor_cb: callable
653 @param lock_monitor_cb: Lock monitor callback
658 encoders = _ENCODERS.copy()
661 # Encoders requiring configuration object
662 rpc_defs.ED_INST_DICT: self._InstDict,
663 rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
664 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
666 # Encoders annotating disk parameters
667 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
668 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
670 # Encoders with special requirements
671 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
674 # Resolver using configuration
675 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
678 # Pylint doesn't recognize multiple inheritance properly, see
679 # <http://www.logilab.org/ticket/36586> and
680 # <http://www.logilab.org/ticket/35642>
681 # pylint: disable=W0233
682 _RpcClientBase.__init__(self, resolver, encoders.get,
683 lock_monitor_cb=lock_monitor_cb,
684 _req_process_fn=_req_process_fn)
685 _generated_rpc.RpcClientConfig.__init__(self)
686 _generated_rpc.RpcClientBootstrap.__init__(self)
687 _generated_rpc.RpcClientDnsOnly.__init__(self)
688 _generated_rpc.RpcClientDefault.__init__(self)
690 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
691 """Convert the given instance to a dict.
693 This is done via the instance's ToDict() method and additionally
694 we fill the hvparams with the cluster defaults.
696 @type instance: L{objects.Instance}
697 @param instance: an Instance object
698 @type hvp: dict or None
699 @param hvp: a dictionary with overridden hypervisor parameters
700 @type bep: dict or None
701 @param bep: a dictionary with overridden backend parameters
702 @type osp: dict or None
703 @param osp: a dictionary with overridden os parameters
705 @return: the instance dict, with the hvparams filled with the
709 idict = instance.ToDict()
710 cluster = self._cfg.GetClusterInfo()
711 idict["hvparams"] = cluster.FillHV(instance)
713 idict["hvparams"].update(hvp)
714 idict["beparams"] = cluster.FillBE(instance)
716 idict["beparams"].update(bep)
717 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
719 idict["osparams"].update(osp)
720 for nic in idict["nics"]:
721 nic["nicparams"] = objects.FillDict(
722 cluster.nicparams[constants.PP_DEFAULT],
726 def _InstDictHvpBep(self, (instance, hvp, bep)):
727 """Wrapper for L{_InstDict}.
730 return self._InstDict(instance, hvp=hvp, bep=bep)
732 def _InstDictOspDp(self, (instance, osparams)):
733 """Wrapper for L{_InstDict}.
736 updated_inst = self._InstDict(instance, osp=osparams)
737 updated_inst["disks"] = self._DisksDictDP((instance.disks, instance))
740 def _DisksDictDP(self, (disks, instance)):
741 """Wrapper for L{AnnotateDiskParams}.
744 diskparams = self._cfg.GetInstanceDiskParams(instance)
745 return [disk.ToDict()
746 for disk in AnnotateDiskParams(instance.disk_template,
749 def _SingleDiskDictDP(self, (disk, instance)):
750 """Wrapper for L{AnnotateDiskParams}.
753 (anno_disk,) = self._DisksDictDP(([disk], instance))
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
    """RPC wrappers for job queue.

    """
    def __init__(self, context, address_list):
        """Initializes this class.

        @param context: Object whose C{glm.AddToLockMonitor} is registered as
          lock monitor callback
        @param address_list: Addresses to use for the nodes, or None to have
          node names resolved through L{_SsconfResolver}

        """
        if address_list is None:
            resolver = compat.partial(_SsconfResolver, True)
        else:
            # Caller provided an address list
            resolver = _StaticResolver(address_list)

        _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                                lock_monitor_cb=context.glm.AddToLockMonitor)
        _generated_rpc.RpcClientJobQueue.__init__(self)
776 class BootstrapRunner(_RpcClientBase,
777 _generated_rpc.RpcClientBootstrap,
778 _generated_rpc.RpcClientDnsOnly):
779 """RPC wrappers for bootstrapping.
783 """Initializes this class.
786 # Pylint doesn't recognize multiple inheritance properly, see
787 # <http://www.logilab.org/ticket/36586> and
788 # <http://www.logilab.org/ticket/35642>
789 # pylint: disable=W0233
790 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
792 _generated_rpc.RpcClientBootstrap.__init__(self)
793 _generated_rpc.RpcClientDnsOnly.__init__(self)
796 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
797 """RPC wrappers for calls using only DNS.
801 """Initialize this class.
804 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
806 _generated_rpc.RpcClientDnsOnly.__init__(self)
809 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
810 """RPC wrappers for L{config}.
813 def __init__(self, context, address_list, _req_process_fn=None,
815 """Initializes this class.
819 lock_monitor_cb = context.glm.AddToLockMonitor
821 lock_monitor_cb = None
823 if address_list is None:
824 resolver = compat.partial(_SsconfResolver, True)
826 # Caller provided an address list
827 resolver = _StaticResolver(address_list)
829 encoders = _ENCODERS.copy()
832 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
835 _RpcClientBase.__init__(self, resolver, encoders.get,
836 lock_monitor_cb=lock_monitor_cb,
837 _req_process_fn=_req_process_fn)
838 _generated_rpc.RpcClientConfig.__init__(self)