4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_* method names are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # so as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45 from ganeti import netutils
46 from ganeti import ssconf
47 from ganeti import runtime
48 from ganeti import compat
49 from ganeti import rpc_defs
51 # Special module generated at build time
52 from ganeti import _generated_rpc
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client # pylint: disable=W0611
58 _RPC_CLIENT_HEADERS = [
59 "Content-type: %s" % http.HTTP_APP_JSON,
63 #: Special value to describe an offline host
68 """Initializes the module-global HTTP client manager.
70 Must be called before using any RPC function and while exactly one thread is
74 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
75 # one thread running. This check is just a safety measure -- it doesn't
77 assert threading.activeCount() == 1, \
78 "Found more than one active thread when initializing pycURL"
80 logging.info("Using PycURL %s", pycurl.version)
82 pycurl.global_init(pycurl.GLOBAL_ALL)
86 """Stops the module-global HTTP client manager.
88 Must be called before quitting the program and while exactly one thread is
92 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Applies the node daemon SSL and connection settings to a curl handle.

  Used as C{curl_config_fn} for the HTTP requests built for node RPC calls.
  The noded certificate file is used at the same time as CA bundle, client
  certificate and client key.

  @param curl: pycurl handle to configure

  """
  noded_cert = str(constants.NODED_CERT_FILE)

  # (option, value) pairs; applied strictly in this order
  settings = [
    (pycurl.FOLLOWLOCATION, False),
    # Peers are verified against the noded certificate itself...
    (pycurl.CAINFO, noded_cert),
    # ...while hostname matching is switched off. NOTE(review): presumably
    # because nodes may be contacted by IP address rather than by a name
    # present in the certificate -- confirm.
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, noded_cert),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, noded_cert),
    (pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT),
    ]

  for (option, value) in settings:
    curl.setopt(option, value)
110 """RPC-wrapper decorator.
112 When applied to a function, it runs it with the RPC system
113 initialized, and it shutsdown the system afterwards. This means the
114 function must be called without RPC being initialized.
117 def wrapper(*args, **kwargs):
120 return fn(*args, **kwargs)
127 """Compresses a string for transport over RPC.
129 Small amounts of data are not compressed.
134 @return: Encoded data to send
137 # Small amounts of data are not compressed
139 return (constants.RPC_ENCODING_NONE, data)
141 # Compress with zlib and encode in base64
142 return (constants.RPC_ENCODING_ZLIB_BASE64,
143 base64.b64encode(zlib.compress(data, 3)))
146 class RpcResult(object):
149 This class holds an RPC result. It is needed since in multi-node
150 calls we can't raise an exception just because one one out of many
151 failed, and therefore we use this class to encapsulate the result.
153 @ivar data: the data payload, for successful results, or None
154 @ivar call: the name of the RPC call
155 @ivar node: the name of the node to which we made the call
156 @ivar offline: whether the operation failed because the node was
157 offline, as opposed to actual failure; offline=True will always
158 imply failed=True, in order to allow simpler checking if
159 the user doesn't care about the exact failure mode
160 @ivar fail_msg: the error message if the call failed
163 def __init__(self, data=None, failed=False, offline=False,
164 call=None, node=None):
165 self.offline = offline
170 self.fail_msg = "Node is marked offline"
171 self.data = self.payload = None
173 self.fail_msg = self._EnsureErr(data)
174 self.data = self.payload = None
177 if not isinstance(self.data, (tuple, list)):
178 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182 self.fail_msg = ("RPC layer error: invalid result length (%d), "
183 "expected 2" % len(self.data))
185 elif not self.data[0]:
186 self.fail_msg = self._EnsureErr(self.data[1])
191 self.payload = data[1]
193 for attr_name in ["call", "data", "fail_msg",
194 "node", "offline", "payload"]:
195 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199 """Helper to ensure we return a 'True' value for error."""
203 return "No error information"
205 def Raise(self, msg, prereq=False, ecode=None):
206 """If the result has failed, raise an OpExecError.
208 This is used so that LU code doesn't have to check for each
209 result, but instead can call this function.
212 if not self.fail_msg:
215 if not msg: # one could pass None for default message
216 msg = ("Call '%s' to node '%s' has failed: %s" %
217 (self.call, self.node, self.fail_msg))
219 msg = "%s: %s" % (msg, self.fail_msg)
221 ec = errors.OpPrereqError
223 ec = errors.OpExecError
224 if ecode is not None:
228 raise ec(*args) # pylint: disable=W0142
231 def _SsconfResolver(ssconf_ips, node_list, _,
232 ssc=ssconf.SimpleStore,
233 nslookup_fn=netutils.Hostname.GetIP):
234 """Return addresses for given node names.
236 @type ssconf_ips: bool
237 @param ssconf_ips: Use the ssconf IPs
238 @type node_list: list
239 @param node_list: List of node names
241 @param ssc: SimpleStore class that is used to obtain node->ip mappings
242 @type nslookup_fn: callable
243 @param nslookup_fn: function use to do NS lookup
244 @rtype: list of tuple; (string, string)
245 @return: List of tuples containing node name and IP address
249 family = ss.GetPrimaryIPFamily()
252 iplist = ss.GetNodePrimaryIPList()
253 ipmap = dict(entry.split() for entry in iplist)
258 for node in node_list:
261 ip = nslookup_fn(node, family=family)
262 result.append((node, ip))
267 class _StaticResolver:
268 def __init__(self, addresses):
269 """Initializes this class.
272 self._addresses = addresses
274 def __call__(self, hosts, _):
275 """Returns static addresses for hosts.
278 assert len(hosts) == len(self._addresses)
279 return zip(hosts, self._addresses)
282 def _CheckConfigNode(name, node, accept_offline_node):
283 """Checks if a node is online.
286 @param name: Node name
287 @type node: L{objects.Node} or None
288 @param node: Node object
292 # Depend on DNS for name resolution
294 elif node.offline and not accept_offline_node:
301 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
302 """Calculate node addresses using configuration.
305 accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
307 assert accept_offline_node or opts is None, "Unknown option"
309 # Special case for single-host lookups
312 return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
314 all_nodes = all_nodes_fn()
315 return [_CheckConfigNode(name, all_nodes.get(name, None),
321 def __init__(self, resolver, port, lock_monitor_cb=None):
322 """Initializes this class.
324 @param resolver: callable accepting a list of hostnames, returning a list
325 of tuples containing name and IP address (IP address can be the name or
326 the special value L{_OFFLINE} to mark offline machines)
328 @param port: TCP port
329 @param lock_monitor_cb: Callable for registering with lock monitor
332 self._resolver = resolver
334 self._lock_monitor_cb = lock_monitor_cb
337 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
338 """Prepares requests by sorting offline hosts into separate list.
341 @param body: a dictionary with per-host body data
347 assert isinstance(body, dict)
348 assert len(body) == len(hosts)
349 assert compat.all(isinstance(v, str) for v in body.values())
350 assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
351 "%s != %s" % (hosts, body.keys())
353 for (name, ip) in hosts:
355 # Node is marked as offline
356 results[name] = RpcResult(node=name, offline=True, call=procedure)
359 http.client.HttpClientRequest(str(ip), port,
360 http.HTTP_POST, str("/%s" % procedure),
361 headers=_RPC_CLIENT_HEADERS,
362 post_data=body[name],
363 read_timeout=read_timeout,
364 nicename="%s/%s" % (name, procedure),
365 curl_config_fn=_ConfigRpcCurl)
367 return (results, requests)
370 def _CombineResults(results, requests, procedure):
371 """Combines pre-computed results for offline hosts with actual call results.
374 for name, req in requests.items():
375 if req.success and req.resp_status_code == http.HTTP_OK:
376 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
377 node=name, call=procedure)
379 # TODO: Better error reporting
385 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
386 host_result = RpcResult(data=msg, failed=True, node=name,
389 results[name] = host_result
393 def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
394 _req_process_fn=None):
395 """Makes an RPC request to a number of nodes.
397 @type hosts: sequence
398 @param hosts: Hostnames
399 @type procedure: string
400 @param procedure: Request path
401 @type body: dictionary
402 @param body: dictionary with request bodies per host
403 @type read_timeout: int or None
404 @param read_timeout: Read timeout for request
407 assert read_timeout is not None, \
408 "Missing RPC read timeout for procedure '%s'" % procedure
410 if _req_process_fn is None:
411 _req_process_fn = http.client.ProcessRequests
413 (results, requests) = \
414 self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
415 procedure, body, read_timeout)
417 _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
419 assert not frozenset(results).intersection(requests)
421 return self._CombineResults(results, requests, procedure)
424 class _RpcClientBase:
425 def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
426 _req_process_fn=None):
427 """Initializes this class.
430 proc = _RpcProcessor(resolver,
431 netutils.GetDaemonPort(constants.NODED),
432 lock_monitor_cb=lock_monitor_cb)
433 self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
434 self._encoder = compat.partial(self._EncodeArg, encoder_fn)
437 def _EncodeArg(encoder_fn, (argkind, value)):
444 return encoder_fn(argkind)(value)
446 def _Call(self, cdef, node_list, args):
447 """Entry point for automatically generated RPC wrappers.
450 (procedure, _, resolver_opts, timeout, argdefs,
451 prep_fn, postproc_fn, _) = cdef
453 if callable(timeout):
454 read_timeout = timeout(args)
456 read_timeout = timeout
458 if callable(resolver_opts):
459 req_resolver_opts = resolver_opts(args)
461 req_resolver_opts = resolver_opts
463 if len(args) != len(argdefs):
464 raise errors.ProgrammerError("Number of passed arguments doesn't match")
466 enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
468 # for a no-op prep_fn, we serialise the body once, and then we
469 # reuse it in the dictionary values
470 body = serializer.DumpJson(enc_args)
471 pnbody = dict((n, body) for n in node_list)
473 # for a custom prep_fn, we pass the encoded arguments and the
474 # node name to the prep_fn, and we serialise its return value
475 assert callable(prep_fn)
476 pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
479 result = self._proc(node_list, procedure, pnbody, read_timeout,
483 return dict(map(lambda (key, value): (key, postproc_fn(value)),
489 def _ObjectToDict(value):
490 """Converts an object to a dictionary.
492 @note: See L{objects}.
495 return value.ToDict()
def _ObjectListToDict(value):
  """Serializes each object of a sequence with L{_ObjectToDict}.

  @param value: iterable of objects providing C{ToDict()}
  @return: list of the serialized objects, in input order

  """
  return [_ObjectToDict(item) for item in value]
