Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 95e7e85e

History | View | Annotate | Download (26.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import logging
34
import zlib
35
import base64
36
import pycurl
37
import threading
38
import copy
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50
from ganeti import rpc_defs
51
from ganeti import pathutils
52
from ganeti import vcluster
53

    
54
# Special module generated at build time
55
from ganeti import _generated_rpc
56

    
57
# pylint has a bug here, doesn't see this import
58
import ganeti.http.client  # pylint: disable=W0611
59

    
60

    
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

    
66
#: Special value to describe an offline host
67
_OFFLINE = object()
68

    
69

    
70
def Init():
71
  """Initializes the module-global HTTP client manager.
72

73
  Must be called before using any RPC function and while exactly one thread is
74
  running.
75

76
  """
77
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
78
  # one thread running. This check is just a safety measure -- it doesn't
79
  # cover all cases.
80
  assert threading.activeCount() == 1, \
81
         "Found more than one active thread when initializing pycURL"
82

    
83
  logging.info("Using PycURL %s", pycurl.version)
84

    
85
  pycurl.global_init(pycurl.GLOBAL_ALL)
86

    
87

    
88
def Shutdown():
89
  """Stops the module-global HTTP client manager.
90

91
  Must be called before quitting the program and while exactly one thread is
92
  running.
93

94
  """
95
  pycurl.global_cleanup()
96

    
97

    
98
def _ConfigRpcCurl(curl):
99
  noded_cert = str(pathutils.NODED_CERT_FILE)
100

    
101
  curl.setopt(pycurl.FOLLOWLOCATION, False)
102
  curl.setopt(pycurl.CAINFO, noded_cert)
103
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
104
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
105
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
106
  curl.setopt(pycurl.SSLCERT, noded_cert)
107
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
108
  curl.setopt(pycurl.SSLKEY, noded_cert)
109
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110

    
111

    
112
def RunWithRPC(fn):
113
  """RPC-wrapper decorator.
114

115
  When applied to a function, it runs it with the RPC system
116
  initialized, and it shutsdown the system afterwards. This means the
117
  function must be called without RPC being initialized.
118

119
  """
120
  def wrapper(*args, **kwargs):
121
    Init()
122
    try:
123
      return fn(*args, **kwargs)
124
    finally:
125
      Shutdown()
126
  return wrapper
127

    
128

    
129
def _Compress(data):
130
  """Compresses a string for transport over RPC.
131

132
  Small amounts of data are not compressed.
133

134
  @type data: str
135
  @param data: Data
136
  @rtype: tuple
137
  @return: Encoded data to send
138

139
  """
140
  # Small amounts of data are not compressed
141
  if len(data) < 512:
142
    return (constants.RPC_ENCODING_NONE, data)
143

    
144
  # Compress with zlib and encode in base64
145
  return (constants.RPC_ENCODING_ZLIB_BASE64,
146
          base64.b64encode(zlib.compress(data, 3)))
147

    
148

    
149
class RpcResult(object):
150
  """RPC Result class.
151

152
  This class holds an RPC result. It is needed since in multi-node
153
  calls we can't raise an exception just because one out of many
154
  failed, and therefore we use this class to encapsulate the result.
155

156
  @ivar data: the data payload, for successful results, or None
157
  @ivar call: the name of the RPC call
158
  @ivar node: the name of the node to which we made the call
159
  @ivar offline: whether the operation failed because the node was
160
      offline, as opposed to actual failure; offline=True will always
161
      imply failed=True, in order to allow simpler checking if
162
      the user doesn't care about the exact failure mode
163
  @ivar fail_msg: the error message if the call failed
164

165
  """
166
  def __init__(self, data=None, failed=False, offline=False,
167
               call=None, node=None):
168
    self.offline = offline
169
    self.call = call
170
    self.node = node
171

    
172
    if offline:
173
      self.fail_msg = "Node is marked offline"
174
      self.data = self.payload = None
175
    elif failed:
176
      self.fail_msg = self._EnsureErr(data)
177
      self.data = self.payload = None
178
    else:
179
      self.data = data
180
      if not isinstance(self.data, (tuple, list)):
181
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182
                         type(self.data))
