Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 3d914585

History | View | Annotate | Download (44.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48

    
49
# pylint has a bug here, doesn't see this import
50
import ganeti.http.client  # pylint: disable-msg=W0611
51

    
52

    
53
# Timeout for connecting to nodes (seconds)
54
_RPC_CONNECT_TIMEOUT = 5
55

    
56
_RPC_CLIENT_HEADERS = [
57
  "Content-type: %s" % http.HTTP_APP_JSON,
58
  "Expect:",
59
  ]
60

    
61
# Various time constants for the timeout table
62
_TMO_URGENT = 60 # one minute
63
_TMO_FAST = 5 * 60 # five minutes
64
_TMO_NORMAL = 15 * 60 # 15 minutes
65
_TMO_SLOW = 3600 # one hour
66
_TMO_4HRS = 4 * 3600
67
_TMO_1DAY = 86400
68

    
69
# Timeout table that will be built later by decorators
70
# Guidelines for choosing timeouts:
71
# - call used during watcher: timeout -> 1min, _TMO_URGENT
72
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73
# - other calls: 15 min, _TMO_NORMAL
74
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75

    
76
_TIMEOUTS = {
77
}
78

    
79

    
80
def Init():
81
  """Initializes the module-global HTTP client manager.
82

83
  Must be called before using any RPC function and while exactly one thread is
84
  running.
85

86
  """
87
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88
  # one thread running. This check is just a safety measure -- it doesn't
89
  # cover all cases.
90
  assert threading.activeCount() == 1, \
91
         "Found more than one active thread when initializing pycURL"
92

    
93
  logging.info("Using PycURL %s", pycurl.version)
94

    
95
  pycurl.global_init(pycurl.GLOBAL_ALL)
96

    
97

    
98
def Shutdown():
99
  """Stops the module-global HTTP client manager.
100

101
  Must be called before quitting the program and while exactly one thread is
102
  running.
103

104
  """
105
  pycurl.global_cleanup()
106

    
107

    
108
def _ConfigRpcCurl(curl):
109
  noded_cert = str(constants.NODED_CERT_FILE)
110

    
111
  curl.setopt(pycurl.FOLLOWLOCATION, False)
112
  curl.setopt(pycurl.CAINFO, noded_cert)
113
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
115
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116
  curl.setopt(pycurl.SSLCERT, noded_cert)
117
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118
  curl.setopt(pycurl.SSLKEY, noded_cert)
119
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120

    
121

    
122
class _RpcThreadLocal(threading.local):
123
  def GetHttpClientPool(self):
124
    """Returns a per-thread HTTP client pool.
125

126
    @rtype: L{http.client.HttpClientPool}
127

128
    """
129
    try:
130
      pool = self.hcp
131
    except AttributeError:
132
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
133
      self.hcp = pool
134

    
135
    return pool
136

    
137

    
138
_thread_local = _RpcThreadLocal()
139

    
140

    
141
def _RpcTimeout(secs):
142
  """Timeout decorator.
143

144
  When applied to a rpc call_* function, it updates the global timeout
145
  table with the given function/timeout.
146

147
  """
148
  def decorator(f):
149
    name = f.__name__
150
    assert name.startswith("call_")
151
    _TIMEOUTS[name[len("call_"):]] = secs
152
    return f
153
  return decorator
154

    
155

    
156
def RunWithRPC(fn):
157
  """RPC-wrapper decorator.
158

159
  When applied to a function, it runs it with the RPC system
160
  initialized, and it shutsdown the system afterwards. This means the
161
  function must be called without RPC being initialized.
162

163
  """
164
  def wrapper(*args, **kwargs):
165
    Init()
166
    try:
167
      return fn(*args, **kwargs)
168
    finally:
169
      Shutdown()
170
  return wrapper
171

    
172

    
173
class RpcResult(object):
174
  """RPC Result class.
175

176
  This class holds an RPC result. It is needed since in multi-node
177
  calls we can't raise an exception just because one one out of many
178
  failed, and therefore we use this class to encapsulate the result.
179

180
  @ivar data: the data payload, for successful results, or None
181
  @ivar call: the name of the RPC call
182
  @ivar node: the name of the node to which we made the call
183
  @ivar offline: whether the operation failed because the node was
184
      offline, as opposed to actual failure; offline=True will always
185
      imply failed=True, in order to allow simpler checking if
186
      the user doesn't care about the exact failure mode
187
  @ivar fail_msg: the error message if the call failed
188

189
  """
190
  def __init__(self, data=None, failed=False, offline=False,
191
               call=None, node=None):
192
    self.offline = offline
193
    self.call = call
194
    self.node = node
195

    
196
    if offline:
197
      self.fail_msg = "Node is marked offline"
198
      self.data = self.payload = None
199
    elif failed:
200
      self.fail_msg = self._EnsureErr(data)
201
      self.data = self.payload = None
202
    else:
203
      self.data = data
204
      if not isinstance(self.data, (tuple, list)):
205
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
206
                         type(self.data))
207
        self.payload = None
208
      elif len(data) != 2:
209
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
210
                         "expected 2" % len(self.data))
211
        self.payload = None
212
      elif not self.data[0]:
213
        self.fail_msg = self._EnsureErr(self.data[1])
214
        self.payload = None
215
      else:
216
        # finally success
217
        self.fail_msg = None
218
        self.payload = data[1]
219

    
220
    assert hasattr(self, "call")
221
    assert hasattr(self, "data")
222
    assert hasattr(self, "fail_msg")
223
    assert hasattr(self, "node")
224
    assert hasattr(self, "offline")
225
    assert hasattr(self, "payload")
226

    
227
  @staticmethod
228
  def _EnsureErr(val):
229
    """Helper to ensure we return a 'True' value for error."""
230
    if val:
231
      return val
232
    else:
233
      return "No error information"
