Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ f92ed8ab

History | View | Annotate | Download (24.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import logging
34
import zlib
35
import base64
36
import pycurl
37
import threading
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45
from ganeti import netutils
46
from ganeti import ssconf
47
from ganeti import runtime
48
from ganeti import compat
49
from ganeti import rpc_defs
50
from ganeti import pathutils
51
from ganeti import vcluster
52

    
53
# Special module generated at build time
54
from ganeti import _generated_rpc
55

    
56
# pylint has a bug here, doesn't see this import
57
import ganeti.http.client  # pylint: disable=W0611
58

    
59

    
60
_RPC_CLIENT_HEADERS = [
61
  "Content-type: %s" % http.HTTP_APP_JSON,
62
  "Expect:",
63
  ]
64

    
65
#: Special value to describe an offline host
66
_OFFLINE = object()
67

    
68

    
69
def Init():
70
  """Initializes the module-global HTTP client manager.
71

72
  Must be called before using any RPC function and while exactly one thread is
73
  running.
74

75
  """
76
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
77
  # one thread running. This check is just a safety measure -- it doesn't
78
  # cover all cases.
79
  assert threading.activeCount() == 1, \
80
         "Found more than one active thread when initializing pycURL"
81

    
82
  logging.info("Using PycURL %s", pycurl.version)
83

    
84
  pycurl.global_init(pycurl.GLOBAL_ALL)
85

    
86

    
87
def Shutdown():
88
  """Stops the module-global HTTP client manager.
89

90
  Must be called before quitting the program and while exactly one thread is
91
  running.
92

93
  """
94
  pycurl.global_cleanup()
95

    
96

    
97
def _ConfigRpcCurl(curl):
98
  noded_cert = str(pathutils.NODED_CERT_FILE)
99

    
100
  curl.setopt(pycurl.FOLLOWLOCATION, False)
101
  curl.setopt(pycurl.CAINFO, noded_cert)
102
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
103
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
104
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
105
  curl.setopt(pycurl.SSLCERT, noded_cert)
106
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
107
  curl.setopt(pycurl.SSLKEY, noded_cert)
108
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
109

    
110

    
111
def RunWithRPC(fn):
112
  """RPC-wrapper decorator.
113

114
  When applied to a function, it runs it with the RPC system
115
  initialized, and it shutsdown the system afterwards. This means the
116
  function must be called without RPC being initialized.
117

118
  """
119
  def wrapper(*args, **kwargs):
120
    Init()
121
    try:
122
      return fn(*args, **kwargs)
123
    finally:
124
      Shutdown()
125
  return wrapper
126

    
127

    
128
def _Compress(data):
129
  """Compresses a string for transport over RPC.
130

131
  Small amounts of data are not compressed.
132

133
  @type data: str
134
  @param data: Data
135
  @rtype: tuple
136
  @return: Encoded data to send
137

138
  """
139
  # Small amounts of data are not compressed
140
  if len(data) < 512:
141
    return (constants.RPC_ENCODING_NONE, data)
142

    
143
  # Compress with zlib and encode in base64
144
  return (constants.RPC_ENCODING_ZLIB_BASE64,
145
          base64.b64encode(zlib.compress(data, 3)))
146

    
147

    
148
class RpcResult(object):
149
  """RPC Result class.
150

151
  This class holds an RPC result. It is needed since in multi-node
152
  calls we can't raise an exception just because one one out of many
153
  failed, and therefore we use this class to encapsulate the result.
154

155
  @ivar data: the data payload, for successful results, or None
156
  @ivar call: the name of the RPC call
157
  @ivar node: the name of the node to which we made the call
158
  @ivar offline: whether the operation failed because the node was
159
      offline, as opposed to actual failure; offline=True will always
160
      imply failed=True, in order to allow simpler checking if
161
      the user doesn't care about the exact failure mode
162
  @ivar fail_msg: the error message if the call failed
163

164
  """
165
  def __init__(self, data=None, failed=False, offline=False,
166
               call=None, node=None):
167
    self.offline = offline
168
    self.call = call
169
    self.node = node
170

    
171
    if offline:
172
      self.fail_msg = "Node is marked offline"
173
      self.data = self.payload = None
174
    elif failed:
175
      self.fail_msg = self._EnsureErr(data)
176
      self.data = self.payload = None
177
    else:
178
      self.data = data
179
      if not isinstance(self.data, (tuple, list)):
180
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
181
                         type(self.data))
