Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 4f6014d4

History | View | Annotate | Download (46.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48

    
49
# pylint has a bug here, doesn't see this import
50
import ganeti.http.client  # pylint: disable-msg=W0611
51

    
52

    
53
# Timeout for connecting to nodes (seconds)
54
_RPC_CONNECT_TIMEOUT = 5
55

    
56
_RPC_CLIENT_HEADERS = [
57
  "Content-type: %s" % http.HTTP_APP_JSON,
58
  "Expect:",
59
  ]
60

    
61
# Various time constants for the timeout table
62
_TMO_URGENT = 60 # one minute
63
_TMO_FAST = 5 * 60 # five minutes
64
_TMO_NORMAL = 15 * 60 # 15 minutes
65
_TMO_SLOW = 3600 # one hour
66
_TMO_4HRS = 4 * 3600
67
_TMO_1DAY = 86400
68

    
69
# Timeout table that will be built later by decorators
70
# Guidelines for choosing timeouts:
71
# - call used during watcher: timeout -> 1min, _TMO_URGENT
72
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73
# - other calls: 15 min, _TMO_NORMAL
74
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75

    
76
_TIMEOUTS = {
77
}
78

    
79

    
80
def Init():
81
  """Initializes the module-global HTTP client manager.
82

83
  Must be called before using any RPC function and while exactly one thread is
84
  running.
85

86
  """
87
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88
  # one thread running. This check is just a safety measure -- it doesn't
89
  # cover all cases.
90
  assert threading.activeCount() == 1, \
91
         "Found more than one active thread when initializing pycURL"
92

    
93
  logging.info("Using PycURL %s", pycurl.version)
94

    
95
  pycurl.global_init(pycurl.GLOBAL_ALL)
96

    
97

    
98
def Shutdown():
99
  """Stops the module-global HTTP client manager.
100

101
  Must be called before quitting the program and while exactly one thread is
102
  running.
103

104
  """
105
  pycurl.global_cleanup()
106

    
107

    
108
def _ConfigRpcCurl(curl):
109
  noded_cert = str(constants.NODED_CERT_FILE)
110

    
111
  curl.setopt(pycurl.FOLLOWLOCATION, False)
112
  curl.setopt(pycurl.CAINFO, noded_cert)
113
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
115
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116
  curl.setopt(pycurl.SSLCERT, noded_cert)
117
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118
  curl.setopt(pycurl.SSLKEY, noded_cert)
119
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120

    
121

    
122
# Aliasing this module avoids the following warning by epydoc: "Warning: No
123
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124
_threading = threading
125

    
126

    
127
class _RpcThreadLocal(_threading.local):
128
  def GetHttpClientPool(self):
129
    """Returns a per-thread HTTP client pool.
130

131
    @rtype: L{http.client.HttpClientPool}
132

133
    """
134
    try:
135
      pool = self.hcp
136
    except AttributeError:
137
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
138
      self.hcp = pool
139

    
140
    return pool
141

    
142

    
143
# Remove module alias (see above)
144
del _threading
145

    
146

    
147
_thread_local = _RpcThreadLocal()
148

    
149

    
150
def _RpcTimeout(secs):
151
  """Timeout decorator.
152

153
  When applied to a rpc call_* function, it updates the global timeout
154
  table with the given function/timeout.
155

156
  """
157
  def decorator(f):
158
    name = f.__name__
159
    assert name.startswith("call_")
160
    _TIMEOUTS[name[len("call_"):]] = secs
161
    return f
162
  return decorator
163

    
164

    
165
def RunWithRPC(fn):
166
  """RPC-wrapper decorator.
167

168
  When applied to a function, it runs it with the RPC system
169
  initialized, and it shutsdown the system afterwards. This means the
170
  function must be called without RPC being initialized.
171

172
  """
173
  def wrapper(*args, **kwargs):
174
    Init()
175
    try:
176
      return fn(*args, **kwargs)
177
    finally:
178
      Shutdown()
179
  return wrapper
180

    
181

    
182
class RpcResult(object):
183
  """RPC Result class.
184

185
  This class holds an RPC result. It is needed since in multi-node
186
  calls we can't raise an exception just because one one out of many
187
  failed, and therefore we use this class to encapsulate the result.
188

189
  @ivar data: the data payload, for successful results, or None
190
  @ivar call: the name of the RPC call
191
  @ivar node: the name of the node to which we made the call
192
  @ivar offline: whether the operation failed because the node was
193
      offline, as opposed to actual failure; offline=True will always
194
      imply failed=True, in order to allow simpler checking if
195
      the user doesn't care about the exact failure mode
196
  @ivar fail_msg: the error message if the call failed
197

198
  """
199
  def __init__(self, data=None, failed=False, offline=False,
200
               call=None, node=None):
201
    self.offline = offline
202
    self.call = call
203
    self.node = node
204

    
205
    if offline:
206
      self.fail_msg = "Node is marked offline"
207
      self.data = self.payload = None
208
    elif failed:
209
      self.fail_msg = self._EnsureErr(data)
210
      self.data = self.payload = None
211
    else:
212
      self.data = data
213
      if not isinstance(self.data, (tuple, list)):
214
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215
                         type(self.data))
216
        self.payload = None
217
      elif len(data) != 2:
218
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
219
                         "expected 2" % len(self.data))
220
        self.payload = None
221
      elif not self.data[0]:
222
        self.fail_msg = self._EnsureErr(self.data[1])
223
        self.payload = None
224
      else:
225
        # finally success
226
        self.fail_msg = None
227
        self.payload = data[1]
228

    
229
    assert hasattr(self, "call")
230
    assert hasattr(self, "data")
231
    assert hasattr(self, "fail_msg")
232
    assert hasattr(self, "node")
233
    assert hasattr(self, "offline")
234
    assert hasattr(self, "payload")
235

    
236
  @staticmethod
237
  def _EnsureErr(val):
238
    """Helper to ensure we return a 'True' value for error."""
239
    if val:
240
      return val
241
    else:
242
      return "No error information"
243

    
244
  def Raise(self, msg, prereq=False, ecode=None):
245
    """If the result has failed, raise an OpExecError.
246

247
    This is used so that LU code doesn't have to check for each
248
    result, but instead can call this function.
249

250
    """
251
    if not self.fail_msg:
252
      return
253

    
254
    if not msg: # one could pass None for default message
255
      msg = ("Call '%s' to node '%s' has failed: %s" %
256
             (self.call, self.node, self.fail_msg))
257
    else:
258
      msg = "%s: %s" % (msg, self.fail_msg)
259
    if prereq:
260
      ec = errors.OpPrereqError
261
    else:
262
      ec = errors.OpExecError
263
    if ecode is not None:
264
      args = (msg, ecode)
265
    else:
266
      args = (msg, )
267
    raise ec(*args) # pylint: disable-msg=W0142
268

    
269

    
270
def _AddressLookup(node_list,
271
                   ssc=ssconf.SimpleStore,
272
                   nslookup_fn=netutils.Hostname.GetIP):
273
  """Return addresses for given node names.
274

275
  @type node_list: list
276
  @param node_list: List of node names
277
  @type ssc: class
278
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
279
  @type nslookup_fn: callable
280
  @param nslookup_fn: function use to do NS lookup
281
  @rtype: list of addresses and/or None's
282
  @returns: List of corresponding addresses, if found
283

284
  """
285
  ss = ssc()
286
  iplist = ss.GetNodePrimaryIPList()
287
  family = ss.GetPrimaryIPFamily()
288
  addresses = []
289
  ipmap = dict(entry.split() for entry in iplist)
290
  for node in node_list:
291
    address = ipmap.get(node)
292
    if address is None:
293
      address = nslookup_fn(node, family=family)
294
    addresses.append(address)
295

    
296
  return addresses
297

    
298

    
299
class Client:
300
  """RPC Client class.
301

302
  This class, given a (remote) method name, a list of parameters and a
303
  list of nodes, will contact (in parallel) all nodes, and return a
304
  dict of results (key: node name, value: result).
305

306
  One current bug is that generic failure is still signaled by
307
  'False' result, which is not good. This overloading of values can
308
  cause bugs.
309

310
  """
311
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
312
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
313
                                    " timeouts table")
314
    self.procedure = procedure
315
    self.body = body
316
    self.port = port
317
    self._request = {}
318
    self._address_lookup_fn = address_lookup_fn
319

    
320
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
321
    """Add a list of nodes to the target nodes.
322

323
    @type node_list: list
324
    @param node_list: the list of node names to connect
325
    @type address_list: list or None
326
    @keyword address_list: either None or a list with node addresses,
327
        which must have the same length as the node list
328
    @type read_timeout: int
329
    @param read_timeout: overwrites default timeout for operation
330

331
    """
332
    if address_list is None:
333
      # Always use IP address instead of node name
334
      address_list = self._address_lookup_fn(node_list)
335

    
336
    assert len(node_list) == len(address_list), \
337
           "Name and address lists must have the same length"
338

    
339
    for node, address in zip(node_list, address_list):
340
      self.ConnectNode(node, address, read_timeout=read_timeout)
341

    
342
  def ConnectNode(self, name, address=None, read_timeout=None):
343
    """Add a node to the target list.
344

345
    @type name: str
346
    @param name: the node name
347
    @type address: str
348
    @param address: the node address, if known
349
    @type read_timeout: int
350
    @param read_timeout: overwrites default timeout for operation
351

352
    """
353
    if address is None:
354
      # Always use IP address instead of node name
355
      address = self._address_lookup_fn([name])[0]
356

    
357
    assert(address is not None)
358

    
359
    if read_timeout is None:
360
      read_timeout = _TIMEOUTS[self.procedure]
361

    
362
    self._request[name] = \
363
      http.client.HttpClientRequest(str(address), self.port,
364
                                    http.HTTP_PUT, str("/%s" % self.procedure),
365
                                    headers=_RPC_CLIENT_HEADERS,
366
                                    post_data=str(self.body),
367
                                    read_timeout=read_timeout)
368

    
369
  def GetResults(self, http_pool=None):
370
    """Call nodes and return results.
371

372
    @rtype: list
373
    @return: List of RPC results
374

375
    """
376
    if not http_pool:
377
      http_pool = _thread_local.GetHttpClientPool()
378

    
379
    http_pool.ProcessRequests(self._request.values())
380

    
381
    results = {}
382

    
383
    for name, req in self._request.iteritems():
384
      if req.success and req.resp_status_code == http.HTTP_OK:
385
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
386
                                  node=name, call=self.procedure)
387
        continue
388

    
389
      # TODO: Better error reporting
390
      if req.error:
391
        msg = req.error
392
      else:
393
        msg = req.resp_body
394

    
395
      logging.error("RPC error in %s from node %s: %s",
396
                    self.procedure, name, msg)
397
      results[name] = RpcResult(data=msg, failed=True, node=name,
398
                                call=self.procedure)
399

    
400
    return results
401

    
402

    
403
def _EncodeImportExportIO(ieio, ieioargs):
404
  """Encodes import/export I/O information.
405

406
  """
407
  if ieio == constants.IEIO_RAW_DISK:
408
    assert len(ieioargs) == 1
409
    return (ieioargs[0].ToDict(), )
410

    
411
  if ieio == constants.IEIO_SCRIPT:
412
    assert len(ieioargs) == 2
413
    return (ieioargs[0].ToDict(), ieioargs[1])
414

    
415
  return ieioargs
416

    
417

    
418
class RpcRunner(object):
419
  """RPC runner class"""
420

    
421
  def __init__(self, cfg):
422
    """Initialized the rpc runner.
423

424
    @type cfg:  C{config.ConfigWriter}
425
    @param cfg: the configuration object that will be used to get data
426
                about the cluster
427

428
    """
429
    self._cfg = cfg
430
    self.port = netutils.GetDaemonPort(constants.NODED)
