Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 83816869

History | View | Annotate | Download (42.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# Special module generated at build time
52
from ganeti import _generated_rpc
53

    
54
# pylint has a bug here, doesn't see this import
55
import ganeti.http.client  # pylint: disable=W0611
56

    
57

    
58
# Timeout for connecting to nodes (seconds)
59
_RPC_CONNECT_TIMEOUT = 5
60

    
61
_RPC_CLIENT_HEADERS = [
62
  "Content-type: %s" % http.HTTP_APP_JSON,
63
  "Expect:",
64
  ]
65

    
66
# Various time constants for the timeout table
67
_TMO_URGENT = 60 # one minute
68
_TMO_FAST = 5 * 60 # five minutes
69
_TMO_NORMAL = 15 * 60 # 15 minutes
70
_TMO_SLOW = 3600 # one hour
71
_TMO_4HRS = 4 * 3600
72
_TMO_1DAY = 86400
73

    
74
# Timeout table that will be built later by decorators
75
# Guidelines for choosing timeouts:
76
# - call used during watcher: timeout -> 1min, _TMO_URGENT
77
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
78
# - other calls: 15 min, _TMO_NORMAL
79
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
80

    
81
_TIMEOUTS = {
82
}
83

    
84
#: Special value to describe an offline host
85
_OFFLINE = object()
86

    
87

    
88
def Init():
89
  """Initializes the module-global HTTP client manager.
90

91
  Must be called before using any RPC function and while exactly one thread is
92
  running.
93

94
  """
95
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
96
  # one thread running. This check is just a safety measure -- it doesn't
97
  # cover all cases.
98
  assert threading.activeCount() == 1, \
99
         "Found more than one active thread when initializing pycURL"
100

    
101
  logging.info("Using PycURL %s", pycurl.version)
102

    
103
  pycurl.global_init(pycurl.GLOBAL_ALL)
104

    
105

    
106
def Shutdown():
107
  """Stops the module-global HTTP client manager.
108

109
  Must be called before quitting the program and while exactly one thread is
110
  running.
111

112
  """
113
  pycurl.global_cleanup()
114

    
115

    
116
def _ConfigRpcCurl(curl):
117
  noded_cert = str(constants.NODED_CERT_FILE)
118

    
119
  curl.setopt(pycurl.FOLLOWLOCATION, False)
120
  curl.setopt(pycurl.CAINFO, noded_cert)
121
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
122
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
123
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
124
  curl.setopt(pycurl.SSLCERT, noded_cert)
125
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
126
  curl.setopt(pycurl.SSLKEY, noded_cert)
127
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
128

    
129

    
130
def _RpcTimeout(secs):
131
  """Timeout decorator.
132

133
  When applied to a rpc call_* function, it updates the global timeout
134
  table with the given function/timeout.
135

136
  """
137
  def decorator(f):
138
    name = f.__name__
139
    assert name.startswith("call_")
140
    _TIMEOUTS[name[len("call_"):]] = secs
141
    return f
142
  return decorator
143

    
144

    
145
def RunWithRPC(fn):
146
  """RPC-wrapper decorator.
147

148
  When applied to a function, it runs it with the RPC system
149
  initialized, and it shutsdown the system afterwards. This means the
150
  function must be called without RPC being initialized.
151

152
  """
153
  def wrapper(*args, **kwargs):
154
    Init()
155
    try:
156
      return fn(*args, **kwargs)
157
    finally:
158
      Shutdown()
159
  return wrapper
160

    
161

    
162
def _Compress(data):
163
  """Compresses a string for transport over RPC.
164

165
  Small amounts of data are not compressed.
166

167
  @type data: str
168
  @param data: Data
169
  @rtype: tuple
170
  @return: Encoded data to send
171

172
  """
173
  # Small amounts of data are not compressed
174
  if len(data) < 512:
175
    return (constants.RPC_ENCODING_NONE, data)
176

    
177
  # Compress with zlib and encode in base64
178
  return (constants.RPC_ENCODING_ZLIB_BASE64,
179
          base64.b64encode(zlib.compress(data, 3)))
180

    
181

    
182
class RpcResult(object):
183
  """RPC Result class.
184

185
  This class holds an RPC result. It is needed since in multi-node
186
  calls we can't raise an exception just because one one out of many
187
  failed, and therefore we use this class to encapsulate the result.
188

189
  @ivar data: the data payload, for successful results, or None
190
  @ivar call: the name of the RPC call
191
  @ivar node: the name of the node to which we made the call
192
  @ivar offline: whether the operation failed because the node was
193
      offline, as opposed to actual failure; offline=True will always
194
      imply failed=True, in order to allow simpler checking if
195
      the user doesn't care about the exact failure mode
196
  @ivar fail_msg: the error message if the call failed
197

198
  """
199
  def __init__(self, data=None, failed=False, offline=False,
200
               call=None, node=None):
201
    self.offline = offline
202
    self.call = call
203
    self.node = node
204

    
205
    if offline:
206
      self.fail_msg = "Node is marked offline"
207
      self.data = self.payload = None
208
    elif failed:
209
      self.fail_msg = self._EnsureErr(data)
210
      self.data = self.payload = None
211
    else:
212
      self.data = data
213
      if not isinstance(self.data, (tuple, list)):
214
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215
                         type(self.data))
216
        self.payload = None
217
      elif len(data) != 2:
218
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
219
                         "expected 2" % len(self.data))
220
        self.payload = None
