rpc: Use definitions directly instead of via generated code
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51
52 # Special module generated at build time
53 from ganeti import _generated_rpc
54
55 # pylint has a bug here, doesn't see this import
56 import ganeti.http.client  # pylint: disable=W0611
57
58
59 # Timeout for connecting to nodes (seconds)
60 _RPC_CONNECT_TIMEOUT = 5
61
62 _RPC_CLIENT_HEADERS = [
63   "Content-type: %s" % http.HTTP_APP_JSON,
64   "Expect:",
65   ]
66
67 # Various time constants for the timeout table
68 _TMO_URGENT = 60 # one minute
69 _TMO_FAST = 5 * 60 # five minutes
70 _TMO_NORMAL = 15 * 60 # 15 minutes
71 _TMO_SLOW = 3600 # one hour
72 _TMO_4HRS = 4 * 3600
73 _TMO_1DAY = 86400
74
75 #: Special value to describe an offline host
76 _OFFLINE = object()
77
78
79 def Init():
80   """Initializes the module-global HTTP client manager.
81
82   Must be called before using any RPC function and while exactly one thread is
83   running.
84
85   """
86   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
87   # one thread running. This check is just a safety measure -- it doesn't
88   # cover all cases.
89   assert threading.activeCount() == 1, \
90          "Found more than one active thread when initializing pycURL"
91
92   logging.info("Using PycURL %s", pycurl.version)
93
94   pycurl.global_init(pycurl.GLOBAL_ALL)
95
96
97 def Shutdown():
98   """Stops the module-global HTTP client manager.
99
100   Must be called before quitting the program and while exactly one thread is
101   running.
102
103   """
104   pycurl.global_cleanup()
105
106
107 def _ConfigRpcCurl(curl):
108   noded_cert = str(constants.NODED_CERT_FILE)
109
110   curl.setopt(pycurl.FOLLOWLOCATION, False)
111   curl.setopt(pycurl.CAINFO, noded_cert)
112   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
113   curl.setopt(pycurl.SSL_VERIFYPEER, True)
114   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
115   curl.setopt(pycurl.SSLCERT, noded_cert)
116   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
117   curl.setopt(pycurl.SSLKEY, noded_cert)
118   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
119
120
121 def RunWithRPC(fn):
122   """RPC-wrapper decorator.
123
124   When applied to a function, it runs it with the RPC system
125   initialized, and it shutsdown the system afterwards. This means the
126   function must be called without RPC being initialized.
127
128   """
129   def wrapper(*args, **kwargs):
130     Init()
131     try:
132       return fn(*args, **kwargs)
133     finally:
134       Shutdown()
135   return wrapper
136
137
138 def _Compress(data):
139   """Compresses a string for transport over RPC.
140
141   Small amounts of data are not compressed.
142
143   @type data: str
144   @param data: Data
145   @rtype: tuple
146   @return: Encoded data to send
147
148   """
149   # Small amounts of data are not compressed
150   if len(data) < 512:
151     return (constants.RPC_ENCODING_NONE, data)
152
153   # Compress with zlib and encode in base64
154   return (constants.RPC_ENCODING_ZLIB_BASE64,
155           base64.b64encode(zlib.compress(data, 3)))
156
157
158 class RpcResult(object):
159   """RPC Result class.
160
161   This class holds an RPC result. It is needed since in multi-node
162   calls we can't raise an exception just because one one out of many
163   failed, and therefore we use this class to encapsulate the result.
164
165   @ivar data: the data payload, for successful results, or None
166   @ivar call: the name of the RPC call
167   @ivar node: the name of the node to which we made the call
168   @ivar offline: whether the operation failed because the node was
169       offline, as opposed to actual failure; offline=True will always
170       imply failed=True, in order to allow simpler checking if
171       the user doesn't care about the exact failure mode
172   @ivar fail_msg: the error message if the call failed
173
174   """
175   def __init__(self, data=None, failed=False, offline=False,
176                call=None, node=None):
177     self.offline = offline
178     self.call = call
179     self.node = node
180
181     if offline:
182       self.fail_msg = "Node is marked offline"
183       self.data = self.payload = None
184     elif failed:
185       self.fail_msg = self._EnsureErr(data)
186       self.data = self.payload = None
187     else:
188       self.data = data
189       if not isinstance(self.data, (tuple, list)):
190         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
191                          type(self.data))
192         self.payload = None
193       elif len(data) != 2:
194         self.fail_msg = ("RPC layer error: invalid result length (%d), "
195                          "expected 2" % len(self.data))
196         self.payload = None
197       elif not self.data[0]:
198         self.fail_msg = self._EnsureErr(self.data[1])
199         self.payload = None
200       else:
201         # finally success
202         self.fail_msg = None
203         self.payload = data[1]
204
205     for attr_name in ["call", "data", "fail_msg",
206                       "node", "offline", "payload"]:
207       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
208
209   @staticmethod
210   def _EnsureErr(val):
211     """Helper to ensure we return a 'True' value for error."""
212     if val:
213       return val
214     else:
215       return "No error information"
216
217   def Raise(self, msg, prereq=False, ecode=None):
218     """If the result has failed, raise an OpExecError.
219
220     This is used so that LU code doesn't have to check for each
221     result, but instead can call this function.
222
223     """
224     if not self.fail_msg:
225       return
226
227     if not msg: # one could pass None for default message
228       msg = ("Call '%s' to node '%s' has failed: %s" %
229              (self.call, self.node, self.fail_msg))
230     else:
231       msg = "%s: %s" % (msg, self.fail_msg)
232     if prereq:
233       ec = errors.OpPrereqError
234     else:
235       ec = errors.OpExecError
236     if ecode is not None:
237       args = (msg, ecode)
238     else:
239       args = (msg, )
240     raise ec(*args) # pylint: disable=W0142
241
242
243 def _SsconfResolver(node_list,
244                     ssc=ssconf.SimpleStore,
245                     nslookup_fn=netutils.Hostname.GetIP):
246   """Return addresses for given node names.
247
248   @type node_list: list
249   @param node_list: List of node names
250   @type ssc: class
251   @param ssc: SimpleStore class that is used to obtain node->ip mappings
252   @type nslookup_fn: callable
253   @param nslookup_fn: function use to do NS lookup
254   @rtype: list of tuple; (string, string)
255   @return: List of tuples containing node name and IP address
256
257   """
258   ss = ssc()
259   iplist = ss.GetNodePrimaryIPList()
260   family = ss.GetPrimaryIPFamily()
261   ipmap = dict(entry.split() for entry in iplist)
262
263   result = []
264   for node in node_list:
265     ip = ipmap.get(node)
266     if ip is None:
267       ip = nslookup_fn(node, family=family)
268     result.append((node, ip))
269
270   return result
271
272
273 class _StaticResolver:
274   def __init__(self, addresses):
275     """Initializes this class.
276
277     """
278     self._addresses = addresses
279
280   def __call__(self, hosts):
281     """Returns static addresses for hosts.
282
283     """
284     assert len(hosts) == len(self._addresses)
285     return zip(hosts, self._addresses)
286
287
288 def _CheckConfigNode(name, node):
289   """Checks if a node is online.
290
291   @type name: string
292   @param name: Node name
293   @type node: L{objects.Node} or None
294   @param node: Node object
295
296   """
297   if node is None:
298     # Depend on DNS for name resolution
299     ip = name
300   elif node.offline:
301     ip = _OFFLINE
302   else:
303     ip = node.primary_ip
304   return (name, ip)
305
306
307 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
308   """Calculate node addresses using configuration.
309
310   """
311   # Special case for single-host lookups
312   if len(hosts) == 1:
313     (name, ) = hosts
314     return [_CheckConfigNode(name, single_node_fn(name))]
315   else:
316     all_nodes = all_nodes_fn()
317     return [_CheckConfigNode(name, all_nodes.get(name, None))
318             for name in hosts]
319
320
321 class _RpcProcessor:
322   def __init__(self, resolver, port, lock_monitor_cb=None):
323     """Initializes this class.
324
325     @param resolver: callable accepting a list of hostnames, returning a list
326       of tuples containing name and IP address (IP address can be the name or
327       the special value L{_OFFLINE} to mark offline machines)
328     @type port: int
329     @param port: TCP port
330     @param lock_monitor_cb: Callable for registering with lock monitor
331
332     """
333     self._resolver = resolver
334     self._port = port
335     self._lock_monitor_cb = lock_monitor_cb
336
337   @staticmethod
338   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
339     """Prepares requests by sorting offline hosts into separate list.
340
341     """
342     results = {}
343     requests = {}
344
345     for (name, ip) in hosts:
346       if ip is _OFFLINE:
347         # Node is marked as offline
348         results[name] = RpcResult(node=name, offline=True, call=procedure)
349       else:
350         requests[name] = \
351           http.client.HttpClientRequest(str(ip), port,
352                                         http.HTTP_PUT, str("/%s" % procedure),
353                                         headers=_RPC_CLIENT_HEADERS,
354                                         post_data=body,
355                                         read_timeout=read_timeout,
356                                         nicename="%s/%s" % (name, procedure),
357                                         curl_config_fn=_ConfigRpcCurl)
358
359     return (results, requests)
360
361   @staticmethod
362   def _CombineResults(results, requests, procedure):
363     """Combines pre-computed results for offline hosts with actual call results.
364
365     """
366     for name, req in requests.items():
367       if req.success and req.resp_status_code == http.HTTP_OK:
368         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
369                                 node=name, call=procedure)
370       else:
371         # TODO: Better error reporting
372         if req.error:
373           msg = req.error
374         else:
375           msg = req.resp_body
376
377         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
378         host_result = RpcResult(data=msg, failed=True, node=name,
379                                 call=procedure)
380
381       results[name] = host_result
382
383     return results
384
385   def __call__(self, hosts, procedure, body, read_timeout=None,
386                _req_process_fn=http.client.ProcessRequests):
387     """Makes an RPC request to a number of nodes.
388
389     @type hosts: sequence
390     @param hosts: Hostnames
391     @type procedure: string
392     @param procedure: Request path
393     @type body: string
394     @param body: Request body
395     @type read_timeout: int or None
396     @param read_timeout: Read timeout for request
397
398     """
399     assert read_timeout is not None, \
400       "Missing RPC read timeout for procedure '%s'" % procedure
401
402     (results, requests) = \
403       self._PrepareRequests(self._resolver(hosts), self._port, procedure,
404                             str(body), read_timeout)
405
406     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
407
408     assert not frozenset(results).intersection(requests)
409
410     return self._CombineResults(results, requests, procedure)
411
412
413 class _RpcClientBase:
414   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None):
415     """Initializes this class.
416
417     """
418     self._proc = _RpcProcessor(resolver,
419                                netutils.GetDaemonPort(constants.NODED),
420                                lock_monitor_cb=lock_monitor_cb)
421     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
422
423   @staticmethod
424   def _EncodeArg(encoder_fn, (argkind, value)):
425     """Encode argument.
426
427     """
428     if argkind is None:
429       return value
430     else:
431       return encoder_fn(argkind)(value)
432
433   def _Call(self, cdef, node_list, timeout, args):
434     """Entry point for automatically generated RPC wrappers.
435
436     """
437     (procedure, _, _, argdefs, _, _) = cdef
438
439     body = serializer.DumpJson(map(self._encoder,
440                                    zip(map(compat.snd, argdefs), args)),
441                                indent=False)
442
443     return self._proc(node_list, procedure, body, read_timeout=timeout)
444
445
446 def _ObjectToDict(value):
447   """Converts an object to a dictionary.
448
449   @note: See L{objects}.
450
451   """
452   return value.ToDict()
453
454
455 def _ObjectListToDict(value):
456   """Converts a list of L{objects} to dictionaries.
457
458   """
459   return map(_ObjectToDict, value)
460
461
462 def _EncodeNodeToDiskDict(value):
463   """Encodes a dictionary with node name as key and disk objects as values.
464
465   """
466   return dict((name, _ObjectListToDict(disks))
467               for name, disks in value.items())
468
469
470 def _PrepareFileUpload(filename):
471   """Loads a file and prepares it for an upload to nodes.
472
473   """
474   data = _Compress(utils.ReadFile(filename))
475   st = os.stat(filename)
476   getents = runtime.GetEnts()
477   return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
478           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
479
480
481 def _PrepareFinalizeExportDisks(snap_disks):
482   """Encodes disks for finalizing export.
483
484   """
485   flat_disks = []
486
487   for disk in snap_disks:
488     if isinstance(disk, bool):
489       flat_disks.append(disk)
490     else:
491       flat_disks.append(disk.ToDict())
492
493   return flat_disks
494
495
496 def _EncodeImportExportIO((ieio, ieioargs)):
497   """Encodes import/export I/O information.
498
499   """
500   if ieio == constants.IEIO_RAW_DISK:
501     assert len(ieioargs) == 1
502     return (ieio, (ieioargs[0].ToDict(), ))
503
504   if ieio == constants.IEIO_SCRIPT:
505     assert len(ieioargs) == 2
506     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
507
508   return (ieio, ieioargs)
509
510
511 def _EncodeBlockdevRename(value):
512   """Encodes information for renaming block devices.
513
514   """
515   return [(d.ToDict(), uid) for d, uid in value]
516
517
518 #: Generic encoders
519 _ENCODERS = {
520   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
521   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
522   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
523   rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload,
524   rpc_defs.ED_COMPRESS: _Compress,
525   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
526   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
527   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
528   }
529
530
531 class RpcRunner(_RpcClientBase,
532                 _generated_rpc.RpcClientDefault,
533                 _generated_rpc.RpcClientBootstrap,
534                 _generated_rpc.RpcClientConfig):
535   """RPC runner class.
536
537   """
538   def __init__(self, context):
539     """Initialized the RPC runner.
540
541     @type context: C{masterd.GanetiContext}
542     @param context: Ganeti context
543
544     """
545     self._cfg = context.cfg
546
547     encoders = _ENCODERS.copy()
548
549     # Add encoders requiring configuration object
550     encoders.update({
551       rpc_defs.ED_INST_DICT: self._InstDict,
552       rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
553       rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
554       })
555
556     # Resolver using configuration
557     resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo,
558                               self._cfg.GetAllNodesInfo)
559
560     # Pylint doesn't recognize multiple inheritance properly, see
561     # <http://www.logilab.org/ticket/36586> and
562     # <http://www.logilab.org/ticket/35642>
563     # pylint: disable=W0233
564     _RpcClientBase.__init__(self, resolver, encoders.get,
565                             lock_monitor_cb=context.glm.AddToLockMonitor)
566     _generated_rpc.RpcClientConfig.__init__(self)
567     _generated_rpc.RpcClientBootstrap.__init__(self)
568     _generated_rpc.RpcClientDefault.__init__(self)
569
570   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
571     """Convert the given instance to a dict.
572
573     This is done via the instance's ToDict() method and additionally
574     we fill the hvparams with the cluster defaults.
575
576     @type instance: L{objects.Instance}
577     @param instance: an Instance object
578     @type hvp: dict or None
579     @param hvp: a dictionary with overridden hypervisor parameters
580     @type bep: dict or None
581     @param bep: a dictionary with overridden backend parameters
582     @type osp: dict or None
583     @param osp: a dictionary with overridden os parameters
584     @rtype: dict
585     @return: the instance dict, with the hvparams filled with the
586         cluster defaults
587
588     """
589     idict = instance.ToDict()
590     cluster = self._cfg.GetClusterInfo()
591     idict["hvparams"] = cluster.FillHV(instance)
592     if hvp is not None:
593       idict["hvparams"].update(hvp)
594     idict["beparams"] = cluster.FillBE(instance)
595     if bep is not None:
596       idict["beparams"].update(bep)
597     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
598     if osp is not None:
599       idict["osparams"].update(osp)
600     for nic in idict["nics"]:
601       nic['nicparams'] = objects.FillDict(
602         cluster.nicparams[constants.PP_DEFAULT],
603         nic['nicparams'])
604     return idict
605
606   def _InstDictHvpBep(self, (instance, hvp, bep)):
607     """Wrapper for L{_InstDict}.
608
609     """
610     return self._InstDict(instance, hvp=hvp, bep=bep)
611
612   def _InstDictOsp(self, (instance, osparams)):
613     """Wrapper for L{_InstDict}.
614
615     """
616     return self._InstDict(instance, osp=osparams)
617
618   @staticmethod
619   def _MigrationStatusPostProc(result):
620     if not result.fail_msg and result.payload is not None:
621       result.payload = objects.MigrationStatus.FromDict(result.payload)
622     return result
623
624   @staticmethod
625   def _BlockdevFindPostProc(result):
626     if not result.fail_msg and result.payload is not None:
627       result.payload = objects.BlockDevStatus.FromDict(result.payload)
628     return result
629
630   @staticmethod
631   def _BlockdevGetMirrorStatusPostProc(result):
632     if not result.fail_msg:
633       result.payload = [objects.BlockDevStatus.FromDict(i)
634                         for i in result.payload]
635     return result
636
637   @staticmethod
638   def _BlockdevGetMirrorStatusMultiPostProc(result):
639     for nres in result.values():
640       if nres.fail_msg:
641         continue
642
643       for idx, (success, status) in enumerate(nres.payload):
644         if success:
645           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
646
647     return result
648
649   @staticmethod
650   def _OsGetPostProc(result):
651     if not result.fail_msg and isinstance(result.payload, dict):
652       result.payload = objects.OS.FromDict(result.payload)
653     return result
654
655   @staticmethod
656   def _ImpExpStatusPostProc(result):
657     """Post-processor for import/export status.
658
659     @rtype: Payload containing list of L{objects.ImportExportStatus} instances
660     @return: Returns a list of the state of each named import/export or None if
661              a status couldn't be retrieved
662
663     """
664     if not result.fail_msg:
665       decoded = []
666
667       for i in result.payload:
668         if i is None:
669           decoded.append(None)
670           continue
671         decoded.append(objects.ImportExportStatus.FromDict(i))
672
673       result.payload = decoded
674
675     return result
676
677   #
678   # Begin RPC calls
679   #
680
681   def call_test_delay(self, node_list, duration): # pylint: disable=W0221
682     """Sleep for a fixed time on given node(s).
683
684     This is a multi-node call.
685
686     """
687     # TODO: Use callable timeout calculation
688     return _generated_rpc.RpcClientDefault.call_test_delay(self,
689       node_list, duration, read_timeout=int(duration + 5))
690
691
692 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
693   """RPC wrappers for job queue.
694
695   """
696   def __init__(self, context, address_list):
697     """Initializes this class.
698
699     """
700     if address_list is None:
701       resolver = _SsconfResolver
702     else:
703       # Caller provided an address list
704       resolver = _StaticResolver(address_list)
705
706     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
707                             lock_monitor_cb=context.glm.AddToLockMonitor)
708     _generated_rpc.RpcClientJobQueue.__init__(self)
709
710
711 class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
712   """RPC wrappers for bootstrapping.
713
714   """
715   def __init__(self):
716     """Initializes this class.
717
718     """
719     _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get)
720     _generated_rpc.RpcClientBootstrap.__init__(self)
721
722
723 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
724   """RPC wrappers for L{config}.
725
726   """
727   def __init__(self, context, address_list):
728     """Initializes this class.
729
730     """
731     if context:
732       lock_monitor_cb = context.glm.AddToLockMonitor
733     else:
734       lock_monitor_cb = None
735
736     if address_list is None:
737       resolver = _SsconfResolver
738     else:
739       # Caller provided an address list
740       resolver = _StaticResolver(address_list)
741
742     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
743                             lock_monitor_cb=lock_monitor_cb)
744     _generated_rpc.RpcClientConfig.__init__(self)