Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 05325a35

History | View | Annotate | Download (25.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import logging
34
import zlib
35
import base64
36
import pycurl
37
import threading
38
import copy
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50
from ganeti import rpc_defs
51
from ganeti import pathutils
52
from ganeti import vcluster
53

    
54
# Special module generated at build time
55
from ganeti import _generated_rpc
56

    
57
# pylint has a bug here, doesn't see this import
58
import ganeti.http.client  # pylint: disable=W0611
59

    
60

    
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

    
66
#: Special value to describe an offline host
67
_OFFLINE = object()
68

    
69

    
70
def Init():
71
  """Initializes the module-global HTTP client manager.
72

73
  Must be called before using any RPC function and while exactly one thread is
74
  running.
75

76
  """
77
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
78
  # one thread running. This check is just a safety measure -- it doesn't
79
  # cover all cases.
80
  assert threading.activeCount() == 1, \
81
         "Found more than one active thread when initializing pycURL"
82

    
83
  logging.info("Using PycURL %s", pycurl.version)
84

    
85
  pycurl.global_init(pycurl.GLOBAL_ALL)
86

    
87

    
88
def Shutdown():
89
  """Stops the module-global HTTP client manager.
90

91
  Must be called before quitting the program and while exactly one thread is
92
  running.
93

94
  """
95
  pycurl.global_cleanup()
96

    
97

    
98
def _ConfigRpcCurl(curl):
99
  noded_cert = str(pathutils.NODED_CERT_FILE)
100

    
101
  curl.setopt(pycurl.FOLLOWLOCATION, False)
102
  curl.setopt(pycurl.CAINFO, noded_cert)
103
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
104
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
105
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
106
  curl.setopt(pycurl.SSLCERT, noded_cert)
107
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
108
  curl.setopt(pycurl.SSLKEY, noded_cert)
109
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110

    
111

    
112
def RunWithRPC(fn):
113
  """RPC-wrapper decorator.
114

115
  When applied to a function, it runs it with the RPC system
116
  initialized, and it shutsdown the system afterwards. This means the
117
  function must be called without RPC being initialized.
118

119
  """
120
  def wrapper(*args, **kwargs):
121
    Init()
122
    try:
123
      return fn(*args, **kwargs)
124
    finally:
125
      Shutdown()
126
  return wrapper
127

    
128

    
129
def _Compress(data):
130
  """Compresses a string for transport over RPC.
131

132
  Small amounts of data are not compressed.
133

134
  @type data: str
135
  @param data: Data
136
  @rtype: tuple
137
  @return: Encoded data to send
138

139
  """
140
  # Small amounts of data are not compressed
141
  if len(data) < 512:
142
    return (constants.RPC_ENCODING_NONE, data)
143

    
144
  # Compress with zlib and encode in base64
145
  return (constants.RPC_ENCODING_ZLIB_BASE64,
146
          base64.b64encode(zlib.compress(data, 3)))
147

    
148

    
149
class RpcResult(object):
150
  """RPC Result class.
151

152
  This class holds an RPC result. It is needed since in multi-node
153
  calls we can't raise an exception just because one out of many
154
  failed, and therefore we use this class to encapsulate the result.
155

156
  @ivar data: the data payload, for successful results, or None
157
  @ivar call: the name of the RPC call
158
  @ivar node: the name of the node to which we made the call
159
  @ivar offline: whether the operation failed because the node was
160
      offline, as opposed to actual failure; offline=True will always
161
      imply failed=True, in order to allow simpler checking if
162
      the user doesn't care about the exact failure mode
163
  @ivar fail_msg: the error message if the call failed
164

165
  """
166
  def __init__(self, data=None, failed=False, offline=False,
167
               call=None, node=None):
168
    self.offline = offline
169
    self.call = call
170
    self.node = node
171

    
172
    if offline:
173
      self.fail_msg = "Node is marked offline"
174
      self.data = self.payload = None
175
    elif failed:
176
      self.fail_msg = self._EnsureErr(data)
177
      self.data = self.payload = None
178
    else:
179
      self.data = data
180
      if not isinstance(self.data, (tuple, list)):
181
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182
                         type(self.data))
183
        self.payload = None
184
      elif len(data) != 2:
185
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
186
                         "expected 2" % len(self.data))
