Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ cbe4a0a5

History | View | Annotate | Download (25 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import logging
34
import zlib
35
import base64
36
import pycurl
37
import threading
38
import copy
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50
from ganeti import rpc_defs
51
from ganeti import pathutils
52
from ganeti import vcluster
53

    
54
# Special module generated at build time
55
from ganeti import _generated_rpc
56

    
57
# pylint has a bug here, doesn't see this import
58
import ganeti.http.client  # pylint: disable=W0611
59

    
60

    
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

    
66
#: Special value to describe an offline host
67
_OFFLINE = object()
68

    
69

    
70
def Init():
71
  """Initializes the module-global HTTP client manager.
72

73
  Must be called before using any RPC function and while exactly one thread is
74
  running.
75

76
  """
77
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
78
  # one thread running. This check is just a safety measure -- it doesn't
79
  # cover all cases.
80
  assert threading.activeCount() == 1, \
81
         "Found more than one active thread when initializing pycURL"
82

    
83
  logging.info("Using PycURL %s", pycurl.version)
84

    
85
  pycurl.global_init(pycurl.GLOBAL_ALL)
86

    
87

    
88
def Shutdown():
89
  """Stops the module-global HTTP client manager.
90

91
  Must be called before quitting the program and while exactly one thread is
92
  running.
93

94
  """
95
  pycurl.global_cleanup()
96

    
97

    
98
def _ConfigRpcCurl(curl):
99
  noded_cert = str(pathutils.NODED_CERT_FILE)
100

    
101
  curl.setopt(pycurl.FOLLOWLOCATION, False)
102
  curl.setopt(pycurl.CAINFO, noded_cert)
103
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
104
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
105
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
106
  curl.setopt(pycurl.SSLCERT, noded_cert)
107
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
108
  curl.setopt(pycurl.SSLKEY, noded_cert)
109
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110

    
111

    
112
def RunWithRPC(fn):
113
  """RPC-wrapper decorator.
114

115
  When applied to a function, it runs it with the RPC system
116
  initialized, and it shutsdown the system afterwards. This means the
117
  function must be called without RPC being initialized.
118

119
  """
120
  def wrapper(*args, **kwargs):
121
    Init()
122
    try:
123
      return fn(*args, **kwargs)
124
    finally:
125
      Shutdown()
126
  return wrapper
127

    
128

    
129
def _Compress(data):
130
  """Compresses a string for transport over RPC.
131

132
  Small amounts of data are not compressed.
133

134
  @type data: str
135
  @param data: Data
136
  @rtype: tuple
137
  @return: Encoded data to send
138

139
  """
140
  # Small amounts of data are not compressed
141
  if len(data) < 512:
142
    return (constants.RPC_ENCODING_NONE, data)
143

    
144
  # Compress with zlib and encode in base64
145
  return (constants.RPC_ENCODING_ZLIB_BASE64,
146
          base64.b64encode(zlib.compress(data, 3)))
147

    
148

    
149
class RpcResult(object):
150
  """RPC Result class.
151

152
  This class holds an RPC result. It is needed since in multi-node
153
  calls we can't raise an exception just because one one out of many
154
  failed, and therefore we use this class to encapsulate the result.
155

156
  @ivar data: the data payload, for successful results, or None
157
  @ivar call: the name of the RPC call
158
  @ivar node: the name of the node to which we made the call
159
  @ivar offline: whether the operation failed because the node was
160
      offline, as opposed to actual failure; offline=True will always
161
      imply failed=True, in order to allow simpler checking if
162
      the user doesn't care about the exact failure mode
163
  @ivar fail_msg: the error message if the call failed
164

165
  """
166
  def __init__(self, data=None, failed=False, offline=False,
167
               call=None, node=None):
168
    self.offline = offline
169
    self.call = call
170
    self.node = node
171

    
172
    if offline:
173
      self.fail_msg = "Node is marked offline"
174
      self.data = self.payload = None
175
    elif failed:
176
      self.fail_msg = self._EnsureErr(data)
177
      self.data = self.payload = None
178
    else:
179
      self.data = data
180
      if not isinstance(self.data, (tuple, list)):
181
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182
                         type(self.data))
183
        self.payload = None
184
      elif len(data) != 2:
185
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
186
                         "expected 2" % len(self.data))
187
        self.payload = None
188
      elif not self.data[0]:
189
        self.fail_msg = self._EnsureErr(self.data[1])
190
        self.payload = None
191
      else:
192
        # finally success
193
        self.fail_msg = None
194
        self.payload = data[1]
195

    
196
    for attr_name in ["call", "data", "fail_msg",
197
                      "node", "offline", "payload"]:
198
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199

    
200
  @staticmethod
201
  def _EnsureErr(val):
202
    """Helper to ensure we return a 'True' value for error."""
203
    if val:
204
      return val
205
    else:
206
      return "No error information"
207

    
208
  def Raise(self, msg, prereq=False, ecode=None):
209
    """If the result has failed, raise an OpExecError.
210

211
    This is used so that LU code doesn't have to check for each
212
    result, but instead can call this function.
213

214
    """
215
    if not self.fail_msg:
216
      return
217

    
218
    if not msg: # one could pass None for default message
219
      msg = ("Call '%s' to node '%s' has failed: %s" %
220
             (self.call, self.node, self.fail_msg))
221
    else:
222
      msg = "%s: %s" % (msg, self.fail_msg)
223
    if prereq:
224
      ec = errors.OpPrereqError
225
    else:
226
      ec = errors.OpExecError
227
    if ecode is not None:
228
      args = (msg, ecode)
229
    else:
230
      args = (msg, )
231
    raise ec(*args) # pylint: disable=W0142
232

    
233

    
234
def _SsconfResolver(ssconf_ips, node_list, _,
235
                    ssc=ssconf.SimpleStore,
236
                    nslookup_fn=netutils.Hostname.GetIP):
237
  """Return addresses for given node names.
238

239
  @type ssconf_ips: bool
240
  @param ssconf_ips: Use the ssconf IPs
241
  @type node_list: list
242
  @param node_list: List of node names
243
  @type ssc: class
244
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
245
  @type nslookup_fn: callable
246
  @param nslookup_fn: function use to do NS lookup
247
  @rtype: list of tuple; (string, string)
248
  @return: List of tuples containing node name and IP address
249

250
  """
251
  ss = ssc()
252
  family = ss.GetPrimaryIPFamily()
253

    
254
  if ssconf_ips:
255
    iplist = ss.GetNodePrimaryIPList()
256
    ipmap = dict(entry.split() for entry in iplist)
257
  else:
258
    ipmap = {}
259

    
260
  result = []
261
  for node in node_list:
262
    ip = ipmap.get(node)
263
    if ip is None:
264
      ip = nslookup_fn(node, family=family)
265
    result.append((node, ip))
266

    
267
  return result
268

    
269

    
270
class _StaticResolver:
271
  def __init__(self, addresses):
272
    """Initializes this class.
273

274
    """
275
    self._addresses = addresses
276

    
277
  def __call__(self, hosts, _):
278
    """Returns static addresses for hosts.
279

280
    """
281
    assert len(hosts) == len(self._addresses)
282
    return zip(hosts, self._addresses)
283

    
284

    
285
def _CheckConfigNode(name, node, accept_offline_node):
286
  """Checks if a node is online.
287

288
  @type name: string
289
  @param name: Node name
290
  @type node: L{objects.Node} or None
291
  @param node: Node object
292

293
  """
294
  if node is None:
295
    # Depend on DNS for name resolution
296
    ip = name
297
  elif node.offline and not accept_offline_node:
298
    ip = _OFFLINE
299
  else:
300
    ip = node.primary_ip
301
  return (name, ip)
302

    
303

    
304
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
305
  """Calculate node addresses using configuration.
306

307
  """
308
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
309

    
310
  assert accept_offline_node or opts is None, "Unknown option"
311

    
312
  # Special case for single-host lookups
313
  if len(hosts) == 1:
314
    (name, ) = hosts
315
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
316
  else:
317
    all_nodes = all_nodes_fn()
318
    return [_CheckConfigNode(name, all_nodes.get(name, None),
319
                             accept_offline_node)
320
            for name in hosts]
321

    
322

    
323
class _RpcProcessor:
324
  def __init__(self, resolver, port, lock_monitor_cb=None):