234

    
235
  def Raise(self, msg, prereq=False, ecode=None):
236
    """If the result has failed, raise an OpExecError.
237

238
    This is used so that LU code doesn't have to check for each
239
    result, but instead can call this function.
240

241
    """
242
    if not self.fail_msg:
243
      return
244

    
245
    if not msg: # one could pass None for default message
246
      msg = ("Call '%s' to node '%s' has failed: %s" %
247
             (self.call, self.node, self.fail_msg))
248
    else:
249
      msg = "%s: %s" % (msg, self.fail_msg)
250
    if prereq:
251
      ec = errors.OpPrereqError
252
    else:
253
      ec = errors.OpExecError
254
    if ecode is not None:
255
      args = (msg, ecode)
256
    else:
257
      args = (msg, )
258
    raise ec(*args) # pylint: disable-msg=W0142
259

    
260

    
261
def _AddressLookup(node_list,
262
                   ssc=ssconf.SimpleStore,
263
                   nslookup_fn=netutils.Hostname.GetIP):
264
  """Return addresses for given node names.
265

266
  @type node_list: list
267
  @param node_list: List of node names
268
  @type ssc: class
269
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
270
  @type nslookup_fn: callable
271
  @param nslookup_fn: function use to do NS lookup
272
  @rtype: list of addresses and/or None's
273
  @returns: List of corresponding addresses, if found
274

275
  """
276
  ss = ssc()
277
  iplist = ss.GetNodePrimaryIPList()
278
  family = ss.GetPrimaryIPFamily()
279
  addresses = []
280
  ipmap = dict(entry.split() for entry in iplist)
281
  for node in node_list:
282
    address = ipmap.get(node)
283
    if address is None:
284
      address = nslookup_fn(node, family=family)
285
    addresses.append(address)
286

    
287
  return addresses
288

    
289

    
290
class Client:
291
  """RPC Client class.
292

293
  This class, given a (remote) method name, a list of parameters and a
294
  list of nodes, will contact (in parallel) all nodes, and return a
295
  dict of results (key: node name, value: result).
296

297
  One current bug is that generic failure is still signaled by
298
  'False' result, which is not good. This overloading of values can
299
  cause bugs.
300

301
  """
302
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
303
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
304
                                    " timeouts table")
305
    self.procedure = procedure
306
    self.body = body
307
    self.port = port
308
    self._request = {}
309
    self._address_lookup_fn = address_lookup_fn
310

    
311
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
312
    """Add a list of nodes to the target nodes.
313

314
    @type node_list: list
315
    @param node_list: the list of node names to connect
316
    @type address_list: list or None
317
    @keyword address_list: either None or a list with node addresses,
318
        which must have the same length as the node list
319
    @type read_timeout: int
320
    @param read_timeout: overwrites default timeout for operation
321

322
    """
323
    if address_list is None:
324
      # Always use IP address instead of node name
325
      address_list = self._address_lookup_fn(node_list)
326

    
327
    assert len(node_list) == len(address_list), \
328
           "Name and address lists must have the same length"
329

    
330
    for node, address in zip(node_list, address_list):
331
      self.ConnectNode(node, address, read_timeout=read_timeout)
332

    
333
  def ConnectNode(self, name, address=None, read_timeout=None):
334
    """Add a node to the target list.
335

336
    @type name: str
337
    @param name: the node name
338
    @type address: str
339
    @param address: the node address, if known
340
    @type read_timeout: int
341
    @param read_timeout: overwrites default timeout for operation
342

343
    """
344
    if address is None:
345
      # Always use IP address instead of node name
346
      address = self._address_lookup_fn([name])[0]
347

    
348
    assert(address is not None)
349

    
350
    if read_timeout is None:
351
      read_timeout = _TIMEOUTS[self.procedure]
352

    
353
    self._request[name] = \
354
      http.client.HttpClientRequest(str(address), self.port,
355
                                    http.HTTP_PUT, str("/%s" % self.procedure),
356
                                    headers=_RPC_CLIENT_HEADERS,
357
                                    post_data=str(self.body),
358
                                    read_timeout=read_timeout)
359

    
360
  def GetResults(self, http_pool=None):
361
    """Call nodes and return results.
362

363
    @rtype: list
364
    @return: List of RPC results
365

366
    """
367
    if not http_pool:
368
      http_pool = _thread_local.GetHttpClientPool()
369

    
370
    http_pool.ProcessRequests(self._request.values())
371

    
372
    results = {}
373

    
374
    for name, req in self._request.iteritems():
375
      if req.success and req.resp_status_code == http.HTTP_OK:
376
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
377
                                  node=name, call=self.procedure)
378
        continue
379

    
380
      # TODO: Better error reporting
381
      if req.error:
382
        msg = req.error
383
      else:
384
        msg = req.resp_body
385

    
386
      logging.error("RPC error in %s from node %s: %s",
387
                    self.procedure, name, msg)
388
      results[name] = RpcResult(data=msg, failed=True, node=name,
389
                                call=self.procedure)
390

    
391
    return results
392

    
393

    
394
def _EncodeImportExportIO(ieio, ieioargs):
395
  """Encodes import/export I/O information.
396

397
  """
398
  if ieio == constants.IEIO_RAW_DISK:
399
    assert len(ieioargs) == 1
400
    return (ieioargs[0].ToDict(), )
401

    
402
  if ieio == constants.IEIO_SCRIPT:
403
    assert len(ieioargs) == 2
404
    return (ieioargs[0].ToDict(), ieioargs[1])
405

    
406
  return ieioargs
407

    
408

    
409
class RpcRunner(object):
410
  """RPC runner class"""
411

    
412
  def __init__(self, cfg):
413
    """Initialized the rpc runner.
414

415
    @type cfg:  C{config.ConfigWriter}
416
    @param cfg: the configuration object that will be used to get data
417
                about the cluster
418

419
    """
