Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ b8d26c6e

History | View | Annotate | Download (45.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48

    
49
# pylint has a bug here, doesn't see this import
50
import ganeti.http.client  # pylint: disable-msg=W0611
51

    
52

    
53
# Timeout for connecting to nodes (seconds)
54
_RPC_CONNECT_TIMEOUT = 5
55

    
56
_RPC_CLIENT_HEADERS = [
57
  "Content-type: %s" % http.HTTP_APP_JSON,
58
  "Expect:",
59
  ]
60

    
61
# Various time constants for the timeout table
62
_TMO_URGENT = 60 # one minute
63
_TMO_FAST = 5 * 60 # five minutes
64
_TMO_NORMAL = 15 * 60 # 15 minutes
65
_TMO_SLOW = 3600 # one hour
66
_TMO_4HRS = 4 * 3600
67
_TMO_1DAY = 86400
68

    
69
# Timeout table that will be built later by decorators
70
# Guidelines for choosing timeouts:
71
# - call used during watcher: timeout -> 1min, _TMO_URGENT
72
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73
# - other calls: 15 min, _TMO_NORMAL
74
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75

    
76
_TIMEOUTS = {
77
}
78

    
79

    
80
def Init():
81
  """Initializes the module-global HTTP client manager.
82

83
  Must be called before using any RPC function and while exactly one thread is
84
  running.
85

86
  """
87
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88
  # one thread running. This check is just a safety measure -- it doesn't
89
  # cover all cases.
90
  assert threading.activeCount() == 1, \
91
         "Found more than one active thread when initializing pycURL"
92

    
93
  logging.info("Using PycURL %s", pycurl.version)
94

    
95
  pycurl.global_init(pycurl.GLOBAL_ALL)
96

    
97

    
98
def Shutdown():
99
  """Stops the module-global HTTP client manager.
100

101
  Must be called before quitting the program and while exactly one thread is
102
  running.
103

104
  """
105
  pycurl.global_cleanup()
106

    
107

    
108
def _ConfigRpcCurl(curl):
109
  noded_cert = str(constants.NODED_CERT_FILE)
110

    
111
  curl.setopt(pycurl.FOLLOWLOCATION, False)
112
  curl.setopt(pycurl.CAINFO, noded_cert)
113
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
115
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116
  curl.setopt(pycurl.SSLCERT, noded_cert)
117
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118
  curl.setopt(pycurl.SSLKEY, noded_cert)
119
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120

    
121

    
122
# Aliasing this module avoids the following warning by epydoc: "Warning: No
123
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124
_threading = threading
125

    
126

    
127
class _RpcThreadLocal(_threading.local):
128
  def GetHttpClientPool(self):
129
    """Returns a per-thread HTTP client pool.
130

131
    @rtype: L{http.client.HttpClientPool}
132

133
    """
134
    try:
135
      pool = self.hcp
136
    except AttributeError:
137
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
138
      self.hcp = pool
139

    
140
    return pool
141

    
142

    
143
# Remove module alias (see above)
144
del _threading
145

    
146

    
147
_thread_local = _RpcThreadLocal()
148

    
149

    
150
def _RpcTimeout(secs):
151
  """Timeout decorator.
152

153
  When applied to a rpc call_* function, it updates the global timeout
154
  table with the given function/timeout.
155

156
  """
157
  def decorator(f):
158
    name = f.__name__
159
    assert name.startswith("call_")
160
    _TIMEOUTS[name[len("call_"):]] = secs
161
    return f
162
  return decorator
163

    
164

    
165
def RunWithRPC(fn):
166
  """RPC-wrapper decorator.
167

168
  When applied to a function, it runs it with the RPC system
169
  initialized, and it shutsdown the system afterwards. This means the
170
  function must be called without RPC being initialized.
171

172
  """
173
  def wrapper(*args, **kwargs):
174
    Init()
175
    try:
176
      return fn(*args, **kwargs)
177
    finally:
178
      Shutdown()
179
  return wrapper
180

    
181

    
182
class RpcResult(object):
183
  """RPC Result class.
184

185
  This class holds an RPC result. It is needed since in multi-node
186
  calls we can't raise an exception just because one one out of many
187
  failed, and therefore we use this class to encapsulate the result.
188

189
  @ivar data: the data payload, for successful results, or None
190
  @ivar call: the name of the RPC call
191
  @ivar node: the name of the node to which we made the call
192
  @ivar offline: whether the operation failed because the node was
193
      offline, as opposed to actual failure; offline=True will always
194
      imply failed=True, in order to allow simpler checking if
195
      the user doesn't care about the exact failure mode
196
  @ivar fail_msg: the error message if the call failed
197

198
  """
199
  def __init__(self, data=None, failed=False, offline=False,
200
               call=None, node=None):
201
    self.offline = offline
202
    self.call = call
203
    self.node = node
204

    
205
    if offline:
206
      self.fail_msg = "Node is marked offline"
207
      self.data = self.payload = None
208
    elif failed:
209
      self.fail_msg = self._EnsureErr(data)
210
      self.data = self.payload = None
211
    else:
212
      self.data = data
213
      if not isinstance(self.data, (tuple, list)):
214
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215
                         type(self.data))
216
        self.payload = None
217
      elif len(data) != 2:
218
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
219
                         "expected 2" % len(self.data))
220
        self.payload = None
221
      elif not self.data[0]:
222
        self.fail_msg = self._EnsureErr(self.data[1])
223
        self.payload = None
224
      else:
225
        # finally success
226
        self.fail_msg = None
227
        self.payload = data[1]
228

    
229
    assert hasattr(self, "call")
230
    assert hasattr(self, "data")
231
    assert hasattr(self, "fail_msg")
232
    assert hasattr(self, "node")
233
    assert hasattr(self, "offline")
234
    assert hasattr(self, "payload")
235

    
236
  @staticmethod
237
  def _EnsureErr(val):
238
    """Helper to ensure we return a 'True' value for error."""
239
    if val:
