Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 84a12e40

History | View | Annotate | Download (42.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47

    
48
# pylint has a bug here, doesn't see this import
49
import ganeti.http.client  # pylint: disable-msg=W0611
50

    
51

    
52
# Timeout for connecting to nodes (seconds)
53
_RPC_CONNECT_TIMEOUT = 5
54

    
55
_RPC_CLIENT_HEADERS = [
56
  "Content-type: %s" % http.HTTP_APP_JSON,
57
  ]
58

    
59
# Various time constants for the timeout table
60
_TMO_URGENT = 60 # one minute
61
_TMO_FAST = 5 * 60 # five minutes
62
_TMO_NORMAL = 15 * 60 # 15 minutes
63
_TMO_SLOW = 3600 # one hour
64
_TMO_4HRS = 4 * 3600
65
_TMO_1DAY = 86400
66

    
67
# Timeout table that will be built later by decorators
68
# Guidelines for choosing timeouts:
69
# - call used during watcher: timeout -> 1min, _TMO_URGENT
70
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
71
# - other calls: 15 min, _TMO_NORMAL
72
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
73

    
74
_TIMEOUTS = {
75
}
76

    
77

    
78
def Init():
79
  """Initializes the module-global HTTP client manager.
80

81
  Must be called before using any RPC function and while exactly one thread is
82
  running.
83

84
  """
85
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86
  # one thread running. This check is just a safety measure -- it doesn't
87
  # cover all cases.
88
  assert threading.activeCount() == 1, \
89
         "Found more than one active thread when initializing pycURL"
90

    
91
  logging.info("Using PycURL %s", pycurl.version)
92

    
93
  pycurl.global_init(pycurl.GLOBAL_ALL)
94

    
95

    
96
def Shutdown():
97
  """Stops the module-global HTTP client manager.
98

99
  Must be called before quitting the program and while exactly one thread is
100
  running.
101

102
  """
103
  pycurl.global_cleanup()
104

    
105

    
106
def _ConfigRpcCurl(curl):
107
  noded_cert = str(constants.NODED_CERT_FILE)
108

    
109
  curl.setopt(pycurl.FOLLOWLOCATION, False)
110
  curl.setopt(pycurl.CAINFO, noded_cert)
111
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
112
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
113
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
114
  curl.setopt(pycurl.SSLCERT, noded_cert)
115
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
116
  curl.setopt(pycurl.SSLKEY, noded_cert)
117
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
118

    
119

    
120
class _RpcThreadLocal(threading.local):
121
  def GetHttpClientPool(self):
122
    """Returns a per-thread HTTP client pool.
123

124
    @rtype: L{http.client.HttpClientPool}
125

126
    """
127
    try:
128
      pool = self.hcp
129
    except AttributeError:
130
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
131
      self.hcp = pool
132

    
133
    return pool
134

    
135

    
136
_thread_local = _RpcThreadLocal()
137

    
138

    
139
def _RpcTimeout(secs):
140
  """Timeout decorator.
141

142
  When applied to a rpc call_* function, it updates the global timeout
143
  table with the given function/timeout.
144

145
  """
146
  def decorator(f):
147
    name = f.__name__
148
    assert name.startswith("call_")
149
    _TIMEOUTS[name[len("call_"):]] = secs
150
    return f
151
  return decorator
152

    
153

    
154
class RpcResult(object):
155
  """RPC Result class.
156

157
  This class holds an RPC result. It is needed since in multi-node
158
  calls we can't raise an exception just because one one out of many
159
  failed, and therefore we use this class to encapsulate the result.
160

161
  @ivar data: the data payload, for successful results, or None
162
  @ivar call: the name of the RPC call
163
  @ivar node: the name of the node to which we made the call
164
  @ivar offline: whether the operation failed because the node was
165
      offline, as opposed to actual failure; offline=True will always
166
      imply failed=True, in order to allow simpler checking if
167
      the user doesn't care about the exact failure mode
168
  @ivar fail_msg: the error message if the call failed
169

170
  """
171
  def __init__(self, data=None, failed=False, offline=False,
172
               call=None, node=None):
173
    self.offline = offline
174
    self.call = call
175
    self.node = node
176

    
177
    if offline:
178
      self.fail_msg = "Node is marked offline"
179
      self.data = self.payload = None
180
    elif failed:
181
      self.fail_msg = self._EnsureErr(data)
182
      self.data = self.payload = None
183
    else:
184
      self.data = data
185
      if not isinstance(self.data, (tuple, list)):
186
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
187
                         type(self.data))
188
        self.payload = None
189
      elif len(data) != 2:
190
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
191
                         "expected 2" % len(self.data))
192
        self.payload = None
193
      elif not self.data[0]:
194
        self.fail_msg = self._EnsureErr(self.data[1])
195
        self.payload = None
196
      else:
197
        # finally success
198
        self.fail_msg = None
199
        self.payload = data[1]
200

    
201
    assert hasattr(self, "call")
202
    assert hasattr(self, "data")
203
    assert hasattr(self, "fail_msg")
204
    assert hasattr(self, "node")
205
    assert hasattr(self, "offline")
206
    assert hasattr(self, "payload")
207

    
208
  @staticmethod
209
  def _EnsureErr(val):
210
    """Helper to ensure we return a 'True' value for error."""
211
    if val:
212
      return val
213
    else:
214
      return "No error information"
215

    
216
  def Raise(self, msg, prereq=False, ecode=None):
217
    """If the result has failed, raise an OpExecError.
218

219
    This is used so that LU code doesn't have to check for each
220
    result, but instead can call this function.
221

222
    """
