Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ b8c160c1

History | View | Annotate | Download (19.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# Special module generated at build time
52
from ganeti import _generated_rpc
53

    
54
# pylint has a bug here, doesn't see this import
55
import ganeti.http.client  # pylint: disable=W0611
56

    
57

    
58
# Timeout for connecting to nodes (seconds)
59
_RPC_CONNECT_TIMEOUT = 5
60

    
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

    
66
# Various time constants for the timeout table
67
_TMO_URGENT = 60 # one minute
68
_TMO_FAST = 5 * 60 # five minutes
69
_TMO_NORMAL = 15 * 60 # 15 minutes
70
_TMO_SLOW = 3600 # one hour
71
_TMO_4HRS = 4 * 3600
72
_TMO_1DAY = 86400
73

    
74
#: Special value to describe an offline host
75
_OFFLINE = object()
76

    
77

    
78
def Init():
79
  """Initializes the module-global HTTP client manager.
80

81
  Must be called before using any RPC function and while exactly one thread is
82
  running.
83

84
  """
85
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86
  # one thread running. This check is just a safety measure -- it doesn't
87
  # cover all cases.
88
  assert threading.activeCount() == 1, \
89
         "Found more than one active thread when initializing pycURL"
90

    
91
  logging.info("Using PycURL %s", pycurl.version)
92

    
93
  pycurl.global_init(pycurl.GLOBAL_ALL)
94

    
95

    
96
def Shutdown():
97
  """Stops the module-global HTTP client manager.
98

99
  Must be called before quitting the program and while exactly one thread is
100
  running.
101

102
  """
103
  pycurl.global_cleanup()
104

    
105

    
106
def _ConfigRpcCurl(curl):
107
  noded_cert = str(constants.NODED_CERT_FILE)
108

    
109
  curl.setopt(pycurl.FOLLOWLOCATION, False)
110
  curl.setopt(pycurl.CAINFO, noded_cert)
111
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
113
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114
  curl.setopt(pycurl.SSLCERT, noded_cert)
115
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116
  curl.setopt(pycurl.SSLKEY, noded_cert)
117
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118

    
119

    
120
def RunWithRPC(fn):
121
  """RPC-wrapper decorator.
122

123
  When applied to a function, it runs it with the RPC system
124
  initialized, and it shutsdown the system afterwards. This means the
125
  function must be called without RPC being initialized.
126

127
  """
128
  def wrapper(*args, **kwargs):
129
    Init()
130
    try:
131
      return fn(*args, **kwargs)
132
    finally:
133
      Shutdown()
134
  return wrapper
135

    
136

    
137
def _Compress(data):
138
  """Compresses a string for transport over RPC.
139

140
  Small amounts of data are not compressed.
141

142
  @type data: str
143
  @param data: Data
144
  @rtype: tuple
145
  @return: Encoded data to send
146

147
  """
148
  # Small amounts of data are not compressed
149
  if len(data) < 512:
150
    return (constants.RPC_ENCODING_NONE, data)
151

    
152
  # Compress with zlib and encode in base64
153
  return (constants.RPC_ENCODING_ZLIB_BASE64,
154
          base64.b64encode(zlib.compress(data, 3)))
155

    
156

    
157
class RpcResult(object):
158
  """RPC Result class.
159

160
  This class holds an RPC result. It is needed since in multi-node
161
  calls we can't raise an exception just because one one out of many
162
  failed, and therefore we use this class to encapsulate the result.
163

164
  @ivar data: the data payload, for successful results, or None
165
  @ivar call: the name of the RPC call
166
  @ivar node: the name of the node to which we made the call
167
  @ivar offline: whether the operation failed because the node was
168
      offline, as opposed to actual failure; offline=True will always
169
      imply failed=True, in order to allow simpler checking if
170
      the user doesn't care about the exact failure mode
171
  @ivar fail_msg: the error message if the call failed
172

173
  """
174
  def __init__(self, data=None, failed=False, offline=False,
175
               call=None, node=None):
176
    self.offline = offline
177
    self.call = call
178
    self.node = node
179

    
180
    if offline:
181
      self.fail_msg = "Node is marked offline"
182
      self.data = self.payload = None
183
    elif failed:
184
      self.fail_msg = self._EnsureErr(data)
185
      self.data = self.payload = None
186
    else:
187
      self.data = data
188
      if not isinstance(self.data, (tuple, list)):
189
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
190
                         type(self.data))
