Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ b8291e00

History | View | Annotate | Download (24.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import logging
34
import zlib
35
import base64
36
import pycurl
37
import threading
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45
from ganeti import netutils
46
from ganeti import ssconf
47
from ganeti import runtime
48
from ganeti import compat
49
from ganeti import rpc_defs
50

    
51
# Special module generated at build time
52
from ganeti import _generated_rpc
53

    
54
# pylint has a bug here, doesn't see this import
55
import ganeti.http.client  # pylint: disable=W0611
56

    
57

    
58
# Timeout for connecting to nodes (seconds)
59
_RPC_CONNECT_TIMEOUT = 5
60

    
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

    
66
# Various time constants for the timeout table
67
_TMO_URGENT = 60 # one minute
68
_TMO_FAST = 5 * 60 # five minutes
69
_TMO_NORMAL = 15 * 60 # 15 minutes
70
_TMO_SLOW = 3600 # one hour
71
_TMO_4HRS = 4 * 3600
72
_TMO_1DAY = 86400
73

    
74
#: Special value to describe an offline host
75
_OFFLINE = object()
76

    
77

    
78
def Init():
79
  """Initializes the module-global HTTP client manager.
80

81
  Must be called before using any RPC function and while exactly one thread is
82
  running.
83

84
  """
85
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86
  # one thread running. This check is just a safety measure -- it doesn't
87
  # cover all cases.
88
  assert threading.activeCount() == 1, \
89
         "Found more than one active thread when initializing pycURL"
90

    
91
  logging.info("Using PycURL %s", pycurl.version)
92

    
93
  pycurl.global_init(pycurl.GLOBAL_ALL)
94

    
95

    
96
def Shutdown():
97
  """Stops the module-global HTTP client manager.
98

99
  Must be called before quitting the program and while exactly one thread is
100
  running.
101

102
  """
103
  pycurl.global_cleanup()
104

    
105

    
106
def _ConfigRpcCurl(curl):
107
  noded_cert = str(constants.NODED_CERT_FILE)
108

    
109
  curl.setopt(pycurl.FOLLOWLOCATION, False)
110
  curl.setopt(pycurl.CAINFO, noded_cert)
111
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
113
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114
  curl.setopt(pycurl.SSLCERT, noded_cert)
115
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116
  curl.setopt(pycurl.SSLKEY, noded_cert)
117
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118

    
119

    
120
def RunWithRPC(fn):
121
  """RPC-wrapper decorator.
122

123
  When applied to a function, it runs it with the RPC system
124
  initialized, and it shutsdown the system afterwards. This means the
125
  function must be called without RPC being initialized.
126

127
  """
128
  def wrapper(*args, **kwargs):
129
    Init()
130
    try:
131
      return fn(*args, **kwargs)
132
    finally:
133
      Shutdown()
134
  return wrapper
135

    
136

    
137
def _Compress(data):
138
  """Compresses a string for transport over RPC.
139

140
  Small amounts of data are not compressed.
141

142
  @type data: str
143
  @param data: Data
144
  @rtype: tuple
145
  @return: Encoded data to send
146

147
  """
148
  # Small amounts of data are not compressed
149
  if len(data) < 512:
150
    return (constants.RPC_ENCODING_NONE, data)
151

    
152
  # Compress with zlib and encode in base64
153
  return (constants.RPC_ENCODING_ZLIB_BASE64,
154
          base64.b64encode(zlib.compress(data, 3)))
155

    
156

    
157
class RpcResult(object):
158
  """RPC Result class.
159

160
  This class holds an RPC result. It is needed since in multi-node
161
  calls we can't raise an exception just because one one out of many
162
  failed, and therefore we use this class to encapsulate the result.
163

164
  @ivar data: the data payload, for successful results, or None
165
  @ivar call: the name of the RPC call
166
  @ivar node: the name of the node to which we made the call
167
  @ivar offline: whether the operation failed because the node was
168
      offline, as opposed to actual failure; offline=True will always
169
      imply failed=True, in order to allow simpler checking if
170
      the user doesn't care about the exact failure mode
171
  @ivar fail_msg: the error message if the call failed
172

173
  """
174
  def __init__(self, data=None, failed=False, offline=False,
175
               call=None, node=None):
176
    self.offline = offline
177
    self.call = call
178
    self.node = node
179

    
180
    if offline:
181
      self.fail_msg = "Node is marked offline"
182
      self.data = self.payload = None
183
    elif failed:
184
      self.fail_msg = self._EnsureErr(data)
185
      self.data = self.payload = None
186
    else:
187
      self.data = data
188
      if not isinstance(self.data, (tuple, list)):
189
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
190
                         type(self.data))