223
    if not self.fail_msg:
224
      return
225

    
226
    if not msg: # one could pass None for default message
227
      msg = ("Call '%s' to node '%s' has failed: %s" %
228
             (self.call, self.node, self.fail_msg))
229
    else:
230
      msg = "%s: %s" % (msg, self.fail_msg)
231
    if prereq:
232
      ec = errors.OpPrereqError
233
    else:
234
      ec = errors.OpExecError
235
    if ecode is not None:
236
      args = (msg, ecode)
237
    else:
238
      args = (msg, )
239
    raise ec(*args) # pylint: disable-msg=W0142
240

    
241

    
242
class Client:
243
  """RPC Client class.
244

245
  This class, given a (remote) method name, a list of parameters and a
246
  list of nodes, will contact (in parallel) all nodes, and return a
247
  dict of results (key: node name, value: result).
248

249
  One current bug is that generic failure is still signaled by
250
  'False' result, which is not good. This overloading of values can
251
  cause bugs.
252

253
  """
254
  def __init__(self, procedure, body, port):
255
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
256
                                    " timeouts table")
257
    self.procedure = procedure
258
    self.body = body
259
    self.port = port
260
    self._request = {}
261

    
262
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
263
    """Add a list of nodes to the target nodes.
264

265
    @type node_list: list
266
    @param node_list: the list of node names to connect
267
    @type address_list: list or None
268
    @keyword address_list: either None or a list with node addresses,
269
        which must have the same length as the node list
270
    @type read_timeout: int
271
    @param read_timeout: overwrites the default read timeout for the
272
        given operation
273

274
    """
275
    if address_list is None:
276
      address_list = [None for _ in node_list]
277
    else:
278
      assert len(node_list) == len(address_list), \
279
             "Name and address lists should have the same length"
280
    for node, address in zip(node_list, address_list):
281
      self.ConnectNode(node, address, read_timeout=read_timeout)
282

    
283
  def ConnectNode(self, name, address=None, read_timeout=None):
284
    """Add a node to the target list.
285

286
    @type name: str
287
    @param name: the node name
288
    @type address: str
289
    @keyword address: the node address, if known
290

291
    """
292
    if address is None:
293
      address = name
294

    
295
    if read_timeout is None:
296
      read_timeout = _TIMEOUTS[self.procedure]
297

    
298
    self._request[name] = \
299
      http.client.HttpClientRequest(str(address), self.port,
300
                                    http.HTTP_PUT, str("/%s" % self.procedure),
301
                                    headers=_RPC_CLIENT_HEADERS,
302
                                    post_data=str(self.body),
303
                                    read_timeout=read_timeout)
304

    
305
  def GetResults(self, http_pool=None):
306
    """Call nodes and return results.
307

308
    @rtype: list
309
    @return: List of RPC results
310

311
    """
312
    if not http_pool:
313
      http_pool = _thread_local.GetHttpClientPool()
314

    
315
    http_pool.ProcessRequests(self._request.values())
316

    
317
    results = {}
318

    
319
    for name, req in self._request.iteritems():
320
      if req.success and req.resp_status_code == http.HTTP_OK:
321
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
322
                                  node=name, call=self.procedure)
323
        continue
324

    
325
      # TODO: Better error reporting
326
      if req.error:
327
        msg = req.error
328
      else:
329
        msg = req.resp_body
330

    
331
      logging.error("RPC error in %s from node %s: %s",
332
                    self.procedure, name, msg)
333
      results[name] = RpcResult(data=msg, failed=True, node=name,
334
                                call=self.procedure)
335

    
336
    return results
337

    
338

    
339
def _EncodeImportExportIO(ieio, ieioargs):
340
  """Encodes import/export I/O information.
341

342
  """
343
  if ieio == constants.IEIO_RAW_DISK:
344
    assert len(ieioargs) == 1
345
    return (ieioargs[0].ToDict(), )
346

    
347
  if ieio == constants.IEIO_SCRIPT:
348
    assert len(ieioargs) == 2
349
    return (ieioargs[0].ToDict(), ieioargs[1])
350

    
351
  return ieioargs
352

    
353

    
354
class RpcRunner(object):
355
  """RPC runner class"""
356

    
357
  def __init__(self, cfg):
358
    """Initialized the rpc runner.
359

360
    @type cfg:  C{config.ConfigWriter}
361
    @param cfg: the configuration object that will be used to get data
362
                about the cluster
363

364
    """
365
    self._cfg = cfg
366
    self.port = netutils.GetDaemonPort(constants.NODED)
367

    
368
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
369
    """Convert the given instance to a dict.
370

371
    This is done via the instance's ToDict() method and additionally
372
    we fill the hvparams with the cluster defaults.
373

374
    @type instance: L{objects.Instance}
375
    @param instance: an Instance object
376
    @type hvp: dict or None
377
    @param hvp: a dictionary with overridden hypervisor parameters
378
    @type bep: dict or None
379
    @param bep: a dictionary with overridden backend parameters
380
    @type osp: dict or None
381
    @param osp: a dictionary with overriden os parameters
382
    @rtype: dict
383
    @return: the instance dict, with the hvparams filled with the
384
        cluster defaults
385

386
    """
387
    idict = instance.ToDict()
388
    cluster = self._cfg.GetClusterInfo()
389
    idict["hvparams"] = cluster.FillHV(instance)
390
    if hvp is not None:
391
      idict["hvparams"].update(hvp)
392
    idict["beparams"] = cluster.FillBE(instance)
393
    if bep is not None:
394
      idict["beparams"].update(bep)
395
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
396
    if osp is not None:
397
      idict["osparams"].update(osp)
398
    for nic in idict["nics"]:
399
      nic['nicparams'] = objects.FillDict(
400
        cluster.nicparams[constants.PP_DEFAULT],
401
        nic['nicparams'])
402
    return idict
403

    
404
  def _ConnectList(self, client, node_list, call, read_timeout=None):
405
    """Helper for computing node addresses.
406

407
    @type client: L{ganeti.rpc.Client}
408
    @param client: a C{Client} instance
409
    @type node_list: list
410
    @param node_list: the node list we should connect
411
    @type call: string
412
    @param call: the name of the remote procedure call, for filling in
413
        correctly any eventual offline nodes' results
414
    @type read_timeout: int
415
    @param read_timeout: overwrites the default read timeout for the
416
        given operation
417

418
    """
419
    all_nodes = self._cfg.GetAllNodesInfo()
420
    name_list = []
421
    addr_list = []
422
    skip_dict = {}
423
    for node in node_list:
424
      if node in all_nodes:
425
        if all_nodes[node].offline:
426
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
427
          continue
428
        val = all_nodes[node].primary_ip
429
      else:
430
        val = None
431
      addr_list.append(val)
432
      name_list.append(node)
433
    if name_list:
434
      client.ConnectList(name_list, address_list=addr_list,
435
                         read_timeout=read_timeout)
436
    return skip_dict
437

    
438
  def _ConnectNode(self, client, node, call, read_timeout=None):
439
    """Helper for computing one node's address.
440

441
    @type client: L{ganeti.rpc.Client}
442
    @param client: a C{Client} instance
443
    @type node: str
444
    @param node: the node we should connect
445
    @type call: string
446
    @param call: the name of the remote procedure call, for filling in
447
        correctly any eventual offline nodes' results
448
    @type read_timeout: int
449
    @param read_timeout: overwrites the default read timeout for the
450
        given operation
451

452
    """
453
    node_info = self._cfg.GetNodeInfo(node)
454
    if node_info is not None:
455
      if node_info.offline:
456
        return RpcResult(node=node, offline=True, call=call)
457
      addr = node_info.primary_ip
458
    else:
459
      addr = None
460
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
461

    
462
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
463
    """Helper for making a multi-node call
464

465
    """
466
    body = serializer.DumpJson(args, indent=False)
467
    c = Client(procedure, body, self.port)
468
    skip_dict = self._ConnectList(c, node_list, procedure,
469
                                  read_timeout=read_timeout)
470
    skip_dict.update(c.GetResults())
471
    return skip_dict
472

    
473
  @classmethod
474
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
475
                           address_list=None, read_timeout=None):
476
    """Helper for making a multi-node static call
477

478
    """
479
    body = serializer.DumpJson(args, indent=False)
480
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
481
    c.ConnectList(node_list, address_list=address_list,
482
                  read_timeout=read_timeout)
483
    return c.GetResults()
484

    
485
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
486
    """Helper for making a single-node call
487

488
    """
489
    body = serializer.DumpJson(args, indent=False)
490
    c = Client(procedure, body, self.port)
491
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
492
    if result is None:
493
      # we did connect, node is not offline
494
      result = c.GetResults()[node]
495
    return result
496

    
497
  @classmethod
498
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
499
    """Helper for making a single-node static call
500

501
    """
502
    body = serializer.DumpJson(args, indent=False)
503
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
504
    c.ConnectNode(node, read_timeout=read_timeout)
505
    return c.GetResults()[node]
506

    
507
  @staticmethod
508
  def _Compress(data):
509
    """Compresses a string for transport over RPC.
510

511
    Small amounts of data are not compressed.
512

513
    @type data: str
514
    @param data: Data
515
    @rtype: tuple
516
    @return: Encoded data to send
517

518
    """
519
    # Small amounts of data are not compressed
520
    if len(data) < 512:
521
      return (constants.RPC_ENCODING_NONE, data)
522

    
523
    # Compress with zlib and encode in base64
524
    return (constants.RPC_ENCODING_ZLIB_BASE64,
525
            base64.b64encode(zlib.compress(data, 3)))
526

    
527
  #
528
  # Begin RPC calls
529
  #
530

    
531
  @_RpcTimeout(_TMO_URGENT)
532
  def call_lv_list(self, node_list, vg_name):
533
    """Gets the logical volumes present in a given volume group.
534

535
    This is a multi-node call.
536

537
    """
538
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
539

    
540
  @_RpcTimeout(_TMO_URGENT)
541
  def call_vg_list(self, node_list):
542
    """Gets the volume group list.
543

544
    This is a multi-node call.
545

546
    """
547
    return self._MultiNodeCall(node_list, "vg_list", [])
548

    
549
  @_RpcTimeout(_TMO_NORMAL)
550
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
551
    """Get list of storage units.
552

553
    This is a multi-node call.
554

555
    """
556
    return self._MultiNodeCall(node_list, "storage_list",
557
                               [su_name, su_args, name, fields])
558

    
559
  @_RpcTimeout(_TMO_NORMAL)
560
  def call_storage_modify(self, node, su_name, su_args, name, changes):
561
    """Modify a storage unit.
562

563
    This is a single-node call.
564

565
    """
566
    return self._SingleNodeCall(node, "storage_modify",
567
                                [su_name, su_args, name, changes])
568

    
569
  @_RpcTimeout(_TMO_NORMAL)
570
  def call_storage_execute(self, node, su_name, su_args, name, op):
571
    """Executes an operation on a storage unit.
572

573
    This is a single-node call.
574

575
    """
