Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 00267bfe

History | View | Annotate | Download (45.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# pylint has a bug here, doesn't see this import
52
import ganeti.http.client  # pylint: disable=W0611
53

    
54

    
55
# Timeout for connecting to nodes (seconds)
56
_RPC_CONNECT_TIMEOUT = 5
57

    
58
_RPC_CLIENT_HEADERS = [
59
  "Content-type: %s" % http.HTTP_APP_JSON,
60
  "Expect:",
61
  ]
62

    
63
# Various time constants for the timeout table
64
_TMO_URGENT = 60 # one minute
65
_TMO_FAST = 5 * 60 # five minutes
66
_TMO_NORMAL = 15 * 60 # 15 minutes
67
_TMO_SLOW = 3600 # one hour
68
_TMO_4HRS = 4 * 3600
69
_TMO_1DAY = 86400
70

    
71
# Timeout table that will be built later by decorators
72
# Guidelines for choosing timeouts:
73
# - call used during watcher: timeout -> 1min, _TMO_URGENT
74
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
75
# - other calls: 15 min, _TMO_NORMAL
76
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
77

    
78
_TIMEOUTS = {
79
}
80

    
81
#: Special value to describe an offline host
82
_OFFLINE = object()
83

    
84

    
85
def Init():
86
  """Initializes the module-global HTTP client manager.
87

88
  Must be called before using any RPC function and while exactly one thread is
89
  running.
90

91
  """
92
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
93
  # one thread running. This check is just a safety measure -- it doesn't
94
  # cover all cases.
95
  assert threading.activeCount() == 1, \
96
         "Found more than one active thread when initializing pycURL"
97

    
98
  logging.info("Using PycURL %s", pycurl.version)
99

    
100
  pycurl.global_init(pycurl.GLOBAL_ALL)
101

    
102

    
103
def Shutdown():
104
  """Stops the module-global HTTP client manager.
105

106
  Must be called before quitting the program and while exactly one thread is
107
  running.
108

109
  """
110
  pycurl.global_cleanup()
111

    
112

    
113
def _ConfigRpcCurl(curl):
114
  noded_cert = str(constants.NODED_CERT_FILE)
115

    
116
  curl.setopt(pycurl.FOLLOWLOCATION, False)
117
  curl.setopt(pycurl.CAINFO, noded_cert)
118
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
119
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
120
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
121
  curl.setopt(pycurl.SSLCERT, noded_cert)
122
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
123
  curl.setopt(pycurl.SSLKEY, noded_cert)
124
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
125

    
126

    
127
# Aliasing this module avoids the following warning by epydoc: "Warning: No
128
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
129
_threading = threading
130

    
131

    
132
class _RpcThreadLocal(_threading.local):
133
  def GetHttpClientPool(self):
134
    """Returns a per-thread HTTP client pool.
135

136
    @rtype: L{http.client.HttpClientPool}
137

138
    """
139
    try:
140
      pool = self.hcp
141
    except AttributeError:
142
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
143
      self.hcp = pool
144

    
145
    return pool
146

    
147

    
148
# Remove module alias (see above)
149
del _threading
150

    
151

    
152
_thread_local = _RpcThreadLocal()
153

    
154

    
155
def _RpcTimeout(secs):
156
  """Timeout decorator.
157

158
  When applied to a rpc call_* function, it updates the global timeout
159
  table with the given function/timeout.
160

161
  """
162
  def decorator(f):
163
    name = f.__name__
164
    assert name.startswith("call_")
165
    _TIMEOUTS[name[len("call_"):]] = secs
166
    return f
167
  return decorator
168

    
169

    
170
def RunWithRPC(fn):
171
  """RPC-wrapper decorator.
172

173
  When applied to a function, it runs it with the RPC system
174
  initialized, and it shutsdown the system afterwards. This means the
175
  function must be called without RPC being initialized.
176

177
  """
178
  def wrapper(*args, **kwargs):
179
    Init()
180
    try:
181
      return fn(*args, **kwargs)
182
    finally:
183
      Shutdown()
184
  return wrapper
185

    
186

    
187
def _Compress(data):
188
  """Compresses a string for transport over RPC.
189

190
  Small amounts of data are not compressed.
191

192
  @type data: str
193
  @param data: Data
194
  @rtype: tuple
195
  @return: Encoded data to send
196

197
  """
198
  # Small amounts of data are not compressed
199
  if len(data) < 512:
200
    return (constants.RPC_ENCODING_NONE, data)
201

    
202
  # Compress with zlib and encode in base64
203
  return (constants.RPC_ENCODING_ZLIB_BASE64,
204
          base64.b64encode(zlib.compress(data, 3)))
205

    
206

    
207
class RpcResult(object):
208
  """RPC Result class.
209

210
  This class holds an RPC result. It is needed since in multi-node
211
  calls we can't raise an exception just because one one out of many
212
  failed, and therefore we use this class to encapsulate the result.
213

214
  @ivar data: the data payload, for successful results, or None
215
  @ivar call: the name of the RPC call
216
  @ivar node: the name of the node to which we made the call
217
  @ivar offline: whether the operation failed because the node was
218
      offline, as opposed to actual failure; offline=True will always
219
      imply failed=True, in order to allow simpler checking if
220
      the user doesn't care about the exact failure mode
221
  @ivar fail_msg: the error message if the call failed
222

223
  """
224
  def __init__(self, data=None, failed=False, offline=False,
225
               call=None, node=None):
226
    self.offline = offline
227
    self.call = call
228
    self.node = node
229

    
230
    if offline:
231
      self.fail_msg = "Node is marked offline"
232
      self.data = self.payload = None
233
    elif failed:
234
      self.fail_msg = self._EnsureErr(data)
235
      self.data = self.payload = None
236
    else:
237
      self.data = data
238
      if not isinstance(self.data, (tuple, list)):
239
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
240
                         type(self.data))
241
        self.payload = None
242
      elif len(data) != 2:
243
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
244
                         "expected 2" % len(self.data))
245
        self.payload = None
246
      elif not self.data[0]:
247
        self.fail_msg = self._EnsureErr(self.data[1])
248
        self.payload = None
249
      else:
250
        # finally success
251
        self.fail_msg = None
252
        self.payload = data[1]
253

    
254
    for attr_name in ["call", "data", "fail_msg",
255
                      "node", "offline", "payload"]:
256
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
257

    
258
  @staticmethod
259
  def _EnsureErr(val):
260
    """Helper to ensure we return a 'True' value for error."""
261
    if val:
262
      return val
263
    else:
264
      return "No error information"
265

    
266
  def Raise(self, msg, prereq=False, ecode=None):
267
    """If the result has failed, raise an OpExecError.
268

269
    This is used so that LU code doesn't have to check for each
270
    result, but instead can call this function.
271

272
    """
273
    if not self.fail_msg:
274
      return
275

    
276
    if not msg: # one could pass None for default message
277
      msg = ("Call '%s' to node '%s' has failed: %s" %
278
             (self.call, self.node, self.fail_msg))
279
    else:
280
      msg = "%s: %s" % (msg, self.fail_msg)
281
    if prereq:
282
      ec = errors.OpPrereqError
283
    else:
284
      ec = errors.OpExecError
285
    if ecode is not None:
286
      args = (msg, ecode)
287
    else:
288
      args = (msg, )
289
    raise ec(*args) # pylint: disable=W0142
290

    
291

    
292
def _SsconfResolver(node_list,
293
                    ssc=ssconf.SimpleStore,
294
                    nslookup_fn=netutils.Hostname.GetIP):
295
  """Return addresses for given node names.
296

297
  @type node_list: list
298
  @param node_list: List of node names
299
  @type ssc: class
300
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
301
  @type nslookup_fn: callable
302
  @param nslookup_fn: function use to do NS lookup
303
  @rtype: list of tuple; (string, string)
304
  @return: List of tuples containing node name and IP address
305

306
  """
307
  ss = ssc()
308
  iplist = ss.GetNodePrimaryIPList()
309
  family = ss.GetPrimaryIPFamily()
310
  ipmap = dict(entry.split() for entry in iplist)
311

    
312
  result = []
313
  for node in node_list:
314
    ip = ipmap.get(node)
315
    if ip is None:
316
      ip = nslookup_fn(node, family=family)
317
    result.append((node, ip))
318

    
319
  return result
320

    
321

    
322
class _StaticResolver:
323
  def __init__(self, addresses):
324
    """Initializes this class.
325

326
    """
327
    self._addresses = addresses
328

    
329
  def __call__(self, hosts):
330
    """Returns static addresses for hosts.
331

332
    """
333
    assert len(hosts) == len(self._addresses)
334
    return zip(hosts, self._addresses)
335

    
336

    
337
def _CheckConfigNode(name, node):
338
  """Checks if a node is online.
339

340
  @type name: string
341
  @param name: Node name
342
  @type node: L{objects.Node} or None
343
  @param node: Node object
344

345
  """
346
  if node is None:
347
    # Depend on DNS for name resolution
348
    ip = name
349
  elif node.offline:
350
    ip = _OFFLINE
351
  else:
352
    ip = node.primary_ip
353
  return (name, ip)
354

    
355

    
356
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
357
  """Calculate node addresses using configuration.
358

359
  """
360
  # Special case for single-host lookups
361
  if len(hosts) == 1:
362
    (name, ) = hosts
363
    return [_CheckConfigNode(name, single_node_fn(name))]
364
  else:
365
    all_nodes = all_nodes_fn()
366
    return [_CheckConfigNode(name, all_nodes.get(name, None))
367
            for name in hosts]
368

    
369

    
370
class _RpcProcessor:
371
  def __init__(self, resolver, port):
372
    """Initializes this class.
373

374
    @param resolver: callable accepting a list of hostnames, returning a list
375
      of tuples containing name and IP address (IP address can be the name or
376
      the special value L{_OFFLINE} to mark offline machines)
377
    @type port: int
378
    @param port: TCP port
379

380
    """
381
    self._resolver = resolver
382
    self._port = port
383

    
384
  @staticmethod
385
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
386
    """Prepares requests by sorting offline hosts into separate list.
387

388
    """
389
    results = {}
390
    requests = {}
391

    
392
    for (name, ip) in hosts:
393
      if ip is _OFFLINE:
394
        # Node is marked as offline
395
        results[name] = RpcResult(node=name, offline=True, call=procedure)
396
      else:
397
        requests[name] = \
398
          http.client.HttpClientRequest(str(ip), port,
399
                                        http.HTTP_PUT, str("/%s" % procedure),
400
                                        headers=_RPC_CLIENT_HEADERS,
401
                                        post_data=body,
402
                                        read_timeout=read_timeout)
403

    
404
    return (results, requests)
405

    
406
  @staticmethod
407
  def _CombineResults(results, requests, procedure):
408
    """Combines pre-computed results for offline hosts with actual call results.
409

410
    """
411
    for name, req in requests.items():
412
      if req.success and req.resp_status_code == http.HTTP_OK:
413
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
414
                                node=name, call=procedure)
415
      else:
416
        # TODO: Better error reporting
417
        if req.error:
418
          msg = req.error
419
        else:
420
          msg = req.resp_body
421

    
422
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
423
        host_result = RpcResult(data=msg, failed=True, node=name,
424
                                call=procedure)
425

    
426
      results[name] = host_result
427

    
428
    return results
429

    
430
  def __call__(self, hosts, procedure, body, read_timeout=None, http_pool=None):
431
    """Makes an RPC request to a number of nodes.
432

433
    @type hosts: sequence
434
    @param hosts: Hostnames
435
    @type procedure: string
436
    @param procedure: Request path
437
    @type body: string
438
    @param body: Request body
439
    @type read_timeout: int or None
440
    @param read_timeout: Read timeout for request
441

442
    """
