Revision 4869595d

b/Makefile.am
87 87
masterddir = $(pkgpythondir)/masterd
88 88
confddir = $(pkgpythondir)/confd
89 89
rapidir = $(pkgpythondir)/rapi
90
rpcdir = $(pkgpythondir)/rpc
90 91
serverdir = $(pkgpythondir)/server
91 92
watcherdir = $(pkgpythondir)/watcher
92 93
impexpddir = $(pkgpythondir)/impexpd
......
173 174
	lib/impexpd \
174 175
	lib/masterd \
175 176
	lib/rapi \
177
	lib/rpc \
176 178
	lib/server \
177 179
	lib/storage \
178 180
	lib/tools \
......
359 361
	lib/pathutils.py \
360 362
	lib/qlang.py \
361 363
	lib/query.py \
362
	lib/rpc.py \
363 364
	lib/rpc_defs.py \
364 365
	lib/runtime.py \
365 366
	lib/serializer.py \
......
462 463
	lib/server/noded.py \
463 464
	lib/server/rapi.py
464 465

  
466
rpc_PYTHON = \
467
	lib/rpc/__init__.py \
468
	lib/rpc/node.py
469

  
465 470
pytools_PYTHON = \
466 471
	lib/tools/__init__.py \
467 472
	lib/tools/burnin.py \
......
1545 1550
	$(storage_PYTHON) \
1546 1551
	$(rapi_PYTHON) \
1547 1552
	$(server_PYTHON) \
1553
	$(rpc_PYTHON) \
1548 1554
	$(pytools_PYTHON) \
1549 1555
	$(http_PYTHON) \
1550 1556
	$(confd_PYTHON) \
