Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 1d4a4b26

History | View | Annotate | Download (26.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import logging
34
import zlib
35
import base64
36
import pycurl
37
import threading
38
import copy
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50
from ganeti import rpc_defs
51
from ganeti import pathutils
52
from ganeti import vcluster
53

    
54
# Special module generated at build time
55
from ganeti import _generated_rpc
56

    
57
# pylint has a bug here, doesn't see this import
58
import ganeti.http.client  # pylint: disable=W0611
59

    
60

    
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

    
66
#: Special value to describe an offline host
67
_OFFLINE = object()
68

    
69

    
70
def Init():
71
  """Initializes the module-global HTTP client manager.
72

73
  Must be called before using any RPC function and while exactly one thread is
74
  running.
75

76
  """
77
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
78
  # one thread running. This check is just a safety measure -- it doesn't
79
  # cover all cases.
80
  assert threading.activeCount() == 1, \
81
         "Found more than one active thread when initializing pycURL"
82

    
83
  logging.info("Using PycURL %s", pycurl.version)
84

    
85
  pycurl.global_init(pycurl.GLOBAL_ALL)
86

    
87

    
88
def Shutdown():
89
  """Stops the module-global HTTP client manager.
90

91
  Must be called before quitting the program and while exactly one thread is
92
  running.
93

94
  """
95
  pycurl.global_cleanup()
96

    
97

    
98
def _ConfigRpcCurl(curl):
99
  noded_cert = str(pathutils.NODED_CERT_FILE)
100

    
101
  curl.setopt(pycurl.FOLLOWLOCATION, False)
102
  curl.setopt(pycurl.CAINFO, noded_cert)
103
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
104
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
105
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
106
  curl.setopt(pycurl.SSLCERT, noded_cert)
107
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
108
  curl.setopt(pycurl.SSLKEY, noded_cert)
109
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110

    
111

    
112
def RunWithRPC(fn):
113
  """RPC-wrapper decorator.
114

115
  When applied to a function, it runs it with the RPC system
116
  initialized, and it shutsdown the system afterwards. This means the
117
  function must be called without RPC being initialized.
118

119
  """
120
  def wrapper(*args, **kwargs):
121
    Init()
122
    try:
123
      return fn(*args, **kwargs)
124
    finally:
125
      Shutdown()
126
  return wrapper
127

    
128

    
129
def _Compress(data):
130
  """Compresses a string for transport over RPC.
131

132
  Small amounts of data are not compressed.
133

134
  @type data: str
135
  @param data: Data
136
  @rtype: tuple
137
  @return: Encoded data to send
138

139
  """
140
  # Small amounts of data are not compressed
141
  if len(data) < 512:
142
    return (constants.RPC_ENCODING_NONE, data)
143

    
144
  # Compress with zlib and encode in base64
145
  return (constants.RPC_ENCODING_ZLIB_BASE64,
146
          base64.b64encode(zlib.compress(data, 3)))
147

    
148

    
149
class RpcResult(object):
150
  """RPC Result class.
151

152
  This class holds an RPC result. It is needed since in multi-node
153
  calls we can't raise an exception just because one out of many
154
  failed, and therefore we use this class to encapsulate the result.
155

156
  @ivar data: the data payload, for successful results, or None
157
  @ivar call: the name of the RPC call
158
  @ivar node: the name of the node to which we made the call
159
  @ivar offline: whether the operation failed because the node was
160
      offline, as opposed to actual failure; offline=True will always
161
      imply failed=True, in order to allow simpler checking if
162
      the user doesn't care about the exact failure mode
163
  @ivar fail_msg: the error message if the call failed
164

165
  """
166
  def __init__(self, data=None, failed=False, offline=False,
167
               call=None, node=None):
168
    self.offline = offline
169
    self.call = call
170
    self.node = node
171

    
172
    if offline:
173
      self.fail_msg = "Node is marked offline"
174
      self.data = self.payload = None
175
    elif failed:
176
      self.fail_msg = self._EnsureErr(data)
177
      self.data = self.payload = None
178
    else:
179
      self.data = data
180
      if not isinstance(self.data, (tuple, list)):
181
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182
                         type(self.data))
183
        self.payload = None
184
      elif len(data) != 2:
185
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
186
                         "expected 2" % len(self.data))
187
        self.payload = None
188
      elif not self.data[0]:
189
        self.fail_msg = self._EnsureErr(self.data[1])
190
        self.payload = None
191
      else:
192
        # finally success