191
        self.payload = None
192
      elif len(data) != 2:
193
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
194
                         "expected 2" % len(self.data))
195
        self.payload = None
196
      elif not self.data[0]:
197
        self.fail_msg = self._EnsureErr(self.data[1])
198
        self.payload = None
199
      else:
200
        # finally success
201
        self.fail_msg = None
202
        self.payload = data[1]
203

    
204
    for attr_name in ["call", "data", "fail_msg",
205
                      "node", "offline", "payload"]:
206
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
207

    
208
  @staticmethod
209
  def _EnsureErr(val):
210
    """Helper to ensure we return a 'True' value for error."""
211
    if val:
212
      return val
213
    else:
214
      return "No error information"
215

    
216
  def Raise(self, msg, prereq=False, ecode=None):
217
    """If the result has failed, raise an OpExecError.
218

219
    This is used so that LU code doesn't have to check for each
220
    result, but instead can call this function.
221

222
    """
223
    if not self.fail_msg:
224
      return
225

    
226
    if not msg: # one could pass None for default message
227
      msg = ("Call '%s' to node '%s' has failed: %s" %
228
             (self.call, self.node, self.fail_msg))
229
    else:
230
      msg = "%s: %s" % (msg, self.fail_msg)
231
    if prereq:
232
      ec = errors.OpPrereqError
233
    else:
234
      ec = errors.OpExecError
235
    if ecode is not None:
236
      args = (msg, ecode)
237
    else:
238
      args = (msg, )
239
    raise ec(*args) # pylint: disable=W0142
240

    
241

    
242
def _SsconfResolver(node_list,
243
                    ssc=ssconf.SimpleStore,
244
                    nslookup_fn=netutils.Hostname.GetIP):
245
  """Return addresses for given node names.
246

247
  @type node_list: list
248
  @param node_list: List of node names
249
  @type ssc: class
250
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
251
  @type nslookup_fn: callable
252
  @param nslookup_fn: function use to do NS lookup
253
  @rtype: list of tuple; (string, string)
254
  @return: List of tuples containing node name and IP address
255

256
  """
257
  ss = ssc()
258
  iplist = ss.GetNodePrimaryIPList()
259
  family = ss.GetPrimaryIPFamily()
260
  ipmap = dict(entry.split() for entry in iplist)
261

    
262
  result = []
263
  for node in node_list:
264
    ip = ipmap.get(node)
265
    if ip is None:
266
      ip = nslookup_fn(node, family=family)
267
    result.append((node, ip))
268

    
269
  return result
270

    
271

    
272
class _StaticResolver:
273
  def __init__(self, addresses):
274
    """Initializes this class.
275

276
    """
277
    self._addresses = addresses
278

    
279
  def __call__(self, hosts):
280
    """Returns static addresses for hosts.
281

282
    """
283
    assert len(hosts) == len(self._addresses)
284
    return zip(hosts, self._addresses)
285

    
286

    
287
def _CheckConfigNode(name, node):
288
  """Checks if a node is online.
289

290
  @type name: string
291
  @param name: Node name
292
  @type node: L{objects.Node} or None
293
  @param node: Node object
294

295
  """
296
  if node is None:
297
    # Depend on DNS for name resolution
298
    ip = name
299
  elif node.offline:
300
    ip = _OFFLINE
301
  else:
302
    ip = node.primary_ip
303
  return (name, ip)
304

    
305

    
306
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
307
  """Calculate node addresses using configuration.
308

309
  """
310
  # Special case for single-host lookups
311
  if len(hosts) == 1:
312
    (name, ) = hosts
313
    return [_CheckConfigNode(name, single_node_fn(name))]
314
  else:
315
    all_nodes = all_nodes_fn()
316
    return [_CheckConfigNode(name, all_nodes.get(name, None))
317
            for name in hosts]