325
    """Initializes this class.
326

327
    @param resolver: callable accepting a list of hostnames, returning a list
328
      of tuples containing name and IP address (IP address can be the name or
329
      the special value L{_OFFLINE} to mark offline machines)
330
    @type port: int
331
    @param port: TCP port
332
    @param lock_monitor_cb: Callable for registering with lock monitor
333

334
    """
335
    self._resolver = resolver
336
    self._port = port
337
    self._lock_monitor_cb = lock_monitor_cb
338

    
339
  @staticmethod
340
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
341
    """Prepares requests by sorting offline hosts into separate list.
342

343
    @type body: dict
344
    @param body: a dictionary with per-host body data
345

346
    """
347
    results = {}
348
    requests = {}
349

    
350
    assert isinstance(body, dict)
351
    assert len(body) == len(hosts)
352
    assert compat.all(isinstance(v, str) for v in body.values())
353
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
354
        "%s != %s" % (hosts, body.keys())
355

    
356
    for (name, ip) in hosts:
357
      if ip is _OFFLINE:
358
        # Node is marked as offline
359
        results[name] = RpcResult(node=name, offline=True, call=procedure)
360
      else:
361
        requests[name] = \
362
          http.client.HttpClientRequest(str(ip), port,
363
                                        http.HTTP_POST, str("/%s" % procedure),
364
                                        headers=_RPC_CLIENT_HEADERS,
365
                                        post_data=body[name],
366
                                        read_timeout=read_timeout,
367
                                        nicename="%s/%s" % (name, procedure),
368
                                        curl_config_fn=_ConfigRpcCurl)
369

    
370
    return (results, requests)
371

    
372
  @staticmethod
373
  def _CombineResults(results, requests, procedure):
374
    """Combines pre-computed results for offline hosts with actual call results.
375

376
    """
377
    for name, req in requests.items():
378
      if req.success and req.resp_status_code == http.HTTP_OK:
379
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
380
                                node=name, call=procedure)
381
      else:
382
        # TODO: Better error reporting
383
        if req.error:
384
          msg = req.error
385
        else:
386
          msg = req.resp_body
387

    
388
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
389
        host_result = RpcResult(data=msg, failed=True, node=name,
390
                                call=procedure)
391

    
392
      results[name] = host_result
393

    
394
    return results
395

    
396
  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
397
               _req_process_fn=None):
398
    """Makes an RPC request to a number of nodes.
399

400
    @type hosts: sequence
401
    @param hosts: Hostnames
402
    @type procedure: string
403
    @param procedure: Request path
404
    @type body: dictionary
405
    @param body: dictionary with request bodies per host
406
    @type read_timeout: int or None
407
    @param read_timeout: Read timeout for request
408

409
    """
410
    assert read_timeout is not None, \
411
      "Missing RPC read timeout for procedure '%s'" % procedure
412

    
413
    if _req_process_fn is None:
414
      _req_process_fn = http.client.ProcessRequests
415

    
416
    (results, requests) = \
417
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
418
                            procedure, body, read_timeout)
419

    
420
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
421

    
422
    assert not frozenset(results).intersection(requests)
423

    
424
    return self._CombineResults(results, requests, procedure)
425

    
426

    
427
class _RpcClientBase:
428
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
429
               _req_process_fn=None):
430
    """Initializes this class.
431

432
    """
433
    proc = _RpcProcessor(resolver,
434
                         netutils.GetDaemonPort(constants.NODED),
435
                         lock_monitor_cb=lock_monitor_cb)
436
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
437
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
438

    
439
  @staticmethod
440
  def _EncodeArg(encoder_fn, (argkind, value)):
441
    """Encode argument.
442

443
    """
444
    if argkind is None:
445
      return value
446
    else:
447
      return encoder_fn(argkind)(value)
448

    
449
  def _Call(self, cdef, node_list, args):
450
    """Entry point for automatically generated RPC wrappers.
451

452
    """
453
    (procedure, _, resolver_opts, timeout, argdefs,
454
     prep_fn, postproc_fn, _) = cdef
455

    
456
    if callable(timeout):
457
      read_timeout = timeout(args)
458
    else:
459
      read_timeout = timeout
460

    
461
    if callable(resolver_opts):
462
      req_resolver_opts = resolver_opts(args)
463
    else:
464
      req_resolver_opts = resolver_opts