182
        self.payload = None
183
      elif len(data) != 2:
184
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
185
                         "expected 2" % len(self.data))
186
        self.payload = None
187
      elif not self.data[0]:
188
        self.fail_msg = self._EnsureErr(self.data[1])
189
        self.payload = None
190
      else:
191
        # finally success
192
        self.fail_msg = None
193
        self.payload = data[1]
194

    
195
    for attr_name in ["call", "data", "fail_msg",
196
                      "node", "offline", "payload"]:
197
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
198

    
199
  @staticmethod
200
  def _EnsureErr(val):
201
    """Helper to ensure we return a 'True' value for error."""
202
    if val:
203
      return val
204
    else:
205
      return "No error information"
206

    
207
  def Raise(self, msg, prereq=False, ecode=None):
208
    """If the result has failed, raise an OpExecError.
209

210
    This is used so that LU code doesn't have to check for each
211
    result, but instead can call this function.
212

213
    """
214
    if not self.fail_msg:
215
      return
216

    
217
    if not msg: # one could pass None for default message
218
      msg = ("Call '%s' to node '%s' has failed: %s" %
219
             (self.call, self.node, self.fail_msg))
220
    else:
221
      msg = "%s: %s" % (msg, self.fail_msg)
222
    if prereq:
223
      ec = errors.OpPrereqError
224
    else:
225
      ec = errors.OpExecError
226
    if ecode is not None:
227
      args = (msg, ecode)
228
    else:
229
      args = (msg, )
230
    raise ec(*args) # pylint: disable=W0142
231

    
232

    
233
def _SsconfResolver(ssconf_ips, node_list, _,
234
                    ssc=ssconf.SimpleStore,
235
                    nslookup_fn=netutils.Hostname.GetIP):
236
  """Return addresses for given node names.
237

238
  @type ssconf_ips: bool
239
  @param ssconf_ips: Use the ssconf IPs
240
  @type node_list: list
241
  @param node_list: List of node names
242
  @type ssc: class
243
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
244
  @type nslookup_fn: callable
245
  @param nslookup_fn: function use to do NS lookup
246
  @rtype: list of tuple; (string, string)
247
  @return: List of tuples containing node name and IP address
248

249
  """
250
  ss = ssc()
251
  family = ss.GetPrimaryIPFamily()
252

    
253
  if ssconf_ips:
254
    iplist = ss.GetNodePrimaryIPList()
255
    ipmap = dict(entry.split() for entry in iplist)
256
  else:
257
    ipmap = {}
258

    
259
  result = []
260
  for node in node_list:
261
    ip = ipmap.get(node)
262
    if ip is None:
263
      ip = nslookup_fn(node, family=family)
264
    result.append((node, ip))
265

    
266
  return result
267

    
268

    
269
class _StaticResolver:
270
  def __init__(self, addresses):
271
    """Initializes this class.
272

273
    """
274
    self._addresses = addresses
275

    
276
  def __call__(self, hosts, _):
277
    """Returns static addresses for hosts.
278

279
    """
280
    assert len(hosts) == len(self._addresses)
281
    return zip(hosts, self._addresses)
282

    
283

    
284
def _CheckConfigNode(name, node, accept_offline_node):
285
  """Checks if a node is online.
286

287
  @type name: string
288
  @param name: Node name
289
  @type node: L{objects.Node} or None
290
  @param node: Node object
291

292
  """
293
  if node is None:
294
    # Depend on DNS for name resolution
295
    ip = name
296
  elif node.offline and not accept_offline_node:
297
    ip = _OFFLINE
298
  else:
299
    ip = node.primary_ip
300
  return (name, ip)
301

    
302

    
303
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
304
  """Calculate node addresses using configuration.
305

306
  """
307
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
308

    
309
  assert accept_offline_node or opts is None, "Unknown option"
310

    
311
  # Special case for single-host lookups
312
  if len(hosts) == 1:
313
    (name, ) = hosts
314
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
315
  else:
316
    all_nodes = all_nodes_fn()
317
    return [_CheckConfigNode(name, all_nodes.get(name, None),
318
                             accept_offline_node)
319
            for name in hosts]
320

    
321

    
322
class _RpcProcessor:
323
  def __init__(self, resolver, port, lock_monitor_cb=None):
324
    """Initializes this class.
325

326
    @param resolver: callable accepting a list of hostnames, returning a list
327
      of tuples containing name and IP address (IP address can be the name or
328
      the special value L{_OFFLINE} to mark offline machines)
329
    @type port: int
330
    @param port: TCP port
331
    @param lock_monitor_cb: Callable for registering with lock monitor
332

333
    """
334
    self._resolver = resolver
335
    self._port = port
336
    self._lock_monitor_cb = lock_monitor_cb
337

    
338
  @staticmethod
339
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
340
    """Prepares requests by sorting offline hosts into separate list.
341

342
    @type body: dict
343
    @param body: a dictionary with per-host body data
344

345
    """
346
    results = {}
347
    requests = {}
348

    
349
    assert isinstance(body, dict)
350
    assert len(body) == len(hosts)
351
    assert compat.all(isinstance(v, str) for v in body.values())
352
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
353
        "%s != %s" % (hosts, body.keys())
354

    
355
    for (name, ip) in hosts:
356
      if ip is _OFFLINE:
357
        # Node is marked as offline
358
        results[name] = RpcResult(node=name, offline=True, call=procedure)
359
      else:
360
        requests[name] = \
361
          http.client.HttpClientRequest(str(ip), port,
362
                                        http.HTTP_POST, str("/%s" % procedure),
363
                                        headers=_RPC_CLIENT_HEADERS,
364
                                        post_data=body[name],
365
                                        read_timeout=read_timeout,
366
                                        nicename="%s/%s" % (name, procedure),
367
                                        curl_config_fn=_ConfigRpcCurl)
368

    
369
    return (results, requests)
370

    
371
  @staticmethod
372
  def _CombineResults(results, requests, procedure):
373
    """Combines pre-computed results for offline hosts with actual call results.
374

375
    """
376
    for name, req in requests.items():
377
      if req.success and req.resp_status_code == http.HTTP_OK:
378
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
379
                                node=name, call=procedure)
380
      else:
381
        # TODO: Better error reporting
382
        if req.error:
383
          msg = req.error
384
        else:
385
          msg = req.resp_body
386

    
387
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
388
        host_result = RpcResult(data=msg, failed=True, node=name,
389
                                call=procedure)
390

    
391
      results[name] = host_result
392

    
393
    return results
394

    
395
  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
396
               _req_process_fn=None):
397
    """Makes an RPC request to a number of nodes.
398

399
    @type hosts: sequence
400
    @param hosts: Hostnames
401
    @type procedure: string
402
    @param procedure: Request path
403
    @type body: dictionary
404
    @param body: dictionary with request bodies per host
405
    @type read_timeout: int or None
406
    @param read_timeout: Read timeout for request
407

408
    """
409
    assert read_timeout is not None, \
410
      "Missing RPC read timeout for procedure '%s'" % procedure
411

    
412
    if _req_process_fn is None:
413
      _req_process_fn = http.client.ProcessRequests
414

    
415
    (results, requests) = \
416
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
417
                            procedure, body, read_timeout)
418

    
419
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
420

    
421
    assert not frozenset(results).intersection(requests)
422

    
423
    return self._CombineResults(results, requests, procedure)
424

    
425

    
426
class _RpcClientBase:
427
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
428
               _req_process_fn=None):
429
    """Initializes this class.
430

431
    """
432
    proc = _RpcProcessor(resolver,
433
                         netutils.GetDaemonPort(constants.NODED),
434
                         lock_monitor_cb=lock_monitor_cb)
435
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
436
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
437

    
438
  @staticmethod
439
  def _EncodeArg(encoder_fn, (argkind, value)):
440
    """Encode argument.
441

442
    """
443
    if argkind is None:
444
      return value
445
    else:
446
      return encoder_fn(argkind)(value)
