Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 2c0f74f2

History | View | Annotate | Download (46.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48

    
49
# pylint has a bug here, doesn't see this import
50
import ganeti.http.client  # pylint: disable-msg=W0611
51

    
52

    
53
# Timeout for connecting to nodes (seconds)
54
_RPC_CONNECT_TIMEOUT = 5
55

    
56
_RPC_CLIENT_HEADERS = [
57
  "Content-type: %s" % http.HTTP_APP_JSON,
58
  "Expect:",
59
  ]
60

    
61
# Various time constants for the timeout table
62
_TMO_URGENT = 60 # one minute
63
_TMO_FAST = 5 * 60 # five minutes
64
_TMO_NORMAL = 15 * 60 # 15 minutes
65
_TMO_SLOW = 3600 # one hour
66
_TMO_4HRS = 4 * 3600
67
_TMO_1DAY = 86400
68

    
69
# Timeout table that will be built later by decorators
70
# Guidelines for choosing timeouts:
71
# - call used during watcher: timeout -> 1min, _TMO_URGENT
72
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73
# - other calls: 15 min, _TMO_NORMAL
74
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75

    
76
_TIMEOUTS = {
77
}
78

    
79

    
80
def Init():
81
  """Initializes the module-global HTTP client manager.
82

83
  Must be called before using any RPC function and while exactly one thread is
84
  running.
85

86
  """
87
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88
  # one thread running. This check is just a safety measure -- it doesn't
89
  # cover all cases.
90
  assert threading.activeCount() == 1, \
91
         "Found more than one active thread when initializing pycURL"
92

    
93
  logging.info("Using PycURL %s", pycurl.version)
94

    
95
  pycurl.global_init(pycurl.GLOBAL_ALL)
96

    
97

    
98
def Shutdown():
99
  """Stops the module-global HTTP client manager.
100

101
  Must be called before quitting the program and while exactly one thread is
102
  running.
103

104
  """
105
  pycurl.global_cleanup()
106

    
107

    
108
def _ConfigRpcCurl(curl):
109
  noded_cert = str(constants.NODED_CERT_FILE)
110

    
111
  curl.setopt(pycurl.FOLLOWLOCATION, False)
112
  curl.setopt(pycurl.CAINFO, noded_cert)
113
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
115
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116
  curl.setopt(pycurl.SSLCERT, noded_cert)
117
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118
  curl.setopt(pycurl.SSLKEY, noded_cert)
119
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120

    
121

    
122
# Aliasing this module avoids the following warning by epydoc: "Warning: No
123
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124
_threading = threading
125

    
126

    
127
class _RpcThreadLocal(_threading.local):
128
  def GetHttpClientPool(self):
129
    """Returns a per-thread HTTP client pool.
130

131
    @rtype: L{http.client.HttpClientPool}
132

133
    """
134
    try:
135
      pool = self.hcp
136
    except AttributeError:
137
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
138
      self.hcp = pool
139

    
140
    return pool
141

    
142

    
143
# Remove module alias (see above)
144
del _threading
145

    
146

    
147
_thread_local = _RpcThreadLocal()
148

    
149

    
150
def _RpcTimeout(secs):
151
  """Timeout decorator.
152

153
  When applied to a rpc call_* function, it updates the global timeout
154
  table with the given function/timeout.
155

156
  """
157
  def decorator(f):
158
    name = f.__name__
159
    assert name.startswith("call_")
160
    _TIMEOUTS[name[len("call_"):]] = secs
161
    return f
162
  return decorator
163

    
164

    
165
def RunWithRPC(fn):
166
  """RPC-wrapper decorator.
167

168
  When applied to a function, it runs it with the RPC system
169
  initialized, and it shutsdown the system afterwards. This means the
170
  function must be called without RPC being initialized.
171

172
  """
173
  def wrapper(*args, **kwargs):
174
    Init()
175
    try:
176
      return fn(*args, **kwargs)
177
    finally:
178
      Shutdown()
179
  return wrapper
180

    
181

    
182
class RpcResult(object):
183
  """RPC Result class.
184

185
  This class holds an RPC result. It is needed since in multi-node
186
  calls we can't raise an exception just because one one out of many
187
  failed, and therefore we use this class to encapsulate the result.
188

189
  @ivar data: the data payload, for successful results, or None
190
  @ivar call: the name of the RPC call
191
  @ivar node: the name of the node to which we made the call
192
  @ivar offline: whether the operation failed because the node was
193
      offline, as opposed to actual failure; offline=True will always
194
      imply failed=True, in order to allow simpler checking if
195
      the user doesn't care about the exact failure mode
196
  @ivar fail_msg: the error message if the call failed
197

198
  """
199
  def __init__(self, data=None, failed=False, offline=False,
200
               call=None, node=None):
201
    self.offline = offline
202
    self.call = call
203
    self.node = node
204

    
205
    if offline:
206
      self.fail_msg = "Node is marked offline"
207
      self.data = self.payload = None
208
    elif failed:
209
      self.fail_msg = self._EnsureErr(data)
210
      self.data = self.payload = None
211
    else:
212
      self.data = data
213
      if not isinstance(self.data, (tuple, list)):
214
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215
                         type(self.data))
216
        self.payload = None
217
      elif len(data) != 2:
218
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
219
                         "expected 2" % len(self.data))
220
        self.payload = None
221
      elif not self.data[0]:
222
        self.fail_msg = self._EnsureErr(self.data[1])
223
        self.payload = None
224
      else:
225
        # finally success
226
        self.fail_msg = None
227
        self.payload = data[1]
228

    
229
    for attr_name in ["call", "data", "fail_msg",
230
                      "node", "offline", "payload"]:
231
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
232

    
233
  @staticmethod
234
  def _EnsureErr(val):
235
    """Helper to ensure we return a 'True' value for error."""
236
    if val:
237
      return val
238
    else:
239
      return "No error information"
240

    
241
  def Raise(self, msg, prereq=False, ecode=None):
