Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ c2dc025a

History | View | Annotate | Download (21.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import logging
34
import zlib
35
import base64
36
import pycurl
37
import threading
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45
from ganeti import netutils
46
from ganeti import ssconf
47
from ganeti import runtime
48
from ganeti import compat
49
from ganeti import rpc_defs
50

    
51
# Special module generated at build time
52
from ganeti import _generated_rpc
53

    
54
# pylint has a bug here, doesn't see this import
55
import ganeti.http.client  # pylint: disable=W0611
56

    
57

    
58
# Timeout for connecting to nodes (seconds)
59
_RPC_CONNECT_TIMEOUT = 5
60

    
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

    
66
# Various time constants for the timeout table
67
_TMO_URGENT = 60 # one minute
68
_TMO_FAST = 5 * 60 # five minutes
69
_TMO_NORMAL = 15 * 60 # 15 minutes
70
_TMO_SLOW = 3600 # one hour
71
_TMO_4HRS = 4 * 3600
72
_TMO_1DAY = 86400
73

    
74
#: Special value to describe an offline host
75
_OFFLINE = object()
76

    
77

    
78
def Init():
79
  """Initializes the module-global HTTP client manager.
80

81
  Must be called before using any RPC function and while exactly one thread is
82
  running.
83

84
  """
85
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86
  # one thread running. This check is just a safety measure -- it doesn't
87
  # cover all cases.
88
  assert threading.activeCount() == 1, \
89
         "Found more than one active thread when initializing pycURL"
90

    
91
  logging.info("Using PycURL %s", pycurl.version)
92

    
93
  pycurl.global_init(pycurl.GLOBAL_ALL)
94

    
95

    
96
def Shutdown():
97
  """Stops the module-global HTTP client manager.
98

99
  Must be called before quitting the program and while exactly one thread is
100
  running.
101

102
  """
103
  pycurl.global_cleanup()
104

    
105

    
106
def _ConfigRpcCurl(curl):
107
  noded_cert = str(constants.NODED_CERT_FILE)
108

    
109
  curl.setopt(pycurl.FOLLOWLOCATION, False)
110
  curl.setopt(pycurl.CAINFO, noded_cert)
111
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
113
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114
  curl.setopt(pycurl.SSLCERT, noded_cert)
115
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116
  curl.setopt(pycurl.SSLKEY, noded_cert)
117
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118

    
119

    
120
def RunWithRPC(fn):
121
  """RPC-wrapper decorator.
122

123
  When applied to a function, it runs it with the RPC system
124
  initialized, and it shutsdown the system afterwards. This means the
125
  function must be called without RPC being initialized.
126

127
  """
128
  def wrapper(*args, **kwargs):
129
    Init()
130
    try:
131
      return fn(*args, **kwargs)
132
    finally:
133
      Shutdown()
134
  return wrapper
135

    
136

    
137
def _Compress(data):
138
  """Compresses a string for transport over RPC.
139

140
  Small amounts of data are not compressed.
141

142
  @type data: str
143
  @param data: Data
144
  @rtype: tuple
145
  @return: Encoded data to send
146

147
  """
148
  # Small amounts of data are not compressed
149
  if len(data) < 512:
150
    return (constants.RPC_ENCODING_NONE, data)
151

    
152
  # Compress with zlib and encode in base64
153
  return (constants.RPC_ENCODING_ZLIB_BASE64,
154
          base64.b64encode(zlib.compress(data, 3)))
155

    
156

    
157
class RpcResult(object):
158
  """RPC Result class.
159

160
  This class holds an RPC result. It is needed since in multi-node
161
  calls we can't raise an exception just because one one out of many
162
  failed, and therefore we use this class to encapsulate the result.
163

164
  @ivar data: the data payload, for successful results, or None
165
  @ivar call: the name of the RPC call
166
  @ivar node: the name of the node to which we made the call
167
  @ivar offline: whether the operation failed because the node was
168
      offline, as opposed to actual failure; offline=True will always
169
      imply failed=True, in order to allow simpler checking if
170
      the user doesn't care about the exact failure mode
171
  @ivar fail_msg: the error message if the call failed
172

173
  """
174
  def __init__(self, data=None, failed=False, offline=False,
175
               call=None, node=None):
176
    self.offline = offline
177
    self.call = call
178
    self.node = node
179

    
180
    if offline:
181
      self.fail_msg = "Node is marked offline"
182
      self.data = self.payload = None
183
    elif failed:
184
      self.fail_msg = self._EnsureErr(data)
185
      self.data = self.payload = None
186
    else:
187
      self.data = data
188
      if not isinstance(self.data, (tuple, list)):
189
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
190
                         type(self.data))