447

    
448
  def _Call(self, cdef, node_list, args):
449
    """Entry point for automatically generated RPC wrappers.
450

451
    """
452
    (procedure, _, resolver_opts, timeout, argdefs,
453
     prep_fn, postproc_fn, _) = cdef
454

    
455
    if callable(timeout):
456
      read_timeout = timeout(args)
457
    else:
458
      read_timeout = timeout
459

    
460
    if callable(resolver_opts):
461
      req_resolver_opts = resolver_opts(args)
462
    else:
463
      req_resolver_opts = resolver_opts
464

    
465
    if len(args) != len(argdefs):
466
      raise errors.ProgrammerError("Number of passed arguments doesn't match")
467

    
468
    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
469
    if prep_fn is None:
470
      # for a no-op prep_fn, we serialise the body once, and then we
471
      # reuse it in the dictionary values
472
      body = serializer.DumpJson(enc_args)
473
      pnbody = dict((n, body) for n in node_list)
474
    else:
475
      # for a custom prep_fn, we pass the encoded arguments and the
476
      # node name to the prep_fn, and we serialise its return value
477
      assert callable(prep_fn)
478
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
479
                    for n in node_list)
480

    
481
    result = self._proc(node_list, procedure, pnbody, read_timeout,
482
                        req_resolver_opts)
483

    
484
    if postproc_fn:
485
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
486
                      result.items()))
487
    else:
488
      return result
489

    
490

    
491
def _ObjectToDict(value):
492
  """Converts an object to a dictionary.
493

494
  @note: See L{objects}.
495

496
  """
497
  return value.ToDict()
498

    
499

    
500
def _ObjectListToDict(value):
501
  """Converts a list of L{objects} to dictionaries.
502

503
  """
504
  return map(_ObjectToDict, value)
505

    
506

    
507
def _EncodeNodeToDiskDict(value):
508
  """Encodes a dictionary with node name as key and disk objects as values.
509

510
  """
511
  return dict((name, _ObjectListToDict(disks))
512
              for name, disks in value.items())
513

    
514

    
515
def _PrepareFileUpload(getents_fn, filename):
516
  """Loads a file and prepares it for an upload to nodes.
517

518
  """
519
  statcb = utils.FileStatHelper()
520
  data = _Compress(utils.ReadFile(filename, preread=statcb))
521
  st = statcb.st
522

    
523
  if getents_fn is None:
524
    getents_fn = runtime.GetEnts
525

    
526
  getents = getents_fn()
527

    
528
  virt_filename = vcluster.MakeVirtualPath(filename)
529

    
530
  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
531
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
532

    
533

    
534
def _PrepareFinalizeExportDisks(snap_disks):
535
  """Encodes disks for finalizing export.
536

537
  """
538
  flat_disks = []
539

    
540
  for disk in snap_disks:
541
    if isinstance(disk, bool):
542
      flat_disks.append(disk)
543
    else:
544
      flat_disks.append(disk.ToDict())
545

    
546
  return flat_disks
547

    
548

    
549
def _EncodeImportExportIO((ieio, ieioargs)):
550
  """Encodes import/export I/O information.
551

552
  """
553
  if ieio == constants.IEIO_RAW_DISK:
554
    assert len(ieioargs) == 1
555
    return (ieio, (ieioargs[0].ToDict(), ))
556

    
557
  if ieio == constants.IEIO_SCRIPT:
558
    assert len(ieioargs) == 2
559
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
560

    
561
  return (ieio, ieioargs)
562

    
563

    
564
def _EncodeBlockdevRename(value):
565
  """Encodes information for renaming block devices.
566

567
  """
568
  return [(d.ToDict(), uid) for d, uid in value]
569

    
570

    
571
def MakeLegacyNodeInfo(data):
572
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
573

574
  Converts the data into a single dictionary. This is fine for most use cases,
575
  but some require information from more than one volume group or hypervisor.
576

577
  """
578
  (bootid, (vg_info, ), (hv_info, )) = data
579

    
580
  return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
581
    "bootid": bootid,
582
    })
583

    
584

    
585
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
586
  """Annotates just DRBD disks layouts.
587

