Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 235a6b29

History | View | Annotate | Download (31.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import base64
import copy
import functools
import logging
import threading
import zlib

import pycurl

from ganeti import utils
from ganeti import objects
from ganeti import http
from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import netutils
from ganeti import ssconf
from ganeti import runtime
from ganeti import compat
from ganeti import rpc_defs
from ganeti import pathutils
from ganeti import vcluster

# Special module generated at build time
from ganeti import _generated_rpc

# pylint has a bug here, doesn't see this import
import ganeti.http.client  # pylint: disable=W0611
59

    
60

    
61
#: HTTP headers sent with every RPC request; the empty "Expect:" header
#: suppresses curl's automatic "Expect: 100-continue" handshake on POSTs
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]

#: Special value to describe an offline host
_OFFLINE = object()
68

    
69

    
70
def Init():
  """Initializes the module-global HTTP client manager.

  Must be called before using any RPC function and while exactly one thread is
  running.

  """
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
  # one thread running. This check is just a safety measure -- it doesn't
  # cover all cases.
  assert threading.activeCount() == 1, \
         "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)

  pycurl.global_init(pycurl.GLOBAL_ALL)
86

    
87

    
88
def Shutdown():
  """Stops the module-global HTTP client manager.

  Must be called before quitting the program and while exactly one thread is
  running.

  """
  # Counterpart of L{Init}; same single-thread restriction applies
  pycurl.global_cleanup()
96

    
97

    
98
def _ConfigRpcCurl(curl):
  """Configures a pycURL handle for node-daemon RPC.

  The node daemon certificate is used as CA, client certificate and client
  key at the same time (it contains both the certificate and the key).

  @param curl: pycURL handle to configure

  """
  noded_cert = str(pathutils.NODED_CERT_FILE)

  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  # NOTE(review): peer certificates are verified against the cluster cert,
  # but hostname verification is disabled -- presumably deliberate since all
  # nodes share the noded certificate; confirm before changing.
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110

    
111

    
112
def RunWithRPC(fn):
  """RPC-wrapper decorator.

  When applied to a function, it runs it with the RPC system
  initialized, and it shutsdown the system afterwards. This means the
  function must be called without RPC being initialized.

  """
  # functools.wraps preserves the wrapped function's name and docstring,
  # which the plain inner function would otherwise clobber
  @functools.wraps(fn)
  def wrapper(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      # Always clean up pycURL state, even if fn raised
      Shutdown()
  return wrapper
127

    
128

    
129
def _Compress(data):
  """Compresses a string for transport over RPC.

  Small amounts of data are not compressed.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  if len(data) >= 512:
    # Large enough to be worth the CPU cost: zlib (level 3) + base64
    return (constants.RPC_ENCODING_ZLIB_BASE64,
            base64.b64encode(zlib.compress(data, 3)))

  # Below the threshold, compression overhead outweighs the savings
  return (constants.RPC_ENCODING_NONE, data)
147

    
148

    
149
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      # Successful transport; validate the (status, payload) envelope
      self.data = data
      self.payload = None
      if not isinstance(data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
      elif not data[0]:
        # Remote side reported a failure; second element carries the message
        self.fail_msg = self._EnsureErr(data[1])
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    # Sanity check: every documented attribute must have been set
    expected_attrs = ("call", "data", "fail_msg",
                      "node", "offline", "payload")
    for attr_name in expected_attrs:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    return val or "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    """
    if not self.fail_msg:
      return

    if msg:
      msg = "%s: %s" % (msg, self.fail_msg)
    else:
      # Caller passed no message; build a descriptive default
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))

    ec = errors.OpPrereqError if prereq else errors.OpExecError
    if ecode is not None:
      raise ec(msg, ecode) # pylint: disable=W0142
    raise ec(msg) # pylint: disable=W0142

  def Warn(self, msg, feedback_fn):
    """If the result has failed, call the feedback_fn.

    This is used to in cases were LU wants to warn the
    user about a failure, but continue anyway.

    """
    if self.fail_msg:
      feedback_fn("%s: %s" % (msg, self.fail_msg))
245

    
246

    
247
def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type ssconf_ips: bool
  @param ssconf_ips: Use the ssconf IPs
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  store = ssc()
  family = store.GetPrimaryIPFamily()

  if ssconf_ips:
    # Seed the map from the "name address" lines provided by ssconf
    ipmap = dict(entry.split()
                 for entry in store.GetNodePrimaryIPList())
  else:
    ipmap = {}

  result = []
  for name in node_list:
    addr = ipmap.get(name)
    if addr is None:
      # Not known to ssconf (or ssconf disabled): fall back to DNS
      addr = nslookup_fn(name, family=family)
    result.append((name, addr, name))

  return result