420
    self._cfg = cfg
421
    self.port = netutils.GetDaemonPort(constants.NODED)
422

    
423
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
424
    """Convert the given instance to a dict.
425

426
    This is done via the instance's ToDict() method and additionally
427
    we fill the hvparams with the cluster defaults.
428

429
    @type instance: L{objects.Instance}
430
    @param instance: an Instance object
431
    @type hvp: dict or None
432
    @param hvp: a dictionary with overridden hypervisor parameters
433
    @type bep: dict or None
434
    @param bep: a dictionary with overridden backend parameters
435
    @type osp: dict or None
436
    @param osp: a dictionary with overriden os parameters
437
    @rtype: dict
438
    @return: the instance dict, with the hvparams filled with the
439
        cluster defaults
440

441
    """
442
    idict = instance.ToDict()
443
    cluster = self._cfg.GetClusterInfo()
444
    idict["hvparams"] = cluster.FillHV(instance)
445
    if hvp is not None:
446
      idict["hvparams"].update(hvp)
447
    idict["beparams"] = cluster.FillBE(instance)
448
    if bep is not None:
449
      idict["beparams"].update(bep)
450
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
451
    if osp is not None:
452
      idict["osparams"].update(osp)
453
    for nic in idict["nics"]:
454
      nic['nicparams'] = objects.FillDict(
455
        cluster.nicparams[constants.PP_DEFAULT],
456
        nic['nicparams'])
457
    return idict
458

    
459
  def _ConnectList(self, client, node_list, call, read_timeout=None):
460
    """Helper for computing node addresses.
461

462
    @type client: L{ganeti.rpc.Client}
463
    @param client: a C{Client} instance
464
    @type node_list: list
465
    @param node_list: the node list we should connect
466
    @type call: string
467
    @param call: the name of the remote procedure call, for filling in
468
        correctly any eventual offline nodes' results
469
    @type read_timeout: int
470
    @param read_timeout: overwrites the default read timeout for the
471
        given operation
472

473
    """
474
    all_nodes = self._cfg.GetAllNodesInfo()
475
    name_list = []
476
    addr_list = []
477
    skip_dict = {}
478
    for node in node_list:
479
      if node in all_nodes:
480
        if all_nodes[node].offline:
481
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
482
          continue
483
        val = all_nodes[node].primary_ip
484
      else:
485
        val = None
486
      addr_list.append(val)
487
      name_list.append(node)
488
    if name_list:
489
      client.ConnectList(name_list, address_list=addr_list,
490
                         read_timeout=read_timeout)
491
    return skip_dict
492

    
493
  def _ConnectNode(self, client, node, call, read_timeout=None):
494
    """Helper for computing one node's address.
495

496
    @type client: L{ganeti.rpc.Client}
497
    @param client: a C{Client} instance
498
    @type node: str
499
    @param node: the node we should connect
500
    @type call: string
501
    @param call: the name of the remote procedure call, for filling in
502
        correctly any eventual offline nodes' results
503
    @type read_timeout: int
504
    @param read_timeout: overwrites the default read timeout for the
505
        given operation
506

507
    """
508
    node_info = self._cfg.GetNodeInfo(node)
509
    if node_info is not None:
510
      if node_info.offline:
511
        return RpcResult(node=node, offline=True, call=call)
512
      addr = node_info.primary_ip
513
    else:
514
      addr = None
515
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
516

    
517
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
518
    """Helper for making a multi-node call
519

520
    """
521
    body = serializer.DumpJson(args, indent=False)
522
    c = Client(procedure, body, self.port)
523
    skip_dict = self._ConnectList(c, node_list, procedure,
524
                                  read_timeout=read_timeout)
525
    skip_dict.update(c.GetResults())
526
    return skip_dict
527

    
528
  @classmethod
529
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
530
                           address_list=None, read_timeout=None):
531
    """Helper for making a multi-node static call
532

533
    """
534
    body = serializer.DumpJson(args, indent=False)
535
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
536
    c.ConnectList(node_list, address_list=address_list,
537
                  read_timeout=read_timeout)
538
    return c.GetResults()
539

    
540
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
541
    """Helper for making a single-node call
542

543
    """
544
    body = serializer.DumpJson(args, indent=False)
545
    c = Client(procedure, body, self.port)
546
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
547
    if result is None:
548
      # we did connect, node is not offline
549
      result = c.GetResults()[node]
550
    return result
551

    
552
  @classmethod
553
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
554
    """Helper for making a single-node static call
555

556
    """
557
    body = serializer.DumpJson(args, indent=False)
558
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
559
    c.ConnectNode(node, read_timeout=read_timeout)
560
    return c.GetResults()[node]
561

    
562
  @staticmethod
563
  def _Compress(data):
564
    """Compresses a string for transport over RPC.
565

566
    Small amounts of data are not compressed.
567

568
    @type data: str
569
    @param data: Data
570
    @rtype: tuple
571
    @return: Encoded data to send
572

573
    """
574
    # Small amounts of data are not compressed
575
    if len(data) < 512:
576
      return (constants.RPC_ENCODING_NONE, data)
577

    
578
    # Compress with zlib and encode in base64
579
    return (constants.RPC_ENCODING_ZLIB_BASE64,
580
            base64.b64encode(zlib.compress(data, 3)))
581

    
582
  #
583
  # Begin RPC calls
584
  #
585

    
586
  @_RpcTimeout(_TMO_URGENT)
587
  def call_lv_list(self, node_list, vg_name):
588
    """Gets the logical volumes present in a given volume group.
589

590
    This is a multi-node call.
591

592
    """
593
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
594

    
595
  @_RpcTimeout(_TMO_URGENT)
596
  def call_vg_list(self, node_list):
597
    """Gets the volume group list.
598

599
    This is a multi-node call.
600

601
    """