191
        self.payload = None
192
      elif len(data) != 2:
193
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
194
                         "expected 2" % len(self.data))
195
        self.payload = None
196
      elif not self.data[0]:
197
        self.fail_msg = self._EnsureErr(self.data[1])
198
        self.payload = None
199
      else:
200
        # finally success
201
        self.fail_msg = None
202
        self.payload = data[1]
203

    
204
    for attr_name in ["call", "data", "fail_msg",
205
                      "node", "offline", "payload"]:
206
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
207

    
208
  @staticmethod
209
  def _EnsureErr(val):
210
    """Helper to ensure we return a 'True' value for error."""
211
    if val:
212
      return val
213
    else:
214
      return "No error information"
215

    
216
  def Raise(self, msg, prereq=False, ecode=None):
217
    """If the result has failed, raise an OpExecError.
218

219
    This is used so that LU code doesn't have to check for each
220
    result, but instead can call this function.
221

222
    """
223
    if not self.fail_msg:
224
      return
225

    
226
    if not msg: # one could pass None for default message
227
      msg = ("Call '%s' to node '%s' has failed: %s" %
228
             (self.call, self.node, self.fail_msg))
229
    else:
230
      msg = "%s: %s" % (msg, self.fail_msg)
231
    if prereq:
232
      ec = errors.OpPrereqError
233
    else:
234
      ec = errors.OpExecError
235
    if ecode is not None:
236
      args = (msg, ecode)
237
    else:
238
      args = (msg, )
239
    raise ec(*args) # pylint: disable=W0142
240

    
241

    
242
def _SsconfResolver(node_list, _,
243
                    ssc=ssconf.SimpleStore,
244
                    nslookup_fn=netutils.Hostname.GetIP):
245
  """Return addresses for given node names.
246

247
  @type node_list: list
248
  @param node_list: List of node names
249
  @type ssc: class
250
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
251
  @type nslookup_fn: callable
252
  @param nslookup_fn: function use to do NS lookup
253
  @rtype: list of tuple; (string, string)
254
  @return: List of tuples containing node name and IP address
255

256
  """
257
  ss = ssc()
258
  iplist = ss.GetNodePrimaryIPList()
259
  family = ss.GetPrimaryIPFamily()
260
  ipmap = dict(entry.split() for entry in iplist)
261

    
262
  result = []
263
  for node in node_list:
264
    ip = ipmap.get(node)
265
    if ip is None:
266
      ip = nslookup_fn(node, family=family)
267
    result.append((node, ip))
268

    
269
  return result
270

    
271

    
272
class _StaticResolver:
273
  def __init__(self, addresses):
274
    """Initializes this class.
275

276
    """
277
    self._addresses = addresses
278

    
279
  def __call__(self, hosts, _):
280
    """Returns static addresses for hosts.
281

282
    """
283
    assert len(hosts) == len(self._addresses)
284
    return zip(hosts, self._addresses)
285

    
286

    
287
def _CheckConfigNode(name, node, accept_offline_node):
288
  """Checks if a node is online.
289

290
  @type name: string
291
  @param name: Node name
292
  @type node: L{objects.Node} or None
293
  @param node: Node object
294

295
  """
296
  if node is None:
297
    # Depend on DNS for name resolution
298
    ip = name
299
  elif node.offline and not accept_offline_node:
300
    ip = _OFFLINE
301
  else:
302
    ip = node.primary_ip
303
  return (name, ip)
304

    
305

    
306
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
307
  """Calculate node addresses using configuration.
308

309
  """
310
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
311

    
312
  assert accept_offline_node or opts is None, "Unknown option"
313

    
314
  # Special case for single-host lookups
315
  if len(hosts) == 1:
316
    (name, ) = hosts
317
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
318
  else:
319
    all_nodes = all_nodes_fn()
320
    return [_CheckConfigNode(name, all_nodes.get(name, None),
321
                             accept_offline_node)
322
            for name in hosts]