240
      return val
241
    else:
242
      return "No error information"
243

    
244
  def Raise(self, msg, prereq=False, ecode=None):
245
    """If the result has failed, raise an OpExecError.
246

247
    This is used so that LU code doesn't have to check for each
248
    result, but instead can call this function.
249

250
    """
251
    if not self.fail_msg:
252
      return
253

    
254
    if not msg: # one could pass None for default message
255
      msg = ("Call '%s' to node '%s' has failed: %s" %
256
             (self.call, self.node, self.fail_msg))
257
    else:
258
      msg = "%s: %s" % (msg, self.fail_msg)
259
    if prereq:
260
      ec = errors.OpPrereqError
261
    else:
262
      ec = errors.OpExecError
263
    if ecode is not None:
264
      args = (msg, ecode)
265
    else:
266
      args = (msg, )
267
    raise ec(*args) # pylint: disable-msg=W0142
268

    
269

    
270
def _AddressLookup(node_list,
271
                   ssc=ssconf.SimpleStore,
272
                   nslookup_fn=netutils.Hostname.GetIP):
273
  """Return addresses for given node names.
274

275
  @type node_list: list
276
  @param node_list: List of node names
277
  @type ssc: class
278
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
279
  @type nslookup_fn: callable
280
  @param nslookup_fn: function use to do NS lookup
281
  @rtype: list of addresses and/or None's
282
  @returns: List of corresponding addresses, if found
283

284
  """
285
  ss = ssc()
286
  iplist = ss.GetNodePrimaryIPList()
287
  family = ss.GetPrimaryIPFamily()
288
  addresses = []
289
  ipmap = dict(entry.split() for entry in iplist)
290
  for node in node_list:
291
    address = ipmap.get(node)
292
    if address is None:
293
      address = nslookup_fn(node, family=family)
294
    addresses.append(address)
295

    
296
  return addresses
297

    
298

    
299
class Client:
300
  """RPC Client class.
301

302
  This class, given a (remote) method name, a list of parameters and a
303
  list of nodes, will contact (in parallel) all nodes, and return a
304
  dict of results (key: node name, value: result).
305

306
  One current bug is that generic failure is still signaled by
307
  'False' result, which is not good. This overloading of values can
308
  cause bugs.
309

310
  """
311
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
312
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
313
                                    " timeouts table")
314
    self.procedure = procedure
315
    self.body = body
316
    self.port = port
317
    self._request = {}
318
    self._address_lookup_fn = address_lookup_fn
319

    
320
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
321
    """Add a list of nodes to the target nodes.
322

323
    @type node_list: list
324
    @param node_list: the list of node names to connect
325
    @type address_list: list or None
326
    @keyword address_list: either None or a list with node addresses,
327
        which must have the same length as the node list
328
    @type read_timeout: int
329
    @param read_timeout: overwrites default timeout for operation
330

331
    """
332
    if address_list is None:
333
      # Always use IP address instead of node name
334
      address_list = self._address_lookup_fn(node_list)
335

    
336
    assert len(node_list) == len(address_list), \
337
           "Name and address lists must have the same length"
338

    
339
    for node, address in zip(node_list, address_list):
340
      self.ConnectNode(node, address, read_timeout=read_timeout)
341

    
342
  def ConnectNode(self, name, address=None, read_timeout=None):
343
    """Add a node to the target list.
344

345
    @type name: str
346
    @param name: the node name
347
    @type address: str
348
    @param address: the node address, if known
349
    @type read_timeout: int
350
    @param read_timeout: overwrites default timeout for operation
351

352
    """
353
    if address is None:
354
      # Always use IP address instead of node name
355
      address = self._address_lookup_fn([name])[0]
356

    
357
    assert(address is not None)
358

    
359
    if read_timeout is None:
360
      read_timeout = _TIMEOUTS[self.procedure]
361

    
362
    self._request[name] = \
363
      http.client.HttpClientRequest(str(address), self.port,
364
                                    http.HTTP_PUT, str("/%s" % self.procedure),
365
                                    headers=_RPC_CLIENT_HEADERS,
366
                                    post_data=str(self.body),
367
                                    read_timeout=read_timeout)
368

    
369
  def GetResults(self, http_pool=None):
370
    """Call nodes and return results.
371

372
    @rtype: list
373
    @return: List of RPC results
374

375
    """
376
    if not http_pool:
377
      http_pool = _thread_local.GetHttpClientPool()
378

    
379
    http_pool.ProcessRequests(self._request.values())
380

    
381
    results = {}
382

    
383
    for name, req in self._request.iteritems():
384
      if req.success and req.resp_status_code == http.HTTP_OK:
385
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
386
                                  node=name, call=self.procedure)
387
        continue
388

    
389
      # TODO: Better error reporting
390
      if req.error:
391
        msg = req.error
392
      else:
393
        msg = req.resp_body
394

    
395
      logging.error("RPC error in %s from node %s: %s",
396
                    self.procedure, name, msg)
397
      results[name] = RpcResult(data=msg, failed=True, node=name,
398
                                call=self.procedure)
399

    
400
    return results
401

    
402

    
403
def _EncodeImportExportIO(ieio, ieioargs):
404
  """Encodes import/export I/O information.
405

406
  """
407
  if ieio == constants.IEIO_RAW_DISK:
408
    assert len(ieioargs) == 1
409
    return (ieioargs[0].ToDict(), )
410

    
411
  if ieio == constants.IEIO_SCRIPT:
412
    assert len(ieioargs) == 2
413
    return (ieioargs[0].ToDict(), ieioargs[1])
414

    
415
  return ieioargs
416

    
417

    
418
class RpcRunner(object):
419
  """RPC runner class"""
420

    
421
  def __init__(self, cfg):
422
    """Initialized the rpc runner.
423

424
    @type cfg:  C{config.ConfigWriter}
425
    @param cfg: the configuration object that will be used to get data
426
                about the cluster
427

428
    """
429
    self._cfg = cfg