221
      elif not self.data[0]:
222
        self.fail_msg = self._EnsureErr(self.data[1])
223
        self.payload = None
224
      else:
225
        # finally success
226
        self.fail_msg = None
227
        self.payload = data[1]
228

    
229
    for attr_name in ["call", "data", "fail_msg",
230
                      "node", "offline", "payload"]:
231
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
232

    
233
  @staticmethod
234
  def _EnsureErr(val):
235
    """Helper to ensure we return a 'True' value for error."""
236
    if val:
237
      return val
238
    else:
239
      return "No error information"
240

    
241
  def Raise(self, msg, prereq=False, ecode=None):
242
    """If the result has failed, raise an OpExecError.
243

244
    This is used so that LU code doesn't have to check for each
245
    result, but instead can call this function.
246

247
    """
248
    if not self.fail_msg:
249
      return
250

    
251
    if not msg: # one could pass None for default message
252
      msg = ("Call '%s' to node '%s' has failed: %s" %
253
             (self.call, self.node, self.fail_msg))
254
    else:
255
      msg = "%s: %s" % (msg, self.fail_msg)
256
    if prereq:
257
      ec = errors.OpPrereqError
258
    else:
259
      ec = errors.OpExecError
260
    if ecode is not None:
261
      args = (msg, ecode)
262
    else:
263
      args = (msg, )
264
    raise ec(*args) # pylint: disable=W0142
265

    
266

    
267
def _SsconfResolver(node_list,
268
                    ssc=ssconf.SimpleStore,
269
                    nslookup_fn=netutils.Hostname.GetIP):
270
  """Return addresses for given node names.
271

272
  @type node_list: list
273
  @param node_list: List of node names
274
  @type ssc: class
275
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
276
  @type nslookup_fn: callable
277
  @param nslookup_fn: function use to do NS lookup
278
  @rtype: list of tuple; (string, string)
279
  @return: List of tuples containing node name and IP address
280

281
  """
282
  ss = ssc()
283
  iplist = ss.GetNodePrimaryIPList()
284
  family = ss.GetPrimaryIPFamily()
285
  ipmap = dict(entry.split() for entry in iplist)
286

    
287
  result = []
288
  for node in node_list:
289
    ip = ipmap.get(node)
290
    if ip is None:
291
      ip = nslookup_fn(node, family=family)
292
    result.append((node, ip))
293

    
294
  return result
295

    
296

    
297
class _StaticResolver:
298
  def __init__(self, addresses):
299
    """Initializes this class.
300

301
    """
302
    self._addresses = addresses
303

    
304
  def __call__(self, hosts):
305
    """Returns static addresses for hosts.
306

307
    """
308
    assert len(hosts) == len(self._addresses)
309
    return zip(hosts, self._addresses)
310

    
311

    
312
def _CheckConfigNode(name, node):
313
  """Checks if a node is online.
314

315
  @type name: string
316
  @param name: Node name
317
  @type node: L{objects.Node} or None
318
  @param node: Node object
319

320
  """
321
  if node is None:
322
    # Depend on DNS for name resolution
323
    ip = name
324
  elif node.offline:
325
    ip = _OFFLINE
326
  else:
327
    ip = node.primary_ip
328
  return (name, ip)
329

    
330

    
331
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
332
  """Calculate node addresses using configuration.
333

334
  """
335
  # Special case for single-host lookups
336
  if len(hosts) == 1:
337
    (name, ) = hosts
338
    return [_CheckConfigNode(name, single_node_fn(name))]
339
  else:
340
    all_nodes = all_nodes_fn()
341
    return [_CheckConfigNode(name, all_nodes.get(name, None))
342
            for name in hosts]
343

    
344

    
345
class _RpcProcessor:
346
  def __init__(self, resolver, port, lock_monitor_cb=None):
347
    """Initializes this class.
348

349
    @param resolver: callable accepting a list of hostnames, returning a list
350
      of tuples containing name and IP address (IP address can be the name or
351
      the special value L{_OFFLINE} to mark offline machines)
352
    @type port: int
353
    @param port: TCP port
354
    @param lock_monitor_cb: Callable for registering with lock monitor
355

356
    """
357
    self._resolver = resolver
358
    self._port = port
359
    self._lock_monitor_cb = lock_monitor_cb
360

    
361
  @staticmethod
362
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
363
    """Prepares requests by sorting offline hosts into separate list.
364

365
    """
366
    results = {}
367
    requests = {}
368

    
369
    for (name, ip) in hosts:
370
      if ip is _OFFLINE:
371
        # Node is marked as offline
372
        results[name] = RpcResult(node=name, offline=True, call=procedure)
373
      else:
374
        requests[name] = \
375
          http.client.HttpClientRequest(str(ip), port,
376
                                        http.HTTP_PUT, str("/%s" % procedure),
377
                                        headers=_RPC_CLIENT_HEADERS,
378
                                        post_data=body,
379
                                        read_timeout=read_timeout,
380
                                        nicename="%s/%s" % (name, procedure),
381
                                        curl_config_fn=_ConfigRpcCurl)
382

    
383
    return (results, requests)
384

    
385
  @staticmethod
386
  def _CombineResults(results, requests, procedure):
387
    """Combines pre-computed results for offline hosts with actual call results.
388

389
    """
390
    for name, req in requests.items():
391
      if req.success and req.resp_status_code == http.HTTP_OK:
392
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
393
                                node=name, call=procedure)