443
    assert procedure in _TIMEOUTS, "RPC call not declared in the timeouts table"
444

    
445
    if not http_pool:
446
      http_pool = _thread_local.GetHttpClientPool()
447

    
448
    if read_timeout is None:
449
      read_timeout = _TIMEOUTS[procedure]
450

    
451
    (results, requests) = \
452
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
453
                            str(body), read_timeout)
454

    
455
    http_pool.ProcessRequests(requests.values())
456

    
457
    assert not frozenset(results).intersection(requests)
458

    
459
    return self._CombineResults(results, requests, procedure)
460

    
461

    
462
def _EncodeImportExportIO(ieio, ieioargs):
463
  """Encodes import/export I/O information.
464

465
  """
466
  if ieio == constants.IEIO_RAW_DISK:
467
    assert len(ieioargs) == 1
468
    return (ieioargs[0].ToDict(), )
469

    
470
  if ieio == constants.IEIO_SCRIPT:
471
    assert len(ieioargs) == 2
472
    return (ieioargs[0].ToDict(), ieioargs[1])
473

    
474
  return ieioargs
475

    
476

    
477
class RpcRunner(object):
478
  """RPC runner class.
479

480
  """
481
  def __init__(self, context):
482
    """Initialized the RPC runner.
483

484
    @type context: C{masterd.GanetiContext}
485
    @param context: Ganeti context
486

487
    """
488
    self._cfg = context.cfg
489
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
490
                                              self._cfg.GetNodeInfo,
491
                                              self._cfg.GetAllNodesInfo),
492
                               netutils.GetDaemonPort(constants.NODED))
493

    
494
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
495
    """Convert the given instance to a dict.
496

497
    This is done via the instance's ToDict() method and additionally
498
    we fill the hvparams with the cluster defaults.
499

500
    @type instance: L{objects.Instance}
501
    @param instance: an Instance object
502
    @type hvp: dict or None
503
    @param hvp: a dictionary with overridden hypervisor parameters
504
    @type bep: dict or None
505
    @param bep: a dictionary with overridden backend parameters
506
    @type osp: dict or None
507
    @param osp: a dictionary with overridden os parameters
508
    @rtype: dict
509
    @return: the instance dict, with the hvparams filled with the
510
        cluster defaults
511

512
    """
513
    idict = instance.ToDict()
514
    cluster = self._cfg.GetClusterInfo()
515
    idict["hvparams"] = cluster.FillHV(instance)
516
    if hvp is not None:
517
      idict["hvparams"].update(hvp)
518
    idict["beparams"] = cluster.FillBE(instance)
519
    if bep is not None:
520
      idict["beparams"].update(bep)
521
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
522
    if osp is not None:
523
      idict["osparams"].update(osp)
524
    for nic in idict["nics"]:
525
      nic['nicparams'] = objects.FillDict(
526
        cluster.nicparams[constants.PP_DEFAULT],
527
        nic['nicparams'])
528
    return idict
529

    
530
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
531
    """Helper for making a multi-node call
532

533
    """
534
    body = serializer.DumpJson(args, indent=False)
535
    return self._proc(node_list, procedure, body, read_timeout=read_timeout)
536

    
537
  @staticmethod
538
  def _StaticMultiNodeCall(node_list, procedure, args,
539
                           address_list=None, read_timeout=None):
540
    """Helper for making a multi-node static call
541

542
    """
543
    body = serializer.DumpJson(args, indent=False)
544

    
545
    if address_list is None:
546
      resolver = _SsconfResolver
547
    else:
548
      # Caller provided an address list
549
      resolver = _StaticResolver(address_list)
550

    
551
    proc = _RpcProcessor(resolver,
552
                         netutils.GetDaemonPort(constants.NODED))
553
    return proc(node_list, procedure, body, read_timeout=read_timeout)
554

    
555
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
556
    """Helper for making a single-node call
557

558
    """
559
    body = serializer.DumpJson(args, indent=False)
560
    return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
561

    
562
  @classmethod
563
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
564
    """Helper for making a single-node static call
565

566
    """
567
    body = serializer.DumpJson(args, indent=False)
568
    proc = _RpcProcessor(_SsconfResolver,
569
                         netutils.GetDaemonPort(constants.NODED))
570
    return proc([node], procedure, body, read_timeout=read_timeout)[node]
571

    
572
  #
573
  # Begin RPC calls
574
  #
575

    
576
  @_RpcTimeout(_TMO_URGENT)
577
  def call_bdev_sizes(self, node_list, devices):
578
    """Gets the sizes of requested block devices present on a node
579

580
    This is a multi-node call.
581

582
    """
583
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
584

    
585
  @_RpcTimeout(_TMO_URGENT)
586
  def call_lv_list(self, node_list, vg_name):
587
    """Gets the logical volumes present in a given volume group.
588

589
    This is a multi-node call.
590

591
    """
592
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
593

    
594
  @_RpcTimeout(_TMO_URGENT)
595
  def call_vg_list(self, node_list):
596
    """Gets the volume group list.
597

598
    This is a multi-node call.
599

600
    """
601
    return self._MultiNodeCall(node_list, "vg_list", [])
602

    
603
  @_RpcTimeout(_TMO_NORMAL)
604
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
605
    """Get list of storage units.
606

607
    This is a multi-node call.
608

609
    """
610
    return self._MultiNodeCall(node_list, "storage_list",
611
                               [su_name, su_args, name, fields])
612

    
613
  @_RpcTimeout(_TMO_NORMAL)
614
  def call_storage_modify(self, node, su_name, su_args, name, changes):
615
    """Modify a storage unit.
616

617
    This is a single-node call.
618

619
    """
620
    return self._SingleNodeCall(node, "storage_modify",
621
                                [su_name, su_args, name, changes])
622

    
623
  @_RpcTimeout(_TMO_NORMAL)
624
  def call_storage_execute(self, node, su_name, su_args, name, op):