430
    self.port = netutils.GetDaemonPort(constants.NODED)
431

    
432
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
433
    """Convert the given instance to a dict.
434

435
    This is done via the instance's ToDict() method and additionally
436
    we fill the hvparams with the cluster defaults.
437

438
    @type instance: L{objects.Instance}
439
    @param instance: an Instance object
440
    @type hvp: dict or None
441
    @param hvp: a dictionary with overridden hypervisor parameters
442
    @type bep: dict or None
443
    @param bep: a dictionary with overridden backend parameters
444
    @type osp: dict or None
445
    @param osp: a dictionary with overridden os parameters
446
    @rtype: dict
447
    @return: the instance dict, with the hvparams filled with the
448
        cluster defaults
449

450
    """
451
    idict = instance.ToDict()
452
    cluster = self._cfg.GetClusterInfo()
453
    idict["hvparams"] = cluster.FillHV(instance)
454
    if hvp is not None:
455
      idict["hvparams"].update(hvp)
456
    idict["beparams"] = cluster.FillBE(instance)
457
    if bep is not None:
458
      idict["beparams"].update(bep)
459
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
460
    if osp is not None:
461
      idict["osparams"].update(osp)
462
    for nic in idict["nics"]:
463
      nic['nicparams'] = objects.FillDict(
464
        cluster.nicparams[constants.PP_DEFAULT],
465
        nic['nicparams'])
466
    return idict
467

    
468
  def _ConnectList(self, client, node_list, call, read_timeout=None):
469
    """Helper for computing node addresses.
470

471
    @type client: L{ganeti.rpc.Client}
472
    @param client: a C{Client} instance
473
    @type node_list: list
474
    @param node_list: the node list we should connect
475
    @type call: string
476
    @param call: the name of the remote procedure call, for filling in
477
        correctly any eventual offline nodes' results
478
    @type read_timeout: int
479
    @param read_timeout: overwrites the default read timeout for the
480
        given operation
481

482
    """
483
    all_nodes = self._cfg.GetAllNodesInfo()
484
    name_list = []
485
    addr_list = []
486
    skip_dict = {}
487
    for node in node_list:
488
      if node in all_nodes:
489
        if all_nodes[node].offline:
490
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
491
          continue
492
        val = all_nodes[node].primary_ip
493
      else:
494
        val = None
495
      addr_list.append(val)
496
      name_list.append(node)
497
    if name_list:
498
      client.ConnectList(name_list, address_list=addr_list,
499
                         read_timeout=read_timeout)
500
    return skip_dict
501

    
502
  def _ConnectNode(self, client, node, call, read_timeout=None):
503
    """Helper for computing one node's address.
504

505
    @type client: L{ganeti.rpc.Client}
506
    @param client: a C{Client} instance
507
    @type node: str
508
    @param node: the node we should connect
509
    @type call: string
510
    @param call: the name of the remote procedure call, for filling in
511
        correctly any eventual offline nodes' results
512
    @type read_timeout: int
513
    @param read_timeout: overwrites the default read timeout for the
514
        given operation
515

516
    """
517
    node_info = self._cfg.GetNodeInfo(node)
518
    if node_info is not None:
519
      if node_info.offline:
520
        return RpcResult(node=node, offline=True, call=call)
521
      addr = node_info.primary_ip
522
    else:
523
      addr = None
524
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
525

    
526
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
527
    """Helper for making a multi-node call
528

529
    """
530
    body = serializer.DumpJson(args, indent=False)
531
    c = Client(procedure, body, self.port)
532
    skip_dict = self._ConnectList(c, node_list, procedure,
533
                                  read_timeout=read_timeout)
534
    skip_dict.update(c.GetResults())
535
    return skip_dict
536

    
537
  @classmethod
538
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
539
                           address_list=None, read_timeout=None):
540
    """Helper for making a multi-node static call
541

542
    """
543
    body = serializer.DumpJson(args, indent=False)
544
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
545
    c.ConnectList(node_list, address_list=address_list,
546
                  read_timeout=read_timeout)
547
    return c.GetResults()
548

    
549
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
550
    """Helper for making a single-node call
551

552
    """
553
    body = serializer.DumpJson(args, indent=False)
554
    c = Client(procedure, body, self.port)
555
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
556
    if result is None:
557
      # we did connect, node is not offline
558
      result = c.GetResults()[node]
559
    return result
560

    
561
  @classmethod
562
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
563
    """Helper for making a single-node static call
564

565
    """
566
    body = serializer.DumpJson(args, indent=False)
567
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
568
    c.ConnectNode(node, read_timeout=read_timeout)
569
    return c.GetResults()[node]
570

    
571
  @staticmethod
572
  def _Compress(data):
573
    """Compresses a string for transport over RPC.
574

575
    Small amounts of data are not compressed.
576

577
    @type data: str
578
    @param data: Data
579
    @rtype: tuple
580
    @return: Encoded data to send
581

582
    """
583
    # Small amounts of data are not compressed
584
    if len(data) < 512:
585
      return (constants.RPC_ENCODING_NONE, data)
586

    
587
    # Compress with zlib and encode in base64
588
    return (constants.RPC_ENCODING_ZLIB_BASE64,
589
            base64.b64encode(zlib.compress(data, 3)))
590

    
591
  #
592
  # Begin RPC calls
593
  #
594

    
595
  @_RpcTimeout(_TMO_URGENT)
596
  def call_lv_list(self, node_list, vg_name):
597
    """Gets the logical volumes present in a given volume group.
598

599
    This is a multi-node call.
600

601
    """
602
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
603

    
604
  @_RpcTimeout(_TMO_URGENT)
605
  def call_vg_list(self, node_list):
606
    """Gets the volume group list.
607

608
    This is a multi-node call.
609

610
    """
611
    return self._MultiNodeCall(node_list, "vg_list", [])
612

    
613
  @_RpcTimeout(_TMO_NORMAL)
614
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
615
    """Get list of storage units.
616

617
    This is a multi-node call.
618

619
    """