242
    """If the result has failed, raise an OpExecError.
243

244
    This is used so that LU code doesn't have to check for each
245
    result, but instead can call this function.
246

247
    """
248
    if not self.fail_msg:
249
      return
250

    
251
    if not msg: # one could pass None for default message
252
      msg = ("Call '%s' to node '%s' has failed: %s" %
253
             (self.call, self.node, self.fail_msg))
254
    else:
255
      msg = "%s: %s" % (msg, self.fail_msg)
256
    if prereq:
257
      ec = errors.OpPrereqError
258
    else:
259
      ec = errors.OpExecError
260
    if ecode is not None:
261
      args = (msg, ecode)
262
    else:
263
      args = (msg, )
264
    raise ec(*args) # pylint: disable-msg=W0142
265

    
266

    
267
def _AddressLookup(node_list,
268
                   ssc=ssconf.SimpleStore,
269
                   nslookup_fn=netutils.Hostname.GetIP):
270
  """Return addresses for given node names.
271

272
  @type node_list: list
273
  @param node_list: List of node names
274
  @type ssc: class
275
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
276
  @type nslookup_fn: callable
277
  @param nslookup_fn: function use to do NS lookup
278
  @rtype: list of addresses and/or None's
279
  @returns: List of corresponding addresses, if found
280

281
  """
282
  ss = ssc()
283
  iplist = ss.GetNodePrimaryIPList()
284
  family = ss.GetPrimaryIPFamily()
285
  addresses = []
286
  ipmap = dict(entry.split() for entry in iplist)
287
  for node in node_list:
288
    address = ipmap.get(node)
289
    if address is None:
290
      address = nslookup_fn(node, family=family)
291
    addresses.append(address)
292

    
293
  return addresses
294

    
295

    
296
class Client:
297
  """RPC Client class.
298

299
  This class, given a (remote) method name, a list of parameters and a
300
  list of nodes, will contact (in parallel) all nodes, and return a
301
  dict of results (key: node name, value: result).
302

303
  One current bug is that generic failure is still signaled by
304
  'False' result, which is not good. This overloading of values can
305
  cause bugs.
306

307
  """
308
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
309
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
310
                                    " timeouts table")
311
    self.procedure = procedure
312
    self.body = body
313
    self.port = port
314
    self._request = {}
315
    self._address_lookup_fn = address_lookup_fn
316

    
317
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
318
    """Add a list of nodes to the target nodes.
319

320
    @type node_list: list
321
    @param node_list: the list of node names to connect
322
    @type address_list: list or None
323
    @keyword address_list: either None or a list with node addresses,
324
        which must have the same length as the node list
325
    @type read_timeout: int
326
    @param read_timeout: overwrites default timeout for operation
327

328
    """
329
    if address_list is None:
330
      # Always use IP address instead of node name
331
      address_list = self._address_lookup_fn(node_list)
332

    
333
    assert len(node_list) == len(address_list), \
334
           "Name and address lists must have the same length"
335

    
336
    for node, address in zip(node_list, address_list):
337
      self.ConnectNode(node, address, read_timeout=read_timeout)
338

    
339
  def ConnectNode(self, name, address=None, read_timeout=None):
340
    """Add a node to the target list.
341

342
    @type name: str
343
    @param name: the node name
344
    @type address: str
345
    @param address: the node address, if known
346
    @type read_timeout: int
347
    @param read_timeout: overwrites default timeout for operation
348

349
    """
350
    if address is None:
351
      # Always use IP address instead of node name
352
      address = self._address_lookup_fn([name])[0]
353

    
354
    assert(address is not None)
355

    
356
    if read_timeout is None:
357
      read_timeout = _TIMEOUTS[self.procedure]
358

    
359
    self._request[name] = \
360
      http.client.HttpClientRequest(str(address), self.port,
361
                                    http.HTTP_PUT, str("/%s" % self.procedure),
362
                                    headers=_RPC_CLIENT_HEADERS,
363
                                    post_data=str(self.body),
364
                                    read_timeout=read_timeout)
365

    
366
  def GetResults(self, http_pool=None):
367
    """Call nodes and return results.
368

369
    @rtype: list
370
    @return: List of RPC results
371

372
    """
373
    if not http_pool:
374
      http_pool = _thread_local.GetHttpClientPool()
375

    
376
    http_pool.ProcessRequests(self._request.values())
377

    
378
    results = {}
379

    
380
    for name, req in self._request.iteritems():
381
      if req.success and req.resp_status_code == http.HTTP_OK:
382
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
383
                                  node=name, call=self.procedure)
384
        continue
385

    
386
      # TODO: Better error reporting
387
      if req.error:
388
        msg = req.error
389
      else:
390
        msg = req.resp_body
391

    
392
      logging.error("RPC error in %s from node %s: %s",
393
                    self.procedure, name, msg)
394
      results[name] = RpcResult(data=msg, failed=True, node=name,
395
                                call=self.procedure)
396

    
397
    return results
398

    
399

    
400
def _EncodeImportExportIO(ieio, ieioargs):
401
  """Encodes import/export I/O information.
402

403
  """
404
  if ieio == constants.IEIO_RAW_DISK:
405
    assert len(ieioargs) == 1
406
    return (ieioargs[0].ToDict(), )
407

    
408
  if ieio == constants.IEIO_SCRIPT:
409
    assert len(ieioargs) == 2
410
    return (ieioargs[0].ToDict(), ieioargs[1])
411

    
412
  return ieioargs
413

    
414

    
415
class RpcRunner(object):
416
  """RPC runner class"""
417

    
418
  def __init__(self, cfg):
419
    """Initialized the rpc runner.
420

421
    @type cfg:  C{config.ConfigWriter}
422
    @param cfg: the configuration object that will be used to get data
423
                about the cluster
424

425
    """
426
    self._cfg = cfg
427
    self.port = netutils.GetDaemonPort(constants.NODED)