576
    return self._SingleNodeCall(node, "storage_execute",
577
                                [su_name, su_args, name, op])
578

    
579
  @_RpcTimeout(_TMO_URGENT)
580
  def call_bridges_exist(self, node, bridges_list):
581
    """Checks if a node has all the bridges given.
582

583
    This method checks if all bridges given in the bridges_list are
584
    present on the remote node, so that an instance that uses interfaces
585
    on those bridges can be started.
586

587
    This is a single-node call.
588

589
    """
590
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
591

    
592
  @_RpcTimeout(_TMO_NORMAL)
593
  def call_instance_start(self, node, instance, hvp, bep):
594
    """Starts an instance.
595

596
    This is a single-node call.
597

598
    """
599
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
600
    return self._SingleNodeCall(node, "instance_start", [idict])
601

    
602
  @_RpcTimeout(_TMO_NORMAL)
603
  def call_instance_shutdown(self, node, instance, timeout):
604
    """Stops an instance.
605

606
    This is a single-node call.
607

608
    """
609
    return self._SingleNodeCall(node, "instance_shutdown",
610
                                [self._InstDict(instance), timeout])
611

    
612
  @_RpcTimeout(_TMO_NORMAL)
613
  def call_migration_info(self, node, instance):
614
    """Gather the information necessary to prepare an instance migration.
615

616
    This is a single-node call.
617

618
    @type node: string
619
    @param node: the node on which the instance is currently running
620
    @type instance: C{objects.Instance}
621
    @param instance: the instance definition
622

623
    """
624
    return self._SingleNodeCall(node, "migration_info",
625
                                [self._InstDict(instance)])
626

    
627
  @_RpcTimeout(_TMO_NORMAL)
628
  def call_accept_instance(self, node, instance, info, target):
629
    """Prepare a node to accept an instance.
630

631
    This is a single-node call.
632

633
    @type node: string
634
    @param node: the target node for the migration
635
    @type instance: C{objects.Instance}
636
    @param instance: the instance definition
637
    @type info: opaque/hypervisor specific (string/data)
638
    @param info: result for the call_migration_info call
639
    @type target: string
640
    @param target: target hostname (usually ip address) (on the node itself)
641

642
    """
643
    return self._SingleNodeCall(node, "accept_instance",
644
                                [self._InstDict(instance), info, target])
645

    
646
  @_RpcTimeout(_TMO_NORMAL)
647
  def call_finalize_migration(self, node, instance, info, success):
648
    """Finalize any target-node migration specific operation.
649

650
    This is called both in case of a successful migration and in case of error
651
    (in which case it should abort the migration).
652

653
    This is a single-node call.
654

655
    @type node: string
656
    @param node: the target node for the migration
657
    @type instance: C{objects.Instance}
658
    @param instance: the instance definition
659
    @type info: opaque/hypervisor specific (string/data)
660
    @param info: result for the call_migration_info call
661
    @type success: boolean
662
    @param success: whether the migration was a success or a failure
663

664
    """
665
    return self._SingleNodeCall(node, "finalize_migration",
666
                                [self._InstDict(instance), info, success])
667

    
668
  @_RpcTimeout(_TMO_SLOW)
669
  def call_instance_migrate(self, node, instance, target, live):
670
    """Migrate an instance.
671

672
    This is a single-node call.
673

674
    @type node: string
675
    @param node: the node on which the instance is currently running
676
    @type instance: C{objects.Instance}
677
    @param instance: the instance definition
678
    @type target: string
679
    @param target: the target node name
680
    @type live: boolean
681
    @param live: whether the migration should be done live or not (the
682
        interpretation of this parameter is left to the hypervisor)
683

684
    """
685
    return self._SingleNodeCall(node, "instance_migrate",
686
                                [self._InstDict(instance), target, live])
687

    
688
  @_RpcTimeout(_TMO_NORMAL)
689
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
690
    """Reboots an instance.
691

692
    This is a single-node call.
693

694
    """
695
    return self._SingleNodeCall(node, "instance_reboot",
696
                                [self._InstDict(inst), reboot_type,
697
                                 shutdown_timeout])
698

    
699
  @_RpcTimeout(_TMO_1DAY)
700
  def call_instance_os_add(self, node, inst, reinstall, debug):
701
    """Installs an OS on the given instance.
702

703
    This is a single-node call.
704

705
    """
706
    return self._SingleNodeCall(node, "instance_os_add",
707
                                [self._InstDict(inst), reinstall, debug])
708

    
709
  @_RpcTimeout(_TMO_SLOW)
710
  def call_instance_run_rename(self, node, inst, old_name, debug):
711
    """Run the OS rename script for an instance.
712

713
    This is a single-node call.
714

715
    """
716
    return self._SingleNodeCall(node, "instance_run_rename",
717
                                [self._InstDict(inst), old_name, debug])
718

    
719
  @_RpcTimeout(_TMO_URGENT)
720
  def call_instance_info(self, node, instance, hname):
721
    """Returns information about a single instance.
722

723
    This is a single-node call.
724

725
    @type node: list
726
    @param node: the list of nodes to query
727
    @type instance: string
728
    @param instance: the instance name
729
    @type hname: string
730
    @param hname: the hypervisor type of the instance
731

732
    """
733
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
734

    
735
  @_RpcTimeout(_TMO_NORMAL)
736
  def call_instance_migratable(self, node, instance):
737
    """Checks whether the given instance can be migrated.
738

739
    This is a single-node call.
740

741
    @param node: the node to query
742
    @type instance: L{objects.Instance}
743
    @param instance: the instance to check
744

745

746
    """