588
  """
589
  assert disk.dev_type == constants.LD_DRBD8
590

    
591
  disk.params = objects.FillDict(drbd_params, disk.params)
592
  (dev_data, dev_meta) = disk.children
593
  dev_data.params = objects.FillDict(data_params, dev_data.params)
594
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
595

    
596
  return disk
597

    
598

    
599
def _AnnotateDParamsGeneric(disk, (params, )):
600
  """Generic disk parameter annotation routine.
601

602
  """
603
  assert disk.dev_type != constants.LD_DRBD8
604

    
605
  disk.params = objects.FillDict(params, disk.params)
606

    
607
  return disk
608

    
609

    
610
def AnnotateDiskParams(template, disks, disk_params):
611
  """Annotates the disk objects with the disk parameters.
612

613
  @param template: The disk template used
614
  @param disks: The list of disks objects to annotate
615
  @param disk_params: The disk paramaters for annotation
616
  @returns: A list of disk objects annotated
617

618
  """
619
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)
620

    
621
  if template == constants.DT_DRBD8:
622
    annotation_fn = _AnnotateDParamsDRBD
623
  elif template == constants.DT_DISKLESS:
624
    annotation_fn = lambda disk, _: disk
625
  else:
626
    annotation_fn = _AnnotateDParamsGeneric
627

    
628
  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
629

    
630

    
631
#: Generic encoders
632
_ENCODERS = {
633
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
634
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
635
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
636
  rpc_defs.ED_COMPRESS: _Compress,
637
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
638
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
639
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
640
  }
641

    
642

    
643
class RpcRunner(_RpcClientBase,
644
                _generated_rpc.RpcClientDefault,
645
                _generated_rpc.RpcClientBootstrap,
646
                _generated_rpc.RpcClientDnsOnly,
647
                _generated_rpc.RpcClientConfig):
648
  """RPC runner class.
649

650
  """
651
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
652
    """Initialized the RPC runner.
653

654
    @type cfg: L{config.ConfigWriter}
655
    @param cfg: Configuration
656
    @type lock_monitor_cb: callable
657
    @param lock_monitor_cb: Lock monitor callback
658

659
    """
660
    self._cfg = cfg
661

    
662
    encoders = _ENCODERS.copy()
663

    
664
    encoders.update({
665
      # Encoders requiring configuration object
666
      rpc_defs.ED_INST_DICT: self._InstDict,
667
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
668
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
669

    
670
      # Encoders annotating disk parameters
671
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
672
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
673

    
674
      # Encoders with special requirements
675
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
676
      })
677

    
678
    # Resolver using configuration
679
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
680
                              cfg.GetAllNodesInfo)
681

    
682
    # Pylint doesn't recognize multiple inheritance properly, see
683
    # <http://www.logilab.org/ticket/36586> and
684
    # <http://www.logilab.org/ticket/35642>
685
    # pylint: disable=W0233
686
    _RpcClientBase.__init__(self, resolver, encoders.get,
687
                            lock_monitor_cb=lock_monitor_cb,
688
                            _req_process_fn=_req_process_fn)
689
    _generated_rpc.RpcClientConfig.__init__(self)
690
    _generated_rpc.RpcClientBootstrap.__init__(self)
691
    _generated_rpc.RpcClientDnsOnly.__init__(self)
692
    _generated_rpc.RpcClientDefault.__init__(self)
693

    
694
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
695
    """Convert the given instance to a dict.
696

697
    This is done via the instance's ToDict() method and additionally
698
    we fill the hvparams with the cluster defaults.
699

700
    @type instance: L{objects.Instance}
701
    @param instance: an Instance object
702
    @type hvp: dict or None
703
    @param hvp: a dictionary with overridden hypervisor parameters
704
    @type bep: dict or None
705
    @param bep: a dictionary with overridden backend parameters
706
    @type osp: dict or None
707
    @param osp: a dictionary with overridden os parameters
708
    @rtype: dict
709
    @return: the instance dict, with the hvparams filled with the
710
        cluster defaults
711

