Statistics
| Branch: | Tag: | Revision:

root / lib / rpc / node.py @ 4869595d

History | View | Annotate | Download (31.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import logging
34
import zlib
35
import base64
36
import pycurl
37
import threading
38
import copy
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50
from ganeti import rpc_defs
51
from ganeti import pathutils
52
from ganeti import vcluster
53

    
54
# Special module generated at build time
55
from ganeti import _generated_rpc
56

    
57
# pylint has a bug here, doesn't see this import
58
import ganeti.http.client  # pylint: disable=W0611
59

    
60

    
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

    
66
#: Special value to describe an offline host
67
_OFFLINE = object()
68

    
69

    
70
def Init():
71
  """Initializes the module-global HTTP client manager.
72

73
  Must be called before using any RPC function and while exactly one thread is
74
  running.
75

76
  """
77
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
78
  # one thread running. This check is just a safety measure -- it doesn't
79
  # cover all cases.
80
  assert threading.activeCount() == 1, \
81
         "Found more than one active thread when initializing pycURL"
82

    
83
  logging.info("Using PycURL %s", pycurl.version)
84

    
85
  pycurl.global_init(pycurl.GLOBAL_ALL)
86

    
87

    
88
def Shutdown():
89
  """Stops the module-global HTTP client manager.
90

91
  Must be called before quitting the program and while exactly one thread is
92
  running.
93

94
  """
95
  pycurl.global_cleanup()
96

    
97

    
98
def _ConfigRpcCurl(curl):
99
  noded_cert = str(pathutils.NODED_CERT_FILE)
100

    
101
  curl.setopt(pycurl.FOLLOWLOCATION, False)
102
  curl.setopt(pycurl.CAINFO, noded_cert)
103
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
104
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
105
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
106
  curl.setopt(pycurl.SSLCERT, noded_cert)
107
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
108
  curl.setopt(pycurl.SSLKEY, noded_cert)
109
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110

    
111

    
112
def RunWithRPC(fn):
113
  """RPC-wrapper decorator.
114

115
  When applied to a function, it runs it with the RPC system
116
  initialized, and it shutsdown the system afterwards. This means the
117
  function must be called without RPC being initialized.
118

119
  """
120
  def wrapper(*args, **kwargs):
121
    Init()
122
    try:
123
      return fn(*args, **kwargs)
124
    finally:
125
      Shutdown()
126
  return wrapper
127

    
128

    
129
def _Compress(_, data):
130
  """Compresses a string for transport over RPC.
131

132
  Small amounts of data are not compressed.
133

134
  @type data: str
135
  @param data: Data
136
  @rtype: tuple
137
  @return: Encoded data to send
138

139
  """
140
  # Small amounts of data are not compressed
141
  if len(data) < 512:
142
    return (constants.RPC_ENCODING_NONE, data)
143

    
144
  # Compress with zlib and encode in base64
145
  return (constants.RPC_ENCODING_ZLIB_BASE64,
146
          base64.b64encode(zlib.compress(data, 3)))
147

    
148

    
149
class RpcResult(object):
150
  """RPC Result class.
151

152
  This class holds an RPC result. It is needed since in multi-node
153
  calls we can't raise an exception just because one out of many
154
  failed, and therefore we use this class to encapsulate the result.
155

156
  @ivar data: the data payload, for successful results, or None
157
  @ivar call: the name of the RPC call
158
  @ivar node: the name of the node to which we made the call
159
  @ivar offline: whether the operation failed because the node was
160
      offline, as opposed to actual failure; offline=True will always
161
      imply failed=True, in order to allow simpler checking if
162
      the user doesn't care about the exact failure mode
163
  @ivar fail_msg: the error message if the call failed
164

165
  """
166
  def __init__(self, data=None, failed=False, offline=False,
167
               call=None, node=None):
168
    self.offline = offline
169
    self.call = call
170
    self.node = node
171

    
172
    if offline:
173
      self.fail_msg = "Node is marked offline"
174
      self.data = self.payload = None
175
    elif failed:
176
      self.fail_msg = self._EnsureErr(data)
177
      self.data = self.payload = None
178
    else:
179
      self.data = data
180
      if not isinstance(self.data, (tuple, list)):
181
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182
                         type(self.data))