323

    
324

    
325
class _RpcProcessor:
326
  def __init__(self, resolver, port, lock_monitor_cb=None):
327
    """Initializes this class.
328

329
    @param resolver: callable accepting a list of hostnames, returning a list
330
      of tuples containing name and IP address (IP address can be the name or
331
      the special value L{_OFFLINE} to mark offline machines)
332
    @type port: int
333
    @param port: TCP port
334
    @param lock_monitor_cb: Callable for registering with lock monitor
335

336
    """
337
    self._resolver = resolver
338
    self._port = port
339
    self._lock_monitor_cb = lock_monitor_cb
340

    
341
  @staticmethod
342
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
343
    """Prepares requests by sorting offline hosts into separate list.
344

345
    @type body: dict
346
    @param body: a dictionary with per-host body data
347

348
    """
349
    results = {}
350
    requests = {}
351

    
352
    assert isinstance(body, dict)
353
    assert len(body) == len(hosts)
354
    assert compat.all(isinstance(v, str) for v in body.values())
355
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
356
        "%s != %s" % (hosts, body.keys())
357

    
358
    for (name, ip) in hosts:
359
      if ip is _OFFLINE:
360
        # Node is marked as offline
361
        results[name] = RpcResult(node=name, offline=True, call=procedure)
362
      else:
363
        requests[name] = \
364
          http.client.HttpClientRequest(str(ip), port,
365
                                        http.HTTP_PUT, str("/%s" % procedure),
366
                                        headers=_RPC_CLIENT_HEADERS,
367
                                        post_data=body[name],
368
                                        read_timeout=read_timeout,
369
                                        nicename="%s/%s" % (name, procedure),
370
                                        curl_config_fn=_ConfigRpcCurl)
371

    
372
    return (results, requests)
373

    
374
  @staticmethod
375
  def _CombineResults(results, requests, procedure):
376
    """Combines pre-computed results for offline hosts with actual call results.
377

378
    """
379
    for name, req in requests.items():
380
      if req.success and req.resp_status_code == http.HTTP_OK:
381
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
382
                                node=name, call=procedure)
383
      else:
384
        # TODO: Better error reporting
385
        if req.error:
386
          msg = req.error
387
        else:
388
          msg = req.resp_body
389

    
390
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
391
        host_result = RpcResult(data=msg, failed=True, node=name,
392
                                call=procedure)
393

    
394
      results[name] = host_result
395

    
396
    return results
397

    
398
  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
399
               _req_process_fn=None):
400
    """Makes an RPC request to a number of nodes.
401

402
    @type hosts: sequence
403
    @param hosts: Hostnames
404
    @type procedure: string
405
    @param procedure: Request path
406
    @type body: dictionary
407
    @param body: dictionary with request bodies per host
408
    @type read_timeout: int or None
409
    @param read_timeout: Read timeout for request
410

411
    """
412
    assert read_timeout is not None, \
413
      "Missing RPC read timeout for procedure '%s'" % procedure
414

    
415
    if _req_process_fn is None:
416
      _req_process_fn = http.client.ProcessRequests
417

    
418
    (results, requests) = \
419
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
420
                            procedure, body, read_timeout)
421

    
422
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
423

    
424
    assert not frozenset(results).intersection(requests)
425

    
426
    return self._CombineResults(results, requests, procedure)
427

    
428

    
429
class _RpcClientBase:
430
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
431
               _req_process_fn=None):
432
    """Initializes this class.
433

434
    """
435
    proc = _RpcProcessor(resolver,
436
                         netutils.GetDaemonPort(constants.NODED),
437
                         lock_monitor_cb=lock_monitor_cb)
438
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
439
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
440

    
441
  @staticmethod
442
  def _EncodeArg(encoder_fn, (argkind, value)):
443
    """Encode argument.
444

445
    """
446
    if argkind is None:
447
      return value
448
    else:
449
      return encoder_fn(argkind)(value)
450

    
451
  def _Call(self, cdef, node_list, args):
452
    """Entry point for automatically generated RPC wrappers.
453

454
    """
455
    (procedure, _, resolver_opts, timeout, argdefs,
456
     prep_fn, postproc_fn, _) = cdef
457

    
458
    if callable(timeout):
459
      read_timeout = timeout(args)