b/lib/bootstrap.py
31 31
import tempfile
32 32

  
33 33
from ganeti.cmdlib import cluster
34
from ganeti import rpc
34
import ganeti.rpc.node as rpc
35 35
from ganeti import ssh
36 36
from ganeti import utils
37 37
from ganeti import errors
b/lib/cli.py
37 37
from ganeti import constants
38 38
from ganeti import opcodes
39 39
from ganeti import luxi
40
from ganeti import rpc
40
import ganeti.rpc.node as rpc
41 41
from ganeti import ssh
42 42
from ganeti import compat
43 43
from ganeti import netutils
b/lib/cmdlib/cluster.py
42 42
from ganeti import opcodes
43 43
from ganeti import pathutils
44 44
from ganeti import query
45
from ganeti import rpc
45
import ganeti.rpc.node as rpc
46 46
from ganeti import runtime
47 47
from ganeti import ssh
48 48
from ganeti import uidpool
b/lib/cmdlib/common.py
32 32
from ganeti import objects
33 33
from ganeti import opcodes
34 34
from ganeti import pathutils
35
from ganeti import rpc
35
import ganeti.rpc.node as rpc
36 36
from ganeti import ssconf
37 37
from ganeti import utils
38 38

  
b/lib/cmdlib/instance.py
37 37
from ganeti import netutils
38 38
from ganeti import objects
39 39
from ganeti import pathutils
40
from ganeti import rpc
40
import ganeti.rpc.node as rpc
41 41
from ganeti import utils
42 42

  
43 43
from ganeti.cmdlib.base import NoHooksLU, LogicalUnit, ResultWithJobs
b/lib/cmdlib/instance_storage.py
34 34
from ganeti.masterd import iallocator
35 35
from ganeti import objects
36 36
from ganeti import utils
37
from ganeti import rpc
37
import ganeti.rpc.node as rpc
38 38
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
39 39
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
40 40
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
b/lib/cmdlib/node.py
30 30
from ganeti import netutils
31 31
from ganeti import objects
32 32
from ganeti import opcodes
33
from ganeti import rpc
33
import ganeti.rpc.node as rpc
34 34
from ganeti import utils
35 35
from ganeti.masterd import iallocator
36 36

  
b/lib/config.py
45 45
from ganeti import locking
46 46
from ganeti import utils
47 47
from ganeti import constants
48
from ganeti import rpc
48
import ganeti.rpc.node as rpc
49 49
from ganeti import objects
50 50
from ganeti import serializer
51 51
from ganeti import uidpool
b/lib/jqueue.py
55 55
from ganeti import mcpu
56 56
from ganeti import utils
57 57
from ganeti import jstore
58
from ganeti import rpc
58
import ganeti.rpc.node as rpc
59 59
from ganeti import runtime
60 60
from ganeti import netutils
61 61
from ganeti import compat
b/lib/masterd/iallocator.py
27 27
from ganeti import ht
28 28
from ganeti import outils
29 29
from ganeti import opcodes
30
from ganeti import rpc
30
import ganeti.rpc.node as rpc
31 31
from ganeti import serializer
32 32
from ganeti import utils
33 33

  
/dev/null
1
#
2
#
3

  
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Inter-node RPC library.
23

  
24
"""
25

  
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

  
33
import logging
34
import zlib
35
import base64
36
import pycurl
37
import threading
38
import copy
39

  
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50
from ganeti import rpc_defs
51
from ganeti import pathutils
52
from ganeti import vcluster
53

  
54
# Special module generated at build time
55
from ganeti import _generated_rpc
56

  
57
# pylint has a bug here, doesn't see this import
58
import ganeti.http.client  # pylint: disable=W0611
59

  
60

  
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

  
66
#: Special value to describe an offline host
67
_OFFLINE = object()
68

  
69

  
70
def Init():
  """Sets up the module-global HTTP client manager.

  This must run before any RPC function is used and while the process
  still has exactly one thread.

  """
  # curl_global_init(3) and curl_global_cleanup(3) may only be invoked
  # while a single thread exists; this assertion is merely a best-effort
  # safeguard and cannot detect every violation.
  active = threading.activeCount()
  assert active == 1, \
         "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)
  pycurl.global_init(pycurl.GLOBAL_ALL)
86

  
87

  
88
def Shutdown():
  """Tears down the module-global HTTP client manager.

  Like L{Init}, this must be invoked while exactly one thread is
  running, just before the program terminates.

  """
  pycurl.global_cleanup()
96

  
97

  
98
def _ConfigRpcCurl(curl):
  """Configures a pycURL handle for node daemon RPC.

  The node daemon certificate is used both as the CA for peer
  verification and as the client certificate/key for identification.

  @param curl: pycURL handle to configure

  """
  noded_cert = str(pathutils.NODED_CERT_FILE)

  options = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, noded_cert),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, noded_cert),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, noded_cert),
    (pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT),
    ]
  for (opt, value) in options:
    curl.setopt(opt, value)
110

  
111

  
112
def RunWithRPC(fn):
  """Decorator bracketing a function with RPC setup/teardown.

  The wrapped function runs with the RPC system initialized; the system
  is shut down afterwards, even if the function raises.  The function
  must therefore be called without RPC having been initialized already.

  """
  def wrapper(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      # Always release the pycURL global state, success or failure
      Shutdown()

  return wrapper
127

  
128

  
129
def _Compress(_, data):
  """Encodes a string for RPC transport, compressing large payloads.

  Payloads shorter than 512 bytes are sent verbatim; anything bigger is
  deflated with zlib (level 3) and base64-encoded.

  @type data: str
  @param data: raw payload
  @rtype: tuple
  @return: (encoding constant, encoded payload)

  """
  if len(data) >= 512:
    # Compress with zlib and encode in base64
    compressed = zlib.compress(data, 3)
    return (constants.RPC_ENCODING_ZLIB_BASE64, base64.b64encode(compressed))

  # Small amounts of data are not compressed
  return (constants.RPC_ENCODING_NONE, data)
147

  
148

  
149
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      # A well-formed result is a (success, payload) pair
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  def __repr__(self):
    # FIX: the first placeholder previously interpolated self.offline
    # instead of self.data, so "data=..." in the repr was always wrong
    return ("RpcResult(data=%s, call=%s, node=%s, offline=%s, fail_msg=%s)" %
            (self.data, self.call, self.node, self.offline, self.fail_msg))

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    """
    if not self.fail_msg:
      return

    if not msg: # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args) # pylint: disable=W0142

  def Warn(self, msg, feedback_fn):
    """If the result has failed, call the feedback_fn.

    This is used to in cases were LU wants to warn the
    user about a failure, but continue anyway.

    """
    if not self.fail_msg:
      return

    msg = "%s: %s" % (msg, self.fail_msg)
    feedback_fn(msg)
