Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 91c622a8

History | View | Annotate | Download (43.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47

    
48
# pylint has a bug here, doesn't see this import
49
import ganeti.http.client  # pylint: disable-msg=W0611
50

    
51

    
52
# Timeout for connecting to nodes (seconds)
53
_RPC_CONNECT_TIMEOUT = 5
54

    
55
_RPC_CLIENT_HEADERS = [
56
  "Content-type: %s" % http.HTTP_APP_JSON,
57
  ]
58

    
59
# Various time constants for the timeout table
60
_TMO_URGENT = 60 # one minute
61
_TMO_FAST = 5 * 60 # five minutes
62
_TMO_NORMAL = 15 * 60 # 15 minutes
63
_TMO_SLOW = 3600 # one hour
64
_TMO_4HRS = 4 * 3600
65
_TMO_1DAY = 86400
66

    
67
# Timeout table that will be built later by decorators
68
# Guidelines for choosing timeouts:
69
# - call used during watcher: timeout -> 1min, _TMO_URGENT
70
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
71
# - other calls: 15 min, _TMO_NORMAL
72
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
73

    
74
_TIMEOUTS = {
75
}
76

    
77

    
78
def Init():
79
  """Initializes the module-global HTTP client manager.
80

81
  Must be called before using any RPC function and while exactly one thread is
82
  running.
83

84
  """
85
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86
  # one thread running. This check is just a safety measure -- it doesn't
87
  # cover all cases.
88
  assert threading.activeCount() == 1, \
89
         "Found more than one active thread when initializing pycURL"
90

    
91
  logging.info("Using PycURL %s", pycurl.version)
92

    
93
  pycurl.global_init(pycurl.GLOBAL_ALL)
94

    
95

    
96
def Shutdown():
97
  """Stops the module-global HTTP client manager.
98

99
  Must be called before quitting the program and while exactly one thread is
100
  running.
101

102
  """
103
  pycurl.global_cleanup()
104

    
105

    
106
def _ConfigRpcCurl(curl):
107
  noded_cert = str(constants.NODED_CERT_FILE)
108

    
109
  curl.setopt(pycurl.FOLLOWLOCATION, False)
110
  curl.setopt(pycurl.CAINFO, noded_cert)
111
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
113
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114
  curl.setopt(pycurl.SSLCERT, noded_cert)
115
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116
  curl.setopt(pycurl.SSLKEY, noded_cert)
117
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118

    
119

    
120
class _RpcThreadLocal(threading.local):
121
  def GetHttpClientPool(self):
122
    """Returns a per-thread HTTP client pool.
123

124
    @rtype: L{http.client.HttpClientPool}
125

126
    """
127
    try:
128
      pool = self.hcp
129
    except AttributeError:
130
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
131
      self.hcp = pool
132

    
133
    return pool
134

    
135

    
136
_thread_local = _RpcThreadLocal()
137

    
138

    
139
def _RpcTimeout(secs):
140
  """Timeout decorator.
141

142
  When applied to a rpc call_* function, it updates the global timeout
143
  table with the given function/timeout.
144

145
  """
146
  def decorator(f):
147
    name = f.__name__
148
    assert name.startswith("call_")
149
    _TIMEOUTS[name[len("call_"):]] = secs
150
    return f
151
  return decorator
152

    
153

    
154
def RunWithRPC(fn):
155
  """RPC-wrapper decorator.
156

157
  When applied to a function, it runs it with the RPC system
158
  initialized, and it shutsdown the system afterwards. This means the
159
  function must be called without RPC being initialized.
160

161
  """
162
  def wrapper(*args, **kwargs):
163
    Init()
164
    try:
165
      return fn(*args, **kwargs)
166
    finally:
167
      Shutdown()
168
  return wrapper
169

    
170

    
171
class RpcResult(object):
172
  """RPC Result class.
173

174
  This class holds an RPC result. It is needed since in multi-node
175
  calls we can't raise an exception just because one one out of many
176
  failed, and therefore we use this class to encapsulate the result.
177

178
  @ivar data: the data payload, for successful results, or None
179
  @ivar call: the name of the RPC call
180
  @ivar node: the name of the node to which we made the call
181
  @ivar offline: whether the operation failed because the node was
182
      offline, as opposed to actual failure; offline=True will always
183
      imply failed=True, in order to allow simpler checking if
184
      the user doesn't care about the exact failure mode
185
  @ivar fail_msg: the error message if the call failed
186

187
  """
188
  def __init__(self, data=None, failed=False, offline=False,
189
               call=None, node=None):
190
    self.offline = offline
191
    self.call = call
192
    self.node = node
193

    
194
    if offline:
195
      self.fail_msg = "Node is marked offline"
196
      self.data = self.payload = None
197
    elif failed:
198
      self.fail_msg = self._EnsureErr(data)
199
      self.data = self.payload = None
200
    else:
201
      self.data = data
202
      if not isinstance(self.data, (tuple, list)):
203
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
204
                         type(self.data))
205
        self.payload = None
206
      elif len(data) != 2:
207
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
208
                         "expected 2" % len(self.data))
209
        self.payload = None
210
      elif not self.data[0]:
211
        self.fail_msg = self._EnsureErr(self.data[1])
212
        self.payload = None
213
      else:
214
        # finally success
215
        self.fail_msg = None
216
        self.payload = data[1]
217

    
218
    assert hasattr(self, "call")
219
    assert hasattr(self, "data")
220
    assert hasattr(self, "fail_msg")
221
    assert hasattr(self, "node")
222
    assert hasattr(self, "offline")
223
    assert hasattr(self, "payload")
224

    
225
  @staticmethod