460
    else:
461
      read_timeout = timeout
462

    
463
    if callable(resolver_opts):
464
      req_resolver_opts = resolver_opts(args)
465
    else:
466
      req_resolver_opts = resolver_opts
467

    
468
    if len(args) != len(argdefs):
469
      raise errors.ProgrammerError("Number of passed arguments doesn't match")
470

    
471
    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
472
    if prep_fn is None:
473
      # for a no-op prep_fn, we serialise the body once, and then we
474
      # reuse it in the dictionary values
475
      body = serializer.DumpJson(enc_args)
476
      pnbody = dict((n, body) for n in node_list)
477
    else:
478
      # for a custom prep_fn, we pass the encoded arguments and the
479
      # node name to the prep_fn, and we serialise its return value
480
      assert callable(prep_fn)
481
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
482
                    for n in node_list)
483

    
484
    result = self._proc(node_list, procedure, pnbody, read_timeout,
485
                        req_resolver_opts)
486

    
487
    if postproc_fn:
488
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
489
                      result.items()))
490
    else:
491
      return result
492

    
493

    
494
def _ObjectToDict(value):
495
  """Converts an object to a dictionary.
496

497
  @note: See L{objects}.
498

499
  """
500
  return value.ToDict()
501

    
502

    
503
def _ObjectListToDict(value):
504
  """Converts a list of L{objects} to dictionaries.
505

506
  """
507
  return map(_ObjectToDict, value)
508

    
509

    
510
def _EncodeNodeToDiskDict(value):
511
  """Encodes a dictionary with node name as key and disk objects as values.
512

513
  """
514
  return dict((name, _ObjectListToDict(disks))
515
              for name, disks in value.items())
516

    
517

    
518
def _PrepareFileUpload(getents_fn, filename):
519
  """Loads a file and prepares it for an upload to nodes.
520

521
  """
522
  statcb = utils.FileStatHelper()
523
  data = _Compress(utils.ReadFile(filename, preread=statcb))
524
  st = statcb.st
525

    
526
  if getents_fn is None:
527
    getents_fn = runtime.GetEnts
528

    
529
  getents = getents_fn()
530

    
531
  return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
532
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
533

    
534

    
535
def _PrepareFinalizeExportDisks(snap_disks):
536
  """Encodes disks for finalizing export.
537

538
  """
539
  flat_disks = []
540

    
541
  for disk in snap_disks:
542
    if isinstance(disk, bool):
543
      flat_disks.append(disk)
544
    else:
545
      flat_disks.append(disk.ToDict())
546

    
547
  return flat_disks
548

    
549

    
550
def _EncodeImportExportIO((ieio, ieioargs)):
551
  """Encodes import/export I/O information.
552

553
  """
554
  if ieio == constants.IEIO_RAW_DISK:
555
    assert len(ieioargs) == 1
556
    return (ieio, (ieioargs[0].ToDict(), ))
557

    
558
  if ieio == constants.IEIO_SCRIPT:
559
    assert len(ieioargs) == 2
560
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
561

    
562
  return (ieio, ieioargs)
563

    
564

    
565
def _EncodeBlockdevRename(value):
566
  """Encodes information for renaming block devices.
567

568
  """
569
  return [(d.ToDict(), uid) for d, uid in value]
570

    
571

    
572
#: Generic encoders
573
_ENCODERS = {
574
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
575
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
576
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
577
  rpc_defs.ED_COMPRESS: _Compress,
578
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
579
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
580
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
581
  }
582

    
583

    
584
class RpcRunner(_RpcClientBase,
585
                _generated_rpc.RpcClientDefault,
586
                _generated_rpc.RpcClientBootstrap,
587
                _generated_rpc.RpcClientConfig):
588
  """RPC runner class.
589

590
  """
591
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
592
    """Initialized the RPC runner.
593

594
    @type cfg: L{config.ConfigWriter}
595
    @param cfg: Configuration
596
    @type lock_monitor_cb: callable
597
    @param lock_monitor_cb: Lock monitor callback
598

599
    """
600
    self._cfg = cfg
601

    
602
    encoders = _ENCODERS.copy()
603

    
604
    encoders.update({
605
      # Encoders requiring configuration object
606
      rpc_defs.ED_INST_DICT: self._InstDict,
607
      rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
608
      rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
609

    
610
      # Encoders with special requirements
611
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
612
      })