620
    return self._MultiNodeCall(node_list, "storage_list",
621
                               [su_name, su_args, name, fields])
622

    
623
  @_RpcTimeout(_TMO_NORMAL)
624
  def call_storage_modify(self, node, su_name, su_args, name, changes):
625
    """Modify a storage unit.
626

627
    This is a single-node call.
628

629
    """
630
    return self._SingleNodeCall(node, "storage_modify",
631
                                [su_name, su_args, name, changes])
632

    
633
  @_RpcTimeout(_TMO_NORMAL)
634
  def call_storage_execute(self, node, su_name, su_args, name, op):
635
    """Executes an operation on a storage unit.
636

637
    This is a single-node call.
638

639
    """
640
    return self._SingleNodeCall(node, "storage_execute",
641
                                [su_name, su_args, name, op])
642

    
643
  @_RpcTimeout(_TMO_URGENT)
644
  def call_bridges_exist(self, node, bridges_list):
645
    """Checks if a node has all the bridges given.
646

647
    This method checks if all bridges given in the bridges_list are
648
    present on the remote node, so that an instance that uses interfaces
649
    on those bridges can be started.
650

651
    This is a single-node call.
652

653
    """
654
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
655

    
656
  @_RpcTimeout(_TMO_NORMAL)
657
  def call_instance_start(self, node, instance, hvp, bep):
658
    """Starts an instance.
659

660
    This is a single-node call.
661

662
    """
663
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
664
    return self._SingleNodeCall(node, "instance_start", [idict])
665

    
666
  @_RpcTimeout(_TMO_NORMAL)
667
  def call_instance_shutdown(self, node, instance, timeout):
668
    """Stops an instance.
669

670
    This is a single-node call.
671

672
    """
673
    return self._SingleNodeCall(node, "instance_shutdown",
674
                                [self._InstDict(instance), timeout])
675

    
676
  @_RpcTimeout(_TMO_NORMAL)
677
  def call_migration_info(self, node, instance):
678
    """Gather the information necessary to prepare an instance migration.
679

680
    This is a single-node call.
681

682
    @type node: string
683
    @param node: the node on which the instance is currently running
684
    @type instance: C{objects.Instance}
685
    @param instance: the instance definition
686

687
    """
688
    return self._SingleNodeCall(node, "migration_info",
689
                                [self._InstDict(instance)])
690

    
691
  @_RpcTimeout(_TMO_NORMAL)
692
  def call_accept_instance(self, node, instance, info, target):
693
    """Prepare a node to accept an instance.
694

695
    This is a single-node call.
696

697
    @type node: string
698
    @param node: the target node for the migration
699
    @type instance: C{objects.Instance}
700
    @param instance: the instance definition
701
    @type info: opaque/hypervisor specific (string/data)
702
    @param info: result for the call_migration_info call
703
    @type target: string
704
    @param target: target hostname (usually ip address) (on the node itself)
705

706
    """
707
    return self._SingleNodeCall(node, "accept_instance",
708
                                [self._InstDict(instance), info, target])
709

    
710
  @_RpcTimeout(_TMO_NORMAL)
711
  def call_finalize_migration(self, node, instance, info, success):
712
    """Finalize any target-node migration specific operation.
713

714
    This is called both in case of a successful migration and in case of error
715
    (in which case it should abort the migration).
716

717
    This is a single-node call.
718

719
    @type node: string
720
    @param node: the target node for the migration
721
    @type instance: C{objects.Instance}
722
    @param instance: the instance definition
723
    @type info: opaque/hypervisor specific (string/data)
724
    @param info: result for the call_migration_info call
725
    @type success: boolean
726
    @param success: whether the migration was a success or a failure
727

728
    """
729
    return self._SingleNodeCall(node, "finalize_migration",
730
                                [self._InstDict(instance), info, success])
731

    
732
  @_RpcTimeout(_TMO_SLOW)
733
  def call_instance_migrate(self, node, instance, target, live):
734
    """Migrate an instance.
735

736
    This is a single-node call.
737

738
    @type node: string
739
    @param node: the node on which the instance is currently running
740
    @type instance: C{objects.Instance}
741
    @param instance: the instance definition
742
    @type target: string
743
    @param target: the target node name
744
    @type live: boolean
745
    @param live: whether the migration should be done live or not (the
746
        interpretation of this parameter is left to the hypervisor)
747

748
    """
749
    return self._SingleNodeCall(node, "instance_migrate",
750
                                [self._InstDict(instance), target, live])
751

    
752
  @_RpcTimeout(_TMO_NORMAL)
753
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
754
    """Reboots an instance.
755

756
    This is a single-node call.
757

758
    """
759
    return self._SingleNodeCall(node, "instance_reboot",
760
                                [self._InstDict(inst), reboot_type,
761
                                 shutdown_timeout])
762

    
763
  @_RpcTimeout(_TMO_1DAY)
764
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
765
    """Installs an OS on the given instance.
766

767
    This is a single-node call.
768

769
    """
770
    return self._SingleNodeCall(node, "instance_os_add",
771
                                [self._InstDict(inst, osp=osparams),
772
                                 reinstall, debug])
773

    
774
  @_RpcTimeout(_TMO_SLOW)
775
  def call_instance_run_rename(self, node, inst, old_name, debug):
776
    """Run the OS rename script for an instance.
777

778
    This is a single-node call.
779

780
    """
781
    return self._SingleNodeCall(node, "instance_run_rename",
782
                                [self._InstDict(inst), old_name, debug])
783

    
784
  @_RpcTimeout(_TMO_URGENT)
785
  def call_instance_info(self, node, instance, hname):
786
    """Returns information about a single instance.
787

788
    This is a single-node call.
789

790
    @type node: list
791
    @param node: the list of nodes to query
792
    @type instance: string
793
    @param instance: the instance name
794
    @type hname: string
795
    @param hname: the hypervisor type of the instance
796

797
    """
