rpc: Call result processor once for each node result
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51
52 # Special module generated at build time
53 from ganeti import _generated_rpc
54
55 # pylint has a bug here, doesn't see this import
56 import ganeti.http.client  # pylint: disable=W0611
57
58
59 # Timeout for connecting to nodes (seconds)
60 _RPC_CONNECT_TIMEOUT = 5
61
62 _RPC_CLIENT_HEADERS = [
63   "Content-type: %s" % http.HTTP_APP_JSON,
64   "Expect:",
65   ]
66
67 # Various time constants for the timeout table
68 _TMO_URGENT = 60 # one minute
69 _TMO_FAST = 5 * 60 # five minutes
70 _TMO_NORMAL = 15 * 60 # 15 minutes
71 _TMO_SLOW = 3600 # one hour
72 _TMO_4HRS = 4 * 3600
73 _TMO_1DAY = 86400
74
75 #: Special value to describe an offline host
76 _OFFLINE = object()
77
78
79 def Init():
80   """Initializes the module-global HTTP client manager.
81
82   Must be called before using any RPC function and while exactly one thread is
83   running.
84
85   """
86   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
87   # one thread running. This check is just a safety measure -- it doesn't
88   # cover all cases.
89   assert threading.activeCount() == 1, \
90          "Found more than one active thread when initializing pycURL"
91
92   logging.info("Using PycURL %s", pycurl.version)
93
94   pycurl.global_init(pycurl.GLOBAL_ALL)
95
96
97 def Shutdown():
98   """Stops the module-global HTTP client manager.
99
100   Must be called before quitting the program and while exactly one thread is
101   running.
102
103   """
104   pycurl.global_cleanup()
105
106
107 def _ConfigRpcCurl(curl):
108   noded_cert = str(constants.NODED_CERT_FILE)
109
110   curl.setopt(pycurl.FOLLOWLOCATION, False)
111   curl.setopt(pycurl.CAINFO, noded_cert)
112   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
113   curl.setopt(pycurl.SSL_VERIFYPEER, True)
114   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
115   curl.setopt(pycurl.SSLCERT, noded_cert)
116   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
117   curl.setopt(pycurl.SSLKEY, noded_cert)
118   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
119
120
121 def RunWithRPC(fn):
122   """RPC-wrapper decorator.
123
124   When applied to a function, it runs it with the RPC system
125   initialized, and it shutsdown the system afterwards. This means the
126   function must be called without RPC being initialized.
127
128   """
129   def wrapper(*args, **kwargs):
130     Init()
131     try:
132       return fn(*args, **kwargs)
133     finally:
134       Shutdown()
135   return wrapper
136
137
138 def _Compress(data):
139   """Compresses a string for transport over RPC.
140
141   Small amounts of data are not compressed.
142
143   @type data: str
144   @param data: Data
145   @rtype: tuple
146   @return: Encoded data to send
147
148   """
149   # Small amounts of data are not compressed
150   if len(data) < 512:
151     return (constants.RPC_ENCODING_NONE, data)
152
153   # Compress with zlib and encode in base64
154   return (constants.RPC_ENCODING_ZLIB_BASE64,
155           base64.b64encode(zlib.compress(data, 3)))
156
157
158 class RpcResult(object):
159   """RPC Result class.
160
161   This class holds an RPC result. It is needed since in multi-node
162   calls we can't raise an exception just because one one out of many
163   failed, and therefore we use this class to encapsulate the result.
164
165   @ivar data: the data payload, for successful results, or None
166   @ivar call: the name of the RPC call
167   @ivar node: the name of the node to which we made the call
168   @ivar offline: whether the operation failed because the node was
169       offline, as opposed to actual failure; offline=True will always
170       imply failed=True, in order to allow simpler checking if
171       the user doesn't care about the exact failure mode
172   @ivar fail_msg: the error message if the call failed
173
174   """
175   def __init__(self, data=None, failed=False, offline=False,
176                call=None, node=None):
177     self.offline = offline
178     self.call = call
179     self.node = node
180
181     if offline:
182       self.fail_msg = "Node is marked offline"
183       self.data = self.payload = None
184     elif failed:
185       self.fail_msg = self._EnsureErr(data)
186       self.data = self.payload = None
187     else:
188       self.data = data
189       if not isinstance(self.data, (tuple, list)):
190         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
191                          type(self.data))
192         self.payload = None
193       elif len(data) != 2:
194         self.fail_msg = ("RPC layer error: invalid result length (%d), "
195                          "expected 2" % len(self.data))
196         self.payload = None
197       elif not self.data[0]:
198         self.fail_msg = self._EnsureErr(self.data[1])
199         self.payload = None
200       else:
201         # finally success
202         self.fail_msg = None
203         self.payload = data[1]
204
205     for attr_name in ["call", "data", "fail_msg",
206                       "node", "offline", "payload"]:
207       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
208
209   @staticmethod
210   def _EnsureErr(val):
211     """Helper to ensure we return a 'True' value for error."""
212     if val:
213       return val
214     else:
215       return "No error information"
216
217   def Raise(self, msg, prereq=False, ecode=None):
218     """If the result has failed, raise an OpExecError.
219
220     This is used so that LU code doesn't have to check for each
221     result, but instead can call this function.
222
223     """
224     if not self.fail_msg:
225       return
226
227     if not msg: # one could pass None for default message
228       msg = ("Call '%s' to node '%s' has failed: %s" %
229              (self.call, self.node, self.fail_msg))
230     else:
231       msg = "%s: %s" % (msg, self.fail_msg)
232     if prereq:
233       ec = errors.OpPrereqError
234     else:
235       ec = errors.OpExecError
236     if ecode is not None:
237       args = (msg, ecode)
238     else:
239       args = (msg, )
240     raise ec(*args) # pylint: disable=W0142
241
242
243 def _SsconfResolver(node_list,
244                     ssc=ssconf.SimpleStore,
245                     nslookup_fn=netutils.Hostname.GetIP):
246   """Return addresses for given node names.
247
248   @type node_list: list
249   @param node_list: List of node names
250   @type ssc: class
251   @param ssc: SimpleStore class that is used to obtain node->ip mappings
252   @type nslookup_fn: callable
253   @param nslookup_fn: function use to do NS lookup
254   @rtype: list of tuple; (string, string)
255   @return: List of tuples containing node name and IP address
256
257   """
258   ss = ssc()
259   iplist = ss.GetNodePrimaryIPList()
260   family = ss.GetPrimaryIPFamily()
261   ipmap = dict(entry.split() for entry in iplist)
262
263   result = []
264   for node in node_list:
265     ip = ipmap.get(node)
266     if ip is None:
267       ip = nslookup_fn(node, family=family)
268     result.append((node, ip))
269
270   return result
271
272
273 class _StaticResolver:
274   def __init__(self, addresses):
275     """Initializes this class.
276
277     """
278     self._addresses = addresses
279
280   def __call__(self, hosts):
281     """Returns static addresses for hosts.
282
283     """
284     assert len(hosts) == len(self._addresses)
285     return zip(hosts, self._addresses)
286
287
288 def _CheckConfigNode(name, node):
289   """Checks if a node is online.
290
291   @type name: string
292   @param name: Node name
293   @type node: L{objects.Node} or None
294   @param node: Node object
295
296   """
297   if node is None:
298     # Depend on DNS for name resolution
299     ip = name
300   elif node.offline:
301     ip = _OFFLINE
302   else:
303     ip = node.primary_ip
304   return (name, ip)
305
306
307 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
308   """Calculate node addresses using configuration.
309
310   """
311   # Special case for single-host lookups
312   if len(hosts) == 1:
313     (name, ) = hosts
314     return [_CheckConfigNode(name, single_node_fn(name))]
315   else:
316     all_nodes = all_nodes_fn()
317     return [_CheckConfigNode(name, all_nodes.get(name, None))
318             for name in hosts]
319
320
321 class _RpcProcessor:
322   def __init__(self, resolver, port, lock_monitor_cb=None):
323     """Initializes this class.
324
325     @param resolver: callable accepting a list of hostnames, returning a list
326       of tuples containing name and IP address (IP address can be the name or
327       the special value L{_OFFLINE} to mark offline machines)
328     @type port: int
329     @param port: TCP port
330     @param lock_monitor_cb: Callable for registering with lock monitor
331
332     """
333     self._resolver = resolver
334     self._port = port
335     self._lock_monitor_cb = lock_monitor_cb
336
337   @staticmethod
338   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
339     """Prepares requests by sorting offline hosts into separate list.
340
341     """
342     results = {}
343     requests = {}
344
345     for (name, ip) in hosts:
346       if ip is _OFFLINE:
347         # Node is marked as offline
348         results[name] = RpcResult(node=name, offline=True, call=procedure)
349       else:
350         requests[name] = \
351           http.client.HttpClientRequest(str(ip), port,
352                                         http.HTTP_PUT, str("/%s" % procedure),
353                                         headers=_RPC_CLIENT_HEADERS,
354                                         post_data=body,
355                                         read_timeout=read_timeout,
356                                         nicename="%s/%s" % (name, procedure),
357                                         curl_config_fn=_ConfigRpcCurl)
358
359     return (results, requests)
360
361   @staticmethod
362   def _CombineResults(results, requests, procedure):
363     """Combines pre-computed results for offline hosts with actual call results.
364
365     """
366     for name, req in requests.items():
367       if req.success and req.resp_status_code == http.HTTP_OK:
368         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
369                                 node=name, call=procedure)
370       else:
371         # TODO: Better error reporting
372         if req.error:
373           msg = req.error
374         else:
375           msg = req.resp_body
376
377         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
378         host_result = RpcResult(data=msg, failed=True, node=name,
379                                 call=procedure)
380
381       results[name] = host_result
382
383     return results
384
385   def __call__(self, hosts, procedure, body, read_timeout=None,
386                _req_process_fn=http.client.ProcessRequests):
387     """Makes an RPC request to a number of nodes.
388
389     @type hosts: sequence
390     @param hosts: Hostnames
391     @type procedure: string
392     @param procedure: Request path
393     @type body: string
394     @param body: Request body
395     @type read_timeout: int or None
396     @param read_timeout: Read timeout for request
397
398     """
399     assert read_timeout is not None, \
400       "Missing RPC read timeout for procedure '%s'" % procedure
401
402     (results, requests) = \
403       self._PrepareRequests(self._resolver(hosts), self._port, procedure,
404                             str(body), read_timeout)
405
406     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
407
408     assert not frozenset(results).intersection(requests)
409
410     return self._CombineResults(results, requests, procedure)
411
412
413 class _RpcClientBase:
414   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None):
415     """Initializes this class.
416
417     """
418     self._proc = _RpcProcessor(resolver,
419                                netutils.GetDaemonPort(constants.NODED),
420                                lock_monitor_cb=lock_monitor_cb)
421     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
422
423   @staticmethod
424   def _EncodeArg(encoder_fn, (argkind, value)):
425     """Encode argument.
426
427     """
428     if argkind is None:
429       return value
430     else:
431       return encoder_fn(argkind)(value)
432
433   def _Call(self, cdef, node_list, args):
434     """Entry point for automatically generated RPC wrappers.
435
436     """
437     (procedure, _, timeout, argdefs, postproc_fn, _) = cdef
438
439     if callable(timeout):
440       read_timeout = timeout(args)
441     else:
442       read_timeout = timeout
443
444     body = serializer.DumpJson(map(self._encoder,
445                                    zip(map(compat.snd, argdefs), args)),
446                                indent=False)
447
448     result = self._proc(node_list, procedure, body, read_timeout=read_timeout)
449
450     if postproc_fn:
451       return dict(map(lambda (key, value): (key, postproc_fn(value)),
452                       result.items()))
453     else:
454       return result
455
456
457 def _ObjectToDict(value):
458   """Converts an object to a dictionary.
459
460   @note: See L{objects}.
461
462   """
463   return value.ToDict()
464
465
466 def _ObjectListToDict(value):
467   """Converts a list of L{objects} to dictionaries.
468
469   """
470   return map(_ObjectToDict, value)
471
472
473 def _EncodeNodeToDiskDict(value):
474   """Encodes a dictionary with node name as key and disk objects as values.
475
476   """
477   return dict((name, _ObjectListToDict(disks))
478               for name, disks in value.items())
479
480
481 def _PrepareFileUpload(filename):
482   """Loads a file and prepares it for an upload to nodes.
483
484   """
485   data = _Compress(utils.ReadFile(filename))
486   st = os.stat(filename)
487   getents = runtime.GetEnts()
488   return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
489           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
490
491
492 def _PrepareFinalizeExportDisks(snap_disks):
493   """Encodes disks for finalizing export.
494
495   """
496   flat_disks = []
497
498   for disk in snap_disks:
499     if isinstance(disk, bool):
500       flat_disks.append(disk)
501     else:
502       flat_disks.append(disk.ToDict())
503
504   return flat_disks
505
506
507 def _EncodeImportExportIO((ieio, ieioargs)):
508   """Encodes import/export I/O information.
509
510   """
511   if ieio == constants.IEIO_RAW_DISK:
512     assert len(ieioargs) == 1
513     return (ieio, (ieioargs[0].ToDict(), ))
514
515   if ieio == constants.IEIO_SCRIPT:
516     assert len(ieioargs) == 2
517     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
518
519   return (ieio, ieioargs)
520
521
522 def _EncodeBlockdevRename(value):
523   """Encodes information for renaming block devices.
524
525   """
526   return [(d.ToDict(), uid) for d, uid in value]
527
528
529 #: Generic encoders
530 _ENCODERS = {
531   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
532   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
533   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
534   rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload,
535   rpc_defs.ED_COMPRESS: _Compress,
536   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
537   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
538   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
539   }
540
541
542 class RpcRunner(_RpcClientBase,
543                 _generated_rpc.RpcClientDefault,
544                 _generated_rpc.RpcClientBootstrap,
545                 _generated_rpc.RpcClientConfig):
546   """RPC runner class.
547
548   """
549   def __init__(self, context):
550     """Initialized the RPC runner.
551
552     @type context: C{masterd.GanetiContext}
553     @param context: Ganeti context
554
555     """
556     self._cfg = context.cfg
557
558     encoders = _ENCODERS.copy()
559
560     # Add encoders requiring configuration object
561     encoders.update({
562       rpc_defs.ED_INST_DICT: self._InstDict,
563       rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
564       rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
565       })
566
567     # Resolver using configuration
568     resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo,
569                               self._cfg.GetAllNodesInfo)
570
571     # Pylint doesn't recognize multiple inheritance properly, see
572     # <http://www.logilab.org/ticket/36586> and
573     # <http://www.logilab.org/ticket/35642>
574     # pylint: disable=W0233
575     _RpcClientBase.__init__(self, resolver, encoders.get,
576                             lock_monitor_cb=context.glm.AddToLockMonitor)
577     _generated_rpc.RpcClientConfig.__init__(self)
578     _generated_rpc.RpcClientBootstrap.__init__(self)
579     _generated_rpc.RpcClientDefault.__init__(self)
580
581   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
582     """Convert the given instance to a dict.
583
584     This is done via the instance's ToDict() method and additionally
585     we fill the hvparams with the cluster defaults.
586
587     @type instance: L{objects.Instance}
588     @param instance: an Instance object
589     @type hvp: dict or None
590     @param hvp: a dictionary with overridden hypervisor parameters
591     @type bep: dict or None
592     @param bep: a dictionary with overridden backend parameters
593     @type osp: dict or None
594     @param osp: a dictionary with overridden os parameters
595     @rtype: dict
596     @return: the instance dict, with the hvparams filled with the
597         cluster defaults
598
599     """
600     idict = instance.ToDict()
601     cluster = self._cfg.GetClusterInfo()
602     idict["hvparams"] = cluster.FillHV(instance)
603     if hvp is not None:
604       idict["hvparams"].update(hvp)
605     idict["beparams"] = cluster.FillBE(instance)
606     if bep is not None:
607       idict["beparams"].update(bep)
608     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
609     if osp is not None:
610       idict["osparams"].update(osp)
611     for nic in idict["nics"]:
612       nic['nicparams'] = objects.FillDict(
613         cluster.nicparams[constants.PP_DEFAULT],
614         nic['nicparams'])
615     return idict
616
617   def _InstDictHvpBep(self, (instance, hvp, bep)):
618     """Wrapper for L{_InstDict}.
619
620     """
621     return self._InstDict(instance, hvp=hvp, bep=bep)
622
623   def _InstDictOsp(self, (instance, osparams)):
624     """Wrapper for L{_InstDict}.
625
626     """
627     return self._InstDict(instance, osp=osparams)
628
629
630 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
631   """RPC wrappers for job queue.
632
633   """
634   def __init__(self, context, address_list):
635     """Initializes this class.
636
637     """
638     if address_list is None:
639       resolver = _SsconfResolver
640     else:
641       # Caller provided an address list
642       resolver = _StaticResolver(address_list)
643
644     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
645                             lock_monitor_cb=context.glm.AddToLockMonitor)
646     _generated_rpc.RpcClientJobQueue.__init__(self)
647
648
649 class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
650   """RPC wrappers for bootstrapping.
651
652   """
653   def __init__(self):
654     """Initializes this class.
655
656     """
657     _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get)
658     _generated_rpc.RpcClientBootstrap.__init__(self)
659
660
661 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
662   """RPC wrappers for L{config}.
663
664   """
665   def __init__(self, context, address_list):
666     """Initializes this class.
667
668     """
669     if context:
670       lock_monitor_cb = context.glm.AddToLockMonitor
671     else:
672       lock_monitor_cb = None
673
674     if address_list is None:
675       resolver = _SsconfResolver
676     else:
677       # Caller provided an address list
678       resolver = _StaticResolver(address_list)
679
680     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
681                             lock_monitor_cb=lock_monitor_cb)
682     _generated_rpc.RpcClientConfig.__init__(self)