613

    
614
    # Resolver using configuration
615
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
616
                              cfg.GetAllNodesInfo)
617

    
618
    # Pylint doesn't recognize multiple inheritance properly, see
619
    # <http://www.logilab.org/ticket/36586> and
620
    # <http://www.logilab.org/ticket/35642>
621
    # pylint: disable=W0233
622
    _RpcClientBase.__init__(self, resolver, encoders.get,
623
                            lock_monitor_cb=lock_monitor_cb,
624
                            _req_process_fn=_req_process_fn)
625
    _generated_rpc.RpcClientConfig.__init__(self)
626
    _generated_rpc.RpcClientBootstrap.__init__(self)
627
    _generated_rpc.RpcClientDefault.__init__(self)
628

    
629
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
630
    """Convert the given instance to a dict.
631

632
    This is done via the instance's ToDict() method and additionally
633
    we fill the hvparams with the cluster defaults.
634

635
    @type instance: L{objects.Instance}
636
    @param instance: an Instance object
637
    @type hvp: dict or None
638
    @param hvp: a dictionary with overridden hypervisor parameters
639
    @type bep: dict or None
640
    @param bep: a dictionary with overridden backend parameters
641
    @type osp: dict or None
642
    @param osp: a dictionary with overridden os parameters
643
    @rtype: dict
644
    @return: the instance dict, with the hvparams filled with the
645
        cluster defaults
646

647
    """
648
    idict = instance.ToDict()
649
    cluster = self._cfg.GetClusterInfo()
650
    idict["hvparams"] = cluster.FillHV(instance)
651
    if hvp is not None:
652
      idict["hvparams"].update(hvp)
653
    idict["beparams"] = cluster.FillBE(instance)
654
    if bep is not None:
655
      idict["beparams"].update(bep)
656
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
657
    if osp is not None:
658
      idict["osparams"].update(osp)
659
    for nic in idict["nics"]:
660
      nic['nicparams'] = objects.FillDict(
661
        cluster.nicparams[constants.PP_DEFAULT],
662
        nic['nicparams'])
663
    return idict
664

    
665
  def _InstDictHvpBep(self, (instance, hvp, bep)):
666
    """Wrapper for L{_InstDict}.
667

668
    """
669
    return self._InstDict(instance, hvp=hvp, bep=bep)
670

    
671
  def _InstDictOsp(self, (instance, osparams)):
672
    """Wrapper for L{_InstDict}.
673

674
    """
675
    return self._InstDict(instance, osp=osparams)
676

    
677

    
678
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
679
  """RPC wrappers for job queue.
680

681
  """
682
  def __init__(self, context, address_list):
683
    """Initializes this class.
684

685
    """
686
    if address_list is None:
687
      resolver = _SsconfResolver
688
    else:
689
      # Caller provided an address list
690
      resolver = _StaticResolver(address_list)
691

    
692
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
693
                            lock_monitor_cb=context.glm.AddToLockMonitor)
694
    _generated_rpc.RpcClientJobQueue.__init__(self)
695

    
696

    
697
class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
698
  """RPC wrappers for bootstrapping.
699

700
  """
701
  def __init__(self):
702
    """Initializes this class.
703

704
    """
705
    _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get)
706
    _generated_rpc.RpcClientBootstrap.__init__(self)
707

    
708

    
709
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
710
  """RPC wrappers for L{config}.
711

712
  """
713
  def __init__(self, context, address_list, _req_process_fn=None,
714
               _getents=None):
715
    """Initializes this class.
716

717
    """
718
    if context:
719
      lock_monitor_cb = context.glm.AddToLockMonitor
720
    else:
721
      lock_monitor_cb = None
722

    
723
    if address_list is None:
724
      resolver = _SsconfResolver
725
    else:
726
      # Caller provided an address list
727
      resolver = _StaticResolver(address_list)
728

    
729
    encoders = _ENCODERS.copy()
730

    
731
    encoders.update({
732
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
733
      })
734

    
735
    _RpcClientBase.__init__(self, resolver, encoders.get,
736
                            lock_monitor_cb=lock_monitor_cb,
737
                            _req_process_fn=_req_process_fn)
738
    _generated_rpc.RpcClientConfig.__init__(self)