431

    
432
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
433
    """Convert the given instance to a dict.
434

435
    This is done via the instance's ToDict() method and additionally
436
    we fill the hvparams with the cluster defaults.
437

438
    @type instance: L{objects.Instance}
439
    @param instance: an Instance object
440
    @type hvp: dict or None
441
    @param hvp: a dictionary with overridden hypervisor parameters
442
    @type bep: dict or None
443
    @param bep: a dictionary with overridden backend parameters
444
    @type osp: dict or None
445
    @param osp: a dictionary with overridden os parameters
446
    @rtype: dict
447
    @return: the instance dict, with the hvparams filled with the
448
        cluster defaults
449

450
    """
451
    idict = instance.ToDict()
452
    cluster = self._cfg.GetClusterInfo()
453
    idict["hvparams"] = cluster.FillHV(instance)
454
    if hvp is not None:
455
      idict["hvparams"].update(hvp)
456
    idict["beparams"] = cluster.FillBE(instance)
457
    if bep is not None:
458
      idict["beparams"].update(bep)
459
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
460
    if osp is not None:
461
      idict["osparams"].update(osp)
462
    for nic in idict["nics"]:
463
      nic['nicparams'] = objects.FillDict(
464
        cluster.nicparams[constants.PP_DEFAULT],
465
        nic['nicparams'])
466
    return idict
467

    
468
  def _ConnectList(self, client, node_list, call, read_timeout=None):
469
    """Helper for computing node addresses.
470

471
    @type client: L{ganeti.rpc.Client}
472
    @param client: a C{Client} instance
473
    @type node_list: list
474
    @param node_list: the node list we should connect
475
    @type call: string
476
    @param call: the name of the remote procedure call, for filling in
477
        correctly any eventual offline nodes' results
478
    @type read_timeout: int
479
    @param read_timeout: overwrites the default read timeout for the
480
        given operation
481

482
    """
483
    all_nodes = self._cfg.GetAllNodesInfo()
484
    name_list = []
485
    addr_list = []
486
    skip_dict = {}
487
    for node in node_list:
488
      if node in all_nodes:
489
        if all_nodes[node].offline:
490
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
491
          continue
492
        val = all_nodes[node].primary_ip
493
      else:
494
        val = None
495
      addr_list.append(val)
496
      name_list.append(node)
497
    if name_list:
498
      client.ConnectList(name_list, address_list=addr_list,
499
                         read_timeout=read_timeout)
500
    return skip_dict
501

    
502
  def _ConnectNode(self, client, node, call, read_timeout=None):
503
    """Helper for computing one node's address.
504

505
    @type client: L{ganeti.rpc.Client}
506
    @param client: a C{Client} instance
507
    @type node: str
508
    @param node: the node we should connect
509
    @type call: string
510
    @param call: the name of the remote procedure call, for filling in
511
        correctly any eventual offline nodes' results
512
    @type read_timeout: int
513
    @param read_timeout: overwrites the default read timeout for the
514
        given operation
515

516
    """
517
    node_info = self._cfg.GetNodeInfo(node)
518
    if node_info is not None:
519
      if node_info.offline:
520
        return RpcResult(node=node, offline=True, call=call)
521
      addr = node_info.primary_ip
522
    else:
523
      addr = None
524
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
525

    
526
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
527
    """Helper for making a multi-node call
528

529
    """
530
    body = serializer.DumpJson(args, indent=False)
531
    c = Client(procedure, body, self.port)
532
    skip_dict = self._ConnectList(c, node_list, procedure,
533
                                  read_timeout=read_timeout)
534
    skip_dict.update(c.GetResults())
535
    return skip_dict
536

    
537
  @classmethod
538
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
539
                           address_list=None, read_timeout=None):
540
    """Helper for making a multi-node static call
541

542
    """
543
    body = serializer.DumpJson(args, indent=False)
544
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
545
    c.ConnectList(node_list, address_list=address_list,
546
                  read_timeout=read_timeout)
547
    return c.GetResults()
548

    
549
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
550
    """Helper for making a single-node call
551

552
    """
553
    body = serializer.DumpJson(args, indent=False)
554
    c = Client(procedure, body, self.port)
555
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
556
    if result is None:
557
      # we did connect, node is not offline
558
      result = c.GetResults()[node]
559
    return result
560

    
561
  @classmethod
562
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
563
    """Helper for making a single-node static call
564

565
    """
566
    body = serializer.DumpJson(args, indent=False)
567
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
568
    c.ConnectNode(node, read_timeout=read_timeout)
569
    return c.GetResults()[node]
570

    
571
  @staticmethod
572
  def _Compress(data):
573
    """Compresses a string for transport over RPC.
574

575
    Small amounts of data are not compressed.
576

577
    @type data: str
578
    @param data: Data
579
    @rtype: tuple
580
    @return: Encoded data to send
581

582
    """
583
    # Small amounts of data are not compressed
584
    if len(data) < 512:
585
      return (constants.RPC_ENCODING_NONE, data)
586

    
587
    # Compress with zlib and encode in base64
588
    return (constants.RPC_ENCODING_ZLIB_BASE64,
589
            base64.b64encode(zlib.compress(data, 3)))
590

    
591
  #
592
  # Begin RPC calls
593
  #
594

    
595
  @_RpcTimeout(_TMO_URGENT)
596
  def call_lv_list(self, node_list, vg_name):
597
    """Gets the logical volumes present in a given volume group.
598

599
    This is a multi-node call.
600

601
    """
602
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
603

    
604
  @_RpcTimeout(_TMO_URGENT)
605
  def call_vg_list(self, node_list):
606
    """Gets the volume group list.
607

608
    This is a multi-node call.
609

610
    """
611
    return self._MultiNodeCall(node_list, "vg_list", [])
612

    
613
  @_RpcTimeout(_TMO_NORMAL)
614
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
615
    """Get list of storage units.
616

617
    This is a multi-node call.
618

619
    """
620
    return self._MultiNodeCall(node_list, "storage_list",
621
                               [su_name, su_args, name, fields])
