Replace single- with double-quotes
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45 from ganeti import netutils
46 from ganeti import ssconf
47 from ganeti import runtime
48 from ganeti import compat
49 from ganeti import rpc_defs
50
51 # Special module generated at build time
52 from ganeti import _generated_rpc
53
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client  # pylint: disable=W0611
56
57
58 # Timeout for connecting to nodes (seconds)
59 _RPC_CONNECT_TIMEOUT = 5
60
61 _RPC_CLIENT_HEADERS = [
62   "Content-type: %s" % http.HTTP_APP_JSON,
63   "Expect:",
64   ]
65
66 # Various time constants for the timeout table
67 _TMO_URGENT = 60 # one minute
68 _TMO_FAST = 5 * 60 # five minutes
69 _TMO_NORMAL = 15 * 60 # 15 minutes
70 _TMO_SLOW = 3600 # one hour
71 _TMO_4HRS = 4 * 3600
72 _TMO_1DAY = 86400
73
74 #: Special value to describe an offline host
75 _OFFLINE = object()
76
77
78 def Init():
79   """Initializes the module-global HTTP client manager.
80
81   Must be called before using any RPC function and while exactly one thread is
82   running.
83
84   """
85   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86   # one thread running. This check is just a safety measure -- it doesn't
87   # cover all cases.
88   assert threading.activeCount() == 1, \
89          "Found more than one active thread when initializing pycURL"
90
91   logging.info("Using PycURL %s", pycurl.version)
92
93   pycurl.global_init(pycurl.GLOBAL_ALL)
94
95
96 def Shutdown():
97   """Stops the module-global HTTP client manager.
98
99   Must be called before quitting the program and while exactly one thread is
100   running.
101
102   """
103   pycurl.global_cleanup()
104
105
106 def _ConfigRpcCurl(curl):
107   noded_cert = str(constants.NODED_CERT_FILE)
108
109   curl.setopt(pycurl.FOLLOWLOCATION, False)
110   curl.setopt(pycurl.CAINFO, noded_cert)
111   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112   curl.setopt(pycurl.SSL_VERIFYPEER, True)
113   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114   curl.setopt(pycurl.SSLCERT, noded_cert)
115   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116   curl.setopt(pycurl.SSLKEY, noded_cert)
117   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118
119
120 def RunWithRPC(fn):
121   """RPC-wrapper decorator.
122
123   When applied to a function, it runs it with the RPC system
124   initialized, and it shutsdown the system afterwards. This means the
125   function must be called without RPC being initialized.
126
127   """
128   def wrapper(*args, **kwargs):
129     Init()
130     try:
131       return fn(*args, **kwargs)
132     finally:
133       Shutdown()
134   return wrapper
135
136
137 def _Compress(data):
138   """Compresses a string for transport over RPC.
139
140   Small amounts of data are not compressed.
141
142   @type data: str
143   @param data: Data
144   @rtype: tuple
145   @return: Encoded data to send
146
147   """
148   # Small amounts of data are not compressed
149   if len(data) < 512:
150     return (constants.RPC_ENCODING_NONE, data)
151
152   # Compress with zlib and encode in base64
153   return (constants.RPC_ENCODING_ZLIB_BASE64,
154           base64.b64encode(zlib.compress(data, 3)))
155
156
157 class RpcResult(object):
158   """RPC Result class.
159
160   This class holds an RPC result. It is needed since in multi-node
161   calls we can't raise an exception just because one one out of many
162   failed, and therefore we use this class to encapsulate the result.
163
164   @ivar data: the data payload, for successful results, or None
165   @ivar call: the name of the RPC call
166   @ivar node: the name of the node to which we made the call
167   @ivar offline: whether the operation failed because the node was
168       offline, as opposed to actual failure; offline=True will always
169       imply failed=True, in order to allow simpler checking if
170       the user doesn't care about the exact failure mode
171   @ivar fail_msg: the error message if the call failed
172
173   """
174   def __init__(self, data=None, failed=False, offline=False,
175                call=None, node=None):
176     self.offline = offline
177     self.call = call
178     self.node = node
179
180     if offline:
181       self.fail_msg = "Node is marked offline"
182       self.data = self.payload = None
183     elif failed:
184       self.fail_msg = self._EnsureErr(data)
185       self.data = self.payload = None
186     else:
187       self.data = data
188       if not isinstance(self.data, (tuple, list)):
189         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
190                          type(self.data))
191         self.payload = None
192       elif len(data) != 2:
193         self.fail_msg = ("RPC layer error: invalid result length (%d), "
194                          "expected 2" % len(self.data))
195         self.payload = None
196       elif not self.data[0]:
197         self.fail_msg = self._EnsureErr(self.data[1])
198         self.payload = None
199       else:
200         # finally success
201         self.fail_msg = None
202         self.payload = data[1]
203
204     for attr_name in ["call", "data", "fail_msg",
205                       "node", "offline", "payload"]:
206       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
207
208   @staticmethod
209   def _EnsureErr(val):
210     """Helper to ensure we return a 'True' value for error."""
211     if val:
212       return val
213     else:
214       return "No error information"
215
216   def Raise(self, msg, prereq=False, ecode=None):
217     """If the result has failed, raise an OpExecError.
218
219     This is used so that LU code doesn't have to check for each
220     result, but instead can call this function.
221
222     """
223     if not self.fail_msg:
224       return
225
226     if not msg: # one could pass None for default message
227       msg = ("Call '%s' to node '%s' has failed: %s" %
228              (self.call, self.node, self.fail_msg))
229     else:
230       msg = "%s: %s" % (msg, self.fail_msg)
231     if prereq:
232       ec = errors.OpPrereqError
233     else:
234       ec = errors.OpExecError
235     if ecode is not None:
236       args = (msg, ecode)
237     else:
238       args = (msg, )
239     raise ec(*args) # pylint: disable=W0142
240
241
242 def _SsconfResolver(node_list, _,
243                     ssc=ssconf.SimpleStore,
244                     nslookup_fn=netutils.Hostname.GetIP):
245   """Return addresses for given node names.
246
247   @type node_list: list
248   @param node_list: List of node names
249   @type ssc: class
250   @param ssc: SimpleStore class that is used to obtain node->ip mappings
251   @type nslookup_fn: callable
252   @param nslookup_fn: function use to do NS lookup
253   @rtype: list of tuple; (string, string)
254   @return: List of tuples containing node name and IP address
255
256   """
257   ss = ssc()
258   iplist = ss.GetNodePrimaryIPList()
259   family = ss.GetPrimaryIPFamily()
260   ipmap = dict(entry.split() for entry in iplist)
261
262   result = []
263   for node in node_list:
264     ip = ipmap.get(node)
265     if ip is None:
266       ip = nslookup_fn(node, family=family)
267     result.append((node, ip))
268
269   return result
270
271
272 class _StaticResolver:
273   def __init__(self, addresses):
274     """Initializes this class.
275
276     """
277     self._addresses = addresses
278
279   def __call__(self, hosts, _):
280     """Returns static addresses for hosts.
281
282     """
283     assert len(hosts) == len(self._addresses)
284     return zip(hosts, self._addresses)
285
286
287 def _CheckConfigNode(name, node, accept_offline_node):
288   """Checks if a node is online.
289
290   @type name: string
291   @param name: Node name
292   @type node: L{objects.Node} or None
293   @param node: Node object
294
295   """
296   if node is None:
297     # Depend on DNS for name resolution
298     ip = name
299   elif node.offline and not accept_offline_node:
300     ip = _OFFLINE
301   else:
302     ip = node.primary_ip
303   return (name, ip)
304
305
306 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
307   """Calculate node addresses using configuration.
308
309   """
310   accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
311
312   assert accept_offline_node or opts is None, "Unknown option"
313
314   # Special case for single-host lookups
315   if len(hosts) == 1:
316     (name, ) = hosts
317     return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
318   else:
319     all_nodes = all_nodes_fn()
320     return [_CheckConfigNode(name, all_nodes.get(name, None),
321                              accept_offline_node)
322             for name in hosts]
323
324
325 class _RpcProcessor:
326   def __init__(self, resolver, port, lock_monitor_cb=None):
327     """Initializes this class.
328
329     @param resolver: callable accepting a list of hostnames, returning a list
330       of tuples containing name and IP address (IP address can be the name or
331       the special value L{_OFFLINE} to mark offline machines)
332     @type port: int
333     @param port: TCP port
334     @param lock_monitor_cb: Callable for registering with lock monitor
335
336     """
337     self._resolver = resolver
338     self._port = port
339     self._lock_monitor_cb = lock_monitor_cb
340
341   @staticmethod
342   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
343     """Prepares requests by sorting offline hosts into separate list.
344
345     @type body: dict
346     @param body: a dictionary with per-host body data
347
348     """
349     results = {}
350     requests = {}
351
352     assert isinstance(body, dict)
353     assert len(body) == len(hosts)
354     assert compat.all(isinstance(v, str) for v in body.values())
355     assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
356         "%s != %s" % (hosts, body.keys())
357
358     for (name, ip) in hosts:
359       if ip is _OFFLINE:
360         # Node is marked as offline
361         results[name] = RpcResult(node=name, offline=True, call=procedure)
362       else:
363         requests[name] = \
364           http.client.HttpClientRequest(str(ip), port,
365                                         http.HTTP_POST, str("/%s" % procedure),
366                                         headers=_RPC_CLIENT_HEADERS,
367                                         post_data=body[name],
368                                         read_timeout=read_timeout,
369                                         nicename="%s/%s" % (name, procedure),
370                                         curl_config_fn=_ConfigRpcCurl)
371
372     return (results, requests)
373
374   @staticmethod
375   def _CombineResults(results, requests, procedure):
376     """Combines pre-computed results for offline hosts with actual call results.
377
378     """
379     for name, req in requests.items():
380       if req.success and req.resp_status_code == http.HTTP_OK:
381         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
382                                 node=name, call=procedure)
383       else:
384         # TODO: Better error reporting
385         if req.error:
386           msg = req.error
387         else:
388           msg = req.resp_body
389
390         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
391         host_result = RpcResult(data=msg, failed=True, node=name,
392                                 call=procedure)
393
394       results[name] = host_result
395
396     return results
397
398   def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
399                _req_process_fn=None):
400     """Makes an RPC request to a number of nodes.
401
402     @type hosts: sequence
403     @param hosts: Hostnames
404     @type procedure: string
405     @param procedure: Request path
406     @type body: dictionary
407     @param body: dictionary with request bodies per host
408     @type read_timeout: int or None
409     @param read_timeout: Read timeout for request
410
411     """
412     assert read_timeout is not None, \
413       "Missing RPC read timeout for procedure '%s'" % procedure
414
415     if _req_process_fn is None:
416       _req_process_fn = http.client.ProcessRequests
417
418     (results, requests) = \
419       self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
420                             procedure, body, read_timeout)
421
422     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
423
424     assert not frozenset(results).intersection(requests)
425
426     return self._CombineResults(results, requests, procedure)
427
428
429 class _RpcClientBase:
430   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
431                _req_process_fn=None):
432     """Initializes this class.
433
434     """
435     proc = _RpcProcessor(resolver,
436                          netutils.GetDaemonPort(constants.NODED),
437                          lock_monitor_cb=lock_monitor_cb)
438     self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
439     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
440
441   @staticmethod
442   def _EncodeArg(encoder_fn, (argkind, value)):
443     """Encode argument.
444
445     """
446     if argkind is None:
447       return value
448     else:
449       return encoder_fn(argkind)(value)
450
451   def _Call(self, cdef, node_list, args):
452     """Entry point for automatically generated RPC wrappers.
453
454     """
455     (procedure, _, resolver_opts, timeout, argdefs,
456      prep_fn, postproc_fn, _) = cdef
457
458     if callable(timeout):
459       read_timeout = timeout(args)
460     else:
461       read_timeout = timeout
462
463     if callable(resolver_opts):
464       req_resolver_opts = resolver_opts(args)
465     else:
466       req_resolver_opts = resolver_opts
467
468     if len(args) != len(argdefs):
469       raise errors.ProgrammerError("Number of passed arguments doesn't match")
470
471     enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
472     if prep_fn is None:
473       # for a no-op prep_fn, we serialise the body once, and then we
474       # reuse it in the dictionary values
475       body = serializer.DumpJson(enc_args)
476       pnbody = dict((n, body) for n in node_list)
477     else:
478       # for a custom prep_fn, we pass the encoded arguments and the
479       # node name to the prep_fn, and we serialise its return value
480       assert callable(prep_fn)
481       pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
482                     for n in node_list)
483
484     result = self._proc(node_list, procedure, pnbody, read_timeout,
485                         req_resolver_opts)
486
487     if postproc_fn:
488       return dict(map(lambda (key, value): (key, postproc_fn(value)),
489                       result.items()))
490     else:
491       return result
492
493
494 def _ObjectToDict(value):
495   """Converts an object to a dictionary.
496
497   @note: See L{objects}.
498
499   """
500   return value.ToDict()
501
502
503 def _ObjectListToDict(value):
504   """Converts a list of L{objects} to dictionaries.
505
506   """
507   return map(_ObjectToDict, value)
508
509
510 def _EncodeNodeToDiskDict(value):
511   """Encodes a dictionary with node name as key and disk objects as values.
512
513   """
514   return dict((name, _ObjectListToDict(disks))
515               for name, disks in value.items())
516
517
518 def _PrepareFileUpload(getents_fn, filename):
519   """Loads a file and prepares it for an upload to nodes.
520
521   """
522   statcb = utils.FileStatHelper()
523   data = _Compress(utils.ReadFile(filename, preread=statcb))
524   st = statcb.st
525
526   if getents_fn is None:
527     getents_fn = runtime.GetEnts
528
529   getents = getents_fn()
530
531   return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
532           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
533
534
535 def _PrepareFinalizeExportDisks(snap_disks):
536   """Encodes disks for finalizing export.
537
538   """
539   flat_disks = []
540
541   for disk in snap_disks:
542     if isinstance(disk, bool):
543       flat_disks.append(disk)
544     else:
545       flat_disks.append(disk.ToDict())
546
547   return flat_disks
548
549
550 def _EncodeImportExportIO((ieio, ieioargs)):
551   """Encodes import/export I/O information.
552
553   """
554   if ieio == constants.IEIO_RAW_DISK:
555     assert len(ieioargs) == 1
556     return (ieio, (ieioargs[0].ToDict(), ))
557
558   if ieio == constants.IEIO_SCRIPT:
559     assert len(ieioargs) == 2
560     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
561
562   return (ieio, ieioargs)
563
564
565 def _EncodeBlockdevRename(value):
566   """Encodes information for renaming block devices.
567
568   """
569   return [(d.ToDict(), uid) for d, uid in value]
570
571
572 #: Generic encoders
573 _ENCODERS = {
574   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
575   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
576   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
577   rpc_defs.ED_COMPRESS: _Compress,
578   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
579   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
580   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
581   }
582
583
584 class RpcRunner(_RpcClientBase,
585                 _generated_rpc.RpcClientDefault,
586                 _generated_rpc.RpcClientBootstrap,
587                 _generated_rpc.RpcClientConfig):
588   """RPC runner class.
589
590   """
591   def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
592     """Initialized the RPC runner.
593
594     @type cfg: L{config.ConfigWriter}
595     @param cfg: Configuration
596     @type lock_monitor_cb: callable
597     @param lock_monitor_cb: Lock monitor callback
598
599     """
600     self._cfg = cfg
601
602     encoders = _ENCODERS.copy()
603
604     encoders.update({
605       # Encoders requiring configuration object
606       rpc_defs.ED_INST_DICT: self._InstDict,
607       rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
608       rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
609
610       # Encoders with special requirements
611       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
612       })
613
614     # Resolver using configuration
615     resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
616                               cfg.GetAllNodesInfo)
617
618     # Pylint doesn't recognize multiple inheritance properly, see
619     # <http://www.logilab.org/ticket/36586> and
620     # <http://www.logilab.org/ticket/35642>
621     # pylint: disable=W0233
622     _RpcClientBase.__init__(self, resolver, encoders.get,
623                             lock_monitor_cb=lock_monitor_cb,
624                             _req_process_fn=_req_process_fn)
625     _generated_rpc.RpcClientConfig.__init__(self)
626     _generated_rpc.RpcClientBootstrap.__init__(self)
627     _generated_rpc.RpcClientDefault.__init__(self)
628
629   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
630     """Convert the given instance to a dict.
631
632     This is done via the instance's ToDict() method and additionally
633     we fill the hvparams with the cluster defaults.
634
635     @type instance: L{objects.Instance}
636     @param instance: an Instance object
637     @type hvp: dict or None
638     @param hvp: a dictionary with overridden hypervisor parameters
639     @type bep: dict or None
640     @param bep: a dictionary with overridden backend parameters
641     @type osp: dict or None
642     @param osp: a dictionary with overridden os parameters
643     @rtype: dict
644     @return: the instance dict, with the hvparams filled with the
645         cluster defaults
646
647     """
648     idict = instance.ToDict()
649     cluster = self._cfg.GetClusterInfo()
650     idict["hvparams"] = cluster.FillHV(instance)
651     if hvp is not None:
652       idict["hvparams"].update(hvp)
653     idict["beparams"] = cluster.FillBE(instance)
654     if bep is not None:
655       idict["beparams"].update(bep)
656     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
657     if osp is not None:
658       idict["osparams"].update(osp)
659     for nic in idict["nics"]:
660       nic["nicparams"] = objects.FillDict(
661         cluster.nicparams[constants.PP_DEFAULT],
662         nic["nicparams"])
663     return idict
664
665   def _InstDictHvpBep(self, (instance, hvp, bep)):
666     """Wrapper for L{_InstDict}.
667
668     """
669     return self._InstDict(instance, hvp=hvp, bep=bep)
670
671   def _InstDictOsp(self, (instance, osparams)):
672     """Wrapper for L{_InstDict}.
673
674     """
675     return self._InstDict(instance, osp=osparams)
676
677
678 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
679   """RPC wrappers for job queue.
680
681   """
682   def __init__(self, context, address_list):
683     """Initializes this class.
684
685     """
686     if address_list is None:
687       resolver = _SsconfResolver
688     else:
689       # Caller provided an address list
690       resolver = _StaticResolver(address_list)
691
692     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
693                             lock_monitor_cb=context.glm.AddToLockMonitor)
694     _generated_rpc.RpcClientJobQueue.__init__(self)
695
696
697 class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
698   """RPC wrappers for bootstrapping.
699
700   """
701   def __init__(self):
702     """Initializes this class.
703
704     """
705     _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get)
706     _generated_rpc.RpcClientBootstrap.__init__(self)
707
708
709 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
710   """RPC wrappers for L{config}.
711
712   """
713   def __init__(self, context, address_list, _req_process_fn=None,
714                _getents=None):
715     """Initializes this class.
716
717     """
718     if context:
719       lock_monitor_cb = context.glm.AddToLockMonitor
720     else:
721       lock_monitor_cb = None
722
723     if address_list is None:
724       resolver = _SsconfResolver
725     else:
726       # Caller provided an address list
727       resolver = _StaticResolver(address_list)
728
729     encoders = _ENCODERS.copy()
730
731     encoders.update({
732       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
733       })
734
735     _RpcClientBase.__init__(self, resolver, encoders.get,
736                             lock_monitor_cb=lock_monitor_cb,
737                             _req_process_fn=_req_process_fn)
738     _generated_rpc.RpcClientConfig.__init__(self)