798
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
799

    
800
  @_RpcTimeout(_TMO_NORMAL)
801
  def call_instance_migratable(self, node, instance):
802
    """Checks whether the given instance can be migrated.
803

804
    This is a single-node call.
805

806
    @param node: the node to query
807
    @type instance: L{objects.Instance}
808
    @param instance: the instance to check
809

810

811
    """
812
    return self._SingleNodeCall(node, "instance_migratable",
813
                                [self._InstDict(instance)])
814

    
815
  @_RpcTimeout(_TMO_URGENT)
816
  def call_all_instances_info(self, node_list, hypervisor_list):
817
    """Returns information about all instances on the given nodes.
818

819
    This is a multi-node call.
820

821
    @type node_list: list
822
    @param node_list: the list of nodes to query
823
    @type hypervisor_list: list
824
    @param hypervisor_list: the hypervisors to query for instances
825

826
    """
827
    return self._MultiNodeCall(node_list, "all_instances_info",
828
                               [hypervisor_list])
829

    
830
  @_RpcTimeout(_TMO_URGENT)
831
  def call_instance_list(self, node_list, hypervisor_list):
832
    """Returns the list of running instances on a given node.
833

834
    This is a multi-node call.
835

836
    @type node_list: list
837
    @param node_list: the list of nodes to query
838
    @type hypervisor_list: list
839
    @param hypervisor_list: the hypervisors to query for instances
840

841
    """
842
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
843

    
844
  @_RpcTimeout(_TMO_FAST)
845
  def call_node_tcp_ping(self, node, source, target, port, timeout,
846
                         live_port_needed):
847
    """Do a TcpPing on the remote node
848

849
    This is a single-node call.
850

851
    """
852
    return self._SingleNodeCall(node, "node_tcp_ping",
853
                                [source, target, port, timeout,
854
                                 live_port_needed])
855

    
856
  @_RpcTimeout(_TMO_FAST)
857
  def call_node_has_ip_address(self, node, address):
858
    """Checks if a node has the given IP address.
859

860
    This is a single-node call.
861

862
    """
863
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
864

    
865
  @_RpcTimeout(_TMO_URGENT)
866
  def call_node_info(self, node_list, vg_name, hypervisor_type):
867
    """Return node information.
868

869
    This will return memory information and volume group size and free
870
    space.
871

872
    This is a multi-node call.
873

874
    @type node_list: list
875
    @param node_list: the list of nodes to query
876
    @type vg_name: C{string}
877
    @param vg_name: the name of the volume group to ask for disk space
878
        information
879
    @type hypervisor_type: C{str}
880
    @param hypervisor_type: the name of the hypervisor to ask for
881
        memory information
882

883
    """
884
    return self._MultiNodeCall(node_list, "node_info",
885
                               [vg_name, hypervisor_type])
886

    
887
  @_RpcTimeout(_TMO_NORMAL)
888
  def call_etc_hosts_modify(self, node, mode, name, ip):
889
    """Modify hosts file with name
890

891
    @type node: string
892
    @param node: The node to call
893
    @type mode: string
894
    @param mode: The mode to operate. Currently "add" or "remove"
895
    @type name: string
896
    @param name: The host name to be modified
897
    @type ip: string
898
    @param ip: The ip of the entry (just valid if mode is "add")
899

900
    """
901
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
902

    
903
  @_RpcTimeout(_TMO_NORMAL)
904
  def call_node_verify(self, node_list, checkdict, cluster_name):
905
    """Request verification of given parameters.
906

907
    This is a multi-node call.
908

909
    """
910
    return self._MultiNodeCall(node_list, "node_verify",
911
                               [checkdict, cluster_name])
912

    
913
  @classmethod
914
  @_RpcTimeout(_TMO_FAST)
915
  def call_node_start_master(cls, node, start_daemons, no_voting):
916
    """Tells a node to activate itself as a master.
917

918
    This is a single-node call.
919

920
    """
921
    return cls._StaticSingleNodeCall(node, "node_start_master",
922
                                     [start_daemons, no_voting])
923

    
924
  @classmethod
925
  @_RpcTimeout(_TMO_FAST)
926
  def call_node_stop_master(cls, node, stop_daemons):
927
    """Tells a node to demote itself from master status.
928

929
    This is a single-node call.
930

931
    """
932
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
933

    
934
  @classmethod
935
  @_RpcTimeout(_TMO_URGENT)
936
  def call_master_info(cls, node_list):
937
    """Query master info.
938

939
    This is a multi-node call.
940

941
    """
942
    # TODO: should this method query down nodes?
943
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
944

    
945
  @classmethod
946
  @_RpcTimeout(_TMO_URGENT)
947
  def call_version(cls, node_list):
948
    """Query node version.
949

950
    This is a multi-node call.
951

952
    """
953
    return cls._StaticMultiNodeCall(node_list, "version", [])
954

    
955
  @_RpcTimeout(_TMO_NORMAL)
956
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
957
    """Request creation of a given block device.
958

959
    This is a single-node call.
960

961
    """
962
    return self._SingleNodeCall(node, "blockdev_create",
963
                                [bdev.ToDict(), size, owner, on_primary, info])
964

    
965
  @_RpcTimeout(_TMO_SLOW)
966
  def call_blockdev_wipe(self, node, bdev, offset, size):
967
    """Request wipe at given offset with given size of a block device.
968

969
    This is a single-node call.
970

971
    """
972
    return self._SingleNodeCall(node, "blockdev_wipe",
973
                                [bdev.ToDict(), offset, size])
974

    
975
  @_RpcTimeout(_TMO_NORMAL)
976
  def call_blockdev_remove(self, node, bdev):
977
    """Request removal of a given block device.
978

979
    This is a single-node call.
980

981
    """
982
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
983

    
984
  @_RpcTimeout(_TMO_NORMAL)
985
  def call_blockdev_rename(self, node, devlist):
986
    """Request rename of the given block devices.
987

988
    This is a single-node call.
989

990
    """
