Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 8d8c4eff

History | View | Annotate | Download (44.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48

    
49
# pylint has a bug here, doesn't see this import
50
import ganeti.http.client  # pylint: disable-msg=W0611
51

    
52

    
53
# Timeout for connecting to nodes (seconds)
54
_RPC_CONNECT_TIMEOUT = 5
55

    
56
_RPC_CLIENT_HEADERS = [
57
  "Content-type: %s" % http.HTTP_APP_JSON,
58
  "Expect:",
59
  ]
60

    
61
# Various time constants for the timeout table
62
_TMO_URGENT = 60 # one minute
63
_TMO_FAST = 5 * 60 # five minutes
64
_TMO_NORMAL = 15 * 60 # 15 minutes
65
_TMO_SLOW = 3600 # one hour
66
_TMO_4HRS = 4 * 3600
67
_TMO_1DAY = 86400
68

    
69
# Timeout table that will be built later by decorators
70
# Guidelines for choosing timeouts:
71
# - call used during watcher: timeout -> 1min, _TMO_URGENT
72
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73
# - other calls: 15 min, _TMO_NORMAL
74
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75

    
76
_TIMEOUTS = {
77
}
78

    
79

    
80
def Init():
81
  """Initializes the module-global HTTP client manager.
82

83
  Must be called before using any RPC function and while exactly one thread is
84
  running.
85

86
  """
87
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88
  # one thread running. This check is just a safety measure -- it doesn't
89
  # cover all cases.
90
  assert threading.activeCount() == 1, \
91
         "Found more than one active thread when initializing pycURL"
92

    
93
  logging.info("Using PycURL %s", pycurl.version)
94

    
95
  pycurl.global_init(pycurl.GLOBAL_ALL)
96

    
97

    
98
def Shutdown():
99
  """Stops the module-global HTTP client manager.
100

101
  Must be called before quitting the program and while exactly one thread is
102
  running.
103

104
  """
105
  pycurl.global_cleanup()
106

    
107

    
108
def _ConfigRpcCurl(curl):
109
  noded_cert = str(constants.NODED_CERT_FILE)
110

    
111
  curl.setopt(pycurl.FOLLOWLOCATION, False)
112
  curl.setopt(pycurl.CAINFO, noded_cert)
113
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
115
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116
  curl.setopt(pycurl.SSLCERT, noded_cert)
117
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118
  curl.setopt(pycurl.SSLKEY, noded_cert)
119
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120

    
121

    
122
class _RpcThreadLocal(threading.local):
123
  def GetHttpClientPool(self):
124
    """Returns a per-thread HTTP client pool.
125

126
    @rtype: L{http.client.HttpClientPool}
127

128
    """
129
    try:
130
      pool = self.hcp
131
    except AttributeError:
132
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
133
      self.hcp = pool
134

    
135
    return pool
136

    
137

    
138
_thread_local = _RpcThreadLocal()
139

    
140

    
141
def _RpcTimeout(secs):
142
  """Timeout decorator.
143

144
  When applied to a rpc call_* function, it updates the global timeout
145
  table with the given function/timeout.
146

147
  """
148
  def decorator(f):
149
    name = f.__name__
150
    assert name.startswith("call_")
151
    _TIMEOUTS[name[len("call_"):]] = secs
152
    return f
153
  return decorator
154

    
155

    
156
def RunWithRPC(fn):
157
  """RPC-wrapper decorator.
158

159
  When applied to a function, it runs it with the RPC system
160
  initialized, and it shutsdown the system afterwards. This means the
161
  function must be called without RPC being initialized.
162

163
  """
164
  def wrapper(*args, **kwargs):
165
    Init()
166
    try:
167
      return fn(*args, **kwargs)
168
    finally:
169
      Shutdown()
170
  return wrapper
171

    
172

    
173
class RpcResult(object):
174
  """RPC Result class.
175

176
  This class holds an RPC result. It is needed since in multi-node
177
  calls we can't raise an exception just because one one out of many
178
  failed, and therefore we use this class to encapsulate the result.
179

180
  @ivar data: the data payload, for successful results, or None
181
  @ivar call: the name of the RPC call
182
  @ivar node: the name of the node to which we made the call
183
  @ivar offline: whether the operation failed because the node was
184
      offline, as opposed to actual failure; offline=True will always
185
      imply failed=True, in order to allow simpler checking if
186
      the user doesn't care about the exact failure mode
187
  @ivar fail_msg: the error message if the call failed
188

189
  """
190
  def __init__(self, data=None, failed=False, offline=False,
191
               call=None, node=None):
192
    self.offline = offline
193
    self.call = call
194
    self.node = node
195

    
196
    if offline:
197
      self.fail_msg = "Node is marked offline"
198
      self.data = self.payload = None
199
    elif failed:
200
      self.fail_msg = self._EnsureErr(data)
201
      self.data = self.payload = None
202
    else:
203
      self.data = data
204
      if not isinstance(self.data, (tuple, list)):
205
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
206
                         type(self.data))
207
        self.payload = None
208
      elif len(data) != 2:
209
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
210
                         "expected 2" % len(self.data))
211
        self.payload = None
212
      elif not self.data[0]:
213
        self.fail_msg = self._EnsureErr(self.data[1])
214
        self.payload = None
215
      else:
216
        # finally success
217
        self.fail_msg = None
218
        self.payload = data[1]
219

    
220
    assert hasattr(self, "call")
221
    assert hasattr(self, "data")
222
    assert hasattr(self, "fail_msg")
223
    assert hasattr(self, "node")
224
    assert hasattr(self, "offline")
225
    assert hasattr(self, "payload")
226

    
227
  @staticmethod
228
  def _EnsureErr(val):
229
    """Helper to ensure we return a 'True' value for error."""
230
    if val:
231
      return val
232
    else:
233
      return "No error information"