747
    return self._SingleNodeCall(node, "instance_migratable",
748
                                [self._InstDict(instance)])
749

    
750
  @_RpcTimeout(_TMO_URGENT)
751
  def call_all_instances_info(self, node_list, hypervisor_list):
752
    """Returns information about all instances on the given nodes.
753

754
    This is a multi-node call.
755

756
    @type node_list: list
757
    @param node_list: the list of nodes to query
758
    @type hypervisor_list: list
759
    @param hypervisor_list: the hypervisors to query for instances
760

761
    """
762
    return self._MultiNodeCall(node_list, "all_instances_info",
763
                               [hypervisor_list])
764

    
765
  @_RpcTimeout(_TMO_URGENT)
766
  def call_instance_list(self, node_list, hypervisor_list):
767
    """Returns the list of running instances on a given node.
768

769
    This is a multi-node call.
770

771
    @type node_list: list
772
    @param node_list: the list of nodes to query
773
    @type hypervisor_list: list
774
    @param hypervisor_list: the hypervisors to query for instances
775

776
    """
777
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
778

    
779
  @_RpcTimeout(_TMO_FAST)
780
  def call_node_tcp_ping(self, node, source, target, port, timeout,
781
                         live_port_needed):
782
    """Do a TcpPing on the remote node
783

784
    This is a single-node call.
785

786
    """
787
    return self._SingleNodeCall(node, "node_tcp_ping",
788
                                [source, target, port, timeout,
789
                                 live_port_needed])
790

    
791
  @_RpcTimeout(_TMO_FAST)
792
  def call_node_has_ip_address(self, node, address):
793
    """Checks if a node has the given IP address.
794

795
    This is a single-node call.
796

797
    """
798
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
799

    
800
  @_RpcTimeout(_TMO_URGENT)
801
  def call_node_info(self, node_list, vg_name, hypervisor_type):
802
    """Return node information.
803

804
    This will return memory information and volume group size and free
805
    space.
806

807
    This is a multi-node call.
808

809
    @type node_list: list
810
    @param node_list: the list of nodes to query
811
    @type vg_name: C{string}
812
    @param vg_name: the name of the volume group to ask for disk space
813
        information
814
    @type hypervisor_type: C{str}
815
    @param hypervisor_type: the name of the hypervisor to ask for
816
        memory information
817

818
    """
819
    return self._MultiNodeCall(node_list, "node_info",
820
                               [vg_name, hypervisor_type])
821

    
822
  @_RpcTimeout(_TMO_NORMAL)
823
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
824
    """Add a node to the cluster.
825

826
    This is a single-node call.
827

828
    """
829
    return self._SingleNodeCall(node, "node_add",
830
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
831

    
832
  @_RpcTimeout(_TMO_NORMAL)
833
  def call_node_verify(self, node_list, checkdict, cluster_name):
834
    """Request verification of given parameters.
835

836
    This is a multi-node call.
837

838
    """
839
    return self._MultiNodeCall(node_list, "node_verify",
840
                               [checkdict, cluster_name])
841

    
842
  @classmethod
843
  @_RpcTimeout(_TMO_FAST)
844
  def call_node_start_master(cls, node, start_daemons, no_voting):
845
    """Tells a node to activate itself as a master.
846

847
    This is a single-node call.
848

849
    """
850
    return cls._StaticSingleNodeCall(node, "node_start_master",
851
                                     [start_daemons, no_voting])
852

    
853
  @classmethod
854
  @_RpcTimeout(_TMO_FAST)
855
  def call_node_stop_master(cls, node, stop_daemons):
856
    """Tells a node to demote itself from master status.
857

858
    This is a single-node call.
859

860
    """
861
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
862

    
863
  @classmethod
864
  @_RpcTimeout(_TMO_URGENT)
865
  def call_master_info(cls, node_list):
866
    """Query master info.
867

868
    This is a multi-node call.
869

870
    """
871
    # TODO: should this method query down nodes?
872
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
873

    
874
  @classmethod
875
  @_RpcTimeout(_TMO_URGENT)
876
  def call_version(cls, node_list):
877
    """Query node version.
878

879
    This is a multi-node call.
880

881
    """
882
    return cls._StaticMultiNodeCall(node_list, "version", [])
883

    
884
  @_RpcTimeout(_TMO_NORMAL)
885
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
886
    """Request creation of a given block device.
887

888
    This is a single-node call.
889

890
    """
891
    return self._SingleNodeCall(node, "blockdev_create",
892
                                [bdev.ToDict(), size, owner, on_primary, info])
893

    
894
  @_RpcTimeout(_TMO_NORMAL)
895
  def call_blockdev_remove(self, node, bdev):
896
    """Request removal of a given block device.
897

898
    This is a single-node call.
899

900
    """
901
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
902

    
903
  @_RpcTimeout(_TMO_NORMAL)
904
  def call_blockdev_rename(self, node, devlist):
905
    """Request rename of the given block devices.
906

907
    This is a single-node call.
908

909
    """
910
    return self._SingleNodeCall(node, "blockdev_rename",
911
                                [(d.ToDict(), uid) for d, uid in devlist])
912

    
913
  @_RpcTimeout(_TMO_NORMAL)
914
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
915
    """Request assembling of a given block device.
916

917
    This is a single-node call.
918

919
    """
920
    return self._SingleNodeCall(node, "blockdev_assemble",
921
                                [disk.ToDict(), owner, on_primary])
922

    
923
  @_RpcTimeout(_TMO_NORMAL)
924
  def call_blockdev_shutdown(self, node, disk):
925
    """Request shutdown of a given block device.
926

927
    This is a single-node call.
928

929
    """
930
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
931

    
932
  @_RpcTimeout(_TMO_NORMAL)
933
  def call_blockdev_addchildren(self, node, bdev, ndevs):
934
    """Request adding a list of children to a (mirroring) device.
935

936
    This is a single-node call.
937

938
    """
939
    return self._SingleNodeCall(node, "blockdev_addchildren",
940
                                [bdev.ToDict(),
941
                                 [disk.ToDict() for disk in ndevs]])
942

    
943
  @_RpcTimeout(_TMO_NORMAL)
944
  def call_blockdev_removechildren(self, node, bdev, ndevs):
945
    """Request removing a list of children from a (mirroring) device.
946

947
    This is a single-node call.
948

949
    """
950
    return self._SingleNodeCall(node, "blockdev_removechildren",
951
                                [bdev.ToDict(),
952
                                 [disk.ToDict() for disk in ndevs]])
953

    
954
  @_RpcTimeout(_TMO_NORMAL)
955
  def call_blockdev_getmirrorstatus(self, node, disks):
956
    """Request status of a (mirroring) device.
957

958
    This is a single-node call.
959

960
    """
961
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
962
                                  [dsk.ToDict() for dsk in disks])
