Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 19ddc57a

History | View | Annotate | Download (44.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48

    
49
# pylint has a bug here, doesn't see this import
50
import ganeti.http.client  # pylint: disable-msg=W0611
51

    
52

    
53
# Timeout for connecting to nodes (seconds)
54
_RPC_CONNECT_TIMEOUT = 5
55

    
56
_RPC_CLIENT_HEADERS = [
57
  "Content-type: %s" % http.HTTP_APP_JSON,
58
  ]
59

    
60
# Various time constants for the timeout table
61
_TMO_URGENT = 60 # one minute
62
_TMO_FAST = 5 * 60 # five minutes
63
_TMO_NORMAL = 15 * 60 # 15 minutes
64
_TMO_SLOW = 3600 # one hour
65
_TMO_4HRS = 4 * 3600
66
_TMO_1DAY = 86400
67

    
68
# Timeout table that will be built later by decorators
69
# Guidelines for choosing timeouts:
70
# - call used during watcher: timeout -> 1min, _TMO_URGENT
71
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
72
# - other calls: 15 min, _TMO_NORMAL
73
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
74

    
75
_TIMEOUTS = {
76
}
77

    
78

    
79
def Init():
80
  """Initializes the module-global HTTP client manager.
81

82
  Must be called before using any RPC function and while exactly one thread is
83
  running.
84

85
  """
86
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
87
  # one thread running. This check is just a safety measure -- it doesn't
88
  # cover all cases.
89
  assert threading.activeCount() == 1, \
90
         "Found more than one active thread when initializing pycURL"
91

    
92
  logging.info("Using PycURL %s", pycurl.version)
93

    
94
  pycurl.global_init(pycurl.GLOBAL_ALL)
95

    
96

    
97
def Shutdown():
98
  """Stops the module-global HTTP client manager.
99

100
  Must be called before quitting the program and while exactly one thread is
101
  running.
102

103
  """
104
  pycurl.global_cleanup()
105

    
106

    
107
def _ConfigRpcCurl(curl):
108
  noded_cert = str(constants.NODED_CERT_FILE)
109

    
110
  curl.setopt(pycurl.FOLLOWLOCATION, False)
111
  curl.setopt(pycurl.CAINFO, noded_cert)
112
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
113
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
114
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
115
  curl.setopt(pycurl.SSLCERT, noded_cert)
116
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
117
  curl.setopt(pycurl.SSLKEY, noded_cert)
118
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
119

    
120

    
121
class _RpcThreadLocal(threading.local):
122
  def GetHttpClientPool(self):
123
    """Returns a per-thread HTTP client pool.
124

125
    @rtype: L{http.client.HttpClientPool}
126

127
    """
128
    try:
129
      pool = self.hcp
130
    except AttributeError:
131
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
132
      self.hcp = pool
133

    
134
    return pool
135

    
136

    
137
_thread_local = _RpcThreadLocal()
138

    
139

    
140
def _RpcTimeout(secs):
141
  """Timeout decorator.
142

143
  When applied to a rpc call_* function, it updates the global timeout
144
  table with the given function/timeout.
145

146
  """
147
  def decorator(f):
148
    name = f.__name__
149
    assert name.startswith("call_")
150
    _TIMEOUTS[name[len("call_"):]] = secs
151
    return f
152
  return decorator
153

    
154

    
155
def RunWithRPC(fn):
156
  """RPC-wrapper decorator.
157

158
  When applied to a function, it runs it with the RPC system
159
  initialized, and it shutsdown the system afterwards. This means the
160
  function must be called without RPC being initialized.
161

162
  """
163
  def wrapper(*args, **kwargs):
164
    Init()
165
    try:
166
      return fn(*args, **kwargs)
167
    finally:
168
      Shutdown()
169
  return wrapper
170

    
171

    
172
class RpcResult(object):
173
  """RPC Result class.
174

175
  This class holds an RPC result. It is needed since in multi-node
176
  calls we can't raise an exception just because one one out of many
177
  failed, and therefore we use this class to encapsulate the result.
178

179
  @ivar data: the data payload, for successful results, or None
180
  @ivar call: the name of the RPC call
181
  @ivar node: the name of the node to which we made the call
182
  @ivar offline: whether the operation failed because the node was
183
      offline, as opposed to actual failure; offline=True will always
184
      imply failed=True, in order to allow simpler checking if
185
      the user doesn't care about the exact failure mode
186
  @ivar fail_msg: the error message if the call failed
187

188
  """
189
  def __init__(self, data=None, failed=False, offline=False,
190
               call=None, node=None):
191
    self.offline = offline
192
    self.call = call
193
    self.node = node
194

    
195
    if offline:
196
      self.fail_msg = "Node is marked offline"
197
      self.data = self.payload = None
198
    elif failed:
199
      self.fail_msg = self._EnsureErr(data)
200
      self.data = self.payload = None
201
    else:
202
      self.data = data
203
      if not isinstance(self.data, (tuple, list)):
204
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
205
                         type(self.data))
206
        self.payload = None
207
      elif len(data) != 2:
208
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
209
                         "expected 2" % len(self.data))
210
        self.payload = None
211
      elif not self.data[0]:
212
        self.fail_msg = self._EnsureErr(self.data[1])
213
        self.payload = None
214
      else:
215
        # finally success
216
        self.fail_msg = None
217
        self.payload = data[1]
218

    
219
    assert hasattr(self, "call")
220
    assert hasattr(self, "data")
221
    assert hasattr(self, "fail_msg")
222
    assert hasattr(self, "node")
223
    assert hasattr(self, "offline")
224
    assert hasattr(self, "payload")
225

    
226
  @staticmethod
227
  def _EnsureErr(val):
228
    """Helper to ensure we return a 'True' value for error."""
229
    if val:
230
      return val
231
    else:
232
      return "No error information"