991
    return self._SingleNodeCall(node, "blockdev_rename",
992
                                [(d.ToDict(), uid) for d, uid in devlist])
993

    
994
  @_RpcTimeout(_TMO_NORMAL)
995
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
996
    """Request assembling of a given block device.
997

998
    This is a single-node call.
999

1000
    """
1001
    return self._SingleNodeCall(node, "blockdev_assemble",
1002
                                [disk.ToDict(), owner, on_primary])
1003

    
1004
  @_RpcTimeout(_TMO_NORMAL)
1005
  def call_blockdev_shutdown(self, node, disk):
1006
    """Request shutdown of a given block device.
1007

1008
    This is a single-node call.
1009

1010
    """
1011
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1012

    
1013
  @_RpcTimeout(_TMO_NORMAL)
1014
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1015
    """Request adding a list of children to a (mirroring) device.
1016

1017
    This is a single-node call.
1018

1019
    """
1020
    return self._SingleNodeCall(node, "blockdev_addchildren",
1021
                                [bdev.ToDict(),
1022
                                 [disk.ToDict() for disk in ndevs]])
1023

    
1024
  @_RpcTimeout(_TMO_NORMAL)
1025
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1026
    """Request removing a list of children from a (mirroring) device.
1027

1028
    This is a single-node call.
1029

1030
    """
1031
    return self._SingleNodeCall(node, "blockdev_removechildren",
1032
                                [bdev.ToDict(),
1033
                                 [disk.ToDict() for disk in ndevs]])
1034

    
1035
  @_RpcTimeout(_TMO_NORMAL)
1036
  def call_blockdev_getmirrorstatus(self, node, disks):
1037
    """Request status of a (mirroring) device.
1038

1039
    This is a single-node call.
1040

1041
    """
1042
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1043
                                  [dsk.ToDict() for dsk in disks])
1044
    if not result.fail_msg:
1045
      result.payload = [objects.BlockDevStatus.FromDict(i)
1046
                        for i in result.payload]
1047
    return result
1048

    
1049
  @_RpcTimeout(_TMO_NORMAL)
1050
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1051
    """Request status of (mirroring) devices from multiple nodes.
1052

1053
    This is a multi-node call.
1054

1055
    """
1056
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1057
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1058
                                       for name, disks in node_disks.items())])
1059
    for nres in result.values():
1060
      if not nres.fail_msg:
1061
        nres.payload = [objects.BlockDevStatus.FromDict(i)
1062
                        for i in nres.payload]
1063
    return result
1064

    
1065
  @_RpcTimeout(_TMO_NORMAL)
1066
  def call_blockdev_find(self, node, disk):
1067
    """Request identification of a given block device.
1068

1069
    This is a single-node call.
1070

1071
    """
1072
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1073
    if not result.fail_msg and result.payload is not None:
1074
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1075
    return result
1076

    
1077
  @_RpcTimeout(_TMO_NORMAL)
1078
  def call_blockdev_close(self, node, instance_name, disks):
1079
    """Closes the given block devices.
1080

1081
    This is a single-node call.
1082

1083
    """
1084
    params = [instance_name, [cf.ToDict() for cf in disks]]
1085
    return self._SingleNodeCall(node, "blockdev_close", params)
1086

    
1087
  @_RpcTimeout(_TMO_NORMAL)
1088
  def call_blockdev_getsizes(self, node, disks):
1089
    """Returns the size of the given disks.
1090

1091
    This is a single-node call.
1092

1093
    """
1094
    params = [[cf.ToDict() for cf in disks]]
1095
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1096

    
1097
  @_RpcTimeout(_TMO_NORMAL)
1098
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1099
    """Disconnects the network of the given drbd devices.
1100

1101
    This is a multi-node call.
1102

1103
    """
1104
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1105
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1106

    
1107
  @_RpcTimeout(_TMO_NORMAL)
1108
  def call_drbd_attach_net(self, node_list, nodes_ip,
1109
                           disks, instance_name, multimaster):
1110
    """Disconnects the given drbd devices.
1111

1112
    This is a multi-node call.
1113

1114
    """
1115
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1116
                               [nodes_ip, [cf.ToDict() for cf in disks],
1117
                                instance_name, multimaster])
1118

    
1119
  @_RpcTimeout(_TMO_SLOW)
1120
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1121
    """Waits for the synchronization of drbd devices is complete.
1122

1123
    This is a multi-node call.
1124

1125
    """
1126
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1127
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1128

    
1129
  @_RpcTimeout(_TMO_URGENT)
1130
  def call_drbd_helper(self, node_list):
1131
    """Gets drbd helper.
1132

1133
    This is a multi-node call.
1134

1135
    """
1136
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1137

    
1138
  @classmethod
1139
  @_RpcTimeout(_TMO_NORMAL)
1140
  def call_upload_file(cls, node_list, file_name, address_list=None):
1141
    """Upload a file.
1142

1143
    The node will refuse the operation in case the file is not on the
1144
    approved file list.
1145

1146
    This is a multi-node call.
1147

1148
    @type node_list: list
1149
    @param node_list: the list of node names to upload to
1150
    @type file_name: str
1151
    @param file_name: the filename to upload
1152
    @type address_list: list or None
1153
    @keyword address_list: an optional list of node addresses, in order
1154
        to optimize the RPC speed
1155

1156
    """
1157
    file_contents = utils.ReadFile(file_name)
1158
    data = cls._Compress(file_contents)
1159
    st = os.stat(file_name)
1160
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1161
              st.st_atime, st.st_mtime]
1162
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1163
                                    address_list=address_list)
1164

    
1165
  @classmethod
1166
  @_RpcTimeout(_TMO_NORMAL)
1167
  def call_write_ssconf_files(cls, node_list, values):
1168
    """Write ssconf files.
1169

1170
    This is a multi-node call.
1171

1172
    """
1173
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1174

    
1175
  @_RpcTimeout(_TMO_FAST)