def _EncodeNodeToDiskDict(value):
  """Encodes a mapping of node name to disk objects for transport.

  @param value: dictionary with node names as keys and lists of disk
    objects as values
  @return: new dictionary with the same keys, each list of disks replaced
    by its L{_ObjectListToDict} encoding

  """
  encoded = {}
  for (node_name, node_disks) in value.items():
    encoded[node_name] = _ObjectListToDict(node_disks)
  return encoded
513 def _PrepareFileUpload(getents_fn, filename):
514 """Loads a file and prepares it for an upload to nodes.
517 statcb = utils.FileStatHelper()
518 data = _Compress(utils.ReadFile(filename, preread=statcb))
521 if getents_fn is None:
522 getents_fn = runtime.GetEnts
524 getents = getents_fn()
526 return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
527 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
530 def _PrepareFinalizeExportDisks(snap_disks):
531 """Encodes disks for finalizing export.
536 for disk in snap_disks:
537 if isinstance(disk, bool):
538 flat_disks.append(disk)
540 flat_disks.append(disk.ToDict())
def _EncodeImportExportIO(ieio_info):
  """Encodes import/export I/O information.

  @param ieio_info: C{(ieio, ieioargs)} pair describing the I/O mode and
    its arguments
  @return: C{(ieio, encoded_args)}; for C{IEIO_RAW_DISK} the single
    argument is converted via C{ToDict()}, for C{IEIO_SCRIPT} the first of
    the two arguments is converted and the second passed through, and any
    other mode keeps its arguments unchanged

  """
  (ieio, ieioargs) = ieio_info

  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    encoded_args = (ieioargs[0].ToDict(), )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    encoded_args = (ieioargs[0].ToDict(), ieioargs[1])
  else:
    encoded_args = ieioargs

  return (ieio, encoded_args)