226
  def _EnsureErr(val):
227
    """Helper to ensure we return a 'True' value for error."""
228
    if val:
229
      return val
230
    else:
231
      return "No error information"
232

    
233
  def Raise(self, msg, prereq=False, ecode=None):
234
    """If the result has failed, raise an OpExecError.
235

236
    This is used so that LU code doesn't have to check for each
237
    result, but instead can call this function.
238

239
    """
240
    if not self.fail_msg:
241
      return
242

    
243
    if not msg: # one could pass None for default message
244
      msg = ("Call '%s' to node '%s' has failed: %s" %
245
             (self.call, self.node, self.fail_msg))
246
    else:
247
      msg = "%s: %s" % (msg, self.fail_msg)
248
    if prereq:
249
      ec = errors.OpPrereqError
250
    else:
251
      ec = errors.OpExecError
252
    if ecode is not None:
253
      args = (msg, ecode)
254
    else:
255
      args = (msg, )
256
    raise ec(*args) # pylint: disable-msg=W0142
257

    
258

    
259
class Client:
260
  """RPC Client class.
261

262
  This class, given a (remote) method name, a list of parameters and a
263
  list of nodes, will contact (in parallel) all nodes, and return a
264
  dict of results (key: node name, value: result).
265

266
  One current bug is that generic failure is still signaled by
267
  'False' result, which is not good. This overloading of values can
268
  cause bugs.
269

270
  """
271
  def __init__(self, procedure, body, port):
272
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
273
                                    " timeouts table")
274
    self.procedure = procedure
275
    self.body = body
276
    self.port = port
277
    self._request = {}
278

    
279
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
280
    """Add a list of nodes to the target nodes.
281

282
    @type node_list: list
283
    @param node_list: the list of node names to connect
284
    @type address_list: list or None
285
    @keyword address_list: either None or a list with node addresses,
286
        which must have the same length as the node list
287
    @type read_timeout: int
288
    @param read_timeout: overwrites the default read timeout for the
289
        given operation
290

291
    """
292
    if address_list is None:
293
      address_list = [None for _ in node_list]
294
    else:
295
      assert len(node_list) == len(address_list), \
296
             "Name and address lists should have the same length"
297
    for node, address in zip(node_list, address_list):
298
      self.ConnectNode(node, address, read_timeout=read_timeout)
299

    
300
  def ConnectNode(self, name, address=None, read_timeout=None):
301
    """Add a node to the target list.
302

303
    @type name: str
304
    @param name: the node name
305
    @type address: str
306
    @keyword address: the node address, if known
307

308
    """
309
    if address is None:
310
      address = name
311

    
312
    if read_timeout is None:
313
      read_timeout = _TIMEOUTS[self.procedure]
314

    
315
    self._request[name] = \
316
      http.client.HttpClientRequest(str(address), self.port,
317
                                    http.HTTP_PUT, str("/%s" % self.procedure),
318
                                    headers=_RPC_CLIENT_HEADERS,
319
                                    post_data=str(self.body),
320
                                    read_timeout=read_timeout)
321

    
322
  def GetResults(self, http_pool=None):
323
    """Call nodes and return results.
324

325
    @rtype: list
326
    @return: List of RPC results
327

328
    """
329
    if not http_pool:
330
      http_pool = _thread_local.GetHttpClientPool()
331

    
332
    http_pool.ProcessRequests(self._request.values())
333

    
334
    results = {}
335

    
336
    for name, req in self._request.iteritems():
337
      if req.success and req.resp_status_code == http.HTTP_OK:
338
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
339
                                  node=name, call=self.procedure)
340
        continue
341

    
342
      # TODO: Better error reporting
343
      if req.error:
344
        msg = req.error
345
      else:
346
        msg = req.resp_body
347

    
348
      logging.error("RPC error in %s from node %s: %s",
349
                    self.procedure, name, msg)
350
      results[name] = RpcResult(data=msg, failed=True, node=name,
351
                                call=self.procedure)
352

    
353
    return results
354

    
355

    
356
def _EncodeImportExportIO(ieio, ieioargs):
357
  """Encodes import/export I/O information.
358

359
  """
360
  if ieio == constants.IEIO_RAW_DISK:
361
    assert len(ieioargs) == 1
362
    return (ieioargs[0].ToDict(), )
363

    
364
  if ieio == constants.IEIO_SCRIPT:
365
    assert len(ieioargs) == 2
366
    return (ieioargs[0].ToDict(), ieioargs[1])
367

    
368
  return ieioargs
369

    
370

    
371
class RpcRunner(object):
372
  """RPC runner class"""
373

    
374
  def __init__(self, cfg):
375
    """Initialized the rpc runner.
376

377
    @type cfg:  C{config.ConfigWriter}
378
    @param cfg: the configuration object that will be used to get data
379
                about the cluster
380

381
    """
382
    self._cfg = cfg
383
    self.port = netutils.GetDaemonPort(constants.NODED)
384

    
385
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
386
    """Convert the given instance to a dict.
387

388
    This is done via the instance's ToDict() method and additionally
389
    we fill the hvparams with the cluster defaults.
390

391
    @type instance: L{objects.Instance}
392
    @param instance: an Instance object
393
    @type hvp: dict or None
394
    @param hvp: a dictionary with overridden hypervisor parameters
395
    @type bep: dict or None
396
    @param bep: a dictionary with overridden backend parameters
397
    @type osp: dict or None
398
    @param osp: a dictionary with overriden os parameters
399
    @rtype: dict
400
    @return: the instance dict, with the hvparams filled with the
401
        cluster defaults
402

403
    """
404
    idict = instance.ToDict()