233

    
234
  def Raise(self, msg, prereq=False, ecode=None):
235
    """If the result has failed, raise an OpExecError.
236

237
    This is used so that LU code doesn't have to check for each
238
    result, but instead can call this function.
239

240
    """
241
    if not self.fail_msg:
242
      return
243

    
244
    if not msg: # one could pass None for default message
245
      msg = ("Call '%s' to node '%s' has failed: %s" %
246
             (self.call, self.node, self.fail_msg))
247
    else:
248
      msg = "%s: %s" % (msg, self.fail_msg)
249
    if prereq:
250
      ec = errors.OpPrereqError
251
    else:
252
      ec = errors.OpExecError
253
    if ecode is not None:
254
      args = (msg, ecode)
255
    else:
256
      args = (msg, )
257
    raise ec(*args) # pylint: disable-msg=W0142
258

    
259

    
260
def _AddressLookup(node_list,
261
                   ssc=ssconf.SimpleStore,
262
                   nslookup_fn=netutils.Hostname.GetIP):
263
  """Return addresses for given node names.
264

265
  @type node_list: list
266
  @param node_list: List of node names
267
  @type ssc: class
268
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
269
  @type nslookup_fn: callable
270
  @param nslookup_fn: function use to do NS lookup
271
  @rtype: list of addresses and/or None's
272
  @returns: List of corresponding addresses, if found
273

274
  """
275
  ss = ssc()
276
  iplist = ss.GetNodePrimaryIPList()
277
  family = ss.GetPrimaryIPFamily()
278
  addresses = []
279
  ipmap = dict(entry.split() for entry in iplist)
280
  for node in node_list:
281
    address = ipmap.get(node)
282
    if address is None:
283
      address = nslookup_fn(node, family=family)
284
    addresses.append(address)
285

    
286
  return addresses
287

    
288

    
289
class Client:
290
  """RPC Client class.
291

292
  This class, given a (remote) method name, a list of parameters and a
293
  list of nodes, will contact (in parallel) all nodes, and return a
294
  dict of results (key: node name, value: result).
295

296
  One current bug is that generic failure is still signaled by
297
  'False' result, which is not good. This overloading of values can
298
  cause bugs.
299

300
  """
301
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
302
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
303
                                    " timeouts table")
304
    self.procedure = procedure
305
    self.body = body
306
    self.port = port
307
    self._request = {}
308
    self._address_lookup_fn = address_lookup_fn
309

    
310
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
311
    """Add a list of nodes to the target nodes.
312

313
    @type node_list: list
314
    @param node_list: the list of node names to connect
315
    @type address_list: list or None
316
    @keyword address_list: either None or a list with node addresses,
317
        which must have the same length as the node list
318
    @type read_timeout: int
319
    @param read_timeout: overwrites default timeout for operation
320

321
    """
322
    if address_list is None:
323
      # Always use IP address instead of node name
324
      address_list = self._address_lookup_fn(node_list)
325

    
326
    assert len(node_list) == len(address_list), \
327
           "Name and address lists must have the same length"
328

    
329
    for node, address in zip(node_list, address_list):
330
      self.ConnectNode(node, address, read_timeout=read_timeout)
331

    
332
  def ConnectNode(self, name, address=None, read_timeout=None):
333
    """Add a node to the target list.
334

335
    @type name: str
336
    @param name: the node name
337
    @type address: str
338
    @param address: the node address, if known
339
    @type read_timeout: int
340
    @param read_timeout: overwrites default timeout for operation
341

342
    """
343
    if address is None:
344
      # Always use IP address instead of node name
345
      address = self._address_lookup_fn([name])[0]
346

    
347
    assert(address is not None)
348

    
349
    if read_timeout is None:
350
      read_timeout = _TIMEOUTS[self.procedure]
351

    
352
    self._request[name] = \
353
      http.client.HttpClientRequest(str(address), self.port,
354
                                    http.HTTP_PUT, str("/%s" % self.procedure),
355
                                    headers=_RPC_CLIENT_HEADERS,
356
                                    post_data=str(self.body),
357
                                    read_timeout=read_timeout)
358

    
359
  def GetResults(self, http_pool=None):
360
    """Call nodes and return results.
361

362
    @rtype: list
363
    @return: List of RPC results
364

365
    """
366
    if not http_pool:
367
      http_pool = _thread_local.GetHttpClientPool()
368

    
369
    http_pool.ProcessRequests(self._request.values())
370

    
371
    results = {}
372

    
373
    for name, req in self._request.iteritems():
374
      if req.success and req.resp_status_code == http.HTTP_OK:
375
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
376
                                  node=name, call=self.procedure)
377
        continue
378

    
379
      # TODO: Better error reporting
380
      if req.error:
381
        msg = req.error
382
      else:
383
        msg = req.resp_body
384

    
385
      logging.error("RPC error in %s from node %s: %s",
386
                    self.procedure, name, msg)
387
      results[name] = RpcResult(data=msg, failed=True, node=name,
388
                                call=self.procedure)
389

    
390
    return results
391

    
392

    
393
def _EncodeImportExportIO(ieio, ieioargs):
394
  """Encodes import/export I/O information.
395

396
  """
397
  if ieio == constants.IEIO_RAW_DISK:
398
    assert len(ieioargs) == 1
399
    return (ieioargs[0].ToDict(), )
400

    
401
  if ieio == constants.IEIO_SCRIPT:
402
    assert len(ieioargs) == 2
403
    return (ieioargs[0].ToDict(), ieioargs[1])
404

    
405
  return ieioargs
406

    
407

    
408
class RpcRunner(object):
409
  """RPC runner class"""
410

    
411
  def __init__(self, cfg):
412
    """Initialized the rpc runner.
413

414
    @type cfg:  C{config.ConfigWriter}
415
    @param cfg: the configuration object that will be used to get data
416
                about the cluster
417

418
    """