193
        self.fail_msg = None
194
        self.payload = data[1]
195

    
196
    for attr_name in ["call", "data", "fail_msg",
197
                      "node", "offline", "payload"]:
198
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199

    
200
  @staticmethod
201
  def _EnsureErr(val):
202
    """Helper to ensure we return a 'True' value for error."""
203
    if val:
204
      return val
205
    else:
206
      return "No error information"
207

    
208
  def Raise(self, msg, prereq=False, ecode=None):
209
    """If the result has failed, raise an OpExecError.
210

211
    This is used so that LU code doesn't have to check for each
212
    result, but instead can call this function.
213

214
    """
215
    if not self.fail_msg:
216
      return
217

    
218
    if not msg: # one could pass None for default message
219
      msg = ("Call '%s' to node '%s' has failed: %s" %
220
             (self.call, self.node, self.fail_msg))
221
    else:
222
      msg = "%s: %s" % (msg, self.fail_msg)
223
    if prereq:
224
      ec = errors.OpPrereqError
225
    else:
226
      ec = errors.OpExecError
227
    if ecode is not None:
228
      args = (msg, ecode)
229
    else:
230
      args = (msg, )
231
    raise ec(*args) # pylint: disable=W0142
232

    
233

    
234
def _SsconfResolver(ssconf_ips, node_list, _,
235
                    ssc=ssconf.SimpleStore,
236
                    nslookup_fn=netutils.Hostname.GetIP):
237
  """Return addresses for given node names.
238

239
  @type ssconf_ips: bool
240
  @param ssconf_ips: Use the ssconf IPs
241
  @type node_list: list
242
  @param node_list: List of node names
243
  @type ssc: class
244
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
245
  @type nslookup_fn: callable
246
  @param nslookup_fn: function use to do NS lookup
247
  @rtype: list of tuple; (string, string)
248
  @return: List of tuples containing node name and IP address
249

250
  """
251
  ss = ssc()
252
  family = ss.GetPrimaryIPFamily()
253

    
254
  if ssconf_ips:
255
    iplist = ss.GetNodePrimaryIPList()
256
    ipmap = dict(entry.split() for entry in iplist)
257
  else:
258
    ipmap = {}
259

    
260
  result = []
261
  for node in node_list:
262
    ip = ipmap.get(node)
263
    if ip is None:
264
      ip = nslookup_fn(node, family=family)
265
    result.append((node, ip))
266

    
267
  return result
268

    
269

    
270
class _StaticResolver:
271
  def __init__(self, addresses):
272
    """Initializes this class.
273

274
    """
275
    self._addresses = addresses
276

    
277
  def __call__(self, hosts, _):
278
    """Returns static addresses for hosts.
279

280
    """
281
    assert len(hosts) == len(self._addresses)
282
    return zip(hosts, self._addresses)
283

    
284

    
285
def _CheckConfigNode(name, node, accept_offline_node):
286
  """Checks if a node is online.
287

288
  @type name: string
289
  @param name: Node name
290
  @type node: L{objects.Node} or None
291
  @param node: Node object
292

293
  """
294
  if node is None:
295
    # Depend on DNS for name resolution
296
    ip = name
297
  elif node.offline and not accept_offline_node:
298
    ip = _OFFLINE
299
  else:
300
    ip = node.primary_ip
301
  return (name, ip)
302

    
303

    
304
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
305
  """Calculate node addresses using configuration.
306

307
  """
308
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
309

    
310
  assert accept_offline_node or opts is None, "Unknown option"
311

    
312
  # Special case for single-host lookups
313
  if len(hosts) == 1:
314
    (name, ) = hosts
315
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
316
  else:
317
    all_nodes = all_nodes_fn()
318
    return [_CheckConfigNode(name, all_nodes.get(name, None),
319
                             accept_offline_node)
320
            for name in hosts]
321

    
322

    
323
class _RpcProcessor:
324
  def __init__(self, resolver, port, lock_monitor_cb=None):
325
    """Initializes this class.
326

327
    @param resolver: callable accepting a list of hostnames, returning a list
328
      of tuples containing name and IP address (IP address can be the name or
329
      the special value L{_OFFLINE} to mark offline machines)
330
    @type port: int
331
    @param port: TCP port
332
    @param lock_monitor_cb: Callable for registering with lock monitor
333

334
    """
335
    self._resolver = resolver
336
    self._port = port
337
    self._lock_monitor_cb = lock_monitor_cb
338

    
339
  @staticmethod