405
    cluster = self._cfg.GetClusterInfo()
406
    idict["hvparams"] = cluster.FillHV(instance)
407
    if hvp is not None:
408
      idict["hvparams"].update(hvp)
409
    idict["beparams"] = cluster.FillBE(instance)
410
    if bep is not None:
411
      idict["beparams"].update(bep)
412
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
413
    if osp is not None:
414
      idict["osparams"].update(osp)
415
    for nic in idict["nics"]:
416
      nic['nicparams'] = objects.FillDict(
417
        cluster.nicparams[constants.PP_DEFAULT],
418
        nic['nicparams'])
419
    return idict
420

    
421
  def _ConnectList(self, client, node_list, call, read_timeout=None):
422
    """Helper for computing node addresses.
423

424
    @type client: L{ganeti.rpc.Client}
425
    @param client: a C{Client} instance
426
    @type node_list: list
427
    @param node_list: the node list we should connect
428
    @type call: string
429
    @param call: the name of the remote procedure call, for filling in
430
        correctly any eventual offline nodes' results
431
    @type read_timeout: int
432
    @param read_timeout: overwrites the default read timeout for the
433
        given operation
434

435
    """
436
    all_nodes = self._cfg.GetAllNodesInfo()
437
    name_list = []
438
    addr_list = []
439
    skip_dict = {}
440
    for node in node_list:
441
      if node in all_nodes:
442
        if all_nodes[node].offline:
443
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
444
          continue
445
        val = all_nodes[node].primary_ip
446
      else:
447
        val = None
448
      addr_list.append(val)
449
      name_list.append(node)
450
    if name_list:
451
      client.ConnectList(name_list, address_list=addr_list,
452
                         read_timeout=read_timeout)
453
    return skip_dict
454

    
455
  def _ConnectNode(self, client, node, call, read_timeout=None):
456
    """Helper for computing one node's address.
457

458
    @type client: L{ganeti.rpc.Client}
459
    @param client: a C{Client} instance
460
    @type node: str
461
    @param node: the node we should connect
462
    @type call: string
463
    @param call: the name of the remote procedure call, for filling in
464
        correctly any eventual offline nodes' results
465
    @type read_timeout: int
466
    @param read_timeout: overwrites the default read timeout for the
467
        given operation
468

469
    """
470
    node_info = self._cfg.GetNodeInfo(node)
471
    if node_info is not None:
472
      if node_info.offline:
473
        return RpcResult(node=node, offline=True, call=call)
474
      addr = node_info.primary_ip
475
    else:
476
      addr = None
477
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
478

    
479
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
480
    """Helper for making a multi-node call
481

482
    """
483
    body = serializer.DumpJson(args, indent=False)
484
    c = Client(procedure, body, self.port)
485
    skip_dict = self._ConnectList(c, node_list, procedure,
486
                                  read_timeout=read_timeout)
487
    skip_dict.update(c.GetResults())
488
    return skip_dict
489

    
490
  @classmethod
491
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
492
                           address_list=None, read_timeout=None):
493
    """Helper for making a multi-node static call
494

495
    """
496
    body = serializer.DumpJson(args, indent=False)
497
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
498
    c.ConnectList(node_list, address_list=address_list,
499
                  read_timeout=read_timeout)
500
    return c.GetResults()
501

    
502
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
503
    """Helper for making a single-node call
504

505
    """
506
    body = serializer.DumpJson(args, indent=False)
507
    c = Client(procedure, body, self.port)
508
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
509
    if result is None:
510
      # we did connect, node is not offline
511
      result = c.GetResults()[node]
512
    return result
513

    
514
  @classmethod
515
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
516
    """Helper for making a single-node static call
517

518
    """
519
    body = serializer.DumpJson(args, indent=False)
520
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
521
    c.ConnectNode(node, read_timeout=read_timeout)
522
    return c.GetResults()[node]
523

    
524
  @staticmethod
525
  def _Compress(data):
526
    """Compresses a string for transport over RPC.
527

528
    Small amounts of data are not compressed.
529

530
    @type data: str
531
    @param data: Data
532
    @rtype: tuple
533
    @return: Encoded data to send
534

535
    """
536
    # Small amounts of data are not compressed
537
    if len(data) < 512:
538
      return (constants.RPC_ENCODING_NONE, data)
539

    
540
    # Compress with zlib and encode in base64
541
    return (constants.RPC_ENCODING_ZLIB_BASE64,
542
            base64.b64encode(zlib.compress(data, 3)))
543

    
544
  #
545
  # Begin RPC calls
546
  #
547

    
548
  @_RpcTimeout(_TMO_URGENT)
549
  def call_lv_list(self, node_list, vg_name):
550
    """Gets the logical volumes present in a given volume group.
551

552
    This is a multi-node call.
553

554
    """
555
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
556

    
557
  @_RpcTimeout(_TMO_URGENT)
558
  def call_vg_list(self, node_list):
559
    """Gets the volume group list.
560

561
    This is a multi-node call.
562

563
    """
564
    return self._MultiNodeCall(node_list, "vg_list", [])
565

    
566
  @_RpcTimeout(_TMO_NORMAL)
567
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
568
    """Get list of storage units.
569

570
    This is a multi-node call.
571

572
    """
573
    return self._MultiNodeCall(node_list, "storage_list",
574
                               [su_name, su_args, name, fields])
575

    
576
  @_RpcTimeout(_TMO_NORMAL)
577
  def call_storage_modify(self, node, su_name, su_args, name, changes):
578
    """Modify a storage unit.
579

580
    This is a single-node call.
581

582
    """
583
    return self._SingleNodeCall(node, "storage_modify",
584
                                [su_name, su_args, name, changes])
