fcead58f94baa8ebe8775916ca60530eeebefb56
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50
51 # Special module generated at build time
52 from ganeti import _generated_rpc
53
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client  # pylint: disable=W0611
56
57
58 # Timeout for connecting to nodes (seconds)
59 _RPC_CONNECT_TIMEOUT = 5
60
61 _RPC_CLIENT_HEADERS = [
62   "Content-type: %s" % http.HTTP_APP_JSON,
63   "Expect:",
64   ]
65
66 # Various time constants for the timeout table
67 _TMO_URGENT = 60 # one minute
68 _TMO_FAST = 5 * 60 # five minutes
69 _TMO_NORMAL = 15 * 60 # 15 minutes
70 _TMO_SLOW = 3600 # one hour
71 _TMO_4HRS = 4 * 3600
72 _TMO_1DAY = 86400
73
74 #: Special value to describe an offline host
75 _OFFLINE = object()
76
77
78 def Init():
79   """Initializes the module-global HTTP client manager.
80
81   Must be called before using any RPC function and while exactly one thread is
82   running.
83
84   """
85   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86   # one thread running. This check is just a safety measure -- it doesn't
87   # cover all cases.
88   assert threading.activeCount() == 1, \
89          "Found more than one active thread when initializing pycURL"
90
91   logging.info("Using PycURL %s", pycurl.version)
92
93   pycurl.global_init(pycurl.GLOBAL_ALL)
94
95
96 def Shutdown():
97   """Stops the module-global HTTP client manager.
98
99   Must be called before quitting the program and while exactly one thread is
100   running.
101
102   """
103   pycurl.global_cleanup()
104
105
106 def _ConfigRpcCurl(curl):
107   noded_cert = str(constants.NODED_CERT_FILE)
108
109   curl.setopt(pycurl.FOLLOWLOCATION, False)
110   curl.setopt(pycurl.CAINFO, noded_cert)
111   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112   curl.setopt(pycurl.SSL_VERIFYPEER, True)
113   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114   curl.setopt(pycurl.SSLCERT, noded_cert)
115   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116   curl.setopt(pycurl.SSLKEY, noded_cert)
117   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118
119
120 def RunWithRPC(fn):
121   """RPC-wrapper decorator.
122
123   When applied to a function, it runs it with the RPC system
124   initialized, and it shutsdown the system afterwards. This means the
125   function must be called without RPC being initialized.
126
127   """
128   def wrapper(*args, **kwargs):
129     Init()
130     try:
131       return fn(*args, **kwargs)
132     finally:
133       Shutdown()
134   return wrapper
135
136
137 def _Compress(data):
138   """Compresses a string for transport over RPC.
139
140   Small amounts of data are not compressed.
141
142   @type data: str
143   @param data: Data
144   @rtype: tuple
145   @return: Encoded data to send
146
147   """
148   # Small amounts of data are not compressed
149   if len(data) < 512:
150     return (constants.RPC_ENCODING_NONE, data)
151
152   # Compress with zlib and encode in base64
153   return (constants.RPC_ENCODING_ZLIB_BASE64,
154           base64.b64encode(zlib.compress(data, 3)))
155
156
157 class RpcResult(object):
158   """RPC Result class.
159
160   This class holds an RPC result. It is needed since in multi-node
161   calls we can't raise an exception just because one one out of many
162   failed, and therefore we use this class to encapsulate the result.
163
164   @ivar data: the data payload, for successful results, or None
165   @ivar call: the name of the RPC call
166   @ivar node: the name of the node to which we made the call
167   @ivar offline: whether the operation failed because the node was
168       offline, as opposed to actual failure; offline=True will always
169       imply failed=True, in order to allow simpler checking if
170       the user doesn't care about the exact failure mode
171   @ivar fail_msg: the error message if the call failed
172
173   """
174   def __init__(self, data=None, failed=False, offline=False,
175                call=None, node=None):
176     self.offline = offline
177     self.call = call
178     self.node = node
179
180     if offline:
181       self.fail_msg = "Node is marked offline"
182       self.data = self.payload = None
183     elif failed:
184       self.fail_msg = self._EnsureErr(data)
185       self.data = self.payload = None
186     else:
187       self.data = data
188       if not isinstance(self.data, (tuple, list)):
189         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
190                          type(self.data))
191         self.payload = None
192       elif len(data) != 2:
193         self.fail_msg = ("RPC layer error: invalid result length (%d), "
194                          "expected 2" % len(self.data))
195         self.payload = None
196       elif not self.data[0]:
197         self.fail_msg = self._EnsureErr(self.data[1])
198         self.payload = None
199       else:
200         # finally success
201         self.fail_msg = None
202         self.payload = data[1]
203
204     for attr_name in ["call", "data", "fail_msg",
205                       "node", "offline", "payload"]:
206       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
207
208   @staticmethod
209   def _EnsureErr(val):
210     """Helper to ensure we return a 'True' value for error."""
211     if val:
212       return val
213     else:
214       return "No error information"
215
216   def Raise(self, msg, prereq=False, ecode=None):
217     """If the result has failed, raise an OpExecError.
218
219     This is used so that LU code doesn't have to check for each
220     result, but instead can call this function.
221
222     """
223     if not self.fail_msg:
224       return
225
226     if not msg: # one could pass None for default message
227       msg = ("Call '%s' to node '%s' has failed: %s" %
228              (self.call, self.node, self.fail_msg))
229     else:
230       msg = "%s: %s" % (msg, self.fail_msg)
231     if prereq:
232       ec = errors.OpPrereqError
233     else:
234       ec = errors.OpExecError
235     if ecode is not None:
236       args = (msg, ecode)
237     else:
238       args = (msg, )
239     raise ec(*args) # pylint: disable=W0142
240
241
242 def _SsconfResolver(node_list,
243                     ssc=ssconf.SimpleStore,
244                     nslookup_fn=netutils.Hostname.GetIP):
245   """Return addresses for given node names.
246
247   @type node_list: list
248   @param node_list: List of node names
249   @type ssc: class
250   @param ssc: SimpleStore class that is used to obtain node->ip mappings
251   @type nslookup_fn: callable
252   @param nslookup_fn: function use to do NS lookup
253   @rtype: list of tuple; (string, string)
254   @return: List of tuples containing node name and IP address
255
256   """
257   ss = ssc()
258   iplist = ss.GetNodePrimaryIPList()
259   family = ss.GetPrimaryIPFamily()
260   ipmap = dict(entry.split() for entry in iplist)
261
262   result = []
263   for node in node_list:
264     ip = ipmap.get(node)
265     if ip is None:
266       ip = nslookup_fn(node, family=family)
267     result.append((node, ip))
268
269   return result
270
271
272 class _StaticResolver:
273   def __init__(self, addresses):
274     """Initializes this class.
275
276     """
277     self._addresses = addresses
278
279   def __call__(self, hosts):
280     """Returns static addresses for hosts.
281
282     """
283     assert len(hosts) == len(self._addresses)
284     return zip(hosts, self._addresses)
285
286
287 def _CheckConfigNode(name, node):
288   """Checks if a node is online.
289
290   @type name: string
291   @param name: Node name
292   @type node: L{objects.Node} or None
293   @param node: Node object
294
295   """
296   if node is None:
297     # Depend on DNS for name resolution
298     ip = name
299   elif node.offline:
300     ip = _OFFLINE
301   else:
302     ip = node.primary_ip
303   return (name, ip)
304
305
306 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
307   """Calculate node addresses using configuration.
308
309   """
310   # Special case for single-host lookups
311   if len(hosts) == 1:
312     (name, ) = hosts
313     return [_CheckConfigNode(name, single_node_fn(name))]
314   else:
315     all_nodes = all_nodes_fn()
316     return [_CheckConfigNode(name, all_nodes.get(name, None))
317             for name in hosts]
318
319
320 class _RpcProcessor:
321   def __init__(self, resolver, port, lock_monitor_cb=None):
322     """Initializes this class.
323
324     @param resolver: callable accepting a list of hostnames, returning a list
325       of tuples containing name and IP address (IP address can be the name or
326       the special value L{_OFFLINE} to mark offline machines)
327     @type port: int
328     @param port: TCP port
329     @param lock_monitor_cb: Callable for registering with lock monitor
330
331     """
332     self._resolver = resolver
333     self._port = port
334     self._lock_monitor_cb = lock_monitor_cb
335
336   @staticmethod
337   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
338     """Prepares requests by sorting offline hosts into separate list.
339
340     """
341     results = {}
342     requests = {}
343
344     for (name, ip) in hosts:
345       if ip is _OFFLINE:
346         # Node is marked as offline
347         results[name] = RpcResult(node=name, offline=True, call=procedure)
348       else:
349         requests[name] = \
350           http.client.HttpClientRequest(str(ip), port,
351                                         http.HTTP_PUT, str("/%s" % procedure),
352                                         headers=_RPC_CLIENT_HEADERS,
353                                         post_data=body,
354                                         read_timeout=read_timeout,
355                                         nicename="%s/%s" % (name, procedure),
356                                         curl_config_fn=_ConfigRpcCurl)
357
358     return (results, requests)
359
360   @staticmethod
361   def _CombineResults(results, requests, procedure):
362     """Combines pre-computed results for offline hosts with actual call results.
363
364     """
365     for name, req in requests.items():
366       if req.success and req.resp_status_code == http.HTTP_OK:
367         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
368                                 node=name, call=procedure)
369       else:
370         # TODO: Better error reporting
371         if req.error:
372           msg = req.error
373         else:
374           msg = req.resp_body
375
376         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
377         host_result = RpcResult(data=msg, failed=True, node=name,
378                                 call=procedure)
379
380       results[name] = host_result
381
382     return results
383
384   def __call__(self, hosts, procedure, body, read_timeout=None,
385                _req_process_fn=http.client.ProcessRequests):
386     """Makes an RPC request to a number of nodes.
387
388     @type hosts: sequence
389     @param hosts: Hostnames
390     @type procedure: string
391     @param procedure: Request path
392     @type body: string
393     @param body: Request body
394     @type read_timeout: int or None
395     @param read_timeout: Read timeout for request
396
397     """
398     assert read_timeout is not None, \
399       "Missing RPC read timeout for procedure '%s'" % procedure
400
401     (results, requests) = \
402       self._PrepareRequests(self._resolver(hosts), self._port, procedure,
403                             str(body), read_timeout)
404
405     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
406
407     assert not frozenset(results).intersection(requests)
408
409     return self._CombineResults(results, requests, procedure)
410
411
412 class RpcRunner(_generated_rpc.RpcClientDefault,
413                 _generated_rpc.RpcClientBootstrap,
414                 _generated_rpc.RpcClientConfig):
415   """RPC runner class.
416
417   """
418   def __init__(self, context):
419     """Initialized the RPC runner.
420
421     @type context: C{masterd.GanetiContext}
422     @param context: Ganeti context
423
424     """
425     # Pylint doesn't recognize multiple inheritance properly, see
426     # <http://www.logilab.org/ticket/36586> and
427     # <http://www.logilab.org/ticket/35642>
428     # pylint: disable=W0233
429     _generated_rpc.RpcClientConfig.__init__(self)
430     _generated_rpc.RpcClientBootstrap.__init__(self)
431     _generated_rpc.RpcClientDefault.__init__(self)
432
433     self._cfg = context.cfg
434     self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
435                                               self._cfg.GetNodeInfo,
436                                               self._cfg.GetAllNodesInfo),
437                                netutils.GetDaemonPort(constants.NODED),
438                                lock_monitor_cb=context.glm.AddToLockMonitor)
439
440   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
441     """Convert the given instance to a dict.
442
443     This is done via the instance's ToDict() method and additionally
444     we fill the hvparams with the cluster defaults.
445
446     @type instance: L{objects.Instance}
447     @param instance: an Instance object
448     @type hvp: dict or None
449     @param hvp: a dictionary with overridden hypervisor parameters
450     @type bep: dict or None
451     @param bep: a dictionary with overridden backend parameters
452     @type osp: dict or None
453     @param osp: a dictionary with overridden os parameters
454     @rtype: dict
455     @return: the instance dict, with the hvparams filled with the
456         cluster defaults
457
458     """
459     idict = instance.ToDict()
460     cluster = self._cfg.GetClusterInfo()
461     idict["hvparams"] = cluster.FillHV(instance)
462     if hvp is not None:
463       idict["hvparams"].update(hvp)
464     idict["beparams"] = cluster.FillBE(instance)
465     if bep is not None:
466       idict["beparams"].update(bep)
467     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
468     if osp is not None:
469       idict["osparams"].update(osp)
470     for nic in idict["nics"]:
471       nic['nicparams'] = objects.FillDict(
472         cluster.nicparams[constants.PP_DEFAULT],
473         nic['nicparams'])
474     return idict
475
476   def _InstDictHvpBep(self, (instance, hvp, bep)):
477     """Wrapper for L{_InstDict}.
478
479     """
480     return self._InstDict(instance, hvp=hvp, bep=bep)
481
482   def _InstDictOsp(self, (instance, osparams)):
483     """Wrapper for L{_InstDict}.
484
485     """
486     return self._InstDict(instance, osp=osparams)
487
488   def _Call(self, node_list, procedure, timeout, args):
489     """Entry point for automatically generated RPC wrappers.
490
491     """
492     body = serializer.DumpJson(args, indent=False)
493
494     return self._proc(node_list, procedure, body, read_timeout=timeout)
495
496   @staticmethod
497   def _MigrationStatusPostProc(result):
498     if not result.fail_msg and result.payload is not None:
499       result.payload = objects.MigrationStatus.FromDict(result.payload)
500     return result
501
502   @staticmethod
503   def _BlockdevFindPostProc(result):
504     if not result.fail_msg and result.payload is not None:
505       result.payload = objects.BlockDevStatus.FromDict(result.payload)
506     return result
507
508   @staticmethod
509   def _BlockdevGetMirrorStatusPostProc(result):
510     if not result.fail_msg:
511       result.payload = [objects.BlockDevStatus.FromDict(i)
512                         for i in result.payload]
513     return result
514
515   @staticmethod
516   def _BlockdevGetMirrorStatusMultiPostProc(result):
517     for nres in result.values():
518       if nres.fail_msg:
519         continue
520
521       for idx, (success, status) in enumerate(nres.payload):
522         if success:
523           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
524
525     return result
526
527   @staticmethod
528   def _OsGetPostProc(result):
529     if not result.fail_msg and isinstance(result.payload, dict):
530       result.payload = objects.OS.FromDict(result.payload)
531     return result
532
533   @staticmethod
534   def _PrepareFinalizeExportDisks(snap_disks):
535     flat_disks = []
536
537     for disk in snap_disks:
538       if isinstance(disk, bool):
539         flat_disks.append(disk)
540       else:
541         flat_disks.append(disk.ToDict())
542
543     return flat_disks
544
545   @staticmethod
546   def _ImpExpStatusPostProc(result):
547     """Post-processor for import/export status.
548
549     @rtype: Payload containing list of L{objects.ImportExportStatus} instances
550     @return: Returns a list of the state of each named import/export or None if
551              a status couldn't be retrieved
552
553     """
554     if not result.fail_msg:
555       decoded = []
556
557       for i in result.payload:
558         if i is None:
559           decoded.append(None)
560           continue
561         decoded.append(objects.ImportExportStatus.FromDict(i))
562
563       result.payload = decoded
564
565     return result
566
567   @staticmethod
568   def _EncodeImportExportIO(ieio, ieioargs):
569     """Encodes import/export I/O information.
570
571     """
572     if ieio == constants.IEIO_RAW_DISK:
573       assert len(ieioargs) == 1
574       return (ieioargs[0].ToDict(), )
575
576     if ieio == constants.IEIO_SCRIPT:
577       assert len(ieioargs) == 2
578       return (ieioargs[0].ToDict(), ieioargs[1])
579
580     return ieioargs
581
582   @staticmethod
583   def _PrepareFileUpload(filename):
584     """Loads a file and prepares it for an upload to nodes.
585
586     """
587     data = _Compress(utils.ReadFile(filename))
588     st = os.stat(filename)
589     getents = runtime.GetEnts()
590     return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
591             getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
592
593   #
594   # Begin RPC calls
595   #
596
597   def call_test_delay(self, node_list, duration, read_timeout=None):
598     """Sleep for a fixed time on given node(s).
599
600     This is a multi-node call.
601
602     """
603     assert read_timeout is None
604     return self.call_test_delay(node_list, duration,
605                                 read_timeout=int(duration + 5))
606
607
608 class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
609   """RPC wrappers for job queue.
610
611   """
612   _Compress = staticmethod(_Compress)
613
614   def __init__(self, context, address_list):
615     """Initializes this class.
616
617     """
618     _generated_rpc.RpcClientJobQueue.__init__(self)
619
620     if address_list is None:
621       resolver = _SsconfResolver
622     else:
623       # Caller provided an address list
624       resolver = _StaticResolver(address_list)
625
626     self._proc = _RpcProcessor(resolver,
627                                netutils.GetDaemonPort(constants.NODED),
628                                lock_monitor_cb=context.glm.AddToLockMonitor)
629
630   def _Call(self, node_list, procedure, timeout, args):
631     """Entry point for automatically generated RPC wrappers.
632
633     """
634     body = serializer.DumpJson(args, indent=False)
635
636     return self._proc(node_list, procedure, body, read_timeout=timeout)
637
638
639 class BootstrapRunner(_generated_rpc.RpcClientBootstrap):
640   """RPC wrappers for bootstrapping.
641
642   """
643   def __init__(self):
644     """Initializes this class.
645
646     """
647     _generated_rpc.RpcClientBootstrap.__init__(self)
648
649     self._proc = _RpcProcessor(_SsconfResolver,
650                                netutils.GetDaemonPort(constants.NODED))
651
652   def _Call(self, node_list, procedure, timeout, args):
653     """Entry point for automatically generated RPC wrappers.
654
655     """
656     body = serializer.DumpJson(args, indent=False)
657
658     return self._proc(node_list, procedure, body, read_timeout=timeout)
659
660
661 class ConfigRunner(_generated_rpc.RpcClientConfig):
662   """RPC wrappers for L{config}.
663
664   """
665   _PrepareFileUpload = \
666     staticmethod(RpcRunner._PrepareFileUpload) # pylint: disable=W0212
667
668   def __init__(self, address_list):
669     """Initializes this class.
670
671     """
672     _generated_rpc.RpcClientConfig.__init__(self)
673
674     if address_list is None:
675       resolver = _SsconfResolver
676     else:
677       # Caller provided an address list
678       resolver = _StaticResolver(address_list)
679
680     self._proc = _RpcProcessor(resolver,
681                                netutils.GetDaemonPort(constants.NODED))
682
683   def _Call(self, node_list, procedure, timeout, args):
684     """Entry point for automatically generated RPC wrappers.
685
686     """
687     body = serializer.DumpJson(args, indent=False)
688
689     return self._proc(node_list, procedure, body, read_timeout=timeout)