622

    
623
  @_RpcTimeout(_TMO_NORMAL)
624
  def call_storage_modify(self, node, su_name, su_args, name, changes):
625
    """Modify a storage unit.
626

627
    This is a single-node call.
628

629
    """
630
    return self._SingleNodeCall(node, "storage_modify",
631
                                [su_name, su_args, name, changes])
632

    
633
  @_RpcTimeout(_TMO_NORMAL)
634
  def call_storage_execute(self, node, su_name, su_args, name, op):
635
    """Executes an operation on a storage unit.
636

637
    This is a single-node call.
638

639
    """
640
    return self._SingleNodeCall(node, "storage_execute",
641
                                [su_name, su_args, name, op])
642

    
643
  @_RpcTimeout(_TMO_URGENT)
644
  def call_bridges_exist(self, node, bridges_list):
645
    """Checks if a node has all the bridges given.
646

647
    This method checks if all bridges given in the bridges_list are
648
    present on the remote node, so that an instance that uses interfaces
649
    on those bridges can be started.
650

651
    This is a single-node call.
652

653
    """
654
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
655

    
656
  @_RpcTimeout(_TMO_NORMAL)
657
  def call_instance_start(self, node, instance, hvp, bep):
658
    """Starts an instance.
659

660
    This is a single-node call.
661

662
    """
663
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
664
    return self._SingleNodeCall(node, "instance_start", [idict])
665

    
666
  @_RpcTimeout(_TMO_NORMAL)
667
  def call_instance_shutdown(self, node, instance, timeout):
668
    """Stops an instance.
669

670
    This is a single-node call.
671

672
    """
673
    return self._SingleNodeCall(node, "instance_shutdown",
674
                                [self._InstDict(instance), timeout])
675

    
676
  @_RpcTimeout(_TMO_NORMAL)
677
  def call_migration_info(self, node, instance):
678
    """Gather the information necessary to prepare an instance migration.
679

680
    This is a single-node call.
681

682
    @type node: string
683
    @param node: the node on which the instance is currently running
684
    @type instance: C{objects.Instance}
685
    @param instance: the instance definition
686

687
    """
688
    return self._SingleNodeCall(node, "migration_info",
689
                                [self._InstDict(instance)])
690

    
691
  @_RpcTimeout(_TMO_NORMAL)
692
  def call_accept_instance(self, node, instance, info, target):
693
    """Prepare a node to accept an instance.
694

695
    This is a single-node call.
696

697
    @type node: string
698
    @param node: the target node for the migration
699
    @type instance: C{objects.Instance}
700
    @param instance: the instance definition
701
    @type info: opaque/hypervisor specific (string/data)
702
    @param info: result for the call_migration_info call
703
    @type target: string
704
    @param target: target hostname (usually ip address) (on the node itself)
705

706
    """
707
    return self._SingleNodeCall(node, "accept_instance",
708
                                [self._InstDict(instance), info, target])
709

    
710
  @_RpcTimeout(_TMO_NORMAL)
711
  def call_finalize_migration(self, node, instance, info, success):
712
    """Finalize any target-node migration specific operation.
713

714
    This is called both in case of a successful migration and in case of error
715
    (in which case it should abort the migration).
716

717
    This is a single-node call.
718

719
    @type node: string
720
    @param node: the target node for the migration
721
    @type instance: C{objects.Instance}
722
    @param instance: the instance definition
723
    @type info: opaque/hypervisor specific (string/data)
724
    @param info: result for the call_migration_info call
725
    @type success: boolean
726
    @param success: whether the migration was a success or a failure
727

728
    """
729
    return self._SingleNodeCall(node, "finalize_migration",
730
                                [self._InstDict(instance), info, success])
731

    
732
  @_RpcTimeout(_TMO_SLOW)
733
  def call_instance_migrate(self, node, instance, target, live):
734
    """Migrate an instance.
735

736
    This is a single-node call.
737

738
    @type node: string
739
    @param node: the node on which the instance is currently running
740
    @type instance: C{objects.Instance}
741
    @param instance: the instance definition
742
    @type target: string
743
    @param target: the target node name
744
    @type live: boolean
745
    @param live: whether the migration should be done live or not (the
746
        interpretation of this parameter is left to the hypervisor)
747

748
    """
749
    return self._SingleNodeCall(node, "instance_migrate",
750
                                [self._InstDict(instance), target, live])
751

    
752
  @_RpcTimeout(_TMO_NORMAL)
753
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
754
    """Reboots an instance.
755

756
    This is a single-node call.
757

758
    """
759
    return self._SingleNodeCall(node, "instance_reboot",
760
                                [self._InstDict(inst), reboot_type,
761
                                 shutdown_timeout])
762

    
763
  @_RpcTimeout(_TMO_1DAY)
764
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
765
    """Installs an OS on the given instance.
766

767
    This is a single-node call.
768

769
    """
770
    return self._SingleNodeCall(node, "instance_os_add",
771
                                [self._InstDict(inst, osp=osparams),
772
                                 reinstall, debug])
773

    
774
  @_RpcTimeout(_TMO_SLOW)
775
  def call_instance_run_rename(self, node, inst, old_name, debug):
776
    """Run the OS rename script for an instance.
777

778
    This is a single-node call.
779

780
    """
781
    return self._SingleNodeCall(node, "instance_run_rename",
782
                                [self._InstDict(inst), old_name, debug])
783

    
784
  @_RpcTimeout(_TMO_URGENT)
785
  def call_instance_info(self, node, instance, hname):
786
    """Returns information about a single instance.
787

788
    This is a single-node call.
789

790
    @type node: list
791
    @param node: the list of nodes to query
792
    @type instance: string
793
    @param instance: the instance name
794
    @type hname: string
795
    @param hname: the hypervisor type of the instance
796

797
    """
798
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
799

    
800
  @_RpcTimeout(_TMO_NORMAL)
801
  def call_instance_migratable(self, node, instance):