428

    
429
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
430
    """Convert the given instance to a dict.
431

432
    This is done via the instance's ToDict() method and additionally
433
    we fill the hvparams with the cluster defaults.
434

435
    @type instance: L{objects.Instance}
436
    @param instance: an Instance object
437
    @type hvp: dict or None
438
    @param hvp: a dictionary with overridden hypervisor parameters
439
    @type bep: dict or None
440
    @param bep: a dictionary with overridden backend parameters
441
    @type osp: dict or None
442
    @param osp: a dictionary with overridden os parameters
443
    @rtype: dict
444
    @return: the instance dict, with the hvparams filled with the
445
        cluster defaults
446

447
    """
448
    idict = instance.ToDict()
449
    cluster = self._cfg.GetClusterInfo()
450
    idict["hvparams"] = cluster.FillHV(instance)
451
    if hvp is not None:
452
      idict["hvparams"].update(hvp)
453
    idict["beparams"] = cluster.FillBE(instance)
454
    if bep is not None:
455
      idict["beparams"].update(bep)
456
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
457
    if osp is not None:
458
      idict["osparams"].update(osp)
459
    for nic in idict["nics"]:
460
      nic['nicparams'] = objects.FillDict(
461
        cluster.nicparams[constants.PP_DEFAULT],
462
        nic['nicparams'])
463
    return idict
464

    
465
  def _ConnectList(self, client, node_list, call, read_timeout=None):
466
    """Helper for computing node addresses.
467

468
    @type client: L{ganeti.rpc.Client}
469
    @param client: a C{Client} instance
470
    @type node_list: list
471
    @param node_list: the node list we should connect
472
    @type call: string
473
    @param call: the name of the remote procedure call, for filling in
474
        correctly any eventual offline nodes' results
475
    @type read_timeout: int
476
    @param read_timeout: overwrites the default read timeout for the
477
        given operation
478

479
    """
480
    all_nodes = self._cfg.GetAllNodesInfo()
481
    name_list = []
482
    addr_list = []
483
    skip_dict = {}
484
    for node in node_list:
485
      if node in all_nodes:
486
        if all_nodes[node].offline:
487
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
488
          continue
489
        val = all_nodes[node].primary_ip
490
      else:
491
        val = None
492
      addr_list.append(val)
493
      name_list.append(node)
494
    if name_list:
495
      client.ConnectList(name_list, address_list=addr_list,
496
                         read_timeout=read_timeout)
497
    return skip_dict
498

    
499
  def _ConnectNode(self, client, node, call, read_timeout=None):
500
    """Helper for computing one node's address.
501

502
    @type client: L{ganeti.rpc.Client}
503
    @param client: a C{Client} instance
504
    @type node: str
505
    @param node: the node we should connect
506
    @type call: string
507
    @param call: the name of the remote procedure call, for filling in
508
        correctly any eventual offline nodes' results
509
    @type read_timeout: int
510
    @param read_timeout: overwrites the default read timeout for the
511
        given operation
512

513
    """
514
    node_info = self._cfg.GetNodeInfo(node)
515
    if node_info is not None:
516
      if node_info.offline:
517
        return RpcResult(node=node, offline=True, call=call)
518
      addr = node_info.primary_ip
519
    else:
520
      addr = None
521
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
522

    
523
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
524
    """Helper for making a multi-node call
525

526
    """
527
    body = serializer.DumpJson(args, indent=False)
528
    c = Client(procedure, body, self.port)
529
    skip_dict = self._ConnectList(c, node_list, procedure,
530
                                  read_timeout=read_timeout)
531
    skip_dict.update(c.GetResults())
532
    return skip_dict
533

    
534
  @classmethod
535
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
536
                           address_list=None, read_timeout=None):
537
    """Helper for making a multi-node static call
538

539
    """
540
    body = serializer.DumpJson(args, indent=False)
541
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
542
    c.ConnectList(node_list, address_list=address_list,
543
                  read_timeout=read_timeout)
544
    return c.GetResults()
545

    
546
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
547
    """Helper for making a single-node call
548

549
    """
550
    body = serializer.DumpJson(args, indent=False)
551
    c = Client(procedure, body, self.port)
552
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
553
    if result is None:
554
      # we did connect, node is not offline
555
      result = c.GetResults()[node]
556
    return result
557

    
558
  @classmethod
559
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
560
    """Helper for making a single-node static call
561

562
    """
563
    body = serializer.DumpJson(args, indent=False)
564
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
565
    c.ConnectNode(node, read_timeout=read_timeout)
566
    return c.GetResults()[node]
567

    
568
  @staticmethod
569
  def _Compress(data):
570
    """Compresses a string for transport over RPC.
571

572
    Small amounts of data are not compressed.
573

574
    @type data: str
575
    @param data: Data
576
    @rtype: tuple
577
    @return: Encoded data to send
578

579
    """
580
    # Small amounts of data are not compressed
581
    if len(data) < 512:
582
      return (constants.RPC_ENCODING_NONE, data)
583

    
584
    # Compress with zlib and encode in base64
585
    return (constants.RPC_ENCODING_ZLIB_BASE64,
586
            base64.b64encode(zlib.compress(data, 3)))
587

    
588
  #
589
  # Begin RPC calls
590
  #
591

    
592
  @_RpcTimeout(_TMO_URGENT)
593
  def call_lv_list(self, node_list, vg_name):
594
    """Gets the logical volumes present in a given volume group.
595

596
    This is a multi-node call.
597

598
    """
599
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
600

    
601
  @_RpcTimeout(_TMO_URGENT)
602
  def call_vg_list(self, node_list):
603
    """Gets the volume group list.
604

605
    This is a multi-node call.
606

607
    """
608
    return self._MultiNodeCall(node_list, "vg_list", [])
609

    
610
  @_RpcTimeout(_TMO_NORMAL)
611
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
612
    """Get list of storage units.
613

614
    This is a multi-node call.
615

616
    """
617
    return self._MultiNodeCall(node_list, "storage_list",
618
                               [su_name, su_args, name, fields])