187
        self.payload = None
188
      elif not self.data[0]:
189
        self.fail_msg = self._EnsureErr(self.data[1])
190
        self.payload = None
191
      else:
192
        # finally success
193
        self.fail_msg = None
194
        self.payload = data[1]
195

    
196
    for attr_name in ["call", "data", "fail_msg",
197
                      "node", "offline", "payload"]:
198
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199

    
200
  @staticmethod
201
  def _EnsureErr(val):
202
    """Helper to ensure we return a 'True' value for error."""
203
    if val:
204
      return val
205
    else:
206
      return "No error information"
207

    
208
  def Raise(self, msg, prereq=False, ecode=None):
209
    """If the result has failed, raise an OpExecError.
210

211
    This is used so that LU code doesn't have to check for each
212
    result, but instead can call this function.
213

214
    """
215
    if not self.fail_msg:
216
      return
217

    
218
    if not msg: # one could pass None for default message
219
      msg = ("Call '%s' to node '%s' has failed: %s" %
220
             (self.call, self.node, self.fail_msg))
221
    else:
222
      msg = "%s: %s" % (msg, self.fail_msg)
223
    if prereq:
224
      ec = errors.OpPrereqError
225
    else:
226
      ec = errors.OpExecError
227
    if ecode is not None:
228
      args = (msg, ecode)
229
    else:
230
      args = (msg, )
231
    raise ec(*args) # pylint: disable=W0142
232

    
233

    
234
def _SsconfResolver(ssconf_ips, node_list, _,
235
                    ssc=ssconf.SimpleStore,
236
                    nslookup_fn=netutils.Hostname.GetIP):
237
  """Return addresses for given node names.
238

239
  @type ssconf_ips: bool
240
  @param ssconf_ips: Use the ssconf IPs
241
  @type node_list: list
242
  @param node_list: List of node names
243
  @type ssc: class
244
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
245
  @type nslookup_fn: callable
246
  @param nslookup_fn: function use to do NS lookup
247
  @rtype: list of tuple; (string, string)
248
  @return: List of tuples containing node name and IP address
249

250
  """
251
  ss = ssc()
252
  family = ss.GetPrimaryIPFamily()
253

    
254
  if ssconf_ips:
255
    iplist = ss.GetNodePrimaryIPList()
256
    ipmap = dict(entry.split() for entry in iplist)
257
  else:
258
    ipmap = {}
259

    
260
  result = []
261
  for node in node_list:
262
    ip = ipmap.get(node)
263
    if ip is None:
264
      ip = nslookup_fn(node, family=family)
265
    result.append((node, ip))
266

    
267
  return result
268

    
269

    
270
class _StaticResolver:
271
  def __init__(self, addresses):
272
    """Initializes this class.
273

274
    """
275
    self._addresses = addresses
276

    
277
  def __call__(self, hosts, _):
278
    """Returns static addresses for hosts.
279

280
    """
281
    assert len(hosts) == len(self._addresses)
282
    return zip(hosts, self._addresses)
283

    
284

    
285
def _CheckConfigNode(name, node, accept_offline_node):
286
  """Checks if a node is online.
287

288
  @type name: string
289
  @param name: Node name
290
  @type node: L{objects.Node} or None
291
  @param node: Node object
292

293
  """
294
  if node is None:
295
    # Depend on DNS for name resolution
296
    ip = name
297
  elif node.offline and not accept_offline_node:
298
    ip = _OFFLINE
299
  else:
300
    ip = node.primary_ip
301
  return (name, ip)
302

    
303

    
304
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
305
  """Calculate node addresses using configuration.
306

307
  """
308
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
309

    
310
  assert accept_offline_node or opts is None, "Unknown option"
311

    
312
  # Special case for single-host lookups
313
  if len(hosts) == 1:
314
    (name, ) = hosts
315
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
316
  else:
317
    all_nodes = all_nodes_fn()
318
    return [_CheckConfigNode(name, all_nodes.get(name, None),
319
                             accept_offline_node)
320
            for name in hosts]
321

    
322

    
323
class _RpcProcessor:
324
  def __init__(self, resolver, port, lock_monitor_cb=None):