281

    
282

    
283
class _StaticResolver:
284
  def __init__(self, addresses):
285
    """Initializes this class.
286

287
    """
288
    self._addresses = addresses
289

    
290
  def __call__(self, hosts, _):
291
    """Returns static addresses for hosts.
292

293
    """
294
    assert len(hosts) == len(self._addresses)
295
    return zip(hosts, self._addresses, hosts)
296

    
297

    
298
def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
299
  """Checks if a node is online.
300

301
  @type node_uuid_or_name: string
302
  @param node_uuid_or_name: Node UUID
303
  @type node: L{objects.Node} or None
304
  @param node: Node object
305

306
  """
307
  if node is None:
308
    # Assume that the passed parameter was actually a node name, so depend on
309
    # DNS for name resolution
310
    return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name)
311
  else:
312
    if node.offline and not accept_offline_node:
313
      ip = _OFFLINE
314
    else:
315
      ip = node.primary_ip
316
    return (node.name, ip, node_uuid_or_name)
317

    
318

    
319
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
  """Calculate node addresses using configuration.

  Note that strings in node_uuids are treated as node names if the UUID is not
  found in the configuration.

  """
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline_node or opts is None, "Unknown option"

  if len(node_uuids) != 1:
    all_nodes = all_nodes_fn()
    return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
                             accept_offline_node)
            for uuid in node_uuids]

  # Special case for single-host lookups: avoid fetching the whole node table
  (uuid, ) = node_uuids
  return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]
339

    
340

    
341
class _RpcProcessor:
  """Turns node names into HTTP requests and collects the replies.

  Offline nodes are short-circuited with pre-built L{RpcResult} objects;
  everything else goes through the pycURL-based HTTP client.

  """
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of node UUIDs or hostnames,
      returning a list of tuples containing name, IP address and original name
      of the resolved node. IP address can be the name or the special value
      L{_OFFLINE} to mark offline machines.
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @type hosts: list of tuples
    @param hosts: (name, ip, original_name) triples as produced by a resolver
    @type body: dict
    @param body: a dictionary with per-host body data
    @rtype: tuple of two dicts
    @return: (results, requests) keyed by original host name; offline hosts
      appear in C{results} only, all others get an HTTP request in C{requests}

    """
    results = {}
    requests = {}

    assert isinstance(body, dict)
    assert len(body) == len(hosts)
    assert compat.all(isinstance(v, str) for v in body.values())
    # Every resolved host must have a request body and vice versa
    assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
        "%s != %s" % (hosts, body.keys())

    for (name, ip, original_name) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline
        results[original_name] = RpcResult(node=name,
                                           offline=True,
                                           call=procedure)
      else:
        requests[original_name] = \
          http.client.HttpClientRequest(str(ip), port,
                                        http.HTTP_POST, str("/%s" % procedure),
                                        headers=_RPC_CLIENT_HEADERS,
                                        post_data=body[original_name],
                                        read_timeout=read_timeout,
                                        nicename="%s/%s" % (name, procedure),
                                        curl_config_fn=_ConfigRpcCurl)

    return (results, requests)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    @param results: dict of offline-node results, extended in place
    @param requests: dict of completed HTTP requests
    @param procedure: RPC procedure name, used for error reporting
    @return: the merged C{results} dictionary

    """
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        # Transport succeeded; payload-level errors are handled by RpcResult
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)

      results[name] = host_result

    return results

  def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
               _req_process_fn=None):
    """Makes an RPC request to a number of nodes.

    @type nodes: sequence
    @param nodes: node UUIDs or Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: dictionary
    @param body: dictionary with request bodies per host
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request
    @rtype: dictionary
    @return: a dictionary mapping host names to rpc.RpcResult objects

    """
    assert read_timeout is not None, \
      "Missing RPC read timeout for procedure '%s'" % procedure

    if _req_process_fn is None:
      _req_process_fn = http.client.ProcessRequests

    (results, requests) = \
      self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
                            procedure, body, read_timeout)

    # Runs all requests; each request object is updated in place
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    # A host must never be both pre-resolved offline and actually contacted
    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