625
    """Executes an operation on a storage unit.
626

627
    This is a single-node call.
628

629
    """
630
    return self._SingleNodeCall(node, "storage_execute",
631
                                [su_name, su_args, name, op])
632

    
633
  @_RpcTimeout(_TMO_URGENT)
634
  def call_bridges_exist(self, node, bridges_list):
635
    """Checks if a node has all the bridges given.
636

637
    This method checks if all bridges given in the bridges_list are
638
    present on the remote node, so that an instance that uses interfaces
639
    on those bridges can be started.
640

641
    This is a single-node call.
642

643
    """
644
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
645

    
646
  @_RpcTimeout(_TMO_NORMAL)
647
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
648
    """Starts an instance.
649

650
    This is a single-node call.
651

652
    """
653
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
654
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
655

    
656
  @_RpcTimeout(_TMO_NORMAL)
657
  def call_instance_shutdown(self, node, instance, timeout):
658
    """Stops an instance.
659

660
    This is a single-node call.
661

662
    """
663
    return self._SingleNodeCall(node, "instance_shutdown",
664
                                [self._InstDict(instance), timeout])
665

    
666
  @_RpcTimeout(_TMO_NORMAL)
667
  def call_migration_info(self, node, instance):
668
    """Gather the information necessary to prepare an instance migration.
669

670
    This is a single-node call.
671

672
    @type node: string
673
    @param node: the node on which the instance is currently running
674
    @type instance: C{objects.Instance}
675
    @param instance: the instance definition
676

677
    """
678
    return self._SingleNodeCall(node, "migration_info",
679
                                [self._InstDict(instance)])
680

    
681
  @_RpcTimeout(_TMO_NORMAL)
682
  def call_accept_instance(self, node, instance, info, target):
683
    """Prepare a node to accept an instance.
684

685
    This is a single-node call.
686

687
    @type node: string
688
    @param node: the target node for the migration
689
    @type instance: C{objects.Instance}
690
    @param instance: the instance definition
691
    @type info: opaque/hypervisor specific (string/data)
692
    @param info: result for the call_migration_info call
693
    @type target: string
694
    @param target: target hostname (usually ip address) (on the node itself)
695

696
    """
697
    return self._SingleNodeCall(node, "accept_instance",
698
                                [self._InstDict(instance), info, target])
699

    
700
  @_RpcTimeout(_TMO_NORMAL)
701
  def call_finalize_migration(self, node, instance, info, success):
702
    """Finalize any target-node migration specific operation.
703

704
    This is called both in case of a successful migration and in case of error
705
    (in which case it should abort the migration).
706

707
    This is a single-node call.
708

709
    @type node: string
710
    @param node: the target node for the migration
711
    @type instance: C{objects.Instance}
712
    @param instance: the instance definition
713
    @type info: opaque/hypervisor specific (string/data)
714
    @param info: result for the call_migration_info call
715
    @type success: boolean
716
    @param success: whether the migration was a success or a failure
717

718
    """
719
    return self._SingleNodeCall(node, "finalize_migration",
720
                                [self._InstDict(instance), info, success])
721

    
722
  @_RpcTimeout(_TMO_SLOW)
723
  def call_instance_migrate(self, node, instance, target, live):
724
    """Migrate an instance.
725

726
    This is a single-node call.
727

728
    @type node: string
729
    @param node: the node on which the instance is currently running
730
    @type instance: C{objects.Instance}
731
    @param instance: the instance definition
732
    @type target: string
733
    @param target: the target node name
734
    @type live: boolean
735
    @param live: whether the migration should be done live or not (the
736
        interpretation of this parameter is left to the hypervisor)
737

738
    """
739
    return self._SingleNodeCall(node, "instance_migrate",
740
                                [self._InstDict(instance), target, live])
741

    
742
  @_RpcTimeout(_TMO_NORMAL)
743
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
744
    """Reboots an instance.
745

746
    This is a single-node call.
747

748
    """
749
    return self._SingleNodeCall(node, "instance_reboot",
750
                                [self._InstDict(inst), reboot_type,
751
                                 shutdown_timeout])
752

    
753
  @_RpcTimeout(_TMO_1DAY)
754
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
755
    """Installs an OS on the given instance.
756

757
    This is a single-node call.
758

759
    """
760
    return self._SingleNodeCall(node, "instance_os_add",
761
                                [self._InstDict(inst, osp=osparams),
762
                                 reinstall, debug])
763

    
764
  @_RpcTimeout(_TMO_SLOW)
765
  def call_instance_run_rename(self, node, inst, old_name, debug):
766
    """Run the OS rename script for an instance.
767

768
    This is a single-node call.
769

770
    """
771
    return self._SingleNodeCall(node, "instance_run_rename",
772
                                [self._InstDict(inst), old_name, debug])
773

    
774
  @_RpcTimeout(_TMO_URGENT)
775
  def call_instance_info(self, node, instance, hname):
776
    """Returns information about a single instance.
777

778
    This is a single-node call.
779

780
    @type node: list
781
    @param node: the list of nodes to query
782
    @type instance: string
783
    @param instance: the instance name
784
    @type hname: string
785
    @param hname: the hypervisor type of the instance
786

787
    """
788
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
789

    
790
  @_RpcTimeout(_TMO_NORMAL)
791
  def call_instance_migratable(self, node, instance):
792
    """Checks whether the given instance can be migrated.
793

794
    This is a single-node call.
795

796
    @param node: the node to query
797
    @type instance: L{objects.Instance}
798
    @param instance: the instance to check
799

800

801
    """
802
    return self._SingleNodeCall(node, "instance_migratable",
803
                                [self._InstDict(instance)])
804

    
805
  @_RpcTimeout(_TMO_URGENT)
806
  def call_all_instances_info(self, node_list, hypervisor_list):
807
    """Returns information about all instances on the given nodes.
808

809
    This is a multi-node call.
810

811
    @type node_list: list
812
    @param node_list: the list of nodes to query
813
    @type hypervisor_list: list
814
    @param hypervisor_list: the hypervisors to query for instances
815

816
    """