340
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
341
    """Prepares requests by sorting offline hosts into separate list.
342

343
    @type body: dict
344
    @param body: a dictionary with per-host body data
345

346
    """
347
    results = {}
348
    requests = {}
349

    
350
    assert isinstance(body, dict)
351
    assert len(body) == len(hosts)
352
    assert compat.all(isinstance(v, str) for v in body.values())
353
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
354
        "%s != %s" % (hosts, body.keys())
355

    
356
    for (name, ip) in hosts:
357
      if ip is _OFFLINE:
358
        # Node is marked as offline
359
        results[name] = RpcResult(node=name, offline=True, call=procedure)
360
      else:
361
        requests[name] = \
362
          http.client.HttpClientRequest(str(ip), port,
363
                                        http.HTTP_POST, str("/%s" % procedure),
364
                                        headers=_RPC_CLIENT_HEADERS,
365
                                        post_data=body[name],
366
                                        read_timeout=read_timeout,
367
                                        nicename="%s/%s" % (name, procedure),
368
                                        curl_config_fn=_ConfigRpcCurl)
369

    
370
    return (results, requests)
371

    
372
  @staticmethod
373
  def _CombineResults(results, requests, procedure):
374
    """Combines pre-computed results for offline hosts with actual call results.
375

376
    """
377
    for name, req in requests.items():
378
      if req.success and req.resp_status_code == http.HTTP_OK:
379
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
380
                                node=name, call=procedure)
381
      else:
382
        # TODO: Better error reporting
383
        if req.error:
384
          msg = req.error
385
        else:
386
          msg = req.resp_body
387

    
388
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
389
        host_result = RpcResult(data=msg, failed=True, node=name,
390
                                call=procedure)
391

    
392
      results[name] = host_result
393

    
394
    return results
395

    
396
  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
397
               _req_process_fn=None):
398
    """Makes an RPC request to a number of nodes.
399

400
    @type hosts: sequence
401
    @param hosts: Hostnames
402
    @type procedure: string
403
    @param procedure: Request path
404
    @type body: dictionary
405
    @param body: dictionary with request bodies per host
406
    @type read_timeout: int or None
407
    @param read_timeout: Read timeout for request
408
    @rtype: dictionary
409
    @return: a dictionary mapping host names to rpc.RpcResult objects
410

411
    """
412
    assert read_timeout is not None, \
413
      "Missing RPC read timeout for procedure '%s'" % procedure
414

    
415
    if _req_process_fn is None:
416
      _req_process_fn = http.client.ProcessRequests
417

    
418
    (results, requests) = \
419
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
420
                            procedure, body, read_timeout)
421

    
422
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
423

    
424
    assert not frozenset(results).intersection(requests)
425

    
426
    return self._CombineResults(results, requests, procedure)
427

    
428

    
429
class _RpcClientBase:
430
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
431
               _req_process_fn=None):
432
    """Initializes this class.
433

434
    """
435
    proc = _RpcProcessor(resolver,
436
                         netutils.GetDaemonPort(constants.NODED),
437
                         lock_monitor_cb=lock_monitor_cb)
438
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
439
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
440

    
441
  @staticmethod
442
  def _EncodeArg(encoder_fn, (argkind, value)):
443
    """Encode argument.
444

445
    """
446
    if argkind is None:
447
      return value
448
    else:
449
      return encoder_fn(argkind)(value)
450

    
451
  def _Call(self, cdef, node_list, args):
452
    """Entry point for automatically generated RPC wrappers.
453

454
    """
455
    (procedure, _, resolver_opts, timeout, argdefs,
456
     prep_fn, postproc_fn, _) = cdef
457

    
458
    if callable(timeout):
459
      read_timeout = timeout(args)
460
    else:
461
      read_timeout = timeout
462

    
463
    if callable(resolver_opts):
464
      req_resolver_opts = resolver_opts(args)
465
    else:
466
      req_resolver_opts = resolver_opts
467

    
468
    if len(args) != len(argdefs):
469
      raise errors.ProgrammerError("Number of passed arguments doesn't match")
470

    
471
    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
472
    if prep_fn is None:
473
      # for a no-op prep_fn, we serialise the body once, and then we
474
      # reuse it in the dictionary values
475
      body = serializer.DumpJson(enc_args)
476
      pnbody = dict((n, body) for n in node_list)
477
    else:
478
      # for a custom prep_fn, we pass the encoded arguments and the
479
      # node name to the prep_fn, and we serialise its return value