191
        self.payload = None
192
      elif len(data) != 2:
193
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
194
                         "expected 2" % len(self.data))
195
        self.payload = None
196
      elif not self.data[0]:
197
        self.fail_msg = self._EnsureErr(self.data[1])
198
        self.payload = None
199
      else:
200
        # finally success
201
        self.fail_msg = None
202
        self.payload = data[1]
203

    
204
    for attr_name in ["call", "data", "fail_msg",
205
                      "node", "offline", "payload"]:
206
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
207

    
208
  @staticmethod
209
  def _EnsureErr(val):
210
    """Helper to ensure we return a 'True' value for error."""
211
    if val:
212
      return val
213
    else:
214
      return "No error information"
215

    
216
  def Raise(self, msg, prereq=False, ecode=None):
217
    """If the result has failed, raise an OpExecError.
218

219
    This is used so that LU code doesn't have to check for each
220
    result, but instead can call this function.
221

222
    """
223
    if not self.fail_msg:
224
      return
225

    
226
    if not msg: # one could pass None for default message
227
      msg = ("Call '%s' to node '%s' has failed: %s" %
228
             (self.call, self.node, self.fail_msg))
229
    else:
230
      msg = "%s: %s" % (msg, self.fail_msg)
231
    if prereq:
232
      ec = errors.OpPrereqError
233
    else:
234
      ec = errors.OpExecError
235
    if ecode is not None:
236
      args = (msg, ecode)
237
    else:
238
      args = (msg, )
239
    raise ec(*args) # pylint: disable=W0142
240

    
241

    
242
def _SsconfResolver(ssconf_ips, node_list, _,
243
                    ssc=ssconf.SimpleStore,
244
                    nslookup_fn=netutils.Hostname.GetIP):
245
  """Return addresses for given node names.
246

247
  @type ssconf_ips: bool
248
  @param ssconf_ips: Use the ssconf IPs
249
  @type node_list: list
250
  @param node_list: List of node names
251
  @type ssc: class
252
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
253
  @type nslookup_fn: callable
254
  @param nslookup_fn: function use to do NS lookup
255
  @rtype: list of tuple; (string, string)
256
  @return: List of tuples containing node name and IP address
257

258
  """
259
  ss = ssc()
260
  family = ss.GetPrimaryIPFamily()
261

    
262
  if ssconf_ips:
263
    iplist = ss.GetNodePrimaryIPList()
264
    ipmap = dict(entry.split() for entry in iplist)
265
  else:
266
    ipmap = {}
267

    
268
  result = []
269
  for node in node_list:
270
    ip = ipmap.get(node)
271
    if ip is None:
272
      ip = nslookup_fn(node, family=family)
273
    result.append((node, ip))
274

    
275
  return result
276

    
277

    
278
class _StaticResolver:
279
  def __init__(self, addresses):
280
    """Initializes this class.
281

282
    """
283
    self._addresses = addresses
284

    
285
  def __call__(self, hosts, _):
286
    """Returns static addresses for hosts.
287

288
    """
289
    assert len(hosts) == len(self._addresses)
290
    return zip(hosts, self._addresses)
291

    
292

    
293
def _CheckConfigNode(name, node, accept_offline_node):
294
  """Checks if a node is online.
295

296
  @type name: string
297
  @param name: Node name
298
  @type node: L{objects.Node} or None
299
  @param node: Node object
300

301
  """
302
  if node is None:
303
    # Depend on DNS for name resolution
304
    ip = name
305
  elif node.offline and not accept_offline_node:
306
    ip = _OFFLINE
307
  else:
308
    ip = node.primary_ip
309
  return (name, ip)