419
    self._cfg = cfg
420
    self.port = netutils.GetDaemonPort(constants.NODED)
421

    
422
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
423
    """Convert the given instance to a dict.
424

425
    This is done via the instance's ToDict() method and additionally
426
    we fill the hvparams with the cluster defaults.
427

428
    @type instance: L{objects.Instance}
429
    @param instance: an Instance object
430
    @type hvp: dict or None
431
    @param hvp: a dictionary with overridden hypervisor parameters
432
    @type bep: dict or None
433
    @param bep: a dictionary with overridden backend parameters
434
    @type osp: dict or None
435
    @param osp: a dictionary with overriden os parameters
436
    @rtype: dict
437
    @return: the instance dict, with the hvparams filled with the
438
        cluster defaults
439

440
    """
441
    idict = instance.ToDict()
442
    cluster = self._cfg.GetClusterInfo()
443
    idict["hvparams"] = cluster.FillHV(instance)
444
    if hvp is not None:
445
      idict["hvparams"].update(hvp)
446
    idict["beparams"] = cluster.FillBE(instance)
447
    if bep is not None:
448
      idict["beparams"].update(bep)
449
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
450
    if osp is not None:
451
      idict["osparams"].update(osp)
452
    for nic in idict["nics"]:
453
      nic['nicparams'] = objects.FillDict(
454
        cluster.nicparams[constants.PP_DEFAULT],
455
        nic['nicparams'])
456
    return idict
457

    
458
  def _ConnectList(self, client, node_list, call, read_timeout=None):
459
    """Helper for computing node addresses.
460

461
    @type client: L{ganeti.rpc.Client}
462
    @param client: a C{Client} instance
463
    @type node_list: list
464
    @param node_list: the node list we should connect
465
    @type call: string
466
    @param call: the name of the remote procedure call, for filling in
467
        correctly any eventual offline nodes' results
468
    @type read_timeout: int
469
    @param read_timeout: overwrites the default read timeout for the
470
        given operation
471

472
    """
473
    all_nodes = self._cfg.GetAllNodesInfo()
474
    name_list = []
475
    addr_list = []
476
    skip_dict = {}
477
    for node in node_list:
478
      if node in all_nodes:
479
        if all_nodes[node].offline:
480
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
481
          continue
482
        val = all_nodes[node].primary_ip
483
      else:
484
        val = None
485
      addr_list.append(val)
486
      name_list.append(node)
487
    if name_list:
488
      client.ConnectList(name_list, address_list=addr_list,
489
                         read_timeout=read_timeout)
490
    return skip_dict
491

    
492
  def _ConnectNode(self, client, node, call, read_timeout=None):
493
    """Helper for computing one node's address.
494

495
    @type client: L{ganeti.rpc.Client}
496
    @param client: a C{Client} instance
497
    @type node: str
498
    @param node: the node we should connect
499
    @type call: string
500
    @param call: the name of the remote procedure call, for filling in
501
        correctly any eventual offline nodes' results
502
    @type read_timeout: int
503
    @param read_timeout: overwrites the default read timeout for the
504
        given operation
505

506
    """
507
    node_info = self._cfg.GetNodeInfo(node)
508
    if node_info is not None:
509
      if node_info.offline:
510
        return RpcResult(node=node, offline=True, call=call)
511
      addr = node_info.primary_ip
512
    else:
513
      addr = None
514
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
515

    
516
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
517
    """Helper for making a multi-node call
518

519
    """
520
    body = serializer.DumpJson(args, indent=False)
521
    c = Client(procedure, body, self.port)
522
    skip_dict = self._ConnectList(c, node_list, procedure,
523
                                  read_timeout=read_timeout)
524
    skip_dict.update(c.GetResults())
525
    return skip_dict
526

    
527
  @classmethod
528
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
529
                           address_list=None, read_timeout=None):
530
    """Helper for making a multi-node static call
531

532
    """
533
    body = serializer.DumpJson(args, indent=False)
534
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
535
    c.ConnectList(node_list, address_list=address_list,
536
                  read_timeout=read_timeout)
537
    return c.GetResults()
538

    
539
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
540
    """Helper for making a single-node call
541

542
    """
543
    body = serializer.DumpJson(args, indent=False)
544
    c = Client(procedure, body, self.port)
545
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
546
    if result is None:
547
      # we did connect, node is not offline
548
      result = c.GetResults()[node]
549
    return result
550

    
551
  @classmethod
552
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
553
    """Helper for making a single-node static call
554

555
    """
556
    body = serializer.DumpJson(args, indent=False)
557
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
558
    c.ConnectNode(node, read_timeout=read_timeout)
559
    return c.GetResults()[node]
560

    
561
  @staticmethod
562
  def _Compress(data):
563
    """Compresses a string for transport over RPC.
564

565
    Small amounts of data are not compressed.
566

567
    @type data: str
568
    @param data: Data
569
    @rtype: tuple
570
    @return: Encoded data to send
571

572
    """
573
    # Small amounts of data are not compressed
574
    if len(data) < 512:
575
      return (constants.RPC_ENCODING_NONE, data)
576

    
577
    # Compress with zlib and encode in base64
578
    return (constants.RPC_ENCODING_ZLIB_BASE64,
579
            base64.b64encode(zlib.compress(data, 3)))
580

    
581
  #
582
  # Begin RPC calls
583
  #
584

    
585
  @_RpcTimeout(_TMO_URGENT)
586
  def call_lv_list(self, node_list, vg_name):
587
    """Gets the logical volumes present in a given volume group.
588

589
    This is a multi-node call.
590

591
    """
592
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
593

    
594
  @_RpcTimeout(_TMO_URGENT)
595
  def call_vg_list(self, node_list):
596
    """Gets the volume group list.
597

598
    This is a multi-node call.
599

600
    """