602
    return self._MultiNodeCall(node_list, "vg_list", [])
603

    
604
  @_RpcTimeout(_TMO_NORMAL)
605
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
606
    """Get list of storage units.
607

608
    This is a multi-node call.
609

610
    """
611
    return self._MultiNodeCall(node_list, "storage_list",
612
                               [su_name, su_args, name, fields])
613

    
614
  @_RpcTimeout(_TMO_NORMAL)
615
  def call_storage_modify(self, node, su_name, su_args, name, changes):
616
    """Modify a storage unit.
617

618
    This is a single-node call.
619

620
    """
621
    return self._SingleNodeCall(node, "storage_modify",
622
                                [su_name, su_args, name, changes])
623

    
624
  @_RpcTimeout(_TMO_NORMAL)
625
  def call_storage_execute(self, node, su_name, su_args, name, op):
626
    """Executes an operation on a storage unit.
627

628
    This is a single-node call.
629

630
    """
631
    return self._SingleNodeCall(node, "storage_execute",
632
                                [su_name, su_args, name, op])
633

    
634
  @_RpcTimeout(_TMO_URGENT)
635
  def call_bridges_exist(self, node, bridges_list):
636
    """Checks if a node has all the bridges given.
637

638
    This method checks if all bridges given in the bridges_list are
639
    present on the remote node, so that an instance that uses interfaces
640
    on those bridges can be started.
641

642
    This is a single-node call.
643

644
    """
645
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
646

    
647
  @_RpcTimeout(_TMO_NORMAL)
648
  def call_instance_start(self, node, instance, hvp, bep):
649
    """Starts an instance.
650

651
    This is a single-node call.
652

653
    """
654
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
655
    return self._SingleNodeCall(node, "instance_start", [idict])
656

    
657
  @_RpcTimeout(_TMO_NORMAL)
658
  def call_instance_shutdown(self, node, instance, timeout):
659
    """Stops an instance.
660

661
    This is a single-node call.
662

663
    """
664
    return self._SingleNodeCall(node, "instance_shutdown",
665
                                [self._InstDict(instance), timeout])
666

    
667
  @_RpcTimeout(_TMO_NORMAL)
668
  def call_migration_info(self, node, instance):
669
    """Gather the information necessary to prepare an instance migration.
670

671
    This is a single-node call.
672

673
    @type node: string
674
    @param node: the node on which the instance is currently running
675
    @type instance: C{objects.Instance}
676
    @param instance: the instance definition
677

678
    """
679
    return self._SingleNodeCall(node, "migration_info",
680
                                [self._InstDict(instance)])
681

    
682
  @_RpcTimeout(_TMO_NORMAL)
683
  def call_accept_instance(self, node, instance, info, target):
684
    """Prepare a node to accept an instance.
685

686
    This is a single-node call.
687

688
    @type node: string
689
    @param node: the target node for the migration
690
    @type instance: C{objects.Instance}
691
    @param instance: the instance definition
692
    @type info: opaque/hypervisor specific (string/data)
693
    @param info: result for the call_migration_info call
694
    @type target: string
695
    @param target: target hostname (usually ip address) (on the node itself)
696

697
    """
698
    return self._SingleNodeCall(node, "accept_instance",
699
                                [self._InstDict(instance), info, target])
700

    
701
  @_RpcTimeout(_TMO_NORMAL)
702
  def call_finalize_migration(self, node, instance, info, success):
703
    """Finalize any target-node migration specific operation.
704

705
    This is called both in case of a successful migration and in case of error
706
    (in which case it should abort the migration).
707

708
    This is a single-node call.
709

710
    @type node: string
711
    @param node: the target node for the migration
712
    @type instance: C{objects.Instance}
713
    @param instance: the instance definition
714
    @type info: opaque/hypervisor specific (string/data)
715
    @param info: result for the call_migration_info call
716
    @type success: boolean
717
    @param success: whether the migration was a success or a failure
718

719
    """
720
    return self._SingleNodeCall(node, "finalize_migration",
721
                                [self._InstDict(instance), info, success])
722

    
723
  @_RpcTimeout(_TMO_SLOW)
724
  def call_instance_migrate(self, node, instance, target, live):
725
    """Migrate an instance.
726

727
    This is a single-node call.
728

729
    @type node: string
730
    @param node: the node on which the instance is currently running
731
    @type instance: C{objects.Instance}
732
    @param instance: the instance definition
733
    @type target: string
734
    @param target: the target node name
735
    @type live: boolean
736
    @param live: whether the migration should be done live or not (the
737
        interpretation of this parameter is left to the hypervisor)
738

739
    """
740
    return self._SingleNodeCall(node, "instance_migrate",
741
                                [self._InstDict(instance), target, live])
742

    
743
  @_RpcTimeout(_TMO_NORMAL)
744
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
745
    """Reboots an instance.
746

747
    This is a single-node call.
748

749
    """
750
    return self._SingleNodeCall(node, "instance_reboot",
751
                                [self._InstDict(inst), reboot_type,
752
                                 shutdown_timeout])
753

    
754
  @_RpcTimeout(_TMO_1DAY)
755
  def call_instance_os_add(self, node, inst, reinstall, debug):
756
    """Installs an OS on the given instance.
757

758
    This is a single-node call.
759

760
    """
761
    return self._SingleNodeCall(node, "instance_os_add",
762
                                [self._InstDict(inst), reinstall, debug])
763

    
764
  @_RpcTimeout(_TMO_SLOW)
765
  def call_instance_run_rename(self, node, inst, old_name, debug):
766
    """Run the OS rename script for an instance.
767

768
    This is a single-node call.
769

770
    """
771
    return self._SingleNodeCall(node, "instance_run_rename",
772
                                [self._InstDict(inst), old_name, debug])
773

    
774
  @_RpcTimeout(_TMO_URGENT)