619

    
620
  @_RpcTimeout(_TMO_NORMAL)
621
  def call_storage_modify(self, node, su_name, su_args, name, changes):
622
    """Modify a storage unit.
623

624
    This is a single-node call.
625

626
    """
627
    return self._SingleNodeCall(node, "storage_modify",
628
                                [su_name, su_args, name, changes])
629

    
630
  @_RpcTimeout(_TMO_NORMAL)
631
  def call_storage_execute(self, node, su_name, su_args, name, op):
632
    """Executes an operation on a storage unit.
633

634
    This is a single-node call.
635

636
    """
637
    return self._SingleNodeCall(node, "storage_execute",
638
                                [su_name, su_args, name, op])
639

    
640
  @_RpcTimeout(_TMO_URGENT)
641
  def call_bridges_exist(self, node, bridges_list):
642
    """Checks if a node has all the bridges given.
643

644
    This method checks if all bridges given in the bridges_list are
645
    present on the remote node, so that an instance that uses interfaces
646
    on those bridges can be started.
647

648
    This is a single-node call.
649

650
    """
651
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
652

    
653
  @_RpcTimeout(_TMO_NORMAL)
654
  def call_instance_start(self, node, instance, hvp, bep):
655
    """Starts an instance.
656

657
    This is a single-node call.
658

659
    """
660
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
661
    return self._SingleNodeCall(node, "instance_start", [idict])
662

    
663
  @_RpcTimeout(_TMO_NORMAL)
664
  def call_instance_shutdown(self, node, instance, timeout):
665
    """Stops an instance.
666

667
    This is a single-node call.
668

669
    """
670
    return self._SingleNodeCall(node, "instance_shutdown",
671
                                [self._InstDict(instance), timeout])
672

    
673
  @_RpcTimeout(_TMO_NORMAL)
674
  def call_migration_info(self, node, instance):
675
    """Gather the information necessary to prepare an instance migration.
676

677
    This is a single-node call.
678

679
    @type node: string
680
    @param node: the node on which the instance is currently running
681
    @type instance: C{objects.Instance}
682
    @param instance: the instance definition
683

684
    """
685
    return self._SingleNodeCall(node, "migration_info",
686
                                [self._InstDict(instance)])
687

    
688
  @_RpcTimeout(_TMO_NORMAL)
689
  def call_accept_instance(self, node, instance, info, target):
690
    """Prepare a node to accept an instance.
691

692
    This is a single-node call.
693

694
    @type node: string
695
    @param node: the target node for the migration
696
    @type instance: C{objects.Instance}
697
    @param instance: the instance definition
698
    @type info: opaque/hypervisor specific (string/data)
699
    @param info: result for the call_migration_info call
700
    @type target: string
701
    @param target: target hostname (usually ip address) (on the node itself)
702

703
    """
704
    return self._SingleNodeCall(node, "accept_instance",
705
                                [self._InstDict(instance), info, target])
706

    
707
  @_RpcTimeout(_TMO_NORMAL)
708
  def call_finalize_migration(self, node, instance, info, success):
709
    """Finalize any target-node migration specific operation.
710

711
    This is called both in case of a successful migration and in case of error
712
    (in which case it should abort the migration).
713

714
    This is a single-node call.
715

716
    @type node: string
717
    @param node: the target node for the migration
718
    @type instance: C{objects.Instance}
719
    @param instance: the instance definition
720
    @type info: opaque/hypervisor specific (string/data)
721
    @param info: result for the call_migration_info call
722
    @type success: boolean
723
    @param success: whether the migration was a success or a failure
724

725
    """
726
    return self._SingleNodeCall(node, "finalize_migration",
727
                                [self._InstDict(instance), info, success])
728

    
729
  @_RpcTimeout(_TMO_SLOW)
730
  def call_instance_migrate(self, node, instance, target, live):
731
    """Migrate an instance.
732

733
    This is a single-node call.
734

735
    @type node: string
736
    @param node: the node on which the instance is currently running
737
    @type instance: C{objects.Instance}
738
    @param instance: the instance definition
739
    @type target: string
740
    @param target: the target node name
741
    @type live: boolean
742
    @param live: whether the migration should be done live or not (the
743
        interpretation of this parameter is left to the hypervisor)
744

745
    """
746
    return self._SingleNodeCall(node, "instance_migrate",
747
                                [self._InstDict(instance), target, live])
748

    
749
  @_RpcTimeout(_TMO_NORMAL)
750
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
751
    """Reboots an instance.
752

753
    This is a single-node call.
754

755
    """
756
    return self._SingleNodeCall(node, "instance_reboot",
757
                                [self._InstDict(inst), reboot_type,
758
                                 shutdown_timeout])
759

    
760
  @_RpcTimeout(_TMO_1DAY)
761
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
762
    """Installs an OS on the given instance.
763

764
    This is a single-node call.
765

766
    """
767
    return self._SingleNodeCall(node, "instance_os_add",
768
                                [self._InstDict(inst, osp=osparams),
769
                                 reinstall, debug])
770

    
771
  @_RpcTimeout(_TMO_SLOW)
772
  def call_instance_run_rename(self, node, inst, old_name, debug):
773
    """Run the OS rename script for an instance.
774

775
    This is a single-node call.
776

777
    """
778
    return self._SingleNodeCall(node, "instance_run_rename",
779
                                [self._InstDict(inst), old_name, debug])
780

    
781
  @_RpcTimeout(_TMO_URGENT)
782
  def call_instance_info(self, node, instance, hname):
783
    """Returns information about a single instance.
784

785
    This is a single-node call.
786

787
    @type node: list
788
    @param node: the list of nodes to query
789
    @type instance: string
790
    @param instance: the instance name
791
    @type hname: string
792
    @param hname: the hypervisor type of the instance
793

794
    """
795
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
796

    
797
  @_RpcTimeout(_TMO_NORMAL)
798
  def call_instance_migratable(self, node, instance):