325
    """Initializes this class.
326

327
    @param resolver: callable accepting a list of hostnames, returning a list
328
      of tuples containing name and IP address (IP address can be the name or
329
      the special value L{_OFFLINE} to mark offline machines)
330
    @type port: int
331
    @param port: TCP port
332
    @param lock_monitor_cb: Callable for registering with lock monitor
333

334
    """
335
    self._resolver = resolver
336
    self._port = port
337
    self._lock_monitor_cb = lock_monitor_cb
338

    
339
  @staticmethod
340
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
341
    """Prepares requests by sorting offline hosts into separate list.
342

343
    @type body: dict
344
    @param body: a dictionary with per-host body data
345

346
    """
347
    results = {}
348
    requests = {}
349

    
350
    assert isinstance(body, dict)
351
    assert len(body) == len(hosts)
352
    assert compat.all(isinstance(v, str) for v in body.values())
353
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
354
        "%s != %s" % (hosts, body.keys())
355

    
356
    for (name, ip) in hosts:
357
      if ip is _OFFLINE:
358
        # Node is marked as offline
359
        results[name] = RpcResult(node=name, offline=True, call=procedure)
360
      else:
361
        requests[name] = \
362
          http.client.HttpClientRequest(str(ip), port,
363
                                        http.HTTP_POST, str("/%s" % procedure),
364
                                        headers=_RPC_CLIENT_HEADERS,
365
                                        post_data=body[name],
366
                                        read_timeout=read_timeout,
367
                                        nicename="%s/%s" % (name, procedure),
368
                                        curl_config_fn=_ConfigRpcCurl)
369

    
370
    return (results, requests)
371

    
372
  @staticmethod
373
  def _CombineResults(results, requests, procedure):
374
    """Combines pre-computed results for offline hosts with actual call results.
375

376
    """
377
    for name, req in requests.items():
378
      if req.success and req.resp_status_code == http.HTTP_OK:
379
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
380
                                node=name, call=procedure)
381
      else:
382
        # TODO: Better error reporting
383
        if req.error:
384
          msg = req.error
385
        else:
386
          msg = req.resp_body
387

    
388
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
389
        host_result = RpcResult(data=msg, failed=True, node=name,
390
                                call=procedure)
391

    
392
      results[name] = host_result
393

    
394
    return results
395

    
396
  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
397
               _req_process_fn=None):
398
    """Makes an RPC request to a number of nodes.
399

400
    @type hosts: sequence
401
    @param hosts: Hostnames
402
    @type procedure: string
403
    @param procedure: Request path
404
    @type body: dictionary
405
    @param body: dictionary with request bodies per host
406
    @type read_timeout: int or None
407
    @param read_timeout: Read timeout for request
408
    @rtype: dictionary
409
    @return: a dictionary mapping host names to rpc.RpcResult objects
410

411
    """
412
    assert read_timeout is not None, \
413
      "Missing RPC read timeout for procedure '%s'" % procedure
414

    
415
    if _req_process_fn is None:
416
      _req_process_fn = http.client.ProcessRequests
417

    
418
    (results, requests) = \
419
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
420
                            procedure, body, read_timeout)
421

    
422
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
423

    
424
    assert not frozenset(results).intersection(requests)
425

    
426
    return self._CombineResults(results, requests, procedure)
427

    
428

    
429
class _RpcClientBase:
430
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
431
               _req_process_fn=None):
432
    """Initializes this class.
433

434
    """
435
    proc = _RpcProcessor(resolver,
436
                         netutils.GetDaemonPort(constants.NODED),
437
                         lock_monitor_cb=lock_monitor_cb)
438
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
439
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
440

    
441
  @staticmethod
442
  def _EncodeArg(encoder_fn, (argkind, value)):
443
    """Encode argument.
444

445
    """
446
    if argkind is None:
447
      return value
448
    else:
449
      return encoder_fn(argkind)(value)
450

    
451
  def _Call(self, cdef, node_list, args):
452
    """Entry point for automatically generated RPC wrappers.
453

454
    """
455
    (procedure, _, resolver_opts, timeout, argdefs,
456
     prep_fn, postproc_fn, _) = cdef
457

    
458
    if callable(timeout):
459
      read_timeout = timeout(args)
460
    else:
461
      read_timeout = timeout
462

    
463
    if callable(resolver_opts):
464
      req_resolver_opts = resolver_opts(args)