448

    
449

    
450
class _RpcClientBase:
  """Common base for the generated RPC client mixins.

  Provides argument encoding and the L{_Call} entry point used by the
  wrappers in L{_generated_rpc}.

  """
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
               _req_process_fn=None):
    """Initializes this class.

    @param resolver: name-to-address resolver callable
    @param encoder_fn: callable mapping an encoder kind (see L{rpc_defs})
      to the actual encoding function
    @param lock_monitor_cb: Callable for registering with lock monitor
    @param _req_process_fn: request processor override, for tests

    """
    proc = _RpcProcessor(resolver,
                         netutils.GetDaemonPort(constants.NODED),
                         lock_monitor_cb=lock_monitor_cb)
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)

  @staticmethod
  def _EncodeArg(encoder_fn, (argkind, value)):
    """Encode argument.

    @param encoder_fn: maps an encoder kind to an encoding function
    @param value: raw argument; passed through unchanged when no encoder
      kind was declared for it

    """
    if argkind is None:
      return value
    else:
      return encoder_fn(argkind)(value)

  def _Call(self, cdef, node_list, args):
    """Entry point for automatically generated RPC wrappers.

    @param cdef: call definition tuple from L{rpc_defs}
    @param node_list: nodes to contact
    @param args: positional call arguments, matching C{argdefs} in length

    """
    (procedure, _, resolver_opts, timeout, argdefs,
     prep_fn, postproc_fn, _) = cdef

    # Timeout and resolver options may be static values or callables
    # computed from the actual arguments
    if callable(timeout):
      read_timeout = timeout(args)
    else:
      read_timeout = timeout

    if callable(resolver_opts):
      req_resolver_opts = resolver_opts(args)
    else:
      req_resolver_opts = resolver_opts

    if len(args) != len(argdefs):
      raise errors.ProgrammerError("Number of passed arguments doesn't match")

    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
    if prep_fn is None:
      # for a no-op prep_fn, we serialise the body once, and then we
      # reuse it in the dictionary values
      body = serializer.DumpJson(enc_args)
      pnbody = dict((n, body) for n in node_list)
    else:
      # for a custom prep_fn, we pass the encoded arguments and the
      # node name to the prep_fn, and we serialise its return value
      assert callable(prep_fn)
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
                    for n in node_list)

    result = self._proc(node_list, procedure, pnbody, read_timeout,
                        req_resolver_opts)

    if postproc_fn:
      # Apply the per-result post-processing hook to every node's result
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
                      result.items()))
    else:
      return result
513

    
514

    
515
def _ObjectToDict(value):
  """Converts an object to a dictionary.

  @param value: any object providing a C{ToDict} method
  @note: See L{objects}.

  """
  return value.ToDict()
522

    
523

    
524
def _ObjectListToDict(value):
  """Converts a list of L{objects} to dictionaries.

  """
  # map() deliberately kept: callers expect the py2 list result
  return map(_ObjectToDict, value)
529

    
530

    
531
def _EncodeNodeToDiskDict(value):
  """Encodes a dictionary with node name as key and disk objects as values.

  """
  encoded = {}
  for node_name, disks in value.items():
    encoded[node_name] = _ObjectListToDict(disks)
  return encoded
537

    
538

    
539
def _PrepareFileUpload(getents_fn, filename):
  """Loads a file and prepares it for an upload to nodes.

  @param getents_fn: callable returning an entity-lookup object, or None to
    use L{runtime.GetEnts}
  @type filename: string
  @param filename: path of the file to read
  @return: list of [virtual filename, encoded data, mode, uid lookup result,
    gid lookup result, atime, mtime]

  """
  # FileStatHelper captures stat data during the read, avoiding a separate
  # stat() call that could race with file modifications
  statcb = utils.FileStatHelper()
  data = _Compress(utils.ReadFile(filename, preread=statcb))
  st = statcb.st

  if getents_fn is None:
    getents_fn = runtime.GetEnts

  getents = getents_fn()

  # Send the cluster-relative ("virtual") path to the receiving node
  virt_filename = vcluster.MakeVirtualPath(filename)

  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
556

    
557

    
558
def _PrepareFinalizeExportDisks(snap_disks):
559
  """Encodes disks for finalizing export.
560

561
  """
562
  flat_disks = []
563

    
564
  for disk in snap_disks:
565
    if isinstance(disk, bool):
566
      flat_disks.append(disk)
567
    else:
568
      flat_disks.append(disk.ToDict())
569

    
570
  return flat_disks