480
      assert callable(prep_fn)
481
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
482
                    for n in node_list)
483

    
484
    result = self._proc(node_list, procedure, pnbody, read_timeout,
485
                        req_resolver_opts)
486

    
487
    if postproc_fn:
488
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
489
                      result.items()))
490
    else:
491
      return result
492

    
493

    
494
def _ObjectToDict(value):
495
  """Converts an object to a dictionary.
496

497
  @note: See L{objects}.
498

499
  """
500
  return value.ToDict()
501

    
502

    
503
def _ObjectListToDict(value):
504
  """Converts a list of L{objects} to dictionaries.
505

506
  """
507
  return map(_ObjectToDict, value)
508

    
509

    
510
def _EncodeNodeToDiskDict(value):
511
  """Encodes a dictionary with node name as key and disk objects as values.
512

513
  """
514
  return dict((name, _ObjectListToDict(disks))
515
              for name, disks in value.items())
516

    
517

    
518
def _PrepareFileUpload(getents_fn, filename):
519
  """Loads a file and prepares it for an upload to nodes.
520

521
  """
522
  statcb = utils.FileStatHelper()
523
  data = _Compress(utils.ReadFile(filename, preread=statcb))
524
  st = statcb.st
525

    
526
  if getents_fn is None:
527
    getents_fn = runtime.GetEnts
528

    
529
  getents = getents_fn()
530

    
531
  virt_filename = vcluster.MakeVirtualPath(filename)
532

    
533
  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
534
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
535

    
536

    
537
def _PrepareFinalizeExportDisks(snap_disks):
538
  """Encodes disks for finalizing export.
539

540
  """
541
  flat_disks = []
542

    
543
  for disk in snap_disks:
544
    if isinstance(disk, bool):
545
      flat_disks.append(disk)
546
    else:
547
      flat_disks.append(disk.ToDict())
548

    
549
  return flat_disks
550

    
551

    
552
def _EncodeImportExportIO((ieio, ieioargs)):
553
  """Encodes import/export I/O information.
554

555
  """
556
  if ieio == constants.IEIO_RAW_DISK:
557
    assert len(ieioargs) == 1
558
    return (ieio, (ieioargs[0].ToDict(), ))
559

    
560
  if ieio == constants.IEIO_SCRIPT:
561
    assert len(ieioargs) == 2
562
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
563

    
564
  return (ieio, ieioargs)
565

    
566

    
567
def _EncodeBlockdevRename(value):
568
  """Encodes information for renaming block devices.
569

570
  """
571
  return [(d.ToDict(), uid) for d, uid in value]
572

    
573

    
574
def MakeLegacyNodeInfo(data, require_vg_info=True):
575
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
576

577
  Converts the data into a single dictionary. This is fine for most use cases,
578
  but some require information from more than one volume group or hypervisor.
579

580
  @param require_vg_info: raise an error if the returnd vg_info
581
      doesn't have any values
582

583
  """
584
  (bootid, vgs_info, (hv_info, )) = data
585

    
586
  ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
587

    
588
  if require_vg_info or vgs_info:
589
    (vg0_info, ) = vgs_info
590
    ret = utils.JoinDisjointDicts(vg0_info, ret)
591

    
592
  return ret
593

    
594

    
595
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
596
  """Annotates just DRBD disks layouts.
597

598
  """
599
  assert disk.dev_type == constants.LD_DRBD8
600

    
601
  disk.params = objects.FillDict(drbd_params, disk.params)
602
  (dev_data, dev_meta) = disk.children
603
  dev_data.params = objects.FillDict(data_params, dev_data.params)
604
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
605

    
606
  return disk
607

    
608

    
609
def _AnnotateDParamsGeneric(disk, (params, )):
610
  """Generic disk parameter annotation routine.
611

612
  """
613
  assert disk.dev_type != constants.LD_DRBD8
614

    
615
  disk.params = objects.FillDict(params, disk.params)
616

    
617
  return disk
618

    
619

    
620
def AnnotateDiskParams(template, disks, disk_params):
621
  """Annotates the disk objects with the disk parameters.
622

623
  @param template: The disk template used
624
  @param disks: The list of disks objects to annotate
625
  @param disk_params: The disk paramaters for annotation
626
  @returns: A list of disk objects annotated
627