234

    
235
  def Raise(self, msg, prereq=False, ecode=None):
236
    """If the result has failed, raise an OpExecError.
237

238
    This is used so that LU code doesn't have to check for each
239
    result, but instead can call this function.
240

241
    """
242
    if not self.fail_msg:
243
      return
244

    
245
    if not msg: # one could pass None for default message
246
      msg = ("Call '%s' to node '%s' has failed: %s" %
247
             (self.call, self.node, self.fail_msg))
248
    else:
249
      msg = "%s: %s" % (msg, self.fail_msg)
250
    if prereq:
251
      ec = errors.OpPrereqError
252
    else:
253
      ec = errors.OpExecError
254
    if ecode is not None:
255
      args = (msg, ecode)
256
    else:
257
      args = (msg, )
258
    raise ec(*args) # pylint: disable-msg=W0142
259

    
260

    
261
def _AddressLookup(node_list,
262
                   ssc=ssconf.SimpleStore,
263
                   nslookup_fn=netutils.Hostname.GetIP):
264
  """Return addresses for given node names.
265

266
  @type node_list: list
267
  @param node_list: List of node names
268
  @type ssc: class
269
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
270
  @type nslookup_fn: callable
271
  @param nslookup_fn: function use to do NS lookup
272
  @rtype: list of addresses and/or None's
273
  @returns: List of corresponding addresses, if found
274

275
  """
276
  ss = ssc()
277
  iplist = ss.GetNodePrimaryIPList()
278
  family = ss.GetPrimaryIPFamily()
279
  addresses = []
280
  ipmap = dict(entry.split() for entry in iplist)
281
  for node in node_list:
282
    address = ipmap.get(node)
283
    if address is None:
284
      address = nslookup_fn(node, family=family)
285
    addresses.append(address)
286

    
287
  return addresses
288

    
289

    
290
class Client:
291
  """RPC Client class.
292

293
  This class, given a (remote) method name, a list of parameters and a
294
  list of nodes, will contact (in parallel) all nodes, and return a
295
  dict of results (key: node name, value: result).
296

297
  One current bug is that generic failure is still signaled by
298
  'False' result, which is not good. This overloading of values can
299
  cause bugs.
300

301
  """
302
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
303
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
304
                                    " timeouts table")
305
    self.procedure = procedure
306
    self.body = body
307
    self.port = port
308
    self._request = {}
309
    self._address_lookup_fn = address_lookup_fn
310

    
311
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
312
    """Add a list of nodes to the target nodes.
313

314
    @type node_list: list
315
    @param node_list: the list of node names to connect
316
    @type address_list: list or None
317
    @keyword address_list: either None or a list with node addresses,
318
        which must have the same length as the node list
319
    @type read_timeout: int
320
    @param read_timeout: overwrites default timeout for operation
321

322
    """
323
    if address_list is None:
324
      # Always use IP address instead of node name
325
      address_list = self._address_lookup_fn(node_list)
326

    
327
    assert len(node_list) == len(address_list), \
328
           "Name and address lists must have the same length"
329

    
330
    for node, address in zip(node_list, address_list):
331
      self.ConnectNode(node, address, read_timeout=read_timeout)
332

    
333
  def ConnectNode(self, name, address=None, read_timeout=None):
334
    """Add a node to the target list.
335

336
    @type name: str
337
    @param name: the node name
338
    @type address: str
339
    @param address: the node address, if known
340
    @type read_timeout: int
341
    @param read_timeout: overwrites default timeout for operation
342

343
    """
344
    if address is None:
345
      # Always use IP address instead of node name
346
      address = self._address_lookup_fn([name])[0]
347

    
348
    assert(address is not None)
349

    
350
    if read_timeout is None:
351
      read_timeout = _TIMEOUTS[self.procedure]
352

    
353
    self._request[name] = \
354
      http.client.HttpClientRequest(str(address), self.port,
355
                                    http.HTTP_PUT, str("/%s" % self.procedure),
356
                                    headers=_RPC_CLIENT_HEADERS,
357
                                    post_data=str(self.body),
358
                                    read_timeout=read_timeout)
359

    
360
  def GetResults(self, http_pool=None):
361
    """Call nodes and return results.
362

363
    @rtype: list
364
    @return: List of RPC results
365

366
    """
367
    if not http_pool:
368
      http_pool = _thread_local.GetHttpClientPool()
369

    
370
    http_pool.ProcessRequests(self._request.values())
371

    
372
    results = {}
373

    
374
    for name, req in self._request.iteritems():
375
      if req.success and req.resp_status_code == http.HTTP_OK:
376
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
377
                                  node=name, call=self.procedure)
378
        continue
379

    
380
      # TODO: Better error reporting
381
      if req.error:
382
        msg = req.error
383
      else:
384
        msg = req.resp_body
385

    
386
      logging.error("RPC error in %s from node %s: %s",
387
                    self.procedure, name, msg)
388
      results[name] = RpcResult(data=msg, failed=True, node=name,
389
                                call=self.procedure)
390

    
391
    return results
392

    
393

    
394
def _EncodeImportExportIO(ieio, ieioargs):
395
  """Encodes import/export I/O information.
396

397
  """
398
  if ieio == constants.IEIO_RAW_DISK:
399
    assert len(ieioargs) == 1
400
    return (ieioargs[0].ToDict(), )
401

    
402
  if ieio == constants.IEIO_SCRIPT:
403
    assert len(ieioargs) == 2
404
    return (ieioargs[0].ToDict(), ieioargs[1])
405

    
406
  return ieioargs
407

    
408

    
409
class RpcRunner(object):
410
  """RPC runner class"""
411

    
412
  def __init__(self, cfg):
413
    """Initialized the rpc runner.
414

415
    @type cfg:  C{config.ConfigWriter}
416
    @param cfg: the configuration object that will be used to get data
417
                about the cluster
418

419
    """