465
    else:
466
      req_resolver_opts = resolver_opts
467

    
468
    if len(args) != len(argdefs):
469
      raise errors.ProgrammerError("Number of passed arguments doesn't match")
470

    
471
    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
472
    if prep_fn is None:
473
      # for a no-op prep_fn, we serialise the body once, and then we
474
      # reuse it in the dictionary values
475
      body = serializer.DumpJson(enc_args)
476
      pnbody = dict((n, body) for n in node_list)
477
    else:
478
      # for a custom prep_fn, we pass the encoded arguments and the
479
      # node name to the prep_fn, and we serialise its return value
480
      assert callable(prep_fn)
481
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
482
                    for n in node_list)
483

    
484
    result = self._proc(node_list, procedure, pnbody, read_timeout,
485
                        req_resolver_opts)
486

    
487
    if postproc_fn:
488
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
489
                      result.items()))
490
    else:
491
      return result
492

    
493

    
494
def _ObjectToDict(value):
495
  """Converts an object to a dictionary.
496

497
  @note: See L{objects}.
498

499
  """
500
  return value.ToDict()
501

    
502

    
503
def _ObjectListToDict(value):
504
  """Converts a list of L{objects} to dictionaries.
505

506
  """
507
  return map(_ObjectToDict, value)
508

    
509

    
510
def _EncodeNodeToDiskDict(value):
511
  """Encodes a dictionary with node name as key and disk objects as values.
512

513
  """
514
  return dict((name, _ObjectListToDict(disks))
515
              for name, disks in value.items())
516

    
517

    
518
def _PrepareFileUpload(getents_fn, filename):
519
  """Loads a file and prepares it for an upload to nodes.
520

521
  """
522
  statcb = utils.FileStatHelper()
523
  data = _Compress(utils.ReadFile(filename, preread=statcb))
524
  st = statcb.st
525

    
526
  if getents_fn is None:
527
    getents_fn = runtime.GetEnts
528

    
529
  getents = getents_fn()
530

    
531
  virt_filename = vcluster.MakeVirtualPath(filename)
532

    
533
  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
534
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
535

    
536

    
537
def _PrepareFinalizeExportDisks(snap_disks):
538
  """Encodes disks for finalizing export.
539

540
  """
541
  flat_disks = []
542

    
543
  for disk in snap_disks:
544
    if isinstance(disk, bool):
545
      flat_disks.append(disk)
546
    else:
547
      flat_disks.append(disk.ToDict())
548

    
549
  return flat_disks
550

    
551

    
552
def _EncodeImportExportIO((ieio, ieioargs)):
553
  """Encodes import/export I/O information.
554

555
  """
556
  if ieio == constants.IEIO_RAW_DISK:
557
    assert len(ieioargs) == 1
558
    return (ieio, (ieioargs[0].ToDict(), ))
559

    
560
  if ieio == constants.IEIO_SCRIPT:
561
    assert len(ieioargs) == 2
562
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
563

    
564
  return (ieio, ieioargs)
565

    
566

    
567
def _EncodeBlockdevRename(value):
568
  """Encodes information for renaming block devices.
569

570
  """
571
  return [(d.ToDict(), uid) for d, uid in value]
572

    
573

    
574
def MakeLegacyNodeInfo(data):
575
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
576

577
  Converts the data into a single dictionary. This is fine for most use cases,
578
  but some require information from more than one volume group or hypervisor.
579

580
  """
581
  (bootid, (vg_info, ), (hv_info, )) = data
582

    
583
  return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
584
    "bootid": bootid,
585
    })
586

    
587

    
588
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
589
  """Annotates just DRBD disks layouts.
590

591
  """
592
  assert disk.dev_type == constants.LD_DRBD8
593

    
594
  disk.params = objects.FillDict(drbd_params, disk.params)
595
  (dev_data, dev_meta) = disk.children
596
  dev_data.params = objects.FillDict(data_params, dev_data.params)
597
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
598

    
599
  return disk
600

    
601

    
602
def _AnnotateDParamsGeneric(disk, (params, )):
603
  """Generic disk parameter annotation routine.
604

605
  """
606
  assert disk.dev_type != constants.LD_DRBD8
607

    
608
  disk.params = objects.FillDict(params, disk.params)
609

    
610
  return disk
611

    
612

    
613
def AnnotateDiskParams(template, disks, disk_params):
614
  """Annotates the disk objects with the disk parameters.