249

  
250

  
251
def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type ssconf_ips: bool
  @param ssconf_ips: if set, primary IPs are taken from ssconf first
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function used to do the DNS lookup fallback
  @rtype: list of tuple; (string, string, string)
  @return: one (name, address, name) tuple per input node

  """
  store = ssc()
  family = store.GetPrimaryIPFamily()

  ipmap = {}
  if ssconf_ips:
    for entry in store.GetNodePrimaryIPList():
      (name, address) = entry.split()
      ipmap[name] = address

  triples = []
  for name in node_list:
    address = ipmap.get(name)
    if address is None:
      # Not in ssconf (or ssconf disabled): fall back to DNS
      address = nslookup_fn(name, family=family)
    triples.append((name, address, name))

  return triples
285

  
286

  
287
class _StaticResolver:
288
  def __init__(self, addresses):
289
    """Initializes this class.
290

  
291
    """
292
    self._addresses = addresses
293

  
294
  def __call__(self, hosts, _):
295
    """Returns static addresses for hosts.
296

  
297
    """
298
    assert len(hosts) == len(self._addresses)
299
    return zip(hosts, self._addresses, hosts)
300

  
301

  
302
def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
303
  """Checks if a node is online.
304

  
305
  @type node_uuid_or_name: string
306
  @param node_uuid_or_name: Node UUID
307
  @type node: L{objects.Node} or None
308
  @param node: Node object
309

  
310
  """
311
  if node is None:
312
    # Assume that the passed parameter was actually a node name, so depend on
313
    # DNS for name resolution
314
    return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name)
315
  else:
316
    if node.offline and not accept_offline_node:
317
      ip = _OFFLINE
318
    else:
319
      ip = node.primary_ip
320
    return (node.name, ip, node_uuid_or_name)
321

  
322

  
323
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
  """Calculate node addresses using configuration.

  Note that strings in node_uuids are treated as node names if the UUID
  is not found in the configuration.

  """
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline_node or opts is None, "Unknown option"

  if len(node_uuids) == 1:
    # Fast path: avoid fetching the whole node list for one host
    (uuid, ) = node_uuids
    return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]

  node_map = all_nodes_fn()
  return [_CheckConfigNode(uuid, node_map.get(uuid, None),
                           accept_offline_node)
          for uuid in node_uuids]
343

  
344

  
345
class _RpcProcessor:
  """Callable engine making one RPC procedure call to many nodes at once.

  """
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of node UUIDs or hostnames,
      returning a list of tuples containing name, IP address and original name
      of the resolved node. IP address can be the name or the special value
      L{_OFFLINE} to mark offline machines.
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Splits hosts into ready-made offline results and pending requests.

    @type body: dict
    @param body: a dictionary with per-host body data

    """
    offline_results = {}
    pending = {}

    assert isinstance(body, dict)
    assert len(body) == len(hosts)
    assert compat.all(isinstance(v, str) for v in body.values())
    assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
        "%s != %s" % (hosts, body.keys())

    for (name, ip, original_name) in hosts:
      if ip is _OFFLINE:
        # No request is made for offline nodes; record the failure directly
        offline_results[original_name] = RpcResult(node=name,
                                                   offline=True,
                                                   call=procedure)
        continue

      pending[original_name] = \
        http.client.HttpClientRequest(str(ip), port,
                                      http.HTTP_POST, str("/%s" % procedure),
                                      headers=_RPC_CLIENT_HEADERS,
                                      post_data=body[original_name],
                                      read_timeout=read_timeout,
                                      nicename="%s/%s" % (name, procedure),
                                      curl_config_fn=_ConfigRpcCurl)

    return (offline_results, pending)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Merges completed HTTP requests into the pre-computed result dict.

    """
    for (name, req) in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)

      results[name] = host_result

    return results

  def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
               _req_process_fn=None):
    """Makes an RPC request to a number of nodes.

    @type nodes: sequence
    @param nodes: node UUIDs or Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: dictionary
    @param body: dictionary with request bodies per host
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request
    @rtype: dictionary
    @return: a dictionary mapping host names to rpc.RpcResult objects

    """
    assert read_timeout is not None, \
      "Missing RPC read timeout for procedure '%s'" % procedure

    if _req_process_fn is None:
      _req_process_fn = http.client.ProcessRequests

    resolved = self._resolver(nodes, resolver_opts)
    (results, requests) = self._PrepareRequests(resolved, self._port,
                                                procedure, body, read_timeout)

    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
452

  
453

  
454
class _RpcClientBase:
455
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
456
               _req_process_fn=None):
457
    """Initializes this class.