585

    
586
  @_RpcTimeout(_TMO_NORMAL)
587
  def call_storage_execute(self, node, su_name, su_args, name, op):
588
    """Executes an operation on a storage unit.
589

590
    This is a single-node call.
591

592
    """
593
    return self._SingleNodeCall(node, "storage_execute",
594
                                [su_name, su_args, name, op])
595

    
596
  @_RpcTimeout(_TMO_URGENT)
597
  def call_bridges_exist(self, node, bridges_list):
598
    """Checks if a node has all the bridges given.
599

600
    This method checks if all bridges given in the bridges_list are
601
    present on the remote node, so that an instance that uses interfaces
602
    on those bridges can be started.
603

604
    This is a single-node call.
605

606
    """
607
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
608

    
609
  @_RpcTimeout(_TMO_NORMAL)
610
  def call_instance_start(self, node, instance, hvp, bep):
611
    """Starts an instance.
612

613
    This is a single-node call.
614

615
    """
616
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
617
    return self._SingleNodeCall(node, "instance_start", [idict])
618

    
619
  @_RpcTimeout(_TMO_NORMAL)
620
  def call_instance_shutdown(self, node, instance, timeout):
621
    """Stops an instance.
622

623
    This is a single-node call.
624

625
    """
626
    return self._SingleNodeCall(node, "instance_shutdown",
627
                                [self._InstDict(instance), timeout])
628

    
629
  @_RpcTimeout(_TMO_NORMAL)
630
  def call_migration_info(self, node, instance):
631
    """Gather the information necessary to prepare an instance migration.
632

633
    This is a single-node call.
634

635
    @type node: string
636
    @param node: the node on which the instance is currently running
637
    @type instance: C{objects.Instance}
638
    @param instance: the instance definition
639

640
    """
641
    return self._SingleNodeCall(node, "migration_info",
642
                                [self._InstDict(instance)])
643

    
644
  @_RpcTimeout(_TMO_NORMAL)
645
  def call_accept_instance(self, node, instance, info, target):
646
    """Prepare a node to accept an instance.
647

648
    This is a single-node call.
649

650
    @type node: string
651
    @param node: the target node for the migration
652
    @type instance: C{objects.Instance}
653
    @param instance: the instance definition
654
    @type info: opaque/hypervisor specific (string/data)
655
    @param info: result for the call_migration_info call
656
    @type target: string
657
    @param target: target hostname (usually ip address) (on the node itself)
658

659
    """
660
    return self._SingleNodeCall(node, "accept_instance",
661
                                [self._InstDict(instance), info, target])
662

    
663
  @_RpcTimeout(_TMO_NORMAL)
664
  def call_finalize_migration(self, node, instance, info, success):
665
    """Finalize any target-node migration specific operation.
666

667
    This is called both in case of a successful migration and in case of error
668
    (in which case it should abort the migration).
669

670
    This is a single-node call.
671

672
    @type node: string
673
    @param node: the target node for the migration
674
    @type instance: C{objects.Instance}
675
    @param instance: the instance definition
676
    @type info: opaque/hypervisor specific (string/data)
677
    @param info: result for the call_migration_info call
678
    @type success: boolean
679
    @param success: whether the migration was a success or a failure
680

681
    """
682
    return self._SingleNodeCall(node, "finalize_migration",
683
                                [self._InstDict(instance), info, success])
684

    
685
  @_RpcTimeout(_TMO_SLOW)
686
  def call_instance_migrate(self, node, instance, target, live):
687
    """Migrate an instance.
688

689
    This is a single-node call.
690

691
    @type node: string
692
    @param node: the node on which the instance is currently running
693
    @type instance: C{objects.Instance}
694
    @param instance: the instance definition
695
    @type target: string
696
    @param target: the target node name
697
    @type live: boolean
698
    @param live: whether the migration should be done live or not (the
699
        interpretation of this parameter is left to the hypervisor)
700

701
    """
702
    return self._SingleNodeCall(node, "instance_migrate",
703
                                [self._InstDict(instance), target, live])
704

    
705
  @_RpcTimeout(_TMO_NORMAL)
706
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
707
    """Reboots an instance.
708

709
    This is a single-node call.
710

711
    """
712
    return self._SingleNodeCall(node, "instance_reboot",
713
                                [self._InstDict(inst), reboot_type,
714
                                 shutdown_timeout])
715

    
716
  @_RpcTimeout(_TMO_1DAY)
717
  def call_instance_os_add(self, node, inst, reinstall, debug):
718
    """Installs an OS on the given instance.
719

720
    This is a single-node call.
721

722
    """
723
    return self._SingleNodeCall(node, "instance_os_add",
724
                                [self._InstDict(inst), reinstall, debug])
725

    
726
  @_RpcTimeout(_TMO_SLOW)
727
  def call_instance_run_rename(self, node, inst, old_name, debug):
728
    """Run the OS rename script for an instance.
729

730
    This is a single-node call.
731

732
    """
733
    return self._SingleNodeCall(node, "instance_run_rename",
734
                                [self._InstDict(inst), old_name, debug])
735

    
736
  @_RpcTimeout(_TMO_URGENT)
737
  def call_instance_info(self, node, instance, hname):
738
    """Returns information about a single instance.
739

740
    This is a single-node call.
741

742
    @type node: list
743
    @param node: the list of nodes to query
744
    @type instance: string
745
    @param instance: the instance name
746
    @type hname: string
747
    @param hname: the hypervisor type of the instance
748

749
    """
750
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
751

    
752
  @_RpcTimeout(_TMO_NORMAL)
753
  def call_instance_migratable(self, node, instance):