963
    if not result.fail_msg:
964
      result.payload = [objects.BlockDevStatus.FromDict(i)
965
                        for i in result.payload]
966
    return result
967

    
968
  @_RpcTimeout(_TMO_NORMAL)
969
  def call_blockdev_find(self, node, disk):
970
    """Request identification of a given block device.
971

972
    This is a single-node call.
973

974
    """
975
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
976
    if not result.fail_msg and result.payload is not None:
977
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
978
    return result
979

    
980
  @_RpcTimeout(_TMO_NORMAL)
981
  def call_blockdev_close(self, node, instance_name, disks):
982
    """Closes the given block devices.
983

984
    This is a single-node call.
985

986
    """
987
    params = [instance_name, [cf.ToDict() for cf in disks]]
988
    return self._SingleNodeCall(node, "blockdev_close", params)
989

    
990
  @_RpcTimeout(_TMO_NORMAL)
991
  def call_blockdev_getsizes(self, node, disks):
992
    """Returns the size of the given disks.
993

994
    This is a single-node call.
995

996
    """
997
    params = [[cf.ToDict() for cf in disks]]
998
    return self._SingleNodeCall(node, "blockdev_getsize", params)
999

    
1000
  @_RpcTimeout(_TMO_NORMAL)
1001
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1002
    """Disconnects the network of the given drbd devices.
1003

1004
    This is a multi-node call.
1005

1006
    """
1007
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1008
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1009

    
1010
  @_RpcTimeout(_TMO_NORMAL)
1011
  def call_drbd_attach_net(self, node_list, nodes_ip,
1012
                           disks, instance_name, multimaster):
1013
    """Disconnects the given drbd devices.
1014

1015
    This is a multi-node call.
1016

1017
    """
1018
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1019
                               [nodes_ip, [cf.ToDict() for cf in disks],
1020
                                instance_name, multimaster])
1021

    
1022
  @_RpcTimeout(_TMO_SLOW)
1023
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1024
    """Waits for the synchronization of drbd devices is complete.
1025

1026
    This is a multi-node call.
1027

1028
    """
1029
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1030
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1031

    
1032
  @_RpcTimeout(_TMO_URGENT)
1033
  def call_drbd_helper(self, node_list):
1034
    """Gets drbd helper.
1035

1036
    This is a multi-node call.
1037

1038
    """
1039
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1040

    
1041
  @classmethod
1042
  @_RpcTimeout(_TMO_NORMAL)
1043
  def call_upload_file(cls, node_list, file_name, address_list=None):
1044
    """Upload a file.
1045

1046
    The node will refuse the operation in case the file is not on the
1047
    approved file list.
1048

1049
    This is a multi-node call.
1050

1051
    @type node_list: list
1052
    @param node_list: the list of node names to upload to
1053
    @type file_name: str
1054
    @param file_name: the filename to upload
1055
    @type address_list: list or None
1056
    @keyword address_list: an optional list of node addresses, in order
1057
        to optimize the RPC speed
1058

1059
    """
1060
    file_contents = utils.ReadFile(file_name)
1061
    data = cls._Compress(file_contents)
1062
    st = os.stat(file_name)
1063
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1064
              st.st_atime, st.st_mtime]
1065
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1066
                                    address_list=address_list)
1067

    
1068
  @classmethod
1069
  @_RpcTimeout(_TMO_NORMAL)
1070
  def call_write_ssconf_files(cls, node_list, values):
1071
    """Write ssconf files.
1072

1073
    This is a multi-node call.
1074

1075
    """
1076
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1077

    
1078
  @_RpcTimeout(_TMO_FAST)
1079
  def call_os_diagnose(self, node_list):
1080
    """Request a diagnose of OS definitions.
1081

1082
    This is a multi-node call.
1083

1084
    """
1085
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1086

    
1087
  @_RpcTimeout(_TMO_FAST)
1088
  def call_os_get(self, node, name):
1089
    """Returns an OS definition.
1090

1091
    This is a single-node call.
1092

1093
    """
1094
    result = self._SingleNodeCall(node, "os_get", [name])
1095
    if not result.fail_msg and isinstance(result.payload, dict):
1096
      result.payload = objects.OS.FromDict(result.payload)
1097
    return result
1098

    
1099
  @_RpcTimeout(_TMO_FAST)
1100
  def call_os_validate(self, required, nodes, name, checks, params):