394
      else:
395
        # TODO: Better error reporting
396
        if req.error:
397
          msg = req.error
398
        else:
399
          msg = req.resp_body
400

    
401
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
402
        host_result = RpcResult(data=msg, failed=True, node=name,
403
                                call=procedure)
404

    
405
      results[name] = host_result
406

    
407
    return results
408

    
409
  def __call__(self, hosts, procedure, body, read_timeout=None,
410
               _req_process_fn=http.client.ProcessRequests):
411
    """Makes an RPC request to a number of nodes.
412

413
    @type hosts: sequence
414
    @param hosts: Hostnames
415
    @type procedure: string
416
    @param procedure: Request path
417
    @type body: string
418
    @param body: Request body
419
    @type read_timeout: int or None
420
    @param read_timeout: Read timeout for request
421

422
    """
423
    if read_timeout is None:
424
      read_timeout = _TIMEOUTS.get(procedure, None)
425

    
426
    assert read_timeout is not None, \
427
      "Missing RPC read timeout for procedure '%s'" % procedure
428

    
429
    (results, requests) = \
430
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
431
                            str(body), read_timeout)
432

    
433
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
434

    
435
    assert not frozenset(results).intersection(requests)
436

    
437
    return self._CombineResults(results, requests, procedure)
438

    
439

    
440
def _EncodeImportExportIO(ieio, ieioargs):
441
  """Encodes import/export I/O information.
442

443
  """
444
  if ieio == constants.IEIO_RAW_DISK:
445
    assert len(ieioargs) == 1
446
    return (ieioargs[0].ToDict(), )
447

    
448
  if ieio == constants.IEIO_SCRIPT:
449
    assert len(ieioargs) == 2
450
    return (ieioargs[0].ToDict(), ieioargs[1])
451

    
452
  return ieioargs
453

    
454

    
455
class RpcRunner(_generated_rpc.RpcClientDefault):
456
  """RPC runner class.
457

458
  """
459
  def __init__(self, context):
460
    """Initialized the RPC runner.
461

462
    @type context: C{masterd.GanetiContext}
463
    @param context: Ganeti context
464

465
    """
466
    _generated_rpc.RpcClientDefault.__init__(self)
467

    
468
    self._cfg = context.cfg
469
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
470
                                              self._cfg.GetNodeInfo,
471
                                              self._cfg.GetAllNodesInfo),
472
                               netutils.GetDaemonPort(constants.NODED),
473
                               lock_monitor_cb=context.glm.AddToLockMonitor)
474

    
475
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
476
    """Convert the given instance to a dict.
477

478
    This is done via the instance's ToDict() method and additionally
479
    we fill the hvparams with the cluster defaults.
480

481
    @type instance: L{objects.Instance}
482
    @param instance: an Instance object
483
    @type hvp: dict or None
484
    @param hvp: a dictionary with overridden hypervisor parameters
485
    @type bep: dict or None
486
    @param bep: a dictionary with overridden backend parameters
487
    @type osp: dict or None
488
    @param osp: a dictionary with overridden os parameters
489
    @rtype: dict
490
    @return: the instance dict, with the hvparams filled with the
491
        cluster defaults
492

493
    """
494
    idict = instance.ToDict()
495
    cluster = self._cfg.GetClusterInfo()
496
    idict["hvparams"] = cluster.FillHV(instance)
497
    if hvp is not None:
498
      idict["hvparams"].update(hvp)
499
    idict["beparams"] = cluster.FillBE(instance)
500
    if bep is not None:
501
      idict["beparams"].update(bep)
502
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
503
    if osp is not None:
504
      idict["osparams"].update(osp)
505
    for nic in idict["nics"]:
506
      nic['nicparams'] = objects.FillDict(
507
        cluster.nicparams[constants.PP_DEFAULT],
508
        nic['nicparams'])
509
    return idict
510

    
511
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
512
    """Helper for making a multi-node call
513

514
    """
515
    body = serializer.DumpJson(args, indent=False)
516
    return self._proc(node_list, procedure, body, read_timeout=read_timeout)
517

    
518
  def _Call(self, node_list, procedure, timeout, args):
519
    """Entry point for automatically generated RPC wrappers.
520

521
    """
522
    return self._MultiNodeCall(node_list, procedure, args, read_timeout=timeout)
523

    
524
  @staticmethod
525
  def _StaticMultiNodeCall(node_list, procedure, args,
526
                           address_list=None, read_timeout=None):
527
    """Helper for making a multi-node static call
528

529
    """
530
    body = serializer.DumpJson(args, indent=False)
531

    
532
    if address_list is None:
533
      resolver = _SsconfResolver
534
    else:
535
      # Caller provided an address list
536
      resolver = _StaticResolver(address_list)
537

    
538
    proc = _RpcProcessor(resolver,
539
                         netutils.GetDaemonPort(constants.NODED))
540
    return proc(node_list, procedure, body, read_timeout=read_timeout)
541

    
542
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
543
    """Helper for making a single-node call
544

545
    """
546
    body = serializer.DumpJson(args, indent=False)
547
    return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
548

    
549
  @classmethod
550
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
551
    """Helper for making a single-node static call
552

553
    """
554
    body = serializer.DumpJson(args, indent=False)
555
    proc = _RpcProcessor(_SsconfResolver,
556
                         netutils.GetDaemonPort(constants.NODED))
557
    return proc([node], procedure, body, read_timeout=read_timeout)[node]