817
    return self._MultiNodeCall(node_list, "all_instances_info",
818
                               [hypervisor_list])
819

    
820
  @_RpcTimeout(_TMO_URGENT)
821
  def call_instance_list(self, node_list, hypervisor_list):
822
    """Returns the list of running instances on a given node.
823

824
    This is a multi-node call.
825

826
    @type node_list: list
827
    @param node_list: the list of nodes to query
828
    @type hypervisor_list: list
829
    @param hypervisor_list: the hypervisors to query for instances
830

831
    """
832
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
833

    
834
  @_RpcTimeout(_TMO_FAST)
835
  def call_node_tcp_ping(self, node, source, target, port, timeout,
836
                         live_port_needed):
837
    """Do a TcpPing on the remote node
838

839
    This is a single-node call.
840

841
    """
842
    return self._SingleNodeCall(node, "node_tcp_ping",
843
                                [source, target, port, timeout,
844
                                 live_port_needed])
845

    
846
  @_RpcTimeout(_TMO_FAST)
847
  def call_node_has_ip_address(self, node, address):
848
    """Checks if a node has the given IP address.
849

850
    This is a single-node call.
851

852
    """
853
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
854

    
855
  @_RpcTimeout(_TMO_URGENT)
856
  def call_node_info(self, node_list, vg_name, hypervisor_type):
857
    """Return node information.
858

859
    This will return memory information and volume group size and free
860
    space.
861

862
    This is a multi-node call.
863

864
    @type node_list: list
865
    @param node_list: the list of nodes to query
866
    @type vg_name: C{string}
867
    @param vg_name: the name of the volume group to ask for disk space
868
        information
869
    @type hypervisor_type: C{str}
870
    @param hypervisor_type: the name of the hypervisor to ask for
871
        memory information
872

873
    """
874
    return self._MultiNodeCall(node_list, "node_info",
875
                               [vg_name, hypervisor_type])
876

    
877
  @_RpcTimeout(_TMO_NORMAL)
878
  def call_etc_hosts_modify(self, node, mode, name, ip):
879
    """Modify hosts file with name
880

881
    @type node: string
882
    @param node: The node to call
883
    @type mode: string
884
    @param mode: The mode to operate. Currently "add" or "remove"
885
    @type name: string
886
    @param name: The host name to be modified
887
    @type ip: string
888
    @param ip: The ip of the entry (just valid if mode is "add")
889

890
    """
891
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
892

    
893
  @_RpcTimeout(_TMO_NORMAL)
894
  def call_node_verify(self, node_list, checkdict, cluster_name):
895
    """Request verification of given parameters.
896

897
    This is a multi-node call.
898

899
    """
900
    return self._MultiNodeCall(node_list, "node_verify",
901
                               [checkdict, cluster_name])
902

    
903
  @classmethod
904
  @_RpcTimeout(_TMO_FAST)
905
  def call_node_start_master(cls, node, start_daemons, no_voting):
906
    """Tells a node to activate itself as a master.
907

908
    This is a single-node call.
909

910
    """
911
    return cls._StaticSingleNodeCall(node, "node_start_master",
912
                                     [start_daemons, no_voting])
913

    
914
  @classmethod
915
  @_RpcTimeout(_TMO_FAST)
916
  def call_node_stop_master(cls, node, stop_daemons):
917
    """Tells a node to demote itself from master status.
918

919
    This is a single-node call.
920

921
    """
922
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
923

    
924
  @classmethod
925
  @_RpcTimeout(_TMO_URGENT)
926
  def call_master_info(cls, node_list):
927
    """Query master info.
928

929
    This is a multi-node call.
930

931
    """
932
    # TODO: should this method query down nodes?
933
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
934

    
935
  @classmethod
936
  @_RpcTimeout(_TMO_URGENT)
937
  def call_version(cls, node_list):
938
    """Query node version.
939

940
    This is a multi-node call.
941

942
    """
943
    return cls._StaticMultiNodeCall(node_list, "version", [])
944

    
945
  @_RpcTimeout(_TMO_NORMAL)
946
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
947
    """Request creation of a given block device.
948

949
    This is a single-node call.
950

951
    """
952
    return self._SingleNodeCall(node, "blockdev_create",
953
                                [bdev.ToDict(), size, owner, on_primary, info])
954

    
955
  @_RpcTimeout(_TMO_SLOW)
956
  def call_blockdev_wipe(self, node, bdev, offset, size):
957
    """Request wipe at given offset with given size of a block device.
958

959
    This is a single-node call.
960

961
    """
962
    return self._SingleNodeCall(node, "blockdev_wipe",
963
                                [bdev.ToDict(), offset, size])
964

    
965
  @_RpcTimeout(_TMO_NORMAL)
966
  def call_blockdev_remove(self, node, bdev):
967
    """Request removal of a given block device.
968

969
    This is a single-node call.
970

971
    """
972
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
973

    
974
  @_RpcTimeout(_TMO_NORMAL)
975
  def call_blockdev_rename(self, node, devlist):
976
    """Request rename of the given block devices.
977

978
    This is a single-node call.
979

980
    """
981
    return self._SingleNodeCall(node, "blockdev_rename",
982
                                [(d.ToDict(), uid) for d, uid in devlist])
983

    
984
  @_RpcTimeout(_TMO_NORMAL)
985
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
986
    """Request a pause/resume of given block device.
987

988
    This is a single-node call.
989

990
    """
991
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
992
                                [[bdev.ToDict() for bdev in disks], pause])
993

    
994
  @_RpcTimeout(_TMO_NORMAL)
995
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
996
    """Request assembling of a given block device.
997

998
    This is a single-node call.
999

1000
    """
1001
    return self._SingleNodeCall(node, "blockdev_assemble",
1002
                                [disk.ToDict(), owner, on_primary, idx])