310

    
311

    
312
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
313
  """Calculate node addresses using configuration.
314

315
  """
316
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
317

    
318
  assert accept_offline_node or opts is None, "Unknown option"
319

    
320
  # Special case for single-host lookups
321
  if len(hosts) == 1:
322
    (name, ) = hosts
323
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
324
  else:
325
    all_nodes = all_nodes_fn()
326
    return [_CheckConfigNode(name, all_nodes.get(name, None),
327
                             accept_offline_node)
328
            for name in hosts]
329

    
330

    
331
class _RpcProcessor:
332
  def __init__(self, resolver, port, lock_monitor_cb=None):
333
    """Initializes this class.
334

335
    @param resolver: callable accepting a list of hostnames, returning a list
336
      of tuples containing name and IP address (IP address can be the name or
337
      the special value L{_OFFLINE} to mark offline machines)
338
    @type port: int
339
    @param port: TCP port
340
    @param lock_monitor_cb: Callable for registering with lock monitor
341

342
    """
343
    self._resolver = resolver
344
    self._port = port
345
    self._lock_monitor_cb = lock_monitor_cb
346

    
347
  @staticmethod
348
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
349
    """Prepares requests by sorting offline hosts into separate list.
350

351
    @type body: dict
352
    @param body: a dictionary with per-host body data
353

354
    """
355
    results = {}
356
    requests = {}
357

    
358
    assert isinstance(body, dict)
359
    assert len(body) == len(hosts)
360
    assert compat.all(isinstance(v, str) for v in body.values())
361
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
362
        "%s != %s" % (hosts, body.keys())
363

    
364
    for (name, ip) in hosts:
365
      if ip is _OFFLINE:
366
        # Node is marked as offline
367
        results[name] = RpcResult(node=name, offline=True, call=procedure)
368
      else:
369
        requests[name] = \
370
          http.client.HttpClientRequest(str(ip), port,
371
                                        http.HTTP_POST, str("/%s" % procedure),
372
                                        headers=_RPC_CLIENT_HEADERS,
373
                                        post_data=body[name],
374
                                        read_timeout=read_timeout,
375
                                        nicename="%s/%s" % (name, procedure),
376
                                        curl_config_fn=_ConfigRpcCurl)
377

    
378
    return (results, requests)
379

    
380
  @staticmethod
381
  def _CombineResults(results, requests, procedure):
382
    """Combines pre-computed results for offline hosts with actual call results.
383

384
    """
385
    for name, req in requests.items():
386
      if req.success and req.resp_status_code == http.HTTP_OK:
387
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
388
                                node=name, call=procedure)
389
      else:
390
        # TODO: Better error reporting
391
        if req.error:
392
          msg = req.error
393
        else:
394
          msg = req.resp_body
395

    
396
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
397
        host_result = RpcResult(data=msg, failed=True, node=name,
398
                                call=procedure)
399

    
400
      results[name] = host_result
401

    
402
    return results
403

    
404
  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
405
               _req_process_fn=None):
406
    """Makes an RPC request to a number of nodes.
407

408
    @type hosts: sequence
409
    @param hosts: Hostnames
410
    @type procedure: string
411
    @param procedure: Request path
412
    @type body: dictionary
413
    @param body: dictionary with request bodies per host
414
    @type read_timeout: int or None
415
    @param read_timeout: Read timeout for request
416

417
    """
418
    assert read_timeout is not None, \
419
      "Missing RPC read timeout for procedure '%s'" % procedure
420

    
421
    if _req_process_fn is None:
422
      _req_process_fn = http.client.ProcessRequests
423

    
424
    (results, requests) = \
425
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
426
                            procedure, body, read_timeout)
427

    
428
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
429

    
430
    assert not frozenset(results).intersection(requests)
431

    
432
    return self._CombineResults(results, requests, procedure)
433

    
434

    
435
class _RpcClientBase:
436
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
437
               _req_process_fn=None):
438
    """Initializes this class.
439

440
    """
441
    proc = _RpcProcessor(resolver,
442
                         netutils.GetDaemonPort(constants.NODED),
443
                         lock_monitor_cb=lock_monitor_cb)
444
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
445
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
446

    
447
  @staticmethod