458

  
459
    """
460
    proc = _RpcProcessor(resolver,
461
                         netutils.GetDaemonPort(constants.NODED),
462
                         lock_monitor_cb=lock_monitor_cb)
463
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
464
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
465

  
466
  @staticmethod
467
  def _EncodeArg(encoder_fn, node, (argkind, value)):
468
    """Encode argument.
469

  
470
    """
471
    if argkind is None:
472
      return value
473
    else:
474
      return encoder_fn(argkind)(node, value)
475

  
476
  def _Call(self, cdef, node_list, args):
477
    """Entry point for automatically generated RPC wrappers.
478

  
479
    """
480
    (procedure, _, resolver_opts, timeout, argdefs,
481
     prep_fn, postproc_fn, _) = cdef
482

  
483
    if callable(timeout):
484
      read_timeout = timeout(args)
485
    else:
486
      read_timeout = timeout
487

  
488
    if callable(resolver_opts):
489
      req_resolver_opts = resolver_opts(args)
490
    else:
491
      req_resolver_opts = resolver_opts
492

  
493
    if len(args) != len(argdefs):
494
      raise errors.ProgrammerError("Number of passed arguments doesn't match")
495

  
496
    if prep_fn is None:
497
      prep_fn = lambda _, args: args
498
    assert callable(prep_fn)
499

  
500
    # encode the arguments for each node individually, pass them and the node
501
    # name to the prep_fn, and serialise its return value
502
    encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
503
                                      zip(map(compat.snd, argdefs), args))
504
    pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
505
                  for n in node_list)
506

  
507
    result = self._proc(node_list, procedure, pnbody, read_timeout,
508
                        req_resolver_opts)
509

  
510
    if postproc_fn:
511
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
512
                      result.items()))
513
    else:
514
      return result
515

  
516

  
517
def _ObjectToDict(_, value):
518
  """Converts an object to a dictionary.
519

  
520
  @note: See L{objects}.
521

  
522
  """
523
  return value.ToDict()
524

  
525

  
526
def _ObjectListToDict(node, value):
  """Serializes a list of L{objects} by applying L{_ObjectToDict} to each.

  """
  return [_ObjectToDict(node, item) for item in value]
531

  
532

  
533
def _PrepareFileUpload(getents_fn, node, filename):
  """Reads a local file and packages it for upload to nodes.

  The payload carries the (possibly compressed) contents together with
  the file's mode, ownership resolved to cluster entities, and
  timestamps.

  """
  statcb = utils.FileStatHelper()
  content = _Compress(node, utils.ReadFile(filename, preread=statcb))
  st = statcb.st

  if getents_fn is None:
    getents_fn = runtime.GetEnts
  getents = getents_fn()

  # Paths are transferred in their virtual-cluster form
  virt_filename = vcluster.MakeVirtualPath(filename)

  return [virt_filename, content, st.st_mode, getents.LookupUid(st.st_uid),
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
550

  
551

  
552
def _PrepareFinalizeExportDisks(_, snap_disks):
553
  """Encodes disks for finalizing export.
554

  
555
  """
556
  flat_disks = []
557

  
558
  for disk in snap_disks:
559
    if isinstance(disk, bool):
560
      flat_disks.append(disk)
561
    else:
562
      flat_disks.append(disk.ToDict())
563

  
564
  return flat_disks
565

  
566

  
567
def _EncodeBlockdevRename(_, value):
568
  """Encodes information for renaming block devices.