754
    """Checks whether the given instance can be migrated.
755

756
    This is a single-node call.
757

758
    @param node: the node to query
759
    @type instance: L{objects.Instance}
760
    @param instance: the instance to check
761

762

763
    """
764
    return self._SingleNodeCall(node, "instance_migratable",
765
                                [self._InstDict(instance)])
766

    
767
  @_RpcTimeout(_TMO_URGENT)
768
  def call_all_instances_info(self, node_list, hypervisor_list):
769
    """Returns information about all instances on the given nodes.
770

771
    This is a multi-node call.
772

773
    @type node_list: list
774
    @param node_list: the list of nodes to query
775
    @type hypervisor_list: list
776
    @param hypervisor_list: the hypervisors to query for instances
777

778
    """
779
    return self._MultiNodeCall(node_list, "all_instances_info",
780
                               [hypervisor_list])
781

    
782
  @_RpcTimeout(_TMO_URGENT)
783
  def call_instance_list(self, node_list, hypervisor_list):
784
    """Returns the list of running instances on a given node.
785

786
    This is a multi-node call.
787

788
    @type node_list: list
789
    @param node_list: the list of nodes to query
790
    @type hypervisor_list: list
791
    @param hypervisor_list: the hypervisors to query for instances
792

793
    """
794
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
795

    
796
  @_RpcTimeout(_TMO_FAST)
797
  def call_node_tcp_ping(self, node, source, target, port, timeout,
798
                         live_port_needed):
799
    """Do a TcpPing on the remote node
800

801
    This is a single-node call.
802

803
    """
804
    return self._SingleNodeCall(node, "node_tcp_ping",
805
                                [source, target, port, timeout,
806
                                 live_port_needed])
807

    
808
  @_RpcTimeout(_TMO_FAST)
809
  def call_node_has_ip_address(self, node, address):
810
    """Checks if a node has the given IP address.
811

812
    This is a single-node call.
813

814
    """
815
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
816

    
817
  @_RpcTimeout(_TMO_URGENT)
818
  def call_node_info(self, node_list, vg_name, hypervisor_type):
819
    """Return node information.
820

821
    This will return memory information and volume group size and free
822
    space.
823

824
    This is a multi-node call.
825

826
    @type node_list: list
827
    @param node_list: the list of nodes to query
828
    @type vg_name: C{string}
829
    @param vg_name: the name of the volume group to ask for disk space
830
        information
831
    @type hypervisor_type: C{str}
832
    @param hypervisor_type: the name of the hypervisor to ask for
833
        memory information
834

835
    """
836
    return self._MultiNodeCall(node_list, "node_info",
837
                               [vg_name, hypervisor_type])
838

    
839
  @_RpcTimeout(_TMO_NORMAL)
840
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
841
    """Add a node to the cluster.
842

843
    This is a single-node call.
844

845
    """
846
    return self._SingleNodeCall(node, "node_add",
847
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
848

    
849
  @_RpcTimeout(_TMO_NORMAL)
850
  def call_node_verify(self, node_list, checkdict, cluster_name):
851
    """Request verification of given parameters.
852

853
    This is a multi-node call.
854

855
    """
856
    return self._MultiNodeCall(node_list, "node_verify",
857
                               [checkdict, cluster_name])
858

    
859
  @classmethod
860
  @_RpcTimeout(_TMO_FAST)
861
  def call_node_start_master(cls, node, start_daemons, no_voting):
862
    """Tells a node to activate itself as a master.
863

864
    This is a single-node call.
865

866
    """
867
    return cls._StaticSingleNodeCall(node, "node_start_master",
868
                                     [start_daemons, no_voting])
869

    
870
  @classmethod
871
  @_RpcTimeout(_TMO_FAST)
872
  def call_node_stop_master(cls, node, stop_daemons):
873
    """Tells a node to demote itself from master status.
874

875
    This is a single-node call.
876

877
    """
878
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
879

    
880
  @classmethod
881
  @_RpcTimeout(_TMO_URGENT)
882
  def call_master_info(cls, node_list):
883
    """Query master info.
884

885
    This is a multi-node call.
886

887
    """
888
    # TODO: should this method query down nodes?
889
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
890

    
891
  @classmethod
892
  @_RpcTimeout(_TMO_URGENT)
893
  def call_version(cls, node_list):
894
    """Query node version.
895

896
    This is a multi-node call.
897

898
    """
899
    return cls._StaticMultiNodeCall(node_list, "version", [])
900

    
901
  @_RpcTimeout(_TMO_NORMAL)
902
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
903
    """Request creation of a given block device.
904

905
    This is a single-node call.
906

907
    """
908
    return self._SingleNodeCall(node, "blockdev_create",
909
                                [bdev.ToDict(), size, owner, on_primary, info])
910

    
911
  @_RpcTimeout(_TMO_NORMAL)
912
  def call_blockdev_remove(self, node, bdev):
913
    """Request removal of a given block device.
914

915
    This is a single-node call.
916

917
    """
918
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
919

    
920
  @_RpcTimeout(_TMO_NORMAL)
921
  def call_blockdev_rename(self, node, devlist):
922
    """Request rename of the given block devices.
923

924
    This is a single-node call.
925

926
    """
927
    return self._SingleNodeCall(node, "blockdev_rename",
928
                                [(d.ToDict(), uid) for d, uid in devlist])
929

    
930
  @_RpcTimeout(_TMO_NORMAL)
931
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
932
    """Request assembling of a given block device.
933

934
    This is a single-node call.
935

936
    """
937
    return self._SingleNodeCall(node, "blockdev_assemble",
938
                                [disk.ToDict(), owner, on_primary])
939

    
940
  @_RpcTimeout(_TMO_NORMAL)
941
  def call_blockdev_shutdown(self, node, disk):
942
    """Request shutdown of a given block device.
943

944
    This is a single-node call.
945

946
    """
947
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
948

    
949
  @_RpcTimeout(_TMO_NORMAL)
950
  def call_blockdev_addchildren(self, node, bdev, ndevs):
951
    """Request adding a list of children to a (mirroring) device.
952

953
    This is a single-node call.
954

955
    """
956
    return self._SingleNodeCall(node, "blockdev_addchildren",
957
                                [bdev.ToDict(),
958
                                 [disk.ToDict() for disk in ndevs]])
959

    
960
  @_RpcTimeout(_TMO_NORMAL)
961
  def call_blockdev_removechildren(self, node, bdev, ndevs):
962
    """Request removing a list of children from a (mirroring) device.
963

964
    This is a single-node call.
965

966
    """
967
    return self._SingleNodeCall(node, "blockdev_removechildren",
968
                                [bdev.ToDict(),
969
                                 [disk.ToDict() for disk in ndevs]])
970

    
971
  @_RpcTimeout(_TMO_NORMAL)
972
  def call_blockdev_getmirrorstatus(self, node, disks):
973
    """Request status of a (mirroring) device.
974

975
    This is a single-node call.
976

977
    """
978
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
979
                                  [dsk.ToDict() for dsk in disks])