318

    
319

    
320
class _RpcProcessor:
321
  def __init__(self, resolver, port, lock_monitor_cb=None):
322
    """Initializes this class.
323

324
    @param resolver: callable accepting a list of hostnames, returning a list
325
      of tuples containing name and IP address (IP address can be the name or
326
      the special value L{_OFFLINE} to mark offline machines)
327
    @type port: int
328
    @param port: TCP port
329
    @param lock_monitor_cb: Callable for registering with lock monitor
330

331
    """
332
    self._resolver = resolver
333
    self._port = port
334
    self._lock_monitor_cb = lock_monitor_cb
335

    
336
  @staticmethod
337
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
338
    """Prepares requests by sorting offline hosts into separate list.
339

340
    """
341
    results = {}
342
    requests = {}
343

    
344
    for (name, ip) in hosts:
345
      if ip is _OFFLINE:
346
        # Node is marked as offline
347
        results[name] = RpcResult(node=name, offline=True, call=procedure)
348
      else:
349
        requests[name] = \
350
          http.client.HttpClientRequest(str(ip), port,
351
                                        http.HTTP_PUT, str("/%s" % procedure),
352
                                        headers=_RPC_CLIENT_HEADERS,
353
                                        post_data=body,
354
                                        read_timeout=read_timeout,
355
                                        nicename="%s/%s" % (name, procedure),
356
                                        curl_config_fn=_ConfigRpcCurl)
357

    
358
    return (results, requests)
359

    
360
  @staticmethod
361
  def _CombineResults(results, requests, procedure):
362
    """Combines pre-computed results for offline hosts with actual call results.
363

364
    """
365
    for name, req in requests.items():
366
      if req.success and req.resp_status_code == http.HTTP_OK:
367
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
368
                                node=name, call=procedure)
369
      else:
370
        # TODO: Better error reporting
371
        if req.error:
372
          msg = req.error
373
        else:
374
          msg = req.resp_body
375

    
376
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
377
        host_result = RpcResult(data=msg, failed=True, node=name,
378
                                call=procedure)
379

    
380
      results[name] = host_result
381

    
382
    return results
383

    
384
  def __call__(self, hosts, procedure, body, read_timeout=None,
385
               _req_process_fn=http.client.ProcessRequests):
386
    """Makes an RPC request to a number of nodes.
387

388
    @type hosts: sequence
389
    @param hosts: Hostnames
390
    @type procedure: string
391
    @param procedure: Request path
392
    @type body: string
393
    @param body: Request body
394
    @type read_timeout: int or None
395
    @param read_timeout: Read timeout for request
396

397
    """
398
    assert read_timeout is not None, \
399
      "Missing RPC read timeout for procedure '%s'" % procedure
400

    
401
    (results, requests) = \
402
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
403
                            str(body), read_timeout)
404

    
405
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
406

    
407
    assert not frozenset(results).intersection(requests)
408

    
409
    return self._CombineResults(results, requests, procedure)
410

    
411

    
412
class RpcRunner(_generated_rpc.RpcClientDefault,
413
                _generated_rpc.RpcClientBootstrap,
414
                _generated_rpc.RpcClientConfig):
415
  """RPC runner class.
416

417
  """
418
  def __init__(self, context):
419
    """Initialized the RPC runner.
420

421
    @type context: C{masterd.GanetiContext}
422
    @param context: Ganeti context
423

424
    """
425
    # Pylint doesn't recognize multiple inheritance properly, see
426
    # <http://www.logilab.org/ticket/36586> and
427
    # <http://www.logilab.org/ticket/35642>
428
    # pylint: disable=W0233
429
    _generated_rpc.RpcClientConfig.__init__(self)
430
    _generated_rpc.RpcClientBootstrap.__init__(self)
431
    _generated_rpc.RpcClientDefault.__init__(self)
432

    
433
    self._cfg = context.cfg
434
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
435
                                              self._cfg.GetNodeInfo,
436
                                              self._cfg.GetAllNodesInfo),
437
                               netutils.GetDaemonPort(constants.NODED),
438
                               lock_monitor_cb=context.glm.AddToLockMonitor)