420
    self._cfg = cfg
421
    self.port = netutils.GetDaemonPort(constants.NODED)
422

    
423
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
424
    """Convert the given instance to a dict.
425

426
    This is done via the instance's ToDict() method and additionally
427
    we fill the hvparams with the cluster defaults.
428

429
    @type instance: L{objects.Instance}
430
    @param instance: an Instance object
431
    @type hvp: dict or None
432
    @param hvp: a dictionary with overridden hypervisor parameters
433
    @type bep: dict or None
434
    @param bep: a dictionary with overridden backend parameters
435
    @type osp: dict or None
436
    @param osp: a dictionary with overridden os parameters
437
    @rtype: dict
438
    @return: the instance dict, with the hvparams filled with the
439
        cluster defaults
440

441
    """
442
    idict = instance.ToDict()
443
    cluster = self._cfg.GetClusterInfo()
444
    idict["hvparams"] = cluster.FillHV(instance)
445
    if hvp is not None:
446
      idict["hvparams"].update(hvp)
447
    idict["beparams"] = cluster.FillBE(instance)
448
    if bep is not None:
449
      idict["beparams"].update(bep)
450
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
451
    if osp is not None:
452
      idict["osparams"].update(osp)
453
    for nic in idict["nics"]:
454
      nic['nicparams'] = objects.FillDict(
455
        cluster.nicparams[constants.PP_DEFAULT],
456
        nic['nicparams'])
457
    return idict
458

    
459
  def _ConnectList(self, client, node_list, call, read_timeout=None):
460
    """Helper for computing node addresses.
461

462
    @type client: L{ganeti.rpc.Client}
463
    @param client: a C{Client} instance
464
    @type node_list: list
465
    @param node_list: the node list we should connect
466
    @type call: string
467
    @param call: the name of the remote procedure call, for filling in
468
        correctly any eventual offline nodes' results
469
    @type read_timeout: int
470
    @param read_timeout: overwrites the default read timeout for the
471
        given operation
472

473
    """
474
    all_nodes = self._cfg.GetAllNodesInfo()
475
    name_list = []
476
    addr_list = []
477
    skip_dict = {}
478
    for node in node_list:
479
      if node in all_nodes:
480
        if all_nodes[node].offline:
481
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
482
          continue
483
        val = all_nodes[node].primary_ip
484
      else:
485
        val = None
486
      addr_list.append(val)
487
      name_list.append(node)
488
    if name_list:
489
      client.ConnectList(name_list, address_list=addr_list,
490
                         read_timeout=read_timeout)
491
    return skip_dict
492

    
493
  def _ConnectNode(self, client, node, call, read_timeout=None):
494
    """Helper for computing one node's address.
495

496
    @type client: L{ganeti.rpc.Client}
497
    @param client: a C{Client} instance
498
    @type node: str
499
    @param node: the node we should connect
500
    @type call: string
501
    @param call: the name of the remote procedure call, for filling in
502
        correctly any eventual offline nodes' results
503
    @type read_timeout: int
504
    @param read_timeout: overwrites the default read timeout for the
505
        given operation
506

507
    """
508
    node_info = self._cfg.GetNodeInfo(node)
509
    if node_info is not None:
510
      if node_info.offline:
511
        return RpcResult(node=node, offline=True, call=call)
512
      addr = node_info.primary_ip
513
    else:
514
      addr = None
515
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
516

    
517
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
518
    """Helper for making a multi-node call
519

520
    """
521
    body = serializer.DumpJson(args, indent=False)
522
    c = Client(procedure, body, self.port)
523
    skip_dict = self._ConnectList(c, node_list, procedure,
524
                                  read_timeout=read_timeout)
525
    skip_dict.update(c.GetResults())
526
    return skip_dict
527

    
528
  @classmethod
529
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
530
                           address_list=None, read_timeout=None):
531
    """Helper for making a multi-node static call
532

533
    """
534
    body = serializer.DumpJson(args, indent=False)
535
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
536
    c.ConnectList(node_list, address_list=address_list,
537
                  read_timeout=read_timeout)
538
    return c.GetResults()
539

    
540
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
541
    """Helper for making a single-node call
542

543
    """
544
    body = serializer.DumpJson(args, indent=False)
545
    c = Client(procedure, body, self.port)
546
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
547
    if result is None:
548
      # we did connect, node is not offline
549
      result = c.GetResults()[node]
550
    return result
551

    
552
  @classmethod
553
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
554
    """Helper for making a single-node static call
555

556
    """
557
    body = serializer.DumpJson(args, indent=False)
558
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
559
    c.ConnectNode(node, read_timeout=read_timeout)
560
    return c.GetResults()[node]
561

    
562
  @staticmethod
563
  def _Compress(data):
564
    """Compresses a string for transport over RPC.
565

566
    Small amounts of data are not compressed.
567

568
    @type data: str
569
    @param data: Data
570
    @rtype: tuple
571
    @return: Encoded data to send
572

573
    """
574
    # Small amounts of data are not compressed
575
    if len(data) < 512:
576
      return (constants.RPC_ENCODING_NONE, data)
577

    
578
    # Compress with zlib and encode in base64
579
    return (constants.RPC_ENCODING_ZLIB_BASE64,
580
            base64.b64encode(zlib.compress(data, 3)))
581

    
582
  #
583
  # Begin RPC calls
584
  #
585

    
586
  @_RpcTimeout(_TMO_URGENT)
587
  def call_lv_list(self, node_list, vg_name):
588
    """Gets the logical volumes present in a given volume group.
589

590
    This is a multi-node call.
591

592
    """
593
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
594

    
595
  @_RpcTimeout(_TMO_URGENT)
596
  def call_vg_list(self, node_list):
597
    """Gets the volume group list.
598

599
    This is a multi-node call.
600

601
    """