183
        self.payload = None
184
      elif len(data) != 2:
185
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
186
                         "expected 2" % len(self.data))
187
        self.payload = None
188
      elif not self.data[0]:
189
        self.fail_msg = self._EnsureErr(self.data[1])
190
        self.payload = None
191
      else:
192
        # finally success
193
        self.fail_msg = None
194
        self.payload = data[1]
195

    
196
    for attr_name in ["call", "data", "fail_msg",
197
                      "node", "offline", "payload"]:
198
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199

    
200
  @staticmethod
201
  def _EnsureErr(val):
202
    """Helper to ensure we return a 'True' value for error."""
203
    if val:
204
      return val
205
    else:
206
      return "No error information"
207

    
208
  def Raise(self, msg, prereq=False, ecode=None):
209
    """If the result has failed, raise an OpExecError.
210

211
    This is used so that LU code doesn't have to check for each
212
    result, but instead can call this function.
213

214
    """
215
    if not self.fail_msg:
216
      return
217

    
218
    if not msg: # one could pass None for default message
219
      msg = ("Call '%s' to node '%s' has failed: %s" %
220
             (self.call, self.node, self.fail_msg))
221
    else:
222
      msg = "%s: %s" % (msg, self.fail_msg)
223
    if prereq:
224
      ec = errors.OpPrereqError
225
    else:
226
      ec = errors.OpExecError
227
    if ecode is not None:
228
      args = (msg, ecode)
229
    else:
230
      args = (msg, )
231
    raise ec(*args) # pylint: disable=W0142
232

    
233
  def Warn(self, msg, feedback_fn):
234
    """If the result has failed, call the feedback_fn.
235

236
    This is used to in cases were LU wants to warn the
237
    user about a failure, but continue anyway.
238

239
    """
240
    if not self.fail_msg:
241
      return
242

    
243
    msg = "%s: %s" % (msg, self.fail_msg)
244
    feedback_fn(msg)
245

    
246

    
247
def _SsconfResolver(ssconf_ips, node_list, _,
248
                    ssc=ssconf.SimpleStore,
249
                    nslookup_fn=netutils.Hostname.GetIP):
250
  """Return addresses for given node names.
251

252
  @type ssconf_ips: bool
253
  @param ssconf_ips: Use the ssconf IPs
254
  @type node_list: list
255
  @param node_list: List of node names
256
  @type ssc: class
257
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
258
  @type nslookup_fn: callable
259
  @param nslookup_fn: function use to do NS lookup
260
  @rtype: list of tuple; (string, string)
261
  @return: List of tuples containing node name and IP address
262

263
  """
264
  ss = ssc()
265
  family = ss.GetPrimaryIPFamily()
266

    
267
  if ssconf_ips:
268
    iplist = ss.GetNodePrimaryIPList()
269
    ipmap = dict(entry.split() for entry in iplist)
270
  else:
271
    ipmap = {}
272

    
273
  result = []
274
  for node in node_list:
275
    ip = ipmap.get(node)
276
    if ip is None:
277
      ip = nslookup_fn(node, family=family)
278
    result.append((node, ip))
279

    
280
  return result
281

    
282

    
283
class _StaticResolver:
284
  def __init__(self, addresses):
285
    """Initializes this class.
286

287
    """
288
    self._addresses = addresses
289

    
290
  def __call__(self, hosts, _):
291
    """Returns static addresses for hosts.
292

293
    """
294
    assert len(hosts) == len(self._addresses)
295
    return zip(hosts, self._addresses)