571

    
572

    
573
def _EncodeImportExportIO(ieio_pair):
  """Encodes import/export I/O information.

  @type ieio_pair: tuple
  @param ieio_pair: (ieio, ieioargs) pair; callers keep passing a single
    tuple, only the py2-only tuple-parameter unpacking (removed by
    PEP 3113) was moved into the body

  """
  (ieio, ieioargs) = ieio_pair

  if ieio == constants.IEIO_RAW_DISK:
    # Single disk object which must be serialized
    assert len(ieioargs) == 1
    return (ieio, (ieioargs[0].ToDict(), ))

  if ieio == constants.IEIO_SCRIPT:
    # First element is a disk object, second is passed through unchanged
    assert len(ieioargs) == 2
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))

  # All other kinds need no encoding
  return (ieio, ieioargs)
586

    
587

    
588
def _EncodeBlockdevRename(value):
589
  """Encodes information for renaming block devices.
590

591
  """
592
  return [(d.ToDict(), uid) for d, uid in value]
593

    
594

    
595
def _AddSpindlesToLegacyNodeInfo(result, space_info):
  """Extracts the spindle information from the space info and adds
  it to the result dictionary.

  @type result: dict of strings
  @param result: dictionary holding the result of the legacy node info
  @type space_info: list of dicts of strings
  @param space_info: list, each row holding space information of one storage
    unit
  @rtype: None
  @return: does not return anything, manipulates the C{result} variable

  """
  lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_PV)
  if not lvm_pv_info:
    # No PV row present: leave the result untouched
    return
  result["spindles_free"] = lvm_pv_info["storage_free"]
  result["spindles_total"] = lvm_pv_info["storage_size"]
613

    
614

    
615
def _AddDefaultStorageInfoToLegacyNodeInfo(result, space_info,
                                           require_vg_info=True):
  """Extracts the storage space information of the default storage type from
  the space info and adds it to the result dictionary.

  @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.
  @type require_vg_info: boolean
  @param require_vg_info: indicates whether volume group information is
    required or not
  @raise errors.OpExecError: if C{require_vg_info} is set but no LVM volume
    group row is present in C{space_info}

  """
  # Check if there is at least one row for non-spindle storage info.
  no_defaults = (len(space_info) < 1) or \
      (space_info[0]["type"] == constants.ST_LVM_PV and len(space_info) == 1)

  default_space_info = None
  if no_defaults:
    logging.warning("No storage info provided for default storage type.")
  else:
    # First row is treated as the default storage type
    default_space_info = space_info[0]

  if require_vg_info:
    # if lvm storage is required, ignore the actual default and look for LVM
    lvm_info_found = False
    for space_entry in space_info:
      if space_entry["type"] == constants.ST_LVM_VG:
        default_space_info = space_entry
        lvm_info_found = True
        # NOTE(review): 'continue' is a no-op as the final statement of the
        # loop body; as written the *last* matching LVM entry wins, and
        # 'break' (first entry wins) may have been intended -- confirm
        # before changing behavior.
        continue
    if not lvm_info_found:
      raise errors.OpExecError("LVM volume group info required, but not"
                               " provided.")

  if default_space_info:
    result["name"] = default_space_info["name"]
    result["storage_free"] = default_space_info["storage_free"]
    result["storage_size"] = default_space_info["storage_size"]