448
  def _EncodeArg(encoder_fn, (argkind, value)):
449
    """Encode argument.
450

451
    """
452
    if argkind is None:
453
      return value
454
    else:
455
      return encoder_fn(argkind)(value)
456

    
457
  def _Call(self, cdef, node_list, args):
458
    """Entry point for automatically generated RPC wrappers.
459

460
    """
461
    (procedure, _, resolver_opts, timeout, argdefs,
462
     prep_fn, postproc_fn, _) = cdef
463

    
464
    if callable(timeout):
465
      read_timeout = timeout(args)
466
    else:
467
      read_timeout = timeout
468

    
469
    if callable(resolver_opts):
470
      req_resolver_opts = resolver_opts(args)
471
    else:
472
      req_resolver_opts = resolver_opts
473

    
474
    if len(args) != len(argdefs):
475
      raise errors.ProgrammerError("Number of passed arguments doesn't match")
476

    
477
    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
478
    if prep_fn is None:
479
      # for a no-op prep_fn, we serialise the body once, and then we
480
      # reuse it in the dictionary values
481
      body = serializer.DumpJson(enc_args)
482
      pnbody = dict((n, body) for n in node_list)
483
    else:
484
      # for a custom prep_fn, we pass the encoded arguments and the
485
      # node name to the prep_fn, and we serialise its return value
486
      assert callable(prep_fn)
487
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
488
                    for n in node_list)
489

    
490
    result = self._proc(node_list, procedure, pnbody, read_timeout,
491
                        req_resolver_opts)
492

    
493
    if postproc_fn:
494
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
495
                      result.items()))
496
    else:
497
      return result
498

    
499

    
500
def _ObjectToDict(value):
501
  """Converts an object to a dictionary.
502

503
  @note: See L{objects}.
504

505
  """
506
  return value.ToDict()
507

    
508

    
509
def _ObjectListToDict(value):
510
  """Converts a list of L{objects} to dictionaries.
511

512
  """
513
  return map(_ObjectToDict, value)
514

    
515

    
516
def _EncodeNodeToDiskDict(value):
517
  """Encodes a dictionary with node name as key and disk objects as values.
518

519
  """
520
  return dict((name, _ObjectListToDict(disks))
521
              for name, disks in value.items())
522

    
523

    
524
def _PrepareFileUpload(getents_fn, filename):
525
  """Loads a file and prepares it for an upload to nodes.
526

527
  """
528
  statcb = utils.FileStatHelper()
529
  data = _Compress(utils.ReadFile(filename, preread=statcb))
530
  st = statcb.st
531

    
532
  if getents_fn is None:
533
    getents_fn = runtime.GetEnts
534

    
535
  getents = getents_fn()
536

    
537
  return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
538
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
539

    
540

    
541
def _PrepareFinalizeExportDisks(snap_disks):
542
  """Encodes disks for finalizing export.
543

544
  """
545
  flat_disks = []
546

    
547
  for disk in snap_disks:
548
    if isinstance(disk, bool):
549
      flat_disks.append(disk)
550
    else:
551
      flat_disks.append(disk.ToDict())
552

    
553
  return flat_disks
554

    
555

    
556
def _EncodeImportExportIO((ieio, ieioargs)):
557
  """Encodes import/export I/O information.
558

559
  """
560
  if ieio == constants.IEIO_RAW_DISK:
561
    assert len(ieioargs) == 1
562
    return (ieio, (ieioargs[0].ToDict(), ))
563

    
564
  if ieio == constants.IEIO_SCRIPT:
565
    assert len(ieioargs) == 2
566
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
567

    
568
  return (ieio, ieioargs)
569

    
570

    
571
def _EncodeBlockdevRename(value):
572
  """Encodes information for renaming block devices.
573

574
  """
575
  return [(d.ToDict(), uid) for d, uid in value]
576

    
577

    
578
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
579
  """Annotates just DRBD disks layouts.
580

581
  """
582
  assert disk.dev_type == constants.LD_DRBD8
583

    
584
  disk.params = objects.FillDict(drbd_params, disk.params)
585
  (dev_data, dev_meta) = disk.children
586
  dev_data.params = objects.FillDict(data_params, dev_data.params)
