Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 271b7cf9

History | View | Annotate | Download (44.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48

    
49
# pylint has a bug here, doesn't see this import
50
import ganeti.http.client  # pylint: disable-msg=W0611
51

    
52

    
53
# Timeout for connecting to nodes (seconds)
54
_RPC_CONNECT_TIMEOUT = 5
55

    
56
_RPC_CLIENT_HEADERS = [
57
  "Content-type: %s" % http.HTTP_APP_JSON,
58
  "Expect:",
59
  ]
60

    
61
# Various time constants for the timeout table
62
_TMO_URGENT = 60 # one minute
63
_TMO_FAST = 5 * 60 # five minutes
64
_TMO_NORMAL = 15 * 60 # 15 minutes
65
_TMO_SLOW = 3600 # one hour
66
_TMO_4HRS = 4 * 3600
67
_TMO_1DAY = 86400
68

    
69
# Timeout table that will be built later by decorators
70
# Guidelines for choosing timeouts:
71
# - call used during watcher: timeout -> 1min, _TMO_URGENT
72
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73
# - other calls: 15 min, _TMO_NORMAL
74
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75

    
76
_TIMEOUTS = {
77
}
78

    
79

    
80
def Init():
81
  """Initializes the module-global HTTP client manager.
82

83
  Must be called before using any RPC function and while exactly one thread is
84
  running.
85

86
  """
87
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88
  # one thread running. This check is just a safety measure -- it doesn't
89
  # cover all cases.
90
  assert threading.activeCount() == 1, \
91
         "Found more than one active thread when initializing pycURL"
92

    
93
  logging.info("Using PycURL %s", pycurl.version)
94

    
95
  pycurl.global_init(pycurl.GLOBAL_ALL)
96

    
97

    
98
def Shutdown():
99
  """Stops the module-global HTTP client manager.
100

101
  Must be called before quitting the program and while exactly one thread is
102
  running.
103

104
  """
105
  pycurl.global_cleanup()
106

    
107

    
108
def _ConfigRpcCurl(curl):
109
  noded_cert = str(constants.NODED_CERT_FILE)
110

    
111
  curl.setopt(pycurl.FOLLOWLOCATION, False)
112
  curl.setopt(pycurl.CAINFO, noded_cert)
113
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
115
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116
  curl.setopt(pycurl.SSLCERT, noded_cert)
117
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118
  curl.setopt(pycurl.SSLKEY, noded_cert)
119
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120

    
121

    
122
class _RpcThreadLocal(threading.local):
123
  def GetHttpClientPool(self):
124
    """Returns a per-thread HTTP client pool.
125

126
    @rtype: L{http.client.HttpClientPool}
127

128
    """
129
    try:
130
      pool = self.hcp
131
    except AttributeError:
132
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
133
      self.hcp = pool
134

    
135
    return pool
136

    
137

    
138
_thread_local = _RpcThreadLocal()
139

    
140

    
141
def _RpcTimeout(secs):
142
  """Timeout decorator.
143

144
  When applied to a rpc call_* function, it updates the global timeout
145
  table with the given function/timeout.
146

147
  """
148
  def decorator(f):
149
    name = f.__name__
150
    assert name.startswith("call_")
151
    _TIMEOUTS[name[len("call_"):]] = secs
152
    return f
153
  return decorator
154

    
155

    
156
def RunWithRPC(fn):
157
  """RPC-wrapper decorator.
158

159
  When applied to a function, it runs it with the RPC system
160
  initialized, and it shutsdown the system afterwards. This means the
161
  function must be called without RPC being initialized.
162

163
  """
164
  def wrapper(*args, **kwargs):
165
    Init()
166
    try:
167
      return fn(*args, **kwargs)
168
    finally:
169
      Shutdown()
170
  return wrapper
171

    
172

    
173
class RpcResult(object):
174
  """RPC Result class.
175

176
  This class holds an RPC result. It is needed since in multi-node
177
  calls we can't raise an exception just because one one out of many
178
  failed, and therefore we use this class to encapsulate the result.
179

180
  @ivar data: the data payload, for successful results, or None
181
  @ivar call: the name of the RPC call
182
  @ivar node: the name of the node to which we made the call
183
  @ivar offline: whether the operation failed because the node was
184
      offline, as opposed to actual failure; offline=True will always
185
      imply failed=True, in order to allow simpler checking if
186
      the user doesn't care about the exact failure mode
187
  @ivar fail_msg: the error message if the call failed
188

189
  """
190
  def __init__(self, data=None, failed=False, offline=False,
191
               call=None, node=None):
192
    self.offline = offline
193
    self.call = call
194
    self.node = node
195

    
196
    if offline:
197
      self.fail_msg = "Node is marked offline"
198
      self.data = self.payload = None
199
    elif failed:
200
      self.fail_msg = self._EnsureErr(data)
201
      self.data = self.payload = None
202
    else:
203
      self.data = data
204
      if not isinstance(self.data, (tuple, list)):
205
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
206
                         type(self.data))
207
        self.payload = None
208
      elif len(data) != 2:
209
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
210
                         "expected 2" % len(self.data))
211
        self.payload = None
212
      elif not self.data[0]:
213
        self.fail_msg = self._EnsureErr(self.data[1])
214
        self.payload = None
215
      else:
216
        # finally success
217
        self.fail_msg = None
218
        self.payload = data[1]
219

    
220
    assert hasattr(self, "call")
221
    assert hasattr(self, "data")
222
    assert hasattr(self, "fail_msg")
223
    assert hasattr(self, "node")
224
    assert hasattr(self, "offline")
225
    assert hasattr(self, "payload")
226

    
227
  @staticmethod
228
  def _EnsureErr(val):
229
    """Helper to ensure we return a 'True' value for error."""
230
    if val:
231
      return val
232
    else:
233
      return "No error information"
234

    
235
  def Raise(self, msg, prereq=False, ecode=None):