465

    
466
    if len(args) != len(argdefs):
467
      raise errors.ProgrammerError("Number of passed arguments doesn't match")
468

    
469
    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
470
    if prep_fn is None:
471
      # for a no-op prep_fn, we serialise the body once, and then we
472
      # reuse it in the dictionary values
473
      body = serializer.DumpJson(enc_args)
474
      pnbody = dict((n, body) for n in node_list)
475
    else:
476
      # for a custom prep_fn, we pass the encoded arguments and the
477
      # node name to the prep_fn, and we serialise its return value
478
      assert callable(prep_fn)
479
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
480
                    for n in node_list)
481

    
482
    result = self._proc(node_list, procedure, pnbody, read_timeout,
483
                        req_resolver_opts)
484

    
485
    if postproc_fn:
486
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
487
                      result.items()))
488
    else:
489
      return result
490

    
491

    
492
def _ObjectToDict(value):
493
  """Converts an object to a dictionary.
494

495
  @note: See L{objects}.
496

497
  """
498
  return value.ToDict()
499

    
500

    
501
def _ObjectListToDict(value):
502
  """Converts a list of L{objects} to dictionaries.
503

504
  """
505
  return map(_ObjectToDict, value)
506

    
507

    
508
def _EncodeNodeToDiskDict(value):
509
  """Encodes a dictionary with node name as key and disk objects as values.
510

511
  """
512
  return dict((name, _ObjectListToDict(disks))
513
              for name, disks in value.items())
514

    
515

    
516
def _PrepareFileUpload(getents_fn, filename):
517
  """Loads a file and prepares it for an upload to nodes.
518

519
  """
520
  statcb = utils.FileStatHelper()
521
  data = _Compress(utils.ReadFile(filename, preread=statcb))
522
  st = statcb.st
523

    
524
  if getents_fn is None:
525
    getents_fn = runtime.GetEnts
526

    
527
  getents = getents_fn()
528

    
529
  virt_filename = vcluster.MakeVirtualPath(filename)
530

    
531
  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
532
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
533

    
534

    
535
def _PrepareFinalizeExportDisks(snap_disks):
536
  """Encodes disks for finalizing export.
537

538
  """
539
  flat_disks = []
540

    
541
  for disk in snap_disks:
542
    if isinstance(disk, bool):
543
      flat_disks.append(disk)
544
    else:
545
      flat_disks.append(disk.ToDict())
546

    
547
  return flat_disks
548

    
549

    
550
def _EncodeImportExportIO((ieio, ieioargs)):
551
  """Encodes import/export I/O information.
552

553
  """
554
  if ieio == constants.IEIO_RAW_DISK:
555
    assert len(ieioargs) == 1
556
    return (ieio, (ieioargs[0].ToDict(), ))
557

    
558
  if ieio == constants.IEIO_SCRIPT:
559
    assert len(ieioargs) == 2
560
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
561

    
562
  return (ieio, ieioargs)
563

    
564

    
565
def _EncodeBlockdevRename(value):
566
  """Encodes information for renaming block devices.
567

568
  """
569
  return [(d.ToDict(), uid) for d, uid in value]
570

    
571

    
572
def MakeLegacyNodeInfo(data):
573
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
574

575
  Converts the data into a single dictionary. This is fine for most use cases,
576
  but some require information from more than one volume group or hypervisor.
577

578
  """
579
  (bootid, (vg_info, ), (hv_info, )) = data
580

    
581
  return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
582
    "bootid": bootid,
583
    })
584

    
585

    
586
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
587
  """Annotates just DRBD disks layouts.
588

589
  """
590
  assert disk.dev_type == constants.LD_DRBD8
591

    
592
  disk.params = objects.FillDict(drbd_params, disk.params)
593
  (dev_data, dev_meta) = disk.children
594
  dev_data.params = objects.FillDict(data_params, dev_data.params)
595
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
596

    
597
  return disk
598

    
599

    
600
def _AnnotateDParamsGeneric(disk, (params, )):
601
  """Generic disk parameter annotation routine.
602

603
  """
604
  assert disk.dev_type != constants.LD_DRBD8
605

    
606
  disk.params = objects.FillDict(params, disk.params)
607

    
608
  return disk
609

    
610

    
611
def AnnotateDiskParams(template, disks, disk_params):
612
  """Annotates the disk objects with the disk parameters.