558

    
559
  @staticmethod
560
  def _BlockdevFindPostProc(result):
561
    if not result.fail_msg and result.payload is not None:
562
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
563
    return result
564

    
565
  @staticmethod
566
  def _BlockdevGetMirrorStatusPostProc(result):
567
    if not result.fail_msg:
568
      result.payload = [objects.BlockDevStatus.FromDict(i)
569
                        for i in result.payload]
570
    return result
571

    
572
  @staticmethod
573
  def _BlockdevGetMirrorStatusMultiPostProc(result):
574
    for nres in result.values():
575
      if nres.fail_msg:
576
        continue
577

    
578
      for idx, (success, status) in enumerate(nres.payload):
579
        if success:
580
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
581

    
582
    return result
583

    
584
  @staticmethod
585
  def _OsGetPostProc(result):
586
    if not result.fail_msg and isinstance(result.payload, dict):
587
      result.payload = objects.OS.FromDict(result.payload)
588
    return result
589

    
590
  @staticmethod
591
  def _PrepareFinalizeExportDisks(snap_disks):
592
    flat_disks = []
593

    
594
    for disk in snap_disks:
595
      if isinstance(disk, bool):
596
        flat_disks.append(disk)
597
      else:
598
        flat_disks.append(disk.ToDict())
599

    
600
    return flat_disks
601

    
602
  @staticmethod
603
  def _ImpExpStatusPostProc(result):
604
    """Post-processor for import/export status.
605

606
    @rtype: Payload containing list of L{objects.ImportExportStatus} instances
607
    @return: Returns a list of the state of each named import/export or None if
608
             a status couldn't be retrieved
609

610
    """
611
    if not result.fail_msg:
612
      decoded = []
613

    
614
      for i in result.payload:
615
        if i is None:
616
          decoded.append(None)
617
          continue
618
        decoded.append(objects.ImportExportStatus.FromDict(i))
619

    
620
      result.payload = decoded
621

    
622
    return result
623

    
624
  #
625
  # Begin RPC calls
626
  #
627

    
628
  @_RpcTimeout(_TMO_URGENT)
629
  def call_bdev_sizes(self, node_list, devices):
630
    """Gets the sizes of requested block devices present on a node
631

632
    This is a multi-node call.
633

634
    """
635
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
636

    
637
  @_RpcTimeout(_TMO_URGENT)
638
  def call_lv_list(self, node_list, vg_name):
639
    """Gets the logical volumes present in a given volume group.
640

641
    This is a multi-node call.
642

643
    """
644
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
645

    
646
  @_RpcTimeout(_TMO_URGENT)
647
  def call_vg_list(self, node_list):
648
    """Gets the volume group list.
649

650
    This is a multi-node call.
651

652
    """
653
    return self._MultiNodeCall(node_list, "vg_list", [])
654

    
655
  @_RpcTimeout(_TMO_NORMAL)
656
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
657
    """Get list of storage units.
658

659
    This is a multi-node call.
660

661
    """
662
    return self._MultiNodeCall(node_list, "storage_list",
663
                               [su_name, su_args, name, fields])
664

    
665
  @_RpcTimeout(_TMO_NORMAL)
666
  def call_storage_modify(self, node, su_name, su_args, name, changes):
667
    """Modify a storage unit.
668

669
    This is a single-node call.
670

671
    """
672
    return self._SingleNodeCall(node, "storage_modify",
673
                                [su_name, su_args, name, changes])
674

    
675
  @_RpcTimeout(_TMO_NORMAL)
676
  def call_storage_execute(self, node, su_name, su_args, name, op):
677
    """Executes an operation on a storage unit.
678

679
    This is a single-node call.
680

681
    """
682
    return self._SingleNodeCall(node, "storage_execute",
683
                                [su_name, su_args, name, op])
684

    
685
  @_RpcTimeout(_TMO_URGENT)
686
  def call_bridges_exist(self, node, bridges_list):
687
    """Checks if a node has all the bridges given.
688

689
    This method checks if all bridges given in the bridges_list are
690
    present on the remote node, so that an instance that uses interfaces
691
    on those bridges can be started.
692

693
    This is a single-node call.
694

695
    """
696
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
697

    
698
  @_RpcTimeout(_TMO_NORMAL)
699
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
700
    """Starts an instance.
701

702
    This is a single-node call.
703

704
    """
705
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
706
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
707

    
708
  @_RpcTimeout(_TMO_NORMAL)
709
  def call_instance_shutdown(self, node, instance, timeout):
710
    """Stops an instance.
711

712
    This is a single-node call.
713

714
    """
715
    return self._SingleNodeCall(node, "instance_shutdown",
716
                                [self._InstDict(instance), timeout])
717

    
718
  @_RpcTimeout(_TMO_NORMAL)
719
  def call_migration_info(self, node, instance):
720
    """Gather the information necessary to prepare an instance migration.
721

722
    This is a single-node call.
723

724
    @type node: string
725
    @param node: the node on which the instance is currently running
726
    @type instance: C{objects.Instance}
727
    @param instance: the instance definition
728

729
    """
730
    return self._SingleNodeCall(node, "migration_info",
731
                                [self._InstDict(instance)])
732

    
733
  @_RpcTimeout(_TMO_NORMAL)
734
  def call_accept_instance(self, node, instance, info, target):