236
    """If the result has failed, raise an OpExecError.
237

238
    This is used so that LU code doesn't have to check for each
239
    result, but instead can call this function.
240

241
    """
242
    if not self.fail_msg:
243
      return
244

    
245
    if not msg: # one could pass None for default message
246
      msg = ("Call '%s' to node '%s' has failed: %s" %
247
             (self.call, self.node, self.fail_msg))
248
    else:
249
      msg = "%s: %s" % (msg, self.fail_msg)
250
    if prereq:
251
      ec = errors.OpPrereqError
252
    else:
253
      ec = errors.OpExecError
254
    if ecode is not None:
255
      args = (msg, ecode)
256
    else:
257
      args = (msg, )
258
    raise ec(*args) # pylint: disable-msg=W0142
259

    
260

    
261
def _AddressLookup(node_list,
262
                   ssc=ssconf.SimpleStore,
263
                   nslookup_fn=netutils.Hostname.GetIP):
264
  """Return addresses for given node names.
265

266
  @type node_list: list
267
  @param node_list: List of node names
268
  @type ssc: class
269
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
270
  @type nslookup_fn: callable
271
  @param nslookup_fn: function use to do NS lookup
272
  @rtype: list of addresses and/or None's
273
  @returns: List of corresponding addresses, if found
274

275
  """
276
  ss = ssc()
277
  iplist = ss.GetNodePrimaryIPList()
278
  family = ss.GetPrimaryIPFamily()
279
  addresses = []
280
  ipmap = dict(entry.split() for entry in iplist)
281
  for node in node_list:
282
    address = ipmap.get(node)
283
    if address is None:
284
      address = nslookup_fn(node, family=family)
285
    addresses.append(address)
286

    
287
  return addresses
288

    
289

    
290
class Client:
291
  """RPC Client class.
292

293
  This class, given a (remote) method name, a list of parameters and a
294
  list of nodes, will contact (in parallel) all nodes, and return a
295
  dict of results (key: node name, value: result).
296

297
  One current bug is that generic failure is still signaled by
298
  'False' result, which is not good. This overloading of values can
299
  cause bugs.
300

301
  """
302
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
303
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
304
                                    " timeouts table")
305
    self.procedure = procedure
306
    self.body = body
307
    self.port = port
308
    self._request = {}
309
    self._address_lookup_fn = address_lookup_fn
310

    
311
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
312
    """Add a list of nodes to the target nodes.
313

314
    @type node_list: list
315
    @param node_list: the list of node names to connect
316
    @type address_list: list or None
317
    @keyword address_list: either None or a list with node addresses,
318
        which must have the same length as the node list
319
    @type read_timeout: int
320
    @param read_timeout: overwrites default timeout for operation
321

322
    """
323
    if address_list is None:
324
      # Always use IP address instead of node name
325
      address_list = self._address_lookup_fn(node_list)
326

    
327
    assert len(node_list) == len(address_list), \
328
           "Name and address lists must have the same length"
329

    
330
    for node, address in zip(node_list, address_list):
331
      self.ConnectNode(node, address, read_timeout=read_timeout)
332

    
333
  def ConnectNode(self, name, address=None, read_timeout=None):
334
    """Add a node to the target list.
335

336
    @type name: str
337
    @param name: the node name
338
    @type address: str
339
    @param address: the node address, if known
340
    @type read_timeout: int
341
    @param read_timeout: overwrites default timeout for operation
342

343
    """
344
    if address is None:
345
      # Always use IP address instead of node name
346
      address = self._address_lookup_fn([name])[0]
347

    
348
    assert(address is not None)
349

    
350
    if read_timeout is None:
351
      read_timeout = _TIMEOUTS[self.procedure]
352

    
353
    self._request[name] = \
354
      http.client.HttpClientRequest(str(address), self.port,
355
                                    http.HTTP_PUT, str("/%s" % self.procedure),
356
                                    headers=_RPC_CLIENT_HEADERS,
357
                                    post_data=str(self.body),
358
                                    read_timeout=read_timeout)
359

    
360
  def GetResults(self, http_pool=None):
361
    """Call nodes and return results.
362

363
    @rtype: list
364
    @return: List of RPC results
365

366
    """
367
    if not http_pool:
368
      http_pool = _thread_local.GetHttpClientPool()
369

    
370
    http_pool.ProcessRequests(self._request.values())
371

    
372
    results = {}
373

    
374
    for name, req in self._request.iteritems():
375
      if req.success and req.resp_status_code == http.HTTP_OK:
376
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
377
                                  node=name, call=self.procedure)
378
        continue
379

    
380
      # TODO: Better error reporting
381
      if req.error:
382
        msg = req.error
383
      else:
384
        msg = req.resp_body
385

    
386
      logging.error("RPC error in %s from node %s: %s",
387
                    self.procedure, name, msg)
388
      results[name] = RpcResult(data=msg, failed=True, node=name,
389
                                call=self.procedure)
390

    
391
    return results
392

    
393

    
394
def _EncodeImportExportIO(ieio, ieioargs):
395
  """Encodes import/export I/O information.
396

397
  """
398
  if ieio == constants.IEIO_RAW_DISK:
399
    assert len(ieioargs) == 1
400
    return (ieioargs[0].ToDict(), )
401

    
402
  if ieio == constants.IEIO_SCRIPT:
403
    assert len(ieioargs) == 2
404
    return (ieioargs[0].ToDict(), ieioargs[1])
405

    
406
  return ieioargs
407

    
408

    
409
class RpcRunner(object):
410
  """RPC runner class"""
411

    
412
  def __init__(self, cfg):
413
    """Initialized the rpc runner.
414

415
    @type cfg:  C{config.ConfigWriter}
416
    @param cfg: the configuration object that will be used to get data
417
                about the cluster
418

419
    """
420
    self._cfg = cfg
421
    self.port = netutils.GetDaemonPort(constants.NODED)
422

    
423
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
424
    """Convert the given instance to a dict.
425

426
    This is done via the instance's ToDict() method and additionally
427
    we fill the hvparams with the cluster defaults.
428

429
    @type instance: L{objects.Instance}
430
    @param instance: an Instance object
431
    @type hvp: dict or None
432
    @param hvp: a dictionary with overridden hypervisor parameters
433
    @type bep: dict or None
434
    @param bep: a dictionary with overridden backend parameters
435
    @type osp: dict or None
436
    @param osp: a dictionary with overridden os parameters
437
    @rtype: dict
438
    @return: the instance dict, with the hvparams filled with the
439
        cluster defaults
440

441
    """
442
    idict = instance.ToDict()
443
    cluster = self._cfg.GetClusterInfo()
444
    idict["hvparams"] = cluster.FillHV(instance)
445
    if hvp is not None:
446
      idict["hvparams"].update(hvp)
447
    idict["beparams"] = cluster.FillBE(instance)
448
    if bep is not None:
449
      idict["beparams"].update(bep)
450
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
451
    if osp is not None:
452
      idict["osparams"].update(osp)
453
    for nic in idict["nics"]:
454
      nic['nicparams'] = objects.FillDict(
455
        cluster.nicparams[constants.PP_DEFAULT],
456
        nic['nicparams'])
457
    return idict
458

    
459
  def _ConnectList(self, client, node_list, call, read_timeout=None):
460
    """Helper for computing node addresses.
461

462
    @type client: L{ganeti.rpc.Client}
463
    @param client: a C{Client} instance
464
    @type node_list: list
465
    @param node_list: the node list we should connect
466
    @type call: string
467
    @param call: the name of the remote procedure call, for filling in
468
        correctly any eventual offline nodes' results
469
    @type read_timeout: int
470
    @param read_timeout: overwrites the default read timeout for the
471
        given operation
472

473
    """
474
    all_nodes = self._cfg.GetAllNodesInfo()
475
    name_list = []
476
    addr_list = []
477
    skip_dict = {}
478
    for node in node_list:
479
      if node in all_nodes:
480
        if all_nodes[node].offline:
481
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
482
          continue
483
        val = all_nodes[node].primary_ip
484
      else:
485
        val = None
486
      addr_list.append(val)
487
      name_list.append(node)
488
    if name_list:
489
      client.ConnectList(name_list, address_list=addr_list,
490
                         read_timeout=read_timeout)
491
    return skip_dict
492

    
493
  def _ConnectNode(self, client, node, call, read_timeout=None):
494
    """Helper for computing one node's address.
495

496
    @type client: L{ganeti.rpc.Client}
497
    @param client: a C{Client} instance
498
    @type node: str
499
    @param node: the node we should connect
500
    @type call: string
501
    @param call: the name of the remote procedure call, for filling in
502
        correctly any eventual offline nodes' results
503
    @type read_timeout: int
504
    @param read_timeout: overwrites the default read timeout for the
505
        given operation
506

507
    """
508
    node_info = self._cfg.GetNodeInfo(node)
509
    if node_info is not None:
510
      if node_info.offline:
511
        return RpcResult(node=node, offline=True, call=call)
512
      addr = node_info.primary_ip
513
    else:
514
      addr = None
515
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
516

    
517
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
518
    """Helper for making a multi-node call
519

520
    """
521
    body = serializer.DumpJson(args, indent=False)
522
    c = Client(procedure, body, self.port)
523
    skip_dict = self._ConnectList(c, node_list, procedure,
524
                                  read_timeout=read_timeout)
525
    skip_dict.update(c.GetResults())
526
    return skip_dict
527

    
528
  @classmethod
529
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
530
                           address_list=None, read_timeout=None):
531
    """Helper for making a multi-node static call
532

533
    """
534
    body = serializer.DumpJson(args, indent=False)
535
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
536
    c.ConnectList(node_list, address_list=address_list,
537
                  read_timeout=read_timeout)
538
    return c.GetResults()
539

    
540
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
541
    """Helper for making a single-node call
542

543
    """
544
    body = serializer.DumpJson(args, indent=False)
545
    c = Client(procedure, body, self.port)
546
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
547
    if result is None:
548
      # we did connect, node is not offline
549
      result = c.GetResults()[node]
550
    return result
551

    
552
  @classmethod
553
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
554
    """Helper for making a single-node static call
555

556
    """
557
    body = serializer.DumpJson(args, indent=False)
558
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
559
    c.ConnectNode(node, read_timeout=read_timeout)
560
    return c.GetResults()[node]
561

    
562
  @staticmethod
563
  def _Compress(data):
564
    """Compresses a string for transport over RPC.
565

566
    Small amounts of data are not compressed.
567

568
    @type data: str
569
    @param data: Data
570
    @rtype: tuple
571
    @return: Encoded data to send
572

573
    """
574
    # Small amounts of data are not compressed
575
    if len(data) < 512:
576
      return (constants.RPC_ENCODING_NONE, data)
577

    
578
    # Compress with zlib and encode in base64
579
    return (constants.RPC_ENCODING_ZLIB_BASE64,
580
            base64.b64encode(zlib.compress(data, 3)))
581

    
582
  #
583
  # Begin RPC calls
584
  #
585

    
586
  @_RpcTimeout(_TMO_URGENT)
587
  def call_lv_list(self, node_list, vg_name):
588
    """Gets the logical volumes present in a given volume group.
589

590
    This is a multi-node call.
591

592
    """
593
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
594

    
595
  @_RpcTimeout(_TMO_URGENT)
596
  def call_vg_list(self, node_list):
597
    """Gets the volume group list.
598

599
    This is a multi-node call.
600

601
    """
602
    return self._MultiNodeCall(node_list, "vg_list", [])
603

    
604
  @_RpcTimeout(_TMO_NORMAL)
605
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
606
    """Get list of storage units.
607

608
    This is a multi-node call.
609

610
    """
611
    return self._MultiNodeCall(node_list, "storage_list",
612
                               [su_name, su_args, name, fields])
613

    
614
  @_RpcTimeout(_TMO_NORMAL)
615
  def call_storage_modify(self, node, su_name, su_args, name, changes):
616
    """Modify a storage unit.
617

618
    This is a single-node call.
619

620
    """
621
    return self._SingleNodeCall(node, "storage_modify",
622
                                [su_name, su_args, name, changes])
623

    
624
  @_RpcTimeout(_TMO_NORMAL)
625
  def call_storage_execute(self, node, su_name, su_args, name, op):
626
    """Executes an operation on a storage unit.
627

628
    This is a single-node call.
629

630
    """
631
    return self._SingleNodeCall(node, "storage_execute",
632
                                [su_name, su_args, name, op])
633

    
634
  @_RpcTimeout(_TMO_URGENT)
635
  def call_bridges_exist(self, node, bridges_list):
636
    """Checks if a node has all the bridges given.
637

638
    This method checks if all bridges given in the bridges_list are
639
    present on the remote node, so that an instance that uses interfaces
640
    on those bridges can be started.
641

642
    This is a single-node call.
643

644
    """
645
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
646

    
647
  @_RpcTimeout(_TMO_NORMAL)
648
  def call_instance_start(self, node, instance, hvp, bep):
649
    """Starts an instance.
650

651
    This is a single-node call.
652

653
    """
654
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
655
    return self._SingleNodeCall(node, "instance_start", [idict])
656

    
657
  @_RpcTimeout(_TMO_NORMAL)
658
  def call_instance_shutdown(self, node, instance, timeout):
659
    """Stops an instance.
660

661
    This is a single-node call.
662

663
    """
664
    return self._SingleNodeCall(node, "instance_shutdown",
665
                                [self._InstDict(instance), timeout])
666

    
667
  @_RpcTimeout(_TMO_NORMAL)
668
  def call_migration_info(self, node, instance):
669
    """Gather the information necessary to prepare an instance migration.
670

671
    This is a single-node call.
672

673
    @type node: string
674
    @param node: the node on which the instance is currently running
675
    @type instance: C{objects.Instance}
676
    @param instance: the instance definition
677

678
    """
679
    return self._SingleNodeCall(node, "migration_info",
680
                                [self._InstDict(instance)])
681

    
682
  @_RpcTimeout(_TMO_NORMAL)
683
  def call_accept_instance(self, node, instance, info, target):
684
    """Prepare a node to accept an instance.
685

686
    This is a single-node call.
687

688
    @type node: string
689
    @param node: the target node for the migration
690
    @type instance: C{objects.Instance}
691
    @param instance: the instance definition
692
    @type info: opaque/hypervisor specific (string/data)
693
    @param info: result for the call_migration_info call
694
    @type target: string
695
    @param target: target hostname (usually ip address) (on the node itself)
696

697
    """
698
    return self._SingleNodeCall(node, "accept_instance",
699
                                [self._InstDict(instance), info, target])
700

    
701
  @_RpcTimeout(_TMO_NORMAL)
702
  def call_finalize_migration(self, node, instance, info, success):
703
    """Finalize any target-node migration specific operation.
704

705
    This is called both in case of a successful migration and in case of error
706
    (in which case it should abort the migration).
707

708
    This is a single-node call.
709

710
    @type node: string
711
    @param node: the target node for the migration
712
    @type instance: C{objects.Instance}
713
    @param instance: the instance definition
714
    @type info: opaque/hypervisor specific (string/data)
715
    @param info: result for the call_migration_info call
716
    @type success: boolean
717
    @param success: whether the migration was a success or a failure
718

719
    """
720
    return self._SingleNodeCall(node, "finalize_migration",
721
                                [self._InstDict(instance), info, success])
722

    
723
  @_RpcTimeout(_TMO_SLOW)
724
  def call_instance_migrate(self, node, instance, target, live):
725
    """Migrate an instance.
726

727
    This is a single-node call.
728

729
    @type node: string
730
    @param node: the node on which the instance is currently running
731
    @type instance: C{objects.Instance}
732
    @param instance: the instance definition
733
    @type target: string
734
    @param target: the target node name
735
    @type live: boolean
736
    @param live: whether the migration should be done live or not (the
737
        interpretation of this parameter is left to the hypervisor)
738

739
    """
740
    return self._SingleNodeCall(node, "instance_migrate",
741
                                [self._InstDict(instance), target, live])
742

    
743
  @_RpcTimeout(_TMO_NORMAL)
744
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
745
    """Reboots an instance.
746

747
    This is a single-node call.
748

749
    """
750
    return self._SingleNodeCall(node, "instance_reboot",
751
                                [self._InstDict(inst), reboot_type,
752
                                 shutdown_timeout])
753

    
754
  @_RpcTimeout(_TMO_1DAY)
755
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
756
    """Installs an OS on the given instance.
757

758
    This is a single-node call.
759

760
    """
761
    return self._SingleNodeCall(node, "instance_os_add",
762
                                [self._InstDict(inst, osp=osparams),
763
                                 reinstall, debug])
764

    
765
  @_RpcTimeout(_TMO_SLOW)
766
  def call_instance_run_rename(self, node, inst, old_name, debug):
767
    """Run the OS rename script for an instance.
768

769
    This is a single-node call.
770

771
    """
772
    return self._SingleNodeCall(node, "instance_run_rename",
773
                                [self._InstDict(inst), old_name, debug])
774

    
775
  @_RpcTimeout(_TMO_URGENT)
776
  def call_instance_info(self, node, instance, hname):
777
    """Returns information about a single instance.
778

779
    This is a single-node call.
780

781
    @type node: list
782
    @param node: the list of nodes to query
783
    @type instance: string
784
    @param instance: the instance name
785
    @type hname: string
786
    @param hname: the hypervisor type of the instance
787

788
    """
789
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
790

    
791
  @_RpcTimeout(_TMO_NORMAL)
792
  def call_instance_migratable(self, node, instance):
793
    """Checks whether the given instance can be migrated.
794

795
    This is a single-node call.
796

797
    @param node: the node to query
798
    @type instance: L{objects.Instance}
799
    @param instance: the instance to check
800

801

802
    """
803
    return self._SingleNodeCall(node, "instance_migratable",
804
                                [self._InstDict(instance)])
805

    
806
  @_RpcTimeout(_TMO_URGENT)
807
  def call_all_instances_info(self, node_list, hypervisor_list):
808
    """Returns information about all instances on the given nodes.
809

810
    This is a multi-node call.
811

812
    @type node_list: list
813
    @param node_list: the list of nodes to query
814
    @type hypervisor_list: list
815
    @param hypervisor_list: the hypervisors to query for instances
816

817
    """
818
    return self._MultiNodeCall(node_list, "all_instances_info",
819
                               [hypervisor_list])
820

    
821
  @_RpcTimeout(_TMO_URGENT)
822
  def call_instance_list(self, node_list, hypervisor_list):
823
    """Returns the list of running instances on a given node.
824

825
    This is a multi-node call.
826

827
    @type node_list: list
828
    @param node_list: the list of nodes to query
829
    @type hypervisor_list: list
830
    @param hypervisor_list: the hypervisors to query for instances
831

832
    """
833
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
834

    
835
  @_RpcTimeout(_TMO_FAST)
836
  def call_node_tcp_ping(self, node, source, target, port, timeout,
837
                         live_port_needed):
838
    """Do a TcpPing on the remote node
839

840
    This is a single-node call.
841

842
    """
843
    return self._SingleNodeCall(node, "node_tcp_ping",
844
                                [source, target, port, timeout,
845
                                 live_port_needed])
846

    
847
  @_RpcTimeout(_TMO_FAST)
848
  def call_node_has_ip_address(self, node, address):
849
    """Checks if a node has the given IP address.
850

851
    This is a single-node call.
852

853
    """
854
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
855

    
856
  @_RpcTimeout(_TMO_URGENT)
857
  def call_node_info(self, node_list, vg_name, hypervisor_type):
858
    """Return node information.
859

860
    This will return memory information and volume group size and free
861
    space.
862

863
    This is a multi-node call.
864

865
    @type node_list: list
866
    @param node_list: the list of nodes to query
867
    @type vg_name: C{string}
868
    @param vg_name: the name of the volume group to ask for disk space
869
        information
870
    @type hypervisor_type: C{str}
871
    @param hypervisor_type: the name of the hypervisor to ask for
872
        memory information
873

874
    """
875
    return self._MultiNodeCall(node_list, "node_info",
876
                               [vg_name, hypervisor_type])
877

    
878
  @_RpcTimeout(_TMO_NORMAL)
879
  def call_etc_hosts_modify(self, node, mode, name, ip):
880
    """Modify hosts file with name
881

882
    @type node: string
883
    @param node: The node to call
884
    @type mode: string
885
    @param mode: The mode to operate. Currently "add" or "remove"
886
    @type name: string
887
    @param name: The host name to be modified
888
    @type ip: string
889
    @param ip: The ip of the entry (just valid if mode is "add")
890

891
    """
892
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
893

    
894
  @_RpcTimeout(_TMO_NORMAL)
895
  def call_node_verify(self, node_list, checkdict, cluster_name):
896
    """Request verification of given parameters.
897

898
    This is a multi-node call.
899

900
    """
901
    return self._MultiNodeCall(node_list, "node_verify",
902
                               [checkdict, cluster_name])
903

    
904
  @classmethod
905
  @_RpcTimeout(_TMO_FAST)
906
  def call_node_start_master(cls, node, start_daemons, no_voting):
907
    """Tells a node to activate itself as a master.
908

909
    This is a single-node call.
910

911
    """
912
    return cls._StaticSingleNodeCall(node, "node_start_master",
913
                                     [start_daemons, no_voting])
914

    
915
  @classmethod
916
  @_RpcTimeout(_TMO_FAST)
917
  def call_node_stop_master(cls, node, stop_daemons):
918
    """Tells a node to demote itself from master status.
919

920
    This is a single-node call.
921

922
    """
923
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
924

    
925
  @classmethod
926
  @_RpcTimeout(_TMO_URGENT)
927
  def call_master_info(cls, node_list):
928
    """Query master info.
929

930
    This is a multi-node call.
931

932
    """
933
    # TODO: should this method query down nodes?
934
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
935

    
936
  @classmethod
937
  @_RpcTimeout(_TMO_URGENT)
938
  def call_version(cls, node_list):
939
    """Query node version.
940

941
    This is a multi-node call.
942

943
    """
944
    return cls._StaticMultiNodeCall(node_list, "version", [])
945

    
946
  @_RpcTimeout(_TMO_NORMAL)
947
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
948
    """Request creation of a given block device.
949

950
    This is a single-node call.
951

952
    """
953
    return self._SingleNodeCall(node, "blockdev_create",
954
                                [bdev.ToDict(), size, owner, on_primary, info])
955

    
956
  @_RpcTimeout(_TMO_SLOW)
957
  def call_blockdev_wipe(self, node, bdev, offset, size):
958
    """Request wipe at given offset with given size of a block device.
959

960
    This is a single-node call.
961

962
    """
963
    return self._SingleNodeCall(node, "blockdev_wipe",
964
                                [bdev.ToDict(), offset, size])
965

    
966
  @_RpcTimeout(_TMO_NORMAL)
967
  def call_blockdev_remove(self, node, bdev):
968
    """Request removal of a given block device.
969

970
    This is a single-node call.
971

972
    """
973
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
974

    
975
  @_RpcTimeout(_TMO_NORMAL)
976
  def call_blockdev_rename(self, node, devlist):
977
    """Request rename of the given block devices.
978

979
    This is a single-node call.
980

981
    """
982
    return self._SingleNodeCall(node, "blockdev_rename",
983
                                [(d.ToDict(), uid) for d, uid in devlist])
984

    
985
  @_RpcTimeout(_TMO_NORMAL)
986
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
987
    """Request assembling of a given block device.
988

989
    This is a single-node call.
990

991
    """
992
    return self._SingleNodeCall(node, "blockdev_assemble",
993
                                [disk.ToDict(), owner, on_primary])
994

    
995
  @_RpcTimeout(_TMO_NORMAL)
996
  def call_blockdev_shutdown(self, node, disk):
997
    """Request shutdown of a given block device.
998

999
    This is a single-node call.
1000

1001
    """
1002
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1003

    
1004
  @_RpcTimeout(_TMO_NORMAL)
1005
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1006
    """Request adding a list of children to a (mirroring) device.
1007

1008
    This is a single-node call.
1009

1010
    """
1011
    return self._SingleNodeCall(node, "blockdev_addchildren",
1012
                                [bdev.ToDict(),
1013
                                 [disk.ToDict() for disk in ndevs]])
1014

    
1015
  @_RpcTimeout(_TMO_NORMAL)
1016
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1017
    """Request removing a list of children from a (mirroring) device.
1018

1019
    This is a single-node call.
1020

1021
    """
1022
    return self._SingleNodeCall(node, "blockdev_removechildren",
1023
                                [bdev.ToDict(),
1024
                                 [disk.ToDict() for disk in ndevs]])
1025

    
1026
  @_RpcTimeout(_TMO_NORMAL)
1027
  def call_blockdev_getmirrorstatus(self, node, disks):
1028
    """Request status of a (mirroring) device.
1029

1030
    This is a single-node call.
1031

1032
    """
1033
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1034
                                  [dsk.ToDict() for dsk in disks])
1035
    if not result.fail_msg:
1036
      result.payload = [objects.BlockDevStatus.FromDict(i)
1037
                        for i in result.payload]
1038
    return result
1039

    
1040
  @_RpcTimeout(_TMO_NORMAL)
1041
  def call_blockdev_find(self, node, disk):
1042
    """Request identification of a given block device.
1043

1044
    This is a single-node call.
1045

1046
    """
1047
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1048
    if not result.fail_msg and result.payload is not None:
1049
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1050
    return result
1051

    
1052
  @_RpcTimeout(_TMO_NORMAL)
1053
  def call_blockdev_close(self, node, instance_name, disks):
1054
    """Closes the given block devices.
1055

1056
    This is a single-node call.
1057

1058
    """
1059
    params = [instance_name, [cf.ToDict() for cf in disks]]
1060
    return self._SingleNodeCall(node, "blockdev_close", params)
1061

    
1062
  @_RpcTimeout(_TMO_NORMAL)
1063
  def call_blockdev_getsizes(self, node, disks):
1064
    """Returns the size of the given disks.
1065

1066
    This is a single-node call.
1067

1068
    """
1069
    params = [[cf.ToDict() for cf in disks]]
1070
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1071

    
1072
  @_RpcTimeout(_TMO_NORMAL)
1073
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1074
    """Disconnects the network of the given drbd devices.
1075

1076
    This is a multi-node call.
1077

1078
    """
1079
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1080
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1081

    
1082
  @_RpcTimeout(_TMO_NORMAL)
1083
  def call_drbd_attach_net(self, node_list, nodes_ip,
1084
                           disks, instance_name, multimaster):
1085
    """Disconnects the given drbd devices.
1086

1087
    This is a multi-node call.
1088

1089
    """
1090
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1091
                               [nodes_ip, [cf.ToDict() for cf in disks],
1092
                                instance_name, multimaster])
1093

    
1094
  @_RpcTimeout(_TMO_SLOW)
1095
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1096
    """Waits for the synchronization of drbd devices is complete.
1097

1098
    This is a multi-node call.
1099

1100
    """
1101
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1102
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1103

    
1104
  @_RpcTimeout(_TMO_URGENT)
1105
  def call_drbd_helper(self, node_list):
1106
    """Gets drbd helper.
1107

1108
    This is a multi-node call.
1109

1110
    """
1111
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1112

    
1113
  @classmethod
1114
  @_RpcTimeout(_TMO_NORMAL)
1115
  def call_upload_file(cls, node_list, file_name, address_list=None):
1116
    """Upload a file.
1117

1118
    The node will refuse the operation in case the file is not on the
1119
    approved file list.
1120

1121
    This is a multi-node call.
1122

1123
    @type node_list: list
1124
    @param node_list: the list of node names to upload to
1125
    @type file_name: str
1126
    @param file_name: the filename to upload
1127
    @type address_list: list or None
1128
    @keyword address_list: an optional list of node addresses, in order
1129
        to optimize the RPC speed
1130

1131
    """
1132
    file_contents = utils.ReadFile(file_name)
1133
    data = cls._Compress(file_contents)
1134
    st = os.stat(file_name)
1135
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1136
              st.st_atime, st.st_mtime]
1137
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1138
                                    address_list=address_list)
1139

    
1140
  @classmethod
1141
  @_RpcTimeout(_TMO_NORMAL)
1142
  def call_write_ssconf_files(cls, node_list, values):
1143
    """Write ssconf files.
1144

1145
    This is a multi-node call.
1146

1147
    """
1148
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1149

    
1150
  @_RpcTimeout(_TMO_FAST)
1151
  def call_os_diagnose(self, node_list):
1152
    """Request a diagnose of OS definitions.
1153

1154
    This is a multi-node call.
1155

1156
    """
1157
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1158

    
1159
  @_RpcTimeout(_TMO_FAST)
1160
  def call_os_get(self, node, name):
1161
    """Returns an OS definition.
1162

1163
    This is a single-node call.
1164

1165
    """
1166
    result = self._SingleNodeCall(node, "os_get", [name])
1167
    if not result.fail_msg and isinstance(result.payload, dict):
1168
      result.payload = objects.OS.FromDict(result.payload)
1169
    return result
1170

    
1171
  @_RpcTimeout(_TMO_FAST)
1172
  def call_os_validate(self, required, nodes, name, checks, params):
1173
    """Run a validation routine for a given OS.
1174

1175
    This is a multi-node call.
1176

1177
    """
1178
    return self._MultiNodeCall(nodes, "os_validate",
1179
                               [required, name, checks, params])
1180

    
1181
  @_RpcTimeout(_TMO_NORMAL)
1182
  def call_hooks_runner(self, node_list, hpath, phase, env):
1183
    """Call the hooks runner.
1184

1185
    Args:
1186
      - op: the OpCode instance
1187
      - env: a dictionary with the environment
1188

1189
    This is a multi-node call.
1190

1191
    """
1192
    params = [hpath, phase, env]
1193
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1194

    
1195
  @_RpcTimeout(_TMO_NORMAL)
1196
  def call_iallocator_runner(self, node, name, idata):
1197
    """Call an iallocator on a remote node
1198

1199
    Args:
1200
      - name: the iallocator name
1201
      - input: the json-encoded input string
1202

1203
    This is a single-node call.
1204

1205
    """
1206
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1207

    
1208
  @_RpcTimeout(_TMO_NORMAL)
1209
  def call_blockdev_grow(self, node, cf_bdev, amount):
1210
    """Request a snapshot of the given block device.
1211

1212
    This is a single-node call.
1213

1214
    """
1215
    return self._SingleNodeCall(node, "blockdev_grow",
1216
                                [cf_bdev.ToDict(), amount])
1217

    
1218
  @_RpcTimeout(_TMO_1DAY)
1219
  def call_blockdev_export(self, node, cf_bdev,
1220
                           dest_node, dest_path, cluster_name):
1221
    """Export a given disk to another node.
1222

1223
    This is a single-node call.
1224

1225
    """
1226
    return self._SingleNodeCall(node, "blockdev_export",
1227
                                [cf_bdev.ToDict(), dest_node, dest_path,
1228
                                 cluster_name])
1229

    
1230
  @_RpcTimeout(_TMO_NORMAL)
1231
  def call_blockdev_snapshot(self, node, cf_bdev):
1232
    """Request a snapshot of the given block device.
1233

1234
    This is a single-node call.
1235

1236
    """
1237
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1238

    
1239
  @_RpcTimeout(_TMO_NORMAL)
1240
  def call_finalize_export(self, node, instance, snap_disks):
1241
    """Request the completion of an export operation.
1242

1243
    This writes the export config file, etc.
1244

1245
    This is a single-node call.
1246

1247
    """
1248
    flat_disks = []
1249
    for disk in snap_disks:
1250
      if isinstance(disk, bool):
1251
        flat_disks.append(disk)
1252
      else:
1253
        flat_disks.append(disk.ToDict())
1254

    
1255
    return self._SingleNodeCall(node, "finalize_export",
1256
                                [self._InstDict(instance), flat_disks])
1257

    
1258
  @_RpcTimeout(_TMO_FAST)
1259
  def call_export_info(self, node, path):
1260
    """Queries the export information in a given path.
1261

1262
    This is a single-node call.
1263

1264
    """
1265
    return self._SingleNodeCall(node, "export_info", [path])
1266

    
1267
  @_RpcTimeout(_TMO_FAST)
1268
  def call_export_list(self, node_list):
1269
    """Gets the stored exports list.
1270

1271
    This is a multi-node call.
1272

1273
    """
1274
    return self._MultiNodeCall(node_list, "export_list", [])
1275

    
1276
  @_RpcTimeout(_TMO_FAST)
1277
  def call_export_remove(self, node, export):
1278
    """Requests removal of a given export.
1279

1280
    This is a single-node call.
1281

1282
    """
1283
    return self._SingleNodeCall(node, "export_remove", [export])
1284

    
1285
  @classmethod
1286
  @_RpcTimeout(_TMO_NORMAL)
1287
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1288
    """Requests a node to clean the cluster information it has.
1289

1290
    This will remove the configuration information from the ganeti data
1291
    dir.
1292

1293
    This is a single-node call.
1294

1295
    """
1296
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1297
                                     [modify_ssh_setup])
1298

    
1299
  @_RpcTimeout(_TMO_FAST)
1300
  def call_node_volumes(self, node_list):
1301
    """Gets all volumes on node(s).
1302

1303
    This is a multi-node call.
1304

1305
    """
1306
    return self._MultiNodeCall(node_list, "node_volumes", [])
1307

    
1308
  @_RpcTimeout(_TMO_FAST)
1309
  def call_node_demote_from_mc(self, node):
1310
    """Demote a node from the master candidate role.
1311

1312
    This is a single-node call.
1313

1314
    """
1315
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1316

    
1317
  @_RpcTimeout(_TMO_NORMAL)
1318
  def call_node_powercycle(self, node, hypervisor):
1319
    """Tries to powercycle a node.
1320

1321
    This is a single-node call.
1322

1323
    """
1324
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1325

    
1326
  @_RpcTimeout(None)
1327
  def call_test_delay(self, node_list, duration):
1328
    """Sleep for a fixed time on given node(s).
1329

1330
    This is a multi-node call.
1331

1332
    """
1333
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1334
                               read_timeout=int(duration + 5))
1335

    
1336
  @_RpcTimeout(_TMO_FAST)
1337
  def call_file_storage_dir_create(self, node, file_storage_dir):
1338
    """Create the given file storage directory.
1339

1340
    This is a single-node call.
1341

1342
    """
1343
    return self._SingleNodeCall(node, "file_storage_dir_create",
1344
                                [file_storage_dir])
1345

    
1346
  @_RpcTimeout(_TMO_FAST)
1347
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1348
    """Remove the given file storage directory.
1349

1350
    This is a single-node call.
1351

1352
    """
1353
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1354
                                [file_storage_dir])
1355

    
1356
  @_RpcTimeout(_TMO_FAST)
1357
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1358
                                   new_file_storage_dir):
1359
    """Rename file storage directory.
1360

1361
    This is a single-node call.
1362

1363
    """
1364
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1365
                                [old_file_storage_dir, new_file_storage_dir])
1366

    
1367
  @classmethod
1368
  @_RpcTimeout(_TMO_FAST)
1369
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1370
    """Update job queue.
1371

1372
    This is a multi-node call.
1373

1374
    """
1375
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1376
                                    [file_name, cls._Compress(content)],
1377
                                    address_list=address_list)
1378

    
1379
  @classmethod
1380
  @_RpcTimeout(_TMO_NORMAL)
1381
  def call_jobqueue_purge(cls, node):
1382
    """Purge job queue.
1383

1384
    This is a single-node call.
1385

1386
    """
1387
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1388

    
1389
  @classmethod
1390
  @_RpcTimeout(_TMO_FAST)
1391
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1392
    """Rename a job queue file.
1393

1394
    This is a multi-node call.
1395

1396
    """
1397
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1398
                                    address_list=address_list)
1399

    
1400
  @_RpcTimeout(_TMO_NORMAL)
1401
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1402
    """Validate the hypervisor params.
1403

1404
    This is a multi-node call.
1405

1406
    @type node_list: list
1407
    @param node_list: the list of nodes to query
1408
    @type hvname: string
1409
    @param hvname: the hypervisor name
1410
    @type hvparams: dict
1411
    @param hvparams: the hypervisor parameters to be validated
1412

1413
    """
1414
    cluster = self._cfg.GetClusterInfo()
1415
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1416
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1417
                               [hvname, hv_full])
1418

    
1419
  @_RpcTimeout(_TMO_NORMAL)
1420
  def call_x509_cert_create(self, node, validity):
1421
    """Creates a new X509 certificate for SSL/TLS.
1422

1423
    This is a single-node call.
1424

1425
    @type validity: int
1426
    @param validity: Validity in seconds
1427

1428
    """
1429
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1430

    
1431
  @_RpcTimeout(_TMO_NORMAL)
1432
  def call_x509_cert_remove(self, node, name):
1433
    """Removes a X509 certificate.
1434

1435
    This is a single-node call.
1436

1437
    @type name: string
1438
    @param name: Certificate name
1439

1440
    """
1441
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1442

    
1443
  @_RpcTimeout(_TMO_NORMAL)
1444
  def call_import_start(self, node, opts, instance, dest, dest_args):
1445
    """Starts a listener for an import.
1446

1447
    This is a single-node call.
1448

1449
    @type node: string
1450
    @param node: Node name
1451
    @type instance: C{objects.Instance}
1452
    @param instance: Instance object
1453

1454
    """
1455
    return self._SingleNodeCall(node, "import_start",
1456
                                [opts.ToDict(),
1457
                                 self._InstDict(instance), dest,
1458
                                 _EncodeImportExportIO(dest, dest_args)])
1459

    
1460
  @_RpcTimeout(_TMO_NORMAL)
1461
  def call_export_start(self, node, opts, host, port,
1462
                        instance, source, source_args):
1463
    """Starts an export daemon.
1464

1465
    This is a single-node call.
1466

1467
    @type node: string
1468
    @param node: Node name
1469
    @type instance: C{objects.Instance}
1470
    @param instance: Instance object
1471

1472
    """
1473
    return self._SingleNodeCall(node, "export_start",
1474
                                [opts.ToDict(), host, port,
1475
                                 self._InstDict(instance), source,
1476
                                 _EncodeImportExportIO(source, source_args)])
1477

    
1478
  @_RpcTimeout(_TMO_FAST)
1479
  def call_impexp_status(self, node, names):
1480
    """Gets the status of an import or export.
1481

1482
    This is a single-node call.
1483

1484
    @type node: string
1485
    @param node: Node name
1486
    @type names: List of strings
1487
    @param names: Import/export names
1488
    @rtype: List of L{objects.ImportExportStatus} instances
1489
    @return: Returns a list of the state of each named import/export or None if
1490
             a status couldn't be retrieved
1491

1492
    """
1493
    result = self._SingleNodeCall(node, "impexp_status", [names])
1494

    
1495
    if not result.fail_msg:
1496
      decoded = []
1497

    
1498
      for i in result.payload:
1499
        if i is None:
1500
          decoded.append(None)
1501
          continue
1502
        decoded.append(objects.ImportExportStatus.FromDict(i))
1503

    
1504
      result.payload = decoded
1505

    
1506
    return result
1507

    
1508
  @_RpcTimeout(_TMO_NORMAL)
1509
  def call_impexp_abort(self, node, name):
1510
    """Aborts an import or export.
1511

1512
    This is a single-node call.
1513

1514
    @type node: string
1515
    @param node: Node name
1516
    @type name: string
1517
    @param name: Import/export name
1518

1519
    """
1520
    return self._SingleNodeCall(node, "impexp_abort", [name])
1521

    
1522
  @_RpcTimeout(_TMO_NORMAL)
1523
  def call_impexp_cleanup(self, node, name):
1524
    """Cleans up after an import or export.
1525

1526
    This is a single-node call.
1527

1528
    @type node: string
1529
    @param node: Node name
1530
    @type name: string
1531
    @param name: Import/export name
1532

1533
    """
1534
    return self._SingleNodeCall(node, "impexp_cleanup", [name])