799
    """Checks whether the given instance can be migrated.
800

801
    This is a single-node call.
802

803
    @param node: the node to query
804
    @type instance: L{objects.Instance}
805
    @param instance: the instance to check
806

807

808
    """
809
    return self._SingleNodeCall(node, "instance_migratable",
810
                                [self._InstDict(instance)])
811

    
812
  @_RpcTimeout(_TMO_URGENT)
813
  def call_all_instances_info(self, node_list, hypervisor_list):
814
    """Returns information about all instances on the given nodes.
815

816
    This is a multi-node call.
817

818
    @type node_list: list
819
    @param node_list: the list of nodes to query
820
    @type hypervisor_list: list
821
    @param hypervisor_list: the hypervisors to query for instances
822

823
    """
824
    return self._MultiNodeCall(node_list, "all_instances_info",
825
                               [hypervisor_list])
826

    
827
  @_RpcTimeout(_TMO_URGENT)
828
  def call_instance_list(self, node_list, hypervisor_list):
829
    """Returns the list of running instances on a given node.
830

831
    This is a multi-node call.
832

833
    @type node_list: list
834
    @param node_list: the list of nodes to query
835
    @type hypervisor_list: list
836
    @param hypervisor_list: the hypervisors to query for instances
837

838
    """
839
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
840

    
841
  @_RpcTimeout(_TMO_FAST)
842
  def call_node_tcp_ping(self, node, source, target, port, timeout,
843
                         live_port_needed):
844
    """Do a TcpPing on the remote node
845

846
    This is a single-node call.
847

848
    """
849
    return self._SingleNodeCall(node, "node_tcp_ping",
850
                                [source, target, port, timeout,
851
                                 live_port_needed])
852

    
853
  @_RpcTimeout(_TMO_FAST)
854
  def call_node_has_ip_address(self, node, address):
855
    """Checks if a node has the given IP address.
856

857
    This is a single-node call.
858

859
    """
860
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
861

    
862
  @_RpcTimeout(_TMO_URGENT)
863
  def call_node_info(self, node_list, vg_name, hypervisor_type):
864
    """Return node information.
865

866
    This will return memory information and volume group size and free
867
    space.
868

869
    This is a multi-node call.
870

871
    @type node_list: list
872
    @param node_list: the list of nodes to query
873
    @type vg_name: C{string}
874
    @param vg_name: the name of the volume group to ask for disk space
875
        information
876
    @type hypervisor_type: C{str}
877
    @param hypervisor_type: the name of the hypervisor to ask for
878
        memory information
879

880
    """
881
    return self._MultiNodeCall(node_list, "node_info",
882
                               [vg_name, hypervisor_type])
883

    
884
  @_RpcTimeout(_TMO_NORMAL)
885
  def call_etc_hosts_modify(self, node, mode, name, ip):
886
    """Modify hosts file with name
887

888
    @type node: string
889
    @param node: The node to call
890
    @type mode: string
891
    @param mode: The mode to operate. Currently "add" or "remove"
892
    @type name: string
893
    @param name: The host name to be modified
894
    @type ip: string
895
    @param ip: The ip of the entry (just valid if mode is "add")
896

897
    """
898
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
899

    
900
  @_RpcTimeout(_TMO_NORMAL)
901
  def call_node_verify(self, node_list, checkdict, cluster_name):
902
    """Request verification of given parameters.
903

904
    This is a multi-node call.
905

906
    """
907
    return self._MultiNodeCall(node_list, "node_verify",
908
                               [checkdict, cluster_name])
909

    
910
  @classmethod
911
  @_RpcTimeout(_TMO_FAST)
912
  def call_node_start_master(cls, node, start_daemons, no_voting):
913
    """Tells a node to activate itself as a master.
914

915
    This is a single-node call.
916

917
    """
918
    return cls._StaticSingleNodeCall(node, "node_start_master",
919
                                     [start_daemons, no_voting])
920

    
921
  @classmethod
922
  @_RpcTimeout(_TMO_FAST)
923
  def call_node_stop_master(cls, node, stop_daemons):
924
    """Tells a node to demote itself from master status.
925

926
    This is a single-node call.
927

928
    """
929
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
930

    
931
  @classmethod
932
  @_RpcTimeout(_TMO_URGENT)
933
  def call_master_info(cls, node_list):
934
    """Query master info.
935

936
    This is a multi-node call.
937

938
    """
939
    # TODO: should this method query down nodes?
940
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
941

    
942
  @classmethod
943
  @_RpcTimeout(_TMO_URGENT)
944
  def call_version(cls, node_list):
945
    """Query node version.
946

947
    This is a multi-node call.
948

949
    """
950
    return cls._StaticMultiNodeCall(node_list, "version", [])
951

    
952
  @_RpcTimeout(_TMO_NORMAL)
953
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
954
    """Request creation of a given block device.
955

956
    This is a single-node call.
957

958
    """
959
    return self._SingleNodeCall(node, "blockdev_create",
960
                                [bdev.ToDict(), size, owner, on_primary, info])
961

    
962
  @_RpcTimeout(_TMO_SLOW)
963
  def call_blockdev_wipe(self, node, bdev, offset, size):
964
    """Request wipe at given offset with given size of a block device.
965

966
    This is a single-node call.
967

968
    """
969
    return self._SingleNodeCall(node, "blockdev_wipe",
970
                                [bdev.ToDict(), offset, size])
971

    
972
  @_RpcTimeout(_TMO_NORMAL)
973
  def call_blockdev_remove(self, node, bdev):
974
    """Request removal of a given block device.
975

976
    This is a single-node call.
977

978
    """
979
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
980

    
981
  @_RpcTimeout(_TMO_NORMAL)
982
  def call_blockdev_rename(self, node, devlist):
983
    """Request rename of the given block devices.
984

985
    This is a single-node call.
986

987
    """
988
    return self._SingleNodeCall(node, "blockdev_rename",
989
                                [(d.ToDict(), uid) for d, uid in devlist])
990

    
991
  @_RpcTimeout(_TMO_NORMAL)