569

  
570
  """
571
  return [(d.ToDict(), uid) for d, uid in value]
572

  
573

  
574
def _AddSpindlesToLegacyNodeInfo(result, space_info):
  """Copies spindle (LVM physical volume) figures into the legacy dict.

  @type result: dict of strings
  @param result: legacy node info dictionary, updated in place
  @type space_info: list of dicts of strings
  @param space_info: list, each row holding space information of one storage
    unit
  @rtype: None
  @return: does not return anything, manipulates the C{result} variable

  """
  lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_PV)
  if not lvm_pv_info:
    # No PV data available: report zero spindles
    result["spindles_free"] = 0
    result["spindles_total"] = 0
    return

  result["spindles_free"] = lvm_pv_info["storage_free"]
  result["spindles_total"] = lvm_pv_info["storage_size"]
595

  
596

  
597
def _AddStorageInfoToLegacyNodeInfoByTemplate(
    result, space_info, disk_template):
  """Copies the disk template's storage figures into the legacy dict.

  @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.

  """
  if not utils.storage.DiskTemplateSupportsSpaceReporting(disk_template):
    # FIXME: consider displaying '-' in this case
    result["storage_free"] = 0
    result["storage_size"] = 0
    return

  disk_info = utils.storage.LookupSpaceInfoByDiskTemplate(
      space_info, disk_template)
  result["name"] = disk_info["name"]
  result["storage_free"] = disk_info["storage_free"]
  result["storage_size"] = disk_info["storage_size"]
615

  
616

  
617
def MakeLegacyNodeInfo(data, disk_template):
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.

  Converts the data into a single dictionary. This is fine for most use cases,
  but some require information from more than one volume group or hypervisor.

  """
  (bootid, space_info, (hv_info, )) = data

  legacy = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
  _AddSpindlesToLegacyNodeInfo(legacy, space_info)
  _AddStorageInfoToLegacyNodeInfoByTemplate(legacy, space_info, disk_template)

  return legacy
632

  
633

  
634
def _AnnotateDParamsDRBD(disk, params):
  """Annotates just DRBD disks layouts.

  @param params: (drbd_params, data_params, meta_params) triple

  """
  # FIX: tuple parameter unpacking in the signature (PEP 3113) was removed
  # in Python 3; unpack inside the body instead (call-compatible)
  (drbd_params, data_params, meta_params) = params
  assert disk.dev_type == constants.DT_DRBD8

  disk.params = objects.FillDict(drbd_params, disk.params)
  (dev_data, dev_meta) = disk.children
  dev_data.params = objects.FillDict(data_params, dev_data.params)
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

  return disk
646

  
647

  
648
def _AnnotateDParamsGeneric(disk, ld_params):
  """Generic disk parameter annotation routine.

  @param ld_params: single-element sequence holding the parameter dict

  """
  # FIX: tuple parameter unpacking in the signature (PEP 3113) was removed
  # in Python 3; unpack inside the body instead (call-compatible)
  (params, ) = ld_params
  assert disk.dev_type != constants.DT_DRBD8

  disk.params = objects.FillDict(params, disk.params)

  return disk
657

  
658

  
659
def AnnotateDiskParams(disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param disks: The list of disks objects to annotate
  @param disk_params: The disk parameters for annotation
  @returns: A list of disk objects annotated

  """
  def _Annotate(disk):
    if disk.dev_type == constants.DT_DISKLESS:
      return disk

    ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)

    if disk.dev_type == constants.DT_DRBD8:
      return _AnnotateDParamsDRBD(disk, ld_params)
    return _AnnotateDParamsGeneric(disk, ld_params)

  # Work on copies so the callers' disk objects stay untouched
  return [_Annotate(disk.Copy()) for disk in disks]
679

  
680

  
681
def _GetExclusiveStorageFlag(cfg, node_uuid):
  """Looks up a node's exclusive storage flag in the configuration.

  @raise errors.OpPrereqError: if the node is unknown

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is None:
    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
                               errors.ECODE_NOENT)
  return cfg.GetNdParams(node_info)[constants.ND_EXCLUSIVE_STORAGE]
687

  
688

  
689
def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
  """Adds the exclusive storage flag to lvm units.

  This function creates a copy of the storage_units lists, with the
  es_flag being added to all lvm storage units.

  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, consisting only of
    (storage_type, storage_key)
  @type es_flag: boolean
  @param es_flag: exclusive storage flag
  @rtype: list of tuples (string, string, list)
  @return: list of storage units (storage_type, storage_key, params) with
    the params containing the es_flag for lvm-vg storage units

  """
  units = []
  for (storage_type, storage_key) in storage_units:
    if storage_type not in [constants.ST_LVM_VG]:
      units.append((storage_type, storage_key, []))
      continue

    units.append((storage_type, storage_key, [es_flag]))
    if es_flag:
      # With exclusive storage enabled, PV information is needed as well
      units.append((constants.ST_LVM_PV, storage_key, [es_flag]))
  return units
714

  
715

  
716
def GetExclusiveStorageForNodes(cfg, node_uuids):
  """Return the exclusive storage flag for all the given nodes.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding
  node

  """
  return dict((uuid, _GetExclusiveStorageFlag(cfg, uuid))
              for uuid in node_uuids)