296

    
297

    
298
def _CheckConfigNode(name, node, accept_offline_node):
299
  """Checks if a node is online.
300

301
  @type name: string
302
  @param name: Node name
303
  @type node: L{objects.Node} or None
304
  @param node: Node object
305

306
  """
307
  if node is None:
308
    # Depend on DNS for name resolution
309
    ip = name
310
  elif node.offline and not accept_offline_node:
311
    ip = _OFFLINE
312
  else:
313
    ip = node.primary_ip
314
  return (name, ip)
315

    
316

    
317
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
318
  """Calculate node addresses using configuration.
319

320
  """
321
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
322

    
323
  assert accept_offline_node or opts is None, "Unknown option"
324

    
325
  # Special case for single-host lookups
326
  if len(hosts) == 1:
327
    (name, ) = hosts
328
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
329
  else:
330
    all_nodes = all_nodes_fn()
331
    return [_CheckConfigNode(name, all_nodes.get(name, None),
332
                             accept_offline_node)
333
            for name in hosts]
334

    
335

    
336
class _RpcProcessor:
337
  def __init__(self, resolver, port, lock_monitor_cb=None):
338
    """Initializes this class.
339

340
    @param resolver: callable accepting a list of hostnames, returning a list
341
      of tuples containing name and IP address (IP address can be the name or
342
      the special value L{_OFFLINE} to mark offline machines)
343
    @type port: int
344
    @param port: TCP port
345
    @param lock_monitor_cb: Callable for registering with lock monitor
346

347
    """
348
    self._resolver = resolver
349
    self._port = port
350
    self._lock_monitor_cb = lock_monitor_cb
351

    
352
  @staticmethod
353
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
354
    """Prepares requests by sorting offline hosts into separate list.
355

356
    @type body: dict
357
    @param body: a dictionary with per-host body data
358

359
    """
360
    results = {}
361
    requests = {}
362

    
363
    assert isinstance(body, dict)
364
    assert len(body) == len(hosts)
365
    assert compat.all(isinstance(v, str) for v in body.values())
366
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
367
        "%s != %s" % (hosts, body.keys())
368

    
369
    for (name, ip) in hosts:
370
      if ip is _OFFLINE:
371
        # Node is marked as offline
372
        results[name] = RpcResult(node=name, offline=True, call=procedure)
373
      else:
374
        requests[name] = \
375
          http.client.HttpClientRequest(str(ip), port,
376
                                        http.HTTP_POST, str("/%s" % procedure),
377
                                        headers=_RPC_CLIENT_HEADERS,
378
                                        post_data=body[name],
379
                                        read_timeout=read_timeout,
380
                                        nicename="%s/%s" % (name, procedure),
381
                                        curl_config_fn=_ConfigRpcCurl)
382

    
383
    return (results, requests)
384

    
385
  @staticmethod
386
  def _CombineResults(results, requests, procedure):
387
    """Combines pre-computed results for offline hosts with actual call results.
388

389
    """
390
    for name, req in requests.items():
391
      if req.success and req.resp_status_code == http.HTTP_OK:
392
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
393
                                node=name, call=procedure)
394
      else:
395
        # TODO: Better error reporting
396
        if req.error:
397
          msg = req.error
398
        else:
399
          msg = req.resp_body
400

    
401
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
402
        host_result = RpcResult(data=msg, failed=True, node=name,
403
                                call=procedure)
404

    
405
      results[name] = host_result
406

    
407
    return results
408

    
409
  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
410
               _req_process_fn=None):
411
    """Makes an RPC request to a number of nodes.
412

413
    @type hosts: sequence
414
    @param hosts: Hostnames
415
    @type procedure: string
416
    @param procedure: Request path
417
    @type body: dictionary
418
    @param body: dictionary with request bodies per host
419
    @type read_timeout: int or None
420
    @param read_timeout: Read timeout for request
421
    @rtype: dictionary
422
    @return: a dictionary mapping host names to rpc.RpcResult objects
423

424
    """