992
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
993
    """Request assembling of a given block device.
994

995
    This is a single-node call.
996

997
    """
998
    return self._SingleNodeCall(node, "blockdev_assemble",
999
                                [disk.ToDict(), owner, on_primary])
1000

    
1001
  @_RpcTimeout(_TMO_NORMAL)
1002
  def call_blockdev_shutdown(self, node, disk):
1003
    """Request shutdown of a given block device.
1004

1005
    This is a single-node call.
1006

1007
    """
1008
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1009

    
1010
  @_RpcTimeout(_TMO_NORMAL)
1011
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1012
    """Request adding a list of children to a (mirroring) device.
1013

1014
    This is a single-node call.
1015

1016
    """
1017
    return self._SingleNodeCall(node, "blockdev_addchildren",
1018
                                [bdev.ToDict(),
1019
                                 [disk.ToDict() for disk in ndevs]])
1020

    
1021
  @_RpcTimeout(_TMO_NORMAL)
1022
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1023
    """Request removing a list of children from a (mirroring) device.
1024

1025
    This is a single-node call.
1026

1027
    """
1028
    return self._SingleNodeCall(node, "blockdev_removechildren",
1029
                                [bdev.ToDict(),
1030
                                 [disk.ToDict() for disk in ndevs]])
1031

    
1032
  @_RpcTimeout(_TMO_NORMAL)
1033
  def call_blockdev_getmirrorstatus(self, node, disks):
1034
    """Request status of a (mirroring) device.
1035

1036
    This is a single-node call.
1037

1038
    """
1039
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1040
                                  [dsk.ToDict() for dsk in disks])
1041
    if not result.fail_msg:
1042
      result.payload = [objects.BlockDevStatus.FromDict(i)
1043
                        for i in result.payload]
1044
    return result
1045

    
1046
  @_RpcTimeout(_TMO_NORMAL)
1047
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1048
    """Request status of (mirroring) devices from multiple nodes.
1049

1050
    This is a multi-node call.
1051

1052
    """
1053
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1054
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1055
                                       for name, disks in node_disks.items())])
1056
    for nres in result.values():
1057
      if nres.fail_msg:
1058
        continue
1059

    
1060
      for idx, (success, status) in enumerate(nres.payload):
1061
        if success:
1062
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1063

    
1064
    return result
1065

    
1066
  @_RpcTimeout(_TMO_NORMAL)
1067
  def call_blockdev_find(self, node, disk):
1068
    """Request identification of a given block device.
1069

1070
    This is a single-node call.
1071

1072
    """
1073
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1074
    if not result.fail_msg and result.payload is not None:
1075
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1076
    return result
1077

    
1078
  @_RpcTimeout(_TMO_NORMAL)
1079
  def call_blockdev_close(self, node, instance_name, disks):
1080
    """Closes the given block devices.
1081

1082
    This is a single-node call.
1083

1084
    """
1085
    params = [instance_name, [cf.ToDict() for cf in disks]]
1086
    return self._SingleNodeCall(node, "blockdev_close", params)
1087

    
1088
  @_RpcTimeout(_TMO_NORMAL)
1089
  def call_blockdev_getsizes(self, node, disks):
1090
    """Returns the size of the given disks.
1091

1092
    This is a single-node call.
1093

1094
    """
1095
    params = [[cf.ToDict() for cf in disks]]
1096
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1097

    
1098
  @_RpcTimeout(_TMO_NORMAL)
1099
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1100
    """Disconnects the network of the given drbd devices.
1101

1102
    This is a multi-node call.
1103

1104
    """
1105
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1106
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1107

    
1108
  @_RpcTimeout(_TMO_NORMAL)
1109
  def call_drbd_attach_net(self, node_list, nodes_ip,
1110
                           disks, instance_name, multimaster):
1111
    """Disconnects the given drbd devices.
1112

1113
    This is a multi-node call.
1114

1115
    """
1116
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1117
                               [nodes_ip, [cf.ToDict() for cf in disks],
1118
                                instance_name, multimaster])
1119

    
1120
  @_RpcTimeout(_TMO_SLOW)
1121
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1122
    """Waits for the synchronization of drbd devices is complete.
1123

1124
    This is a multi-node call.
1125

1126
    """
1127
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1128
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1129

    
1130
  @_RpcTimeout(_TMO_URGENT)
1131
  def call_drbd_helper(self, node_list):
1132
    """Gets drbd helper.
1133

1134
    This is a multi-node call.
1135

1136
    """
1137
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1138

    
1139
  @classmethod
1140
  @_RpcTimeout(_TMO_NORMAL)
1141
  def call_upload_file(cls, node_list, file_name, address_list=None):
1142
    """Upload a file.
1143

1144
    The node will refuse the operation in case the file is not on the
1145
    approved file list.
1146

1147
    This is a multi-node call.
1148

1149
    @type node_list: list
1150
    @param node_list: the list of node names to upload to
1151
    @type file_name: str
1152
    @param file_name: the filename to upload
1153
    @type address_list: list or None
1154
    @keyword address_list: an optional list of node addresses, in order
1155
        to optimize the RPC speed
1156

1157
    """
1158
    file_contents = utils.ReadFile(file_name)
1159
    data = cls._Compress(file_contents)
1160
    st = os.stat(file_name)
1161
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1162
              st.st_atime, st.st_mtime]
1163
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1164
                                    address_list=address_list)
1165

    
1166
  @classmethod
1167
  @_RpcTimeout(_TMO_NORMAL)
1168
  def call_write_ssconf_files(cls, node_list, values):
1169
    """Write ssconf files.
1170

1171
    This is a multi-node call.
1172

1173
    """
1174
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1175

    
1176
  @_RpcTimeout(_TMO_NORMAL)
1177
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1178
    """Runs OOB.
1179

1180
    This is a single-node call.
1181

1182
    """
1183
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1184
                                                  remote_node, timeout])
1185

    
1186
  @_RpcTimeout(_TMO_FAST)
