Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 0d1e78dd

History | View | Annotate | Download (19.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# Special module generated at build time
52
from ganeti import _generated_rpc
53

    
54
# pylint has a bug here, doesn't see this import
55
import ganeti.http.client  # pylint: disable=W0611
56

    
57

    
58
# Maximum time, in seconds, allowed for establishing a connection to a node
_RPC_CONNECT_TIMEOUT = 5

# HTTP headers attached to every RPC request.
# NOTE(review): the empty "Expect:" header presumably suppresses curl's
# automatic "Expect: 100-continue" handshake -- confirm.
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]
65

    
66
# Timeout buckets (all values in seconds) for the timeout table
_TMO_URGENT = 60 # one minute
_TMO_FAST = 300 # five minutes
_TMO_NORMAL = 900 # 15 minutes
_TMO_SLOW = 60 * 60 # one hour
_TMO_4HRS = 4 * 60 * 60 # four hours
_TMO_1DAY = 24 * 60 * 60 # one day

#: Unique marker used in place of an IP address to describe an offline host;
#: always compare against it with "is"
_OFFLINE = object()
76

    
77

    
78
def Init():
  """Initializes the module-global HTTP client manager.

  Performs the process-wide pycURL initialization and logs the PycURL
  version in use. Must be called before using any RPC function and while
  exactly one thread is running.

  """
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
  # one thread running. This check is just a safety measure -- it doesn't
  # cover all cases.
  active_threads = threading.activeCount()
  assert active_threads == 1, \
         "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)

  pycurl.global_init(pycurl.GLOBAL_ALL)
94

    
95

    
96
def Shutdown():
  """Stops the module-global HTTP client manager.

  Runs the process-wide pycURL cleanup. Must be called before quitting the
  program and while exactly one thread is running.

  """
  pycurl.global_cleanup()
104

    
105

    
106
def _ConfigRpcCurl(curl):
  """Configures a cURL handle for talking to the node daemon.

  The node daemon certificate file is used as CA bundle, client certificate
  and client key at the same time.

  @param curl: cURL handle to configure; only its C{setopt} method is used

  """
  cert_path = str(constants.NODED_CERT_FILE)

  # Options are applied in the order listed
  options = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_path),
    # NOTE(review): host name verification is switched off while peer
    # verification stays on; presumably the shared node certificate is not
    # issued per host name -- confirm
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_path),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_path),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]

  for (option, value) in options:
    curl.setopt(option, value)