602
    return self._MultiNodeCall(node_list, "vg_list", [])
603

    
604
  @_RpcTimeout(_TMO_NORMAL)
605
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
606
    """Get list of storage units.
607

608
    This is a multi-node call.
609

610
    """
611
    return self._MultiNodeCall(node_list, "storage_list",
612
                               [su_name, su_args, name, fields])
613

    
614
  @_RpcTimeout(_TMO_NORMAL)
615
  def call_storage_modify(self, node, su_name, su_args, name, changes):
616
    """Modify a storage unit.
617

618
    This is a single-node call.
619

620
    """
621
    return self._SingleNodeCall(node, "storage_modify",
622
                                [su_name, su_args, name, changes])
623

    
624
  @_RpcTimeout(_TMO_NORMAL)
625
  def call_storage_execute(self, node, su_name, su_args, name, op):
626
    """Executes an operation on a storage unit.
627

628
    This is a single-node call.
629

630
    """
631
    return self._SingleNodeCall(node, "storage_execute",
632
                                [su_name, su_args, name, op])
633

    
634
  @_RpcTimeout(_TMO_URGENT)
635
  def call_bridges_exist(self, node, bridges_list):
636
    """Checks if a node has all the bridges given.
637

638
    This method checks if all bridges given in the bridges_list are
639
    present on the remote node, so that an instance that uses interfaces
640
    on those bridges can be started.
641

642
    This is a single-node call.
643

644
    """
645
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
646

    
647
  @_RpcTimeout(_TMO_NORMAL)
648
  def call_instance_start(self, node, instance, hvp, bep):
649
    """Starts an instance.
650

651
    This is a single-node call.
652

653
    """
654
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
655
    return self._SingleNodeCall(node, "instance_start", [idict])
656

    
657
  @_RpcTimeout(_TMO_NORMAL)
658
  def call_instance_shutdown(self, node, instance, timeout):
659
    """Stops an instance.
660

661
    This is a single-node call.
662

663
    """
664
    return self._SingleNodeCall(node, "instance_shutdown",
665
                                [self._InstDict(instance), timeout])
666

    
667
  @_RpcTimeout(_TMO_NORMAL)
668
  def call_migration_info(self, node, instance):
669
    """Gather the information necessary to prepare an instance migration.
670

671
    This is a single-node call.
672

673
    @type node: string
674
    @param node: the node on which the instance is currently running
675
    @type instance: C{objects.Instance}
676
    @param instance: the instance definition
677

678
    """
679
    return self._SingleNodeCall(node, "migration_info",
680
                                [self._InstDict(instance)])
681

    
682
  @_RpcTimeout(_TMO_NORMAL)
683
  def call_accept_instance(self, node, instance, info, target):
684
    """Prepare a node to accept an instance.
685

686
    This is a single-node call.
687

688
    @type node: string
689
    @param node: the target node for the migration
690
    @type instance: C{objects.Instance}
691
    @param instance: the instance definition
692
    @type info: opaque/hypervisor specific (string/data)
693
    @param info: result for the call_migration_info call
694
    @type target: string
695
    @param target: target hostname (usually ip address) (on the node itself)
696

697
    """
698
    return self._SingleNodeCall(node, "accept_instance",
699
                                [self._InstDict(instance), info, target])
700

    
701
  @_RpcTimeout(_TMO_NORMAL)
702
  def call_finalize_migration(self, node, instance, info, success):
703
    """Finalize any target-node migration specific operation.
704

705
    This is called both in case of a successful migration and in case of error
706
    (in which case it should abort the migration).
707

708
    This is a single-node call.
709

710
    @type node: string
711
    @param node: the target node for the migration
712
    @type instance: C{objects.Instance}
713
    @param instance: the instance definition
714
    @type info: opaque/hypervisor specific (string/data)
715
    @param info: result for the call_migration_info call
716
    @type success: boolean
717
    @param success: whether the migration was a success or a failure
718

719
    """
720
    return self._SingleNodeCall(node, "finalize_migration",
721
                                [self._InstDict(instance), info, success])
722

    
723
  @_RpcTimeout(_TMO_SLOW)
724
  def call_instance_migrate(self, node, instance, target, live):
725
    """Migrate an instance.
726

727
    This is a single-node call.
728

729
    @type node: string
730
    @param node: the node on which the instance is currently running
731
    @type instance: C{objects.Instance}
732
    @param instance: the instance definition
733
    @type target: string
734
    @param target: the target node name
735
    @type live: boolean
736
    @param live: whether the migration should be done live or not (the
737
        interpretation of this parameter is left to the hypervisor)
738

739
    """
740
    return self._SingleNodeCall(node, "instance_migrate",
741
                                [self._InstDict(instance), target, live])
742

    
743
  @_RpcTimeout(_TMO_NORMAL)
744
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
745
    """Reboots an instance.
746

747
    This is a single-node call.
748

749
    """
750
    return self._SingleNodeCall(node, "instance_reboot",
751
                                [self._InstDict(inst), reboot_type,
752
                                 shutdown_timeout])
753

    
754
  @_RpcTimeout(_TMO_1DAY)
755
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
756
    """Installs an OS on the given instance.
757

758
    This is a single-node call.
759

760
    """
761
    return self._SingleNodeCall(node, "instance_os_add",
762
                                [self._InstDict(inst, osp=osparams),
763
                                 reinstall, debug])
764

    
765
  @_RpcTimeout(_TMO_SLOW)
766
  def call_instance_run_rename(self, node, inst, old_name, debug):
767
    """Run the OS rename script for an instance.
768

769
    This is a single-node call.
770

771
    """
772
    return self._SingleNodeCall(node, "instance_run_rename",
773
                                [self._InstDict(inst), old_name, debug])
774

    
775
  @_RpcTimeout(_TMO_URGENT)
776
  def call_instance_info(self, node, instance, hname):