775
  def call_instance_info(self, node, instance, hname):
776
    """Returns information about a single instance.
777

778
    This is a single-node call.
779

780
    @type node: list
781
    @param node: the list of nodes to query
782
    @type instance: string
783
    @param instance: the instance name
784
    @type hname: string
785
    @param hname: the hypervisor type of the instance
786

787
    """
788
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
789

    
790
  @_RpcTimeout(_TMO_NORMAL)
791
  def call_instance_migratable(self, node, instance):
792
    """Checks whether the given instance can be migrated.
793

794
    This is a single-node call.
795

796
    @param node: the node to query
797
    @type instance: L{objects.Instance}
798
    @param instance: the instance to check
799

800

801
    """
802
    return self._SingleNodeCall(node, "instance_migratable",
803
                                [self._InstDict(instance)])
804

    
805
  @_RpcTimeout(_TMO_URGENT)
806
  def call_all_instances_info(self, node_list, hypervisor_list):
807
    """Returns information about all instances on the given nodes.
808

809
    This is a multi-node call.
810

811
    @type node_list: list
812
    @param node_list: the list of nodes to query
813
    @type hypervisor_list: list
814
    @param hypervisor_list: the hypervisors to query for instances
815

816
    """
817
    return self._MultiNodeCall(node_list, "all_instances_info",
818
                               [hypervisor_list])
819

    
820
  @_RpcTimeout(_TMO_URGENT)
821
  def call_instance_list(self, node_list, hypervisor_list):
822
    """Returns the list of running instances on a given node.
823

824
    This is a multi-node call.
825

826
    @type node_list: list
827
    @param node_list: the list of nodes to query
828
    @type hypervisor_list: list
829
    @param hypervisor_list: the hypervisors to query for instances
830

831
    """
832
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
833

    
834
  @_RpcTimeout(_TMO_FAST)
835
  def call_node_tcp_ping(self, node, source, target, port, timeout,
836
                         live_port_needed):
837
    """Do a TcpPing on the remote node
838

839
    This is a single-node call.
840

841
    """
842
    return self._SingleNodeCall(node, "node_tcp_ping",
843
                                [source, target, port, timeout,
844
                                 live_port_needed])
845

    
846
  @_RpcTimeout(_TMO_FAST)
847
  def call_node_has_ip_address(self, node, address):
848
    """Checks if a node has the given IP address.
849

850
    This is a single-node call.
851

852
    """
853
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
854

    
855
  @_RpcTimeout(_TMO_URGENT)
856
  def call_node_info(self, node_list, vg_name, hypervisor_type):
857
    """Return node information.
858

859
    This will return memory information and volume group size and free
860
    space.
861

862
    This is a multi-node call.
863

864
    @type node_list: list
865
    @param node_list: the list of nodes to query
866
    @type vg_name: C{string}
867
    @param vg_name: the name of the volume group to ask for disk space
868
        information
869
    @type hypervisor_type: C{str}
870
    @param hypervisor_type: the name of the hypervisor to ask for
871
        memory information
872

873
    """
874
    return self._MultiNodeCall(node_list, "node_info",
875
                               [vg_name, hypervisor_type])
876

    
877
  @_RpcTimeout(_TMO_NORMAL)
878
  def call_etc_hosts_modify(self, node, mode, name, ip):
879
    """Modify hosts file with name
880

881
    @type node: string
882
    @param node: The node to call
883
    @type mode: string
884
    @param mode: The mode to operate. Currently "add" or "remove"
885
    @type name: string
886
    @param name: The host name to be modified
887
    @type ip: string
888
    @param ip: The ip of the entry (just valid if mode is "add")
889

890
    """
891
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
892

    
893
  @_RpcTimeout(_TMO_NORMAL)
894
  def call_node_verify(self, node_list, checkdict, cluster_name):
895
    """Request verification of given parameters.
896

897
    This is a multi-node call.
898

899
    """
900
    return self._MultiNodeCall(node_list, "node_verify",
901
                               [checkdict, cluster_name])
902

    
903
  @classmethod
904
  @_RpcTimeout(_TMO_FAST)
905
  def call_node_start_master(cls, node, start_daemons, no_voting):
906
    """Tells a node to activate itself as a master.
907

908
    This is a single-node call.
909

910
    """
911
    return cls._StaticSingleNodeCall(node, "node_start_master",
912
                                     [start_daemons, no_voting])
913

    
914
  @classmethod
915
  @_RpcTimeout(_TMO_FAST)
916
  def call_node_stop_master(cls, node, stop_daemons):
917
    """Tells a node to demote itself from master status.
918

919
    This is a single-node call.
920

921
    """
922
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
923

    
924
  @classmethod
925
  @_RpcTimeout(_TMO_URGENT)
926
  def call_master_info(cls, node_list):
927
    """Query master info.
928

929
    This is a multi-node call.
930

931
    """
932
    # TODO: should this method query down nodes?
933
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
934

    
935
  @classmethod
936
  @_RpcTimeout(_TMO_URGENT)
937
  def call_version(cls, node_list):
938
    """Query node version.
939

940
    This is a multi-node call.
941

942
    """
943
    return cls._StaticMultiNodeCall(node_list, "version", [])
944

    
945
  @_RpcTimeout(_TMO_NORMAL)
946
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
947
    """Request creation of a given block device.
948

949
    This is a single-node call.
950

951
    """
952
    return self._SingleNodeCall(node, "blockdev_create",
953
                                [bdev.ToDict(), size, owner, on_primary, info])
954

    
955
  @_RpcTimeout(_TMO_NORMAL)
956
  def call_blockdev_remove(self, node, bdev):
957
    """Request removal of a given block device.
958

959
    This is a single-node call.
960

961
    """
962
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
963

    
964
  @_RpcTimeout(_TMO_NORMAL)
965
  def call_blockdev_rename(self, node, devlist):