118

    
119

    
120
def RunWithRPC(fn):
  """RPC-wrapper decorator.

  When applied to a function, it runs it with the RPC system
  initialized, and it shuts down the system afterwards. This means the
  function must be called without RPC being initialized.

  The returned wrapper carries the name and docstring of the decorated
  function, so that introspection and tracebacks show the real function
  instead of a generic "wrapper".

  @type fn: callable
  @param fn: Function to run with the RPC system initialized
  @rtype: callable
  @return: Wrapper passing all arguments through to C{fn} and returning
    its result

  """
  def wrapper(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      # Always shut down again, even if the wrapped function raised
      Shutdown()

  # Metadata is copied by hand rather than via functools.wraps.
  # NOTE(review): this file goes through compat.partial elsewhere, presumably
  # to support interpreters without functools -- confirm before switching.
  wrapper.__name__ = getattr(fn, "__name__", wrapper.__name__)
  wrapper.__doc__ = getattr(fn, "__doc__", None)

  return wrapper
135

    
136

    
137
def _Compress(data):
  """Encodes a string for transport over RPC.

  Data shorter than 512 characters is passed through unchanged; anything
  longer is compressed with zlib (level 3) and then base64-encoded.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Tuple of encoding constant and encoded data to send

  """
  if len(data) >= 512:
    # Worth compressing: zlib first, then base64
    packed = zlib.compress(data, 3)
    return (constants.RPC_ENCODING_ZLIB_BASE64, base64.b64encode(packed))

  # Small amounts of data are not compressed
  return (constants.RPC_ENCODING_NONE, data)
155

    
156

    
157
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the raw data as received for calls that reached the node, or
      None for offline nodes and failed calls
  @ivar payload: the second element of the data, for successful results,
      or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline results always
      carry a failure message, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed, None otherwise

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    """Classifies a result as offline, failed, invalid or successful.

    @param data: for failed calls the error message, otherwise the reply
        received from the node, expected to be a two-element list or tuple
        of status and payload
    @type failed: bool
    @param failed: whether the call itself failed
    @type offline: bool
    @param offline: whether the node is marked offline; takes precedence
        over C{failed}
    @param call: name of the RPC call
    @param node: name of the node

    """
    self.offline = offline
    self.call = call
    self.node = node

    if offline or failed:
      # No reply to look at; offline wins over failed
      if offline:
        self.fail_msg = "Node is marked offline"
      else:
        self.fail_msg = self._EnsureErr(data)
      self.data = None
      self.payload = None
    else:
      self.data = data

      error = None
      payload = None

      if not isinstance(data, (tuple, list)):
        error = "RPC layer error: invalid result type (%s)" % type(data)
      elif len(data) != 2:
        error = ("RPC layer error: invalid result length (%d), "
                 "expected 2" % len(data))
      else:
        (status, value) = data
        if status:
          # finally success
          payload = value
        else:
          # The remote end reported a failure, value holds the reason
          error = self._EnsureErr(value)

      self.fail_msg = error
      self.payload = payload

    # Every code path above must have set all public attributes
    for attr_name in ("call", "data", "fail_msg",
                      "node", "offline", "payload"):
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    return val or "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @param msg: prefix for the error message; a false value (such as None)
        selects a default message naming the call and the node
    @type prereq: bool
    @param prereq: raise L{errors.OpPrereqError} instead of
        L{errors.OpExecError}
    @param ecode: if not None, passed to the exception as second argument
    @raise errors.OpExecError: if the call failed and C{prereq} is false
    @raise errors.OpPrereqError: if the call failed and C{prereq} is true

    """
    failure = self.fail_msg
    if not failure:
      return

    if msg:
      text = "%s: %s" % (msg, failure)
    else:
      # one could pass None for default message
      text = ("Call '%s' to node '%s' has failed: %s" %
              (self.call, self.node, failure))

    if prereq:
      exc_class = errors.OpPrereqError
    else:
      exc_class = errors.OpExecError

    if ecode is None:
      exc_args = (text, )
    else:
      exc_args = (text, ecode)

    raise exc_class(*exc_args) # pylint: disable=W0142
240

    
241

    
242
def _SsconfResolver(node_list,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  Addresses are taken from the node->ip list of the simple store; names
  missing from that list are resolved through C{nslookup_fn}, using the
  primary IP family reported by the simple store.

  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function used to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  store = ssc()
  raw_entries = store.GetNodePrimaryIPList()
  ip_family = store.GetPrimaryIPFamily()

  # Every entry must split into exactly two fields: node name and address
  known_addresses = dict(line.split() for line in raw_entries)

  resolved = []
  for name in node_list:
    if name in known_addresses:
      address = known_addresses[name]
    else:
      # Unknown to the simple store, fall back to a name lookup
      address = nslookup_fn(name, family=ip_family)
    resolved.append((name, address))

  return resolved
270

    
271

    
272
class _StaticResolver:
  """Resolver pairing hosts with a fixed, caller-supplied address list.

  """
  def __init__(self, addresses):
    """Initializes this class.

    @param addresses: addresses in the same order as the hosts later passed
      when calling the instance

    """
    self._addresses = addresses

  def __call__(self, hosts):
    """Returns static addresses for hosts.

    @param hosts: host names; must be as many as there are stored addresses
    @return: host names paired position by position with the addresses

    """
    addresses = self._addresses
    assert len(hosts) == len(addresses)
    return zip(hosts, addresses)
285

    
286

    
287
def _CheckConfigNode(name, node):
  """Determines the address to use for a node from its configuration object.

  @type name: string
  @param name: Node name
  @type node: L{objects.Node} or None
  @param node: Node object
  @rtype: tuple
  @return: Tuple of node name and address; the address is the name itself
    for nodes without configuration object, L{_OFFLINE} for offline nodes
    and the primary IP address otherwise

  """
  if node is None:
    # Depend on DNS for name resolution
    return (name, name)

  if node.offline:
    return (name, _OFFLINE)

  return (name, node.primary_ip)
304

    
305

    
306
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
  """Calculate node addresses using configuration.

  @type single_node_fn: callable
  @param single_node_fn: called with a node name, returns the matching node
    object or None; only used when exactly one host is requested
  @type all_nodes_fn: callable
  @param all_nodes_fn: called without arguments, returns a dictionary
    mapping node names to node objects; used for all other requests
  @param hosts: node names to resolve
  @rtype: list
  @return: one result of L{_CheckConfigNode} per entry in C{hosts}

  """
  if len(hosts) != 1:
    nodes_by_name = all_nodes_fn()
    return [_CheckConfigNode(name, nodes_by_name.get(name))
            for name in hosts]

  # Special case for single-host lookups
  (name, ) = hosts
  node = single_node_fn(name)
  return [_CheckConfigNode(name, node)]
318

    
319

    
320
class _RpcProcessor:
  """Sends one RPC request to a number of nodes and collects the results.

  """
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of hostnames, returning a list
      of tuples containing name and IP address (IP address can be the name or
      the special value L{_OFFLINE} to mark offline machines)
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @param hosts: pairs of node name and address, as returned by a resolver
    @rtype: tuple
    @return: two dictionaries keyed by node name; the first one holds
      ready-made results for offline nodes, the second one the HTTP
      requests still to be sent

    """
    offline_results = {}
    pending = {}

    for (name, ip) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline
        offline_results[name] = RpcResult(node=name, offline=True,
                                          call=procedure)
        continue

      pending[name] = \
        http.client.HttpClientRequest(str(ip), port,
                                      http.HTTP_PUT, str("/%s" % procedure),
                                      headers=_RPC_CLIENT_HEADERS,
                                      post_data=body,
                                      read_timeout=read_timeout,
                                      nicename="%s/%s" % (name, procedure),
                                      curl_config_fn=_ConfigRpcCurl)

    return (offline_results, pending)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    @param results: dictionary of pre-computed results; modified in place
    @param requests: dictionary of processed requests, keyed by node name
    @return: C{results}, with one more entry for every request

    """
    for (name, req) in requests.items():
      succeeded = req.success and req.resp_status_code == http.HTTP_OK

      if not succeeded:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        results[name] = RpcResult(data=msg, failed=True, node=name,
                                  call=procedure)
        continue

      results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)

    return results

  def __call__(self, hosts, procedure, body, read_timeout=None,
               _req_process_fn=http.client.ProcessRequests):
    """Makes an RPC request to a number of nodes.

    @type hosts: sequence
    @param hosts: Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: string
    @param body: Request body
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request
    @rtype: dict
    @return: dictionary of L{RpcResult} objects, keyed by node name

    """
    assert read_timeout is not None, \
      "Missing RPC read timeout for procedure '%s'" % procedure

    resolved = self._resolver(hosts)
    (results, requests) = self._PrepareRequests(resolved, self._port,
                                                procedure, str(body),
                                                read_timeout)

    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    # A node is either offline or gets a request, never both
    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
410

    
411

    
412
class RpcRunner(_generated_rpc.RpcClientDefault,
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientConfig):
  """RPC runner class.

  Combines the generated default, bootstrap and configuration RPC clients.
  Node addresses are resolved through the cluster configuration found in
  the context passed to the constructor.

  """
  def __init__(self, context):
    """Initialized the RPC runner.

    @type context: C{masterd.GanetiContext}
    @param context: Ganeti context

    """
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

    self._cfg = context.cfg
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
                                              self._cfg.GetNodeInfo,
                                              self._cfg.GetAllNodesInfo),
                               netutils.GetDaemonPort(constants.NODED),
                               lock_monitor_cb=context.glm.AddToLockMonitor)

  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams, beparams, osparams and the parameters of every
    NIC with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the parameters filled with the
        cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()

    # Overrides, if given, are applied on top of the filled parameters
    idict["hvparams"] = cluster.FillHV(instance)
    if hvp is not None:
      idict["hvparams"].update(hvp)

    idict["beparams"] = cluster.FillBE(instance)
    if bep is not None:
      idict["beparams"].update(bep)

    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)

    for nic in idict["nics"]:
      nic["nicparams"] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],
        nic["nicparams"])

    return idict

  def _InstDictHvpBep(self, args):
    """Wrapper for L{_InstDict}.

    @type args: tuple
    @param args: tuple of instance, overridden hypervisor parameters and
      overridden backend parameters, passed as one single argument

    """
    # Unpacked here instead of in the signature; tuple parameters are not
    # valid syntax in newer Python versions
    (instance, hvp, bep) = args
    return self._InstDict(instance, hvp=hvp, bep=bep)

  def _InstDictOsp(self, args):
    """Wrapper for L{_InstDict}.

    @type args: tuple
    @param args: tuple of instance and overridden os parameters, passed as
      one single argument

    """
    # Unpacked here instead of in the signature; tuple parameters are not
    # valid syntax in newer Python versions
    (instance, osparams) = args
    return self._InstDict(instance, osp=osparams)

  def _Call(self, node_list, procedure, timeout, args):
    """Entry point for automatically generated RPC wrappers.

    @param node_list: names of the nodes to call
    @param procedure: name of the remote procedure
    @param timeout: read timeout for the request
    @param args: arguments for the procedure, sent as JSON
    @return: the result of the RPC processor for this call

    """
    body = serializer.DumpJson(args, indent=False)

    return self._proc(node_list, procedure, body, read_timeout=timeout)

  @staticmethod
  def _BlockdevFindPostProc(result):
    """Converts the payload of a successful result to a block device status.

    Failed results and results without payload are returned unchanged.

    """
    if not result.fail_msg and result.payload is not None:
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
    return result

  @staticmethod
  def _BlockdevGetMirrorStatusPostProc(result):
    """Converts each payload entry of a successful result to a status object.

    Failed results are returned unchanged.

    """
    if not result.fail_msg:
      result.payload = [objects.BlockDevStatus.FromDict(i)
                        for i in result.payload]
    return result

  @staticmethod
  def _BlockdevGetMirrorStatusMultiPostProc(result):
    """Converts the per-node mirror status payloads to status objects.

    @param result: dictionary of per-node results; the payload of each
      successful node result is a list of (success, status) pairs and is
      modified in place
    @return: the same dictionary

    """
    for nres in result.values():
      if nres.fail_msg:
        continue

      for idx, (success, status) in enumerate(nres.payload):
        # Entries reporting failure are kept as received
        if success:
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))

    return result

  @staticmethod
  def _OsGetPostProc(result):
    """Converts a dictionary payload of a successful result to an OS object.

    Failed results and non-dictionary payloads are returned unchanged.

    """
    if not result.fail_msg and isinstance(result.payload, dict):
      result.payload = objects.OS.FromDict(result.payload)
    return result

  @staticmethod
  def _PrepareFinalizeExportDisks(snap_disks):
    """Converts snapshot disks to their dictionary form.

    @param snap_disks: list whose entries are either booleans or objects
      providing a C{ToDict} method
    @rtype: list
    @return: new list with booleans kept as they are and all other entries
      replaced by the result of their C{ToDict} method

    """
    flat_disks = []

    for disk in snap_disks:
      if isinstance(disk, bool):
        flat_disks.append(disk)
      else:
        flat_disks.append(disk.ToDict())

    return flat_disks

  @staticmethod
  def _ImpExpStatusPostProc(result):
    """Post-processor for import/export status.

    @rtype: Payload containing list of L{objects.ImportExportStatus} instances
    @return: Returns a list of the state of each named import/export or None if
             a status couldn't be retrieved

    """
    if not result.fail_msg:
      decoded = []

      for i in result.payload:
        if i is None:
          decoded.append(None)
          continue
        decoded.append(objects.ImportExportStatus.FromDict(i))

      result.payload = decoded

    return result

  @staticmethod
  def _EncodeImportExportIO(ieio, ieioargs):
    """Encodes import/export I/O information.

    @param ieio: I/O type; for L{constants.IEIO_RAW_DISK} and
      L{constants.IEIO_SCRIPT} the first argument is replaced by the result
      of its C{ToDict} method
    @param ieioargs: arguments belonging to the I/O type
    @return: tuple of encoded arguments, or C{ieioargs} itself for all
      other I/O types

    """
    if ieio == constants.IEIO_RAW_DISK:
      assert len(ieioargs) == 1
      return (ieioargs[0].ToDict(), )

    if ieio == constants.IEIO_SCRIPT:
      assert len(ieioargs) == 2
      return (ieioargs[0].ToDict(), ieioargs[1])

    return ieioargs

  @staticmethod
  def _PrepareFileUpload(filename):
    """Loads a file and prepares it for an upload to nodes.

    @type filename: string
    @param filename: path of the file to read
    @rtype: list
    @return: file name, encoded content as returned by L{_Compress}, mode,
      the results of looking up the owning uid and gid, access time and
      modification time

    """
    data = _Compress(utils.ReadFile(filename))
    st = os.stat(filename)
    getents = runtime.GetEnts()
    return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
            getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]

  #
  # Begin RPC calls
  #

  def call_test_delay(self, node_list, duration, read_timeout=None):
    """Sleep for a fixed time on given node(s).

    This is a multi-node call.

    @param node_list: names of the nodes to call
    @param duration: time to sleep; the read timeout used for the request
      is the duration plus five, truncated to an integer
    @param read_timeout: must be left at None, the timeout is derived from
      the duration

    """
    assert read_timeout is None
    # Call the generated implementation explicitly; going through
    # "self.call_test_delay" would re-enter this override, which then fails
    # the assertion above (or recurses endlessly if assertions are disabled).
    # NOTE(review): assumes the generated wrapper lives in RpcClientDefault
    # and accepts a "read_timeout" keyword -- confirm against _generated_rpc.
    return _generated_rpc.RpcClientDefault.call_test_delay(
      self, node_list, duration, read_timeout=int(duration + 5))
600

    
601

    
602
class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  # Module-level helper made available as attribute of this class
  _Compress = staticmethod(_Compress)

  def __init__(self, context, address_list):
    """Initializes this class.

    @param context: object whose C{glm.AddToLockMonitor} is used as lock
      monitor callback
    @param address_list: node addresses to use instead of resolving names
      via ssconf, or None

    """
    _generated_rpc.RpcClientJobQueue.__init__(self)

    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      resolver = _SsconfResolver

    port = netutils.GetDaemonPort(constants.NODED)
    self._proc = _RpcProcessor(resolver, port,
                               lock_monitor_cb=context.glm.AddToLockMonitor)

  def _Call(self, node_list, procedure, timeout, args):
    """Entry point for automatically generated RPC wrappers.

    Serializes the arguments to JSON and passes the call on to the RPC
    processor, using C{timeout} as read timeout.

    """
    return self._proc(node_list, procedure,
                      serializer.DumpJson(args, indent=False),
                      read_timeout=timeout)
631

    
632

    
633
class BootstrapRunner(_generated_rpc.RpcClientBootstrap):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    Node names are always resolved via ssconf and no lock monitor callback
    is registered.

    """
    _generated_rpc.RpcClientBootstrap.__init__(self)

    port = netutils.GetDaemonPort(constants.NODED)
    self._proc = _RpcProcessor(_SsconfResolver, port)

  def _Call(self, node_list, procedure, timeout, args):
    """Entry point for automatically generated RPC wrappers.

    Serializes the arguments to JSON and passes the call on to the RPC
    processor, using C{timeout} as read timeout.

    """
    return self._proc(node_list, procedure,
                      serializer.DumpJson(args, indent=False),
                      read_timeout=timeout)
653

    
654

    
655
class ConfigRunner(_generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  # Reuse the file upload helper of the full RPC runner
  _PrepareFileUpload = \
    staticmethod(RpcRunner._PrepareFileUpload) # pylint: disable=W0212

  def __init__(self, address_list):
    """Initializes this class.

    @param address_list: node addresses to use instead of resolving names
      via ssconf, or None

    """
    _generated_rpc.RpcClientConfig.__init__(self)

    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      resolver = _SsconfResolver

    port = netutils.GetDaemonPort(constants.NODED)
    self._proc = _RpcProcessor(resolver, port)

  def _Call(self, node_list, procedure, timeout, args):
    """Entry point for automatically generated RPC wrappers.

    Serializes the arguments to JSON and passes the call on to the RPC
    processor, using C{timeout} as read timeout.

    """
    return self._proc(node_list, procedure,
                      serializer.DumpJson(args, indent=False),
                      read_timeout=timeout)