613

614
  @param template: The disk template used
615
  @param disks: The list of disks objects to annotate
616
  @param disk_params: The disk paramaters for annotation
617
  @returns: A list of disk objects annotated
618

619
  """
620
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)
621

    
622
  if template == constants.DT_DRBD8:
623
    annotation_fn = _AnnotateDParamsDRBD
624
  elif template == constants.DT_DISKLESS:
625
    annotation_fn = lambda disk, _: disk
626
  else:
627
    annotation_fn = _AnnotateDParamsGeneric
628

    
629
  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
630

    
631

    
632
#: Generic encoders
633
_ENCODERS = {
634
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
635
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
636
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
637
  rpc_defs.ED_COMPRESS: _Compress,
638
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
639
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
640
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
641
  }
642

    
643

    
644
class RpcRunner(_RpcClientBase,
645
                _generated_rpc.RpcClientDefault,
646
                _generated_rpc.RpcClientBootstrap,
647
                _generated_rpc.RpcClientDnsOnly,
648
                _generated_rpc.RpcClientConfig):
649
  """RPC runner class.
650

651
  """
652
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
653
    """Initialized the RPC runner.
654

655
    @type cfg: L{config.ConfigWriter}
656
    @param cfg: Configuration
657
    @type lock_monitor_cb: callable
658
    @param lock_monitor_cb: Lock monitor callback
659

660
    """
661
    self._cfg = cfg
662

    
663
    encoders = _ENCODERS.copy()
664

    
665
    encoders.update({
666
      # Encoders requiring configuration object
667
      rpc_defs.ED_INST_DICT: self._InstDict,
668
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
669
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
670
      rpc_defs.ED_NIC_DICT: self._NicDict,
671

    
672
      # Encoders annotating disk parameters
673
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
674
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
675

    
676
      # Encoders with special requirements
677
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
678
      })
679

    
680
    # Resolver using configuration
681
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
682
                              cfg.GetAllNodesInfo)
683

    
684
    # Pylint doesn't recognize multiple inheritance properly, see
685
    # <http://www.logilab.org/ticket/36586> and
686
    # <http://www.logilab.org/ticket/35642>
687
    # pylint: disable=W0233
688
    _RpcClientBase.__init__(self, resolver, encoders.get,
689
                            lock_monitor_cb=lock_monitor_cb,
690
                            _req_process_fn=_req_process_fn)
691
    _generated_rpc.RpcClientConfig.__init__(self)
692
    _generated_rpc.RpcClientBootstrap.__init__(self)
693
    _generated_rpc.RpcClientDnsOnly.__init__(self)
694
    _generated_rpc.RpcClientDefault.__init__(self)
695

    
696
  def _NicDict(self, nic):
697
    """Convert the given nic to a dict and encapsulate netinfo
698

699
    """
700
    n = copy.deepcopy(nic)
701
    if n.network:
702
      net_uuid = self._cfg.LookupNetwork(n.network)
703
      if net_uuid:
704
        nobj = self._cfg.GetNetwork(net_uuid)
705
        n.netinfo = objects.Network.ToDict(nobj)
706
    return n.ToDict()
707

    
708
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
709
    """Convert the given instance to a dict.
710

711
    This is done via the instance's ToDict() method and additionally
712
    we fill the hvparams with the cluster defaults.
713

714
    @type instance: L{objects.Instance}
715
    @param instance: an Instance object
716
    @type hvp: dict or None
717
    @param hvp: a dictionary with overridden hypervisor parameters
718
    @type bep: dict or None
719
    @param bep: a dictionary with overridden backend parameters
720
    @type osp: dict or None
721
    @param osp: a dictionary with overridden os parameters
722
    @rtype: dict
723
    @return: the instance dict, with the hvparams filled with the
724
        cluster defaults
725