735
    """Prepare a node to accept an instance.
736

737
    This is a single-node call.
738

739
    @type node: string
740
    @param node: the target node for the migration
741
    @type instance: C{objects.Instance}
742
    @param instance: the instance definition
743
    @type info: opaque/hypervisor specific (string/data)
744
    @param info: result for the call_migration_info call
745
    @type target: string
746
    @param target: target hostname (usually ip address) (on the node itself)
747

748
    """
749
    return self._SingleNodeCall(node, "accept_instance",
750
                                [self._InstDict(instance), info, target])
751

    
752
  @_RpcTimeout(_TMO_NORMAL)
753
  def call_instance_finalize_migration_dst(self, node, instance, info, success):
754
    """Finalize any target-node migration specific operation.
755

756
    This is called both in case of a successful migration and in case of error
757
    (in which case it should abort the migration).
758

759
    This is a single-node call.
760

761
    @type node: string
762
    @param node: the target node for the migration
763
    @type instance: C{objects.Instance}
764
    @param instance: the instance definition
765
    @type info: opaque/hypervisor specific (string/data)
766
    @param info: result for the call_migration_info call
767
    @type success: boolean
768
    @param success: whether the migration was a success or a failure
769

770
    """
771
    return self._SingleNodeCall(node, "instance_finalize_migration_dst",
772
                                [self._InstDict(instance), info, success])
773

    
774
  @_RpcTimeout(_TMO_SLOW)
775
  def call_instance_migrate(self, node, instance, target, live):
776
    """Migrate an instance.
777

778
    This is a single-node call.
779

780
    @type node: string
781
    @param node: the node on which the instance is currently running
782
    @type instance: C{objects.Instance}
783
    @param instance: the instance definition
784
    @type target: string
785
    @param target: the target node name
786
    @type live: boolean
787
    @param live: whether the migration should be done live or not (the
788
        interpretation of this parameter is left to the hypervisor)
789

790
    """
791
    return self._SingleNodeCall(node, "instance_migrate",
792
                                [self._InstDict(instance), target, live])
793

    
794
  @_RpcTimeout(_TMO_SLOW)
795
  def call_instance_finalize_migration_src(self, node, instance, success, live):
796
    """Finalize the instance migration on the source node.
797

798
    This is a single-node call.
799

800
    @type instance: L{objects.Instance}
801
    @param instance: the instance that was migrated
802
    @type success: bool
803
    @param success: whether the migration succeeded or not
804
    @type live: bool
805
    @param live: whether the user requested a live migration or not
806

807
    """
808
    return self._SingleNodeCall(node, "instance_finalize_migration_src",
809
                                [self._InstDict(instance), success, live])
810

    
811
  @_RpcTimeout(_TMO_SLOW)
812
  def call_instance_get_migration_status(self, node, instance):
813
    """Report migration status.
814

815
    This is a single-node call that must be executed on the source node.
816

817
    @type instance: L{objects.Instance}
818
    @param instance: the instance that is being migrated
819
    @rtype: L{objects.MigrationStatus}
820
    @return: the status of the current migration (one of
821
             L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
822
             progress info that can be retrieved from the hypervisor
823

824
    """
825
    result = self._SingleNodeCall(node, "instance_get_migration_status",
826
                                  [self._InstDict(instance)])
827
    if not result.fail_msg and result.payload is not None:
828
      result.payload = objects.MigrationStatus.FromDict(result.payload)
829
    return result
830

    
831
  @_RpcTimeout(_TMO_NORMAL)
832
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
833
    """Reboots an instance.
834

835
    This is a single-node call.
836

837
    """
838
    return self._SingleNodeCall(node, "instance_reboot",
839
                                [self._InstDict(inst), reboot_type,
840
                                 shutdown_timeout])
841

    
842
  @_RpcTimeout(_TMO_1DAY)
843
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
844
    """Installs an OS on the given instance.
845

846
    This is a single-node call.
847

848
    """
849
    return self._SingleNodeCall(node, "instance_os_add",
850
                                [self._InstDict(inst, osp=osparams),
851
                                 reinstall, debug])
852

    
853
  @_RpcTimeout(_TMO_SLOW)
854
  def call_instance_run_rename(self, node, inst, old_name, debug):
855
    """Run the OS rename script for an instance.
856

857
    This is a single-node call.
858

859
    """
860
    return self._SingleNodeCall(node, "instance_run_rename",
861
                                [self._InstDict(inst), old_name, debug])
862

    
863
  @_RpcTimeout(_TMO_URGENT)
864
  def call_instance_info(self, node, instance, hname):
865
    """Returns information about a single instance.
866

867
    This is a single-node call.
868

869
    @type node: list
870
    @param node: the list of nodes to query
871
    @type instance: string
872
    @param instance: the instance name
873
    @type hname: string
874
    @param hname: the hypervisor type of the instance
875

876
    """
877
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
878

    
879
  @_RpcTimeout(_TMO_NORMAL)
880
  def call_instance_migratable(self, node, instance):
881
    """Checks whether the given instance can be migrated.
882

883
    This is a single-node call.
884

885
    @param node: the node to query
886
    @type instance: L{objects.Instance}
887
    @param instance: the instance to check
888

889

890
    """
891
    return self._SingleNodeCall(node, "instance_migratable",
892
                                [self._InstDict(instance)])
893

    
894
  @_RpcTimeout(_TMO_URGENT)
895
  def call_all_instances_info(self, node_list, hypervisor_list):