425
    assert read_timeout is not None, \
426
      "Missing RPC read timeout for procedure '%s'" % procedure
427

    
428
    if _req_process_fn is None:
429
      _req_process_fn = http.client.ProcessRequests
430

    
431
    (results, requests) = \
432
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
433
                            procedure, body, read_timeout)
434

    
435
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
436

    
437
    assert not frozenset(results).intersection(requests)
438

    
439
    return self._CombineResults(results, requests, procedure)
440

    
441

    
442
class _RpcClientBase:
443
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
444
               _req_process_fn=None):
445
    """Initializes this class.
446

447
    """
448
    proc = _RpcProcessor(resolver,
449
                         netutils.GetDaemonPort(constants.NODED),
450
                         lock_monitor_cb=lock_monitor_cb)
451
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
452
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
453

    
454
  @staticmethod
455
  def _EncodeArg(encoder_fn, (argkind, value)):
456
    """Encode argument.
457

458
    """
459
    if argkind is None:
460
      return value
461
    else:
462
      return encoder_fn(argkind)(value)
463

    
464
  def _Call(self, cdef, node_list, args):
465
    """Entry point for automatically generated RPC wrappers.
466

467
    """
468
    (procedure, _, resolver_opts, timeout, argdefs,
469
     prep_fn, postproc_fn, _) = cdef
470

    
471
    if callable(timeout):
472
      read_timeout = timeout(args)
473
    else:
474
      read_timeout = timeout
475

    
476
    if callable(resolver_opts):
477
      req_resolver_opts = resolver_opts(args)
478
    else:
479
      req_resolver_opts = resolver_opts
480

    
481
    if len(args) != len(argdefs):
482
      raise errors.ProgrammerError("Number of passed arguments doesn't match")
483

    
484
    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
485
    if prep_fn is None:
486
      # for a no-op prep_fn, we serialise the body once, and then we
487
      # reuse it in the dictionary values
488
      body = serializer.DumpJson(enc_args)
489
      pnbody = dict((n, body) for n in node_list)
490
    else:
491
      # for a custom prep_fn, we pass the encoded arguments and the
492
      # node name to the prep_fn, and we serialise its return value
493
      assert callable(prep_fn)
494
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
495
                    for n in node_list)
496

    
497
    result = self._proc(node_list, procedure, pnbody, read_timeout,
498
                        req_resolver_opts)
499

    
500
    if postproc_fn:
501
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
502
                      result.items()))
503
    else:
504
      return result
505

    
506

    
507
def _ObjectToDict(value):
508
  """Converts an object to a dictionary.
509

510
  @note: See L{objects}.
511

512
  """
513
  return value.ToDict()
514

    
515

    
516
def _ObjectListToDict(value):
517
  """Converts a list of L{objects} to dictionaries.
518

519
  """
520
  return map(_ObjectToDict, value)
521

    
522

    
523
def _EncodeNodeToDiskDict(value):
524
  """Encodes a dictionary with node name as key and disk objects as values.
525

526
  """
527
  return dict((name, _ObjectListToDict(disks))
528
              for name, disks in value.items())
529

    
530

    
531
def _PrepareFileUpload(getents_fn, filename):
532
  """Loads a file and prepares it for an upload to nodes.
533

534
  """
535
  statcb = utils.FileStatHelper()
536
  data = _Compress(utils.ReadFile(filename, preread=statcb))
537
  st = statcb.st
538

    
539
  if getents_fn is None:
540
    getents_fn = runtime.GetEnts
541

    
542
  getents = getents_fn()
543

    
544
  virt_filename = vcluster.MakeVirtualPath(filename)
545

    
546
  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
547
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
548

    
549

    
550
def _PrepareFinalizeExportDisks(snap_disks):
551
  """Encodes disks for finalizing export.
552

553
  """
554
  flat_disks = []
555

    
556
  for disk in snap_disks:
557
    if isinstance(disk, bool):
558
      flat_disks.append(disk)
559
    else:
560
      flat_disks.append(disk.ToDict())
561

    
562
  return flat_disks
563

    
564

    
565
def _EncodeImportExportIO((ieio, ieioargs)):
566
  """Encodes import/export I/O information.
567

568
  """
569
  if ieio == constants.IEIO_RAW_DISK:
570
    assert len(ieioargs) == 1
571
    return (ieio, (ieioargs[0].ToDict(), ))
572

    
573
  if ieio == constants.IEIO_SCRIPT:
574
    assert len(ieioargs) == 2
575
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
576

    
577
  return (ieio, ieioargs)
578

    
579

    
580
def _EncodeBlockdevRename(value):
581
  """Encodes information for renaming block devices.
582

583
  """
584
  return [(d.ToDict(), uid) for d, uid in value]
585

    
586

    
587
def MakeLegacyNodeInfo(data, require_vg_info=True):
588
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
589

590
  Converts the data into a single dictionary. This is fine for most use cases,
591
  but some require information from more than one volume group or hypervisor.
592

593
  @param require_vg_info: raise an error if the returnd vg_info
594
      doesn't have any values
595

596
  """
597
  (bootid, vgs_info, (hv_info, )) = data
598

    
599
  ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
600

    
601
  if require_vg_info or vgs_info:
602
    (vg0_info, ) = vgs_info
603
    ret = utils.JoinDisjointDicts(vg0_info, ret)
604

    
605
  return ret
606

    
607

    
608
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
609
  """Annotates just DRBD disks layouts.
610

611
  """
612
  assert disk.dev_type == constants.LD_DRBD8
613

    
614
  disk.params = objects.FillDict(drbd_params, disk.params)
615
  (dev_data, dev_meta) = disk.children
616
  dev_data.params = objects.FillDict(data_params, dev_data.params)
617
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
618

    
619
  return disk
620

    
621

    
622
def _AnnotateDParamsGeneric(disk, (params, )):
623
  """Generic disk parameter annotation routine.
624

625
  """
626
  assert disk.dev_type != constants.LD_DRBD8
627

    
628
  disk.params = objects.FillDict(params, disk.params)
629

    
630
  return disk
631

    
632

    
633
def AnnotateDiskParams(template, disks, disk_params):
634
  """Annotates the disk objects with the disk parameters.
635

636
  @param template: The disk template used
637
  @param disks: The list of disks objects to annotate
638
  @param disk_params: The disk paramaters for annotation
639
  @returns: A list of disk objects annotated
640

641
  """
642
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)
643

    
644
  if template == constants.DT_DRBD8:
645
    annotation_fn = _AnnotateDParamsDRBD
646
  elif template == constants.DT_DISKLESS:
647
    annotation_fn = lambda disk, _: disk
648
  else:
649
    annotation_fn = _AnnotateDParamsGeneric
650

    
651
  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
652

    
653

    
654
def _GetESFlag(cfg, nodename):
655
  ni = cfg.GetNodeInfo(nodename)
656
  if ni is None:
657
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
658
                               errors.ECODE_NOENT)
659
  return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
660

    
661

    
662
def GetExclusiveStorageForNodeNames(cfg, nodelist):
663
  """Return the exclusive storage flag for all the given nodes.
664

665
  @type cfg: L{config.ConfigWriter}
666
  @param cfg: cluster configuration
667
  @type nodelist: list or tuple
668
  @param nodelist: node names for which to read the flag
669
  @rtype: dict
670
  @return: mapping from node names to exclusive storage flags
671
  @raise errors.OpPrereqError: if any given node name has no corresponding node
672