802
    """Checks whether the given instance can be migrated.
803

804
    This is a single-node call.
805

806
    @param node: the node to query
807
    @type instance: L{objects.Instance}
808
    @param instance: the instance to check
809

810

811
    """
812
    return self._SingleNodeCall(node, "instance_migratable",
813
                                [self._InstDict(instance)])
814

    
815
  @_RpcTimeout(_TMO_URGENT)
816
  def call_all_instances_info(self, node_list, hypervisor_list):
817
    """Returns information about all instances on the given nodes.
818

819
    This is a multi-node call.
820

821
    @type node_list: list
822
    @param node_list: the list of nodes to query
823
    @type hypervisor_list: list
824
    @param hypervisor_list: the hypervisors to query for instances
825

826
    """
827
    return self._MultiNodeCall(node_list, "all_instances_info",
828
                               [hypervisor_list])
829

    
830
  @_RpcTimeout(_TMO_URGENT)
831
  def call_instance_list(self, node_list, hypervisor_list):
832
    """Returns the list of running instances on a given node.
833

834
    This is a multi-node call.
835

836
    @type node_list: list
837
    @param node_list: the list of nodes to query
838
    @type hypervisor_list: list
839
    @param hypervisor_list: the hypervisors to query for instances
840

841
    """
842
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
843

    
844
  @_RpcTimeout(_TMO_FAST)
845
  def call_node_tcp_ping(self, node, source, target, port, timeout,
846
                         live_port_needed):
847
    """Do a TcpPing on the remote node
848

849
    This is a single-node call.
850

851
    """
852
    return self._SingleNodeCall(node, "node_tcp_ping",
853
                                [source, target, port, timeout,
854
                                 live_port_needed])
855

    
856
  @_RpcTimeout(_TMO_FAST)
857
  def call_node_has_ip_address(self, node, address):
858
    """Checks if a node has the given IP address.
859

860
    This is a single-node call.
861

862
    """
863
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
864

    
865
  @_RpcTimeout(_TMO_URGENT)
866
  def call_node_info(self, node_list, vg_name, hypervisor_type):
867
    """Return node information.
868

869
    This will return memory information and volume group size and free
870
    space.
871

872
    This is a multi-node call.
873

874
    @type node_list: list
875
    @param node_list: the list of nodes to query
876
    @type vg_name: C{string}
877
    @param vg_name: the name of the volume group to ask for disk space
878
        information
879
    @type hypervisor_type: C{str}
880
    @param hypervisor_type: the name of the hypervisor to ask for
881
        memory information
882

883
    """
884
    return self._MultiNodeCall(node_list, "node_info",
885
                               [vg_name, hypervisor_type])
886

    
887
  @_RpcTimeout(_TMO_NORMAL)
888
  def call_etc_hosts_modify(self, node, mode, name, ip):
889
    """Modify hosts file with name
890

891
    @type node: string
892
    @param node: The node to call
893
    @type mode: string
894
    @param mode: The mode to operate. Currently "add" or "remove"
895
    @type name: string
896
    @param name: The host name to be modified
897
    @type ip: string
898
    @param ip: The ip of the entry (just valid if mode is "add")
899

900
    """
901
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
902

    
903
  @_RpcTimeout(_TMO_NORMAL)
904
  def call_node_verify(self, node_list, checkdict, cluster_name):
905
    """Request verification of given parameters.
906

907
    This is a multi-node call.
908

909
    """
910
    return self._MultiNodeCall(node_list, "node_verify",
911
                               [checkdict, cluster_name])
912

    
913
  @classmethod
914
  @_RpcTimeout(_TMO_FAST)
915
  def call_node_start_master(cls, node, start_daemons, no_voting):
916
    """Tells a node to activate itself as a master.
917

918
    This is a single-node call.
919

920
    """
921
    return cls._StaticSingleNodeCall(node, "node_start_master",
922
                                     [start_daemons, no_voting])
923

    
924
  @classmethod
925
  @_RpcTimeout(_TMO_FAST)
926
  def call_node_stop_master(cls, node, stop_daemons):
927
    """Tells a node to demote itself from master status.
928

929
    This is a single-node call.
930

931
    """
932
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
933

    
934
  @classmethod
935
  @_RpcTimeout(_TMO_URGENT)
936
  def call_master_info(cls, node_list):
937
    """Query master info.
938

939
    This is a multi-node call.
940

941
    """
942
    # TODO: should this method query down nodes?
943
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
944

    
945
  @classmethod
946
  @_RpcTimeout(_TMO_URGENT)
947
  def call_version(cls, node_list):
948
    """Query node version.
949

950
    This is a multi-node call.
951

952
    """
953
    return cls._StaticMultiNodeCall(node_list, "version", [])
954

    
955
  @_RpcTimeout(_TMO_NORMAL)
956
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
957
    """Request creation of a given block device.
958

959
    This is a single-node call.
960

961
    """
962
    return self._SingleNodeCall(node, "blockdev_create",
963
                                [bdev.ToDict(), size, owner, on_primary, info])
964

    
965
  @_RpcTimeout(_TMO_SLOW)
966
  def call_blockdev_wipe(self, node, bdev, offset, size):
967
    """Request wipe at given offset with given size of a block device.
968

969
    This is a single-node call.
970

971
    """
972
    return self._SingleNodeCall(node, "blockdev_wipe",
973
                                [bdev.ToDict(), offset, size])
974

    
975
  @_RpcTimeout(_TMO_NORMAL)
976
  def call_blockdev_remove(self, node, bdev):
977
    """Request removal of a given block device.
978

979
    This is a single-node call.
980

981
    """
982
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
983

    
984
  @_RpcTimeout(_TMO_NORMAL)
985
  def call_blockdev_rename(self, node, devlist):
986
    """Request rename of the given block devices.
987

988
    This is a single-node call.
989

990
    """
991
    return self._SingleNodeCall(node, "blockdev_rename",
992
                                [(d.ToDict(), uid) for d, uid in devlist])
993

    
994
  @_RpcTimeout(_TMO_NORMAL)