560 def _EncodeBlockdevRename(value):
561 """Encodes information for renaming block devices.
564 return [(d.ToDict(), uid) for d, uid in value]
567 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
568 """Annotates just DRBD disks layouts.
571 assert disk.dev_type == constants.LD_DRBD8
573 disk.params = objects.FillDict(drbd_params, disk.params)
574 (dev_data, dev_meta) = disk.children
575 dev_data.params = objects.FillDict(data_params, dev_data.params)
576 dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
581 def _AnnotateDParamsGeneric(disk, (params, )):
582 """Generic disk parameter annotation routine.
585 assert disk.dev_type != constants.LD_DRBD8
587 disk.params = objects.FillDict(params, disk.params)
592 def AnnotateDiskParams(template, disks, disk_params):
593 """Annotates the disk objects with the disk parameters.
595 @param template: The disk template used
596 @param disks: The list of disks objects to annotate
597 @param disk_params: The disk paramaters for annotation
598 @returns: A list of disk objects annotated
601 ld_params = objects.Disk.ComputeLDParams(template, disk_params)
603 if template == constants.DT_DRBD8:
604 annotation_fn = _AnnotateDParamsDRBD
605 elif template == constants.DT_DISKLESS:
606 annotation_fn = lambda disk, _: disk
608 annotation_fn = _AnnotateDParamsGeneric
612 new_disks.append(annotation_fn(disk.Copy(), ld_params))
619 rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
620 rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
621 rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
622 rpc_defs.ED_COMPRESS: _Compress,
623 rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
624 rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
625 rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
629 class RpcRunner(_RpcClientBase,
630 _generated_rpc.RpcClientDefault,
631 _generated_rpc.RpcClientBootstrap,
632 _generated_rpc.RpcClientDnsOnly,
633 _generated_rpc.RpcClientConfig):
637 def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
638 """Initialized the RPC runner.
640 @type cfg: L{config.ConfigWriter}
641 @param cfg: Configuration
642 @type lock_monitor_cb: callable
643 @param lock_monitor_cb: Lock monitor callback
648 encoders = _ENCODERS.copy()
651 # Encoders requiring configuration object
652 rpc_defs.ED_INST_DICT: self._InstDict,
653 rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
654 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
656 # Encoders annotating disk parameters
657 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
658 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
660 # Encoders with special requirements
661 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
664 # Resolver using configuration
665 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
668 # Pylint doesn't recognize multiple inheritance properly, see
669 # <http://www.logilab.org/ticket/36586> and
670 # <http://www.logilab.org/ticket/35642>
671 # pylint: disable=W0233
672 _RpcClientBase.__init__(self, resolver, encoders.get,
673 lock_monitor_cb=lock_monitor_cb,
674 _req_process_fn=_req_process_fn)
675 _generated_rpc.RpcClientConfig.__init__(self)
676 _generated_rpc.RpcClientBootstrap.__init__(self)
677 _generated_rpc.RpcClientDnsOnly.__init__(self)
678 _generated_rpc.RpcClientDefault.__init__(self)
680 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
681 """Convert the given instance to a dict.
683 This is done via the instance's ToDict() method and additionally
684 we fill the hvparams with the cluster defaults.
686 @type instance: L{objects.Instance}
687 @param instance: an Instance object
688 @type hvp: dict or None
689 @param hvp: a dictionary with overridden hypervisor parameters
690 @type bep: dict or None
691 @param bep: a dictionary with overridden backend parameters
692 @type osp: dict or None
693 @param osp: a dictionary with overridden os parameters
695 @return: the instance dict, with the hvparams filled with the
699 idict = instance.ToDict()
700 cluster = self._cfg.GetClusterInfo()
701 idict["hvparams"] = cluster.FillHV(instance)
703 idict["hvparams"].update(hvp)
704 idict["beparams"] = cluster.FillBE(instance)
706 idict["beparams"].update(bep)
707 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
709 idict["osparams"].update(osp)
710 for nic in idict["nics"]:
711 nic["nicparams"] = objects.FillDict(
712 cluster.nicparams[constants.PP_DEFAULT],
716 def _InstDictHvpBep(self, (instance, hvp, bep)):
717 """Wrapper for L{_InstDict}.
720 return self._InstDict(instance, hvp=hvp, bep=bep)
722 def _InstDictOspDp(self, (instance, osparams)):
723 """Wrapper for L{_InstDict}.
726 updated_inst = self._InstDict(instance, osp=osparams)
727 updated_inst["disks"] = self._DisksDictDP((instance.disks, instance))
730 def _DisksDictDP(self, (disks, instance)):
731 """Wrapper for L{AnnotateDiskParams}.
734 diskparams = self._cfg.GetInstanceDiskParams(instance)
735 return [disk.ToDict()
736 for disk in AnnotateDiskParams(instance.disk_template,
739 def _SingleDiskDictDP(self, (disk, instance)):
740 """Wrapper for L{AnnotateDiskParams}.
743 (anno_disk,) = self._DisksDictDP(([disk], instance))
747 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
748 """RPC wrappers for job queue.
751 def __init__(self, context, address_list):
752 """Initializes this class.
755 if address_list is None:
756 resolver = compat.partial(_SsconfResolver, True)
758 # Caller provided an address list
759 resolver = _StaticResolver(address_list)
761 _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
762 lock_monitor_cb=context.glm.AddToLockMonitor)
763 _generated_rpc.RpcClientJobQueue.__init__(self)
766 class BootstrapRunner(_RpcClientBase,
767 _generated_rpc.RpcClientBootstrap,
768 _generated_rpc.RpcClientDnsOnly):
769 """RPC wrappers for bootstrapping.
773 """Initializes this class.
776 # Pylint doesn't recognize multiple inheritance properly, see
777 # <http://www.logilab.org/ticket/36586> and
778 # <http://www.logilab.org/ticket/35642>
779 # pylint: disable=W0233
780 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
782 _generated_rpc.RpcClientBootstrap.__init__(self)
783 _generated_rpc.RpcClientDnsOnly.__init__(self)
786 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
787 """RPC wrappers for calls using only DNS.
791 """Initialize this class.
794 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
796 _generated_rpc.RpcClientDnsOnly.__init__(self)
799 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
800 """RPC wrappers for L{config}.
803 def __init__(self, context, address_list, _req_process_fn=None,
805 """Initializes this class.
809 lock_monitor_cb = context.glm.AddToLockMonitor
811 lock_monitor_cb = None
813 if address_list is None:
814 resolver = compat.partial(_SsconfResolver, True)
816 # Caller provided an address list
817 resolver = _StaticResolver(address_list)
819 encoders = _ENCODERS.copy()
822 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
825 _RpcClientBase.__init__(self, resolver, encoders.get,
826 lock_monitor_cb=lock_monitor_cb,
827 _req_process_fn=_req_process_fn)
828 _generated_rpc.RpcClientConfig.__init__(self)