Add QA test for “gnt-debug delay”
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import logging
35 import zlib
36 import base64
37 import pycurl
38 import threading
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51
52 # Special module generated at build time
53 from ganeti import _generated_rpc
54
55 # pylint has a bug here, doesn't see this import
56 import ganeti.http.client  # pylint: disable=W0611
57
58
59 # Timeout for connecting to nodes (seconds)
60 _RPC_CONNECT_TIMEOUT = 5
61
62 _RPC_CLIENT_HEADERS = [
63   "Content-type: %s" % http.HTTP_APP_JSON,
64   "Expect:",
65   ]
66
67 # Various time constants for the timeout table
68 _TMO_URGENT = 60 # one minute
69 _TMO_FAST = 5 * 60 # five minutes
70 _TMO_NORMAL = 15 * 60 # 15 minutes
71 _TMO_SLOW = 3600 # one hour
72 _TMO_4HRS = 4 * 3600
73 _TMO_1DAY = 86400
74
75 #: Special value to describe an offline host
76 _OFFLINE = object()
77
78
79 def Init():
80   """Initializes the module-global HTTP client manager.
81
82   Must be called before using any RPC function and while exactly one thread is
83   running.
84
85   """
86   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
87   # one thread running. This check is just a safety measure -- it doesn't
88   # cover all cases.
89   assert threading.activeCount() == 1, \
90          "Found more than one active thread when initializing pycURL"
91
92   logging.info("Using PycURL %s", pycurl.version)
93
94   pycurl.global_init(pycurl.GLOBAL_ALL)
95
96
97 def Shutdown():
98   """Stops the module-global HTTP client manager.
99
100   Must be called before quitting the program and while exactly one thread is
101   running.
102
103   """
104   pycurl.global_cleanup()
105
106
107 def _ConfigRpcCurl(curl):
108   noded_cert = str(constants.NODED_CERT_FILE)
109
110   curl.setopt(pycurl.FOLLOWLOCATION, False)
111   curl.setopt(pycurl.CAINFO, noded_cert)
112   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
113   curl.setopt(pycurl.SSL_VERIFYPEER, True)
114   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
115   curl.setopt(pycurl.SSLCERT, noded_cert)
116   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
117   curl.setopt(pycurl.SSLKEY, noded_cert)
118   curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
119
120
121 def RunWithRPC(fn):
122   """RPC-wrapper decorator.
123
124   When applied to a function, it runs it with the RPC system
125   initialized, and it shutsdown the system afterwards. This means the
126   function must be called without RPC being initialized.
127
128   """
129   def wrapper(*args, **kwargs):
130     Init()
131     try:
132       return fn(*args, **kwargs)
133     finally:
134       Shutdown()
135   return wrapper
136
137
138 def _Compress(data):
139   """Compresses a string for transport over RPC.
140
141   Small amounts of data are not compressed.
142
143   @type data: str
144   @param data: Data
145   @rtype: tuple
146   @return: Encoded data to send
147
148   """
149   # Small amounts of data are not compressed
150   if len(data) < 512:
151     return (constants.RPC_ENCODING_NONE, data)
152
153   # Compress with zlib and encode in base64
154   return (constants.RPC_ENCODING_ZLIB_BASE64,
155           base64.b64encode(zlib.compress(data, 3)))
156
157
158 class RpcResult(object):
159   """RPC Result class.
160
161   This class holds an RPC result. It is needed since in multi-node
162   calls we can't raise an exception just because one one out of many
163   failed, and therefore we use this class to encapsulate the result.
164
165   @ivar data: the data payload, for successful results, or None
166   @ivar call: the name of the RPC call
167   @ivar node: the name of the node to which we made the call
168   @ivar offline: whether the operation failed because the node was
169       offline, as opposed to actual failure; offline=True will always
170       imply failed=True, in order to allow simpler checking if
171       the user doesn't care about the exact failure mode
172   @ivar fail_msg: the error message if the call failed
173
174   """
175   def __init__(self, data=None, failed=False, offline=False,
176                call=None, node=None):
177     self.offline = offline
178     self.call = call
179     self.node = node
180
181     if offline:
182       self.fail_msg = "Node is marked offline"
183       self.data = self.payload = None
184     elif failed:
185       self.fail_msg = self._EnsureErr(data)
186       self.data = self.payload = None
187     else:
188       self.data = data
189       if not isinstance(self.data, (tuple, list)):
190         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
191                          type(self.data))
192         self.payload = None
193       elif len(data) != 2:
194         self.fail_msg = ("RPC layer error: invalid result length (%d), "
195                          "expected 2" % len(self.data))
196         self.payload = None
197       elif not self.data[0]:
198         self.fail_msg = self._EnsureErr(self.data[1])
199         self.payload = None
200       else:
201         # finally success
202         self.fail_msg = None
203         self.payload = data[1]
204
205     for attr_name in ["call", "data", "fail_msg",
206                       "node", "offline", "payload"]:
207       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
208
209   @staticmethod
210   def _EnsureErr(val):
211     """Helper to ensure we return a 'True' value for error."""
212     if val:
213       return val
214     else:
215       return "No error information"
216
217   def Raise(self, msg, prereq=False, ecode=None):
218     """If the result has failed, raise an OpExecError.
219
220     This is used so that LU code doesn't have to check for each
221     result, but instead can call this function.
222
223     """
224     if not self.fail_msg:
225       return
226
227     if not msg: # one could pass None for default message
228       msg = ("Call '%s' to node '%s' has failed: %s" %
229              (self.call, self.node, self.fail_msg))
230     else:
231       msg = "%s: %s" % (msg, self.fail_msg)
232     if prereq:
233       ec = errors.OpPrereqError
234     else:
235       ec = errors.OpExecError
236     if ecode is not None:
237       args = (msg, ecode)
238     else:
239       args = (msg, )
240     raise ec(*args) # pylint: disable=W0142
241
242
243 def _SsconfResolver(node_list,
244                     ssc=ssconf.SimpleStore,
245                     nslookup_fn=netutils.Hostname.GetIP):
246   """Return addresses for given node names.
247
248   @type node_list: list
249   @param node_list: List of node names
250   @type ssc: class
251   @param ssc: SimpleStore class that is used to obtain node->ip mappings
252   @type nslookup_fn: callable
253   @param nslookup_fn: function use to do NS lookup
254   @rtype: list of tuple; (string, string)
255   @return: List of tuples containing node name and IP address
256
257   """
258   ss = ssc()
259   iplist = ss.GetNodePrimaryIPList()
260   family = ss.GetPrimaryIPFamily()
261   ipmap = dict(entry.split() for entry in iplist)
262
263   result = []
264   for node in node_list:
265     ip = ipmap.get(node)
266     if ip is None:
267       ip = nslookup_fn(node, family=family)
268     result.append((node, ip))
269
270   return result
271
272
273 class _StaticResolver:
274   def __init__(self, addresses):
275     """Initializes this class.
276
277     """
278     self._addresses = addresses
279
280   def __call__(self, hosts):
281     """Returns static addresses for hosts.
282
283     """
284     assert len(hosts) == len(self._addresses)
285     return zip(hosts, self._addresses)
286
287
288 def _CheckConfigNode(name, node):
289   """Checks if a node is online.
290
291   @type name: string
292   @param name: Node name
293   @type node: L{objects.Node} or None
294   @param node: Node object
295
296   """
297   if node is None:
298     # Depend on DNS for name resolution
299     ip = name
300   elif node.offline:
301     ip = _OFFLINE
302   else:
303     ip = node.primary_ip
304   return (name, ip)
305
306
307 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
308   """Calculate node addresses using configuration.
309
310   """
311   # Special case for single-host lookups
312   if len(hosts) == 1:
313     (name, ) = hosts
314     return [_CheckConfigNode(name, single_node_fn(name))]
315   else:
316     all_nodes = all_nodes_fn()
317     return [_CheckConfigNode(name, all_nodes.get(name, None))
318             for name in hosts]
319
320
321 class _RpcProcessor:
322   def __init__(self, resolver, port, lock_monitor_cb=None):
323     """Initializes this class.
324
325     @param resolver: callable accepting a list of hostnames, returning a list
326       of tuples containing name and IP address (IP address can be the name or
327       the special value L{_OFFLINE} to mark offline machines)
328     @type port: int
329     @param port: TCP port
330     @param lock_monitor_cb: Callable for registering with lock monitor
331
332     """
333     self._resolver = resolver
334     self._port = port
335     self._lock_monitor_cb = lock_monitor_cb
336
337   @staticmethod
338   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
339     """Prepares requests by sorting offline hosts into separate list.
340
341     """
342     results = {}
343     requests = {}
344
345     for (name, ip) in hosts:
346       if ip is _OFFLINE:
347         # Node is marked as offline
348         results[name] = RpcResult(node=name, offline=True, call=procedure)
349       else:
350         requests[name] = \
351           http.client.HttpClientRequest(str(ip), port,
352                                         http.HTTP_PUT, str("/%s" % procedure),
353                                         headers=_RPC_CLIENT_HEADERS,
354                                         post_data=body,
355                                         read_timeout=read_timeout,
356                                         nicename="%s/%s" % (name, procedure),
357                                         curl_config_fn=_ConfigRpcCurl)
358
359     return (results, requests)
360
361   @staticmethod
362   def _CombineResults(results, requests, procedure):
363     """Combines pre-computed results for offline hosts with actual call results.
364
365     """
366     for name, req in requests.items():
367       if req.success and req.resp_status_code == http.HTTP_OK:
368         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
369                                 node=name, call=procedure)
370       else:
371         # TODO: Better error reporting
372         if req.error:
373           msg = req.error
374         else:
375           msg = req.resp_body
376
377         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
378         host_result = RpcResult(data=msg, failed=True, node=name,
379                                 call=procedure)
380
381       results[name] = host_result
382
383     return results
384
385   def __call__(self, hosts, procedure, body, read_timeout=None,
386                _req_process_fn=http.client.ProcessRequests):
387     """Makes an RPC request to a number of nodes.
388
389     @type hosts: sequence
390     @param hosts: Hostnames
391     @type procedure: string
392     @param procedure: Request path
393     @type body: string
394     @param body: Request body
395     @type read_timeout: int or None
396     @param read_timeout: Read timeout for request
397
398     """
399     assert read_timeout is not None, \
400       "Missing RPC read timeout for procedure '%s'" % procedure
401
402     (results, requests) = \
403       self._PrepareRequests(self._resolver(hosts), self._port, procedure,
404                             str(body), read_timeout)
405
406     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
407
408     assert not frozenset(results).intersection(requests)
409
410     return self._CombineResults(results, requests, procedure)
411
412
413 class _RpcClientBase:
414   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None):
415     """Initializes this class.
416
417     """
418     self._proc = _RpcProcessor(resolver,
419                                netutils.GetDaemonPort(constants.NODED),
420                                lock_monitor_cb=lock_monitor_cb)
421     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
422
423   @staticmethod
424   def _EncodeArg(encoder_fn, (argkind, value)):
425     """Encode argument.
426
427     """
428     if argkind is None:
429       return value
430     else:
431       return encoder_fn(argkind)(value)
432
433   def _Call(self, node_list, procedure, timeout, argdefs, args):
434     """Entry point for automatically generated RPC wrappers.
435
436     """
437     assert len(args) == len(argdefs), "Wrong number of arguments"
438
439     body = serializer.DumpJson(map(self._encoder, zip(argdefs, args)),
440                                indent=False)
441
442     return self._proc(node_list, procedure, body, read_timeout=timeout)
443
444
445 def _ObjectToDict(value):
446   """Converts an object to a dictionary.
447
448   @note: See L{objects}.
449
450   """
451   return value.ToDict()
452
453
454 def _ObjectListToDict(value):
455   """Converts a list of L{objects} to dictionaries.
456
457   """
458   return map(_ObjectToDict, value)
459
460
461 def _EncodeNodeToDiskDict(value):
462   """Encodes a dictionary with node name as key and disk objects as values.
463
464   """
465   return dict((name, _ObjectListToDict(disks))
466               for name, disks in value.items())
467
468
469 def _PrepareFileUpload(filename):
470   """Loads a file and prepares it for an upload to nodes.
471
472   """
473   data = _Compress(utils.ReadFile(filename))
474   st = os.stat(filename)
475   getents = runtime.GetEnts()
476   return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
477           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
478
479
480 def _PrepareFinalizeExportDisks(snap_disks):
481   """Encodes disks for finalizing export.
482
483   """
484   flat_disks = []
485
486   for disk in snap_disks:
487     if isinstance(disk, bool):
488       flat_disks.append(disk)
489     else:
490       flat_disks.append(disk.ToDict())
491
492   return flat_disks
493
494
495 def _EncodeImportExportIO((ieio, ieioargs)):
496   """Encodes import/export I/O information.
497
498   """
499   if ieio == constants.IEIO_RAW_DISK:
500     assert len(ieioargs) == 1
501     return (ieio, (ieioargs[0].ToDict(), ))
502
503   if ieio == constants.IEIO_SCRIPT:
504     assert len(ieioargs) == 2
505     return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
506
507   return (ieio, ieioargs)
508
509
510 def _EncodeBlockdevRename(value):
511   """Encodes information for renaming block devices.
512
513   """
514   return [(d.ToDict(), uid) for d, uid in value]
515
516
517 #: Generic encoders
518 _ENCODERS = {
519   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
520   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
521   rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
522   rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload,
523   rpc_defs.ED_COMPRESS: _Compress,
524   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
525   rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
526   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
527   }
528
529
530 class RpcRunner(_RpcClientBase,
531                 _generated_rpc.RpcClientDefault,
532                 _generated_rpc.RpcClientBootstrap,
533                 _generated_rpc.RpcClientConfig):
534   """RPC runner class.
535
536   """
537   def __init__(self, context):
538     """Initialized the RPC runner.
539
540     @type context: C{masterd.GanetiContext}
541     @param context: Ganeti context
542
543     """
544     self._cfg = context.cfg
545
546     encoders = _ENCODERS.copy()
547
548     # Add encoders requiring configuration object
549     encoders.update({
550       rpc_defs.ED_INST_DICT: self._InstDict,
551       rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
552       rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
553       })
554
555     # Resolver using configuration
556     resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo,
557                               self._cfg.GetAllNodesInfo)
558
559     # Pylint doesn't recognize multiple inheritance properly, see
560     # <http://www.logilab.org/ticket/36586> and
561     # <http://www.logilab.org/ticket/35642>
562     # pylint: disable=W0233
563     _RpcClientBase.__init__(self, resolver, encoders.get,
564                             lock_monitor_cb=context.glm.AddToLockMonitor)
565     _generated_rpc.RpcClientConfig.__init__(self)
566     _generated_rpc.RpcClientBootstrap.__init__(self)
567     _generated_rpc.RpcClientDefault.__init__(self)
568
569   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
570     """Convert the given instance to a dict.
571
572     This is done via the instance's ToDict() method and additionally
573     we fill the hvparams with the cluster defaults.
574
575     @type instance: L{objects.Instance}
576     @param instance: an Instance object
577     @type hvp: dict or None
578     @param hvp: a dictionary with overridden hypervisor parameters
579     @type bep: dict or None
580     @param bep: a dictionary with overridden backend parameters
581     @type osp: dict or None
582     @param osp: a dictionary with overridden os parameters
583     @rtype: dict
584     @return: the instance dict, with the hvparams filled with the
585         cluster defaults
586
587     """
588     idict = instance.ToDict()
589     cluster = self._cfg.GetClusterInfo()
590     idict["hvparams"] = cluster.FillHV(instance)
591     if hvp is not None:
592       idict["hvparams"].update(hvp)
593     idict["beparams"] = cluster.FillBE(instance)
594     if bep is not None:
595       idict["beparams"].update(bep)
596     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
597     if osp is not None:
598       idict["osparams"].update(osp)
599     for nic in idict["nics"]:
600       nic['nicparams'] = objects.FillDict(
601         cluster.nicparams[constants.PP_DEFAULT],
602         nic['nicparams'])
603     return idict
604
605   def _InstDictHvpBep(self, (instance, hvp, bep)):
606     """Wrapper for L{_InstDict}.
607
608     """
609     return self._InstDict(instance, hvp=hvp, bep=bep)
610
611   def _InstDictOsp(self, (instance, osparams)):
612     """Wrapper for L{_InstDict}.
613
614     """
615     return self._InstDict(instance, osp=osparams)
616
617   @staticmethod
618   def _MigrationStatusPostProc(result):
619     if not result.fail_msg and result.payload is not None:
620       result.payload = objects.MigrationStatus.FromDict(result.payload)
621     return result
622
623   @staticmethod
624   def _BlockdevFindPostProc(result):
625     if not result.fail_msg and result.payload is not None:
626       result.payload = objects.BlockDevStatus.FromDict(result.payload)
627     return result
628
629   @staticmethod
630   def _BlockdevGetMirrorStatusPostProc(result):
631     if not result.fail_msg:
632       result.payload = [objects.BlockDevStatus.FromDict(i)
633                         for i in result.payload]
634     return result
635
636   @staticmethod
637   def _BlockdevGetMirrorStatusMultiPostProc(result):
638     for nres in result.values():
639       if nres.fail_msg:
640         continue
641
642       for idx, (success, status) in enumerate(nres.payload):
643         if success:
644           nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
645
646     return result
647
648   @staticmethod
649   def _OsGetPostProc(result):
650     if not result.fail_msg and isinstance(result.payload, dict):
651       result.payload = objects.OS.FromDict(result.payload)
652     return result
653
654   @staticmethod
655   def _ImpExpStatusPostProc(result):
656     """Post-processor for import/export status.
657
658     @rtype: Payload containing list of L{objects.ImportExportStatus} instances
659     @return: Returns a list of the state of each named import/export or None if
660              a status couldn't be retrieved
661
662     """
663     if not result.fail_msg:
664       decoded = []
665
666       for i in result.payload:
667         if i is None:
668           decoded.append(None)
669           continue
670         decoded.append(objects.ImportExportStatus.FromDict(i))
671
672       result.payload = decoded
673
674     return result
675
676   #
677   # Begin RPC calls
678   #
679
680   def call_test_delay(self, node_list, duration): # pylint: disable=W0221
681     """Sleep for a fixed time on given node(s).
682
683     This is a multi-node call.
684
685     """
686     # TODO: Use callable timeout calculation
687     return _generated_rpc.RpcClientDefault.call_test_delay(self,
688       node_list, duration, read_timeout=int(duration + 5))
689
690
691 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
692   """RPC wrappers for job queue.
693
694   """
695   def __init__(self, context, address_list):
696     """Initializes this class.
697
698     """
699     if address_list is None:
700       resolver = _SsconfResolver
701     else:
702       # Caller provided an address list
703       resolver = _StaticResolver(address_list)
704
705     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
706                             lock_monitor_cb=context.glm.AddToLockMonitor)
707     _generated_rpc.RpcClientJobQueue.__init__(self)
708
709
710 class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
711   """RPC wrappers for bootstrapping.
712
713   """
714   def __init__(self):
715     """Initializes this class.
716
717     """
718     _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get)
719     _generated_rpc.RpcClientBootstrap.__init__(self)
720
721
722 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
723   """RPC wrappers for L{config}.
724
725   """
726   def __init__(self, context, address_list):
727     """Initializes this class.
728
729     """
730     if context:
731       lock_monitor_cb = context.glm.AddToLockMonitor
732     else:
733       lock_monitor_cb = None
734
735     if address_list is None:
736       resolver = _SsconfResolver
737     else:
738       # Caller provided an address list
739       resolver = _StaticResolver(address_list)
740
741     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
742                             lock_monitor_cb=lock_monitor_cb)
743     _generated_rpc.RpcClientConfig.__init__(self)