628
  """
629
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)
630

    
631
  if template == constants.DT_DRBD8:
632
    annotation_fn = _AnnotateDParamsDRBD
633
  elif template == constants.DT_DISKLESS:
634
    annotation_fn = lambda disk, _: disk
635
  else:
636
    annotation_fn = _AnnotateDParamsGeneric
637

    
638
  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
639

    
640

    
641
def _GetESFlag(cfg, nodename):
642
  ni = cfg.GetNodeInfo(nodename)
643
  if ni is None:
644
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
645
                               errors.ECODE_NOENT)
646
  return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
647

    
648

    
649
def GetExclusiveStorageForNodeNames(cfg, nodelist):
650
  """Return the exclusive storage flag for all the given nodes.
651

652
  @type cfg: L{config.ConfigWriter}
653
  @param cfg: cluster configuration
654
  @type nodelist: list or tuple
655
  @param nodelist: node names for which to read the flag
656
  @rtype: dict
657
  @return: mapping from node names to exclusive storage flags
658
  @raise errors.OpPrereqError: if any given node name has no corresponding node
659

660
  """
661
  getflag = lambda n: _GetESFlag(cfg, n)
662
  flags = map(getflag, nodelist)
663
  return dict(zip(nodelist, flags))
664

    
665

    
666
#: Generic encoders
667
_ENCODERS = {
668
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
669
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
670
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
671
  rpc_defs.ED_COMPRESS: _Compress,
672
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
673
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
674
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
675
  }
676

    
677

    
678
class RpcRunner(_RpcClientBase,
679
                _generated_rpc.RpcClientDefault,
680
                _generated_rpc.RpcClientBootstrap,
681
                _generated_rpc.RpcClientDnsOnly,
682
                _generated_rpc.RpcClientConfig):
683
  """RPC runner class.
684

685
  """
686
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
687
    """Initialized the RPC runner.
688

689
    @type cfg: L{config.ConfigWriter}
690
    @param cfg: Configuration
691
    @type lock_monitor_cb: callable
692
    @param lock_monitor_cb: Lock monitor callback
693

694
    """
695
    self._cfg = cfg
696

    
697
    encoders = _ENCODERS.copy()
698

    
699
    encoders.update({
700
      # Encoders requiring configuration object
701
      rpc_defs.ED_INST_DICT: self._InstDict,
702
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
703
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
704
      rpc_defs.ED_NIC_DICT: self._NicDict,
705

    
706
      # Encoders annotating disk parameters
707
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
708
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
709

    
710
      # Encoders with special requirements
711
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
712
      })
713

    
714
    # Resolver using configuration
715
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
716
                              cfg.GetAllNodesInfo)
717

    
718
    # Pylint doesn't recognize multiple inheritance properly, see
719
    # <http://www.logilab.org/ticket/36586> and
720
    # <http://www.logilab.org/ticket/35642>
721
    # pylint: disable=W0233
722
    _RpcClientBase.__init__(self, resolver, encoders.get,
723
                            lock_monitor_cb=lock_monitor_cb,
724
                            _req_process_fn=_req_process_fn)
725
    _generated_rpc.RpcClientConfig.__init__(self)
726
    _generated_rpc.RpcClientBootstrap.__init__(self)
727
    _generated_rpc.RpcClientDnsOnly.__init__(self)
728
    _generated_rpc.RpcClientDefault.__init__(self)
729

    
730
  def _NicDict(self, nic):
731
    """Convert the given nic to a dict and encapsulate netinfo
732

733
    """
734
    n = copy.deepcopy(nic)
735
    if n.network:
736
      net_uuid = self._cfg.LookupNetwork(n.network)
737
      if net_uuid:
738
        nobj = self._cfg.GetNetwork(net_uuid)
739
        n.netinfo = objects.Network.ToDict(nobj)
740
    return n.ToDict()
741

    
742
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
743
    """Convert the given instance to a dict.
744

745
    This is done via the instance's ToDict() method and additionally
746
    we fill the hvparams with the cluster defaults.
747

748
    @type instance: L{objects.Instance}
749
    @param instance: an Instance object
750
    @type hvp: dict or None
751
    @param hvp: a dictionary with overridden hypervisor parameters
752
    @type bep: dict or None
753
    @param bep: a dictionary with overridden backend parameters
754
    @type osp: dict or None
755
    @param osp: a dictionary with overridden os parameters
756
    @rtype: dict
757
    @return: the instance dict, with the hvparams filled with the
758
        cluster defaults
759