1187
  def call_os_diagnose(self, node_list):
1188
    """Request a diagnose of OS definitions.
1189

1190
    This is a multi-node call.
1191

1192
    """
1193
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1194

    
1195
  @_RpcTimeout(_TMO_FAST)
1196
  def call_os_get(self, node, name):
1197
    """Returns an OS definition.
1198

1199
    This is a single-node call.
1200

1201
    """
1202
    result = self._SingleNodeCall(node, "os_get", [name])
1203
    if not result.fail_msg and isinstance(result.payload, dict):
1204
      result.payload = objects.OS.FromDict(result.payload)
1205
    return result
1206

    
1207
  @_RpcTimeout(_TMO_FAST)
1208
  def call_os_validate(self, required, nodes, name, checks, params):
1209
    """Run a validation routine for a given OS.
1210

1211
    This is a multi-node call.
1212

1213
    """
1214
    return self._MultiNodeCall(nodes, "os_validate",
1215
                               [required, name, checks, params])
1216

    
1217
  @_RpcTimeout(_TMO_NORMAL)
1218
  def call_hooks_runner(self, node_list, hpath, phase, env):
1219
    """Call the hooks runner.
1220

1221
    Args:
1222
      - op: the OpCode instance
1223
      - env: a dictionary with the environment
1224

1225
    This is a multi-node call.
1226

1227
    """
1228
    params = [hpath, phase, env]
1229
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1230

    
1231
  @_RpcTimeout(_TMO_NORMAL)
1232
  def call_iallocator_runner(self, node, name, idata):
1233
    """Call an iallocator on a remote node
1234

1235
    Args:
1236
      - name: the iallocator name
1237
      - input: the json-encoded input string
1238

1239
    This is a single-node call.
1240

1241
    """
1242
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1243

    
1244
  @_RpcTimeout(_TMO_NORMAL)
1245
  def call_blockdev_grow(self, node, cf_bdev, amount):
1246
    """Request a snapshot of the given block device.
1247

1248
    This is a single-node call.
1249

1250
    """
1251
    return self._SingleNodeCall(node, "blockdev_grow",
1252
                                [cf_bdev.ToDict(), amount])
1253

    
1254
  @_RpcTimeout(_TMO_1DAY)
1255
  def call_blockdev_export(self, node, cf_bdev,
1256
                           dest_node, dest_path, cluster_name):
1257
    """Export a given disk to another node.
1258

1259
    This is a single-node call.
1260

1261
    """
1262
    return self._SingleNodeCall(node, "blockdev_export",
1263
                                [cf_bdev.ToDict(), dest_node, dest_path,
1264
                                 cluster_name])
1265

    
1266
  @_RpcTimeout(_TMO_NORMAL)
1267
  def call_blockdev_snapshot(self, node, cf_bdev):
1268
    """Request a snapshot of the given block device.
1269

1270
    This is a single-node call.
1271

1272
    """
1273
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1274

    
1275
  @_RpcTimeout(_TMO_NORMAL)
1276
  def call_finalize_export(self, node, instance, snap_disks):
1277
    """Request the completion of an export operation.
1278

1279
    This writes the export config file, etc.
1280

1281
    This is a single-node call.
1282

1283
    """
1284
    flat_disks = []
1285
    for disk in snap_disks:
1286
      if isinstance(disk, bool):
1287
        flat_disks.append(disk)
1288
      else:
1289
        flat_disks.append(disk.ToDict())
1290

    
1291
    return self._SingleNodeCall(node, "finalize_export",
1292
                                [self._InstDict(instance), flat_disks])
1293

    
1294
  @_RpcTimeout(_TMO_FAST)
1295
  def call_export_info(self, node, path):
1296
    """Queries the export information in a given path.
1297

1298
    This is a single-node call.
1299

1300
    """
1301
    return self._SingleNodeCall(node, "export_info", [path])
1302

    
1303
  @_RpcTimeout(_TMO_FAST)
1304
  def call_export_list(self, node_list):
1305
    """Gets the stored exports list.
1306

1307
    This is a multi-node call.
1308

1309
    """
1310
    return self._MultiNodeCall(node_list, "export_list", [])
1311

    
1312
  @_RpcTimeout(_TMO_FAST)
1313
  def call_export_remove(self, node, export):
1314
    """Requests removal of a given export.
1315

1316
    This is a single-node call.
1317

1318
    """
1319
    return self._SingleNodeCall(node, "export_remove", [export])
1320

    
1321
  @classmethod
1322
  @_RpcTimeout(_TMO_NORMAL)
1323
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1324
    """Requests a node to clean the cluster information it has.
1325

1326
    This will remove the configuration information from the ganeti data
1327
    dir.
1328

1329
    This is a single-node call.
1330

1331
    """
1332
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1333
                                     [modify_ssh_setup])
1334

    
1335
  @_RpcTimeout(_TMO_FAST)
1336
  def call_node_volumes(self, node_list):
1337
    """Gets all volumes on node(s).
1338

1339
    This is a multi-node call.
1340

1341
    """
1342
    return self._MultiNodeCall(node_list, "node_volumes", [])
1343

    
1344
  @_RpcTimeout(_TMO_FAST)
1345
  def call_node_demote_from_mc(self, node):
1346
    """Demote a node from the master candidate role.
1347

1348
    This is a single-node call.
1349

1350
    """
1351
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1352

    
1353
  @_RpcTimeout(_TMO_NORMAL)
1354
  def call_node_powercycle(self, node, hypervisor):
1355
    """Tries to powercycle a node.
1356

1357
    This is a single-node call.
1358

1359
    """
1360
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1361

    
1362
  @_RpcTimeout(None)
1363
  def call_test_delay(self, node_list, duration):
1364
    """Sleep for a fixed time on given node(s).
1365

1366
    This is a multi-node call.
1367

1368
    """
1369
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1370
                               read_timeout=int(duration + 5))
1371

    
1372
  @_RpcTimeout(_TMO_FAST)