966
    """Request rename of the given block devices.
967

968
    This is a single-node call.
969

970
    """
971
    return self._SingleNodeCall(node, "blockdev_rename",
972
                                [(d.ToDict(), uid) for d, uid in devlist])
973

    
974
  @_RpcTimeout(_TMO_NORMAL)
975
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
976
    """Request assembling of a given block device.
977

978
    This is a single-node call.
979

980
    """
981
    return self._SingleNodeCall(node, "blockdev_assemble",
982
                                [disk.ToDict(), owner, on_primary])
983

    
984
  @_RpcTimeout(_TMO_NORMAL)
985
  def call_blockdev_shutdown(self, node, disk):
986
    """Request shutdown of a given block device.
987

988
    This is a single-node call.
989

990
    """
991
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
992

    
993
  @_RpcTimeout(_TMO_NORMAL)
994
  def call_blockdev_addchildren(self, node, bdev, ndevs):
995
    """Request adding a list of children to a (mirroring) device.
996

997
    This is a single-node call.
998

999
    """
1000
    return self._SingleNodeCall(node, "blockdev_addchildren",
1001
                                [bdev.ToDict(),
1002
                                 [disk.ToDict() for disk in ndevs]])
1003

    
1004
  @_RpcTimeout(_TMO_NORMAL)
1005
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1006
    """Request removing a list of children from a (mirroring) device.
1007

1008
    This is a single-node call.
1009

1010
    """
1011
    return self._SingleNodeCall(node, "blockdev_removechildren",
1012
                                [bdev.ToDict(),
1013
                                 [disk.ToDict() for disk in ndevs]])
1014

    
1015
  @_RpcTimeout(_TMO_NORMAL)
1016
  def call_blockdev_getmirrorstatus(self, node, disks):
1017
    """Request status of a (mirroring) device.
1018

1019
    This is a single-node call.
1020

1021
    """
1022
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1023
                                  [dsk.ToDict() for dsk in disks])
1024
    if not result.fail_msg:
1025
      result.payload = [objects.BlockDevStatus.FromDict(i)
1026
                        for i in result.payload]
1027
    return result
1028

    
1029
  @_RpcTimeout(_TMO_NORMAL)
1030
  def call_blockdev_find(self, node, disk):
1031
    """Request identification of a given block device.
1032

1033
    This is a single-node call.
1034

1035
    """
1036
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1037
    if not result.fail_msg and result.payload is not None:
1038
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1039
    return result
1040

    
1041
  @_RpcTimeout(_TMO_NORMAL)
1042
  def call_blockdev_close(self, node, instance_name, disks):
1043
    """Closes the given block devices.
1044

1045
    This is a single-node call.
1046

1047
    """
1048
    params = [instance_name, [cf.ToDict() for cf in disks]]
1049
    return self._SingleNodeCall(node, "blockdev_close", params)
1050

    
1051
  @_RpcTimeout(_TMO_NORMAL)
1052
  def call_blockdev_getsizes(self, node, disks):
1053
    """Returns the size of the given disks.
1054

1055
    This is a single-node call.
1056

1057
    """
1058
    params = [[cf.ToDict() for cf in disks]]
1059
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1060

    
1061
  @_RpcTimeout(_TMO_NORMAL)
1062
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1063
    """Disconnects the network of the given drbd devices.
1064

1065
    This is a multi-node call.
1066

1067
    """
1068
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1069
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1070

    
1071
  @_RpcTimeout(_TMO_NORMAL)
1072
  def call_drbd_attach_net(self, node_list, nodes_ip,
1073
                           disks, instance_name, multimaster):
1074
    """Disconnects the given drbd devices.
1075

1076
    This is a multi-node call.
1077

1078
    """
1079
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1080
                               [nodes_ip, [cf.ToDict() for cf in disks],
1081
                                instance_name, multimaster])
1082

    
1083
  @_RpcTimeout(_TMO_SLOW)
1084
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1085
    """Waits for the synchronization of drbd devices is complete.
1086

1087
    This is a multi-node call.
1088

1089
    """
1090
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1091
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1092

    
1093
  @_RpcTimeout(_TMO_URGENT)
1094
  def call_drbd_helper(self, node_list):
1095
    """Gets drbd helper.
1096

1097
    This is a multi-node call.
1098

1099
    """
1100
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1101

    
1102
  @classmethod
1103
  @_RpcTimeout(_TMO_NORMAL)
1104
  def call_upload_file(cls, node_list, file_name, address_list=None):
1105
    """Upload a file.
1106

1107
    The node will refuse the operation in case the file is not on the
1108
    approved file list.
1109

1110
    This is a multi-node call.
1111

1112
    @type node_list: list
1113
    @param node_list: the list of node names to upload to
1114
    @type file_name: str
1115
    @param file_name: the filename to upload
1116
    @type address_list: list or None
1117
    @keyword address_list: an optional list of node addresses, in order
1118
        to optimize the RPC speed
1119

1120
    """
1121
    file_contents = utils.ReadFile(file_name)
1122
    data = cls._Compress(file_contents)
1123
    st = os.stat(file_name)
1124
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1125
              st.st_atime, st.st_mtime]
1126
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1127
                                    address_list=address_list)
1128

    
1129
  @classmethod
1130
  @_RpcTimeout(_TMO_NORMAL)
1131
  def call_write_ssconf_files(cls, node_list, values):
1132
    """Write ssconf files.
1133

1134
    This is a multi-node call.
1135

1136
    """
1137
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1138

    
1139
  @_RpcTimeout(_TMO_FAST)
1140
  def call_os_diagnose(self, node_list):
1141
    """Request a diagnose of OS definitions.
1142

1143
    This is a multi-node call.
1144

1145
    """
1146
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1147

    
1148
  @_RpcTimeout(_TMO_FAST)
1149
  def call_os_get(self, node, name):