760
    """
761
    idict = instance.ToDict()
762
    cluster = self._cfg.GetClusterInfo()
763
    idict["hvparams"] = cluster.FillHV(instance)
764
    if hvp is not None:
765
      idict["hvparams"].update(hvp)
766
    idict["beparams"] = cluster.FillBE(instance)
767
    if bep is not None:
768
      idict["beparams"].update(bep)
769
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
770
    if osp is not None:
771
      idict["osparams"].update(osp)
772
    idict["disks"] = self._DisksDictDP((instance.disks, instance))
773
    for nic in idict["nics"]:
774
      nic["nicparams"] = objects.FillDict(
775
        cluster.nicparams[constants.PP_DEFAULT],
776
        nic["nicparams"])
777
      network = nic.get("network", None)
778
      if network:
779
        net_uuid = self._cfg.LookupNetwork(network)
780
        if net_uuid:
781
          nobj = self._cfg.GetNetwork(net_uuid)
782
          nic["netinfo"] = objects.Network.ToDict(nobj)
783
    return idict
784

    
785
  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
786
    """Wrapper for L{_InstDict}.
787

788
    """
789
    return self._InstDict(instance, hvp=hvp, bep=bep)
790

    
791
  def _InstDictOspDp(self, (instance, osparams)):
792
    """Wrapper for L{_InstDict}.
793

794
    """
795
    return self._InstDict(instance, osp=osparams)
796

    
797
  def _DisksDictDP(self, (disks, instance)):
798
    """Wrapper for L{AnnotateDiskParams}.
799

800
    """
801
    diskparams = self._cfg.GetInstanceDiskParams(instance)
802
    return [disk.ToDict()
803
            for disk in AnnotateDiskParams(instance.disk_template,
804
                                           disks, diskparams)]
805

    
806
  def _SingleDiskDictDP(self, (disk, instance)):
807
    """Wrapper for L{AnnotateDiskParams}.
808

809
    """
810
    (anno_disk,) = self._DisksDictDP(([disk], instance))
811
    return anno_disk
812

    
813

    
814
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
815
  """RPC wrappers for job queue.
816

817
  """
818
  def __init__(self, context, address_list):
819
    """Initializes this class.
820

821
    """
822
    if address_list is None:
823
      resolver = compat.partial(_SsconfResolver, True)
824
    else:
825
      # Caller provided an address list
826
      resolver = _StaticResolver(address_list)
827

    
828
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
829
                            lock_monitor_cb=context.glm.AddToLockMonitor)
830
    _generated_rpc.RpcClientJobQueue.__init__(self)
831

    
832

    
833
class BootstrapRunner(_RpcClientBase,
834
                      _generated_rpc.RpcClientBootstrap,
835
                      _generated_rpc.RpcClientDnsOnly):
836
  """RPC wrappers for bootstrapping.
837

838
  """
839
  def __init__(self):
840
    """Initializes this class.
841

842
    """
843
    # Pylint doesn't recognize multiple inheritance properly, see
844
    # <http://www.logilab.org/ticket/36586> and
845
    # <http://www.logilab.org/ticket/35642>
846
    # pylint: disable=W0233
847
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
848
                            _ENCODERS.get)
849
    _generated_rpc.RpcClientBootstrap.__init__(self)
850
    _generated_rpc.RpcClientDnsOnly.__init__(self)
851

    
852

    
853
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
854
  """RPC wrappers for calls using only DNS.
855

856
  """
857
  def __init__(self):
858
    """Initialize this class.
859

860
    """
861
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
862
                            _ENCODERS.get)
863
    _generated_rpc.RpcClientDnsOnly.__init__(self)
864

    
865

    
866
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
867
  """RPC wrappers for L{config}.
868

869
  """
870
  def __init__(self, context, address_list, _req_process_fn=None,
871
               _getents=None):
872
    """Initializes this class.
873

874
    """
875
    if context:
876
      lock_monitor_cb = context.glm.AddToLockMonitor
877
    else:
878
      lock_monitor_cb = None
879

    
880
    if address_list is None:
881
      resolver = compat.partial(_SsconfResolver, True)
882
    else:
883
      # Caller provided an address list
884
      resolver = _StaticResolver(address_list)
885

    
886
    encoders = _ENCODERS.copy()
887

    
888
    encoders.update({
889
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
890
      })
891

    
892
    _RpcClientBase.__init__(self, resolver, encoders.get,
893
                            lock_monitor_cb=lock_monitor_cb,
894
                            _req_process_fn=_req_process_fn)
895
    _generated_rpc.RpcClientConfig.__init__(self)