1003

    
1004
  @_RpcTimeout(_TMO_NORMAL)
1005
  def call_blockdev_shutdown(self, node, disk):
1006
    """Request shutdown of a given block device.
1007

1008
    This is a single-node call.
1009

1010
    """
1011
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1012

    
1013
  @_RpcTimeout(_TMO_NORMAL)
1014
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1015
    """Request adding a list of children to a (mirroring) device.
1016

1017
    This is a single-node call.
1018

1019
    """
1020
    return self._SingleNodeCall(node, "blockdev_addchildren",
1021
                                [bdev.ToDict(),
1022
                                 [disk.ToDict() for disk in ndevs]])
1023

    
1024
  @_RpcTimeout(_TMO_NORMAL)
1025
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1026
    """Request removing a list of children from a (mirroring) device.
1027

1028
    This is a single-node call.
1029

1030
    """
1031
    return self._SingleNodeCall(node, "blockdev_removechildren",
1032
                                [bdev.ToDict(),
1033
                                 [disk.ToDict() for disk in ndevs]])
1034

    
1035
  @_RpcTimeout(_TMO_NORMAL)
1036
  def call_blockdev_getmirrorstatus(self, node, disks):
1037
    """Request status of a (mirroring) device.
1038

1039
    This is a single-node call.
1040

1041
    """
1042
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1043
                                  [dsk.ToDict() for dsk in disks])
1044
    if not result.fail_msg:
1045
      result.payload = [objects.BlockDevStatus.FromDict(i)
1046
                        for i in result.payload]
1047
    return result
1048

    
1049
  @_RpcTimeout(_TMO_NORMAL)
1050
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1051
    """Request status of (mirroring) devices from multiple nodes.
1052

1053
    This is a multi-node call.
1054

1055
    """
1056
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1057
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1058
                                       for name, disks in node_disks.items())])
1059
    for nres in result.values():
1060
      if nres.fail_msg:
1061
        continue
1062

    
1063
      for idx, (success, status) in enumerate(nres.payload):
1064
        if success:
1065
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1066

    
1067
    return result
1068

    
1069
  @_RpcTimeout(_TMO_NORMAL)
1070
  def call_blockdev_find(self, node, disk):
1071
    """Request identification of a given block device.
1072

1073
    This is a single-node call.
1074

1075
    """
1076
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1077
    if not result.fail_msg and result.payload is not None:
1078
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1079
    return result
1080

    
1081
  @_RpcTimeout(_TMO_NORMAL)
1082
  def call_blockdev_close(self, node, instance_name, disks):
1083
    """Closes the given block devices.
1084

1085
    This is a single-node call.
1086

1087
    """
1088
    params = [instance_name, [cf.ToDict() for cf in disks]]
1089
    return self._SingleNodeCall(node, "blockdev_close", params)
1090

    
1091
  @_RpcTimeout(_TMO_NORMAL)
1092
  def call_blockdev_getsize(self, node, disks):
1093
    """Returns the size of the given disks.
1094

1095
    This is a single-node call.
1096

1097
    """
1098
    params = [[cf.ToDict() for cf in disks]]
1099
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1100

    
1101
  @_RpcTimeout(_TMO_NORMAL)
1102
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1103
    """Disconnects the network of the given drbd devices.
1104

1105
    This is a multi-node call.
1106

1107
    """
1108
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1109
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1110

    
1111
  @_RpcTimeout(_TMO_NORMAL)
1112
  def call_drbd_attach_net(self, node_list, nodes_ip,
1113
                           disks, instance_name, multimaster):
1114
    """Disconnects the given drbd devices.
1115

1116
    This is a multi-node call.
1117

1118
    """
1119
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1120
                               [nodes_ip, [cf.ToDict() for cf in disks],
1121
                                instance_name, multimaster])
1122

    
1123
  @_RpcTimeout(_TMO_SLOW)
1124
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1125
    """Waits for the synchronization of drbd devices is complete.
1126

1127
    This is a multi-node call.
1128

1129
    """
1130
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1131
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1132

    
1133
  @_RpcTimeout(_TMO_URGENT)
1134
  def call_drbd_helper(self, node_list):
1135
    """Gets drbd helper.
1136

1137
    This is a multi-node call.
1138

1139
    """
1140
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1141

    
1142
  @classmethod
1143
  @_RpcTimeout(_TMO_NORMAL)
1144
  def call_upload_file(cls, node_list, file_name, address_list=None):
1145
    """Upload a file.
1146

1147
    The node will refuse the operation in case the file is not on the
1148
    approved file list.
1149

1150
    This is a multi-node call.
1151

1152
    @type node_list: list
1153
    @param node_list: the list of node names to upload to
1154
    @type file_name: str
1155
    @param file_name: the filename to upload
1156
    @type address_list: list or None
1157
    @keyword address_list: an optional list of node addresses, in order
1158
        to optimize the RPC speed
1159

1160
    """
1161
    file_contents = utils.ReadFile(file_name)
1162
    data = _Compress(file_contents)
1163
    st = os.stat(file_name)
1164
    getents = runtime.GetEnts()
1165
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1166
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1167
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1168
                                    address_list=address_list)
1169

    
1170
  @classmethod
1171
  @_RpcTimeout(_TMO_NORMAL)
1172
  def call_write_ssconf_files(cls, node_list, values):
1173
    """Write ssconf files.
1174

1175
    This is a multi-node call.
1176

1177
    """
1178
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1179

    
1180
  @_RpcTimeout(_TMO_NORMAL)
1181
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1182
    """Runs OOB.
1183

1184
    This is a single-node call.
1185

1186
    """
1187
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1188
                                                  remote_node, timeout])
1189

    
1190
  @_RpcTimeout(_TMO_FAST)
1191
  def call_os_diagnose(self, node_list):
1192
    """Request a diagnose of OS definitions.
1193

1194
    This is a multi-node call.
1195

1196
    """