615

616
  @param template: The disk template used
617
  @param disks: The list of disks objects to annotate
618
  @param disk_params: The disk paramaters for annotation
619
  @returns: A list of disk objects annotated
620

621
  """
622
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)
623

    
624
  if template == constants.DT_DRBD8:
625
    annotation_fn = _AnnotateDParamsDRBD
626
  elif template == constants.DT_DISKLESS:
627
    annotation_fn = lambda disk, _: disk
628
  else:
629
    annotation_fn = _AnnotateDParamsGeneric
630

    
631
  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
632

    
633

    
634
#: Generic encoders
635
_ENCODERS = {
636
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
637
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
638
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
639
  rpc_defs.ED_COMPRESS: _Compress,
640
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
641
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
642
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
643
  }
644

    
645

    
646
class RpcRunner(_RpcClientBase,
647
                _generated_rpc.RpcClientDefault,
648
                _generated_rpc.RpcClientBootstrap,
649
                _generated_rpc.RpcClientDnsOnly,
650
                _generated_rpc.RpcClientConfig):
651
  """RPC runner class.
652

653
  """
654
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
655
    """Initialized the RPC runner.
656

657
    @type cfg: L{config.ConfigWriter}
658
    @param cfg: Configuration
659
    @type lock_monitor_cb: callable
660
    @param lock_monitor_cb: Lock monitor callback
661

662
    """
663
    self._cfg = cfg
664

    
665
    encoders = _ENCODERS.copy()
666

    
667
    encoders.update({
668
      # Encoders requiring configuration object
669
      rpc_defs.ED_INST_DICT: self._InstDict,
670
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
671
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
672
      rpc_defs.ED_NIC_DICT: self._NicDict,
673

    
674
      # Encoders annotating disk parameters
675
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
676
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
677

    
678
      # Encoders with special requirements
679
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
680
      })
681

    
682
    # Resolver using configuration
683
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
684
                              cfg.GetAllNodesInfo)
685

    
686
    # Pylint doesn't recognize multiple inheritance properly, see
687
    # <http://www.logilab.org/ticket/36586> and
688
    # <http://www.logilab.org/ticket/35642>
689
    # pylint: disable=W0233
690
    _RpcClientBase.__init__(self, resolver, encoders.get,
691
                            lock_monitor_cb=lock_monitor_cb,
692
                            _req_process_fn=_req_process_fn)
693
    _generated_rpc.RpcClientConfig.__init__(self)
694
    _generated_rpc.RpcClientBootstrap.__init__(self)
695
    _generated_rpc.RpcClientDnsOnly.__init__(self)
696
    _generated_rpc.RpcClientDefault.__init__(self)
697

    
698
  def _NicDict(self, nic):
699
    """Convert the given nic to a dict and encapsulate netinfo
700

701
    """
702
    n = copy.deepcopy(nic)
703
    if n.network:
704
      net_uuid = self._cfg.LookupNetwork(n.network)
705
      if net_uuid:
706
        nobj = self._cfg.GetNetwork(net_uuid)
707
        n.netinfo = objects.Network.ToDict(nobj)
708
    return n.ToDict()
709

    
710
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
711
    """Convert the given instance to a dict.
712

713
    This is done via the instance's ToDict() method and additionally
714
    we fill the hvparams with the cluster defaults.
715

716
    @type instance: L{objects.Instance}
717
    @param instance: an Instance object
718
    @type hvp: dict or None
719
    @param hvp: a dictionary with overridden hypervisor parameters
720
    @type bep: dict or None
721
    @param bep: a dictionary with overridden backend parameters
722
    @type osp: dict or None
723
    @param osp: a dictionary with overridden os parameters
724
    @rtype: dict
725
    @return: the instance dict, with the hvparams filled with the
726
        cluster defaults
727