1373
  def call_file_storage_dir_create(self, node, file_storage_dir):
1374
    """Create the given file storage directory.
1375

1376
    This is a single-node call.
1377

1378
    """
1379
    return self._SingleNodeCall(node, "file_storage_dir_create",
1380
                                [file_storage_dir])
1381

    
1382
  @_RpcTimeout(_TMO_FAST)
1383
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1384
    """Remove the given file storage directory.
1385

1386
    This is a single-node call.
1387

1388
    """
1389
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1390
                                [file_storage_dir])
1391

    
1392
  @_RpcTimeout(_TMO_FAST)
1393
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1394
                                   new_file_storage_dir):
1395
    """Rename file storage directory.
1396

1397
    This is a single-node call.
1398

1399
    """
1400
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1401
                                [old_file_storage_dir, new_file_storage_dir])
1402

    
1403
  @classmethod
1404
  @_RpcTimeout(_TMO_FAST)
1405
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1406
    """Update job queue.
1407

1408
    This is a multi-node call.
1409

1410
    """
1411
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1412
                                    [file_name, cls._Compress(content)],
1413
                                    address_list=address_list)
1414

    
1415
  @classmethod
1416
  @_RpcTimeout(_TMO_NORMAL)
1417
  def call_jobqueue_purge(cls, node):
1418
    """Purge job queue.
1419

1420
    This is a single-node call.
1421

1422
    """
1423
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1424

    
1425
  @classmethod
1426
  @_RpcTimeout(_TMO_FAST)
1427
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1428
    """Rename a job queue file.
1429

1430
    This is a multi-node call.
1431

1432
    """
1433
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1434
                                    address_list=address_list)
1435

    
1436
  @_RpcTimeout(_TMO_NORMAL)
1437
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1438
    """Validate the hypervisor params.
1439

1440
    This is a multi-node call.
1441

1442
    @type node_list: list
1443
    @param node_list: the list of nodes to query
1444
    @type hvname: string
1445
    @param hvname: the hypervisor name
1446
    @type hvparams: dict
1447
    @param hvparams: the hypervisor parameters to be validated
1448

1449
    """
1450
    cluster = self._cfg.GetClusterInfo()
1451
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1452
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1453
                               [hvname, hv_full])
1454

    
1455
  @_RpcTimeout(_TMO_NORMAL)
1456
  def call_x509_cert_create(self, node, validity):
1457
    """Creates a new X509 certificate for SSL/TLS.
1458

1459
    This is a single-node call.
1460

1461
    @type validity: int
1462
    @param validity: Validity in seconds
1463

1464
    """
1465
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1466

    
1467
  @_RpcTimeout(_TMO_NORMAL)
1468
  def call_x509_cert_remove(self, node, name):
1469
    """Removes a X509 certificate.
1470

1471
    This is a single-node call.
1472

1473
    @type name: string
1474
    @param name: Certificate name
1475

1476
    """
1477
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1478

    
1479
  @_RpcTimeout(_TMO_NORMAL)
1480
  def call_import_start(self, node, opts, instance, dest, dest_args):
1481
    """Starts a listener for an import.
1482

1483
    This is a single-node call.
1484

1485
    @type node: string
1486
    @param node: Node name
1487
    @type instance: C{objects.Instance}
1488
    @param instance: Instance object
1489

1490
    """
1491
    return self._SingleNodeCall(node, "import_start",
1492
                                [opts.ToDict(),
1493
                                 self._InstDict(instance), dest,
1494
                                 _EncodeImportExportIO(dest, dest_args)])
1495

    
1496
  @_RpcTimeout(_TMO_NORMAL)
1497
  def call_export_start(self, node, opts, host, port,
1498
                        instance, source, source_args):
1499
    """Starts an export daemon.
1500

1501
    This is a single-node call.
1502

1503
    @type node: string
1504
    @param node: Node name
1505
    @type instance: C{objects.Instance}
1506
    @param instance: Instance object
1507

1508
    """
1509
    return self._SingleNodeCall(node, "export_start",
1510
                                [opts.ToDict(), host, port,
1511
                                 self._InstDict(instance), source,
1512
                                 _EncodeImportExportIO(source, source_args)])
1513

    
1514
  @_RpcTimeout(_TMO_FAST)
1515
  def call_impexp_status(self, node, names):
1516
    """Gets the status of an import or export.
1517

1518
    This is a single-node call.
1519

1520
    @type node: string
1521
    @param node: Node name
1522
    @type names: List of strings
1523
    @param names: Import/export names
1524
    @rtype: List of L{objects.ImportExportStatus} instances
1525
    @return: Returns a list of the state of each named import/export or None if
1526
             a status couldn't be retrieved
1527

1528
    """
1529
    result = self._SingleNodeCall(node, "impexp_status", [names])
1530

    
1531
    if not result.fail_msg:
1532
      decoded = []
1533

    
1534
      for i in result.payload:
1535
        if i is None:
1536
          decoded.append(None)
1537
          continue
1538
        decoded.append(objects.ImportExportStatus.FromDict(i))
1539

    
1540
      result.payload = decoded
1541

    
1542
    return result
1543

    
1544
  @_RpcTimeout(_TMO_NORMAL)
1545
  def call_impexp_abort(self, node, name):
1546
    """Aborts an import or export.
1547

1548
    This is a single-node call.
1549

1550
    @type node: string
1551
    @param node: Node name
1552
    @type name: string
1553
    @param name: Import/export name
1554

1555
    """
1556
    return self._SingleNodeCall(node, "impexp_abort", [name])
1557

    
1558
  @_RpcTimeout(_TMO_NORMAL)
1559
  def call_impexp_cleanup(self, node, name):
1560
    """Cleans up after an import or export.
1561

1562
    This is a single-node call.
1563

1564
    @type node: string
1565
    @param node: Node name
1566
    @type name: string
1567
    @param name: Import/export name
1568

1569
    """
1570
    return self._SingleNodeCall(node, "impexp_cleanup", [name])