1101
    """Run a validation routine for a given OS.
1102

1103
    This is a multi-node call.
1104

1105
    """
1106
    return self._MultiNodeCall(nodes, "os_validate",
1107
                               [required, name, checks, params])
1108

    
1109
  @_RpcTimeout(_TMO_NORMAL)
1110
  def call_hooks_runner(self, node_list, hpath, phase, env):
1111
    """Call the hooks runner.
1112

1113
    Args:
1114
      - op: the OpCode instance
1115
      - env: a dictionary with the environment
1116

1117
    This is a multi-node call.
1118

1119
    """
1120
    params = [hpath, phase, env]
1121
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1122

    
1123
  @_RpcTimeout(_TMO_NORMAL)
1124
  def call_iallocator_runner(self, node, name, idata):
1125
    """Call an iallocator on a remote node
1126

1127
    Args:
1128
      - name: the iallocator name
1129
      - input: the json-encoded input string
1130

1131
    This is a single-node call.
1132

1133
    """
1134
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1135

    
1136
  @_RpcTimeout(_TMO_NORMAL)
1137
  def call_blockdev_grow(self, node, cf_bdev, amount):
1138
    """Request a snapshot of the given block device.
1139

1140
    This is a single-node call.
1141

1142
    """
1143
    return self._SingleNodeCall(node, "blockdev_grow",
1144
                                [cf_bdev.ToDict(), amount])
1145

    
1146
  @_RpcTimeout(_TMO_1DAY)
1147
  def call_blockdev_export(self, node, cf_bdev,
1148
                           dest_node, dest_path, cluster_name):
1149
    """Export a given disk to another node.
1150

1151
    This is a single-node call.
1152

1153
    """
1154
    return self._SingleNodeCall(node, "blockdev_export",
1155
                                [cf_bdev.ToDict(), dest_node, dest_path,
1156
                                 cluster_name])
1157

    
1158
  @_RpcTimeout(_TMO_NORMAL)
1159
  def call_blockdev_snapshot(self, node, cf_bdev):
1160
    """Request a snapshot of the given block device.
1161

1162
    This is a single-node call.
1163

1164
    """
1165
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1166

    
1167
  @_RpcTimeout(_TMO_NORMAL)
1168
  def call_finalize_export(self, node, instance, snap_disks):
1169
    """Request the completion of an export operation.
1170

1171
    This writes the export config file, etc.
1172

1173
    This is a single-node call.
1174

1175
    """
1176
    flat_disks = []
1177
    for disk in snap_disks:
1178
      if isinstance(disk, bool):
1179
        flat_disks.append(disk)
1180
      else:
1181
        flat_disks.append(disk.ToDict())
1182

    
1183
    return self._SingleNodeCall(node, "finalize_export",
1184
                                [self._InstDict(instance), flat_disks])
1185

    
1186
  @_RpcTimeout(_TMO_FAST)
1187
  def call_export_info(self, node, path):
1188
    """Queries the export information in a given path.
1189

1190
    This is a single-node call.
1191

1192
    """
1193
    return self._SingleNodeCall(node, "export_info", [path])
1194

    
1195
  @_RpcTimeout(_TMO_FAST)
1196
  def call_export_list(self, node_list):
1197
    """Gets the stored exports list.
1198

1199
    This is a multi-node call.
1200

1201
    """
1202
    return self._MultiNodeCall(node_list, "export_list", [])
1203

    
1204
  @_RpcTimeout(_TMO_FAST)
1205
  def call_export_remove(self, node, export):
1206
    """Requests removal of a given export.
1207

1208
    This is a single-node call.
1209

1210
    """
1211
    return self._SingleNodeCall(node, "export_remove", [export])
1212

    
1213
  @classmethod
1214
  @_RpcTimeout(_TMO_NORMAL)
1215
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1216
    """Requests a node to clean the cluster information it has.
1217

1218
    This will remove the configuration information from the ganeti data
1219
    dir.
1220

1221
    This is a single-node call.
1222

1223
    """
1224
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1225
                                     [modify_ssh_setup])
1226

    
1227
  @_RpcTimeout(_TMO_FAST)
1228
  def call_node_volumes(self, node_list):
1229
    """Gets all volumes on node(s).
1230

1231
    This is a multi-node call.
1232

1233
    """
1234
    return self._MultiNodeCall(node_list, "node_volumes", [])
1235

    
1236
  @_RpcTimeout(_TMO_FAST)
1237
  def call_node_demote_from_mc(self, node):
1238
    """Demote a node from the master candidate role.
1239

1240
    This is a single-node call.
1241

1242
    """
1243
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1244

    
1245
  @_RpcTimeout(_TMO_NORMAL)
1246
  def call_node_powercycle(self, node, hypervisor):
1247
    """Tries to powercycle a node.
1248

1249
    This is a single-node call.
1250

1251
    """
1252
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1253

    
1254
  @_RpcTimeout(None)
1255
  def call_test_delay(self, node_list, duration):
1256
    """Sleep for a fixed time on given node(s).
1257

1258
    This is a multi-node call.
1259

1260
    """
1261
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1262
                               read_timeout=int(duration + 5))
1263

    
1264
  @_RpcTimeout(_TMO_FAST)
1265
  def call_file_storage_dir_create(self, node, file_storage_dir):
1266
    """Create the given file storage directory.
1267

1268
    This is a single-node call.
1269

1270
    """
1271
    return self._SingleNodeCall(node, "file_storage_dir_create",
1272
                                [file_storage_dir])
1273

    
1274
  @_RpcTimeout(_TMO_FAST)