183
        self.payload = None
184
      elif len(data) != 2:
185
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
186
                         "expected 2" % len(self.data))
187
        self.payload = None
188
      elif not self.data[0]:
189
        self.fail_msg = self._EnsureErr(self.data[1])
190
        self.payload = None
191
      else:
192
        # finally success
193
        self.fail_msg = None
194
        self.payload = data[1]
195

    
196
    for attr_name in ["call", "data", "fail_msg",
197
                      "node", "offline", "payload"]:
198
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199

    
200
  def __repr__(self):
201
    return ("RpcResult(data=%s, call=%s, node=%s, offline=%s, fail_msg=%s)" %
202
            (self.offline, self.call, self.node, self.offline, self.fail_msg))
203

    
204
  @staticmethod
205
  def _EnsureErr(val):
206
    """Helper to ensure we return a 'True' value for error."""
207
    if val:
208
      return val
209
    else:
210
      return "No error information"
211

    
212
  def Raise(self, msg, prereq=False, ecode=None):
213
    """If the result has failed, raise an OpExecError.
214

215
    This is used so that LU code doesn't have to check for each
216
    result, but instead can call this function.
217

218
    """
219
    if not self.fail_msg:
220
      return
221

    
222
    if not msg: # one could pass None for default message
223
      msg = ("Call '%s' to node '%s' has failed: %s" %
224
             (self.call, self.node, self.fail_msg))
225
    else:
226
      msg = "%s: %s" % (msg, self.fail_msg)
227
    if prereq:
228
      ec = errors.OpPrereqError
229
    else:
230
      ec = errors.OpExecError
231
    if ecode is not None:
232
      args = (msg, ecode)
233
    else:
234
      args = (msg, )
235
    raise ec(*args) # pylint: disable=W0142
236

    
237
  def Warn(self, msg, feedback_fn):
238
    """If the result has failed, call the feedback_fn.
239

240
    This is used to in cases were LU wants to warn the
241
    user about a failure, but continue anyway.
242

243
    """
244
    if not self.fail_msg:
245
      return
246

    
247
    msg = "%s: %s" % (msg, self.fail_msg)
248
    feedback_fn(msg)
249

    
250

    
251
def _SsconfResolver(ssconf_ips, node_list, _,
252
                    ssc=ssconf.SimpleStore,
253
                    nslookup_fn=netutils.Hostname.GetIP):
254
  """Return addresses for given node names.
255

256
  @type ssconf_ips: bool
257
  @param ssconf_ips: Use the ssconf IPs
258
  @type node_list: list
259
  @param node_list: List of node names
260
  @type ssc: class
261
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
262
  @type nslookup_fn: callable
263
  @param nslookup_fn: function use to do NS lookup
264
  @rtype: list of tuple; (string, string)
265
  @return: List of tuples containing node name and IP address
266

267
  """
268
  ss = ssc()
269
  family = ss.GetPrimaryIPFamily()
270

    
271
  if ssconf_ips:
272
    iplist = ss.GetNodePrimaryIPList()
273
    ipmap = dict(entry.split() for entry in iplist)
274
  else:
275
    ipmap = {}
276

    
277
  result = []
278
  for node in node_list:
279
    ip = ipmap.get(node)
280
    if ip is None:
281
      ip = nslookup_fn(node, family=family)
282
    result.append((node, ip, node))
283

    
284
  return result
285

    
286

    
287
class _StaticResolver:
288
  def __init__(self, addresses):
289
    """Initializes this class.
290

291
    """
292
    self._addresses = addresses
293

    
294
  def __call__(self, hosts, _):
295
    """Returns static addresses for hosts.
296

297
    """
298
    assert len(hosts) == len(self._addresses)
299
    return zip(hosts, self._addresses, hosts)
300

    
301

    
302
def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
303
  """Checks if a node is online.
304

305
  @type node_uuid_or_name: string
306
  @param node_uuid_or_name: Node UUID
307
  @type node: L{objects.Node} or None
308
  @param node: Node object
309

310
  """
311
  if node is None:
312
    # Assume that the passed parameter was actually a node name, so depend on
313
    # DNS for name resolution
314
    return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name)
315
  else:
316
    if node.offline and not accept_offline_node:
317
      ip = _OFFLINE
318
    else:
319
      ip = node.primary_ip
320
    return (node.name, ip, node_uuid_or_name)
321

    
322

    
323
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
324
  """Calculate node addresses using configuration.
325

326
  Note that strings in node_uuids are treated as node names if the UUID is not
327
  found in the configuration.
328

329
  """
330
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
331

    
332
  assert accept_offline_node or opts is None, "Unknown option"
333

    
334
  # Special case for single-host lookups
335
  if len(node_uuids) == 1:
336
    (uuid, ) = node_uuids
337
    return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]
338
  else:
339
    all_nodes = all_nodes_fn()
340
    return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
341
                             accept_offline_node)
342
            for uuid in node_uuids]
343

    
344

    
345
class _RpcProcessor:
346
  def __init__(self, resolver, port, lock_monitor_cb=None):
347
    """Initializes this class.
348

349
    @param resolver: callable accepting a list of node UUIDs or hostnames,
350
      returning a list of tuples containing name, IP address and original name
351
      of the resolved node. IP address can be the name or the special value
352
      L{_OFFLINE} to mark offline machines.
353
    @type port: int
354
    @param port: TCP port
355
    @param lock_monitor_cb: Callable for registering with lock monitor
356

357
    """
358
    self._resolver = resolver
359
    self._port = port
360
    self._lock_monitor_cb = lock_monitor_cb
361

    
362
  @staticmethod
363
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
364
    """Prepares requests by sorting offline hosts into separate list.
365

366
    @type body: dict
367
    @param body: a dictionary with per-host body data
368

369
    """
370
    results = {}
371
    requests = {}
372

    
373
    assert isinstance(body, dict)
374
    assert len(body) == len(hosts)
375
    assert compat.all(isinstance(v, str) for v in body.values())
376
    assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
377
        "%s != %s" % (hosts, body.keys())
378

    
379
    for (name, ip, original_name) in hosts:
380
      if ip is _OFFLINE:
381
        # Node is marked as offline
382
        results[original_name] = RpcResult(node=name,
383
                                           offline=True,
384
                                           call=procedure)
385
      else:
386
        requests[original_name] = \
387
          http.client.HttpClientRequest(str(ip), port,
388
                                        http.HTTP_POST, str("/%s" % procedure),
389
                                        headers=_RPC_CLIENT_HEADERS,
390
                                        post_data=body[original_name],
391
                                        read_timeout=read_timeout,
392
                                        nicename="%s/%s" % (name, procedure),
393
                                        curl_config_fn=_ConfigRpcCurl)
394

    
395
    return (results, requests)
396

    
397
  @staticmethod
398
  def _CombineResults(results, requests, procedure):
399
    """Combines pre-computed results for offline hosts with actual call results.
400

401
    """
402
    for name, req in requests.items():
403
      if req.success and req.resp_status_code == http.HTTP_OK:
404
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
405
                                node=name, call=procedure)
406
      else:
407
        # TODO: Better error reporting
408
        if req.error:
409
          msg = req.error
410
        else:
411
          msg = req.resp_body
412

    
413
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
414
        host_result = RpcResult(data=msg, failed=True, node=name,
415
                                call=procedure)
416

    
417
      results[name] = host_result
418

    
419
    return results
420

    
421
  def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
422
               _req_process_fn=None):
423
    """Makes an RPC request to a number of nodes.
424

425
    @type nodes: sequence
426
    @param nodes: node UUIDs or Hostnames
427
    @type procedure: string
428
    @param procedure: Request path
429
    @type body: dictionary
430
    @param body: dictionary with request bodies per host
431
    @type read_timeout: int or None
432
    @param read_timeout: Read timeout for request
433
    @rtype: dictionary
434
    @return: a dictionary mapping host names to rpc.RpcResult objects
435

436
    """
437
    assert read_timeout is not None, \
438
      "Missing RPC read timeout for procedure '%s'" % procedure
439

    
440
    if _req_process_fn is None:
441
      _req_process_fn = http.client.ProcessRequests
442

    
443
    (results, requests) = \
444
      self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
445
                            procedure, body, read_timeout)
446

    
447
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
448

    
449
    assert not frozenset(results).intersection(requests)
450

    
451
    return self._CombineResults(results, requests, procedure)
452

    
453

    
454
class _RpcClientBase:
455
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
456
               _req_process_fn=None):
457
    """Initializes this class.
458

459
    """
460
    proc = _RpcProcessor(resolver,
461
                         netutils.GetDaemonPort(constants.NODED),
462
                         lock_monitor_cb=lock_monitor_cb)
463
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
464
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
465

    
466
  @staticmethod
467
  def _EncodeArg(encoder_fn, node, (argkind, value)):
468
    """Encode argument.
469

470
    """
471
    if argkind is None:
472
      return value
473
    else:
474
      return encoder_fn(argkind)(node, value)
475

    
476
  def _Call(self, cdef, node_list, args):
477
    """Entry point for automatically generated RPC wrappers.
478

479
    """
480
    (procedure, _, resolver_opts, timeout, argdefs,
481
     prep_fn, postproc_fn, _) = cdef
482

    
483
    if callable(timeout):
484
      read_timeout = timeout(args)
485
    else:
486
      read_timeout = timeout
487

    
488
    if callable(resolver_opts):
489
      req_resolver_opts = resolver_opts(args)
490
    else:
491
      req_resolver_opts = resolver_opts
492

    
493
    if len(args) != len(argdefs):
494
      raise errors.ProgrammerError("Number of passed arguments doesn't match")
495

    
496
    if prep_fn is None:
497
      prep_fn = lambda _, args: args
498
    assert callable(prep_fn)
499

    
500
    # encode the arguments for each node individually, pass them and the node
501
    # name to the prep_fn, and serialise its return value
502
    encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
503
                                      zip(map(compat.snd, argdefs), args))
504
    pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
505
                  for n in node_list)
506

    
507
    result = self._proc(node_list, procedure, pnbody, read_timeout,
508
                        req_resolver_opts)
509

    
510
    if postproc_fn:
511
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
512
                      result.items()))
513
    else:
514
      return result
515

    
516

    
517
def _ObjectToDict(_, value):
518
  """Converts an object to a dictionary.
519

520
  @note: See L{objects}.
521

522
  """
523
  return value.ToDict()
524

    
525

    
526
def _ObjectListToDict(node, value):
527
  """Converts a list of L{objects} to dictionaries.
528

529
  """
530
  return map(compat.partial(_ObjectToDict, node), value)
531

    
532

    
533
def _PrepareFileUpload(getents_fn, node, filename):
534
  """Loads a file and prepares it for an upload to nodes.
535

536
  """
537
  statcb = utils.FileStatHelper()
538
  data = _Compress(node, utils.ReadFile(filename, preread=statcb))
539
  st = statcb.st
540

    
541
  if getents_fn is None:
542
    getents_fn = runtime.GetEnts
543

    
544
  getents = getents_fn()
545

    
546
  virt_filename = vcluster.MakeVirtualPath(filename)
547

    
548
  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
549
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
550

    
551

    
552
def _PrepareFinalizeExportDisks(_, snap_disks):
553
  """Encodes disks for finalizing export.
554

555
  """
556
  flat_disks = []
557

    
558
  for disk in snap_disks:
559
    if isinstance(disk, bool):
560
      flat_disks.append(disk)
561
    else:
562
      flat_disks.append(disk.ToDict())
563

    
564
  return flat_disks
565

    
566

    
567
def _EncodeBlockdevRename(_, value):
568
  """Encodes information for renaming block devices.
569

570
  """
571
  return [(d.ToDict(), uid) for d, uid in value]
572

    
573

    
574
def _AddSpindlesToLegacyNodeInfo(result, space_info):
575
  """Extracts the spindle information from the space info and adds
576
  it to the result dictionary.
577

578
  @type result: dict of strings
579
  @param result: dictionary holding the result of the legacy node info
580
  @type space_info: list of dicts of strings
581
  @param space_info: list, each row holding space information of one storage
582
    unit
583
  @rtype: None
584
  @return: does not return anything, manipulates the C{result} variable
585

586
  """
587
  lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
588
      space_info, constants.ST_LVM_PV)
589
  if lvm_pv_info:
590
    result["spindles_free"] = lvm_pv_info["storage_free"]
591
    result["spindles_total"] = lvm_pv_info["storage_size"]
592
  else:
593
    result["spindles_free"] = 0
594
    result["spindles_total"] = 0
595

    
596

    
597
def _AddStorageInfoToLegacyNodeInfoByTemplate(
598
    result, space_info, disk_template):
599
  """Extracts the storage space information of the disk template from
600
  the space info and adds it to the result dictionary.
601

602
  @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.
603

604
  """
605
  if utils.storage.DiskTemplateSupportsSpaceReporting(disk_template):
606
    disk_info = utils.storage.LookupSpaceInfoByDiskTemplate(
607
        space_info, disk_template)
608
    result["name"] = disk_info["name"]
609
    result["storage_free"] = disk_info["storage_free"]
610
    result["storage_size"] = disk_info["storage_size"]
611
  else:
612
    # FIXME: consider displaying '-' in this case
613
    result["storage_free"] = 0
614
    result["storage_size"] = 0
615

    
616

    
617
def MakeLegacyNodeInfo(data, disk_template):
618
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
619

620
  Converts the data into a single dictionary. This is fine for most use cases,
621
  but some require information from more than one volume group or hypervisor.
622

623
  """
624
  (bootid, space_info, (hv_info, )) = data
625

    
626
  ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
627

    
628
  _AddSpindlesToLegacyNodeInfo(ret, space_info)
629
  _AddStorageInfoToLegacyNodeInfoByTemplate(ret, space_info, disk_template)
630

    
631
  return ret
632

    
633

    
634
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
635
  """Annotates just DRBD disks layouts.
636

637
  """
638
  assert disk.dev_type == constants.DT_DRBD8
639

    
640
  disk.params = objects.FillDict(drbd_params, disk.params)
641
  (dev_data, dev_meta) = disk.children
642
  dev_data.params = objects.FillDict(data_params, dev_data.params)
643
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
644

    
645
  return disk
646

    
647

    
648
def _AnnotateDParamsGeneric(disk, (params, )):
649
  """Generic disk parameter annotation routine.
650

651
  """
652
  assert disk.dev_type != constants.DT_DRBD8
653

    
654
  disk.params = objects.FillDict(params, disk.params)
655

    
656
  return disk
657

    
658

    
659
def AnnotateDiskParams(disks, disk_params):
660
  """Annotates the disk objects with the disk parameters.
661

662
  @param disks: The list of disks objects to annotate
663
  @param disk_params: The disk parameters for annotation
664
  @returns: A list of disk objects annotated
665

666
  """
667
  def AnnotateDisk(disk):
668
    if disk.dev_type == constants.DT_DISKLESS:
669
      return disk
670

    
671
    ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)
672

    
673
    if disk.dev_type == constants.DT_DRBD8:
674
      return _AnnotateDParamsDRBD(disk, ld_params)
675
    else:
676
      return _AnnotateDParamsGeneric(disk, ld_params)
677

    
678
  return [AnnotateDisk(disk.Copy()) for disk in disks]
679

    
680

    
681
def _GetExclusiveStorageFlag(cfg, node_uuid):
682
  ni = cfg.GetNodeInfo(node_uuid)
683
  if ni is None:
684
    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
685
                               errors.ECODE_NOENT)
686
  return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
687

    
688

    
689
def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
690
  """Adds the exclusive storage flag to lvm units.
691

692
  This function creates a copy of the storage_units lists, with the
693
  es_flag being added to all lvm storage units.
694

695
  @type storage_units: list of pairs (string, string)
696
  @param storage_units: list of 'raw' storage units, consisting only of
697
    (storage_type, storage_key)
698
  @type es_flag: boolean
699
  @param es_flag: exclusive storage flag
700
  @rtype: list of tuples (string, string, list)
701
  @return: list of storage units (storage_type, storage_key, params) with
702
    the params containing the es_flag for lvm-vg storage units
703

704
  """
705
  result = []
706
  for (storage_type, storage_key) in storage_units:
707
    if storage_type in [constants.ST_LVM_VG]:
708
      result.append((storage_type, storage_key, [es_flag]))
709
      if es_flag:
710
        result.append((constants.ST_LVM_PV, storage_key, [es_flag]))
711
    else:
712
      result.append((storage_type, storage_key, []))
713
  return result
714

    
715

    
716
def GetExclusiveStorageForNodes(cfg, node_uuids):
717
  """Return the exclusive storage flag for all the given nodes.
718

719
  @type cfg: L{config.ConfigWriter}
720
  @param cfg: cluster configuration
721
  @type node_uuids: list or tuple
722
  @param node_uuids: node UUIDs for which to read the flag
723
  @rtype: dict
724
  @return: mapping from node uuids to exclusive storage flags
725
  @raise errors.OpPrereqError: if any given node name has no corresponding
726
  node
727

728
  """
729
  getflag = lambda n: _GetExclusiveStorageFlag(cfg, n)
730
  flags = map(getflag, node_uuids)
731
  return dict(zip(node_uuids, flags))
732

    
733

    
734
def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
735
  """Return the lvm storage unit for all the given nodes.
736

737
  Main purpose of this function is to map the exclusive storage flag, which
738
  can be different for each node, to the default LVM storage unit.
739

740
  @type cfg: L{config.ConfigWriter}
741
  @param cfg: cluster configuration
742
  @type storage_units: list of pairs (string, string)
743
  @param storage_units: list of 'raw' storage units, e.g. pairs of
744
    (storage_type, storage_key)
745
  @type node_uuids: list or tuple
746
  @param node_uuids: node UUIDs for which to read the flag
747
  @rtype: dict
748
  @return: mapping from node uuids to a list of storage units which include
749
    the exclusive storage flag for lvm storage
750
  @raise errors.OpPrereqError: if any given node name has no corresponding
751
  node
752

753
  """
754
  getunit = lambda n: _AddExclusiveStorageFlagToLvmStorageUnits(
755
      storage_units, _GetExclusiveStorageFlag(cfg, n))
756
  flags = map(getunit, node_uuids)
757
  return dict(zip(node_uuids, flags))
758

    
759

    
760
#: Generic encoders
761
_ENCODERS = {
762
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
763
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
764
  rpc_defs.ED_COMPRESS: _Compress,
765
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
766
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
767
  }
768

    
769

    
770
class RpcRunner(_RpcClientBase,
771
                _generated_rpc.RpcClientDefault,
772
                _generated_rpc.RpcClientBootstrap,
773
                _generated_rpc.RpcClientDnsOnly,
774
                _generated_rpc.RpcClientConfig):
775
  """RPC runner class.
776

777
  """
778
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
779
    """Initialized the RPC runner.
780

781
    @type cfg: L{config.ConfigWriter}
782
    @param cfg: Configuration
783
    @type lock_monitor_cb: callable
784
    @param lock_monitor_cb: Lock monitor callback
785

786
    """
787
    self._cfg = cfg
788

    
789
    encoders = _ENCODERS.copy()
790

    
791
    encoders.update({
792
      # Encoders requiring configuration object
793
      rpc_defs.ED_INST_DICT: self._InstDict,
794
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
795
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
796
      rpc_defs.ED_NIC_DICT: self._NicDict,
797
      rpc_defs.ED_DEVICE_DICT: self._DeviceDict,
798

    
799
      # Encoders annotating disk parameters
800
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
801
      rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
802
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
803
      rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,
804

    
805
      # Encoders with special requirements
806
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
807

    
808
      rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
809
      })
810

    
811
    # Resolver using configuration
812
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
813
                              cfg.GetAllNodesInfo)
814

    
815
    # Pylint doesn't recognize multiple inheritance properly, see
816
    # <http://www.logilab.org/ticket/36586> and
817
    # <http://www.logilab.org/ticket/35642>
818
    # pylint: disable=W0233
819
    _RpcClientBase.__init__(self, resolver, encoders.get,
820
                            lock_monitor_cb=lock_monitor_cb,
821
                            _req_process_fn=_req_process_fn)
822
    _generated_rpc.RpcClientConfig.__init__(self)
823
    _generated_rpc.RpcClientBootstrap.__init__(self)
824
    _generated_rpc.RpcClientDnsOnly.__init__(self)
825
    _generated_rpc.RpcClientDefault.__init__(self)
826

    
827
  def _NicDict(self, _, nic):
828
    """Convert the given nic to a dict and encapsulate netinfo
829

830
    """
831
    n = copy.deepcopy(nic)
832
    if n.network:
833
      net_uuid = self._cfg.LookupNetwork(n.network)
834
      if net_uuid:
835
        nobj = self._cfg.GetNetwork(net_uuid)
836
        n.netinfo = objects.Network.ToDict(nobj)
837
    return n.ToDict()
838

    
839
  def _DeviceDict(self, _, (device, instance)):
840
    if isinstance(device, objects.NIC):
841
      return self._NicDict(None, device)
842
    elif isinstance(device, objects.Disk):
843
      return self._SingleDiskDictDP(None, (device, instance))
844

    
845
  def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
846
    """Convert the given instance to a dict.
847

848
    This is done via the instance's ToDict() method and additionally
849
    we fill the hvparams with the cluster defaults.
850

851
    @type instance: L{objects.Instance}
852
    @param instance: an Instance object
853
    @type hvp: dict or None
854
    @param hvp: a dictionary with overridden hypervisor parameters
855
    @type bep: dict or None
856
    @param bep: a dictionary with overridden backend parameters
857
    @type osp: dict or None
858
    @param osp: a dictionary with overridden os parameters
859
    @rtype: dict
860
    @return: the instance dict, with the hvparams filled with the
861
        cluster defaults
862

863
    """
864
    idict = instance.ToDict()
865
    cluster = self._cfg.GetClusterInfo()
866
    idict["hvparams"] = cluster.FillHV(instance)
867
    if hvp is not None:
868
      idict["hvparams"].update(hvp)
869
    idict["beparams"] = cluster.FillBE(instance)
870
    if bep is not None:
871
      idict["beparams"].update(bep)
872
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
873
    if osp is not None:
874
      idict["osparams"].update(osp)
875
    idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
876
    for nic in idict["nics"]:
877
      nic["nicparams"] = objects.FillDict(
878
        cluster.nicparams[constants.PP_DEFAULT],
879
        nic["nicparams"])
880
      network = nic.get("network", None)
881
      if network:
882
        net_uuid = self._cfg.LookupNetwork(network)
883
        if net_uuid:
884
          nobj = self._cfg.GetNetwork(net_uuid)
885
          nic["netinfo"] = objects.Network.ToDict(nobj)
886
    return idict
887

    
888
  def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
889
    """Wrapper for L{_InstDict}.
890

891
    """
892
    return self._InstDict(node, instance, hvp=hvp, bep=bep)
893

    
894
  def _InstDictOspDp(self, node, (instance, osparams)):
895
    """Wrapper for L{_InstDict}.
896

897
    """
898
    return self._InstDict(node, instance, osp=osparams)
899

    
900
  def _DisksDictDP(self, node, (disks, instance)):
901
    """Wrapper for L{AnnotateDiskParams}.
902

903
    """
904
    diskparams = self._cfg.GetInstanceDiskParams(instance)
905
    ret = []
906
    for disk in AnnotateDiskParams(disks, diskparams):
907
      disk_node_uuids = disk.GetNodes(instance.primary_node)
908
      node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
909
                      in self._cfg.GetMultiNodeInfo(disk_node_uuids))
910

    
911
      disk.UpdateDynamicDiskParams(node, node_ips)
912

    
913
      ret.append(disk.ToDict(include_dynamic_params=True))
914

    
915
    return ret
916

    
917
  def _MultiDiskDictDP(self, node, disks_insts):
918
    """Wrapper for L{AnnotateDiskParams}.
919

920
    Supports a list of (disk, instance) tuples.
921
    """
922
    return [disk for disk_inst in disks_insts
923
            for disk in self._DisksDictDP(node, disk_inst)]
924

    
925
  def _SingleDiskDictDP(self, node, (disk, instance)):
926
    """Wrapper for L{AnnotateDiskParams}.
927

928
    """
929
    (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
930
    return anno_disk
931

    
932
  def _EncodeNodeToDiskDictDP(self, node, value):
933
    """Encode dict of node name -> list of (disk, instance) tuples as values.
934

935
    """
936
    return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
937
                for name, disks in value.items())
938

    
939
  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
940
    """Encodes import/export I/O information.
941

942
    """
943
    if ieio == constants.IEIO_RAW_DISK:
944
      assert len(ieioargs) == 2
945
      return (ieio, (self._SingleDiskDictDP(node, ieioargs), ))
946

    
947
    if ieio == constants.IEIO_SCRIPT:
948
      assert len(ieioargs) == 2
949
      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))
950

    
951
    return (ieio, ieioargs)
952

    
953

    
954
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
955
  """RPC wrappers for job queue.
956

957
  """
958
  def __init__(self, context, address_list):
959
    """Initializes this class.
960

961
    """
962
    if address_list is None:
963
      resolver = compat.partial(_SsconfResolver, True)
964
    else:
965
      # Caller provided an address list
966
      resolver = _StaticResolver(address_list)
967

    
968
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
969
                            lock_monitor_cb=context.glm.AddToLockMonitor)
970
    _generated_rpc.RpcClientJobQueue.__init__(self)
971

    
972

    
973
class BootstrapRunner(_RpcClientBase,
974
                      _generated_rpc.RpcClientBootstrap,
975
                      _generated_rpc.RpcClientDnsOnly):
976
  """RPC wrappers for bootstrapping.
977

978
  """
979
  def __init__(self):
980
    """Initializes this class.
981

982
    """
983
    # Pylint doesn't recognize multiple inheritance properly, see
984
    # <http://www.logilab.org/ticket/36586> and
985
    # <http://www.logilab.org/ticket/35642>
986
    # pylint: disable=W0233
987
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
988
                            _ENCODERS.get)
989
    _generated_rpc.RpcClientBootstrap.__init__(self)
990
    _generated_rpc.RpcClientDnsOnly.__init__(self)
991

    
992

    
993
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
994
  """RPC wrappers for calls using only DNS.
995

996
  """
997
  def __init__(self):
998
    """Initialize this class.
999

1000
    """
1001
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
1002
                            _ENCODERS.get)
1003
    _generated_rpc.RpcClientDnsOnly.__init__(self)
1004

    
1005

    
1006
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
1007
  """RPC wrappers for L{config}.
1008

1009
  """
1010
  def __init__(self, context, address_list, _req_process_fn=None,
1011
               _getents=None):
1012
    """Initializes this class.
1013

1014
    """
1015
    if context:
1016
      lock_monitor_cb = context.glm.AddToLockMonitor
1017
    else:
1018
      lock_monitor_cb = None
1019

    
1020
    if address_list is None:
1021
      resolver = compat.partial(_SsconfResolver, True)
1022
    else:
1023
      # Caller provided an address list
1024
      resolver = _StaticResolver(address_list)
1025

    
1026
    encoders = _ENCODERS.copy()
1027

    
1028
    encoders.update({
1029
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
1030
      })
1031

    
1032
    _RpcClientBase.__init__(self, resolver, encoders.get,
1033
                            lock_monitor_cb=lock_monitor_cb,
1034
                            _req_process_fn=_req_process_fn)
1035
    _generated_rpc.RpcClientConfig.__init__(self)