673
  """
674
  getflag = lambda n: _GetESFlag(cfg, n)
675
  flags = map(getflag, nodelist)
676
  return dict(zip(nodelist, flags))
677

    
678

    
679
#: Generic encoders
680
_ENCODERS = {
681
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
682
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
683
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
684
  rpc_defs.ED_COMPRESS: _Compress,
685
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
686
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
687
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
688
  }
689

    
690

    
691
class RpcRunner(_RpcClientBase,
692
                _generated_rpc.RpcClientDefault,
693
                _generated_rpc.RpcClientBootstrap,
694
                _generated_rpc.RpcClientDnsOnly,
695
                _generated_rpc.RpcClientConfig):
696
  """RPC runner class.
697

698
  """
699
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
700
    """Initialized the RPC runner.
701

702
    @type cfg: L{config.ConfigWriter}
703
    @param cfg: Configuration
704
    @type lock_monitor_cb: callable
705
    @param lock_monitor_cb: Lock monitor callback
706

707
    """
708
    self._cfg = cfg
709

    
710
    encoders = _ENCODERS.copy()
711

    
712
    encoders.update({
713
      # Encoders requiring configuration object
714
      rpc_defs.ED_INST_DICT: self._InstDict,
715
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
716
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
717
      rpc_defs.ED_NIC_DICT: self._NicDict,
718

    
719
      # Encoders annotating disk parameters
720
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
721
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
722

    
723
      # Encoders with special requirements
724
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
725
      })
726

    
727
    # Resolver using configuration
728
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
729
                              cfg.GetAllNodesInfo)
730

    
731
    # Pylint doesn't recognize multiple inheritance properly, see
732
    # <http://www.logilab.org/ticket/36586> and
733
    # <http://www.logilab.org/ticket/35642>
734
    # pylint: disable=W0233
735
    _RpcClientBase.__init__(self, resolver, encoders.get,
736
                            lock_monitor_cb=lock_monitor_cb,
737
                            _req_process_fn=_req_process_fn)
738
    _generated_rpc.RpcClientConfig.__init__(self)
739
    _generated_rpc.RpcClientBootstrap.__init__(self)
740
    _generated_rpc.RpcClientDnsOnly.__init__(self)
741
    _generated_rpc.RpcClientDefault.__init__(self)
742

    
743
  def _NicDict(self, nic):
744
    """Convert the given nic to a dict and encapsulate netinfo
745

746
    """
747
    n = copy.deepcopy(nic)
748
    if n.network:
749
      net_uuid = self._cfg.LookupNetwork(n.network)
750
      if net_uuid:
751
        nobj = self._cfg.GetNetwork(net_uuid)
752
        n.netinfo = objects.Network.ToDict(nobj)
753
    return n.ToDict()
754

    
755
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
756
    """Convert the given instance to a dict.
757

758
    This is done via the instance's ToDict() method and additionally
759
    we fill the hvparams with the cluster defaults.
760

761
    @type instance: L{objects.Instance}
762
    @param instance: an Instance object
763
    @type hvp: dict or None
764
    @param hvp: a dictionary with overridden hypervisor parameters
765
    @type bep: dict or None
766
    @param bep: a dictionary with overridden backend parameters
767
    @type osp: dict or None
768
    @param osp: a dictionary with overridden os parameters
769
    @rtype: dict
770
    @return: the instance dict, with the hvparams filled with the
771
        cluster defaults
772

773
    """
774
    idict = instance.ToDict()
775
    cluster = self._cfg.GetClusterInfo()
776
    idict["hvparams"] = cluster.FillHV(instance)
777
    if hvp is not None:
778
      idict["hvparams"].update(hvp)
779
    idict["beparams"] = cluster.FillBE(instance)
780
    if bep is not None:
781
      idict["beparams"].update(bep)
782
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
783
    if osp is not None:
784
      idict["osparams"].update(osp)
785
    idict["disks"] = self._DisksDictDP((instance.disks, instance))
786
    for nic in idict["nics"]:
787
      nic["nicparams"] = objects.FillDict(
788
        cluster.nicparams[constants.PP_DEFAULT],
789
        nic["nicparams"])
790
      network = nic.get("network", None)
791
      if network:
792
        net_uuid = self._cfg.LookupNetwork(network)
793
        if net_uuid:
794
          nobj = self._cfg.GetNetwork(net_uuid)
795
          nic["netinfo"] = objects.Network.ToDict(nobj)
796
    return idict
797

    
798
  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
799
    """Wrapper for L{_InstDict}.