777
    """Returns information about a single instance.
778

779
    This is a single-node call.
780

781
    @type node: list
782
    @param node: the list of nodes to query
783
    @type instance: string
784
    @param instance: the instance name
785
    @type hname: string
786
    @param hname: the hypervisor type of the instance
787

788
    """
789
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
790

    
791
  @_RpcTimeout(_TMO_NORMAL)
792
  def call_instance_migratable(self, node, instance):
793
    """Checks whether the given instance can be migrated.
794

795
    This is a single-node call.
796

797
    @param node: the node to query
798
    @type instance: L{objects.Instance}
799
    @param instance: the instance to check
800

801

802
    """
803
    return self._SingleNodeCall(node, "instance_migratable",
804
                                [self._InstDict(instance)])
805

    
806
  @_RpcTimeout(_TMO_URGENT)
807
  def call_all_instances_info(self, node_list, hypervisor_list):
808
    """Returns information about all instances on the given nodes.
809

810
    This is a multi-node call.
811

812
    @type node_list: list
813
    @param node_list: the list of nodes to query
814
    @type hypervisor_list: list
815
    @param hypervisor_list: the hypervisors to query for instances
816

817
    """
818
    return self._MultiNodeCall(node_list, "all_instances_info",
819
                               [hypervisor_list])
820

    
821
  @_RpcTimeout(_TMO_URGENT)
822
  def call_instance_list(self, node_list, hypervisor_list):
823
    """Returns the list of running instances on a given node.
824

825
    This is a multi-node call.
826

827
    @type node_list: list
828
    @param node_list: the list of nodes to query
829
    @type hypervisor_list: list
830
    @param hypervisor_list: the hypervisors to query for instances
831

832
    """
833
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
834

    
835
  @_RpcTimeout(_TMO_FAST)
836
  def call_node_tcp_ping(self, node, source, target, port, timeout,
837
                         live_port_needed):
838
    """Do a TcpPing on the remote node
839

840
    This is a single-node call.
841

842
    """
843
    return self._SingleNodeCall(node, "node_tcp_ping",
844
                                [source, target, port, timeout,
845
                                 live_port_needed])
846

    
847
  @_RpcTimeout(_TMO_FAST)
848
  def call_node_has_ip_address(self, node, address):
849
    """Checks if a node has the given IP address.
850

851
    This is a single-node call.
852

853
    """
854
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
855

    
856
  @_RpcTimeout(_TMO_URGENT)
857
  def call_node_info(self, node_list, vg_name, hypervisor_type):
858
    """Return node information.
859

860
    This will return memory information and volume group size and free
861
    space.
862

863
    This is a multi-node call.
864

865
    @type node_list: list
866
    @param node_list: the list of nodes to query
867
    @type vg_name: C{string}
868
    @param vg_name: the name of the volume group to ask for disk space
869
        information
870
    @type hypervisor_type: C{str}
871
    @param hypervisor_type: the name of the hypervisor to ask for
872
        memory information
873

874
    """
875
    return self._MultiNodeCall(node_list, "node_info",
876
                               [vg_name, hypervisor_type])
877

    
878
  @_RpcTimeout(_TMO_NORMAL)
879
  def call_etc_hosts_modify(self, node, mode, name, ip):
880
    """Modify hosts file with name
881

882
    @type node: string
883
    @param node: The node to call
884
    @type mode: string
885
    @param mode: The mode to operate. Currently "add" or "remove"
886
    @type name: string
887
    @param name: The host name to be modified
888
    @type ip: string
889
    @param ip: The ip of the entry (just valid if mode is "add")
890

891
    """
892
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
893

    
894
  @_RpcTimeout(_TMO_NORMAL)
895
  def call_node_verify(self, node_list, checkdict, cluster_name):
896
    """Request verification of given parameters.
897

898
    This is a multi-node call.
899

900
    """
901
    return self._MultiNodeCall(node_list, "node_verify",
902
                               [checkdict, cluster_name])
903

    
904
  @classmethod
905
  @_RpcTimeout(_TMO_FAST)
906
  def call_node_start_master(cls, node, start_daemons, no_voting):
907
    """Tells a node to activate itself as a master.
908

909
    This is a single-node call.
910

911
    """
912
    return cls._StaticSingleNodeCall(node, "node_start_master",
913
                                     [start_daemons, no_voting])
914

    
915
  @classmethod
916
  @_RpcTimeout(_TMO_FAST)
917
  def call_node_stop_master(cls, node, stop_daemons):
918
    """Tells a node to demote itself from master status.
919

920
    This is a single-node call.
921

922
    """
923
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
924

    
925
  @classmethod
926
  @_RpcTimeout(_TMO_URGENT)
927
  def call_master_info(cls, node_list):
928
    """Query master info.
929

930
    This is a multi-node call.
931

932
    """
933
    # TODO: should this method query down nodes?
934
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
935

    
936
  @classmethod
937
  @_RpcTimeout(_TMO_URGENT)
938
  def call_version(cls, node_list):
939
    """Query node version.
940

941
    This is a multi-node call.
942

943
    """
944
    return cls._StaticMultiNodeCall(node_list, "version", [])
945

    
946
  @_RpcTimeout(_TMO_NORMAL)
947
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
948
    """Request creation of a given block device.
949

950
    This is a single-node call.
951

952
    """
953
    return self._SingleNodeCall(node, "blockdev_create",
954
                                [bdev.ToDict(), size, owner, on_primary, info])
955

    
956
  @_RpcTimeout(_TMO_NORMAL)
957
  def call_blockdev_remove(self, node, bdev):
958
    """Request removal of a given block device.
959

960
    This is a single-node call.
961

962
    """
963
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
964

    
965
  @_RpcTimeout(_TMO_NORMAL)
966
  def call_blockdev_rename(self, node, devlist):