1150
    """Returns an OS definition.
1151

1152
    This is a single-node call.
1153

1154
    """
1155
    result = self._SingleNodeCall(node, "os_get", [name])
1156
    if not result.fail_msg and isinstance(result.payload, dict):
1157
      result.payload = objects.OS.FromDict(result.payload)
1158
    return result
1159

    
1160
  @_RpcTimeout(_TMO_FAST)
1161
  def call_os_validate(self, required, nodes, name, checks, params):
1162
    """Run a validation routine for a given OS.
1163

1164
    This is a multi-node call.
1165

1166
    """
1167
    return self._MultiNodeCall(nodes, "os_validate",
1168
                               [required, name, checks, params])
1169

    
1170
  @_RpcTimeout(_TMO_NORMAL)
1171
  def call_hooks_runner(self, node_list, hpath, phase, env):
1172
    """Call the hooks runner.
1173

1174
    Args:
1175
      - op: the OpCode instance
1176
      - env: a dictionary with the environment
1177

1178
    This is a multi-node call.
1179

1180
    """
1181
    params = [hpath, phase, env]
1182
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1183

    
1184
  @_RpcTimeout(_TMO_NORMAL)
1185
  def call_iallocator_runner(self, node, name, idata):
1186
    """Call an iallocator on a remote node
1187

1188
    Args:
1189
      - name: the iallocator name
1190
      - input: the json-encoded input string
1191

1192
    This is a single-node call.
1193

1194
    """
1195
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1196

    
1197
  @_RpcTimeout(_TMO_NORMAL)
1198
  def call_blockdev_grow(self, node, cf_bdev, amount):
1199
    """Request a snapshot of the given block device.
1200

1201
    This is a single-node call.
1202

1203
    """
1204
    return self._SingleNodeCall(node, "blockdev_grow",
1205
                                [cf_bdev.ToDict(), amount])
1206

    
1207
  @_RpcTimeout(_TMO_1DAY)
1208
  def call_blockdev_export(self, node, cf_bdev,
1209
                           dest_node, dest_path, cluster_name):
1210
    """Export a given disk to another node.
1211

1212
    This is a single-node call.
1213

1214
    """
1215
    return self._SingleNodeCall(node, "blockdev_export",
1216
                                [cf_bdev.ToDict(), dest_node, dest_path,
1217
                                 cluster_name])
1218

    
1219
  @_RpcTimeout(_TMO_NORMAL)
1220
  def call_blockdev_snapshot(self, node, cf_bdev):
1221
    """Request a snapshot of the given block device.
1222

1223
    This is a single-node call.
1224

1225
    """
1226
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1227

    
1228
  @_RpcTimeout(_TMO_NORMAL)
1229
  def call_finalize_export(self, node, instance, snap_disks):
1230
    """Request the completion of an export operation.
1231

1232
    This writes the export config file, etc.
1233

1234
    This is a single-node call.
1235

1236
    """
1237
    flat_disks = []
1238
    for disk in snap_disks:
1239
      if isinstance(disk, bool):
1240
        flat_disks.append(disk)
1241
      else:
1242
        flat_disks.append(disk.ToDict())
1243

    
1244
    return self._SingleNodeCall(node, "finalize_export",
1245
                                [self._InstDict(instance), flat_disks])
1246

    
1247
  @_RpcTimeout(_TMO_FAST)
1248
  def call_export_info(self, node, path):
1249
    """Queries the export information in a given path.
1250

1251
    This is a single-node call.
1252

1253
    """
1254
    return self._SingleNodeCall(node, "export_info", [path])
1255

    
1256
  @_RpcTimeout(_TMO_FAST)
1257
  def call_export_list(self, node_list):
1258
    """Gets the stored exports list.
1259

1260
    This is a multi-node call.
1261

1262
    """
1263
    return self._MultiNodeCall(node_list, "export_list", [])
1264

    
1265
  @_RpcTimeout(_TMO_FAST)
1266
  def call_export_remove(self, node, export):
1267
    """Requests removal of a given export.
1268

1269
    This is a single-node call.
1270

1271
    """
1272
    return self._SingleNodeCall(node, "export_remove", [export])
1273

    
1274
  @classmethod
1275
  @_RpcTimeout(_TMO_NORMAL)
1276
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1277
    """Requests a node to clean the cluster information it has.
1278

1279
    This will remove the configuration information from the ganeti data
1280
    dir.
1281

1282
    This is a single-node call.
1283

1284
    """
1285
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1286
                                     [modify_ssh_setup])
1287

    
1288
  @_RpcTimeout(_TMO_FAST)
1289
  def call_node_volumes(self, node_list):
1290
    """Gets all volumes on node(s).
1291

1292
    This is a multi-node call.
1293

1294
    """
1295
    return self._MultiNodeCall(node_list, "node_volumes", [])
1296

    
1297
  @_RpcTimeout(_TMO_FAST)
1298
  def call_node_demote_from_mc(self, node):
1299
    """Demote a node from the master candidate role.
1300

1301
    This is a single-node call.
1302

1303
    """
1304
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1305

    
1306
  @_RpcTimeout(_TMO_NORMAL)
1307
  def call_node_powercycle(self, node, hypervisor):
1308
    """Tries to powercycle a node.
1309

1310
    This is a single-node call.
1311

1312
    """
1313
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1314

    
1315
  @_RpcTimeout(None)
1316
  def call_test_delay(self, node_list, duration):
1317
    """Sleep for a fixed time on given node(s).
1318

1319
    This is a multi-node call.
1320

1321
    """
1322
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1323
                               read_timeout=int(duration + 5))
1324

    
1325
  @_RpcTimeout(_TMO_FAST)
1326
  def call_file_storage_dir_create(self, node, file_storage_dir):
1327
    """Create the given file storage directory.
1328

1329
    This is a single-node call.
1330

1331
    """