439

    
440
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
441
    """Convert the given instance to a dict.
442

443
    This is done via the instance's ToDict() method and additionally
444
    we fill the hvparams with the cluster defaults.
445

446
    @type instance: L{objects.Instance}
447
    @param instance: an Instance object
448
    @type hvp: dict or None
449
    @param hvp: a dictionary with overridden hypervisor parameters
450
    @type bep: dict or None
451
    @param bep: a dictionary with overridden backend parameters
452
    @type osp: dict or None
453
    @param osp: a dictionary with overridden os parameters
454
    @rtype: dict
455
    @return: the instance dict, with the hvparams filled with the
456
        cluster defaults
457

458
    """
459
    idict = instance.ToDict()
460
    cluster = self._cfg.GetClusterInfo()
461
    idict["hvparams"] = cluster.FillHV(instance)
462
    if hvp is not None:
463
      idict["hvparams"].update(hvp)
464
    idict["beparams"] = cluster.FillBE(instance)
465
    if bep is not None:
466
      idict["beparams"].update(bep)
467
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
468
    if osp is not None:
469
      idict["osparams"].update(osp)
470
    for nic in idict["nics"]:
471
      nic['nicparams'] = objects.FillDict(
472
        cluster.nicparams[constants.PP_DEFAULT],
473
        nic['nicparams'])
474
    return idict
475

    
476
  def _InstDictHvpBep(self, (instance, hvp, bep)):
477
    """Wrapper for L{_InstDict}.
478

479
    """
480
    return self._InstDict(instance, hvp=hvp, bep=bep)
481

    
482
  def _InstDictOsp(self, (instance, osparams)):
483
    """Wrapper for L{_InstDict}.
484

485
    """
486
    return self._InstDict(instance, osp=osparams)
487

    
488
  def _Call(self, node_list, procedure, timeout, args):
489
    """Entry point for automatically generated RPC wrappers.
490

491
    """
492
    body = serializer.DumpJson(args, indent=False)
493

    
494
    return self._proc(node_list, procedure, body, read_timeout=timeout)
495

    
496
  @staticmethod
497
  def _MigrationStatusPostProc(result):
498
    if not result.fail_msg and result.payload is not None:
499
      result.payload = objects.MigrationStatus.FromDict(result.payload)
500
    return result
501

    
502
  @staticmethod
503
  def _BlockdevFindPostProc(result):
504
    if not result.fail_msg and result.payload is not None:
505
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
506
    return result
507

    
508
  @staticmethod
509
  def _BlockdevGetMirrorStatusPostProc(result):
510
    if not result.fail_msg:
511
      result.payload = [objects.BlockDevStatus.FromDict(i)
512
                        for i in result.payload]
513
    return result
514

    
515
  @staticmethod
516
  def _BlockdevGetMirrorStatusMultiPostProc(result):
517
    for nres in result.values():
518
      if nres.fail_msg:
519
        continue
520

    
521
      for idx, (success, status) in enumerate(nres.payload):
522
        if success:
523
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
524

    
525
    return result
526

    
527
  @staticmethod
528
  def _OsGetPostProc(result):
529
    if not result.fail_msg and isinstance(result.payload, dict):
530
      result.payload = objects.OS.FromDict(result.payload)
531
    return result
532

    
533
  @staticmethod
534
  def _PrepareFinalizeExportDisks(snap_disks):
535
    flat_disks = []
536

    
537
    for disk in snap_disks:
538
      if isinstance(disk, bool):
539
        flat_disks.append(disk)
540
      else:
541
        flat_disks.append(disk.ToDict())
542

    
543
    return flat_disks
544

    
545
  @staticmethod
546
  def _ImpExpStatusPostProc(result):
547
    """Post-processor for import/export status.
548

549
    @rtype: Payload containing list of L{objects.ImportExportStatus} instances
550
    @return: Returns a list of the state of each named import/export or None if
551
             a status couldn't be retrieved
552

553
    """
554
    if not result.fail_msg:
555
      decoded = []
556

    
557
      for i in result.payload:
558
        if i is None:
559
          decoded.append(None)
560
          continue
561
        decoded.append(objects.ImportExportStatus.FromDict(i))
