Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ bd6d1202

History | View | Annotate | Download (22.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import logging
34
import zlib
35
import base64
36
import pycurl
37
import threading
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45
from ganeti import netutils
46
from ganeti import ssconf
47
from ganeti import runtime
48
from ganeti import compat
49
from ganeti import rpc_defs
50

    
51
# Special module generated at build time
52
from ganeti import _generated_rpc
53

    
54
# pylint has a bug here, doesn't see this import
55
import ganeti.http.client  # pylint: disable=W0611
56

    
57

    
58
# Timeout for connecting to nodes (seconds)
59
_RPC_CONNECT_TIMEOUT = 5
60

    
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

    
66
# Various time constants for the timeout table
67
_TMO_URGENT = 60 # one minute
68
_TMO_FAST = 5 * 60 # five minutes
69
_TMO_NORMAL = 15 * 60 # 15 minutes
70
_TMO_SLOW = 3600 # one hour
71
_TMO_4HRS = 4 * 3600
72
_TMO_1DAY = 86400
73

    
74
#: Special value to describe an offline host
75
_OFFLINE = object()
76

    
77

    
78
def Init():
79
  """Initializes the module-global HTTP client manager.
80

81
  Must be called before using any RPC function and while exactly one thread is
82
  running.
83

84
  """
85
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86
  # one thread running. This check is just a safety measure -- it doesn't
87
  # cover all cases.
88
  assert threading.activeCount() == 1, \
89
         "Found more than one active thread when initializing pycURL"
90

    
91
  logging.info("Using PycURL %s", pycurl.version)
92

    
93
  pycurl.global_init(pycurl.GLOBAL_ALL)
94

    
95

    
96
def Shutdown():
97
  """Stops the module-global HTTP client manager.
98

99
  Must be called before quitting the program and while exactly one thread is
100
  running.
101

102
  """
103
  pycurl.global_cleanup()
104

    
105

    
106
def _ConfigRpcCurl(curl):
107
  noded_cert = str(constants.NODED_CERT_FILE)
108

    
109
  curl.setopt(pycurl.FOLLOWLOCATION, False)
110
  curl.setopt(pycurl.CAINFO, noded_cert)
111
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
113
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114
  curl.setopt(pycurl.SSLCERT, noded_cert)
115
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116
  curl.setopt(pycurl.SSLKEY, noded_cert)
117
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118

    
119

    
120
def RunWithRPC(fn):
121
  """RPC-wrapper decorator.
122

123
  When applied to a function, it runs it with the RPC system
124
  initialized, and it shutsdown the system afterwards. This means the
125
  function must be called without RPC being initialized.
126

127
  """
128
  def wrapper(*args, **kwargs):
129
    Init()
130
    try:
131
      return fn(*args, **kwargs)
132
    finally:
133
      Shutdown()
134
  return wrapper
135

    
136

    
137
def _Compress(data):
138
  """Compresses a string for transport over RPC.
139

140
  Small amounts of data are not compressed.
141

142
  @type data: str
143
  @param data: Data
144
  @rtype: tuple
145
  @return: Encoded data to send
146

147
  """
148
  # Small amounts of data are not compressed
149
  if len(data) < 512:
150
    return (constants.RPC_ENCODING_NONE, data)
151

    
152
  # Compress with zlib and encode in base64
153
  return (constants.RPC_ENCODING_ZLIB_BASE64,
154
          base64.b64encode(zlib.compress(data, 3)))
155

    
156

    
157
class RpcResult(object):
158
  """RPC Result class.
159

160
  This class holds an RPC result. It is needed since in multi-node
161
  calls we can't raise an exception just because one one out of many
162
  failed, and therefore we use this class to encapsulate the result.
163

164
  @ivar data: the data payload, for successful results, or None
165
  @ivar call: the name of the RPC call
166
  @ivar node: the name of the node to which we made the call
167
  @ivar offline: whether the operation failed because the node was
168
      offline, as opposed to actual failure; offline=True will always
169
      imply failed=True, in order to allow simpler checking if
170
      the user doesn't care about the exact failure mode
171
  @ivar fail_msg: the error message if the call failed
172

173
  """
174
  def __init__(self, data=None, failed=False, offline=False,
175
               call=None, node=None):
176
    self.offline = offline
177
    self.call = call
178
    self.node = node
179

    
180
    if offline:
181
      self.fail_msg = "Node is marked offline"
182
      self.data = self.payload = None
183
    elif failed:
184
      self.fail_msg = self._EnsureErr(data)
185
      self.data = self.payload = None
186
    else:
187
      self.data = data
188
      if not isinstance(self.data, (tuple, list)):
189
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
190
                         type(self.data))