1275
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1276
    """Remove the given file storage directory.
1277

1278
    This is a single-node call.
1279

1280
    """
1281
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1282
                                [file_storage_dir])
1283

    
1284
  @_RpcTimeout(_TMO_FAST)
1285
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1286
                                   new_file_storage_dir):
1287
    """Rename file storage directory.
1288

1289
    This is a single-node call.
1290

1291
    """
1292
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1293
                                [old_file_storage_dir, new_file_storage_dir])
1294

    
1295
  @classmethod
1296
  @_RpcTimeout(_TMO_FAST)
1297
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1298
    """Update job queue.
1299

1300
    This is a multi-node call.
1301

1302
    """
1303
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1304
                                    [file_name, cls._Compress(content)],
1305
                                    address_list=address_list)
1306

    
1307
  @classmethod
1308
  @_RpcTimeout(_TMO_NORMAL)
1309
  def call_jobqueue_purge(cls, node):
1310
    """Purge job queue.
1311

1312
    This is a single-node call.
1313

1314
    """
1315
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1316

    
1317
  @classmethod
1318
  @_RpcTimeout(_TMO_FAST)
1319
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1320
    """Rename a job queue file.
1321

1322
    This is a multi-node call.
1323

1324
    """
1325
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1326
                                    address_list=address_list)
1327

    
1328
  @_RpcTimeout(_TMO_NORMAL)
1329
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1330
    """Validate the hypervisor params.
1331

1332
    This is a multi-node call.
1333

1334
    @type node_list: list
1335
    @param node_list: the list of nodes to query
1336
    @type hvname: string
1337
    @param hvname: the hypervisor name
1338
    @type hvparams: dict
1339
    @param hvparams: the hypervisor parameters to be validated
1340

1341
    """
1342
    cluster = self._cfg.GetClusterInfo()
1343
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1344
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1345
                               [hvname, hv_full])
1346

    
1347
  @_RpcTimeout(_TMO_NORMAL)
1348
  def call_x509_cert_create(self, node, validity):
1349
    """Creates a new X509 certificate for SSL/TLS.
1350

1351
    This is a single-node call.
1352

1353
    @type validity: int
1354
    @param validity: Validity in seconds
1355

1356
    """
1357
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1358

    
1359
  @_RpcTimeout(_TMO_NORMAL)
1360
  def call_x509_cert_remove(self, node, name):
1361
    """Removes a X509 certificate.
1362

1363
    This is a single-node call.
1364

1365
    @type name: string
1366
    @param name: Certificate name
1367

1368
    """
1369
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1370

    
1371
  @_RpcTimeout(_TMO_NORMAL)
1372
  def call_import_start(self, node, opts, instance, dest, dest_args):
1373
    """Starts a listener for an import.
1374

1375
    This is a single-node call.
1376

1377
    @type node: string
1378
    @param node: Node name
1379
    @type instance: C{objects.Instance}
1380
    @param instance: Instance object
1381

1382
    """
1383
    return self._SingleNodeCall(node, "import_start",
1384
                                [opts.ToDict(),
1385
                                 self._InstDict(instance), dest,
1386
                                 _EncodeImportExportIO(dest, dest_args)])
1387

    
1388
  @_RpcTimeout(_TMO_NORMAL)
1389
  def call_export_start(self, node, opts, host, port,
1390
                        instance, source, source_args):
1391
    """Starts an export daemon.
1392

1393
    This is a single-node call.
1394

1395
    @type node: string
1396
    @param node: Node name
1397
    @type instance: C{objects.Instance}
1398
    @param instance: Instance object
1399

1400
    """
1401
    return self._SingleNodeCall(node, "export_start",
1402
                                [opts.ToDict(), host, port,
1403
                                 self._InstDict(instance), source,
1404
                                 _EncodeImportExportIO(source, source_args)])
1405

    
1406
  @_RpcTimeout(_TMO_FAST)
1407
  def call_impexp_status(self, node, names):
1408
    """Gets the status of an import or export.
1409

1410
    This is a single-node call.
1411

1412
    @type node: string
1413
    @param node: Node name
1414
    @type names: List of strings
1415
    @param names: Import/export names
1416
    @rtype: List of L{objects.ImportExportStatus} instances
1417
    @return: Returns a list of the state of each named import/export or None if
1418
             a status couldn't be retrieved
1419

1420
    """
1421
    result = self._SingleNodeCall(node, "impexp_status", [names])
1422

    
1423
    if not result.fail_msg:
1424
      decoded = []
1425

    
1426
      for i in result.payload:
1427
        if i is None:
1428
          decoded.append(None)
1429
          continue
1430
        decoded.append(objects.ImportExportStatus.FromDict(i))
1431

    
1432
      result.payload = decoded
1433

    
1434
    return result
1435

    
1436
  @_RpcTimeout(_TMO_NORMAL)
1437
  def call_impexp_abort(self, node, name):
1438
    """Aborts an import or export.
1439

1440
    This is a single-node call.
1441

1442
    @type node: string
1443
    @param node: Node name
1444
    @type name: string
1445
    @param name: Import/export name
1446

1447
    """
1448
    return self._SingleNodeCall(node, "impexp_abort", [name])
1449

    
1450
  @_RpcTimeout(_TMO_NORMAL)
1451
  def call_impexp_cleanup(self, node, name):
1452
    """Cleans up after an import or export.
1453

1454
    This is a single-node call.
1455

1456
    @type node: string
1457
    @param node: Node name
1458
    @type name: string
1459
    @param name: Import/export name
1460

1461
    """
1462
    return self._SingleNodeCall(node, "impexp_cleanup", [name])