587
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
588

    
589
  return disk
590

    
591

    
592
def _AnnotateDParamsGeneric(disk, (params, )):
593
  """Generic disk parameter annotation routine.
594

595
  """
596
  assert disk.dev_type != constants.LD_DRBD8
597

    
598
  disk.params = objects.FillDict(params, disk.params)
599

    
600
  return disk
601

    
602

    
603
def AnnotateDiskParams(template, disks, disk_params):
604
  """Annotates the disk objects with the disk parameters.
605

606
  @param template: The disk template used
607
  @param disks: The list of disks objects to annotate
608
  @param disk_params: The disk paramaters for annotation
609
  @returns: A list of disk objects annotated
610

611
  """
612
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)
613

    
614
  if template == constants.DT_DRBD8:
615
    annotation_fn = _AnnotateDParamsDRBD
616
  elif template == constants.DT_DISKLESS:
617
    annotation_fn = lambda disk, _: disk
618
  else:
619
    annotation_fn = _AnnotateDParamsGeneric
620

    
621
  new_disks = []
622
  for disk in disks:
623
    new_disks.append(annotation_fn(disk.Copy(), ld_params))
624

    
625
  return new_disks
626

    
627

    
628
#: Generic encoders
629
_ENCODERS = {
630
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
631
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
632
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
633
  rpc_defs.ED_COMPRESS: _Compress,
634
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
635
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
636
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
637
  }
638

    
639

    
640
class RpcRunner(_RpcClientBase,
641
                _generated_rpc.RpcClientDefault,
642
                _generated_rpc.RpcClientBootstrap,
643
                _generated_rpc.RpcClientDnsOnly,
644
                _generated_rpc.RpcClientConfig):
645
  """RPC runner class.
646

647
  """
648
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
649
    """Initialized the RPC runner.
650

651
    @type cfg: L{config.ConfigWriter}
652
    @param cfg: Configuration
653
    @type lock_monitor_cb: callable
654
    @param lock_monitor_cb: Lock monitor callback
655

656
    """
657
    self._cfg = cfg
658

    
659
    encoders = _ENCODERS.copy()
660

    
661
    encoders.update({
662
      # Encoders requiring configuration object
663
      rpc_defs.ED_INST_DICT: self._InstDict,
664
      rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
665
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
666

    
667
      # Encoders annotating disk parameters
668
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
669
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
670

    
671
      # Encoders with special requirements
672
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
673
      })
674

    
675
    # Resolver using configuration
676
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
677
                              cfg.GetAllNodesInfo)
678

    
679
    # Pylint doesn't recognize multiple inheritance properly, see
680
    # <http://www.logilab.org/ticket/36586> and
681
    # <http://www.logilab.org/ticket/35642>
682
    # pylint: disable=W0233
683
    _RpcClientBase.__init__(self, resolver, encoders.get,
684
                            lock_monitor_cb=lock_monitor_cb,
685
                            _req_process_fn=_req_process_fn)
686
    _generated_rpc.RpcClientConfig.__init__(self)
687
    _generated_rpc.RpcClientBootstrap.__init__(self)
688
    _generated_rpc.RpcClientDnsOnly.__init__(self)
689
    _generated_rpc.RpcClientDefault.__init__(self)
690

    
691
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
692
    """Convert the given instance to a dict.
693

694
    This is done via the instance's ToDict() method and additionally
695
    we fill the hvparams with the cluster defaults.
696

697
    @type instance: L{objects.Instance}
698
    @param instance: an Instance object
699
    @type hvp: dict or None
700
    @param hvp: a dictionary with overridden hypervisor parameters
701
    @type bep: dict or None
702
    @param bep: a dictionary with overridden backend parameters
703
    @type osp: dict or None
704
    @param osp: a dictionary with overridden os parameters
705
    @rtype: dict
706
    @return: the instance dict, with the hvparams filled with the
707
        cluster defaults
708

709
    """
710
    idict = instance.ToDict()
711
    cluster = self._cfg.GetClusterInfo()
712
    idict["hvparams"] = cluster.FillHV(instance)
713
    if hvp is not None:
714
      idict["hvparams"].update(hvp)