652

    
653

    
654
def MakeLegacyNodeInfo(data, require_vg_info=True):
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.

  Converts the data into a single dictionary. This is fine for most use cases,
  but some require information from more than one volume group or hypervisor.

  @param data: (bootid, space_info, (hv_info, )) triple; exactly one
    hypervisor's info is expected
  @param require_vg_info: raise an error if the returned vg_info
      doesn't have any values

  """
  (bootid, space_info, (hv_info, )) = data

  # hv_info and the bootid must not share keys; JoinDisjointDicts asserts that
  ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})

  _AddSpindlesToLegacyNodeInfo(ret, space_info)
  _AddDefaultStorageInfoToLegacyNodeInfo(ret, space_info,
                                         require_vg_info=require_vg_info)

  return ret
673

    
674

    
675
def _AnnotateDParamsDRBD(disk, params_tuple):
  """Annotates just DRBD disks layouts.

  @type params_tuple: tuple
  @param params_tuple: (drbd_params, data_params, meta_params) triple;
    callers keep passing a single tuple, only the py2-only
    tuple-parameter unpacking (removed by PEP 3113) was moved into the body

  """
  (drbd_params, data_params, meta_params) = params_tuple

  assert disk.dev_type == constants.LD_DRBD8

  disk.params = objects.FillDict(drbd_params, disk.params)
  # A DRBD8 disk always has exactly two children: data and metadata devices
  (dev_data, dev_meta) = disk.children
  dev_data.params = objects.FillDict(data_params, dev_data.params)
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

  return disk
687

    
688

    
689
def _AnnotateDParamsGeneric(disk, params_tuple):
  """Generic disk parameter annotation routine.

  @type params_tuple: 1-element tuple
  @param params_tuple: (params, ) singleton; callers keep passing a single
    tuple, only the py2-only tuple-parameter unpacking (removed by
    PEP 3113) was moved into the body

  """
  (params, ) = params_tuple

  assert disk.dev_type != constants.LD_DRBD8

  disk.params = objects.FillDict(params, disk.params)

  return disk
698

    
699

    
700
def AnnotateDiskParams(template, disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param template: The disk template used
  @param disks: The list of disks objects to annotate
  @param disk_params: The disk paramaters for annotation
  @returns: A list of disk objects annotated

  """
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)

  # Pick the annotation routine matching the disk template
  if template == constants.DT_DRBD8:
    annotate = _AnnotateDParamsDRBD
  elif template == constants.DT_DISKLESS:
    annotate = lambda disk, _: disk
  else:
    annotate = _AnnotateDParamsGeneric

  annotated = []
  for disk in disks:
    # Work on copies so the caller's disk objects stay unmodified
    annotated.append(annotate(disk.Copy(), ld_params))
  return annotated