728
    """
729
    idict = instance.ToDict()
730
    cluster = self._cfg.GetClusterInfo()
731
    idict["hvparams"] = cluster.FillHV(instance)
732
    if hvp is not None:
733
      idict["hvparams"].update(hvp)
734
    idict["beparams"] = cluster.FillBE(instance)
735
    if bep is not None:
736
      idict["beparams"].update(bep)
737
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
738
    if osp is not None:
739
      idict["osparams"].update(osp)
740
    idict["disks"] = self._DisksDictDP((instance.disks, instance))
741
    for nic in idict["nics"]:
742
      nic["nicparams"] = objects.FillDict(
743
        cluster.nicparams[constants.PP_DEFAULT],
744
        nic["nicparams"])
745
      network = nic.get("network", None)
746
      if network:
747
        net_uuid = self._cfg.LookupNetwork(network)
748
        if net_uuid:
749
          nobj = self._cfg.GetNetwork(net_uuid)
750
          nic["netinfo"] = objects.Network.ToDict(nobj)
751
    return idict
752

    
753
  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
754
    """Wrapper for L{_InstDict}.
755

756
    """
757
    return self._InstDict(instance, hvp=hvp, bep=bep)
758

    
759
  def _InstDictOspDp(self, (instance, osparams)):
760
    """Wrapper for L{_InstDict}.
761

762
    """
763
    return self._InstDict(instance, osp=osparams)
764

    
765
  def _DisksDictDP(self, (disks, instance)):
766
    """Wrapper for L{AnnotateDiskParams}.
767

768
    """
769
    diskparams = self._cfg.GetInstanceDiskParams(instance)
770
    return [disk.ToDict()
771
            for disk in AnnotateDiskParams(instance.disk_template,
772
                                           disks, diskparams)]
773

    
774
  def _SingleDiskDictDP(self, (disk, instance)):
775
    """Wrapper for L{AnnotateDiskParams}.
776

777
    """
778
    (anno_disk,) = self._DisksDictDP(([disk], instance))
779
    return anno_disk
780

    
781

    
782
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
783
  """RPC wrappers for job queue.
784

785
  """
786
  def __init__(self, context, address_list):
787
    """Initializes this class.
788

789
    """
790
    if address_list is None:
791
      resolver = compat.partial(_SsconfResolver, True)
792
    else:
793
      # Caller provided an address list
794
      resolver = _StaticResolver(address_list)
795

    
796
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
797
                            lock_monitor_cb=context.glm.AddToLockMonitor)
798
    _generated_rpc.RpcClientJobQueue.__init__(self)
799

    
800

    
801
class BootstrapRunner(_RpcClientBase,
802
                      _generated_rpc.RpcClientBootstrap,
803
                      _generated_rpc.RpcClientDnsOnly):
804
  """RPC wrappers for bootstrapping.
805

806
  """
807
  def __init__(self):
808
    """Initializes this class.
809

810
    """
811
    # Pylint doesn't recognize multiple inheritance properly, see
812
    # <http://www.logilab.org/ticket/36586> and
813
    # <http://www.logilab.org/ticket/35642>
814
    # pylint: disable=W0233
815
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
816
                            _ENCODERS.get)
817
    _generated_rpc.RpcClientBootstrap.__init__(self)
818
    _generated_rpc.RpcClientDnsOnly.__init__(self)
819

    
820

    
821
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
822
  """RPC wrappers for calls using only DNS.
823

824
  """
825
  def __init__(self):
826
    """Initialize this class.
827

828
    """
829
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
830
                            _ENCODERS.get)
831
    _generated_rpc.RpcClientDnsOnly.__init__(self)
832

    
833

    
834
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
835
  """RPC wrappers for L{config}.
836

837
  """
838
  def __init__(self, context, address_list, _req_process_fn=None,
839
               _getents=None):
840
    """Initializes this class.
841

842
    """
843
    if context:
844
      lock_monitor_cb = context.glm.AddToLockMonitor
845
    else:
846
      lock_monitor_cb = None
847

    
848
    if address_list is None:
849
      resolver = compat.partial(_SsconfResolver, True)
850
    else:
851
      # Caller provided an address list
852
      resolver = _StaticResolver(address_list)
853

    
854
    encoders = _ENCODERS.copy()
855

    
856
    encoders.update({
857
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
858
      })
859

    
860
    _RpcClientBase.__init__(self, resolver, encoders.get,
861
                            lock_monitor_cb=lock_monitor_cb,
862
                            _req_process_fn=_req_process_fn)
863
    _generated_rpc.RpcClientConfig.__init__(self)