980
    if not result.fail_msg:
981
      result.payload = [objects.BlockDevStatus.FromDict(i)
982
                        for i in result.payload]
983
    return result
984

    
985
  @_RpcTimeout(_TMO_NORMAL)
986
  def call_blockdev_find(self, node, disk):
987
    """Request identification of a given block device.
988

989
    This is a single-node call.
990

991
    """
992
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
993
    if not result.fail_msg and result.payload is not None:
994
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
995
    return result
996

    
997
  @_RpcTimeout(_TMO_NORMAL)
998
  def call_blockdev_close(self, node, instance_name, disks):
999
    """Closes the given block devices.
1000

1001
    This is a single-node call.
1002

1003
    """
1004
    params = [instance_name, [cf.ToDict() for cf in disks]]
1005
    return self._SingleNodeCall(node, "blockdev_close", params)
1006

    
1007
  @_RpcTimeout(_TMO_NORMAL)
1008
  def call_blockdev_getsizes(self, node, disks):
1009
    """Returns the size of the given disks.
1010

1011
    This is a single-node call.
1012

1013
    """
1014
    params = [[cf.ToDict() for cf in disks]]
1015
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1016

    
1017
  @_RpcTimeout(_TMO_NORMAL)
1018
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1019
    """Disconnects the network of the given drbd devices.
1020

1021
    This is a multi-node call.
1022

1023
    """
1024
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1025
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1026

    
1027
  @_RpcTimeout(_TMO_NORMAL)
1028
  def call_drbd_attach_net(self, node_list, nodes_ip,
1029
                           disks, instance_name, multimaster):
1030
    """Disconnects the given drbd devices.
1031

1032
    This is a multi-node call.
1033

1034
    """
1035
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1036
                               [nodes_ip, [cf.ToDict() for cf in disks],
1037
                                instance_name, multimaster])
1038

    
1039
  @_RpcTimeout(_TMO_SLOW)
1040
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1041
    """Waits for the synchronization of drbd devices is complete.
1042

1043
    This is a multi-node call.
1044

1045
    """
1046
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1047
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1048

    
1049
  @_RpcTimeout(_TMO_URGENT)
1050
  def call_drbd_helper(self, node_list):
1051
    """Gets drbd helper.
1052

1053
    This is a multi-node call.
1054

1055
    """
1056
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1057

    
1058
  @classmethod
1059
  @_RpcTimeout(_TMO_NORMAL)
1060
  def call_upload_file(cls, node_list, file_name, address_list=None):
1061
    """Upload a file.
1062

1063
    The node will refuse the operation in case the file is not on the
1064
    approved file list.
1065

1066
    This is a multi-node call.
1067

1068
    @type node_list: list
1069
    @param node_list: the list of node names to upload to
1070
    @type file_name: str
1071
    @param file_name: the filename to upload
1072
    @type address_list: list or None
1073
    @keyword address_list: an optional list of node addresses, in order
1074
        to optimize the RPC speed
1075

1076
    """
1077
    file_contents = utils.ReadFile(file_name)
1078
    data = cls._Compress(file_contents)
1079
    st = os.stat(file_name)
1080
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1081
              st.st_atime, st.st_mtime]
1082
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1083
                                    address_list=address_list)
1084

    
1085
  @classmethod
1086
  @_RpcTimeout(_TMO_NORMAL)
1087
  def call_write_ssconf_files(cls, node_list, values):
1088
    """Write ssconf files.
1089

1090
    This is a multi-node call.
1091

1092
    """
1093
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1094

    
1095
  @_RpcTimeout(_TMO_FAST)
1096
  def call_os_diagnose(self, node_list):
1097
    """Request a diagnose of OS definitions.
1098

1099
    This is a multi-node call.
1100

1101
    """
1102
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1103

    
1104
  @_RpcTimeout(_TMO_FAST)
1105
  def call_os_get(self, node, name):
1106
    """Returns an OS definition.
1107

1108
    This is a single-node call.
1109

1110
    """
1111
    result = self._SingleNodeCall(node, "os_get", [name])
1112
    if not result.fail_msg and isinstance(result.payload, dict):
1113
      result.payload = objects.OS.FromDict(result.payload)
1114
    return result
1115

    
1116
  @_RpcTimeout(_TMO_FAST)