719

    
720

    
721
def _GetExclusiveStorageFlag(cfg, node_uuid):
  """Reads the exclusive-storage node parameter for one node.

  @raise errors.OpPrereqError: if the node is unknown to the configuration

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is None:
    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
                               errors.ECODE_NOENT)
  return cfg.GetNdParams(node_info)[constants.ND_EXCLUSIVE_STORAGE]
727

    
728

    
729
def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
  """Adds the exclusive storage flag to lvm units.

  This function creates a copy of the storage_units lists, with the
  es_flag being added to all lvm storage units.

  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, consisting only of
    (storage_type, storage_key)
  @type es_flag: boolean
  @param es_flag: exclusive storage flag
  @rtype: list of tuples (string, string, list)
  @return: list of storage units (storage_type, storage_key, params) with
    the params containing the es_flag for lvm-vg storage units

  """
  # Only lvm-vg units carry the flag; all others get an empty parameter list
  return [(stype, skey, es_flag) if stype == constants.ST_LVM_VG
          else (stype, skey, [])
          for (stype, skey) in storage_units]
752

    
753

    
754
def GetExclusiveStorageForNodes(cfg, node_uuids):
  """Return the exclusive storage flag for all the given nodes.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding
  node

  """
  return dict((node_uuid, _GetExclusiveStorageFlag(cfg, node_uuid))
              for node_uuid in node_uuids)
770

    
771

    
772
def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
  """Return the lvm storage unit for all the given nodes.

  Main purpose of this function is to map the exclusive storage flag, which
  can be different for each node, to the default LVM storage unit.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type storage_units: list of pairs (string, string)
  @param storage_units: list of 'raw' storage units, e.g. pairs of
    (storage_type, storage_key)
  @type node_uuids: list or tuple
  @param node_uuids: node UUIDs for which to read the flag
  @rtype: dict
  @return: mapping from node uuids to a list of storage units which include
    the exclusive storage flag for lvm storage
  @raise errors.OpPrereqError: if any given node name has no corresponding
  node

  """
  return dict((node_uuid,
               _AddExclusiveStorageFlagToLvmStorageUnits(
                 storage_units, _GetExclusiveStorageFlag(cfg, node_uuid)))
              for node_uuid in node_uuids)
796

    
797

    
798
#: Generic encoders
#: (configuration-dependent encoders are added in L{RpcRunner.__init__})
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }
808

    
809

    
810
class RpcRunner(_RpcClientBase,
                _generated_rpc.RpcClientDefault,
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientDnsOnly,
                _generated_rpc.RpcClientConfig):
  """RPC runner class.

  """
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
    """Initialized the RPC runner.

    @type cfg: L{config.ConfigWriter}
    @param cfg: Configuration
    @type lock_monitor_cb: callable
    @param lock_monitor_cb: Lock monitor callback
    @param _req_process_fn: request processor override, for tests
    @param _getents: entity-lookup factory override, for tests

    """
    self._cfg = cfg

    # Start from the generic encoders and add those that need access to the
    # configuration (bound methods of this instance)
    encoders = _ENCODERS.copy()

    encoders.update({
      # Encoders requiring configuration object
      rpc_defs.ED_INST_DICT: self._InstDict,
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
      rpc_defs.ED_NIC_DICT: self._NicDict,

      # Encoders annotating disk parameters
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
      rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,

      # Encoders with special requirements
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
      })

    # Resolver using configuration
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
                              cfg.GetAllNodesInfo)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

  def _NicDict(self, nic):
    """Convert the given nic to a dict and encapsulate netinfo

    """
    # Deep copy so attaching netinfo does not mutate the caller's NIC object
    n = copy.deepcopy(nic)
    if n.network:
      net_uuid = self._cfg.LookupNetwork(n.network)
      if net_uuid:
        nobj = self._cfg.GetNetwork(net_uuid)
        n.netinfo = objects.Network.ToDict(nobj)
    return n.ToDict()

  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the hvparams filled with the
        cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()
    # Fill each parameter family with cluster defaults first, then apply the
    # caller-provided overrides on top
    idict["hvparams"] = cluster.FillHV(instance)
    if hvp is not None:
      idict["hvparams"].update(hvp)
    idict["beparams"] = cluster.FillBE(instance)
    if bep is not None:
      idict["beparams"].update(bep)
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)
    idict["disks"] = self._DisksDictDP((instance.disks, instance))
    for nic in idict["nics"]:
      nic["nicparams"] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],
        nic["nicparams"])
      # Attach network information for NICs connected to a known network
      network = nic.get("network", None)
      if network:
        net_uuid = self._cfg.LookupNetwork(network)
        if net_uuid:
          nobj = self._cfg.GetNetwork(net_uuid)
          nic["netinfo"] = objects.Network.ToDict(nobj)
    return idict

  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(instance, hvp=hvp, bep=bep)

  def _InstDictOspDp(self, (instance, osparams)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(instance, osp=osparams)

  def _DisksDictDP(self, (disks, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    diskparams = self._cfg.GetInstanceDiskParams(instance)
    return [disk.ToDict()
            for disk in AnnotateDiskParams(instance.disk_template,
                                           disks, diskparams)]

  def _MultiDiskDictDP(self, disks_insts):
    """Wrapper for L{AnnotateDiskParams}.

    Supports a list of (disk, instance) tuples.
    """
    return [disk for disk_inst in disks_insts
            for disk in self._DisksDictDP(disk_inst)]

  def _SingleDiskDictDP(self, (disk, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    (anno_disk,) = self._DisksDictDP(([disk], instance))
    return anno_disk
953

    
954

    
955
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    @param context: context object providing the lock monitor
    @param address_list: explicit node addresses, or C{None} to resolve
        node names via ssconf

    """
    if address_list is not None:
      # Caller supplied the addresses directly
      resolver = _StaticResolver(address_list)
    else:
      # Fall back to ssconf-based name resolution
      resolver = compat.partial(_SsconfResolver, True)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
972

    
973

    
974
class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    Always resolves node names through ssconf.

    """
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    resolver = compat.partial(_SsconfResolver, True)
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
992

    
993

    
994
class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    Resolution deliberately bypasses ssconf (C{False} flag) and relies
    on DNS only.

    """
    resolver = compat.partial(_SsconfResolver, False)
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
1005

    
1006

    
1007
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
1008
  """RPC wrappers for L{config}.
1009

1010
  """
1011
  def __init__(self, context, address_list, _req_process_fn=None,
               _getents=None):
    """Initializes this class.

    @param context: context object, or a false value to run without a
        lock monitor
    @param address_list: explicit node addresses, or C{None} to resolve
        node names via ssconf
    @param _req_process_fn: unittest-only request processing override
    @param _getents: unittest-only entity-resolver override

    """
    # Hook into the lock monitor only when a context is available
    lock_monitor_cb = context.glm.AddToLockMonitor if context else None

    if address_list is not None:
      # Caller supplied the addresses directly
      resolver = _StaticResolver(address_list)
    else:
      resolver = compat.partial(_SsconfResolver, True)

    # Extend the default encoders with file-upload support
    encoders = _ENCODERS.copy()
    encoders[rpc_defs.ED_FILE_DETAILS] = \
      compat.partial(_PrepareFileUpload, _getents)

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)