712
    """
713
    idict = instance.ToDict()
714
    cluster = self._cfg.GetClusterInfo()
715
    idict["hvparams"] = cluster.FillHV(instance)
716
    if hvp is not None:
717
      idict["hvparams"].update(hvp)
718
    idict["beparams"] = cluster.FillBE(instance)
719
    if bep is not None:
720
      idict["beparams"].update(bep)
721
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
722
    if osp is not None:
723
      idict["osparams"].update(osp)
724
    for nic in idict["nics"]:
725
      nic["nicparams"] = objects.FillDict(
726
        cluster.nicparams[constants.PP_DEFAULT],
727
        nic["nicparams"])
728
    idict["disks"] = self._DisksDictDP((instance.disks, instance))
729
    return idict
730

    
731
  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
732
    """Wrapper for L{_InstDict}.
733

734
    """
735
    return self._InstDict(instance, hvp=hvp, bep=bep)
736

    
737
  def _InstDictOspDp(self, (instance, osparams)):
738
    """Wrapper for L{_InstDict}.
739

740
    """
741
    return self._InstDict(instance, osp=osparams)
742

    
743
  def _DisksDictDP(self, (disks, instance)):
744
    """Wrapper for L{AnnotateDiskParams}.
745

746
    """
747
    diskparams = self._cfg.GetInstanceDiskParams(instance)
748
    return [disk.ToDict()
749
            for disk in AnnotateDiskParams(instance.disk_template,
750
                                           disks, diskparams)]
751

    
752
  def _SingleDiskDictDP(self, (disk, instance)):
753
    """Wrapper for L{AnnotateDiskParams}.
754

755
    """
756
    (anno_disk,) = self._DisksDictDP(([disk], instance))
757
    return anno_disk
758

    
759

    
760
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
761
  """RPC wrappers for job queue.
762

763
  """
764
  def __init__(self, context, address_list):
765
    """Initializes this class.
766

767
    """
768
    if address_list is None:
769
      resolver = compat.partial(_SsconfResolver, True)
770
    else:
771
      # Caller provided an address list
772
      resolver = _StaticResolver(address_list)
773

    
774
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
775
                            lock_monitor_cb=context.glm.AddToLockMonitor)
776
    _generated_rpc.RpcClientJobQueue.__init__(self)
777

    
778

    
779
class BootstrapRunner(_RpcClientBase,
780
                      _generated_rpc.RpcClientBootstrap,
781
                      _generated_rpc.RpcClientDnsOnly):
782
  """RPC wrappers for bootstrapping.
783

784
  """
785
  def __init__(self):
786
    """Initializes this class.
787

788
    """
789
    # Pylint doesn't recognize multiple inheritance properly, see
790
    # <http://www.logilab.org/ticket/36586> and
791
    # <http://www.logilab.org/ticket/35642>
792
    # pylint: disable=W0233
793
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
794
                            _ENCODERS.get)
795
    _generated_rpc.RpcClientBootstrap.__init__(self)
796
    _generated_rpc.RpcClientDnsOnly.__init__(self)
797

    
798

    
799
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
800
  """RPC wrappers for calls using only DNS.
801

802
  """
803
  def __init__(self):
804
    """Initialize this class.
805

806
    """
807
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
808
                            _ENCODERS.get)
809
    _generated_rpc.RpcClientDnsOnly.__init__(self)
810

    
811

    
812
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
813
  """RPC wrappers for L{config}.
814

815
  """
816
  def __init__(self, context, address_list, _req_process_fn=None,
817
               _getents=None):
818
    """Initializes this class.
819

820
    """
821
    if context:
822
      lock_monitor_cb = context.glm.AddToLockMonitor
823
    else:
824
      lock_monitor_cb = None
825

    
826
    if address_list is None:
827
      resolver = compat.partial(_SsconfResolver, True)
828
    else:
829
      # Caller provided an address list
830
      resolver = _StaticResolver(address_list)
831

    
832
    encoders = _ENCODERS.copy()
833

    
834
    encoders.update({
835
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
836
      })
837

    
838
    _RpcClientBase.__init__(self, resolver, encoders.get,
839
                            lock_monitor_cb=lock_monitor_cb,
840
                            _req_process_fn=_req_process_fn)
841
    _generated_rpc.RpcClientConfig.__init__(self)