1176
  def call_os_diagnose(self, node_list):
1177
    """Request a diagnose of OS definitions.
1178

1179
    This is a multi-node call.
1180

1181
    """
1182
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1183

    
1184
  @_RpcTimeout(_TMO_FAST)
1185
  def call_os_get(self, node, name):
1186
    """Returns an OS definition.
1187

1188
    This is a single-node call.
1189

1190
    """
1191
    result = self._SingleNodeCall(node, "os_get", [name])
1192
    if not result.fail_msg and isinstance(result.payload, dict):
1193
      result.payload = objects.OS.FromDict(result.payload)
1194
    return result
1195

    
1196
  @_RpcTimeout(_TMO_FAST)
1197
  def call_os_validate(self, required, nodes, name, checks, params):
1198
    """Run a validation routine for a given OS.
1199

1200
    This is a multi-node call.
1201

1202
    """
1203
    return self._MultiNodeCall(nodes, "os_validate",
1204
                               [required, name, checks, params])
1205

    
1206
  @_RpcTimeout(_TMO_NORMAL)
1207
  def call_hooks_runner(self, node_list, hpath, phase, env):
1208
    """Call the hooks runner.
1209

1210
    Args:
1211
      - op: the OpCode instance
1212
      - env: a dictionary with the environment
1213

1214
    This is a multi-node call.
1215

1216
    """
1217
    params = [hpath, phase, env]
1218
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1219

    
1220
  @_RpcTimeout(_TMO_NORMAL)
1221
  def call_iallocator_runner(self, node, name, idata):
1222
    """Call an iallocator on a remote node
1223

1224
    Args:
1225
      - name: the iallocator name
1226
      - input: the json-encoded input string
1227

1228
    This is a single-node call.
1229

1230
    """
1231
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1232

    
1233
  @_RpcTimeout(_TMO_NORMAL)
1234
  def call_blockdev_grow(self, node, cf_bdev, amount):
1235
    """Request a snapshot of the given block device.
1236

1237
    This is a single-node call.
1238

1239
    """
1240
    return self._SingleNodeCall(node, "blockdev_grow",
1241
                                [cf_bdev.ToDict(), amount])
1242

    
1243
  @_RpcTimeout(_TMO_1DAY)
1244
  def call_blockdev_export(self, node, cf_bdev,
1245
                           dest_node, dest_path, cluster_name):
1246
    """Export a given disk to another node.
1247

1248
    This is a single-node call.
1249

1250
    """
1251
    return self._SingleNodeCall(node, "blockdev_export",
1252
                                [cf_bdev.ToDict(), dest_node, dest_path,
1253
                                 cluster_name])
1254

    
1255
  @_RpcTimeout(_TMO_NORMAL)
1256
  def call_blockdev_snapshot(self, node, cf_bdev):
1257
    """Request a snapshot of the given block device.
1258

1259
    This is a single-node call.
1260

1261
    """
1262
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1263

    
1264
  @_RpcTimeout(_TMO_NORMAL)
1265
  def call_finalize_export(self, node, instance, snap_disks):
1266
    """Request the completion of an export operation.
1267

1268
    This writes the export config file, etc.
1269

1270
    This is a single-node call.
1271

1272
    """
1273
    flat_disks = []
1274
    for disk in snap_disks:
1275
      if isinstance(disk, bool):
1276
        flat_disks.append(disk)
1277
      else:
1278
        flat_disks.append(disk.ToDict())
1279

    
1280
    return self._SingleNodeCall(node, "finalize_export",
1281
                                [self._InstDict(instance), flat_disks])
1282

    
1283
  @_RpcTimeout(_TMO_FAST)
1284
  def call_export_info(self, node, path):
1285
    """Queries the export information in a given path.
1286

1287
    This is a single-node call.
1288

1289
    """
1290
    return self._SingleNodeCall(node, "export_info", [path])
1291

    
1292
  @_RpcTimeout(_TMO_FAST)
1293
  def call_export_list(self, node_list):
1294
    """Gets the stored exports list.
1295

1296
    This is a multi-node call.
1297

1298
    """
1299
    return self._MultiNodeCall(node_list, "export_list", [])
1300

    
1301
  @_RpcTimeout(_TMO_FAST)
1302
  def call_export_remove(self, node, export):
1303
    """Requests removal of a given export.
1304

1305
    This is a single-node call.
1306

1307
    """
1308
    return self._SingleNodeCall(node, "export_remove", [export])
1309

    
1310
  @classmethod
1311
  @_RpcTimeout(_TMO_NORMAL)
1312
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1313
    """Requests a node to clean the cluster information it has.
1314

1315
    This will remove the configuration information from the ganeti data
1316
    dir.
1317

1318
    This is a single-node call.
1319

1320
    """
1321
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1322
                                     [modify_ssh_setup])
1323

    
1324
  @_RpcTimeout(_TMO_FAST)
1325
  def call_node_volumes(self, node_list):
1326
    """Gets all volumes on node(s).
1327

1328
    This is a multi-node call.
1329

1330
    """
1331
    return self._MultiNodeCall(node_list, "node_volumes", [])
1332

    
1333
  @_RpcTimeout(_TMO_FAST)
1334
  def call_node_demote_from_mc(self, node):
1335
    """Demote a node from the master candidate role.
1336

1337
    This is a single-node call.
1338

1339
    """
1340
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1341

    
1342
  @_RpcTimeout(_TMO_NORMAL)
1343
  def call_node_powercycle(self, node, hypervisor):
1344
    """Tries to powercycle a node.
1345

1346
    This is a single-node call.
1347

1348
    """
1349
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1350

    
1351
  @_RpcTimeout(None)
1352
  def call_test_delay(self, node_list, duration):
1353
    """Sleep for a fixed time on given node(s).
1354

1355
    This is a multi-node call.
1356

1357
    """
1358
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1359
                               read_timeout=int(duration + 5))
1360

    
1361
  @_RpcTimeout(_TMO_FAST)