995
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
996
    """Request assembling of a given block device.
997

998
    This is a single-node call.
999

1000
    """
1001
    return self._SingleNodeCall(node, "blockdev_assemble",
1002
                                [disk.ToDict(), owner, on_primary])
1003

    
1004
  @_RpcTimeout(_TMO_NORMAL)
1005
  def call_blockdev_shutdown(self, node, disk):
1006
    """Request shutdown of a given block device.
1007

1008
    This is a single-node call.
1009

1010
    """
1011
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1012

    
1013
  @_RpcTimeout(_TMO_NORMAL)
1014
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1015
    """Request adding a list of children to a (mirroring) device.
1016

1017
    This is a single-node call.
1018

1019
    """
1020
    return self._SingleNodeCall(node, "blockdev_addchildren",
1021
                                [bdev.ToDict(),
1022
                                 [disk.ToDict() for disk in ndevs]])
1023

    
1024
  @_RpcTimeout(_TMO_NORMAL)
1025
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1026
    """Request removing a list of children from a (mirroring) device.
1027

1028
    This is a single-node call.
1029

1030
    """
1031
    return self._SingleNodeCall(node, "blockdev_removechildren",
1032
                                [bdev.ToDict(),
1033
                                 [disk.ToDict() for disk in ndevs]])
1034

    
1035
  @_RpcTimeout(_TMO_NORMAL)
1036
  def call_blockdev_getmirrorstatus(self, node, disks):
1037
    """Request status of a (mirroring) device.
1038

1039
    This is a single-node call.
1040

1041
    """
1042
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1043
                                  [dsk.ToDict() for dsk in disks])
1044
    if not result.fail_msg:
1045
      result.payload = [objects.BlockDevStatus.FromDict(i)
1046
                        for i in result.payload]
1047
    return result
1048

    
1049
  @_RpcTimeout(_TMO_NORMAL)
1050
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1051
    """Request status of (mirroring) devices from multiple nodes.
1052

1053
    This is a multi-node call.
1054

1055
    """
1056
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1057
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1058
                                       for name, disks in node_disks.items())])
1059
    for nres in result.values():
1060
      if nres.fail_msg:
1061
        continue
1062

    
1063
      for idx, (success, status) in enumerate(nres.payload):
1064
        if success:
1065
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1066

    
1067
    return result
1068

    
1069
  @_RpcTimeout(_TMO_NORMAL)
1070
  def call_blockdev_find(self, node, disk):
1071
    """Request identification of a given block device.
1072

1073
    This is a single-node call.
1074

1075
    """
1076
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1077
    if not result.fail_msg and result.payload is not None:
1078
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1079
    return result
1080

    
1081
  @_RpcTimeout(_TMO_NORMAL)
1082
  def call_blockdev_close(self, node, instance_name, disks):
1083
    """Closes the given block devices.
1084

1085
    This is a single-node call.
1086

1087
    """
1088
    params = [instance_name, [cf.ToDict() for cf in disks]]
1089
    return self._SingleNodeCall(node, "blockdev_close", params)
1090

    
1091
  @_RpcTimeout(_TMO_NORMAL)
1092
  def call_blockdev_getsizes(self, node, disks):
1093
    """Returns the size of the given disks.
1094

1095
    This is a single-node call.
1096

1097
    """
1098
    params = [[cf.ToDict() for cf in disks]]
1099
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1100

    
1101
  @_RpcTimeout(_TMO_NORMAL)
1102
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1103
    """Disconnects the network of the given drbd devices.
1104

1105
    This is a multi-node call.
1106

1107
    """
1108
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1109
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1110

    
1111
  @_RpcTimeout(_TMO_NORMAL)
1112
  def call_drbd_attach_net(self, node_list, nodes_ip,
1113
                           disks, instance_name, multimaster):
1114
    """Disconnects the given drbd devices.
1115

1116
    This is a multi-node call.
1117

1118
    """
1119
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1120
                               [nodes_ip, [cf.ToDict() for cf in disks],
1121
                                instance_name, multimaster])
1122

    
1123
  @_RpcTimeout(_TMO_SLOW)
1124
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1125
    """Waits for the synchronization of drbd devices is complete.
1126

1127
    This is a multi-node call.
1128

1129
    """
1130
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1131
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1132

    
1133
  @_RpcTimeout(_TMO_URGENT)
1134
  def call_drbd_helper(self, node_list):
1135
    """Gets drbd helper.
1136

1137
    This is a multi-node call.
1138

1139
    """
1140
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1141

    
1142
  @classmethod
1143
  @_RpcTimeout(_TMO_NORMAL)
1144
  def call_upload_file(cls, node_list, file_name, address_list=None):
1145
    """Upload a file.
1146

1147
    The node will refuse the operation in case the file is not on the
1148
    approved file list.
1149

1150
    This is a multi-node call.
1151

1152
    @type node_list: list
1153
    @param node_list: the list of node names to upload to
1154
    @type file_name: str
1155
    @param file_name: the filename to upload
1156
    @type address_list: list or None
1157
    @keyword address_list: an optional list of node addresses, in order
1158
        to optimize the RPC speed
1159

1160
    """
1161
    file_contents = utils.ReadFile(file_name)
1162
    data = cls._Compress(file_contents)
1163
    st = os.stat(file_name)
1164
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1165
              st.st_atime, st.st_mtime]
1166
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1167
                                    address_list=address_list)
1168

    
1169
  @classmethod
1170
  @_RpcTimeout(_TMO_NORMAL)
1171
  def call_write_ssconf_files(cls, node_list, values):
1172
    """Write ssconf files.
1173

1174
    This is a multi-node call.
1175

1176
    """
1177
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1178

    
1179
  @_RpcTimeout(_TMO_NORMAL)
1180
  def call_run_oob(self, node, oob_program, command, remote_node):
1181
    """Runs OOB.
1182

1183
    This is a single-node call.
1184

1185
    """
1186
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1187
                                                  remote_node])