896
    """Returns information about all instances on the given nodes.
897

898
    This is a multi-node call.
899

900
    @type node_list: list
901
    @param node_list: the list of nodes to query
902
    @type hypervisor_list: list
903
    @param hypervisor_list: the hypervisors to query for instances
904

905
    """
906
    return self._MultiNodeCall(node_list, "all_instances_info",
907
                               [hypervisor_list])
908

    
909
  @_RpcTimeout(_TMO_URGENT)
910
  def call_instance_list(self, node_list, hypervisor_list):
911
    """Returns the list of running instances on a given node.
912

913
    This is a multi-node call.
914

915
    @type node_list: list
916
    @param node_list: the list of nodes to query
917
    @type hypervisor_list: list
918
    @param hypervisor_list: the hypervisors to query for instances
919

920
    """
921
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
922

    
923
  @_RpcTimeout(_TMO_NORMAL)
924
  def call_etc_hosts_modify(self, node, mode, name, ip):
925
    """Modify hosts file with name
926

927
    @type node: string
928
    @param node: The node to call
929
    @type mode: string
930
    @param mode: The mode to operate. Currently "add" or "remove"
931
    @type name: string
932
    @param name: The host name to be modified
933
    @type ip: string
934
    @param ip: The ip of the entry (just valid if mode is "add")
935

936
    """
937
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
938

    
939
  @classmethod
940
  @_RpcTimeout(_TMO_FAST)
941
  def call_node_start_master_daemons(cls, node, no_voting):
942
    """Starts master daemons on a node.
943

944
    This is a single-node call.
945

946
    """
947
    return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
948
                                     [no_voting])
949

    
950
  @classmethod
951
  @_RpcTimeout(_TMO_FAST)
952
  def call_node_activate_master_ip(cls, node):
953
    """Activates master IP on a node.
954

955
    This is a single-node call.
956

957
    """
958
    return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
959

    
960
  @classmethod
961
  @_RpcTimeout(_TMO_FAST)
962
  def call_node_stop_master(cls, node):
963
    """Deactivates master IP and stops master daemons on a node.
964

965
    This is a single-node call.
966

967
    """
968
    return cls._StaticSingleNodeCall(node, "node_stop_master", [])
969

    
970
  @classmethod
971
  @_RpcTimeout(_TMO_FAST)
972
  def call_node_deactivate_master_ip(cls, node):
973
    """Deactivates master IP on a node.
974

975
    This is a single-node call.
976

977
    """
978
    return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
979

    
980
  @classmethod
981
  @_RpcTimeout(_TMO_FAST)
982
  def call_node_change_master_netmask(cls, node, netmask):
983
    """Change master IP netmask.
984

985
    This is a single-node call.
986

987
    """
988
    return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
989
                  [netmask])
990

    
991
  @classmethod
992
  @_RpcTimeout(_TMO_URGENT)
993
  def call_master_info(cls, node_list):
994
    """Query master info.
995

996
    This is a multi-node call.
997

998
    """
999
    # TODO: should this method query down nodes?
1000
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
1001

    
1002
  @classmethod
1003
  @_RpcTimeout(_TMO_URGENT)
1004
  def call_version(cls, node_list):
1005
    """Query node version.
1006

1007
    This is a multi-node call.
1008

1009
    """
1010
    return cls._StaticMultiNodeCall(node_list, "version", [])
1011

    
1012
  @_RpcTimeout(_TMO_NORMAL)
1013
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
1014
    """Request creation of a given block device.
1015

1016
    This is a single-node call.
1017

1018
    """
1019
    return self._SingleNodeCall(node, "blockdev_create",
1020
                                [bdev.ToDict(), size, owner, on_primary, info])
1021

    
1022
  @_RpcTimeout(_TMO_SLOW)
1023
  def call_blockdev_wipe(self, node, bdev, offset, size):
1024
    """Request wipe at given offset with given size of a block device.
1025

1026
    This is a single-node call.
1027

1028
    """
1029
    return self._SingleNodeCall(node, "blockdev_wipe",
1030
                                [bdev.ToDict(), offset, size])
1031

    
1032
  @_RpcTimeout(_TMO_NORMAL)
1033
  def call_blockdev_remove(self, node, bdev):
1034
    """Request removal of a given block device.
1035

1036
    This is a single-node call.
1037

1038
    """
1039
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
1040

    
1041
  @_RpcTimeout(_TMO_NORMAL)
1042
  def call_blockdev_rename(self, node, devlist):
1043
    """Request rename of the given block devices.
1044

1045
    This is a single-node call.
1046

1047
    """
1048
    return self._SingleNodeCall(node, "blockdev_rename",
1049
                                [[(d.ToDict(), uid) for d, uid in devlist]])
1050

    
1051
  @_RpcTimeout(_TMO_NORMAL)
1052
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
1053
    """Request a pause/resume of given block device.
1054

1055
    This is a single-node call.
1056

1057
    """
1058
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1059
                                [[bdev.ToDict() for bdev in disks], pause])
1060

    
1061
  @_RpcTimeout(_TMO_NORMAL)
1062
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1063
    """Request assembling of a given block device.
1064

1065
    This is a single-node call.
1066

1067
    """
1068
    return self._SingleNodeCall(node, "blockdev_assemble",
1069
                                [disk.ToDict(), owner, on_primary, idx])
1070

    
1071
  @_RpcTimeout(_TMO_NORMAL)
1072
  def call_blockdev_shutdown(self, node, disk):