1117
  def call_os_validate(self, required, nodes, name, checks, params):
1118
    """Run a validation routine for a given OS.
1119

1120
    This is a multi-node call.
1121

1122
    """
1123
    return self._MultiNodeCall(nodes, "os_validate",
1124
                               [required, name, checks, params])
1125

    
1126
  @_RpcTimeout(_TMO_NORMAL)
1127
  def call_hooks_runner(self, node_list, hpath, phase, env):
1128
    """Call the hooks runner.
1129

1130
    Args:
1131
      - op: the OpCode instance
1132
      - env: a dictionary with the environment
1133

1134
    This is a multi-node call.
1135

1136
    """
1137
    params = [hpath, phase, env]
1138
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1139

    
1140
  @_RpcTimeout(_TMO_NORMAL)
1141
  def call_iallocator_runner(self, node, name, idata):
1142
    """Call an iallocator on a remote node
1143

1144
    Args:
1145
      - name: the iallocator name
1146
      - input: the json-encoded input string
1147

1148
    This is a single-node call.
1149

1150
    """
1151
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1152

    
1153
  @_RpcTimeout(_TMO_NORMAL)
1154
  def call_blockdev_grow(self, node, cf_bdev, amount):
1155
    """Request a snapshot of the given block device.
1156

1157
    This is a single-node call.
1158

1159
    """
1160
    return self._SingleNodeCall(node, "blockdev_grow",
1161
                                [cf_bdev.ToDict(), amount])
1162

    
1163
  @_RpcTimeout(_TMO_1DAY)
1164
  def call_blockdev_export(self, node, cf_bdev,
1165
                           dest_node, dest_path, cluster_name):
1166
    """Export a given disk to another node.
1167

1168
    This is a single-node call.
1169

1170
    """
1171
    return self._SingleNodeCall(node, "blockdev_export",
1172
                                [cf_bdev.ToDict(), dest_node, dest_path,
1173
                                 cluster_name])
1174

    
1175
  @_RpcTimeout(_TMO_NORMAL)
1176
  def call_blockdev_snapshot(self, node, cf_bdev):
1177
    """Request a snapshot of the given block device.
1178

1179
    This is a single-node call.
1180

1181
    """
1182
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1183

    
1184
  @_RpcTimeout(_TMO_NORMAL)
1185
  def call_finalize_export(self, node, instance, snap_disks):
1186
    """Request the completion of an export operation.
1187

1188
    This writes the export config file, etc.
1189

1190
    This is a single-node call.
1191

1192
    """
1193
    flat_disks = []
1194
    for disk in snap_disks:
1195
      if isinstance(disk, bool):
1196
        flat_disks.append(disk)
1197
      else:
1198
        flat_disks.append(disk.ToDict())
1199

    
1200
    return self._SingleNodeCall(node, "finalize_export",
1201
                                [self._InstDict(instance), flat_disks])
1202

    
1203
  @_RpcTimeout(_TMO_FAST)
1204
  def call_export_info(self, node, path):
1205
    """Queries the export information in a given path.
1206

1207
    This is a single-node call.
1208

1209
    """
1210
    return self._SingleNodeCall(node, "export_info", [path])
1211

    
1212
  @_RpcTimeout(_TMO_FAST)
1213
  def call_export_list(self, node_list):
1214
    """Gets the stored exports list.
1215

1216
    This is a multi-node call.
1217

1218
    """
1219
    return self._MultiNodeCall(node_list, "export_list", [])
1220

    
1221
  @_RpcTimeout(_TMO_FAST)
1222
  def call_export_remove(self, node, export):
1223
    """Requests removal of a given export.
1224

1225
    This is a single-node call.
1226

1227
    """
1228
    return self._SingleNodeCall(node, "export_remove", [export])
1229

    
1230
  @classmethod
1231
  @_RpcTimeout(_TMO_NORMAL)
1232
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1233
    """Requests a node to clean the cluster information it has.
1234

1235
    This will remove the configuration information from the ganeti data
1236
    dir.
1237

1238
    This is a single-node call.
1239

1240
    """
1241
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1242
                                     [modify_ssh_setup])
1243

    
1244
  @_RpcTimeout(_TMO_FAST)
1245
  def call_node_volumes(self, node_list):
1246
    """Gets all volumes on node(s).
1247

1248
    This is a multi-node call.
1249

1250
    """
1251
    return self._MultiNodeCall(node_list, "node_volumes", [])
1252

    
1253
  @_RpcTimeout(_TMO_FAST)
1254
  def call_node_demote_from_mc(self, node):
1255
    """Demote a node from the master candidate role.
1256

1257
    This is a single-node call.
1258

1259
    """
1260
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1261

    
1262
  @_RpcTimeout(_TMO_NORMAL)
1263
  def call_node_powercycle(self, node, hypervisor):
1264
    """Tries to powercycle a node.
1265

1266
    This is a single-node call.
1267

1268
    """
1269
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1270

    
1271
  @_RpcTimeout(None)
1272
  def call_test_delay(self, node_list, duration):
1273
    """Sleep for a fixed time on given node(s).
1274

1275
    This is a multi-node call.
1276

1277
    """
1278
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1279
                               read_timeout=int(duration + 5))
1280

    
1281
  @_RpcTimeout(_TMO_FAST)
1282
  def call_file_storage_dir_create(self, node, file_storage_dir):
1283
    """Create the given file storage directory.
1284

1285
    This is a single-node call.
1286

1287
    """
1288
    return self._SingleNodeCall(node, "file_storage_dir_create",
1289
                                [file_storage_dir])
1290

    
1291
  @_RpcTimeout(_TMO_FAST)