1188

    
1189
  @_RpcTimeout(_TMO_FAST)
1190
  def call_os_diagnose(self, node_list):
1191
    """Request a diagnose of OS definitions.
1192

1193
    This is a multi-node call.
1194

1195
    """
1196
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1197

    
1198
  @_RpcTimeout(_TMO_FAST)
1199
  def call_os_get(self, node, name):
1200
    """Returns an OS definition.
1201

1202
    This is a single-node call.
1203

1204
    """
1205
    result = self._SingleNodeCall(node, "os_get", [name])
1206
    if not result.fail_msg and isinstance(result.payload, dict):
1207
      result.payload = objects.OS.FromDict(result.payload)
1208
    return result
1209

    
1210
  @_RpcTimeout(_TMO_FAST)
1211
  def call_os_validate(self, required, nodes, name, checks, params):
1212
    """Run a validation routine for a given OS.
1213

1214
    This is a multi-node call.
1215

1216
    """
1217
    return self._MultiNodeCall(nodes, "os_validate",
1218
                               [required, name, checks, params])
1219

    
1220
  @_RpcTimeout(_TMO_NORMAL)
1221
  def call_hooks_runner(self, node_list, hpath, phase, env):
1222
    """Call the hooks runner.
1223

1224
    Args:
1225
      - op: the OpCode instance
1226
      - env: a dictionary with the environment
1227

1228
    This is a multi-node call.
1229

1230
    """
1231
    params = [hpath, phase, env]
1232
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1233

    
1234
  @_RpcTimeout(_TMO_NORMAL)
1235
  def call_iallocator_runner(self, node, name, idata):
1236
    """Call an iallocator on a remote node
1237

1238
    Args:
1239
      - name: the iallocator name
1240
      - input: the json-encoded input string
1241

1242
    This is a single-node call.
1243

1244
    """
1245
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1246

    
1247
  @_RpcTimeout(_TMO_NORMAL)
1248
  def call_blockdev_grow(self, node, cf_bdev, amount):
1249
    """Request a snapshot of the given block device.
1250

1251
    This is a single-node call.
1252

1253
    """
1254
    return self._SingleNodeCall(node, "blockdev_grow",
1255
                                [cf_bdev.ToDict(), amount])
1256

    
1257
  @_RpcTimeout(_TMO_1DAY)
1258
  def call_blockdev_export(self, node, cf_bdev,
1259
                           dest_node, dest_path, cluster_name):
1260
    """Export a given disk to another node.
1261

1262
    This is a single-node call.
1263

1264
    """
1265
    return self._SingleNodeCall(node, "blockdev_export",
1266
                                [cf_bdev.ToDict(), dest_node, dest_path,
1267
                                 cluster_name])
1268

    
1269
  @_RpcTimeout(_TMO_NORMAL)
1270
  def call_blockdev_snapshot(self, node, cf_bdev):
1271
    """Request a snapshot of the given block device.
1272

1273
    This is a single-node call.
1274

1275
    """
1276
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1277

    
1278
  @_RpcTimeout(_TMO_NORMAL)
1279
  def call_finalize_export(self, node, instance, snap_disks):
1280
    """Request the completion of an export operation.
1281

1282
    This writes the export config file, etc.
1283

1284
    This is a single-node call.
1285

1286
    """
1287
    flat_disks = []
1288
    for disk in snap_disks:
1289
      if isinstance(disk, bool):
1290
        flat_disks.append(disk)
1291
      else:
1292
        flat_disks.append(disk.ToDict())
1293

    
1294
    return self._SingleNodeCall(node, "finalize_export",
1295
                                [self._InstDict(instance), flat_disks])
1296

    
1297
  @_RpcTimeout(_TMO_FAST)
1298
  def call_export_info(self, node, path):
1299
    """Queries the export information in a given path.
1300

1301
    This is a single-node call.
1302

1303
    """
1304
    return self._SingleNodeCall(node, "export_info", [path])
1305

    
1306
  @_RpcTimeout(_TMO_FAST)
1307
  def call_export_list(self, node_list):
1308
    """Gets the stored exports list.
1309

1310
    This is a multi-node call.
1311

1312
    """
1313
    return self._MultiNodeCall(node_list, "export_list", [])
1314

    
1315
  @_RpcTimeout(_TMO_FAST)
1316
  def call_export_remove(self, node, export):
1317
    """Requests removal of a given export.
1318

1319
    This is a single-node call.
1320

1321
    """
1322
    return self._SingleNodeCall(node, "export_remove", [export])
1323

    
1324
  @classmethod
1325
  @_RpcTimeout(_TMO_NORMAL)
1326
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1327
    """Requests a node to clean the cluster information it has.
1328

1329
    This will remove the configuration information from the ganeti data
1330
    dir.
1331

1332
    This is a single-node call.
1333

1334
    """
1335
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1336
                                     [modify_ssh_setup])
1337

    
1338
  @_RpcTimeout(_TMO_FAST)
1339
  def call_node_volumes(self, node_list):
1340
    """Gets all volumes on node(s).
1341

1342
    This is a multi-node call.
1343

1344
    """
1345
    return self._MultiNodeCall(node_list, "node_volumes", [])
1346

    
1347
  @_RpcTimeout(_TMO_FAST)
1348
  def call_node_demote_from_mc(self, node):
1349
    """Demote a node from the master candidate role.
1350

1351
    This is a single-node call.
1352

1353
    """
1354
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1355

    
1356
  @_RpcTimeout(_TMO_NORMAL)
1357
  def call_node_powercycle(self, node, hypervisor):
1358
    """Tries to powercycle a node.
1359

1360
    This is a single-node call.
1361

1362
    """
1363
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1364

    
1365
  @_RpcTimeout(None)
1366
  def call_test_delay(self, node_list, duration):
1367
    """Sleep for a fixed time on given node(s).
1368

1369
    This is a multi-node call.
1370

1371
    """
1372
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1373
                               read_timeout=int(duration + 5))
1374

    
1375
  @_RpcTimeout(_TMO_FAST)