1073
    """Request shutdown of a given block device.
1074

1075
    This is a single-node call.
1076

1077
    """
1078
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1079

    
1080
  @_RpcTimeout(_TMO_NORMAL)
1081
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1082
    """Request adding a list of children to a (mirroring) device.
1083

1084
    This is a single-node call.
1085

1086
    """
1087
    return self._SingleNodeCall(node, "blockdev_addchildren",
1088
                                [bdev.ToDict(),
1089
                                 [disk.ToDict() for disk in ndevs]])
1090

    
1091
  @_RpcTimeout(_TMO_NORMAL)
1092
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1093
    """Request removing a list of children from a (mirroring) device.
1094

1095
    This is a single-node call.
1096

1097
    """
1098
    return self._SingleNodeCall(node, "blockdev_removechildren",
1099
                                [bdev.ToDict(),
1100
                                 [disk.ToDict() for disk in ndevs]])
1101

    
1102
  @_RpcTimeout(_TMO_NORMAL)
1103
  def call_blockdev_getmirrorstatus(self, node, disks):
1104
    """Request status of a (mirroring) device.
1105

1106
    This is a single-node call.
1107

1108
    """
1109
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1110
                                  [dsk.ToDict() for dsk in disks])
1111
    if not result.fail_msg:
1112
      result.payload = [objects.BlockDevStatus.FromDict(i)
1113
                        for i in result.payload]
1114
    return result
1115

    
1116
  @_RpcTimeout(_TMO_NORMAL)
1117
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1118
    """Request status of (mirroring) devices from multiple nodes.
1119

1120
    This is a multi-node call.
1121

1122
    """
1123
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1124
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1125
                                       for name, disks in node_disks.items())])
1126
    for nres in result.values():
1127
      if nres.fail_msg:
1128
        continue
1129

    
1130
      for idx, (success, status) in enumerate(nres.payload):
1131
        if success:
1132
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1133

    
1134
    return result
1135

    
1136
  @_RpcTimeout(_TMO_NORMAL)
1137
  def call_blockdev_find(self, node, disk):
1138
    """Request identification of a given block device.
1139

1140
    This is a single-node call.
1141

1142
    """
1143
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1144
    if not result.fail_msg and result.payload is not None:
1145
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1146
    return result
1147

    
1148
  @_RpcTimeout(_TMO_NORMAL)
1149
  def call_blockdev_close(self, node, instance_name, disks):
1150
    """Closes the given block devices.
1151

1152
    This is a single-node call.
1153

1154
    """
1155
    params = [instance_name, [cf.ToDict() for cf in disks]]
1156
    return self._SingleNodeCall(node, "blockdev_close", params)
1157

    
1158
  @_RpcTimeout(_TMO_NORMAL)
1159
  def call_blockdev_getsize(self, node, disks):
1160
    """Returns the size of the given disks.
1161

1162
    This is a single-node call.
1163

1164
    """
1165
    params = [[cf.ToDict() for cf in disks]]
1166
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1167

    
1168
  @_RpcTimeout(_TMO_NORMAL)
1169
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1170
    """Disconnects the network of the given drbd devices.
1171

1172
    This is a multi-node call.
1173

1174
    """
1175
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1176
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1177

    
1178
  @_RpcTimeout(_TMO_NORMAL)
1179
  def call_drbd_attach_net(self, node_list, nodes_ip,
1180
                           disks, instance_name, multimaster):
1181
    """Disconnects the given drbd devices.
1182

1183
    This is a multi-node call.
1184

1185
    """
1186
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1187
                               [nodes_ip, [cf.ToDict() for cf in disks],
1188
                                instance_name, multimaster])
1189

    
1190
  @_RpcTimeout(_TMO_SLOW)
1191
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1192
    """Waits for the synchronization of drbd devices is complete.
1193

1194
    This is a multi-node call.
1195

1196
    """
1197
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1198
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1199

    
1200
  @_RpcTimeout(_TMO_URGENT)
1201
  def call_drbd_helper(self, node_list):
1202
    """Gets drbd helper.
1203

1204
    This is a multi-node call.
1205

1206
    """
1207
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1208

    
1209
  @classmethod
1210
  @_RpcTimeout(_TMO_NORMAL)
1211
  def call_upload_file(cls, node_list, file_name, address_list=None):
1212
    """Upload a file.
1213

1214
    The node will refuse the operation in case the file is not on the
1215
    approved file list.
1216

1217
    This is a multi-node call.
1218

1219
    @type node_list: list
1220
    @param node_list: the list of node names to upload to
1221
    @type file_name: str
1222
    @param file_name: the filename to upload
1223
    @type address_list: list or None
1224
    @keyword address_list: an optional list of node addresses, in order
1225
        to optimize the RPC speed
1226

1227
    """
1228
    file_contents = utils.ReadFile(file_name)
1229
    data = _Compress(file_contents)
1230
    st = os.stat(file_name)
1231
    getents = runtime.GetEnts()
1232
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1233
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1234
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1235
                                    address_list=address_list)
1236

    
1237
  @classmethod
1238
  @_RpcTimeout(_TMO_NORMAL)
1239
  def call_write_ssconf_files(cls, node_list, values):
1240
    """Write ssconf files.
1241

1242
    This is a multi-node call.
1243

1244
    """
1245
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1246

    
1247
  @_RpcTimeout(_TMO_NORMAL)
1248
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1249
    """Runs OOB.
1250

1251
    This is a single-node call.
1252

1253
    """