715
    idict["beparams"] = cluster.FillBE(instance)
716
    if bep is not None:
717
      idict["beparams"].update(bep)
718
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
719
    if osp is not None:
720
      idict["osparams"].update(osp)
721
    for nic in idict["nics"]:
722
      nic["nicparams"] = objects.FillDict(
723
        cluster.nicparams[constants.PP_DEFAULT],
724
        nic["nicparams"])
725
    return idict
726

    
727
  def _InstDictHvpBep(self, (instance, hvp, bep)):
728
    """Wrapper for L{_InstDict}.
729

730
    """
731
    return self._InstDict(instance, hvp=hvp, bep=bep)
732

    
733
  def _InstDictOspDp(self, (instance, osparams)):
734
    """Wrapper for L{_InstDict}.
735

736
    """
737
    updated_inst = self._InstDict(instance, osp=osparams)
738
    updated_inst["disks"] = self._DisksDictDP((instance.disks, instance))
739
    return updated_inst
740

    
741
  def _DisksDictDP(self, (disks, instance)):
742
    """Wrapper for L{AnnotateDiskParams}.
743

744
    """
745
    diskparams = self._cfg.GetInstanceDiskParams(instance)
746
    return [disk.ToDict()
747
            for disk in AnnotateDiskParams(instance.disk_template,
748
                                           disks, diskparams)]
749

    
750
  def _SingleDiskDictDP(self, (disk, instance)):
751
    """Wrapper for L{AnnotateDiskParams}.
752

753
    """
754
    (anno_disk,) = self._DisksDictDP(([disk], instance))
755
    return anno_disk
756

    
757

    
758
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
759
  """RPC wrappers for job queue.
760

761
  """
762
  def __init__(self, context, address_list):
763
    """Initializes this class.
764

765
    """
766
    if address_list is None:
767
      resolver = compat.partial(_SsconfResolver, True)
768
    else:
769
      # Caller provided an address list
770
      resolver = _StaticResolver(address_list)
771

    
772
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
773
                            lock_monitor_cb=context.glm.AddToLockMonitor)
774
    _generated_rpc.RpcClientJobQueue.__init__(self)
775

    
776

    
777
class BootstrapRunner(_RpcClientBase,
778
                      _generated_rpc.RpcClientBootstrap,
779
                      _generated_rpc.RpcClientDnsOnly):
780
  """RPC wrappers for bootstrapping.
781

782
  """
783
  def __init__(self):
784
    """Initializes this class.
785

786
    """
787
    # Pylint doesn't recognize multiple inheritance properly, see
788
    # <http://www.logilab.org/ticket/36586> and
789
    # <http://www.logilab.org/ticket/35642>
790
    # pylint: disable=W0233
791
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
792
                            _ENCODERS.get)
793
    _generated_rpc.RpcClientBootstrap.__init__(self)
794
    _generated_rpc.RpcClientDnsOnly.__init__(self)
795

    
796

    
797
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
798
  """RPC wrappers for calls using only DNS.
799

800
  """
801
  def __init__(self):
802
    """Initialize this class.
803

804
    """
805
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
806
                            _ENCODERS.get)
807
    _generated_rpc.RpcClientDnsOnly.__init__(self)
808

    
809

    
810
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
811
  """RPC wrappers for L{config}.
812

813
  """
814
  def __init__(self, context, address_list, _req_process_fn=None,
815
               _getents=None):
816
    """Initializes this class.
817

818
    """
819
    if context:
820
      lock_monitor_cb = context.glm.AddToLockMonitor
821
    else:
822
      lock_monitor_cb = None
823

    
824
    if address_list is None:
825
      resolver = compat.partial(_SsconfResolver, True)
826
    else:
827
      # Caller provided an address list
828
      resolver = _StaticResolver(address_list)
829

    
830
    encoders = _ENCODERS.copy()
831

    
832
    encoders.update({
833
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
834
      })
835

    
836
    _RpcClientBase.__init__(self, resolver, encoders.get,
837
                            lock_monitor_cb=lock_monitor_cb,
838
                            _req_process_fn=_req_process_fn)
839
    _generated_rpc.RpcClientConfig.__init__(self)