191
        self.payload = None
192
      elif len(data) != 2:
193
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
194
                         "expected 2" % len(self.data))
195
        self.payload = None
196
      elif not self.data[0]:
197
        self.fail_msg = self._EnsureErr(self.data[1])
198
        self.payload = None
199
      else:
200
        # finally success
201
        self.fail_msg = None
202
        self.payload = data[1]
203

    
204
    for attr_name in ["call", "data", "fail_msg",
205
                      "node", "offline", "payload"]:
206
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
207

    
208
  @staticmethod
209
  def _EnsureErr(val):
210
    """Helper to ensure we return a 'True' value for error."""
211
    if val:
212
      return val
213
    else:
214
      return "No error information"
215

    
216
  def Raise(self, msg, prereq=False, ecode=None):
217
    """If the result has failed, raise an OpExecError.
218

219
    This is used so that LU code doesn't have to check for each
220
    result, but instead can call this function.
221

222
    """
223
    if not self.fail_msg:
224
      return
225

    
226
    if not msg: # one could pass None for default message
227
      msg = ("Call '%s' to node '%s' has failed: %s" %
228
             (self.call, self.node, self.fail_msg))
229
    else:
230
      msg = "%s: %s" % (msg, self.fail_msg)
231
    if prereq:
232
      ec = errors.OpPrereqError
233
    else:
234
      ec = errors.OpExecError
235
    if ecode is not None:
236
      args = (msg, ecode)
237
    else:
238
      args = (msg, )
239
    raise ec(*args) # pylint: disable=W0142
240

    
241

    
242
def _SsconfResolver(ssconf_ips, node_list, _,
243
                    ssc=ssconf.SimpleStore,
244
                    nslookup_fn=netutils.Hostname.GetIP):
245
  """Return addresses for given node names.
246

247
  @type ssconf_ips: bool
248
  @param ssconf_ips: Use the ssconf IPs
249
  @type node_list: list
250
  @param node_list: List of node names
251
  @type ssc: class
252
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
253
  @type nslookup_fn: callable
254
  @param nslookup_fn: function use to do NS lookup
255
  @rtype: list of tuple; (string, string)
256
  @return: List of tuples containing node name and IP address
257

258
  """
259
  ss = ssc()
260
  family = ss.GetPrimaryIPFamily()
261

    
262
  if ssconf_ips:
263
    iplist = ss.GetNodePrimaryIPList()
264
    ipmap = dict(entry.split() for entry in iplist)
265
  else:
266
    ipmap = {}
267

    
268
  result = []
269
  for node in node_list:
270
    ip = ipmap.get(node)
271
    if ip is None:
272
      ip = nslookup_fn(node, family=family)
273
    result.append((node, ip))
274

    
275
  return result
276

    
277

    
278
class _StaticResolver:
279
  def __init__(self, addresses):
280
    """Initializes this class.
281

282
    """
283
    self._addresses = addresses
284

    
285
  def __call__(self, hosts, _):
286
    """Returns static addresses for hosts.
287

288
    """
289
    assert len(hosts) == len(self._addresses)
290
    return zip(hosts, self._addresses)
291

    
292

    
293
def _CheckConfigNode(name, node, accept_offline_node):
294
  """Checks if a node is online.
295

296
  @type name: string
297
  @param name: Node name
298
  @type node: L{objects.Node} or None
299
  @param node: Node object
300

301
  """
302
  if node is None:
303
    # Depend on DNS for name resolution
304
    ip = name
305
  elif node.offline and not accept_offline_node:
306
    ip = _OFFLINE
307
  else:
308
    ip = node.primary_ip
309
  return (name, ip)
310

    
311

    
312
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
313
  """Calculate node addresses using configuration.
314

315
  """
316
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
317

    
318
  assert accept_offline_node or opts is None, "Unknown option"
319

    
320
  # Special case for single-host lookups
321
  if len(hosts) == 1:
322
    (name, ) = hosts
323
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
324
  else:
325
    all_nodes = all_nodes_fn()
326
    return [_CheckConfigNode(name, all_nodes.get(name, None),
327
                             accept_offline_node)
328
            for name in hosts]
329

    
330

    
331
class _RpcProcessor:
332
  def __init__(self, resolver, port, lock_monitor_cb=None):
333
    """Initializes this class.
334

335
    @param resolver: callable accepting a list of hostnames, returning a list
336
      of tuples containing name and IP address (IP address can be the name or
337
      the special value L{_OFFLINE} to mark offline machines)
338
    @type port: int
339
    @param port: TCP port
340
    @param lock_monitor_cb: Callable for registering with lock monitor
341

342
    """
343
    self._resolver = resolver
344
    self._port = port
345
    self._lock_monitor_cb = lock_monitor_cb
346

    
347
  @staticmethod
348
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
349
    """Prepares requests by sorting offline hosts into separate list.
350

351
    @type body: dict
352
    @param body: a dictionary with per-host body data
353

354
    """
355
    results = {}
356
    requests = {}
357

    
358
    assert isinstance(body, dict)
359
    assert len(body) == len(hosts)
360
    assert compat.all(isinstance(v, str) for v in body.values())
361
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
362
        "%s != %s" % (hosts, body.keys())
363

    
364
    for (name, ip) in hosts:
365
      if ip is _OFFLINE:
366
        # Node is marked as offline
367
        results[name] = RpcResult(node=name, offline=True, call=procedure)
368
      else:
369
        requests[name] = \
370
          http.client.HttpClientRequest(str(ip), port,
371
                                        http.HTTP_POST, str("/%s" % procedure),
372
                                        headers=_RPC_CLIENT_HEADERS,
373
                                        post_data=body[name],
374
                                        read_timeout=read_timeout,
375
                                        nicename="%s/%s" % (name, procedure),
376
                                        curl_config_fn=_ConfigRpcCurl)
377

    
378
    return (results, requests)
379

    
380
  @staticmethod
381
  def _CombineResults(results, requests, procedure):
382
    """Combines pre-computed results for offline hosts with actual call results.
383

384
    """
385
    for name, req in requests.items():
386
      if req.success and req.resp_status_code == http.HTTP_OK:
387
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
388
                                node=name, call=procedure)
389
      else:
390
        # TODO: Better error reporting
391
        if req.error:
392
          msg = req.error
393
        else:
394
          msg = req.resp_body
395

    
396
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
397
        host_result = RpcResult(data=msg, failed=True, node=name,
398
                                call=procedure)
399

    
400
      results[name] = host_result
401

    
402
    return results
403

    
404
  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
405
               _req_process_fn=None):
406
    """Makes an RPC request to a number of nodes.
407

408
    @type hosts: sequence
409
    @param hosts: Hostnames
410
    @type procedure: string
411
    @param procedure: Request path
412
    @type body: dictionary
413
    @param body: dictionary with request bodies per host
414
    @type read_timeout: int or None
415
    @param read_timeout: Read timeout for request
416

417
    """
418
    assert read_timeout is not None, \
419
      "Missing RPC read timeout for procedure '%s'" % procedure
420

    
421
    if _req_process_fn is None:
422
      _req_process_fn = http.client.ProcessRequests
423

    
424
    (results, requests) = \
425
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
426
                            procedure, body, read_timeout)
427

    
428
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
429

    
430
    assert not frozenset(results).intersection(requests)
431

    
432
    return self._CombineResults(results, requests, procedure)
433

    
434

    
435
class _RpcClientBase:
436
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
437
               _req_process_fn=None):
438
    """Initializes this class.
439

440
    """
441
    proc = _RpcProcessor(resolver,
442
                         netutils.GetDaemonPort(constants.NODED),
443
                         lock_monitor_cb=lock_monitor_cb)
444
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
445
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
446

    
447
  @staticmethod
448
  def _EncodeArg(encoder_fn, (argkind, value)):
449
    """Encode argument.
450

451
    """
452
    if argkind is None:
453
      return value
454
    else:
455
      return encoder_fn(argkind)(value)
456

    
457
  def _Call(self, cdef, node_list, args):
458
    """Entry point for automatically generated RPC wrappers.
459

460
    """
461
    (procedure, _, resolver_opts, timeout, argdefs,
462
     prep_fn, postproc_fn, _) = cdef
463

    
464
    if callable(timeout):
465
      read_timeout = timeout(args)
466
    else:
467
      read_timeout = timeout
468

    
469
    if callable(resolver_opts):
470
      req_resolver_opts = resolver_opts(args)
471
    else:
472
      req_resolver_opts = resolver_opts
473

    
474
    if len(args) != len(argdefs):
475
      raise errors.ProgrammerError("Number of passed arguments doesn't match")
476

    
477
    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
478
    if prep_fn is None:
479
      # for a no-op prep_fn, we serialise the body once, and then we
480
      # reuse it in the dictionary values
481
      body = serializer.DumpJson(enc_args)
482
      pnbody = dict((n, body) for n in node_list)
483
    else:
484
      # for a custom prep_fn, we pass the encoded arguments and the
485
      # node name to the prep_fn, and we serialise its return value
486
      assert callable(prep_fn)
487
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
488
                    for n in node_list)
489

    
490
    result = self._proc(node_list, procedure, pnbody, read_timeout,
491
                        req_resolver_opts)
492

    
493
    if postproc_fn:
494
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
495
                      result.items()))
496
    else:
497
      return result
498

    
499

    
500
def _ObjectToDict(value):
501
  """Converts an object to a dictionary.
502

503
  @note: See L{objects}.
504

505
  """
506
  return value.ToDict()
507

    
508

    
509
def _ObjectListToDict(value):
510
  """Converts a list of L{objects} to dictionaries.
511

512
  """
513
  return map(_ObjectToDict, value)
514

    
515

    
516
def _EncodeNodeToDiskDict(value):
517
  """Encodes a dictionary with node name as key and disk objects as values.
518

519
  """
520
  return dict((name, _ObjectListToDict(disks))
521
              for name, disks in value.items())
522

    
523

    
524
def _PrepareFileUpload(getents_fn, filename):
525
  """Loads a file and prepares it for an upload to nodes.
526

527
  """
528
  statcb = utils.FileStatHelper()
529
  data = _Compress(utils.ReadFile(filename, preread=statcb))
530
  st = statcb.st
531

    
532
  if getents_fn is None:
533
    getents_fn = runtime.GetEnts
534

    
535
  getents = getents_fn()
536

    
537
  return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
538
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
539

    
540

    
541
def _PrepareFinalizeExportDisks(snap_disks):
542
  """Encodes disks for finalizing export.
543

544
  """
545
  flat_disks = []
546

    
547
  for disk in snap_disks:
548
    if isinstance(disk, bool):
549
      flat_disks.append(disk)
550
    else:
551
      flat_disks.append(disk.ToDict())
552

    
553
  return flat_disks
554

    
555

    
556
def _EncodeImportExportIO((ieio, ieioargs)):
557
  """Encodes import/export I/O information.
558

559
  """
560
  if ieio == constants.IEIO_RAW_DISK:
561
    assert len(ieioargs) == 1
562
    return (ieio, (ieioargs[0].ToDict(), ))
563

    
564
  if ieio == constants.IEIO_SCRIPT:
565
    assert len(ieioargs) == 2
566
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
567

    
568
  return (ieio, ieioargs)
569

    
570

    
571
def _EncodeBlockdevRename(value):
572
  """Encodes information for renaming block devices.
573

574
  """
575
  return [(d.ToDict(), uid) for d, uid in value]
576

    
577

    
578
#: Generic encoders
579
_ENCODERS = {
580
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
581
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
582
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
583
  rpc_defs.ED_COMPRESS: _Compress,
584
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
585
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
586
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
587
  }
588

    
589

    
590
class RpcRunner(_RpcClientBase,
591
                _generated_rpc.RpcClientDefault,
592
                _generated_rpc.RpcClientBootstrap,
593
                _generated_rpc.RpcClientDnsOnly,
594
                _generated_rpc.RpcClientConfig):
595
  """RPC runner class.
596

597
  """
598
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
599
    """Initialized the RPC runner.
600

601
    @type cfg: L{config.ConfigWriter}
602
    @param cfg: Configuration
603
    @type lock_monitor_cb: callable
604
    @param lock_monitor_cb: Lock monitor callback
605

606
    """
607
    self._cfg = cfg
608

    
609
    encoders = _ENCODERS.copy()
610

    
611
    encoders.update({
612
      # Encoders requiring configuration object
613
      rpc_defs.ED_INST_DICT: self._InstDict,
614
      rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
615
      rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
616

    
617
      # Encoders with special requirements
618
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
619
      })
620

    
621
    # Resolver using configuration
622
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
623
                              cfg.GetAllNodesInfo)
624

    
625
    # Pylint doesn't recognize multiple inheritance properly, see
626
    # <http://www.logilab.org/ticket/36586> and
627
    # <http://www.logilab.org/ticket/35642>
628
    # pylint: disable=W0233
629
    _RpcClientBase.__init__(self, resolver, encoders.get,
630
                            lock_monitor_cb=lock_monitor_cb,
631
                            _req_process_fn=_req_process_fn)
632
    _generated_rpc.RpcClientConfig.__init__(self)
633
    _generated_rpc.RpcClientBootstrap.__init__(self)
634
    _generated_rpc.RpcClientDnsOnly.__init__(self)
635
    _generated_rpc.RpcClientDefault.__init__(self)
636

    
637
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
638
    """Convert the given instance to a dict.
639

640
    This is done via the instance's ToDict() method and additionally
641
    we fill the hvparams with the cluster defaults.
642

643
    @type instance: L{objects.Instance}
644
    @param instance: an Instance object
645
    @type hvp: dict or None
646
    @param hvp: a dictionary with overridden hypervisor parameters
647
    @type bep: dict or None
648
    @param bep: a dictionary with overridden backend parameters
649
    @type osp: dict or None
650
    @param osp: a dictionary with overridden os parameters
651
    @rtype: dict
652
    @return: the instance dict, with the hvparams filled with the
653
        cluster defaults
654

655
    """
656
    idict = instance.ToDict()
657
    cluster = self._cfg.GetClusterInfo()
658
    idict["hvparams"] = cluster.FillHV(instance)
659
    if hvp is not None:
660
      idict["hvparams"].update(hvp)
661
    idict["beparams"] = cluster.FillBE(instance)
662
    if bep is not None:
663
      idict["beparams"].update(bep)
664
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
665
    if osp is not None:
666
      idict["osparams"].update(osp)
667
    for nic in idict["nics"]:
668
      nic["nicparams"] = objects.FillDict(
669
        cluster.nicparams[constants.PP_DEFAULT],
670
        nic["nicparams"])
671
    return idict
672

    
673
  def _InstDictHvpBep(self, (instance, hvp, bep)):
674
    """Wrapper for L{_InstDict}.
675

676
    """
677
    return self._InstDict(instance, hvp=hvp, bep=bep)
678

    
679
  def _InstDictOsp(self, (instance, osparams)):
680
    """Wrapper for L{_InstDict}.
681

682
    """
683
    return self._InstDict(instance, osp=osparams)
684

    
685

    
686
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
687
  """RPC wrappers for job queue.
688

689
  """
690
  def __init__(self, context, address_list):
691
    """Initializes this class.
692

693
    """
694
    if address_list is None:
695
      resolver = compat.partial(_SsconfResolver, True)
696
    else:
697
      # Caller provided an address list
698
      resolver = _StaticResolver(address_list)
699

    
700
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
701
                            lock_monitor_cb=context.glm.AddToLockMonitor)
702
    _generated_rpc.RpcClientJobQueue.__init__(self)
703

    
704

    
705
class BootstrapRunner(_RpcClientBase,
706
                      _generated_rpc.RpcClientBootstrap,
707
                      _generated_rpc.RpcClientDnsOnly):
708
  """RPC wrappers for bootstrapping.
709

710
  """
711
  def __init__(self):
712
    """Initializes this class.
713

714
    """
715
    # Pylint doesn't recognize multiple inheritance properly, see
716
    # <http://www.logilab.org/ticket/36586> and
717
    # <http://www.logilab.org/ticket/35642>
718
    # pylint: disable=W0233
719
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
720
                            _ENCODERS.get)
721
    _generated_rpc.RpcClientBootstrap.__init__(self)
722
    _generated_rpc.RpcClientDnsOnly.__init__(self)
723

    
724

    
725
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
726
  """RPC wrappers for calls using only DNS.
727

728
  """
729
  def __init__(self):
730
    """Initialize this class.
731

732
    """
733
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
734
                            _ENCODERS.get)
735
    _generated_rpc.RpcClientDnsOnly.__init__(self)
736

    
737

    
738
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
739
  """RPC wrappers for L{config}.
740

741
  """
742
  def __init__(self, context, address_list, _req_process_fn=None,
743
               _getents=None):
744
    """Initializes this class.
745

746
    """
747
    if context:
748
      lock_monitor_cb = context.glm.AddToLockMonitor
749
    else:
750
      lock_monitor_cb = None
751

    
752
    if address_list is None:
753
      resolver = compat.partial(_SsconfResolver, True)
754
    else:
755
      # Caller provided an address list
756
      resolver = _StaticResolver(address_list)
757

    
758
    encoders = _ENCODERS.copy()
759

    
760
    encoders.update({
761
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
762
      })
763

    
764
    _RpcClientBase.__init__(self, resolver, encoders.get,
765
                            lock_monitor_cb=lock_monitor_cb,
766
                            _req_process_fn=_req_process_fn)
767
    _generated_rpc.RpcClientConfig.__init__(self)