1292
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1293
    """Remove the given file storage directory.
1294

1295
    This is a single-node call.
1296

1297
    """
1298
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1299
                                [file_storage_dir])
1300

    
1301
  @_RpcTimeout(_TMO_FAST)
1302
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1303
                                   new_file_storage_dir):
1304
    """Rename file storage directory.
1305

1306
    This is a single-node call.
1307

1308
    """
1309
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1310
                                [old_file_storage_dir, new_file_storage_dir])
1311

    
1312
  @classmethod
1313
  @_RpcTimeout(_TMO_FAST)
1314
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1315
    """Update job queue.
1316

1317
    This is a multi-node call.
1318

1319
    """
1320
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1321
                                    [file_name, cls._Compress(content)],
1322
                                    address_list=address_list)
1323

    
1324
  @classmethod
1325
  @_RpcTimeout(_TMO_NORMAL)
1326
  def call_jobqueue_purge(cls, node):
1327
    """Purge job queue.
1328

1329
    This is a single-node call.
1330

1331
    """
1332
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1333

    
1334
  @classmethod
1335
  @_RpcTimeout(_TMO_FAST)
1336
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1337
    """Rename a job queue file.
1338

1339
    This is a multi-node call.
1340

1341
    """
1342
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1343
                                    address_list=address_list)
1344

    
1345
  @_RpcTimeout(_TMO_NORMAL)
1346
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1347
    """Validate the hypervisor params.
1348

1349
    This is a multi-node call.
1350

1351
    @type node_list: list
1352
    @param node_list: the list of nodes to query
1353
    @type hvname: string
1354
    @param hvname: the hypervisor name
1355
    @type hvparams: dict
1356
    @param hvparams: the hypervisor parameters to be validated
1357

1358
    """
1359
    cluster = self._cfg.GetClusterInfo()
1360
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1361
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1362
                               [hvname, hv_full])
1363

    
1364
  @_RpcTimeout(_TMO_NORMAL)
1365
  def call_x509_cert_create(self, node, validity):
1366
    """Creates a new X509 certificate for SSL/TLS.
1367

1368
    This is a single-node call.
1369

1370
    @type validity: int
1371
    @param validity: Validity in seconds
1372

1373
    """
1374
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1375

    
1376
  @_RpcTimeout(_TMO_NORMAL)
1377
  def call_x509_cert_remove(self, node, name):
1378
    """Removes a X509 certificate.
1379

1380
    This is a single-node call.
1381

1382
    @type name: string
1383
    @param name: Certificate name
1384

1385
    """
1386
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1387

    
1388
  @_RpcTimeout(_TMO_NORMAL)
1389
  def call_import_start(self, node, opts, instance, dest, dest_args):
1390
    """Starts a listener for an import.
1391

1392
    This is a single-node call.
1393

1394
    @type node: string
1395
    @param node: Node name
1396
    @type instance: C{objects.Instance}
1397
    @param instance: Instance object
1398

1399
    """
1400
    return self._SingleNodeCall(node, "import_start",
1401
                                [opts.ToDict(),
1402
                                 self._InstDict(instance), dest,
1403
                                 _EncodeImportExportIO(dest, dest_args)])
1404

    
1405
  @_RpcTimeout(_TMO_NORMAL)
1406
  def call_export_start(self, node, opts, host, port,
1407
                        instance, source, source_args):
1408
    """Starts an export daemon.
1409

1410
    This is a single-node call.
1411

1412
    @type node: string
1413
    @param node: Node name
1414
    @type instance: C{objects.Instance}
1415
    @param instance: Instance object
1416

1417
    """
1418
    return self._SingleNodeCall(node, "export_start",
1419
                                [opts.ToDict(), host, port,
1420
                                 self._InstDict(instance), source,
1421
                                 _EncodeImportExportIO(source, source_args)])
1422

    
1423
  @_RpcTimeout(_TMO_FAST)
1424
  def call_impexp_status(self, node, names):
1425
    """Gets the status of an import or export.
1426

1427
    This is a single-node call.
1428

1429
    @type node: string
1430
    @param node: Node name
1431
    @type names: List of strings
1432
    @param names: Import/export names
1433
    @rtype: List of L{objects.ImportExportStatus} instances
1434
    @return: Returns a list of the state of each named import/export or None if
1435
             a status couldn't be retrieved
1436

1437
    """
1438
    result = self._SingleNodeCall(node, "impexp_status", [names])
1439

    
1440
    if not result.fail_msg:
1441
      decoded = []
1442

    
1443
      for i in result.payload:
1444
        if i is None:
1445
          decoded.append(None)
1446
          continue
1447
        decoded.append(objects.ImportExportStatus.FromDict(i))
1448

    
1449
      result.payload = decoded
1450

    
1451
    return result
1452

    
1453
  @_RpcTimeout(_TMO_NORMAL)
1454
  def call_impexp_abort(self, node, name):
1455
    """Aborts an import or export.
1456

1457
    This is a single-node call.
1458

1459
    @type node: string
1460
    @param node: Node name
1461
    @type name: string
1462
    @param name: Import/export name
1463

1464
    """
1465
    return self._SingleNodeCall(node, "impexp_abort", [name])
1466

    
1467
  @_RpcTimeout(_TMO_NORMAL)
1468
  def call_impexp_cleanup(self, node, name):
1469
    """Cleans up after an import or export.
1470

1471
    This is a single-node call.
1472

1473
    @type node: string
1474
    @param node: Node name
1475
    @type name: string
1476
    @param name: Import/export name
1477

1478
    """
1479
    return self._SingleNodeCall(node, "impexp_cleanup", [name])