1362
  def call_file_storage_dir_create(self, node, file_storage_dir):
1363
    """Create the given file storage directory.
1364

1365
    This is a single-node call.
1366

1367
    """
1368
    return self._SingleNodeCall(node, "file_storage_dir_create",
1369
                                [file_storage_dir])
1370

    
1371
  @_RpcTimeout(_TMO_FAST)
1372
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1373
    """Remove the given file storage directory.
1374

1375
    This is a single-node call.
1376

1377
    """
1378
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1379
                                [file_storage_dir])
1380

    
1381
  @_RpcTimeout(_TMO_FAST)
1382
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1383
                                   new_file_storage_dir):
1384
    """Rename file storage directory.
1385

1386
    This is a single-node call.
1387

1388
    """
1389
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1390
                                [old_file_storage_dir, new_file_storage_dir])
1391

    
1392
  @classmethod
1393
  @_RpcTimeout(_TMO_FAST)
1394
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1395
    """Update job queue.
1396

1397
    This is a multi-node call.
1398

1399
    """
1400
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1401
                                    [file_name, cls._Compress(content)],
1402
                                    address_list=address_list)
1403

    
1404
  @classmethod
1405
  @_RpcTimeout(_TMO_NORMAL)
1406
  def call_jobqueue_purge(cls, node):
1407
    """Purge job queue.
1408

1409
    This is a single-node call.
1410

1411
    """
1412
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1413

    
1414
  @classmethod
1415
  @_RpcTimeout(_TMO_FAST)
1416
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1417
    """Rename a job queue file.
1418

1419
    This is a multi-node call.
1420

1421
    """
1422
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1423
                                    address_list=address_list)
1424

    
1425
  @_RpcTimeout(_TMO_NORMAL)
1426
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1427
    """Validate the hypervisor params.
1428

1429
    This is a multi-node call.
1430

1431
    @type node_list: list
1432
    @param node_list: the list of nodes to query
1433
    @type hvname: string
1434
    @param hvname: the hypervisor name
1435
    @type hvparams: dict
1436
    @param hvparams: the hypervisor parameters to be validated
1437

1438
    """
1439
    cluster = self._cfg.GetClusterInfo()
1440
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1441
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1442
                               [hvname, hv_full])
1443

    
1444
  @_RpcTimeout(_TMO_NORMAL)
1445
  def call_x509_cert_create(self, node, validity):
1446
    """Creates a new X509 certificate for SSL/TLS.
1447

1448
    This is a single-node call.
1449

1450
    @type validity: int
1451
    @param validity: Validity in seconds
1452

1453
    """
1454
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1455

    
1456
  @_RpcTimeout(_TMO_NORMAL)
1457
  def call_x509_cert_remove(self, node, name):
1458
    """Removes a X509 certificate.
1459

1460
    This is a single-node call.
1461

1462
    @type name: string
1463
    @param name: Certificate name
1464

1465
    """
1466
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1467

    
1468
  @_RpcTimeout(_TMO_NORMAL)
1469
  def call_import_start(self, node, opts, instance, dest, dest_args):
1470
    """Starts a listener for an import.
1471

1472
    This is a single-node call.
1473

1474
    @type node: string
1475
    @param node: Node name
1476
    @type instance: C{objects.Instance}
1477
    @param instance: Instance object
1478

1479
    """
1480
    return self._SingleNodeCall(node, "import_start",
1481
                                [opts.ToDict(),
1482
                                 self._InstDict(instance), dest,
1483
                                 _EncodeImportExportIO(dest, dest_args)])
1484

    
1485
  @_RpcTimeout(_TMO_NORMAL)
1486
  def call_export_start(self, node, opts, host, port,
1487
                        instance, source, source_args):
1488
    """Starts an export daemon.
1489

1490
    This is a single-node call.
1491

1492
    @type node: string
1493
    @param node: Node name
1494
    @type instance: C{objects.Instance}
1495
    @param instance: Instance object
1496

1497
    """
1498
    return self._SingleNodeCall(node, "export_start",
1499
                                [opts.ToDict(), host, port,
1500
                                 self._InstDict(instance), source,
1501
                                 _EncodeImportExportIO(source, source_args)])
1502

    
1503
  @_RpcTimeout(_TMO_FAST)
1504
  def call_impexp_status(self, node, names):
1505
    """Gets the status of an import or export.
1506

1507
    This is a single-node call.
1508

1509
    @type node: string
1510
    @param node: Node name
1511
    @type names: List of strings
1512
    @param names: Import/export names
1513
    @rtype: List of L{objects.ImportExportStatus} instances
1514
    @return: Returns a list of the state of each named import/export or None if
1515
             a status couldn't be retrieved
1516

1517
    """
1518
    result = self._SingleNodeCall(node, "impexp_status", [names])
1519

    
1520
    if not result.fail_msg:
1521
      decoded = []
1522

    
1523
      for i in result.payload:
1524
        if i is None:
1525
          decoded.append(None)
1526
          continue
1527
        decoded.append(objects.ImportExportStatus.FromDict(i))
1528

    
1529
      result.payload = decoded
1530

    
1531
    return result
1532

    
1533
  @_RpcTimeout(_TMO_NORMAL)
1534
  def call_impexp_abort(self, node, name):
1535
    """Aborts an import or export.
1536

1537
    This is a single-node call.
1538

1539
    @type node: string
1540
    @param node: Node name
1541
    @type name: string
1542
    @param name: Import/export name
1543

1544
    """
1545
    return self._SingleNodeCall(node, "impexp_abort", [name])
1546

    
1547
  @_RpcTimeout(_TMO_NORMAL)
1548
  def call_impexp_cleanup(self, node, name):
1549
    """Cleans up after an import or export.
1550

1551
    This is a single-node call.
1552

1553
    @type node: string
1554
    @param node: Node name
1555
    @type name: string
1556
    @param name: Import/export name
1557

1558
    """
1559
    return self._SingleNodeCall(node, "impexp_cleanup", [name])