1332
    return self._SingleNodeCall(node, "file_storage_dir_create",
1333
                                [file_storage_dir])
1334

    
1335
  @_RpcTimeout(_TMO_FAST)
1336
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1337
    """Remove the given file storage directory.
1338

1339
    This is a single-node call.
1340

1341
    """
1342
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1343
                                [file_storage_dir])
1344

    
1345
  @_RpcTimeout(_TMO_FAST)
1346
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1347
                                   new_file_storage_dir):
1348
    """Rename file storage directory.
1349

1350
    This is a single-node call.
1351

1352
    """
1353
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1354
                                [old_file_storage_dir, new_file_storage_dir])
1355

    
1356
  @classmethod
1357
  @_RpcTimeout(_TMO_FAST)
1358
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1359
    """Update job queue.
1360

1361
    This is a multi-node call.
1362

1363
    """
1364
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1365
                                    [file_name, cls._Compress(content)],
1366
                                    address_list=address_list)
1367

    
1368
  @classmethod
1369
  @_RpcTimeout(_TMO_NORMAL)
1370
  def call_jobqueue_purge(cls, node):
1371
    """Purge job queue.
1372

1373
    This is a single-node call.
1374

1375
    """
1376
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1377

    
1378
  @classmethod
1379
  @_RpcTimeout(_TMO_FAST)
1380
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1381
    """Rename a job queue file.
1382

1383
    This is a multi-node call.
1384

1385
    """
1386
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1387
                                    address_list=address_list)
1388

    
1389
  @_RpcTimeout(_TMO_NORMAL)
1390
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1391
    """Validate the hypervisor params.
1392

1393
    This is a multi-node call.
1394

1395
    @type node_list: list
1396
    @param node_list: the list of nodes to query
1397
    @type hvname: string
1398
    @param hvname: the hypervisor name
1399
    @type hvparams: dict
1400
    @param hvparams: the hypervisor parameters to be validated
1401

1402
    """
1403
    cluster = self._cfg.GetClusterInfo()
1404
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1405
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1406
                               [hvname, hv_full])
1407

    
1408
  @_RpcTimeout(_TMO_NORMAL)
1409
  def call_x509_cert_create(self, node, validity):
1410
    """Creates a new X509 certificate for SSL/TLS.
1411

1412
    This is a single-node call.
1413

1414
    @type validity: int
1415
    @param validity: Validity in seconds
1416

1417
    """
1418
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1419

    
1420
  @_RpcTimeout(_TMO_NORMAL)
1421
  def call_x509_cert_remove(self, node, name):
1422
    """Removes a X509 certificate.
1423

1424
    This is a single-node call.
1425

1426
    @type name: string
1427
    @param name: Certificate name
1428

1429
    """
1430
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1431

    
1432
  @_RpcTimeout(_TMO_NORMAL)
1433
  def call_import_start(self, node, opts, instance, dest, dest_args):
1434
    """Starts a listener for an import.
1435

1436
    This is a single-node call.
1437

1438
    @type node: string
1439
    @param node: Node name
1440
    @type instance: C{objects.Instance}
1441
    @param instance: Instance object
1442

1443
    """
1444
    return self._SingleNodeCall(node, "import_start",
1445
                                [opts.ToDict(),
1446
                                 self._InstDict(instance), dest,
1447
                                 _EncodeImportExportIO(dest, dest_args)])
1448

    
1449
  @_RpcTimeout(_TMO_NORMAL)
1450
  def call_export_start(self, node, opts, host, port,
1451
                        instance, source, source_args):
1452
    """Starts an export daemon.
1453

1454
    This is a single-node call.
1455

1456
    @type node: string
1457
    @param node: Node name
1458
    @type instance: C{objects.Instance}
1459
    @param instance: Instance object
1460

1461
    """
1462
    return self._SingleNodeCall(node, "export_start",
1463
                                [opts.ToDict(), host, port,
1464
                                 self._InstDict(instance), source,
1465
                                 _EncodeImportExportIO(source, source_args)])
1466

    
1467
  @_RpcTimeout(_TMO_FAST)
1468
  def call_impexp_status(self, node, names):
1469
    """Gets the status of an import or export.
1470

1471
    This is a single-node call.
1472

1473
    @type node: string
1474
    @param node: Node name
1475
    @type names: List of strings
1476
    @param names: Import/export names
1477
    @rtype: List of L{objects.ImportExportStatus} instances
1478
    @return: Returns a list of the state of each named import/export or None if
1479
             a status couldn't be retrieved
1480

1481
    """
1482
    result = self._SingleNodeCall(node, "impexp_status", [names])
1483

    
1484
    if not result.fail_msg:
1485
      decoded = []
1486

    
1487
      for i in result.payload:
1488
        if i is None:
1489
          decoded.append(None)
1490
          continue
1491
        decoded.append(objects.ImportExportStatus.FromDict(i))
1492

    
1493
      result.payload = decoded
1494

    
1495
    return result
1496

    
1497
  @_RpcTimeout(_TMO_NORMAL)
1498
  def call_impexp_abort(self, node, name):
1499
    """Aborts an import or export.
1500

1501
    This is a single-node call.
1502

1503
    @type node: string
1504
    @param node: Node name
1505
    @type name: string
1506
    @param name: Import/export name
1507

1508
    """
1509
    return self._SingleNodeCall(node, "impexp_abort", [name])
1510

    
1511
  @_RpcTimeout(_TMO_NORMAL)
1512
  def call_impexp_cleanup(self, node, name):
1513
    """Cleans up after an import or export.
1514

1515
    This is a single-node call.
1516

1517
    @type node: string
1518
    @param node: Node name
1519
    @type name: string
1520
    @param name: Import/export name
1521

1522
    """
1523
    return self._SingleNodeCall(node, "impexp_cleanup", [name])