562

    
563
      result.payload = decoded
564

    
565
    return result
566

    
567
  @staticmethod
568
  def _EncodeImportExportIO((ieio, ieioargs)):
569
    """Encodes import/export I/O information.
570

571
    """
572
    if ieio == constants.IEIO_RAW_DISK:
573
      assert len(ieioargs) == 1
574
      return (ieio, (ieioargs[0].ToDict(), ))
575

    
576
    if ieio == constants.IEIO_SCRIPT:
577
      assert len(ieioargs) == 2
578
      return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
579

    
580
    return (ieio, ieioargs)
581

    
582
  @staticmethod
583
  def _PrepareFileUpload(filename):
584
    """Loads a file and prepares it for an upload to nodes.
585

586
    """
587
    data = _Compress(utils.ReadFile(filename))
588
    st = os.stat(filename)
589
    getents = runtime.GetEnts()
590
    return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
591
            getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
592

    
593
  #
594
  # Begin RPC calls
595
  #
596

    
597
  def call_test_delay(self, node_list, duration, read_timeout=None):
598
    """Sleep for a fixed time on given node(s).
599

600
    This is a multi-node call.
601

602
    """
603
    assert read_timeout is None
604
    return self.call_test_delay(node_list, duration,
605
                                read_timeout=int(duration + 5))
606

    
607

    
608
class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
609
  """RPC wrappers for job queue.
610

611
  """
612
  _Compress = staticmethod(_Compress)
613

    
614
  def __init__(self, context, address_list):
615
    """Initializes this class.
616

617
    """
618
    _generated_rpc.RpcClientJobQueue.__init__(self)
619

    
620
    if address_list is None:
621
      resolver = _SsconfResolver
622
    else:
623
      # Caller provided an address list
624
      resolver = _StaticResolver(address_list)
625

    
626
    self._proc = _RpcProcessor(resolver,
627
                               netutils.GetDaemonPort(constants.NODED),
628
                               lock_monitor_cb=context.glm.AddToLockMonitor)
629

    
630
  def _Call(self, node_list, procedure, timeout, args):
631
    """Entry point for automatically generated RPC wrappers.
632

633
    """
634
    body = serializer.DumpJson(args, indent=False)
635

    
636
    return self._proc(node_list, procedure, body, read_timeout=timeout)
637

    
638

    
639
class BootstrapRunner(_generated_rpc.RpcClientBootstrap):
640
  """RPC wrappers for bootstrapping.
641

642
  """
643
  def __init__(self):
644
    """Initializes this class.
645

646
    """
647
    _generated_rpc.RpcClientBootstrap.__init__(self)
648

    
649
    self._proc = _RpcProcessor(_SsconfResolver,
650
                               netutils.GetDaemonPort(constants.NODED))
651

    
652
  def _Call(self, node_list, procedure, timeout, args):
653
    """Entry point for automatically generated RPC wrappers.
654

655
    """
656
    body = serializer.DumpJson(args, indent=False)
657

    
658
    return self._proc(node_list, procedure, body, read_timeout=timeout)
659

    
660

    
661
class ConfigRunner(_generated_rpc.RpcClientConfig):
662
  """RPC wrappers for L{config}.
663

664
  """
665
  _PrepareFileUpload = \
666
    staticmethod(RpcRunner._PrepareFileUpload) # pylint: disable=W0212
667

    
668
  def __init__(self, address_list):
669
    """Initializes this class.
670

671
    """
672
    _generated_rpc.RpcClientConfig.__init__(self)
673

    
674
    if address_list is None:
675
      resolver = _SsconfResolver
676
    else:
677
      # Caller provided an address list
678
      resolver = _StaticResolver(address_list)
679

    
680
    self._proc = _RpcProcessor(resolver,
681
                               netutils.GetDaemonPort(constants.NODED))
682

    
683
  def _Call(self, node_list, procedure, timeout, args):
684
    """Entry point for automatically generated RPC wrappers.
685

686
    """
687
    body = serializer.DumpJson(args, indent=False)
688

    
689
    return self._proc(node_list, procedure, body, read_timeout=timeout)