1376
  def call_file_storage_dir_create(self, node, file_storage_dir):
1377
    """Create the given file storage directory.
1378

1379
    This is a single-node call.
1380

1381
    """
1382
    return self._SingleNodeCall(node, "file_storage_dir_create",
1383
                                [file_storage_dir])
1384

    
1385
  @_RpcTimeout(_TMO_FAST)
1386
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1387
    """Remove the given file storage directory.
1388

1389
    This is a single-node call.
1390

1391
    """
1392
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1393
                                [file_storage_dir])
1394

    
1395
  @_RpcTimeout(_TMO_FAST)
1396
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1397
                                   new_file_storage_dir):
1398
    """Rename file storage directory.
1399

1400
    This is a single-node call.
1401

1402
    """
1403
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1404
                                [old_file_storage_dir, new_file_storage_dir])
1405

    
1406
  @classmethod
1407
  @_RpcTimeout(_TMO_FAST)
1408
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1409
    """Update job queue.
1410

1411
    This is a multi-node call.
1412

1413
    """
1414
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1415
                                    [file_name, cls._Compress(content)],
1416
                                    address_list=address_list)
1417

    
1418
  @classmethod
1419
  @_RpcTimeout(_TMO_NORMAL)
1420
  def call_jobqueue_purge(cls, node):
1421
    """Purge job queue.
1422

1423
    This is a single-node call.
1424

1425
    """
1426
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1427

    
1428
  @classmethod
1429
  @_RpcTimeout(_TMO_FAST)
1430
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1431
    """Rename a job queue file.
1432

1433
    This is a multi-node call.
1434

1435
    """
1436
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1437
                                    address_list=address_list)
1438

    
1439
  @_RpcTimeout(_TMO_NORMAL)
1440
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1441
    """Validate the hypervisor params.
1442

1443
    This is a multi-node call.
1444

1445
    @type node_list: list
1446
    @param node_list: the list of nodes to query
1447
    @type hvname: string
1448
    @param hvname: the hypervisor name
1449
    @type hvparams: dict
1450
    @param hvparams: the hypervisor parameters to be validated
1451

1452
    """
1453
    cluster = self._cfg.GetClusterInfo()
1454
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1455
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1456
                               [hvname, hv_full])
1457

    
1458
  @_RpcTimeout(_TMO_NORMAL)
1459
  def call_x509_cert_create(self, node, validity):
1460
    """Creates a new X509 certificate for SSL/TLS.
1461

1462
    This is a single-node call.
1463

1464
    @type validity: int
1465
    @param validity: Validity in seconds
1466

1467
    """
1468
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1469

    
1470
  @_RpcTimeout(_TMO_NORMAL)
1471
  def call_x509_cert_remove(self, node, name):
1472
    """Removes a X509 certificate.
1473

1474
    This is a single-node call.
1475

1476
    @type name: string
1477
    @param name: Certificate name
1478

1479
    """
1480
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1481

    
1482
  @_RpcTimeout(_TMO_NORMAL)
1483
  def call_import_start(self, node, opts, instance, dest, dest_args):
1484
    """Starts a listener for an import.
1485

1486
    This is a single-node call.
1487

1488
    @type node: string
1489
    @param node: Node name
1490
    @type instance: C{objects.Instance}
1491
    @param instance: Instance object
1492

1493
    """
1494
    return self._SingleNodeCall(node, "import_start",
1495
                                [opts.ToDict(),
1496
                                 self._InstDict(instance), dest,
1497
                                 _EncodeImportExportIO(dest, dest_args)])
1498

    
1499
  @_RpcTimeout(_TMO_NORMAL)
1500
  def call_export_start(self, node, opts, host, port,
1501
                        instance, source, source_args):
1502
    """Starts an export daemon.
1503

1504
    This is a single-node call.
1505

1506
    @type node: string
1507
    @param node: Node name
1508
    @type instance: C{objects.Instance}
1509
    @param instance: Instance object
1510

1511
    """
1512
    return self._SingleNodeCall(node, "export_start",
1513
                                [opts.ToDict(), host, port,
1514
                                 self._InstDict(instance), source,
1515
                                 _EncodeImportExportIO(source, source_args)])
1516

    
1517
  @_RpcTimeout(_TMO_FAST)
1518
  def call_impexp_status(self, node, names):
1519
    """Gets the status of an import or export.
1520

1521
    This is a single-node call.
1522

1523
    @type node: string
1524
    @param node: Node name
1525
    @type names: List of strings
1526
    @param names: Import/export names
1527
    @rtype: List of L{objects.ImportExportStatus} instances
1528
    @return: Returns a list of the state of each named import/export or None if
1529
             a status couldn't be retrieved
1530

1531
    """
1532
    result = self._SingleNodeCall(node, "impexp_status", [names])
1533

    
1534
    if not result.fail_msg:
1535
      decoded = []
1536

    
1537
      for i in result.payload:
1538
        if i is None:
1539
          decoded.append(None)
1540
          continue
1541
        decoded.append(objects.ImportExportStatus.FromDict(i))
1542

    
1543
      result.payload = decoded
1544

    
1545
    return result
1546

    
1547
  @_RpcTimeout(_TMO_NORMAL)
1548
  def call_impexp_abort(self, node, name):
1549
    """Aborts an import or export.
1550

1551
    This is a single-node call.
1552

1553
    @type node: string
1554
    @param node: Node name
1555
    @type name: string
1556
    @param name: Import/export name
1557

1558
    """
1559
    return self._SingleNodeCall(node, "impexp_abort", [name])
1560

    
1561
  @_RpcTimeout(_TMO_NORMAL)
1562
  def call_impexp_cleanup(self, node, name):
1563
    """Cleans up after an import or export.
1564

1565
    This is a single-node call.
1566

1567
    @type node: string
1568
    @param node: Node name
1569
    @type name: string
1570
    @param name: Import/export name
1571

1572
    """
1573
    return self._SingleNodeCall(node, "impexp_cleanup", [name])