800

801
    """
802
    return self._InstDict(instance, hvp=hvp, bep=bep)
803

    
804
  def _InstDictOspDp(self, (instance, osparams)):
805
    """Wrapper for L{_InstDict}.
806

807
    """
808
    return self._InstDict(instance, osp=osparams)
809

    
810
  def _DisksDictDP(self, (disks, instance)):
811
    """Wrapper for L{AnnotateDiskParams}.
812

813
    """
814
    diskparams = self._cfg.GetInstanceDiskParams(instance)
815
    return [disk.ToDict()
816
            for disk in AnnotateDiskParams(instance.disk_template,
817
                                           disks, diskparams)]
818

    
819
  def _SingleDiskDictDP(self, (disk, instance)):
820
    """Wrapper for L{AnnotateDiskParams}.
821

822
    """
823
    (anno_disk,) = self._DisksDictDP(([disk], instance))
824
    return anno_disk
825

    
826

    
827
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
828
  """RPC wrappers for job queue.
829

830
  """
831
  def __init__(self, context, address_list):
832
    """Initializes this class.
833

834
    """
835
    if address_list is None:
836
      resolver = compat.partial(_SsconfResolver, True)
837
    else:
838
      # Caller provided an address list
839
      resolver = _StaticResolver(address_list)
840

    
841
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
842
                            lock_monitor_cb=context.glm.AddToLockMonitor)
843
    _generated_rpc.RpcClientJobQueue.__init__(self)
844

    
845

    
846
class BootstrapRunner(_RpcClientBase,
847
                      _generated_rpc.RpcClientBootstrap,
848
                      _generated_rpc.RpcClientDnsOnly):
849
  """RPC wrappers for bootstrapping.
850

851
  """
852
  def __init__(self):
853
    """Initializes this class.
854

855
    """
856
    # Pylint doesn't recognize multiple inheritance properly, see
857
    # <http://www.logilab.org/ticket/36586> and
858
    # <http://www.logilab.org/ticket/35642>
859
    # pylint: disable=W0233
860
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
861
                            _ENCODERS.get)
862
    _generated_rpc.RpcClientBootstrap.__init__(self)
863
    _generated_rpc.RpcClientDnsOnly.__init__(self)
864

    
865

    
866
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
867
  """RPC wrappers for calls using only DNS.
868

869
  """
870
  def __init__(self):
871
    """Initialize this class.
872

873
    """
874
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
875
                            _ENCODERS.get)
876
    _generated_rpc.RpcClientDnsOnly.__init__(self)
877

    
878

    
879
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
880
  """RPC wrappers for L{config}.
881

882
  """
883
  def __init__(self, context, address_list, _req_process_fn=None,
884
               _getents=None):
885
    """Initializes this class.
886

887
    """
888
    if context:
889
      lock_monitor_cb = context.glm.AddToLockMonitor
890
    else:
891
      lock_monitor_cb = None
892

    
893
    if address_list is None:
894
      resolver = compat.partial(_SsconfResolver, True)
895
    else:
896
      # Caller provided an address list
897
      resolver = _StaticResolver(address_list)
898

    
899
    encoders = _ENCODERS.copy()
900

    
901
    encoders.update({
902
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
903
      })
904

    
905
    _RpcClientBase.__init__(self, resolver, encoders.get,
906
                            lock_monitor_cb=lock_monitor_cb,
907
                            _req_process_fn=_req_process_fn)
908
    _generated_rpc.RpcClientConfig.__init__(self)