967
    """Request rename of the given block devices.
968

969
    This is a single-node call.
970

971
    """
972
    return self._SingleNodeCall(node, "blockdev_rename",
973
                                [(d.ToDict(), uid) for d, uid in devlist])
974

    
975
  @_RpcTimeout(_TMO_NORMAL)
976
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
977
    """Request assembling of a given block device.
978

979
    This is a single-node call.
980

981
    """
982
    return self._SingleNodeCall(node, "blockdev_assemble",
983
                                [disk.ToDict(), owner, on_primary])
984

    
985
  @_RpcTimeout(_TMO_NORMAL)
986
  def call_blockdev_shutdown(self, node, disk):
987
    """Request shutdown of a given block device.
988

989
    This is a single-node call.
990

991
    """
992
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
993

    
994
  @_RpcTimeout(_TMO_NORMAL)
995
  def call_blockdev_addchildren(self, node, bdev, ndevs):
996
    """Request adding a list of children to a (mirroring) device.
997

998
    This is a single-node call.
999

1000
    """
1001
    return self._SingleNodeCall(node, "blockdev_addchildren",
1002
                                [bdev.ToDict(),
1003
                                 [disk.ToDict() for disk in ndevs]])
1004

    
1005
  @_RpcTimeout(_TMO_NORMAL)
1006
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1007
    """Request removing a list of children from a (mirroring) device.
1008

1009
    This is a single-node call.
1010

1011
    """
1012
    return self._SingleNodeCall(node, "blockdev_removechildren",
1013
                                [bdev.ToDict(),
1014
                                 [disk.ToDict() for disk in ndevs]])
1015

    
1016
  @_RpcTimeout(_TMO_NORMAL)
1017
  def call_blockdev_getmirrorstatus(self, node, disks):
1018
    """Request status of a (mirroring) device.
1019

1020
    This is a single-node call.
1021

1022
    """
1023
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1024
                                  [dsk.ToDict() for dsk in disks])
1025
    if not result.fail_msg:
1026
      result.payload = [objects.BlockDevStatus.FromDict(i)
1027
                        for i in result.payload]
1028
    return result
1029

    
1030
  @_RpcTimeout(_TMO_NORMAL)
1031
  def call_blockdev_find(self, node, disk):
1032
    """Request identification of a given block device.
1033

1034
    This is a single-node call.
1035

1036
    """
1037
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1038
    if not result.fail_msg and result.payload is not None:
1039
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1040
    return result
1041

    
1042
  @_RpcTimeout(_TMO_NORMAL)
1043
  def call_blockdev_close(self, node, instance_name, disks):
1044
    """Closes the given block devices.
1045

1046
    This is a single-node call.
1047

1048
    """
1049
    params = [instance_name, [cf.ToDict() for cf in disks]]
1050
    return self._SingleNodeCall(node, "blockdev_close", params)
1051

    
1052
  @_RpcTimeout(_TMO_NORMAL)
1053
  def call_blockdev_getsizes(self, node, disks):
1054
    """Returns the size of the given disks.
1055

1056
    This is a single-node call.
1057

1058
    """
1059
    params = [[cf.ToDict() for cf in disks]]
1060
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1061

    
1062
  @_RpcTimeout(_TMO_NORMAL)
1063
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1064
    """Disconnects the network of the given drbd devices.
1065

1066
    This is a multi-node call.
1067

1068
    """
1069
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1070
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1071

    
1072
  @_RpcTimeout(_TMO_NORMAL)
1073
  def call_drbd_attach_net(self, node_list, nodes_ip,
1074
                           disks, instance_name, multimaster):
1075
    """Disconnects the given drbd devices.
1076

1077
    This is a multi-node call.
1078

1079
    """
1080
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1081
                               [nodes_ip, [cf.ToDict() for cf in disks],
1082
                                instance_name, multimaster])
1083

    
1084
  @_RpcTimeout(_TMO_SLOW)
1085
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1086
    """Waits for the synchronization of drbd devices is complete.
1087

1088
    This is a multi-node call.
1089

1090
    """
1091
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1092
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1093

    
1094
  @_RpcTimeout(_TMO_URGENT)
1095
  def call_drbd_helper(self, node_list):
1096
    """Gets drbd helper.
1097

1098
    This is a multi-node call.
1099

1100
    """
1101
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1102

    
1103
  @classmethod
1104
  @_RpcTimeout(_TMO_NORMAL)
1105
  def call_upload_file(cls, node_list, file_name, address_list=None):
1106
    """Upload a file.
1107

1108
    The node will refuse the operation in case the file is not on the
1109
    approved file list.
1110

1111
    This is a multi-node call.
1112

1113
    @type node_list: list
1114
    @param node_list: the list of node names to upload to
1115
    @type file_name: str
1116
    @param file_name: the filename to upload
1117
    @type address_list: list or None
1118
    @keyword address_list: an optional list of node addresses, in order
1119
        to optimize the RPC speed
1120

1121
    """
1122
    file_contents = utils.ReadFile(file_name)
1123
    data = cls._Compress(file_contents)
1124
    st = os.stat(file_name)
1125
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1126
              st.st_atime, st.st_mtime]
1127
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1128
                                    address_list=address_list)
1129

    
1130
  @classmethod
1131
  @_RpcTimeout(_TMO_NORMAL)
1132
  def call_write_ssconf_files(cls, node_list, values):
1133
    """Write ssconf files.
1134

1135
    This is a multi-node call.
1136

1137
    """
1138
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1139

    
1140
  @_RpcTimeout(_TMO_FAST)
1141
  def call_os_diagnose(self, node_list):
1142
    """Request a diagnose of OS definitions.
1143

1144
    This is a multi-node call.
1145

1146
    """
1147
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1148

    
1149
  @_RpcTimeout(_TMO_FAST)
1150
  def call_os_get(self, node, name):
1151
    """Returns an OS definition.
1152

1153
    This is a single-node call.
1154

1155
    """
1156
    result = self._SingleNodeCall(node, "os_get", [name])
1157
    if not result.fail_msg and isinstance(result.payload, dict):
1158
      result.payload = objects.OS.FromDict(result.payload)
1159
    return result
1160

    
1161
  @_RpcTimeout(_TMO_FAST)
1162
  def call_os_validate(self, required, nodes, name, checks, params):
1163
    """Run a validation routine for a given OS.
1164

1165
    This is a multi-node call.
1166

1167
    """
1168
    return self._MultiNodeCall(nodes, "os_validate",
1169
                               [required, name, checks, params])
1170

    
1171
  @_RpcTimeout(_TMO_NORMAL)
1172
  def call_hooks_runner(self, node_list, hpath, phase, env):
1173
    """Call the hooks runner.
1174

1175
    Args:
1176
      - op: the OpCode instance
1177
      - env: a dictionary with the environment
1178

1179
    This is a multi-node call.
1180

1181
    """
1182
    params = [hpath, phase, env]
1183
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1184

    
1185
  @_RpcTimeout(_TMO_NORMAL)
1186
  def call_iallocator_runner(self, node, name, idata):
1187
    """Call an iallocator on a remote node
1188

1189
    Args:
1190
      - name: the iallocator name
1191
      - input: the json-encoded input string
1192

1193
    This is a single-node call.
1194

1195
    """
1196
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1197

    
1198
  @_RpcTimeout(_TMO_NORMAL)
1199
  def call_blockdev_grow(self, node, cf_bdev, amount):
1200
    """Request a snapshot of the given block device.
1201

1202
    This is a single-node call.
1203

1204
    """
1205
    return self._SingleNodeCall(node, "blockdev_grow",
1206
                                [cf_bdev.ToDict(), amount])
1207

    
1208
  @_RpcTimeout(_TMO_1DAY)
1209
  def call_blockdev_export(self, node, cf_bdev,
1210
                           dest_node, dest_path, cluster_name):
1211
    """Export a given disk to another node.
1212

1213
    This is a single-node call.
1214

1215
    """
1216
    return self._SingleNodeCall(node, "blockdev_export",
1217
                                [cf_bdev.ToDict(), dest_node, dest_path,
1218
                                 cluster_name])
1219

    
1220
  @_RpcTimeout(_TMO_NORMAL)
1221
  def call_blockdev_snapshot(self, node, cf_bdev):
1222
    """Request a snapshot of the given block device.
1223

1224
    This is a single-node call.
1225

1226
    """
1227
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1228

    
1229
  @_RpcTimeout(_TMO_NORMAL)
1230
  def call_finalize_export(self, node, instance, snap_disks):
1231
    """Request the completion of an export operation.
1232

1233
    This writes the export config file, etc.
1234

1235
    This is a single-node call.
1236

1237
    """
1238
    flat_disks = []
1239
    for disk in snap_disks:
1240
      if isinstance(disk, bool):
1241
        flat_disks.append(disk)
1242
      else:
1243
        flat_disks.append(disk.ToDict())
1244

    
1245
    return self._SingleNodeCall(node, "finalize_export",
1246
                                [self._InstDict(instance), flat_disks])
1247

    
1248
  @_RpcTimeout(_TMO_FAST)
1249
  def call_export_info(self, node, path):
1250
    """Queries the export information in a given path.
1251

1252
    This is a single-node call.
1253

1254
    """
1255
    return self._SingleNodeCall(node, "export_info", [path])
1256

    
1257
  @_RpcTimeout(_TMO_FAST)
1258
  def call_export_list(self, node_list):
1259
    """Gets the stored exports list.
1260

1261
    This is a multi-node call.
1262

1263
    """
1264
    return self._MultiNodeCall(node_list, "export_list", [])
1265

    
1266
  @_RpcTimeout(_TMO_FAST)
1267
  def call_export_remove(self, node, export):
1268
    """Requests removal of a given export.
1269

1270
    This is a single-node call.
1271

1272
    """
1273
    return self._SingleNodeCall(node, "export_remove", [export])
1274

    
1275
  @classmethod
1276
  @_RpcTimeout(_TMO_NORMAL)
1277
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1278
    """Requests a node to clean the cluster information it has.
1279

1280
    This will remove the configuration information from the ganeti data
1281
    dir.
1282

1283
    This is a single-node call.
1284

1285
    """
1286
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1287
                                     [modify_ssh_setup])
1288

    
1289
  @_RpcTimeout(_TMO_FAST)
1290
  def call_node_volumes(self, node_list):
1291
    """Gets all volumes on node(s).
1292

1293
    This is a multi-node call.
1294

1295
    """
1296
    return self._MultiNodeCall(node_list, "node_volumes", [])
1297

    
1298
  @_RpcTimeout(_TMO_FAST)
1299
  def call_node_demote_from_mc(self, node):
1300
    """Demote a node from the master candidate role.
1301

1302
    This is a single-node call.
1303

1304
    """
1305
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1306

    
1307
  @_RpcTimeout(_TMO_NORMAL)
1308
  def call_node_powercycle(self, node, hypervisor):
1309
    """Tries to powercycle a node.
1310

1311
    This is a single-node call.
1312

1313
    """
1314
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1315

    
1316
  @_RpcTimeout(None)
1317
  def call_test_delay(self, node_list, duration):
1318
    """Sleep for a fixed time on given node(s).
1319

1320
    This is a multi-node call.
1321

1322
    """
1323
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1324
                               read_timeout=int(duration + 5))
1325

    
1326
  @_RpcTimeout(_TMO_FAST)
1327
  def call_file_storage_dir_create(self, node, file_storage_dir):
1328
    """Create the given file storage directory.
1329

1330
    This is a single-node call.
1331

1332
    """
1333
    return self._SingleNodeCall(node, "file_storage_dir_create",
1334
                                [file_storage_dir])
1335

    
1336
  @_RpcTimeout(_TMO_FAST)
1337
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1338
    """Remove the given file storage directory.
1339

1340
    This is a single-node call.
1341

1342
    """
1343
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1344
                                [file_storage_dir])
1345

    
1346
  @_RpcTimeout(_TMO_FAST)
1347
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1348
                                   new_file_storage_dir):
1349
    """Rename file storage directory.
1350

1351
    This is a single-node call.
1352

1353
    """
1354
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1355
                                [old_file_storage_dir, new_file_storage_dir])
1356

    
1357
  @classmethod
1358
  @_RpcTimeout(_TMO_FAST)
1359
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1360
    """Update job queue.
1361

1362
    This is a multi-node call.
1363

1364
    """
1365
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1366
                                    [file_name, cls._Compress(content)],
1367
                                    address_list=address_list)
1368

    
1369
  @classmethod
1370
  @_RpcTimeout(_TMO_NORMAL)
1371
  def call_jobqueue_purge(cls, node):
1372
    """Purge job queue.
1373

1374
    This is a single-node call.
1375

1376
    """
1377
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1378

    
1379
  @classmethod
1380
  @_RpcTimeout(_TMO_FAST)
1381
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1382
    """Rename a job queue file.
1383

1384
    This is a multi-node call.
1385

1386
    """
1387
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1388
                                    address_list=address_list)
1389

    
1390
  @_RpcTimeout(_TMO_NORMAL)
1391
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1392
    """Validate the hypervisor params.
1393

1394
    This is a multi-node call.
1395

1396
    @type node_list: list
1397
    @param node_list: the list of nodes to query
1398
    @type hvname: string
1399
    @param hvname: the hypervisor name
1400
    @type hvparams: dict
1401
    @param hvparams: the hypervisor parameters to be validated
1402

1403
    """
1404
    cluster = self._cfg.GetClusterInfo()
1405
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1406
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1407
                               [hvname, hv_full])
1408

    
1409
  @_RpcTimeout(_TMO_NORMAL)
1410
  def call_x509_cert_create(self, node, validity):
1411
    """Creates a new X509 certificate for SSL/TLS.
1412

1413
    This is a single-node call.
1414

1415
    @type validity: int
1416
    @param validity: Validity in seconds
1417

1418
    """
1419
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1420

    
1421
  @_RpcTimeout(_TMO_NORMAL)
1422
  def call_x509_cert_remove(self, node, name):
1423
    """Removes a X509 certificate.
1424

1425
    This is a single-node call.
1426

1427
    @type name: string
1428
    @param name: Certificate name
1429

1430
    """
1431
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1432

    
1433
  @_RpcTimeout(_TMO_NORMAL)
1434
  def call_import_start(self, node, opts, instance, dest, dest_args):
1435
    """Starts a listener for an import.
1436

1437
    This is a single-node call.
1438

1439
    @type node: string
1440
    @param node: Node name
1441
    @type instance: C{objects.Instance}
1442
    @param instance: Instance object
1443

1444
    """
1445
    return self._SingleNodeCall(node, "import_start",
1446
                                [opts.ToDict(),
1447
                                 self._InstDict(instance), dest,
1448
                                 _EncodeImportExportIO(dest, dest_args)])
1449

    
1450
  @_RpcTimeout(_TMO_NORMAL)
1451
  def call_export_start(self, node, opts, host, port,
1452
                        instance, source, source_args):
1453
    """Starts an export daemon.
1454

1455
    This is a single-node call.
1456

1457
    @type node: string
1458
    @param node: Node name
1459
    @type instance: C{objects.Instance}
1460
    @param instance: Instance object
1461

1462
    """
1463
    return self._SingleNodeCall(node, "export_start",
1464
                                [opts.ToDict(), host, port,
1465
                                 self._InstDict(instance), source,
1466
                                 _EncodeImportExportIO(source, source_args)])
1467

    
1468
  @_RpcTimeout(_TMO_FAST)
1469
  def call_impexp_status(self, node, names):
1470
    """Gets the status of an import or export.
1471

1472
    This is a single-node call.
1473

1474
    @type node: string
1475
    @param node: Node name
1476
    @type names: List of strings
1477
    @param names: Import/export names
1478
    @rtype: List of L{objects.ImportExportStatus} instances
1479
    @return: Returns a list of the state of each named import/export or None if
1480
             a status couldn't be retrieved
1481

1482
    """
1483
    result = self._SingleNodeCall(node, "impexp_status", [names])
1484

    
1485
    if not result.fail_msg:
1486
      decoded = []
1487

    
1488
      for i in result.payload:
1489
        if i is None:
1490
          decoded.append(None)
1491
          continue
1492
        decoded.append(objects.ImportExportStatus.FromDict(i))
1493

    
1494
      result.payload = decoded
1495

    
1496
    return result
1497

    
1498
  @_RpcTimeout(_TMO_NORMAL)
1499
  def call_impexp_abort(self, node, name):
1500
    """Aborts an import or export.
1501

1502
    This is a single-node call.
1503

1504
    @type node: string
1505
    @param node: Node name
1506
    @type name: string
1507
    @param name: Import/export name
1508

1509
    """
1510
    return self._SingleNodeCall(node, "impexp_abort", [name])
1511

    
1512
  @_RpcTimeout(_TMO_NORMAL)
1513
  def call_impexp_cleanup(self, node, name):
1514
    """Cleans up after an import or export.
1515

1516
    This is a single-node call.
1517

1518
    @type node: string
1519
    @param node: Node name
1520
    @type name: string
1521
    @param name: Import/export name
1522

1523
    """
1524
    return self._SingleNodeCall(node, "impexp_cleanup", [name])