1254
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1255
                                                  remote_node, timeout])
1256

    
1257
  @_RpcTimeout(_TMO_NORMAL)
1258
  def call_hooks_runner(self, node_list, hpath, phase, env):
1259
    """Call the hooks runner.
1260

1261
    Args:
1262
      - op: the OpCode instance
1263
      - env: a dictionary with the environment
1264

1265
    This is a multi-node call.
1266

1267
    """
1268
    params = [hpath, phase, env]
1269
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1270

    
1271
  @_RpcTimeout(_TMO_NORMAL)
1272
  def call_iallocator_runner(self, node, name, idata):
1273
    """Call an iallocator on a remote node
1274

1275
    Args:
1276
      - name: the iallocator name
1277
      - input: the json-encoded input string
1278

1279
    This is a single-node call.
1280

1281
    """
1282
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1283

    
1284
  @_RpcTimeout(_TMO_NORMAL)
1285
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1286
    """Request a snapshot of the given block device.
1287

1288
    This is a single-node call.
1289

1290
    """
1291
    return self._SingleNodeCall(node, "blockdev_grow",
1292
                                [cf_bdev.ToDict(), amount, dryrun])
1293

    
1294
  @_RpcTimeout(_TMO_1DAY)
1295
  def call_blockdev_export(self, node, cf_bdev,
1296
                           dest_node, dest_path, cluster_name):
1297
    """Export a given disk to another node.
1298

1299
    This is a single-node call.
1300

1301
    """
1302
    return self._SingleNodeCall(node, "blockdev_export",
1303
                                [cf_bdev.ToDict(), dest_node, dest_path,
1304
                                 cluster_name])
1305

    
1306
  @_RpcTimeout(_TMO_NORMAL)
1307
  def call_blockdev_snapshot(self, node, cf_bdev):
1308
    """Request a snapshot of the given block device.
1309

1310
    This is a single-node call.
1311

1312
    """
1313
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1314

    
1315
  @classmethod
1316
  @_RpcTimeout(_TMO_NORMAL)
1317
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1318
    """Requests a node to clean the cluster information it has.
1319

1320
    This will remove the configuration information from the ganeti data
1321
    dir.
1322

1323
    This is a single-node call.
1324

1325
    """
1326
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1327
                                     [modify_ssh_setup])
1328

    
1329
  @_RpcTimeout(None)
1330
  def call_test_delay(self, node_list, duration):
1331
    """Sleep for a fixed time on given node(s).
1332

1333
    This is a multi-node call.
1334

1335
    """
1336
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1337
                               read_timeout=int(duration + 5))
1338

    
1339
  @classmethod
1340
  @_RpcTimeout(_TMO_URGENT)
1341
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1342
    """Update job queue.
1343

1344
    This is a multi-node call.
1345

1346
    """
1347
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1348
                                    [file_name, _Compress(content)],
1349
                                    address_list=address_list)
1350

    
1351
  @classmethod
1352
  @_RpcTimeout(_TMO_NORMAL)
1353
  def call_jobqueue_purge(cls, node):
1354
    """Purge job queue.
1355

1356
    This is a single-node call.
1357

1358
    """
1359
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1360

    
1361
  @classmethod
1362
  @_RpcTimeout(_TMO_URGENT)
1363
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1364
    """Rename a job queue file.
1365

1366
    This is a multi-node call.
1367

1368
    """
1369
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1370
                                    address_list=address_list)
1371

    
1372
  @_RpcTimeout(_TMO_NORMAL)
1373
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1374
    """Validate the hypervisor params.
1375

1376
    This is a multi-node call.
1377

1378
    @type node_list: list
1379
    @param node_list: the list of nodes to query
1380
    @type hvname: string
1381
    @param hvname: the hypervisor name
1382
    @type hvparams: dict
1383
    @param hvparams: the hypervisor parameters to be validated
1384

1385
    """
1386
    cluster = self._cfg.GetClusterInfo()
1387
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1388
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1389
                               [hvname, hv_full])
1390

    
1391
  @_RpcTimeout(_TMO_NORMAL)
1392
  def call_import_start(self, node, opts, instance, component,
1393
                        dest, dest_args):
1394
    """Starts a listener for an import.
1395

1396
    This is a single-node call.
1397

1398
    @type node: string
1399
    @param node: Node name
1400
    @type instance: C{objects.Instance}
1401
    @param instance: Instance object
1402
    @type component: string
1403
    @param component: which part of the instance is being imported
1404

1405
    """
1406
    return self._SingleNodeCall(node, "import_start",
1407
                                [opts.ToDict(),
1408
                                 self._InstDict(instance), component, dest,
1409
                                 _EncodeImportExportIO(dest, dest_args)])
1410

    
1411
  @_RpcTimeout(_TMO_NORMAL)
1412
  def call_export_start(self, node, opts, host, port,
1413
                        instance, component, source, source_args):
1414
    """Starts an export daemon.
1415

1416
    This is a single-node call.
1417

1418
    @type node: string
1419
    @param node: Node name
1420
    @type instance: C{objects.Instance}
1421
    @param instance: Instance object
1422
    @type component: string
1423
    @param component: which part of the instance is being imported
1424

1425
    """
1426
    return self._SingleNodeCall(node, "export_start",
1427
                                [opts.ToDict(), host, port,
1428
                                 self._InstDict(instance),
1429
                                 component, source,
1430
                                 _EncodeImportExportIO(source, source_args)])