601
    return self._MultiNodeCall(node_list, "vg_list", [])
602

    
603
  @_RpcTimeout(_TMO_NORMAL)
604
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
605
    """Get list of storage units.
606

607
    This is a multi-node call.
608

609
    """
610
    return self._MultiNodeCall(node_list, "storage_list",
611
                               [su_name, su_args, name, fields])
612

    
613
  @_RpcTimeout(_TMO_NORMAL)
614
  def call_storage_modify(self, node, su_name, su_args, name, changes):
615
    """Modify a storage unit.
616

617
    This is a single-node call.
618

619
    """
620
    return self._SingleNodeCall(node, "storage_modify",
621
                                [su_name, su_args, name, changes])
622

    
623
  @_RpcTimeout(_TMO_NORMAL)
624
  def call_storage_execute(self, node, su_name, su_args, name, op):
625
    """Executes an operation on a storage unit.
626

627
    This is a single-node call.
628

629
    """
630
    return self._SingleNodeCall(node, "storage_execute",
631
                                [su_name, su_args, name, op])
632

    
633
  @_RpcTimeout(_TMO_URGENT)
634
  def call_bridges_exist(self, node, bridges_list):
635
    """Checks if a node has all the bridges given.
636

637
    This method checks if all bridges given in the bridges_list are
638
    present on the remote node, so that an instance that uses interfaces
639
    on those bridges can be started.
640

641
    This is a single-node call.
642

643
    """
644
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
645

    
646
  @_RpcTimeout(_TMO_NORMAL)
647
  def call_instance_start(self, node, instance, hvp, bep):
648
    """Starts an instance.
649

650
    This is a single-node call.
651

652
    """
653
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
654
    return self._SingleNodeCall(node, "instance_start", [idict])
655

    
656
  @_RpcTimeout(_TMO_NORMAL)
657
  def call_instance_shutdown(self, node, instance, timeout):
658
    """Stops an instance.
659

660
    This is a single-node call.
661

662
    """
663
    return self._SingleNodeCall(node, "instance_shutdown",
664
                                [self._InstDict(instance), timeout])
665

    
666
  @_RpcTimeout(_TMO_NORMAL)
667
  def call_migration_info(self, node, instance):
668
    """Gather the information necessary to prepare an instance migration.
669

670
    This is a single-node call.
671

672
    @type node: string
673
    @param node: the node on which the instance is currently running
674
    @type instance: C{objects.Instance}
675
    @param instance: the instance definition
676

677
    """
678
    return self._SingleNodeCall(node, "migration_info",
679
                                [self._InstDict(instance)])
680

    
681
  @_RpcTimeout(_TMO_NORMAL)
682
  def call_accept_instance(self, node, instance, info, target):
683
    """Prepare a node to accept an instance.
684

685
    This is a single-node call.
686

687
    @type node: string
688
    @param node: the target node for the migration
689
    @type instance: C{objects.Instance}
690
    @param instance: the instance definition
691
    @type info: opaque/hypervisor specific (string/data)
692
    @param info: result for the call_migration_info call
693
    @type target: string
694
    @param target: target hostname (usually ip address) (on the node itself)
695

696
    """
697
    return self._SingleNodeCall(node, "accept_instance",
698
                                [self._InstDict(instance), info, target])
699

    
700
  @_RpcTimeout(_TMO_NORMAL)
701
  def call_finalize_migration(self, node, instance, info, success):
702
    """Finalize any target-node migration specific operation.
703

704
    This is called both in case of a successful migration and in case of error
705
    (in which case it should abort the migration).
706

707
    This is a single-node call.
708

709
    @type node: string
710
    @param node: the target node for the migration
711
    @type instance: C{objects.Instance}
712
    @param instance: the instance definition
713
    @type info: opaque/hypervisor specific (string/data)
714
    @param info: result for the call_migration_info call
715
    @type success: boolean
716
    @param success: whether the migration was a success or a failure
717

718
    """
719
    return self._SingleNodeCall(node, "finalize_migration",
720
                                [self._InstDict(instance), info, success])
721

    
722
  @_RpcTimeout(_TMO_SLOW)
723
  def call_instance_migrate(self, node, instance, target, live):
724
    """Migrate an instance.
725

726
    This is a single-node call.
727

728
    @type node: string
729
    @param node: the node on which the instance is currently running
730
    @type instance: C{objects.Instance}
731
    @param instance: the instance definition
732
    @type target: string
733
    @param target: the target node name
734
    @type live: boolean
735
    @param live: whether the migration should be done live or not (the
736
        interpretation of this parameter is left to the hypervisor)
737

738
    """
739
    return self._SingleNodeCall(node, "instance_migrate",
740
                                [self._InstDict(instance), target, live])
741

    
742
  @_RpcTimeout(_TMO_NORMAL)
743
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
744
    """Reboots an instance.
745

746
    This is a single-node call.
747

748
    """
749
    return self._SingleNodeCall(node, "instance_reboot",
750
                                [self._InstDict(inst), reboot_type,
751
                                 shutdown_timeout])
752

    
753
  @_RpcTimeout(_TMO_1DAY)
754
  def call_instance_os_add(self, node, inst, reinstall, debug):
755
    """Installs an OS on the given instance.
756

757
    This is a single-node call.
758

759
    """
760
    return self._SingleNodeCall(node, "instance_os_add",
761
                                [self._InstDict(inst), reinstall, debug])
762

    
763
  @_RpcTimeout(_TMO_SLOW)
764
  def call_instance_run_rename(self, node, inst, old_name, debug):
765
    """Run the OS rename script for an instance.
766

767
    This is a single-node call.
768

769
    """
770
    return self._SingleNodeCall(node, "instance_run_rename",
771
                                [self._InstDict(inst), old_name, debug])
772

    
773
  @_RpcTimeout(_TMO_URGENT)
