Change internal RPC client body values
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51
52 # Special module generated at build time
53 from ganeti import _generated_rpc
54
55 # pylint has a bug here, doesn't see this import
56 import ganeti.http.client  # pylint: disable=W0611
57
58
59 # Timeout for connecting to nodes (seconds)
60 _RPC_CONNECT_TIMEOUT = 5
61
62 _RPC_CLIENT_HEADERS = [
63   "Content-type: %s" % http.HTTP_APP_JSON,
64   "Expect:",
65   ]
66
67 # Various time constants for the timeout table
68 _TMO_URGENT = 60 # one minute
69 _TMO_FAST = 5 * 60 # five minutes
70 _TMO_NORMAL = 15 * 60 # 15 minutes
71 _TMO_SLOW = 3600 # one hour
72 _TMO_4HRS = 4 * 3600
73 _TMO_1DAY = 86400
74
75 #: Special value to describe an offline host
76 _OFFLINE = object()
77
78
79 def Init():
80   """Initializes the module-global HTTP client manager.
81
82   Must be called before using any RPC function and while exactly one thread is
83   running.
84
85   """
86   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
87   # one thread running. This check is just a safety measure -- it doesn't
88   # cover all cases.
89   assert threading.activeCount() == 1, \
90          "Found more than one active thread when initializing pycURL"
91
92   logging.info("Using PycURL %s", pycurl.version)
93
94   pycurl.global_init(pycurl.GLOBAL_ALL)
95
96
97 def Shutdown():
98   """Stops the module-global HTTP client manager.
99
100   Must be called before quitting the program and while exactly one thread is
101   running.
102
103   """
104   pycurl.global_cleanup()
105
106
107 def _ConfigRpcCurl(curl):
108   noded_cert = str(constants.NODED_CERT_FILE)
109
110   curl.setopt(pycurl.FOLLOWLOCATION, False)
111   curl.setopt(pycurl.CAINFO, noded_cert)
112   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
113   curl.setopt(pycurl.SSL_VERIFYPEER, True)
114   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
115   curl.setopt(pycurl.SSLCERT, noded_cert)
116   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
117   curl.setopt(pycurl.SSLKEY, noded_cert)
118   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
119
120
121 def RunWithRPC(fn):
122   """RPC-wrapper decorator.
123
124   When applied to a function, it runs it with the RPC system
125   initialized, and it shutsdown the system afterwards. This means the
126   function must be called without RPC being initialized.
127
128   """
129   def wrapper(*args, **kwargs):
130     Init()
131     try:
132       return fn(*args, **kwargs)
133     finally:
134       Shutdown()
135   return wrapper
136
137
138 def _Compress(data):
139   """Compresses a string for transport over RPC.
140
141   Small amounts of data are not compressed.
142
143   @type data: str
144   @param data: Data
145   @rtype: tuple
146   @return: Encoded data to send
147
148   """
149   # Small amounts of data are not compressed
150   if len(data) < 512:
151     return (constants.RPC_ENCODING_NONE, data)
152
153   # Compress with zlib and encode in base64
154   return (constants.RPC_ENCODING_ZLIB_BASE64,
155           base64.b64encode(zlib.compress(data, 3)))
156
157
158 class RpcResult(object):
159   """RPC Result class.
160
161   This class holds an RPC result. It is needed since in multi-node
162   calls we can't raise an exception just because one one out of many
163   failed, and therefore we use this class to encapsulate the result.
164
165   @ivar data: the data payload, for successful results, or None
166   @ivar call: the name of the RPC call
167   @ivar node: the name of the node to which we made the call
168   @ivar offline: whether the operation failed because the node was
169       offline, as opposed to actual failure; offline=True will always
170       imply failed=True, in order to allow simpler checking if
171       the user doesn't care about the exact failure mode
172   @ivar fail_msg: the error message if the call failed
173
174   """
175   def __init__(self, data=None, failed=False, offline=False,
176                call=None, node=None):
177     self.offline = offline
178     self.call = call
179     self.node = node
180
181     if offline:
182       self.fail_msg = "Node is marked offline"
183       self.data = self.payload = None
184     elif failed:
185       self.fail_msg = self._EnsureErr(data)
186       self.data = self.payload = None
187     else:
188       self.data = data
189       if not isinstance(self.data, (tuple, list)):
190         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
191                          type(self.data))
192         self.payload = None
193       elif len(data) != 2:
194         self.fail_msg = ("RPC layer error: invalid result length (%d), "
195                          "expected 2" % len(self.data))
196         self.payload = None
197       elif not self.data[0]:
198         self.fail_msg = self._EnsureErr(self.data[1])
199         self.payload = None
200       else:
201         # finally success
202         self.fail_msg = None
203         self.payload = data[1]
204
205     for attr_name in ["call", "data", "fail_msg",
206                       "node", "offline", "payload"]:
207       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
208
209   @staticmethod
210   def _EnsureErr(val):
211     """Helper to ensure we return a 'True' value for error."""
212     if val:
213       return val
214     else:
215       return "No error information"
216
217   def Raise(self, msg, prereq=False, ecode=None):
218     """If the result has failed, raise an OpExecError.
219
220     This is used so that LU code doesn't have to check for each
221     result, but instead can call this function.
222
223     """
224     if not self.fail_msg:
225       return
226
227     if not msg: # one could pass None for default message
228       msg = ("Call '%s' to node '%s' has failed: %s" %
229              (self.call, self.node, self.fail_msg))
230     else:
231       msg = "%s: %s" % (msg, self.fail_msg)
232     if prereq:
233       ec = errors.OpPrereqError
234     else:
235       ec = errors.OpExecError
236     if ecode is not None:
237       args = (msg, ecode)
238     else:
239       args = (msg, )
240     raise ec(*args) # pylint: disable=W0142
241
242
243 def _SsconfResolver(node_list,
244                     ssc=ssconf.SimpleStore,
245                     nslookup_fn=netutils.Hostname.GetIP):
246   """Return addresses for given node names.
247
248   @type node_list: list
249   @param node_list: List of node names
250   @type ssc: class
251   @param ssc: SimpleStore class that is used to obtain node->ip mappings
252   @type nslookup_fn: callable
253   @param nslookup_fn: function use to do NS lookup
254   @rtype: list of tuple; (string, string)
255   @return: List of tuples containing node name and IP address
256
257   """
258   ss = ssc()
259   iplist = ss.GetNodePrimaryIPList()
260   family = ss.GetPrimaryIPFamily()
261   ipmap = dict(entry.split() for entry in iplist)
262
263   result = []
264   for node in node_list:
265     ip = ipmap.get(node)
266     if ip is None:
267       ip = nslookup_fn(node, family=family)
268     result.append((node, ip))
269
270   return result
271
272
273 class _StaticResolver:
274   def __init__(self, addresses):
275     """Initializes this class.
276
277     """
278     self._addresses = addresses
279
280   def __call__(self, hosts):
281     """Returns static addresses for hosts.
282
283     """
284     assert len(hosts) == len(self._addresses)
285     return zip(hosts, self._addresses)
286
287
288 def _CheckConfigNode(name, node):
289   """Checks if a node is online.
290
291   @type name: string
292   @param name: Node name
293   @type node: L{objects.Node} or None
294   @param node: Node object
295
296   """
297   if node is None:
298     # Depend on DNS for name resolution
299     ip = name
300   elif node.offline:
301     ip = _OFFLINE
302   else:
303     ip = node.primary_ip
304   return (name, ip)
305
306
307 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
308   """Calculate node addresses using configuration.
309
310   """
311   # Special case for single-host lookups
312   if len(hosts) == 1:
313     (name, ) = hosts
314     return [_CheckConfigNode(name, single_node_fn(name))]
315   else:
316     all_nodes = all_nodes_fn()
317     return [_CheckConfigNode(name, all_nodes.get(name, None))
318             for name in hosts]
319
320
321 class _RpcProcessor:
322   def __init__(self, resolver, port, lock_monitor_cb=None):
323     """Initializes this class.
324
325     @param resolver: callable accepting a list of hostnames, returning a list
326       of tuples containing name and IP address (IP address can be the name or
327       the special value L{_OFFLINE} to mark offline machines)
328     @type port: int
329     @param port: TCP port
330     @param lock_monitor_cb: Callable for registering with lock monitor
331
332     """
333     self._resolver = resolver
334     self._port = port
335     self._lock_monitor_cb = lock_monitor_cb
336
337   @staticmethod
338   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
339     """Prepares requests by sorting offline hosts into separate list.
340
341     @type body: dict
342     @param body: a dictionary with per-host body data
343
344     """
345     results = {}
346     requests = {}
347
348     assert isinstance(body, dict)
349     assert len(body) == len(hosts)
350     assert compat.all(isinstance(v, str) for v in body.values())
351     assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
352         "%s != %s" % (hosts, body.keys())
353
354     for (name, ip) in hosts:
355       if ip is _OFFLINE:
356         # Node is marked as offline
357         results[name] = RpcResult(node=name, offline=True, call=procedure)
358       else:
359         requests[name] = \
360           http.client.HttpClientRequest(str(ip), port,
361                                         http.HTTP_PUT, str("/%s" % procedure),
362                                         headers=_RPC_CLIENT_HEADERS,
363                                         post_data=body[name],
364                                         read_timeout=read_timeout,
365                                         nicename="%s/%s" % (name, procedure),
366                                         curl_config_fn=_ConfigRpcCurl)
367
368     return (results, requests)
369
370   @staticmethod
371   def _CombineResults(results, requests, procedure):
372     """Combines pre-computed results for offline hosts with actual call results.
373
374     """
375     for name, req in requests.items():
376       if req.success and req.resp_status_code == http.HTTP_OK:
377         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
378                                 node=name, call=procedure)
379       else:
380         # TODO: Better error reporting
381         if req.error:
382           msg = req.error
383         else:
384           msg = req.resp_body
385
386         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
387         host_result = RpcResult(data=msg, failed=True, node=name,
388                                 call=procedure)
389
390       results[name] = host_result
391
392     return results
393
394   def __call__(self, hosts, procedure, body, read_timeout=None,
395                _req_process_fn=http.client.ProcessRequests):
396     """Makes an RPC request to a number of nodes.
397
398     @type hosts: sequence
399     @param hosts: Hostnames
400     @type procedure: string
401     @param procedure: Request path
402     @type body: dictionary
403     @param body: dictionary with request bodies per host
404     @type read_timeout: int or None
405     @param read_timeout: Read timeout for request
406
407     """
408     assert read_timeout is not None, \
409       "Missing RPC read timeout for procedure '%s'" % procedure
410
411     (results, requests) = \
412       self._PrepareRequests(self._resolver(hosts), self._port, procedure,
413                             body, read_timeout)
414
415     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
416
417     assert not frozenset(results).intersection(requests)
418
419     return self._CombineResults(results, requests, procedure)
420
421
422 class _RpcClientBase:
423   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None):
424     """Initializes this class.
425
426     """
427     self._proc = _RpcProcessor(resolver,
428                                netutils.GetDaemonPort(constants.NODED),
429                                lock_monitor_cb=lock_monitor_cb)
430     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
431
432   @staticmethod
433   def _EncodeArg(encoder_fn, (argkind, value)):
434     """Encode argument.
435
436     """
437     if argkind is None:
438       return value
439     else:
440       return encoder_fn(argkind)(value)
441
442   def _Call(self, cdef, node_list, args):
443     """Entry point for automatically generated RPC wrappers.
444
445     """
446     (procedure, _, timeout, argdefs, prep_fn, postproc_fn, _) = cdef
447
448     if callable(timeout):
449       read_timeout = timeout(args)
450     else:
451       read_timeout = timeout
452
453     enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
454     if prep_fn is None:
455       # for a no-op prep_fn, we serialise the body once, and then we
456       # reuse it in the dictionary values
457       body = serializer.DumpJson(enc_args)
458       pnbody = dict((n, body) for n in node_list)
459     else:
460       # for a custom prep_fn, we pass the encoded arguments and the
461       # node name to the prep_fn, and we serialise its return value
462       assert(callable(prep_fn))
463       pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
464                     for n in node_list)
465
466     result = self._proc(node_list, procedure, pnbody,
467                         read_timeout=read_timeout)
468
469     if postproc_fn:
470       return dict(map(lambda (key, value): (key, postproc_fn(value)),
471                       result.items()))
472     else:
473       return result
474
475
476 def _ObjectToDict(value):
477   """Converts an object to a dictionary.
478
479   @note: See L{objects}.
480
481   """
482   return value.ToDict()
483
484
485 def _ObjectListToDict(value):
486   """Converts a list of L{objects} to dictionaries.
487
488   """
489   return map(_ObjectToDict, value)
490
491
492 def _EncodeNodeToDiskDict(value):
493   """Encodes a dictionary with node name as key and disk objects as values.
494
495   """
496   return dict((name, _ObjectListToDict(disks))
497               for name, disks in value.items())
498
499
500 def _PrepareFileUpload(filename):
501   """Loads a file and prepares it for an upload to nodes.
502
503   """
504   data = _Compress(utils.ReadFile(filename))
505   st = os.stat(filename)
506   getents = runtime.GetEnts()
507   return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
508           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
509
510
511 def _PrepareFinalizeExportDisks(snap_disks):
512   """Encodes disks for finalizing export.
513
514   """
515   flat_disks = []
516
517   for disk in snap_disks:
518     if isinstance(disk, bool):
519       flat_disks.append(disk)
520     else:
521       flat_disks.append(disk.ToDict())
522
523   return flat_disks
524
525
526 def _EncodeImportExportIO((ieio, ieioargs)):
527   """Encodes import/export I/O information.
528
529   """
530   if ieio == constants.IEIO_RAW_DISK:
531     assert len(ieioargs) == 1
532     return (ieio, (ieioargs[0].ToDict(), ))
533
534   if ieio == constants.IEIO_SCRIPT:
535     assert len(ieioargs) == 2
536     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
537
538   return (ieio, ieioargs)
539
540
541 def _EncodeBlockdevRename(value):
542   """Encodes information for renaming block devices.
543
544   """
545   return [(d.ToDict(), uid) for d, uid in value]
546
547
548 #: Generic encoders
549 _ENCODERS = {
550   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
551   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
552   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
553   rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload,
554   rpc_defs.ED_COMPRESS: _Compress,
555   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
556   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
557   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
558   }
559
560
561 class RpcRunner(_RpcClientBase,
562                 _generated_rpc.RpcClientDefault,
563                 _generated_rpc.RpcClientBootstrap,
564                 _generated_rpc.RpcClientConfig):
565   """RPC runner class.
566
567   """
568   def __init__(self, context):
569     """Initialized the RPC runner.
570
571     @type context: C{masterd.GanetiContext}
572     @param context: Ganeti context
573
574     """
575     self._cfg = context.cfg
576
577     encoders = _ENCODERS.copy()
578
579     # Add encoders requiring configuration object
580     encoders.update({
581       rpc_defs.ED_INST_DICT: self._InstDict,
582       rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
583       rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
584       })
585
586     # Resolver using configuration
587     resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo,
588                               self._cfg.GetAllNodesInfo)
589
590     # Pylint doesn't recognize multiple inheritance properly, see
591     # <http://www.logilab.org/ticket/36586> and
592     # <http://www.logilab.org/ticket/35642>
593     # pylint: disable=W0233
594     _RpcClientBase.__init__(self, resolver, encoders.get,
595                             lock_monitor_cb=context.glm.AddToLockMonitor)
596     _generated_rpc.RpcClientConfig.__init__(self)
597     _generated_rpc.RpcClientBootstrap.__init__(self)
598     _generated_rpc.RpcClientDefault.__init__(self)
599
600   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
601     """Convert the given instance to a dict.
602
603     This is done via the instance's ToDict() method and additionally
604     we fill the hvparams with the cluster defaults.
605
606     @type instance: L{objects.Instance}
607     @param instance: an Instance object
608     @type hvp: dict or None
609     @param hvp: a dictionary with overridden hypervisor parameters
610     @type bep: dict or None
611     @param bep: a dictionary with overridden backend parameters
612     @type osp: dict or None
613     @param osp: a dictionary with overridden os parameters
614     @rtype: dict
615     @return: the instance dict, with the hvparams filled with the
616         cluster defaults
617
618     """
619     idict = instance.ToDict()
620     cluster = self._cfg.GetClusterInfo()
621     idict["hvparams"] = cluster.FillHV(instance)
622     if hvp is not None:
623       idict["hvparams"].update(hvp)
624     idict["beparams"] = cluster.FillBE(instance)
625     if bep is not None:
626       idict["beparams"].update(bep)
627     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
628     if osp is not None:
629       idict["osparams"].update(osp)
630     for nic in idict["nics"]:
631       nic['nicparams'] = objects.FillDict(
632         cluster.nicparams[constants.PP_DEFAULT],
633         nic['nicparams'])
634     return idict
635
636   def _InstDictHvpBep(self, (instance, hvp, bep)):
637     """Wrapper for L{_InstDict}.
638
639     """
640     return self._InstDict(instance, hvp=hvp, bep=bep)
641
642   def _InstDictOsp(self, (instance, osparams)):
643     """Wrapper for L{_InstDict}.
644
645     """
646     return self._InstDict(instance, osp=osparams)
647
648
649 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
650   """RPC wrappers for job queue.
651
652   """
653   def __init__(self, context, address_list):
654     """Initializes this class.
655
656     """
657     if address_list is None:
658       resolver = _SsconfResolver
659     else:
660       # Caller provided an address list
661       resolver = _StaticResolver(address_list)
662
663     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
664                             lock_monitor_cb=context.glm.AddToLockMonitor)
665     _generated_rpc.RpcClientJobQueue.__init__(self)
666
667
668 class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
669   """RPC wrappers for bootstrapping.
670
671   """
672   def __init__(self):
673     """Initializes this class.
674
675     """
676     _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get)
677     _generated_rpc.RpcClientBootstrap.__init__(self)
678
679
680 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
681   """RPC wrappers for L{config}.
682
683   """
684   def __init__(self, context, address_list):
685     """Initializes this class.
686
687     """
688     if context:
689       lock_monitor_cb = context.glm.AddToLockMonitor
690     else:
691       lock_monitor_cb = None
692
693     if address_list is None:
694       resolver = _SsconfResolver
695     else:
696       # Caller provided an address list
697       resolver = _StaticResolver(address_list)
698
699     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
700                             lock_monitor_cb=lock_monitor_cb)
701     _generated_rpc.RpcClientConfig.__init__(self)