732

  
733

  
734
def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
  """Return the lvm storage unit for all the given nodes.

  Main purpose of this function is to map the exclusive storage flag, which
  can be different for each node, to the default LVM storage unit.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, e.g. pairs of
    (storage_type, storage_key)
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to a list of storage units which include
    the exclusive storage flag for lvm storage
  @raise errors.OpPrereqError: if any given node name has no corresponding
  node

  """
  def _UnitsFor(uuid):
    # Each node's units carry that node's own exclusive storage flag
    es_flag = _GetExclusiveStorageFlag(cfg, uuid)
    return _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag)

  return dict((uuid, _UnitsFor(uuid)) for uuid in node_uuids)
758

  
759

  
760
#: Generic encoders
761
_ENCODERS = {
762
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
763
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
764
  rpc_defs.ED_COMPRESS: _Compress,
765
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
766
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
767
  }
768

  
769

  
770
class RpcRunner(_RpcClientBase,
                _generated_rpc.RpcClientDefault,
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientDnsOnly,
                _generated_rpc.RpcClientConfig):
  """RPC runner class.

  Combines the generated RPC client mixins into the full-featured runner;
  encoders that need the cluster configuration are bound to instance
  methods here.

  """
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
    """Initializes the RPC runner.

    @type cfg: L{config.ConfigWriter}
    @param cfg: Configuration
    @type lock_monitor_cb: callable
    @param lock_monitor_cb: Lock monitor callback
    @param _req_process_fn: request-processing override (for tests only)
    @param _getents: runtime-entity lookup override (for tests only)

    """
    self._cfg = cfg

    # Start from the generic encoders, then add those needing "self"
    encoders = _ENCODERS.copy()

    encoders.update({
      # Encoders requiring configuration object
      rpc_defs.ED_INST_DICT: self._InstDict,
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
      rpc_defs.ED_NIC_DICT: self._NicDict,
      rpc_defs.ED_DEVICE_DICT: self._DeviceDict,

      # Encoders annotating disk parameters
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
      rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
      rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,

      # Encoders with special requirements
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),

      rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
      })

    # Resolver using configuration
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
                              cfg.GetAllNodesInfo)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

  def _NicDict(self, _, nic):
    """Convert the given nic to a dict and encapsulate netinfo

    """
    # Deep copy so attaching netinfo does not modify the configuration's NIC
    n = copy.deepcopy(nic)
    if n.network:
      net_uuid = self._cfg.LookupNetwork(n.network)
      if net_uuid:
        nobj = self._cfg.GetNetwork(net_uuid)
        n.netinfo = objects.Network.ToDict(nobj)
    return n.ToDict()

  def _DeviceDict(self, _, (device, instance)):
    """Encode a (device, instance) pair as a dict.

    Dispatches on the device type; NICs and disks are encoded differently.
    NOTE(review): returns None for any other device type -- presumably only
    NIC and Disk objects are ever passed; confirm against callers.

    """
    if isinstance(device, objects.NIC):
      return self._NicDict(None, device)
    elif isinstance(device, objects.Disk):
      return self._SingleDiskDictDP(None, (device, instance))

  def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the hvparams filled with the
        cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()
    # Fill parameter dicts from cluster defaults, then apply overrides
    idict["hvparams"] = cluster.FillHV(instance)
    if hvp is not None:
      idict["hvparams"].update(hvp)
    idict["beparams"] = cluster.FillBE(instance)
    if bep is not None:
      idict["beparams"].update(bep)
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)
    # Disks are re-encoded with annotated (per-node) disk parameters
    idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
    for nic in idict["nics"]:
      nic["nicparams"] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],
        nic["nicparams"])
      network = nic.get("network", None)
      if network:
        net_uuid = self._cfg.LookupNetwork(network)
        if net_uuid:
          nobj = self._cfg.GetNetwork(net_uuid)
          nic["netinfo"] = objects.Network.ToDict(nobj)
    return idict

  def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, hvp=hvp, bep=bep)

  def _InstDictOspDp(self, node, (instance, osparams)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, osp=osparams)

  def _DisksDictDP(self, node, (disks, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    diskparams = self._cfg.GetInstanceDiskParams(instance)
    ret = []
    for disk in AnnotateDiskParams(disks, diskparams):
      disk_node_uuids = disk.GetNodes(instance.primary_node)
      # The "node" loop variable of this genexp lives only inside it, so the
      # "node" parameter is still visible below
      node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
                      in self._cfg.GetMultiNodeInfo(disk_node_uuids))

      disk.UpdateDynamicDiskParams(node, node_ips)

      ret.append(disk.ToDict(include_dynamic_params=True))

    return ret

  def _MultiDiskDictDP(self, node, disks_insts):
    """Wrapper for L{AnnotateDiskParams}.

    Supports a list of (disk, instance) tuples.
    """
    return [disk for disk_inst in disks_insts
            for disk in self._DisksDictDP(node, disk_inst)]

  def _SingleDiskDictDP(self, node, (disk, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    # _DisksDictDP returns a one-element list for a one-element input
    (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
    return anno_disk

  def _EncodeNodeToDiskDictDP(self, node, value):
    """Encode dict of node name -> list of (disk, instance) tuples as values.

    """
    return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
                for name, disks in value.items())

  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
    """Encodes import/export I/O information.

    """
    if ieio == constants.IEIO_RAW_DISK:
      assert len(ieioargs) == 2
      # Whole (disk, instance) pair is the single argument
      return (ieio, (self._SingleDiskDictDP(node, ieioargs), ))

    if ieio == constants.IEIO_SCRIPT:
      assert len(ieioargs) == 2
      # First argument is a (disk, instance) pair, second is passed through
      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))

    return (ieio, ieioargs)
952

  
953

  
954
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    @param context: context object providing the lock monitor
    @param address_list: list of node addresses, or None to resolve node
        names through ssconf

    """
    if address_list is not None:
      # Static list of addresses supplied by the caller
      resolver = _StaticResolver(address_list)
    else:
      # Fall back to name resolution via ssconf
      resolver = compat.partial(_SsconfResolver, True)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
971

  
972

  
973
class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    """
    # While bootstrapping, node names are always resolved through ssconf
    name_resolver = compat.partial(_SsconfResolver, True)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, name_resolver, _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
991

  
992

  
993
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    """
    # Resolve node names via DNS only (no ssconf lookups)
    dns_resolver = compat.partial(_SsconfResolver, False)
    _RpcClientBase.__init__(self, dns_resolver, _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
1004

  
1005

  
1006
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  def __init__(self, context, address_list, _req_process_fn=None,
               _getents=None):
    """Initializes this class.

    @param context: context object providing the lock monitor, or None
    @param address_list: list of node addresses, or None to resolve node
        names through ssconf
    @param _req_process_fn: request-processing override (for tests only)
    @param _getents: runtime-entity lookup override (for tests only)

    """
    # Without a context there is no lock monitor to report to
    lock_monitor_cb = context.glm.AddToLockMonitor if context else None

    if address_list is not None:
      # Static list of addresses supplied by the caller
      resolver = _StaticResolver(address_list)
    else:
      # Fall back to name resolution via ssconf
      resolver = compat.partial(_SsconfResolver, True)

    # Extend the generic encoders with the file-upload encoder, which needs
    # the runtime entity lookup
    encoders = _ENCODERS.copy()
    encoders[rpc_defs.ED_FILE_DETAILS] = \
      compat.partial(_PrepareFileUpload, _getents)

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
b/lib/rpc/__init__.py
1
#
2
#
3

  
4
# Copyright (C) 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Empty file for package definition.
23

  
24
"""
b/lib/rpc/node.py
1
#
2
#
3

  
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Inter-node RPC library.
23

  
24
"""
25

  
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

  
33
import base64
import copy
import functools
import logging
import threading
import zlib

import pycurl

from ganeti import utils
from ganeti import objects
from ganeti import http
from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import netutils
from ganeti import ssconf
from ganeti import runtime
from ganeti import compat
from ganeti import rpc_defs
from ganeti import pathutils
from ganeti import vcluster

# Special module generated at build time
from ganeti import _generated_rpc

# pylint has a bug here, doesn't see this import
import ganeti.http.client  # pylint: disable=W0611
59

  
60

  
61
#: HTTP headers sent with every RPC request; the empty "Expect:" header
#: suppresses curl's default "Expect: 100-continue" round trip
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]

#: Special value to describe an offline host
_OFFLINE = object()
68

  
69

  
70
def Init():
  """Initializes the module-global HTTP client manager.

  Must be called before using any RPC function and while exactly one thread is
  running.

  """
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
  # one thread running. This check is just a safety measure -- it doesn't
  # cover all cases.
  assert threading.activeCount() == 1, \
         "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)

  pycurl.global_init(pycurl.GLOBAL_ALL)
86

  
87

  
88
def Shutdown():
  """Stops the module-global HTTP client manager.

  Must be called before quitting the program and while exactly one thread is
  running.

  """
  # Counterpart of L{Init}; see the thread-safety note there
  pycurl.global_cleanup()
96

  
97

  
98
def _ConfigRpcCurl(curl):
  """Configures a pycURL handle for node-daemon RPC.

  @param curl: pycURL handle to configure

  """
  noded_cert = str(pathutils.NODED_CERT_FILE)

  curl.setopt(pycurl.FOLLOWLOCATION, False)
  # The node daemon certificate also acts as the CA for peer verification
  curl.setopt(pycurl.CAINFO, noded_cert)
  # NOTE(review): hostname verification is disabled while peer verification
  # stays enabled -- presumably because node certificates don't carry
  # hostnames; confirm before changing either value
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  # Client certificate and key (same PEM file) for mutual authentication
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110

  
111

  
112
def RunWithRPC(fn):
  """RPC-wrapper decorator.

  When applied to a function, it runs it with the RPC system
  initialized, and it shutsdown the system afterwards. This means the
  function must be called without RPC being initialized.

  @type fn: callable
  @param fn: function to wrap
  @rtype: callable
  @return: wrapper calling C{fn} between L{Init} and L{Shutdown}

  """
  # functools.wraps preserves the wrapped function's name and docstring,
  # which the plain inner function previously clobbered
  @functools.wraps(fn)
  def wrapper(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      # Always clean up pycURL state, even if fn raises
      Shutdown()
  return wrapper
127

  
128

  
129
def _Compress(_, data):
  """Compresses a string for transport over RPC.

  Small amounts of data are not compressed.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  if len(data) >= 512:
    # Compress with zlib and encode in base64
    return (constants.RPC_ENCODING_ZLIB_BASE64,
            base64.b64encode(zlib.compress(data, 3)))

  # Below the threshold compression overhead is not worth it
  return (constants.RPC_ENCODING_NONE, data)
147

  
148

  
149
class RpcResult(object):
150
  """RPC Result class.
151

  
152
  This class holds an RPC result. It is needed since in multi-node
153
  calls we can't raise an exception just because one out of many
154
  failed, and therefore we use this class to encapsulate the result.
155

  
156
  @ivar data: the data payload, for successful results, or None
157
  @ivar call: the name of the RPC call
158
  @ivar node: the name of the node to which we made the call
159
  @ivar offline: whether the operation failed because the node was
160
      offline, as opposed to actual failure; offline=True will always
161
      imply failed=True, in order to allow simpler checking if
162
      the user doesn't care about the exact failure mode
163
  @ivar fail_msg: the error message if the call failed
164

  
165
  """
166
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    """Initializes the result object.

    @param data: result payload; for a successful call this must be a
        (status, payload) two-element sequence as returned by the node
    @param failed: whether the RPC layer itself failed
    @param offline: whether the target node was marked offline
    @param call: name of the RPC call
    @param node: name of the target node

    """
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      # Offline implies failure; no payload is available
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      # RPC-layer failure; "data" carries the error message (if any)
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      # Validate the (status, payload) envelope returned by the node
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        # The remote call reported failure; second element is the message
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    # Sanity check: every public attribute must have been set above
    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199

  
200
  def __repr__(self):
201
    return ("RpcResult(data=%s, call=%s, node=%s, offline=%s, fail_msg=%s)" %
202
            (self.offline, self.call, self.node, self.offline, self.fail_msg))
203

  
204
  @staticmethod
205
  def _EnsureErr(val):
206
    """Helper to ensure we return a 'True' value for error."""
207
    if val:
208
      return val
209
    else:
210
      return "No error information"
211

  
212
  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    """
    # Nothing to do for successful results
    if not self.fail_msg:
      return

    if msg:
      msg = "%s: %s" % (msg, self.fail_msg)
    else:
      # One could pass None for the default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))

    # Prerequisite failures map to OpPrereqError, everything else to
    # OpExecError
    ec = errors.OpPrereqError if prereq else errors.OpExecError

    if ecode is None:
      args = (msg, )
    else:
      args = (msg, ecode)
    raise ec(*args) # pylint: disable=W0142
236

  
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff