Statistics
| Branch: | Tag: | Revision:

root / lib / rpc / node.py @ 8e8cf324

History | View | Annotate | Download (32 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import logging
34
import zlib
35
import base64
36
import pycurl
37
import threading
38
import copy
39
import os
40

    
41
from ganeti import utils
42
from ganeti import objects
43
from ganeti import http
44
from ganeti import serializer
45
from ganeti import constants
46
from ganeti import errors
47
from ganeti import netutils
48
from ganeti import ssconf
49
from ganeti import runtime
50
from ganeti import compat
51
from ganeti import rpc_defs
52
from ganeti import pathutils
53
from ganeti import vcluster
54

    
55
# Special module generated at build time
56
from ganeti import _generated_rpc
57

    
58
# pylint has a bug here, doesn't see this import
59
import ganeti.http.client  # pylint: disable=W0611
60

    
61

    
62
# Headers sent with every RPC request; the valueless "Expect:" entry clears
# the "Expect: 100-continue" header curl would otherwise add to POSTs
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]

#: Special value to describe an offline host
# Unique sentinel object, always compared with "is"
_OFFLINE = object()
69

    
70

    
71
def Init():
  """Initializes the module-global HTTP client manager.

  Must be called before using any RPC function and while exactly one thread is
  running.

  """
  # curl_global_init(3) and curl_global_cleanup(3) are not thread-safe and
  # must run while only a single thread exists; this assertion is merely a
  # best-effort safety net and cannot catch every case.
  assert threading.active_count() == 1, \
         "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)

  pycurl.global_init(pycurl.GLOBAL_ALL)
87

    
88

    
89
def Shutdown():
  """Stops the module-global HTTP client manager.

  Must be called before quitting the program and while exactly one thread is
  running.

  """
  # Counterpart to pycurl.global_init() in L{Init}; see curl_global_cleanup(3)
  pycurl.global_cleanup()
97

    
98

    
99
def _ConfigRpcCurl(curl):
  """Configures a pycURL object for node daemon RPC calls.

  Sets up TLS (cluster CA, client certificate and key) and the connection
  timeout on the given handle.

  @param curl: pycURL object to configure

  """
  noded_cert = str(pathutils.NODED_CERT_FILE)
  noded_client_cert = str(pathutils.NODED_CLIENT_CERT_FILE)

  # FIXME: The next two lines are necessary to ensure upgradability from
  # 2.10 to 2.11. Remove in 2.12, because this slows down RPC calls.
  if not os.path.exists(noded_client_cert):
    # FIX: the two string parts previously concatenated to "...for RPCcall.";
    # a separating space was missing
    logging.info("Using server certificate as client certificate for RPC"
                 " call.")
    noded_client_cert = noded_cert

  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  # NOTE(review): host name verification is disabled; peer authentication
  # relies solely on certificate verification against the cluster CA
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_client_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  # Certificate and private key are read from the same PEM file
  curl.setopt(pycurl.SSLKEY, noded_client_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
119

    
120

    
121
def RunWithRPC(fn):
  """RPC-wrapper decorator.

  When applied to a function, it runs it with the RPC system
  initialized, and it shutsdown the system afterwards. This means the
  function must be called without RPC being initialized.

  """
  def wrapper(*args, **kwargs):
    # Bring up the RPC layer, run the payload, and always tear it down
    # again, even if the wrapped function raises
    Init()
    try:
      result = fn(*args, **kwargs)
    finally:
      Shutdown()
    return result
  return wrapper
136

    
137

    
138
def _Compress(_, data):
  """Compresses a string for transport over RPC.

  Small amounts of data are not compressed.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  if len(data) >= 512:
    # Compress with zlib and encode in base64
    return (constants.RPC_ENCODING_ZLIB_BASE64,
            base64.b64encode(zlib.compress(data, 3)))

  # Below the threshold, compression is not worth the CPU cost
  return (constants.RPC_ENCODING_NONE, data)
156

    
157

    
158
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      # A successful result must be a (status, payload) two-element sequence
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        # First element is the status flag; falsy means remote failure
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    # All attributes must be set on every code path above
    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  def __repr__(self):
    # FIX: the first field is labelled "data" but the old code passed
    # self.offline twice and never showed self.data
    return ("RpcResult(data=%s, call=%s, node=%s, offline=%s, fail_msg=%s)" %
            (self.data, self.call, self.node, self.offline, self.fail_msg))

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @param msg: message prefix; if None/empty, a default one is built
    @param prereq: if True, raise OpPrereqError instead of OpExecError
    @param ecode: optional error code passed to the exception

    """
    if not self.fail_msg:
      return

    if not msg: # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args) # pylint: disable=W0142

  def Warn(self, msg, feedback_fn):
    """If the result has failed, call the feedback_fn.

    This is used to in cases were LU wants to warn the
    user about a failure, but continue anyway.

    """
    if not self.fail_msg:
      return

    msg = "%s: %s" % (msg, self.fail_msg)
    feedback_fn(msg)
258

    
259

    
260
def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type ssconf_ips: bool
  @param ssconf_ips: Use the ssconf IPs
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string, string)
  @return: List of (name, address, name) triples for the given nodes

  """
  store = ssc()
  family = store.GetPrimaryIPFamily()

  if ssconf_ips:
    # Each ssconf entry is a whitespace-separated "name address" pair
    ipmap = dict(entry.split()
                 for entry in store.GetNodePrimaryIPList())
  else:
    ipmap = {}

  addresses = []
  for name in node_list:
    addr = ipmap.get(name)
    if addr is None:
      # Not known via ssconf (or ssconf lookups disabled): use NS lookup
      addr = nslookup_fn(name, family=family)
    addresses.append((name, addr, name))

  return addresses
294

    
295

    
296
class _StaticResolver:
297
  def __init__(self, addresses):
298
    """Initializes this class.
299

300
    """
301
    self._addresses = addresses
302

    
303
  def __call__(self, hosts, _):
304
    """Returns static addresses for hosts.
305

306
    """
307
    assert len(hosts) == len(self._addresses)
308
    return zip(hosts, self._addresses, hosts)
309

    
310

    
311
def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
312
  """Checks if a node is online.
313

314
  @type node_uuid_or_name: string
315
  @param node_uuid_or_name: Node UUID
316
  @type node: L{objects.Node} or None
317
  @param node: Node object
318

319
  """
320
  if node is None:
321
    # Assume that the passed parameter was actually a node name, so depend on
322
    # DNS for name resolution
323
    return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name)
324
  else:
325
    if node.offline and not accept_offline_node:
326
      ip = _OFFLINE
327
    else:
328
      ip = node.primary_ip
329
    return (node.name, ip, node_uuid_or_name)
330

    
331

    
332
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
  """Calculate node addresses using configuration.

  Note that strings in node_uuids are treated as node names if the UUID is not
  found in the configuration.

  """
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline_node or opts is None, "Unknown option"

  if len(node_uuids) == 1:
    # Special case for single-host lookups: avoid fetching all nodes
    (uuid, ) = node_uuids
    return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]

  all_nodes = all_nodes_fn()
  return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
                           accept_offline_node)
          for uuid in node_uuids]
352

    
353

    
354
class _RpcProcessor:
  """Resolves nodes, issues the HTTP requests and collects per-node results.

  """
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of node UUIDs or hostnames,
      returning a list of tuples containing name, IP address and original name
      of the resolved node. IP address can be the name or the special value
      L{_OFFLINE} to mark offline machines.
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @param hosts: list of (name, address, original_name) triples as returned
      by the resolver
    @type body: dict
    @param body: a dictionary with per-host body data

    """
    results = {}
    requests = {}

    # Consistency checks: exactly one serialized body per resolved host
    assert isinstance(body, dict)
    assert len(body) == len(hosts)
    assert compat.all(isinstance(v, str) for v in body.values())
    assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
        "%s != %s" % (hosts, body.keys())

    for (name, ip, original_name) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline: produce a ready-made failure result
        # instead of an HTTP request
        results[original_name] = RpcResult(node=name,
                                           offline=True,
                                           call=procedure)
      else:
        requests[original_name] = \
          http.client.HttpClientRequest(str(ip), port,
                                        http.HTTP_POST, str("/%s" % procedure),
                                        headers=_RPC_CLIENT_HEADERS,
                                        post_data=body[original_name],
                                        read_timeout=read_timeout,
                                        nicename="%s/%s" % (name, procedure),
                                        curl_config_fn=_ConfigRpcCurl)

    return (results, requests)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    """
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)

      results[name] = host_result

    return results

  def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
               _req_process_fn=None):
    """Makes an RPC request to a number of nodes.

    @type nodes: sequence
    @param nodes: node UUIDs or Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: dictionary
    @param body: dictionary with request bodies per host
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request
    @param resolver_opts: options passed through to the resolver
    @param _req_process_fn: test hook replacing the HTTP request processor
    @rtype: dictionary
    @return: a dictionary mapping host names to rpc.RpcResult objects

    """
    assert read_timeout is not None, \
      "Missing RPC read timeout for procedure '%s'" % procedure

    if _req_process_fn is None:
      _req_process_fn = http.client.ProcessRequests

    (results, requests) = \
      self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
                            procedure, body, read_timeout)

    # The request objects are examined for their responses afterwards in
    # _CombineResults, so processing fills them in place
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    # Pre-made offline results and live requests must not overlap
    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
461

    
462

    
463
class _RpcClientBase:
  """Base class for the generated RPC client mix-ins.

  Wires a node resolver and an argument encoder into a L{_RpcProcessor} and
  provides L{_Call}, the single entry point used by L{_generated_rpc}.

  """
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
               _req_process_fn=None):
    """Initializes this class.

    @param resolver: callable resolving node names/UUIDs to addresses
    @param encoder_fn: callable mapping an encoding kind (rpc_defs.ED_*) to
      the matching encoder function
    @param lock_monitor_cb: callback for registering with the lock monitor
    @param _req_process_fn: test hook overriding the HTTP request processor

    """
    proc = _RpcProcessor(resolver,
                         netutils.GetDaemonPort(constants.NODED),
                         lock_monitor_cb=lock_monitor_cb)
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
    # Bind the encoder lookup function as first argument of _EncodeArg
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)

  @staticmethod
  def _EncodeArg(encoder_fn, node, (argkind, value)):
    """Encode argument.

    @param argkind: rpc_defs.ED_* constant, or None for plain pass-through

    """
    if argkind is None:
      return value
    else:
      return encoder_fn(argkind)(node, value)

  def _Call(self, cdef, node_list, args):
    """Entry point for automatically generated RPC wrappers.

    @param cdef: call definition tuple from L{rpc_defs}
    @param node_list: nodes to contact
    @param args: positional call arguments, one per entry in the call's
      argument definitions

    """
    (procedure, _, resolver_opts, timeout, argdefs,
     prep_fn, postproc_fn, _) = cdef

    # Timeout may be a fixed value or computed from the call arguments
    if callable(timeout):
      read_timeout = timeout(args)
    else:
      read_timeout = timeout

    # Resolver options may likewise be computed from the call arguments
    if callable(resolver_opts):
      req_resolver_opts = resolver_opts(args)
    else:
      req_resolver_opts = resolver_opts

    if len(args) != len(argdefs):
      raise errors.ProgrammerError("Number of passed arguments doesn't match")

    if prep_fn is None:
      prep_fn = lambda _, args: args
    assert callable(prep_fn)

    # encode the arguments for each node individually, pass them and the node
    # name to the prep_fn, and serialise its return value
    encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
                                      zip(map(compat.snd, argdefs), args))
    pnbody = dict(
      (n,
       serializer.DumpJson(prep_fn(n, encode_args_fn(n)),
                           private_encoder=serializer.EncodeWithPrivateFields))
      for n in node_list
    )

    result = self._proc(node_list, procedure, pnbody, read_timeout,
                        req_resolver_opts)

    # Optionally post-process each per-node result before returning
    if postproc_fn:
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
                      result.items()))
    else:
      return result
528

    
529

    
530
def _ObjectToDict(_, value):
  """Converts an object to a dictionary.

  The ignored first argument is the node name, part of the common
  C{(node, value)} encoder signature.

  @note: See L{objects}.

  """
  return value.ToDict()
537

    
538

    
539
def _ObjectListToDict(node, value):
  """Converts a list of L{objects} to dictionaries.

  """
  return [_ObjectToDict(node, elem) for elem in value]
544

    
545

    
546
def _PrepareFileUpload(getents_fn, node, filename):
  """Loads a file and prepares it for an upload to nodes.

  @param getents_fn: callable returning a UID/GID lookup object, or None to
    use L{runtime.GetEnts}
  @param node: node name, passed through to L{_Compress}
  @type filename: string
  @param filename: path of the file to read
  @return: list with virtual path, encoded file data, mode, looked-up owner
    and group, atime and mtime of the file

  """
  # FileStatHelper is passed as "preread" so stat information is captured
  # while the file is being read -- presumably to avoid a second filesystem
  # access; TODO confirm against utils.ReadFile
  statcb = utils.FileStatHelper()
  data = _Compress(node, utils.ReadFile(filename, preread=statcb))
  st = statcb.st

  if getents_fn is None:
    getents_fn = runtime.GetEnts

  getents = getents_fn()

  # Translate the local path into its vcluster ("virtual") representation
  virt_filename = vcluster.MakeVirtualPath(filename)

  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
563

    
564

    
565
def _PrepareFinalizeExportDisks(_, snap_disks):
566
  """Encodes disks for finalizing export.
567

568
  """
569
  flat_disks = []
570

    
571
  for disk in snap_disks:
572
    if isinstance(disk, bool):
573
      flat_disks.append(disk)
574
    else:
575
      flat_disks.append(disk.ToDict())
576

    
577
  return flat_disks
578

    
579

    
580
def _EncodeBlockdevRename(_, value):
581
  """Encodes information for renaming block devices.
582

583
  """
584
  return [(d.ToDict(), uid) for d, uid in value]
585

    
586

    
587
def _AddSpindlesToLegacyNodeInfo(result, space_info):
  """Extracts the spindle information from the space info and adds
  it to the result dictionary.

  @type result: dict of strings
  @param result: dictionary holding the result of the legacy node info
  @type space_info: list of dicts of strings
  @param space_info: list, each row holding space information of one storage
    unit
  @rtype: None
  @return: does not return anything, manipulates the C{result} variable

  """
  lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_PV)
  if lvm_pv_info:
    spindles_free = lvm_pv_info["storage_free"]
    spindles_total = lvm_pv_info["storage_size"]
  else:
    # No LVM PV information available for this node
    spindles_free = 0
    spindles_total = 0
  result["spindles_free"] = spindles_free
  result["spindles_total"] = spindles_total
608

    
609

    
610
def _AddStorageInfoToLegacyNodeInfoByTemplate(
    result, space_info, disk_template):
  """Extracts the storage space information of the disk template from
  the space info and adds it to the result dictionary.

  @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.

  """
  if not utils.storage.DiskTemplateSupportsSpaceReporting(disk_template):
    # FIXME: consider displaying '-' in this case
    result["storage_free"] = 0
    result["storage_size"] = 0
    return

  disk_info = utils.storage.LookupSpaceInfoByDiskTemplate(
      space_info, disk_template)
  result["name"] = disk_info["name"]
  result["storage_free"] = disk_info["storage_free"]
  result["storage_size"] = disk_info["storage_size"]
628

    
629

    
630
def MakeLegacyNodeInfo(data, disk_template):
  """Formats the data returned by call_node_info.

  Converts the data into a single dictionary. This is fine for most use cases,
  but some require information from more than one volume group or hypervisor.

  """
  (bootid, space_info, (hv_info, )) = data

  # Start from the hypervisor data and mix in the boot ID
  legacy_info = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})

  # Both helpers modify the dictionary in place
  _AddSpindlesToLegacyNodeInfo(legacy_info, space_info)
  _AddStorageInfoToLegacyNodeInfoByTemplate(legacy_info, space_info,
                                            disk_template)

  return legacy_info
645

    
646

    
647
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
  """Annotates just DRBD disks layouts.

  @param disk: DRBD8 disk object; modified in place and returned
  @param drbd_params: parameters filled into the DRBD disk itself
  @param data_params: parameters for the data child device
  @param meta_params: parameters for the metadata child device
  @return: the same C{disk} object, annotated

  """
  assert disk.dev_type == constants.DT_DRBD8

  disk.params = objects.FillDict(drbd_params, disk.params)
  # A DRBD8 disk is expected to have exactly two children: data and metadata
  (dev_data, dev_meta) = disk.children
  dev_data.params = objects.FillDict(data_params, dev_data.params)
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

  return disk
659

    
660

    
661
def _AnnotateDParamsGeneric(disk, (params, )):
  """Generic disk parameter annotation routine.

  @param disk: non-DRBD disk object; modified in place and returned
  @param params: parameters filled into the disk's existing params
  @return: the same C{disk} object, annotated

  """
  assert disk.dev_type != constants.DT_DRBD8

  disk.params = objects.FillDict(params, disk.params)

  return disk
670

    
671

    
672
def AnnotateDiskParams(disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param disks: The list of disks objects to annotate
  @param disk_params: The disk parameters for annotation
  @returns: A list of disk objects annotated

  """
  def _AnnotateOne(disk):
    # Diskless disks carry no parameters at all
    if disk.dev_type == constants.DT_DISKLESS:
      return disk

    ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)

    if disk.dev_type == constants.DT_DRBD8:
      return _AnnotateDParamsDRBD(disk, ld_params)
    return _AnnotateDParamsGeneric(disk, ld_params)

  # Annotate copies so the original configuration objects stay untouched
  return [_AnnotateOne(disk.Copy()) for disk in disks]
692

    
693

    
694
def _GetExclusiveStorageFlag(cfg, node_uuid):
  """Reads the exclusive_storage node parameter for a single node.

  @raise errors.OpPrereqError: if the node is not in the configuration

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is None:
    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
                               errors.ECODE_NOENT)
  return cfg.GetNdParams(node_info)[constants.ND_EXCLUSIVE_STORAGE]
700

    
701

    
702
def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
  """Adds the exclusive storage flag to lvm units.

  This function creates a copy of the storage_units lists, with the
  es_flag being added to all lvm storage units.

  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, consisting only of
    (storage_type, storage_key)
  @type es_flag: boolean
  @param es_flag: exclusive storage flag
  @rtype: list of tuples (string, string, list)
  @return: list of storage units (storage_type, storage_key, params) with
    the params containing the es_flag for lvm-vg storage units

  """
  annotated = []
  for (storage_type, storage_key) in storage_units:
    if storage_type == constants.ST_LVM_VG:
      annotated.append((storage_type, storage_key, [es_flag]))
      if es_flag:
        # With exclusive storage enabled, PV space information is reported too
        annotated.append((constants.ST_LVM_PV, storage_key, [es_flag]))
    else:
      # Non-LVM-VG units get no extra parameters
      annotated.append((storage_type, storage_key, []))
  return annotated
727

    
728

    
729
def GetExclusiveStorageForNodes(cfg, node_uuids):
  """Return the exclusive storage flag for all the given nodes.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding
  node

  """
  return dict((node_uuid, _GetExclusiveStorageFlag(cfg, node_uuid))
              for node_uuid in node_uuids)
745

    
746

    
747
def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
  """Return the lvm storage unit for all the given nodes.

  Main purpose of this function is to map the exclusive storage flag, which
  can be different for each node, to the default LVM storage unit.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, e.g. pairs of
    (storage_type, storage_key)
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to a list of storage units which include
    the exclusive storage flag for lvm storage
  @raise errors.OpPrereqError: if any given node name has no corresponding
  node

  """
  return dict((node_uuid,
               _AddExclusiveStorageFlagToLvmStorageUnits(
                   storage_units, _GetExclusiveStorageFlag(cfg, node_uuid)))
              for node_uuid in node_uuids)
771

    
772

    
773
#: Generic encoders
# Maps rpc_defs.ED_* encoding kinds to encoder callables taking (node, value).
# Encoders that need a configuration object are added in L{RpcRunner.__init__}.
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }
781

    
782

    
783
class RpcRunner(_RpcClientBase,
                _generated_rpc.RpcClientDefault,
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientDnsOnly,
                _generated_rpc.RpcClientConfig):
  """RPC runner class.

  Full-featured runner that resolves nodes through the cluster configuration
  and provides all configuration-dependent argument encoders.

  """
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
    """Initializes the RPC runner.

    @type cfg: L{config.ConfigWriter}
    @param cfg: Configuration
    @type lock_monitor_cb: callable
    @param lock_monitor_cb: Lock monitor callback
    @param _req_process_fn: test hook overriding the HTTP request processor
    @param _getents: test hook overriding the UID/GID lookup for file uploads

    """
    self._cfg = cfg

    encoders = _ENCODERS.copy()

    encoders.update({
      # Encoders requiring configuration object
      rpc_defs.ED_INST_DICT: self._InstDict,
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
      rpc_defs.ED_NIC_DICT: self._NicDict,
      rpc_defs.ED_DEVICE_DICT: self._DeviceDict,

      # Encoders annotating disk parameters
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
      rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
      rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,

      # Encoders with special requirements
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),

      rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
      })

    # Resolver using configuration
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
                              cfg.GetAllNodesInfo)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

  def _NicDict(self, _, nic):
    """Convert the given nic to a dict and encapsulate netinfo

    """
    # Deep copy so attaching netinfo does not modify the config object
    n = copy.deepcopy(nic)
    if n.network:
      net_uuid = self._cfg.LookupNetwork(n.network)
      if net_uuid:
        nobj = self._cfg.GetNetwork(net_uuid)
        n.netinfo = objects.Network.ToDict(nobj)
    return n.ToDict()

  def _DeviceDict(self, _, (device, instance)):
    """Encodes a NIC or disk device for RPC transport.

    NOTE(review): returns None implicitly for any other device type --
    confirm whether callers only ever pass NICs and disks.

    """
    if isinstance(device, objects.NIC):
      return self._NicDict(None, device)
    elif isinstance(device, objects.Disk):
      return self._SingleDiskDictDP(None, (device, instance))

  def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the hvparams filled with the
        cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()
    # Fill cluster defaults first, then apply the explicit overrides
    idict["hvparams"] = cluster.FillHV(instance)
    idict["secondary_nodes"] = self._cfg.GetInstanceSecondaryNodes(instance)
    if hvp is not None:
      idict["hvparams"].update(hvp)
    idict["beparams"] = cluster.FillBE(instance)
    if bep is not None:
      idict["beparams"].update(bep)
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)
    idict["disks_info"] = self._DisksDictDP(node, (instance.disks, instance))
    for nic in idict["nics"]:
      nic["nicparams"] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],
        nic["nicparams"])
      # Attach network information, mirroring _NicDict
      network = nic.get("network", None)
      if network:
        net_uuid = self._cfg.LookupNetwork(network)
        if net_uuid:
          nobj = self._cfg.GetNetwork(net_uuid)
          nic["netinfo"] = objects.Network.ToDict(nobj)
    return idict

  def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, hvp=hvp, bep=bep)

  def _InstDictOspDp(self, node, (instance, osparams)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(node, instance, osp=osparams)

  def _DisksDictDP(self, node, (disks, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    Annotates the disks and adds dynamic parameters before serializing.

    """
    diskparams = self._cfg.GetInstanceDiskParams(instance)
    ret = []
    for disk in AnnotateDiskParams(disks, diskparams):
      disk_node_uuids = disk.GetNodes(instance.primary_node)
      # NB: "node" inside the generator expression is local to it and does
      # not shadow the method parameter used below
      node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
                      in self._cfg.GetMultiNodeInfo(disk_node_uuids))

      disk.UpdateDynamicDiskParams(node, node_ips)

      ret.append(disk.ToDict(include_dynamic_params=True))

    return ret

  def _MultiDiskDictDP(self, node, disks_insts):
    """Wrapper for L{AnnotateDiskParams}.

    Supports a list of (disk, instance) tuples.
    """
    return [disk for disk_inst in disks_insts
            for disk in self._DisksDictDP(node, disk_inst)]

  def _SingleDiskDictDP(self, node, (disk, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
    return anno_disk

  def _EncodeNodeToDiskDictDP(self, node, value):
    """Encode dict of node name -> list of (disk, instance) tuples as values.

    """
    return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
                for name, disks in value.items())

  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
    """Encodes import/export I/O information.

    """
    if ieio == constants.IEIO_RAW_DISK:
      assert len(ieioargs) == 2
      return (ieio, (self._SingleDiskDictDP(node, ieioargs), ))

    if ieio == constants.IEIO_SCRIPT:
      assert len(ieioargs) == 2
      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))

    # All other I/O kinds are passed through unchanged
    return (ieio, ieioargs)
966

    
967

    
968
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    Node addresses are taken verbatim from C{address_list} when given,
    otherwise names are resolved through ssconf.

    """
    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      resolver = compat.partial(_SsconfResolver, True)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
985

    
986

    
987
class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    Always resolves node names through ssconf.

    """
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    resolver = compat.partial(_SsconfResolver, True)
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
1005

    
1006

    
1007
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    The resolver is created with the flag set to False; per the class
    purpose this selects DNS-only name resolution (no ssconf lookups).

    """
    resolver = compat.partial(_SsconfResolver, False)
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
1018

    
1019

    
1020
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
1021
  """RPC wrappers for L{config}.
1022

1023
  """
1024
  def __init__(self, context, address_list, _req_process_fn=None,
1025
               _getents=None):
1026
    """Initializes this class.
1027

1028
    """
1029
    if context:
1030
      lock_monitor_cb = context.glm.AddToLockMonitor
1031
    else:
1032
      lock_monitor_cb = None
1033

    
1034
    if address_list is None:
1035
      resolver = compat.partial(_SsconfResolver, True)
1036
    else:
1037
      # Caller provided an address list
1038
      resolver = _StaticResolver(address_list)
1039

    
1040
    encoders = _ENCODERS.copy()
1041

    
1042
    encoders.update({
1043
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
1044
      })
1045

    
1046
    _RpcClientBase.__init__(self, resolver, encoders.get,
1047
                            lock_monitor_cb=lock_monitor_cb,
1048
                            _req_process_fn=_req_process_fn)
1049
    _generated_rpc.RpcClientConfig.__init__(self)