774
  def call_instance_info(self, node, instance, hname):
775
    """Returns information about a single instance.
776

777
    This is a single-node call.
778

779
    @type node: list
780
    @param node: the list of nodes to query
781
    @type instance: string
782
    @param instance: the instance name
783
    @type hname: string
784
    @param hname: the hypervisor type of the instance
785

786
    """
787
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
788

    
789
  @_RpcTimeout(_TMO_NORMAL)
790
  def call_instance_migratable(self, node, instance):
791
    """Checks whether the given instance can be migrated.
792

793
    This is a single-node call.
794

795
    @param node: the node to query
796
    @type instance: L{objects.Instance}
797
    @param instance: the instance to check
798

799

800
    """
801
    return self._SingleNodeCall(node, "instance_migratable",
802
                                [self._InstDict(instance)])
803

    
804
  @_RpcTimeout(_TMO_URGENT)
805
  def call_all_instances_info(self, node_list, hypervisor_list):
806
    """Returns information about all instances on the given nodes.
807

808
    This is a multi-node call.
809

810
    @type node_list: list
811
    @param node_list: the list of nodes to query
812
    @type hypervisor_list: list
813
    @param hypervisor_list: the hypervisors to query for instances
814

815
    """
816
    return self._MultiNodeCall(node_list, "all_instances_info",
817
                               [hypervisor_list])
818

    
819
  @_RpcTimeout(_TMO_URGENT)
820
  def call_instance_list(self, node_list, hypervisor_list):
821
    """Returns the list of running instances on a given node.
822

823
    This is a multi-node call.
824

825
    @type node_list: list
826
    @param node_list: the list of nodes to query
827
    @type hypervisor_list: list
828
    @param hypervisor_list: the hypervisors to query for instances
829

830
    """
831
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
832

    
833
  @_RpcTimeout(_TMO_FAST)
834
  def call_node_tcp_ping(self, node, source, target, port, timeout,
835
                         live_port_needed):
836
    """Do a TcpPing on the remote node
837

838
    This is a single-node call.
839

840
    """
841
    return self._SingleNodeCall(node, "node_tcp_ping",
842
                                [source, target, port, timeout,
843
                                 live_port_needed])
844

    
845
  @_RpcTimeout(_TMO_FAST)
846
  def call_node_has_ip_address(self, node, address):
847
    """Checks if a node has the given IP address.
848

849
    This is a single-node call.
850

851
    """
852
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
853

    
854
  @_RpcTimeout(_TMO_URGENT)
855
  def call_node_info(self, node_list, vg_name, hypervisor_type):
856
    """Return node information.
857

858
    This will return memory information and volume group size and free
859
    space.
860

861
    This is a multi-node call.
862

863
    @type node_list: list
864
    @param node_list: the list of nodes to query
865
    @type vg_name: C{string}
866
    @param vg_name: the name of the volume group to ask for disk space
867
        information
868
    @type hypervisor_type: C{str}
869
    @param hypervisor_type: the name of the hypervisor to ask for
870
        memory information
871

872
    """
873
    return self._MultiNodeCall(node_list, "node_info",
874
                               [vg_name, hypervisor_type])
875

    
876
  @_RpcTimeout(_TMO_NORMAL)
877
  def call_etc_hosts_modify(self, node, mode, name, ip):
878
    """Modify hosts file with name
879

880
    @type node: string
881
    @param node: The node to call
882
    @type mode: string
883
    @param mode: The mode to operate. Currently "add" or "remove"
884
    @type name: string
885
    @param name: The host name to be modified
886
    @type ip: string
887
    @param ip: The ip of the entry (just valid if mode is "add")
888

889
    """
890
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
891

    
892
  @_RpcTimeout(_TMO_NORMAL)
893
  def call_node_verify(self, node_list, checkdict, cluster_name):
894
    """Request verification of given parameters.
895

896
    This is a multi-node call.
897

898
    """
899
    return self._MultiNodeCall(node_list, "node_verify",
900
                               [checkdict, cluster_name])
901

    
902
  @classmethod
903
  @_RpcTimeout(_TMO_FAST)
904
  def call_node_start_master(cls, node, start_daemons, no_voting):
905
    """Tells a node to activate itself as a master.
906

907
    This is a single-node call.
908

909
    """
910
    return cls._StaticSingleNodeCall(node, "node_start_master",
911
                                     [start_daemons, no_voting])
912

    
913
  @classmethod
914
  @_RpcTimeout(_TMO_FAST)
915
  def call_node_stop_master(cls, node, stop_daemons):
916
    """Tells a node to demote itself from master status.
917

918
    This is a single-node call.
919

920
    """
921
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
922

    
923
  @classmethod
924
  @_RpcTimeout(_TMO_URGENT)
925
  def call_master_info(cls, node_list):
926
    """Query master info.
927

928
    This is a multi-node call.
929

930
    """
931
    # TODO: should this method query down nodes?
932
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
933

    
934
  @classmethod
935
  @_RpcTimeout(_TMO_URGENT)
936
  def call_version(cls, node_list):
937
    """Query node version.
938

939
    This is a multi-node call.
940

941
    """
942
    return cls._StaticMultiNodeCall(node_list, "version", [])
943

    
944
  @_RpcTimeout(_TMO_NORMAL)
945
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
946
    """Request creation of a given block device.
947

948
    This is a single-node call.
949

950
    """
951
    return self._SingleNodeCall(node, "blockdev_create",
952
                                [bdev.ToDict(), size, owner, on_primary, info])
953

    
954
  @_RpcTimeout(_TMO_NORMAL)
955
  def call_blockdev_remove(self, node, bdev):
956
    """Request removal of a given block device.
957

958
    This is a single-node call.
959

960
    """
961
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
962

    
963
  @_RpcTimeout(_TMO_NORMAL)
964
  def call_blockdev_rename(self, node, devlist):
