Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 702abcf9

History | View | Annotate | Download (20.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50
from ganeti import rpc_defs
51

    
52
# Special module generated at build time
53
from ganeti import _generated_rpc
54

    
55
# pylint has a bug here, doesn't see this import
56
import ganeti.http.client  # pylint: disable=W0611
57

    
58

    
59
# Timeout for connecting to nodes (seconds)
60
_RPC_CONNECT_TIMEOUT = 5
61

    
62
_RPC_CLIENT_HEADERS = [
63
  "Content-type: %s" % http.HTTP_APP_JSON,
64
  "Expect:",
65
  ]
66

    
67
# Various time constants for the timeout table
68
_TMO_URGENT = 60 # one minute
69
_TMO_FAST = 5 * 60 # five minutes
70
_TMO_NORMAL = 15 * 60 # 15 minutes
71
_TMO_SLOW = 3600 # one hour
72
_TMO_4HRS = 4 * 3600
73
_TMO_1DAY = 86400
74

    
75
#: Special value to describe an offline host
76
_OFFLINE = object()
77

    
78

    
79
def Init():
80
  """Initializes the module-global HTTP client manager.
81

82
  Must be called before using any RPC function and while exactly one thread is
83
  running.
84

85
  """
86
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
87
  # one thread running. This check is just a safety measure -- it doesn't
88
  # cover all cases.
89
  assert threading.activeCount() == 1, \
90
         "Found more than one active thread when initializing pycURL"
91

    
92
  logging.info("Using PycURL %s", pycurl.version)
93

    
94
  pycurl.global_init(pycurl.GLOBAL_ALL)
95

    
96

    
97
def Shutdown():
98
  """Stops the module-global HTTP client manager.
99

100
  Must be called before quitting the program and while exactly one thread is
101
  running.
102

103
  """
104
  pycurl.global_cleanup()
105

    
106

    
107
def _ConfigRpcCurl(curl):
108
  noded_cert = str(constants.NODED_CERT_FILE)
109

    
110
  curl.setopt(pycurl.FOLLOWLOCATION, False)
111
  curl.setopt(pycurl.CAINFO, noded_cert)
112
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
113
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
114
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
115
  curl.setopt(pycurl.SSLCERT, noded_cert)
116
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
117
  curl.setopt(pycurl.SSLKEY, noded_cert)
118
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
119

    
120

    
121
def RunWithRPC(fn):
122
  """RPC-wrapper decorator.
123

124
  When applied to a function, it runs it with the RPC system
125
  initialized, and it shutsdown the system afterwards. This means the
126
  function must be called without RPC being initialized.
127

128
  """
129
  def wrapper(*args, **kwargs):
130
    Init()
131
    try:
132
      return fn(*args, **kwargs)
133
    finally:
134
      Shutdown()
135
  return wrapper
136

    
137

    
138
def _Compress(data):
139
  """Compresses a string for transport over RPC.
140

141
  Small amounts of data are not compressed.
142

143
  @type data: str
144
  @param data: Data
145
  @rtype: tuple
146
  @return: Encoded data to send
147

148
  """
149
  # Small amounts of data are not compressed
150
  if len(data) < 512:
151
    return (constants.RPC_ENCODING_NONE, data)
152

    
153
  # Compress with zlib and encode in base64
154
  return (constants.RPC_ENCODING_ZLIB_BASE64,
155
          base64.b64encode(zlib.compress(data, 3)))
156

    
157

    
158
class RpcResult(object):
159
  """RPC Result class.
160

161
  This class holds an RPC result. It is needed since in multi-node
162
  calls we can't raise an exception just because one one out of many
163
  failed, and therefore we use this class to encapsulate the result.
164

165
  @ivar data: the data payload, for successful results, or None
166
  @ivar call: the name of the RPC call
167
  @ivar node: the name of the node to which we made the call
168
  @ivar offline: whether the operation failed because the node was
169
      offline, as opposed to actual failure; offline=True will always
170
      imply failed=True, in order to allow simpler checking if
171
      the user doesn't care about the exact failure mode
172
  @ivar fail_msg: the error message if the call failed
173

174
  """
175
  def __init__(self, data=None, failed=False, offline=False,
176
               call=None, node=None):
177
    self.offline = offline
178
    self.call = call
179
    self.node = node
180

    
181
    if offline:
182
      self.fail_msg = "Node is marked offline"
183
      self.data = self.payload = None
184
    elif failed:
185
      self.fail_msg = self._EnsureErr(data)
186
      self.data = self.payload = None
187
    else:
188
      self.data = data
189
      if not isinstance(self.data, (tuple, list)):
190
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
191
                         type(self.data))
192
        self.payload = None
193
      elif len(data) != 2:
194
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
195
                         "expected 2" % len(self.data))
196
        self.payload = None
197
      elif not self.data[0]:
198
        self.fail_msg = self._EnsureErr(self.data[1])
199
        self.payload = None
200
      else:
201
        # finally success
202
        self.fail_msg = None
203
        self.payload = data[1]
204

    
205
    for attr_name in ["call", "data", "fail_msg",
206
                      "node", "offline", "payload"]:
207
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
208

    
209
  @staticmethod
210
  def _EnsureErr(val):
211
    """Helper to ensure we return a 'True' value for error."""
212
    if val:
213
      return val
214
    else:
215
      return "No error information"
216

    
217
  def Raise(self, msg, prereq=False, ecode=None):
218
    """If the result has failed, raise an OpExecError.
219

220
    This is used so that LU code doesn't have to check for each
221
    result, but instead can call this function.
222

223
    """
224
    if not self.fail_msg:
225
      return
226

    
227
    if not msg: # one could pass None for default message
228
      msg = ("Call '%s' to node '%s' has failed: %s" %
229
             (self.call, self.node, self.fail_msg))
230
    else:
231
      msg = "%s: %s" % (msg, self.fail_msg)
232
    if prereq:
233
      ec = errors.OpPrereqError
234
    else:
235
      ec = errors.OpExecError
236
    if ecode is not None:
237
      args = (msg, ecode)
238
    else:
239
      args = (msg, )
240
    raise ec(*args) # pylint: disable=W0142
241

    
242

    
243
def _SsconfResolver(node_list,
244
                    ssc=ssconf.SimpleStore,
245
                    nslookup_fn=netutils.Hostname.GetIP):
246
  """Return addresses for given node names.
247

248
  @type node_list: list
249
  @param node_list: List of node names
250
  @type ssc: class
251
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
252
  @type nslookup_fn: callable
253
  @param nslookup_fn: function use to do NS lookup
254
  @rtype: list of tuple; (string, string)
255
  @return: List of tuples containing node name and IP address
256

257
  """
258
  ss = ssc()
259
  iplist = ss.GetNodePrimaryIPList()
260
  family = ss.GetPrimaryIPFamily()
261
  ipmap = dict(entry.split() for entry in iplist)
262

    
263
  result = []
264
  for node in node_list:
265
    ip = ipmap.get(node)
266
    if ip is None:
267
      ip = nslookup_fn(node, family=family)
268
    result.append((node, ip))
269

    
270
  return result
271

    
272

    
273
class _StaticResolver:
274
  def __init__(self, addresses):
275
    """Initializes this class.
276

277
    """
278
    self._addresses = addresses
279

    
280
  def __call__(self, hosts):
281
    """Returns static addresses for hosts.
282

283
    """
284
    assert len(hosts) == len(self._addresses)
285
    return zip(hosts, self._addresses)
286

    
287

    
288
def _CheckConfigNode(name, node):
289
  """Checks if a node is online.
290

291
  @type name: string
292
  @param name: Node name
293
  @type node: L{objects.Node} or None
294
  @param node: Node object
295

296
  """
297
  if node is None:
298
    # Depend on DNS for name resolution
299
    ip = name
300
  elif node.offline:
301
    ip = _OFFLINE
302
  else:
303
    ip = node.primary_ip
304
  return (name, ip)
305

    
306

    
307
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
308
  """Calculate node addresses using configuration.
309

310
  """
311
  # Special case for single-host lookups
312
  if len(hosts) == 1:
313
    (name, ) = hosts
314
    return [_CheckConfigNode(name, single_node_fn(name))]
315
  else:
316
    all_nodes = all_nodes_fn()
317
    return [_CheckConfigNode(name, all_nodes.get(name, None))
318
            for name in hosts]
319

    
320

    
321
class _RpcProcessor:
322
  def __init__(self, resolver, port, lock_monitor_cb=None):
323
    """Initializes this class.
324

325
    @param resolver: callable accepting a list of hostnames, returning a list
326
      of tuples containing name and IP address (IP address can be the name or
327
      the special value L{_OFFLINE} to mark offline machines)
328
    @type port: int
329
    @param port: TCP port
330
    @param lock_monitor_cb: Callable for registering with lock monitor
331

332
    """
333
    self._resolver = resolver
334
    self._port = port
335
    self._lock_monitor_cb = lock_monitor_cb
336

    
337
  @staticmethod
338
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
339
    """Prepares requests by sorting offline hosts into separate list.
340

341
    """
342
    results = {}
343
    requests = {}
344

    
345
    for (name, ip) in hosts:
346
      if ip is _OFFLINE:
347
        # Node is marked as offline
348
        results[name] = RpcResult(node=name, offline=True, call=procedure)
349
      else:
350
        requests[name] = \
351
          http.client.HttpClientRequest(str(ip), port,
352
                                        http.HTTP_PUT, str("/%s" % procedure),
353
                                        headers=_RPC_CLIENT_HEADERS,
354
                                        post_data=body,
355
                                        read_timeout=read_timeout,
356
                                        nicename="%s/%s" % (name, procedure),
357
                                        curl_config_fn=_ConfigRpcCurl)
358

    
359
    return (results, requests)
360

    
361
  @staticmethod
362
  def _CombineResults(results, requests, procedure):
363
    """Combines pre-computed results for offline hosts with actual call results.
364

365
    """
366
    for name, req in requests.items():
367
      if req.success and req.resp_status_code == http.HTTP_OK:
368
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
369
                                node=name, call=procedure)
370
      else:
371
        # TODO: Better error reporting
372
        if req.error:
373
          msg = req.error
374
        else:
375
          msg = req.resp_body
376

    
377
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
378
        host_result = RpcResult(data=msg, failed=True, node=name,
379
                                call=procedure)
380

    
381
      results[name] = host_result
382

    
383
    return results
384

    
385
  def __call__(self, hosts, procedure, body, read_timeout=None,
386
               _req_process_fn=http.client.ProcessRequests):
387
    """Makes an RPC request to a number of nodes.
388

389
    @type hosts: sequence
390
    @param hosts: Hostnames
391
    @type procedure: string
392
    @param procedure: Request path
393
    @type body: string
394
    @param body: Request body
395
    @type read_timeout: int or None
396
    @param read_timeout: Read timeout for request
397

398
    """
399
    assert read_timeout is not None, \
400
      "Missing RPC read timeout for procedure '%s'" % procedure
401

    
402
    (results, requests) = \
403
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
404
                            str(body), read_timeout)
405

    
406
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
407

    
408
    assert not frozenset(results).intersection(requests)
409

    
410
    return self._CombineResults(results, requests, procedure)
411

    
412

    
413
class _RpcClientBase:
414
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None):
415
    """Initializes this class.
416

417
    """
418
    self._proc = _RpcProcessor(resolver,
419
                               netutils.GetDaemonPort(constants.NODED),
420
                               lock_monitor_cb=lock_monitor_cb)
421
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
422

    
423
  @staticmethod
424
  def _EncodeArg(encoder_fn, (argkind, value)):
425
    """Encode argument.
426

427
    """
428
    if argkind is None:
429
      return value
430
    else:
431
      return encoder_fn(argkind)(value)
432

    
433
  def _Call(self, cdef, node_list, timeout, args):
434
    """Entry point for automatically generated RPC wrappers.
435

436
    """
437
    (procedure, _, _, argdefs, _, _) = cdef
438

    
439
    body = serializer.DumpJson(map(self._encoder,
440
                                   zip(map(compat.snd, argdefs), args)),
441
                               indent=False)
442

    
443
    return self._proc(node_list, procedure, body, read_timeout=timeout)
444

    
445

    
446
def _ObjectToDict(value):
447
  """Converts an object to a dictionary.
448

449
  @note: See L{objects}.
450

451
  """
452
  return value.ToDict()
453

    
454

    
455
def _ObjectListToDict(value):
456
  """Converts a list of L{objects} to dictionaries.
457

458
  """
459
  return map(_ObjectToDict, value)
460

    
461

    
462
def _EncodeNodeToDiskDict(value):
463
  """Encodes a dictionary with node name as key and disk objects as values.
464

465
  """
466
  return dict((name, _ObjectListToDict(disks))
467
              for name, disks in value.items())
468

    
469

    
470
def _PrepareFileUpload(filename):
471
  """Loads a file and prepares it for an upload to nodes.
472

473
  """
474
  data = _Compress(utils.ReadFile(filename))
475
  st = os.stat(filename)
476
  getents = runtime.GetEnts()
477
  return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
478
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
479

    
480

    
481
def _PrepareFinalizeExportDisks(snap_disks):
482
  """Encodes disks for finalizing export.
483

484
  """
485
  flat_disks = []
486

    
487
  for disk in snap_disks:
488
    if isinstance(disk, bool):
489
      flat_disks.append(disk)
490
    else:
491
      flat_disks.append(disk.ToDict())
492

    
493
  return flat_disks
494

    
495

    
496
def _EncodeImportExportIO((ieio, ieioargs)):
497
  """Encodes import/export I/O information.
498

499
  """
500
  if ieio == constants.IEIO_RAW_DISK:
501
    assert len(ieioargs) == 1
502
    return (ieio, (ieioargs[0].ToDict(), ))
503

    
504
  if ieio == constants.IEIO_SCRIPT:
505
    assert len(ieioargs) == 2
506
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
507

    
508
  return (ieio, ieioargs)
509

    
510

    
511
def _EncodeBlockdevRename(value):
512
  """Encodes information for renaming block devices.
513

514
  """
515
  return [(d.ToDict(), uid) for d, uid in value]
516

    
517

    
518
#: Generic encoders
519
_ENCODERS = {
520
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
521
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
522
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
523
  rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload,
524
  rpc_defs.ED_COMPRESS: _Compress,
525
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
526
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
527
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
528
  }
529

    
530

    
531
class RpcRunner(_RpcClientBase,
532
                _generated_rpc.RpcClientDefault,
533
                _generated_rpc.RpcClientBootstrap,
534
                _generated_rpc.RpcClientConfig):
535
  """RPC runner class.
536

537
  """
538
  def __init__(self, context):
539
    """Initialized the RPC runner.
540

541
    @type context: C{masterd.GanetiContext}
542
    @param context: Ganeti context
543

544
    """
545
    self._cfg = context.cfg
546

    
547
    encoders = _ENCODERS.copy()
548

    
549
    # Add encoders requiring configuration object
550
    encoders.update({
551
      rpc_defs.ED_INST_DICT: self._InstDict,
552
      rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
553
      rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
554
      })
555

    
556
    # Resolver using configuration
557
    resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo,
558
                              self._cfg.GetAllNodesInfo)
559

    
560
    # Pylint doesn't recognize multiple inheritance properly, see
561
    # <http://www.logilab.org/ticket/36586> and
562
    # <http://www.logilab.org/ticket/35642>
563
    # pylint: disable=W0233
564
    _RpcClientBase.__init__(self, resolver, encoders.get,
565
                            lock_monitor_cb=context.glm.AddToLockMonitor)
566
    _generated_rpc.RpcClientConfig.__init__(self)
567
    _generated_rpc.RpcClientBootstrap.__init__(self)
568
    _generated_rpc.RpcClientDefault.__init__(self)
569

    
570
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
571
    """Convert the given instance to a dict.
572

573
    This is done via the instance's ToDict() method and additionally
574
    we fill the hvparams with the cluster defaults.
575

576
    @type instance: L{objects.Instance}
577
    @param instance: an Instance object
578
    @type hvp: dict or None
579
    @param hvp: a dictionary with overridden hypervisor parameters
580
    @type bep: dict or None
581
    @param bep: a dictionary with overridden backend parameters
582
    @type osp: dict or None
583
    @param osp: a dictionary with overridden os parameters
584
    @rtype: dict
585
    @return: the instance dict, with the hvparams filled with the
586
        cluster defaults
587

588
    """
589
    idict = instance.ToDict()
590
    cluster = self._cfg.GetClusterInfo()
591
    idict["hvparams"] = cluster.FillHV(instance)
592
    if hvp is not None:
593
      idict["hvparams"].update(hvp)
594
    idict["beparams"] = cluster.FillBE(instance)
595
    if bep is not None:
596
      idict["beparams"].update(bep)
597
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
598
    if osp is not None:
599
      idict["osparams"].update(osp)
600
    for nic in idict["nics"]:
601
      nic['nicparams'] = objects.FillDict(
602
        cluster.nicparams[constants.PP_DEFAULT],
603
        nic['nicparams'])
604
    return idict
605

    
606
  def _InstDictHvpBep(self, (instance, hvp, bep)):
607
    """Wrapper for L{_InstDict}.
608

609
    """
610
    return self._InstDict(instance, hvp=hvp, bep=bep)
611

    
612
  def _InstDictOsp(self, (instance, osparams)):
613
    """Wrapper for L{_InstDict}.
614

615
    """
616
    return self._InstDict(instance, osp=osparams)
617

    
618
  @staticmethod
619
  def _MigrationStatusPostProc(result):
620
    if not result.fail_msg and result.payload is not None:
621
      result.payload = objects.MigrationStatus.FromDict(result.payload)
622
    return result
623

    
624
  @staticmethod
625
  def _BlockdevFindPostProc(result):
626
    if not result.fail_msg and result.payload is not None:
627
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
628
    return result
629

    
630
  @staticmethod
631
  def _BlockdevGetMirrorStatusPostProc(result):
632
    if not result.fail_msg:
633
      result.payload = [objects.BlockDevStatus.FromDict(i)
634
                        for i in result.payload]
635
    return result
636

    
637
  @staticmethod
638
  def _BlockdevGetMirrorStatusMultiPostProc(result):
639
    for nres in result.values():
640
      if nres.fail_msg:
641
        continue
642

    
643
      for idx, (success, status) in enumerate(nres.payload):
644
        if success:
645
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
646

    
647
    return result
648

    
649
  @staticmethod
650
  def _OsGetPostProc(result):
651
    if not result.fail_msg and isinstance(result.payload, dict):
652
      result.payload = objects.OS.FromDict(result.payload)
653
    return result
654

    
655
  @staticmethod
656
  def _ImpExpStatusPostProc(result):
657
    """Post-processor for import/export status.
658

659
    @rtype: Payload containing list of L{objects.ImportExportStatus} instances
660
    @return: Returns a list of the state of each named import/export or None if
661
             a status couldn't be retrieved
662

663
    """
664
    if not result.fail_msg:
665
      decoded = []
666

    
667
      for i in result.payload:
668
        if i is None:
669
          decoded.append(None)
670
          continue
671
        decoded.append(objects.ImportExportStatus.FromDict(i))
672

    
673
      result.payload = decoded
674

    
675
    return result
676

    
677
  #
678
  # Begin RPC calls
679
  #
680

    
681
  def call_test_delay(self, node_list, duration): # pylint: disable=W0221
682
    """Sleep for a fixed time on given node(s).
683

684
    This is a multi-node call.
685

686
    """
687
    # TODO: Use callable timeout calculation
688
    return _generated_rpc.RpcClientDefault.call_test_delay(self,
689
      node_list, duration, read_timeout=int(duration + 5))
690

    
691

    
692
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
693
  """RPC wrappers for job queue.
694

695
  """
696
  def __init__(self, context, address_list):
697
    """Initializes this class.
698

699
    """
700
    if address_list is None:
701
      resolver = _SsconfResolver
702
    else:
703
      # Caller provided an address list
704
      resolver = _StaticResolver(address_list)
705

    
706
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
707
                            lock_monitor_cb=context.glm.AddToLockMonitor)
708
    _generated_rpc.RpcClientJobQueue.__init__(self)
709

    
710

    
711
class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
712
  """RPC wrappers for bootstrapping.
713

714
  """
715
  def __init__(self):
716
    """Initializes this class.
717

718
    """
719
    _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get)
720
    _generated_rpc.RpcClientBootstrap.__init__(self)
721

    
722

    
723
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
724
  """RPC wrappers for L{config}.
725

726
  """
727
  def __init__(self, context, address_list):
728
    """Initializes this class.
729

730
    """
731
    if context:
732
      lock_monitor_cb = context.glm.AddToLockMonitor
733
    else:
734
      lock_monitor_cb = None
735

    
736
    if address_list is None:
737
      resolver = _SsconfResolver
738
    else:
739
      # Caller provided an address list
740
      resolver = _StaticResolver(address_list)
741

    
742
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
743
                            lock_monitor_cb=lock_monitor_cb)
744
    _generated_rpc.RpcClientConfig.__init__(self)