1197
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1198

    
1199
  @_RpcTimeout(_TMO_FAST)
1200
  def call_os_get(self, node, name):
1201
    """Returns an OS definition.
1202

1203
    This is a single-node call.
1204

1205
    """
1206
    result = self._SingleNodeCall(node, "os_get", [name])
1207
    if not result.fail_msg and isinstance(result.payload, dict):
1208
      result.payload = objects.OS.FromDict(result.payload)
1209
    return result
1210

    
1211
  @_RpcTimeout(_TMO_FAST)
1212
  def call_os_validate(self, required, nodes, name, checks, params):
1213
    """Run a validation routine for a given OS.
1214

1215
    This is a multi-node call.
1216

1217
    """
1218
    return self._MultiNodeCall(nodes, "os_validate",
1219
                               [required, name, checks, params])
1220

    
1221
  @_RpcTimeout(_TMO_NORMAL)
1222
  def call_hooks_runner(self, node_list, hpath, phase, env):
1223
    """Call the hooks runner.
1224

1225
    Args:
1226
      - op: the OpCode instance
1227
      - env: a dictionary with the environment
1228

1229
    This is a multi-node call.
1230

1231
    """
1232
    params = [hpath, phase, env]
1233
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1234

    
1235
  @_RpcTimeout(_TMO_NORMAL)
1236
  def call_iallocator_runner(self, node, name, idata):
1237
    """Call an iallocator on a remote node
1238

1239
    Args:
1240
      - name: the iallocator name
1241
      - input: the json-encoded input string
1242

1243
    This is a single-node call.
1244

1245
    """
1246
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1247

    
1248
  @_RpcTimeout(_TMO_NORMAL)
1249
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1250
    """Request a snapshot of the given block device.
1251

1252
    This is a single-node call.
1253

1254
    """
1255
    return self._SingleNodeCall(node, "blockdev_grow",
1256
                                [cf_bdev.ToDict(), amount, dryrun])
1257

    
1258
  @_RpcTimeout(_TMO_1DAY)
1259
  def call_blockdev_export(self, node, cf_bdev,
1260
                           dest_node, dest_path, cluster_name):
1261
    """Export a given disk to another node.
1262

1263
    This is a single-node call.
1264

1265
    """
1266
    return self._SingleNodeCall(node, "blockdev_export",
1267
                                [cf_bdev.ToDict(), dest_node, dest_path,
1268
                                 cluster_name])
1269

    
1270
  @_RpcTimeout(_TMO_NORMAL)
1271
  def call_blockdev_snapshot(self, node, cf_bdev):
1272
    """Request a snapshot of the given block device.
1273

1274
    This is a single-node call.
1275

1276
    """
1277
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1278

    
1279
  @_RpcTimeout(_TMO_NORMAL)
1280
  def call_finalize_export(self, node, instance, snap_disks):
1281
    """Request the completion of an export operation.
1282

1283
    This writes the export config file, etc.
1284

1285
    This is a single-node call.
1286

1287
    """
1288
    flat_disks = []
1289
    for disk in snap_disks:
1290
      if isinstance(disk, bool):
1291
        flat_disks.append(disk)
1292
      else:
1293
        flat_disks.append(disk.ToDict())
1294

    
1295
    return self._SingleNodeCall(node, "finalize_export",
1296
                                [self._InstDict(instance), flat_disks])
1297

    
1298
  @_RpcTimeout(_TMO_FAST)
1299
  def call_export_info(self, node, path):
1300
    """Queries the export information in a given path.
1301

1302
    This is a single-node call.
1303

1304
    """
1305
    return self._SingleNodeCall(node, "export_info", [path])
1306

    
1307
  @_RpcTimeout(_TMO_FAST)
1308
  def call_export_list(self, node_list):
1309
    """Gets the stored exports list.
1310

1311
    This is a multi-node call.
1312

1313
    """
1314
    return self._MultiNodeCall(node_list, "export_list", [])
1315

    
1316
  @_RpcTimeout(_TMO_FAST)
1317
  def call_export_remove(self, node, export):
1318
    """Requests removal of a given export.
1319

1320
    This is a single-node call.
1321

1322
    """
1323
    return self._SingleNodeCall(node, "export_remove", [export])
1324

    
1325
  @classmethod
1326
  @_RpcTimeout(_TMO_NORMAL)
1327
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1328
    """Requests a node to clean the cluster information it has.
1329

1330
    This will remove the configuration information from the ganeti data
1331
    dir.
1332

1333
    This is a single-node call.
1334

1335
    """
1336
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1337
                                     [modify_ssh_setup])
1338

    
1339
  @_RpcTimeout(_TMO_FAST)
1340
  def call_node_volumes(self, node_list):
1341
    """Gets all volumes on node(s).
1342

1343
    This is a multi-node call.
1344

1345
    """
1346
    return self._MultiNodeCall(node_list, "node_volumes", [])
1347

    
1348
  @_RpcTimeout(_TMO_FAST)
1349
  def call_node_demote_from_mc(self, node):
1350
    """Demote a node from the master candidate role.
1351

1352
    This is a single-node call.
1353

1354
    """
1355
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1356

    
1357
  @_RpcTimeout(_TMO_NORMAL)
1358
  def call_node_powercycle(self, node, hypervisor):
1359
    """Tries to powercycle a node.
1360

1361
    This is a single-node call.
1362

1363
    """
1364
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1365

    
1366
  @_RpcTimeout(None)
1367
  def call_test_delay(self, node_list, duration):
1368
    """Sleep for a fixed time on given node(s).
1369

1370
    This is a multi-node call.
1371

1372
    """
1373
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1374
                               read_timeout=int(duration + 5))
1375

    
1376
  @_RpcTimeout(_TMO_FAST)
1377
  def call_file_storage_dir_create(self, node, file_storage_dir):
1378
    """Create the given file storage directory.
1379

1380
    This is a single-node call.
1381

1382
    """
1383
    return self._SingleNodeCall(node, "file_storage_dir_create",
1384
                                [file_storage_dir])
1385

    
1386
  @_RpcTimeout(_TMO_FAST)
1387
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1388
    """Remove the given file storage directory.
1389

1390
    This is a single-node call.
1391

1392
    """
1393
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1394
                                [file_storage_dir])
1395

    
1396
  @_RpcTimeout(_TMO_FAST)
1397
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1398
                                   new_file_storage_dir):
1399
    """Rename file storage directory.
1400

1401
    This is a single-node call.
1402

1403
    """
1404
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1405
                                [old_file_storage_dir, new_file_storage_dir])
1406

    
1407
  @classmethod
1408
  @_RpcTimeout(_TMO_URGENT)
1409
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1410
    """Update job queue.
1411

1412
    This is a multi-node call.
1413

1414
    """
1415
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1416
                                    [file_name, _Compress(content)],
1417
                                    address_list=address_list)
1418

    
1419
  @classmethod
1420
  @_RpcTimeout(_TMO_NORMAL)
1421
  def call_jobqueue_purge(cls, node):
1422
    """Purge job queue.
1423

1424
    This is a single-node call.
1425

1426
    """
1427
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1428

    
1429
  @classmethod
1430
  @_RpcTimeout(_TMO_URGENT)
1431
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1432
    """Rename a job queue file.
1433

1434
    This is a multi-node call.
1435

1436
    """
1437
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1438
                                    address_list=address_list)
1439

    
1440
  @_RpcTimeout(_TMO_NORMAL)
1441
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1442
    """Validate the hypervisor params.
1443

1444
    This is a multi-node call.
1445

1446
    @type node_list: list
1447
    @param node_list: the list of nodes to query
1448
    @type hvname: string
1449
    @param hvname: the hypervisor name
1450
    @type hvparams: dict
1451
    @param hvparams: the hypervisor parameters to be validated
1452

1453
    """
1454
    cluster = self._cfg.GetClusterInfo()
1455
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1456
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1457
                               [hvname, hv_full])
1458

    
1459
  @_RpcTimeout(_TMO_NORMAL)
1460
  def call_x509_cert_create(self, node, validity):
1461
    """Creates a new X509 certificate for SSL/TLS.
1462

1463
    This is a single-node call.
1464

1465
    @type validity: int
1466
    @param validity: Validity in seconds
1467

1468
    """
1469
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1470

    
1471
  @_RpcTimeout(_TMO_NORMAL)
1472
  def call_x509_cert_remove(self, node, name):
1473
    """Removes a X509 certificate.
1474

1475
    This is a single-node call.
1476

1477
    @type name: string
1478
    @param name: Certificate name
1479

1480
    """
1481
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1482

    
1483
  @_RpcTimeout(_TMO_NORMAL)
1484
  def call_import_start(self, node, opts, instance, component,
1485
                        dest, dest_args):
1486
    """Starts a listener for an import.
1487

1488
    This is a single-node call.
1489

1490
    @type node: string
1491
    @param node: Node name
1492
    @type instance: C{objects.Instance}
1493
    @param instance: Instance object
1494
    @type component: string
1495
    @param component: which part of the instance is being imported
1496

1497
    """
1498
    return self._SingleNodeCall(node, "import_start",
1499
                                [opts.ToDict(),
1500
                                 self._InstDict(instance), component, dest,
1501
                                 _EncodeImportExportIO(dest, dest_args)])
1502

    
1503
  @_RpcTimeout(_TMO_NORMAL)
1504
  def call_export_start(self, node, opts, host, port,
1505
                        instance, component, source, source_args):
1506
    """Starts an export daemon.
1507

1508
    This is a single-node call.
1509

1510
    @type node: string
1511
    @param node: Node name
1512
    @type instance: C{objects.Instance}
1513
    @param instance: Instance object
1514
    @type component: string
1515
    @param component: which part of the instance is being imported
1516

1517
    """
1518
    return self._SingleNodeCall(node, "export_start",
1519
                                [opts.ToDict(), host, port,
1520
                                 self._InstDict(instance),
1521
                                 component, source,
1522
                                 _EncodeImportExportIO(source, source_args)])
1523

    
1524
  @_RpcTimeout(_TMO_FAST)
1525
  def call_impexp_status(self, node, names):
1526
    """Gets the status of an import or export.
1527

1528
    This is a single-node call.
1529

1530
    @type node: string
1531
    @param node: Node name
1532
    @type names: List of strings
1533
    @param names: Import/export names
1534
    @rtype: List of L{objects.ImportExportStatus} instances
1535
    @return: Returns a list of the state of each named import/export or None if
1536
             a status couldn't be retrieved
1537

1538
    """
1539
    result = self._SingleNodeCall(node, "impexp_status", [names])
1540

    
1541
    if not result.fail_msg:
1542
      decoded = []
1543

    
1544
      for i in result.payload:
1545
        if i is None:
1546
          decoded.append(None)
1547
          continue
1548
        decoded.append(objects.ImportExportStatus.FromDict(i))
1549

    
1550
      result.payload = decoded
1551

    
1552
    return result
1553

    
1554
  @_RpcTimeout(_TMO_NORMAL)
1555
  def call_impexp_abort(self, node, name):
1556
    """Aborts an import or export.
1557

1558
    This is a single-node call.
1559

1560
    @type node: string
1561
    @param node: Node name
1562
    @type name: string
1563
    @param name: Import/export name
1564

1565
    """
1566
    return self._SingleNodeCall(node, "impexp_abort", [name])
1567

    
1568
  @_RpcTimeout(_TMO_NORMAL)
1569
  def call_impexp_cleanup(self, node, name):
1570
    """Cleans up after an import or export.
1571

1572
    This is a single-node call.
1573

1574
    @type node: string
1575
    @param node: Node name
1576
    @type name: string
1577
    @param name: Import/export name
1578

1579
    """
1580
    return self._SingleNodeCall(node, "impexp_cleanup", [name])