965
    """Request rename of the given block devices.
966

967
    This is a single-node call.
968

969
    """
970
    return self._SingleNodeCall(node, "blockdev_rename",
971
                                [(d.ToDict(), uid) for d, uid in devlist])
972

    
973
  @_RpcTimeout(_TMO_NORMAL)
974
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
975
    """Request assembling of a given block device.
976

977
    This is a single-node call.
978

979
    """
980
    return self._SingleNodeCall(node, "blockdev_assemble",
981
                                [disk.ToDict(), owner, on_primary])
982

    
983
  @_RpcTimeout(_TMO_NORMAL)
984
  def call_blockdev_shutdown(self, node, disk):
985
    """Request shutdown of a given block device.
986

987
    This is a single-node call.
988

989
    """
990
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
991

    
992
  @_RpcTimeout(_TMO_NORMAL)
993
  def call_blockdev_addchildren(self, node, bdev, ndevs):
994
    """Request adding a list of children to a (mirroring) device.
995

996
    This is a single-node call.
997

998
    """
999
    return self._SingleNodeCall(node, "blockdev_addchildren",
1000
                                [bdev.ToDict(),
1001
                                 [disk.ToDict() for disk in ndevs]])
1002

    
1003
  @_RpcTimeout(_TMO_NORMAL)
1004
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1005
    """Request removing a list of children from a (mirroring) device.
1006

1007
    This is a single-node call.
1008

1009
    """
1010
    return self._SingleNodeCall(node, "blockdev_removechildren",
1011
                                [bdev.ToDict(),
1012
                                 [disk.ToDict() for disk in ndevs]])
1013

    
1014
  @_RpcTimeout(_TMO_NORMAL)
1015
  def call_blockdev_getmirrorstatus(self, node, disks):
1016
    """Request status of a (mirroring) device.
1017

1018
    This is a single-node call.
1019

1020
    """
1021
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1022
                                  [dsk.ToDict() for dsk in disks])
1023
    if not result.fail_msg:
1024
      result.payload = [objects.BlockDevStatus.FromDict(i)
1025
                        for i in result.payload]
1026
    return result
1027

    
1028
  @_RpcTimeout(_TMO_NORMAL)
1029
  def call_blockdev_find(self, node, disk):
1030
    """Request identification of a given block device.
1031

1032
    This is a single-node call.
1033

1034
    """
1035
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1036
    if not result.fail_msg and result.payload is not None:
1037
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1038
    return result
1039

    
1040
  @_RpcTimeout(_TMO_NORMAL)
1041
  def call_blockdev_close(self, node, instance_name, disks):
1042
    """Closes the given block devices.
1043

1044
    This is a single-node call.
1045

1046
    """
1047
    params = [instance_name, [cf.ToDict() for cf in disks]]
1048
    return self._SingleNodeCall(node, "blockdev_close", params)
1049

    
1050
  @_RpcTimeout(_TMO_NORMAL)
1051
  def call_blockdev_getsizes(self, node, disks):
1052
    """Returns the size of the given disks.
1053

1054
    This is a single-node call.
1055

1056
    """
1057
    params = [[cf.ToDict() for cf in disks]]
1058
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1059

    
1060
  @_RpcTimeout(_TMO_NORMAL)
1061
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1062
    """Disconnects the network of the given drbd devices.
1063

1064
    This is a multi-node call.
1065

1066
    """
1067
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1068
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1069

    
1070
  @_RpcTimeout(_TMO_NORMAL)
1071
  def call_drbd_attach_net(self, node_list, nodes_ip,
1072
                           disks, instance_name, multimaster):
1073
    """Disconnects the given drbd devices.
1074

1075
    This is a multi-node call.
1076

1077
    """
1078
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1079
                               [nodes_ip, [cf.ToDict() for cf in disks],
1080
                                instance_name, multimaster])
1081

    
1082
  @_RpcTimeout(_TMO_SLOW)
1083
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1084
    """Waits for the synchronization of drbd devices is complete.
1085

1086
    This is a multi-node call.
1087

1088
    """
1089
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1090
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1091

    
1092
  @_RpcTimeout(_TMO_URGENT)
1093
  def call_drbd_helper(self, node_list):
1094
    """Gets drbd helper.
1095

1096
    This is a multi-node call.
1097

1098
    """
1099
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1100

    
1101
  @classmethod
1102
  @_RpcTimeout(_TMO_NORMAL)
1103
  def call_upload_file(cls, node_list, file_name, address_list=None):
1104
    """Upload a file.
1105

1106
    The node will refuse the operation in case the file is not on the
1107
    approved file list.
1108

1109
    This is a multi-node call.
1110

1111
    @type node_list: list
1112
    @param node_list: the list of node names to upload to
1113
    @type file_name: str
1114
    @param file_name: the filename to upload
1115
    @type address_list: list or None
1116
    @keyword address_list: an optional list of node addresses, in order
1117
        to optimize the RPC speed
1118

1119
    """
1120
    file_contents = utils.ReadFile(file_name)
1121
    data = cls._Compress(file_contents)
1122
    st = os.stat(file_name)
1123
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1124
              st.st_atime, st.st_mtime]
1125
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1126
                                    address_list=address_list)
1127

    
1128
  @classmethod
1129
  @_RpcTimeout(_TMO_NORMAL)
1130
  def call_write_ssconf_files(cls, node_list, values):
1131
    """Write ssconf files.
1132

1133
    This is a multi-node call.
1134

1135
    """
1136
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1137

    
1138
  @_RpcTimeout(_TMO_FAST)
1139
  def call_os_diagnose(self, node_list):
1140
    """Request a diagnose of OS definitions.
1141

1142
    This is a multi-node call.
1143

1144
    """
1145
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1146

    
1147
  @_RpcTimeout(_TMO_FAST)
1148
  def call_os_get(self, node, name):
1149
    """Returns an OS definition.
1150

1151
    This is a single-node call.
1152

1153
    """
1154
    result = self._SingleNodeCall(node, "os_get", [name])
1155
    if not result.fail_msg and isinstance(result.payload, dict):
1156
      result.payload = objects.OS.FromDict(result.payload)
1157
    return result
1158

    
1159
  @_RpcTimeout(_TMO_FAST)
1160
  def call_os_validate(self, required, nodes, name, checks, params):
1161
    """Run a validation routine for a given OS.
1162

1163
    This is a multi-node call.
1164

1165
    """
1166
    return self._MultiNodeCall(nodes, "os_validate",
1167
                               [required, name, checks, params])
1168

    
1169
  @_RpcTimeout(_TMO_NORMAL)
1170
  def call_hooks_runner(self, node_list, hpath, phase, env):
1171
    """Call the hooks runner.
1172

1173
    Args:
1174
      - op: the OpCode instance
1175
      - env: a dictionary with the environment
1176

1177
    This is a multi-node call.
1178

1179
    """
1180
    params = [hpath, phase, env]
1181
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1182

    
1183
  @_RpcTimeout(_TMO_NORMAL)
1184
  def call_iallocator_runner(self, node, name, idata):
1185
    """Call an iallocator on a remote node
1186

1187
    Args:
1188
      - name: the iallocator name
1189
      - input: the json-encoded input string
1190

1191
    This is a single-node call.
1192

1193
    """
1194
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1195

    
1196
  @_RpcTimeout(_TMO_NORMAL)
1197
  def call_blockdev_grow(self, node, cf_bdev, amount):
1198
    """Request a snapshot of the given block device.
1199

1200
    This is a single-node call.
1201

1202
    """
1203
    return self._SingleNodeCall(node, "blockdev_grow",
1204
                                [cf_bdev.ToDict(), amount])
1205

    
1206
  @_RpcTimeout(_TMO_1DAY)
1207
  def call_blockdev_export(self, node, cf_bdev,
1208
                           dest_node, dest_path, cluster_name):
1209
    """Export a given disk to another node.
1210

1211
    This is a single-node call.
1212

1213
    """
1214
    return self._SingleNodeCall(node, "blockdev_export",
1215
                                [cf_bdev.ToDict(), dest_node, dest_path,
1216
                                 cluster_name])
1217

    
1218
  @_RpcTimeout(_TMO_NORMAL)
1219
  def call_blockdev_snapshot(self, node, cf_bdev):
1220
    """Request a snapshot of the given block device.
1221

1222
    This is a single-node call.
1223

1224
    """
1225
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1226

    
1227
  @_RpcTimeout(_TMO_NORMAL)
1228
  def call_finalize_export(self, node, instance, snap_disks):
1229
    """Request the completion of an export operation.
1230

1231
    This writes the export config file, etc.
1232

1233
    This is a single-node call.
1234

1235
    """
1236
    flat_disks = []
1237
    for disk in snap_disks:
1238
      if isinstance(disk, bool):
1239
        flat_disks.append(disk)
1240
      else:
1241
        flat_disks.append(disk.ToDict())
1242

    
1243
    return self._SingleNodeCall(node, "finalize_export",
1244
                                [self._InstDict(instance), flat_disks])
1245

    
1246
  @_RpcTimeout(_TMO_FAST)
1247
  def call_export_info(self, node, path):
1248
    """Queries the export information in a given path.
1249

1250
    This is a single-node call.
1251

1252
    """
1253
    return self._SingleNodeCall(node, "export_info", [path])
1254

    
1255
  @_RpcTimeout(_TMO_FAST)
1256
  def call_export_list(self, node_list):
1257
    """Gets the stored exports list.
1258

1259
    This is a multi-node call.
1260

1261
    """
1262
    return self._MultiNodeCall(node_list, "export_list", [])
1263

    
1264
  @_RpcTimeout(_TMO_FAST)
1265
  def call_export_remove(self, node, export):
1266
    """Requests removal of a given export.
1267

1268
    This is a single-node call.
1269

1270
    """
1271
    return self._SingleNodeCall(node, "export_remove", [export])
1272

    
1273
  @classmethod
1274
  @_RpcTimeout(_TMO_NORMAL)
1275
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1276
    """Requests a node to clean the cluster information it has.
1277

1278
    This will remove the configuration information from the ganeti data
1279
    dir.
1280

1281
    This is a single-node call.
1282

1283
    """
1284
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1285
                                     [modify_ssh_setup])
1286

    
1287
  @_RpcTimeout(_TMO_FAST)
1288
  def call_node_volumes(self, node_list):
1289
    """Gets all volumes on node(s).
1290

1291
    This is a multi-node call.
1292

1293
    """
1294
    return self._MultiNodeCall(node_list, "node_volumes", [])
1295

    
1296
  @_RpcTimeout(_TMO_FAST)
1297
  def call_node_demote_from_mc(self, node):
1298
    """Demote a node from the master candidate role.
1299

1300
    This is a single-node call.
1301

1302
    """
1303
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1304

    
1305
  @_RpcTimeout(_TMO_NORMAL)
1306
  def call_node_powercycle(self, node, hypervisor):
1307
    """Tries to powercycle a node.
1308

1309
    This is a single-node call.
1310

1311
    """
1312
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1313

    
1314
  @_RpcTimeout(None)
1315
  def call_test_delay(self, node_list, duration):
1316
    """Sleep for a fixed time on given node(s).
1317

1318
    This is a multi-node call.
1319

1320
    """
1321
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1322
                               read_timeout=int(duration + 5))
1323

    
1324
  @_RpcTimeout(_TMO_FAST)
1325
  def call_file_storage_dir_create(self, node, file_storage_dir):
1326
    """Create the given file storage directory.
1327

1328
    This is a single-node call.
1329

1330
    """
1331
    return self._SingleNodeCall(node, "file_storage_dir_create",
1332
                                [file_storage_dir])
1333

    
1334
  @_RpcTimeout(_TMO_FAST)
1335
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1336
    """Remove the given file storage directory.
1337

1338
    This is a single-node call.
1339

1340
    """
1341
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1342
                                [file_storage_dir])
1343

    
1344
  @_RpcTimeout(_TMO_FAST)
1345
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1346
                                   new_file_storage_dir):
1347
    """Rename file storage directory.
1348

1349
    This is a single-node call.
1350

1351
    """
1352
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1353
                                [old_file_storage_dir, new_file_storage_dir])
1354

    
1355
  @classmethod
1356
  @_RpcTimeout(_TMO_FAST)
1357
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1358
    """Update job queue.
1359

1360
    This is a multi-node call.
1361

1362
    """
1363
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1364
                                    [file_name, cls._Compress(content)],
1365
                                    address_list=address_list)
1366

    
1367
  @classmethod
1368
  @_RpcTimeout(_TMO_NORMAL)
1369
  def call_jobqueue_purge(cls, node):
1370
    """Purge job queue.
1371

1372
    This is a single-node call.
1373

1374
    """
1375
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1376

    
1377
  @classmethod
1378
  @_RpcTimeout(_TMO_FAST)
1379
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1380
    """Rename a job queue file.
1381

1382
    This is a multi-node call.
1383

1384
    """
1385
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1386
                                    address_list=address_list)
1387

    
1388
  @_RpcTimeout(_TMO_NORMAL)
1389
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1390
    """Validate the hypervisor params.
1391

1392
    This is a multi-node call.
1393

1394
    @type node_list: list
1395
    @param node_list: the list of nodes to query
1396
    @type hvname: string
1397
    @param hvname: the hypervisor name
1398
    @type hvparams: dict
1399
    @param hvparams: the hypervisor parameters to be validated
1400

1401
    """
1402
    cluster = self._cfg.GetClusterInfo()
1403
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1404
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1405
                               [hvname, hv_full])
1406

    
1407
  @_RpcTimeout(_TMO_NORMAL)
1408
  def call_x509_cert_create(self, node, validity):
1409
    """Creates a new X509 certificate for SSL/TLS.
1410

1411
    This is a single-node call.
1412

1413
    @type validity: int
1414
    @param validity: Validity in seconds
1415

1416
    """
1417
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1418

    
1419
  @_RpcTimeout(_TMO_NORMAL)
1420
  def call_x509_cert_remove(self, node, name):
1421
    """Removes a X509 certificate.
1422

1423
    This is a single-node call.
1424

1425
    @type name: string
1426
    @param name: Certificate name
1427

1428
    """
1429
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1430

    
1431
  @_RpcTimeout(_TMO_NORMAL)
1432
  def call_import_start(self, node, opts, instance, dest, dest_args):
1433
    """Starts a listener for an import.
1434

1435
    This is a single-node call.
1436

1437
    @type node: string
1438
    @param node: Node name
1439
    @type instance: C{objects.Instance}
1440
    @param instance: Instance object
1441

1442
    """
1443
    return self._SingleNodeCall(node, "import_start",
1444
                                [opts.ToDict(),
1445
                                 self._InstDict(instance), dest,
1446
                                 _EncodeImportExportIO(dest, dest_args)])
1447

    
1448
  @_RpcTimeout(_TMO_NORMAL)
1449
  def call_export_start(self, node, opts, host, port,
1450
                        instance, source, source_args):
1451
    """Starts an export daemon.
1452

1453
    This is a single-node call.
1454

1455
    @type node: string
1456
    @param node: Node name
1457
    @type instance: C{objects.Instance}
1458
    @param instance: Instance object
1459

1460
    """
1461
    return self._SingleNodeCall(node, "export_start",
1462
                                [opts.ToDict(), host, port,
1463
                                 self._InstDict(instance), source,
1464
                                 _EncodeImportExportIO(source, source_args)])
1465

    
1466
  @_RpcTimeout(_TMO_FAST)
1467
  def call_impexp_status(self, node, names):
1468
    """Gets the status of an import or export.
1469

1470
    This is a single-node call.
1471

1472
    @type node: string
1473
    @param node: Node name
1474
    @type names: List of strings
1475
    @param names: Import/export names
1476
    @rtype: List of L{objects.ImportExportStatus} instances
1477
    @return: Returns a list of the state of each named import/export or None if
1478
             a status couldn't be retrieved
1479

1480
    """
1481
    result = self._SingleNodeCall(node, "impexp_status", [names])
1482

    
1483
    if not result.fail_msg:
1484
      decoded = []
1485

    
1486
      for i in result.payload:
1487
        if i is None:
1488
          decoded.append(None)
1489
          continue
1490
        decoded.append(objects.ImportExportStatus.FromDict(i))
1491

    
1492
      result.payload = decoded
1493

    
1494
    return result
1495

    
1496
  @_RpcTimeout(_TMO_NORMAL)
1497
  def call_impexp_abort(self, node, name):
1498
    """Aborts an import or export.
1499

1500
    This is a single-node call.
1501

1502
    @type node: string
1503
    @param node: Node name
1504
    @type name: string
1505
    @param name: Import/export name
1506

1507
    """
1508
    return self._SingleNodeCall(node, "impexp_abort", [name])
1509

    
1510
  @_RpcTimeout(_TMO_NORMAL)
1511
  def call_impexp_cleanup(self, node, name):
1512
    """Cleans up after an import or export.
1513

1514
    This is a single-node call.
1515

1516
    @type node: string
1517
    @param node: Node name
1518
    @type name: string
1519
    @param name: Import/export name
1520

1521
    """
1522
    return self._SingleNodeCall(node, "impexp_cleanup", [name])