726
    """
727
    idict = instance.ToDict()
728
    cluster = self._cfg.GetClusterInfo()
729
    idict["hvparams"] = cluster.FillHV(instance)
730
    if hvp is not None:
731
      idict["hvparams"].update(hvp)
732
    idict["beparams"] = cluster.FillBE(instance)
733
    if bep is not None:
734
      idict["beparams"].update(bep)
735
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
736
    if osp is not None:
737
      idict["osparams"].update(osp)
738
    idict["disks"] = self._DisksDictDP((instance.disks, instance))
739
    for nic in idict["nics"]:
740
      nic["nicparams"] = objects.FillDict(
741
        cluster.nicparams[constants.PP_DEFAULT],
742
        nic["nicparams"])
743
      network = nic.get("network", None)
744
      if network:
745
        net_uuid = self._cfg.LookupNetwork(network)
746
        if net_uuid:
747
          nobj = self._cfg.GetNetwork(net_uuid)
748
          nic["netinfo"] = objects.Network.ToDict(nobj)
749
    return idict
750

    
751
  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
752
    """Wrapper for L{_InstDict}.
753

754
    """
755
    return self._InstDict(instance, hvp=hvp, bep=bep)
756

    
757
  def _InstDictOspDp(self, (instance, osparams)):
758
    """Wrapper for L{_InstDict}.
759

760
    """
761
    return self._InstDict(instance, osp=osparams)
762

    
763
  def _DisksDictDP(self, (disks, instance)):
764
    """Wrapper for L{AnnotateDiskParams}.
765

766
    """
767
    diskparams = self._cfg.GetInstanceDiskParams(instance)
768
    return [disk.ToDict()
769
            for disk in AnnotateDiskParams(instance.disk_template,
770
                                           disks, diskparams)]
771

    
772
  def _SingleDiskDictDP(self, (disk, instance)):
773
    """Wrapper for L{AnnotateDiskParams}.
774

775
    """
776
    (anno_disk,) = self._DisksDictDP(([disk], instance))
777
    return anno_disk
778

    
779

    
780
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
781
  """RPC wrappers for job queue.
782

783
  """
784
  def __init__(self, context, address_list):
785
    """Initializes this class.
786

787
    """
788
    if address_list is None:
789
      resolver = compat.partial(_SsconfResolver, True)
790
    else:
791
      # Caller provided an address list
792
      resolver = _StaticResolver(address_list)
793

    
794
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
795
                            lock_monitor_cb=context.glm.AddToLockMonitor)
796
    _generated_rpc.RpcClientJobQueue.__init__(self)
797

    
798

    
799
class BootstrapRunner(_RpcClientBase,
800
                      _generated_rpc.RpcClientBootstrap,
801
                      _generated_rpc.RpcClientDnsOnly):
802
  """RPC wrappers for bootstrapping.
803

804
  """
805
  def __init__(self):
806
    """Initializes this class.
807

808
    """
809
    # Pylint doesn't recognize multiple inheritance properly, see
810
    # <http://www.logilab.org/ticket/36586> and
811
    # <http://www.logilab.org/ticket/35642>
812
    # pylint: disable=W0233
813
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
814
                            _ENCODERS.get)
815
    _generated_rpc.RpcClientBootstrap.__init__(self)
816
    _generated_rpc.RpcClientDnsOnly.__init__(self)
817

    
818

    
819
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
820
  """RPC wrappers for calls using only DNS.
821

822
  """
823
  def __init__(self):
824
    """Initialize this class.
825

826
    """
827
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
828
                            _ENCODERS.get)
829
    _generated_rpc.RpcClientDnsOnly.__init__(self)
830

    
831

    
832
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
833
  """RPC wrappers for L{config}.
834

835
  """
836
  def __init__(self, context, address_list, _req_process_fn=None,
837
               _getents=None):
838
    """Initializes this class.
839

840
    """
841
    if context:
842
      lock_monitor_cb = context.glm.AddToLockMonitor
843
    else:
844
      lock_monitor_cb = None
845

    
846
    if address_list is None:
847
      resolver = compat.partial(_SsconfResolver, True)
848
    else:
849
      # Caller provided an address list
850
      resolver = _StaticResolver(address_list)
851

    
852
    encoders = _ENCODERS.copy()
853

    
854
    encoders.update({
855
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
856
      })
857

    
858
    _RpcClientBase.__init__(self, resolver, encoders.get,
859
                            lock_monitor_cb=lock_monitor_cb,
860
                            _req_process_fn=_req_process_fn)
861
    _generated_rpc.RpcClientConfig.__init__(self)