Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ aea5caef

History | View | Annotate | Download (46.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# pylint has a bug here, doesn't see this import
52
import ganeti.http.client  # pylint: disable=W0611
53

    
54

    
55
# Timeout for connecting to nodes (seconds)
56
_RPC_CONNECT_TIMEOUT = 5
57

    
58
_RPC_CLIENT_HEADERS = [
59
  "Content-type: %s" % http.HTTP_APP_JSON,
60
  "Expect:",
61
  ]
62

    
63
# Various time constants for the timeout table
64
_TMO_URGENT = 60 # one minute
65
_TMO_FAST = 5 * 60 # five minutes
66
_TMO_NORMAL = 15 * 60 # 15 minutes
67
_TMO_SLOW = 3600 # one hour
68
_TMO_4HRS = 4 * 3600
69
_TMO_1DAY = 86400
70

    
71
# Timeout table that will be built later by decorators
72
# Guidelines for choosing timeouts:
73
# - call used during watcher: timeout -> 1min, _TMO_URGENT
74
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
75
# - other calls: 15 min, _TMO_NORMAL
76
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
77

    
78
_TIMEOUTS = {
79
}
80

    
81
#: Special value to describe an offline host
82
_OFFLINE = object()
83

    
84

    
85
def Init():
86
  """Initializes the module-global HTTP client manager.
87

88
  Must be called before using any RPC function and while exactly one thread is
89
  running.
90

91
  """
92
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
93
  # one thread running. This check is just a safety measure -- it doesn't
94
  # cover all cases.
95
  assert threading.activeCount() == 1, \
96
         "Found more than one active thread when initializing pycURL"
97

    
98
  logging.info("Using PycURL %s", pycurl.version)
99

    
100
  pycurl.global_init(pycurl.GLOBAL_ALL)
101

    
102

    
103
def Shutdown():
104
  """Stops the module-global HTTP client manager.
105

106
  Must be called before quitting the program and while exactly one thread is
107
  running.
108

109
  """
110
  pycurl.global_cleanup()
111

    
112

    
113
def _ConfigRpcCurl(curl):
114
  noded_cert = str(constants.NODED_CERT_FILE)
115

    
116
  curl.setopt(pycurl.FOLLOWLOCATION, False)
117
  curl.setopt(pycurl.CAINFO, noded_cert)
118
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
119
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
120
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
121
  curl.setopt(pycurl.SSLCERT, noded_cert)
122
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
123
  curl.setopt(pycurl.SSLKEY, noded_cert)
124
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
125

    
126

    
127
# Aliasing this module avoids the following warning by epydoc: "Warning: No
128
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
129
_threading = threading
130

    
131

    
132
class _RpcThreadLocal(_threading.local):
133
  def GetHttpClientPool(self):
134
    """Returns a per-thread HTTP client pool.
135

136
    @rtype: L{http.client.HttpClientPool}
137

138
    """
139
    try:
140
      pool = self.hcp
141
    except AttributeError:
142
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
143
      self.hcp = pool
144

    
145
    return pool
146

    
147

    
148
# Remove module alias (see above)
149
del _threading
150

    
151

    
152
_thread_local = _RpcThreadLocal()
153

    
154

    
155
def _RpcTimeout(secs):
156
  """Timeout decorator.
157

158
  When applied to a rpc call_* function, it updates the global timeout
159
  table with the given function/timeout.
160

161
  """
162
  def decorator(f):
163
    name = f.__name__
164
    assert name.startswith("call_")
165
    _TIMEOUTS[name[len("call_"):]] = secs
166
    return f
167
  return decorator
168

    
169

    
170
def RunWithRPC(fn):
171
  """RPC-wrapper decorator.
172

173
  When applied to a function, it runs it with the RPC system
174
  initialized, and it shutsdown the system afterwards. This means the
175
  function must be called without RPC being initialized.
176

177
  """
178
  def wrapper(*args, **kwargs):
179
    Init()
180
    try:
181
      return fn(*args, **kwargs)
182
    finally:
183
      Shutdown()
184
  return wrapper
185

    
186

    
187
def _Compress(data):
188
  """Compresses a string for transport over RPC.
189

190
  Small amounts of data are not compressed.
191

192
  @type data: str
193
  @param data: Data
194
  @rtype: tuple
195
  @return: Encoded data to send
196

197
  """
198
  # Small amounts of data are not compressed
199
  if len(data) < 512:
200
    return (constants.RPC_ENCODING_NONE, data)
201

    
202
  # Compress with zlib and encode in base64
203
  return (constants.RPC_ENCODING_ZLIB_BASE64,
204
          base64.b64encode(zlib.compress(data, 3)))
205

    
206

    
207
class RpcResult(object):
208
  """RPC Result class.
209

210
  This class holds an RPC result. It is needed since in multi-node
211
  calls we can't raise an exception just because one one out of many
212
  failed, and therefore we use this class to encapsulate the result.
213

214
  @ivar data: the data payload, for successful results, or None
215
  @ivar call: the name of the RPC call
216
  @ivar node: the name of the node to which we made the call
217
  @ivar offline: whether the operation failed because the node was
218
      offline, as opposed to actual failure; offline=True will always
219
      imply failed=True, in order to allow simpler checking if
220
      the user doesn't care about the exact failure mode
221
  @ivar fail_msg: the error message if the call failed
222

223
  """
224
  def __init__(self, data=None, failed=False, offline=False,
225
               call=None, node=None):
226
    self.offline = offline
227
    self.call = call
228
    self.node = node
229

    
230
    if offline:
231
      self.fail_msg = "Node is marked offline"
232
      self.data = self.payload = None
233
    elif failed:
234
      self.fail_msg = self._EnsureErr(data)
235
      self.data = self.payload = None
236
    else:
237
      self.data = data
238
      if not isinstance(self.data, (tuple, list)):
239
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
240
                         type(self.data))
241
        self.payload = None
242
      elif len(data) != 2:
243
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
244
                         "expected 2" % len(self.data))
245
        self.payload = None
246
      elif not self.data[0]:
247
        self.fail_msg = self._EnsureErr(self.data[1])
248
        self.payload = None
249
      else:
250
        # finally success
251
        self.fail_msg = None
252
        self.payload = data[1]
253

    
254
    for attr_name in ["call", "data", "fail_msg",
255
                      "node", "offline", "payload"]:
256
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
257

    
258
  @staticmethod
259
  def _EnsureErr(val):
260
    """Helper to ensure we return a 'True' value for error."""
261
    if val:
262
      return val
263
    else:
264
      return "No error information"
265

    
266
  def Raise(self, msg, prereq=False, ecode=None):
267
    """If the result has failed, raise an OpExecError.
268

269
    This is used so that LU code doesn't have to check for each
270
    result, but instead can call this function.
271

272
    """
273
    if not self.fail_msg:
274
      return
275

    
276
    if not msg: # one could pass None for default message
277
      msg = ("Call '%s' to node '%s' has failed: %s" %
278
             (self.call, self.node, self.fail_msg))
279
    else:
280
      msg = "%s: %s" % (msg, self.fail_msg)
281
    if prereq:
282
      ec = errors.OpPrereqError
283
    else:
284
      ec = errors.OpExecError
285
    if ecode is not None:
286
      args = (msg, ecode)
287
    else:
288
      args = (msg, )
289
    raise ec(*args) # pylint: disable=W0142
290

    
291

    
292
def _SsconfResolver(node_list,
293
                    ssc=ssconf.SimpleStore,
294
                    nslookup_fn=netutils.Hostname.GetIP):
295
  """Return addresses for given node names.
296

297
  @type node_list: list
298
  @param node_list: List of node names
299
  @type ssc: class
300
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
301
  @type nslookup_fn: callable
302
  @param nslookup_fn: function use to do NS lookup
303
  @rtype: list of tuple; (string, string)
304
  @return: List of tuples containing node name and IP address
305

306
  """
307
  ss = ssc()
308
  iplist = ss.GetNodePrimaryIPList()
309
  family = ss.GetPrimaryIPFamily()
310
  ipmap = dict(entry.split() for entry in iplist)
311

    
312
  result = []
313
  for node in node_list:
314
    ip = ipmap.get(node)
315
    if ip is None:
316
      ip = nslookup_fn(node, family=family)
317
    result.append((node, ip))
318

    
319
  return result
320

    
321

    
322
class _StaticResolver:
323
  def __init__(self, addresses):
324
    """Initializes this class.
325

326
    """
327
    self._addresses = addresses
328

    
329
  def __call__(self, hosts):
330
    """Returns static addresses for hosts.
331

332
    """
333
    assert len(hosts) == len(self._addresses)
334
    return zip(hosts, self._addresses)
335

    
336

    
337
def _CheckConfigNode(name, node):
338
  """Checks if a node is online.
339

340
  @type name: string
341
  @param name: Node name
342
  @type node: L{objects.Node} or None
343
  @param node: Node object
344

345
  """
346
  if node is None:
347
    # Depend on DNS for name resolution
348
    ip = name
349
  elif node.offline:
350
    ip = _OFFLINE
351
  else:
352
    ip = node.primary_ip
353
  return (name, ip)
354

    
355

    
356
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
357
  """Calculate node addresses using configuration.
358

359
  """
360
  # Special case for single-host lookups
361
  if len(hosts) == 1:
362
    (name, ) = hosts
363
    return [_CheckConfigNode(name, single_node_fn(name))]
364
  else:
365
    all_nodes = all_nodes_fn()
366
    return [_CheckConfigNode(name, all_nodes.get(name, None))
367
            for name in hosts]
368

    
369

    
370
class _RpcProcessor:
371
  def __init__(self, resolver, port, lock_monitor_cb=None):
372
    """Initializes this class.
373

374
    @param resolver: callable accepting a list of hostnames, returning a list
375
      of tuples containing name and IP address (IP address can be the name or
376
      the special value L{_OFFLINE} to mark offline machines)
377
    @type port: int
378
    @param port: TCP port
379
    @param lock_monitor_cb: Callable for registering with lock monitor
380

381
    """
382
    self._resolver = resolver
383
    self._port = port
384
    self._lock_monitor_cb = lock_monitor_cb
385

    
386
  @staticmethod
387
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
388
    """Prepares requests by sorting offline hosts into separate list.
389

390
    """
391
    results = {}
392
    requests = {}
393

    
394
    for (name, ip) in hosts:
395
      if ip is _OFFLINE:
396
        # Node is marked as offline
397
        results[name] = RpcResult(node=name, offline=True, call=procedure)
398
      else:
399
        requests[name] = \
400
          http.client.HttpClientRequest(str(ip), port,
401
                                        http.HTTP_PUT, str("/%s" % procedure),
402
                                        headers=_RPC_CLIENT_HEADERS,
403
                                        post_data=body,
404
                                        read_timeout=read_timeout)
405

    
406
    return (results, requests)
407

    
408
  @staticmethod
409
  def _CombineResults(results, requests, procedure):
410
    """Combines pre-computed results for offline hosts with actual call results.
411

412
    """
413
    for name, req in requests.items():
414
      if req.success and req.resp_status_code == http.HTTP_OK:
415
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
416
                                node=name, call=procedure)
417
      else:
418
        # TODO: Better error reporting
419
        if req.error:
420
          msg = req.error
421
        else:
422
          msg = req.resp_body
423

    
424
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
425
        host_result = RpcResult(data=msg, failed=True, node=name,
426
                                call=procedure)
427

    
428
      results[name] = host_result
429

    
430
    return results
431

    
432
  def __call__(self, hosts, procedure, body, read_timeout=None, http_pool=None):
433
    """Makes an RPC request to a number of nodes.
434

435
    @type hosts: sequence
436
    @param hosts: Hostnames
437
    @type procedure: string
438
    @param procedure: Request path
439
    @type body: string
440
    @param body: Request body
441
    @type read_timeout: int or None
442
    @param read_timeout: Read timeout for request
443

444
    """
445
    assert procedure in _TIMEOUTS, "RPC call not declared in the timeouts table"
446

    
447
    if not http_pool:
448
      http_pool = _thread_local.GetHttpClientPool()
449

    
450
    if read_timeout is None:
451
      read_timeout = _TIMEOUTS[procedure]
452

    
453
    (results, requests) = \
454
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
455
                            str(body), read_timeout)
456

    
457
    http_pool.ProcessRequests(requests.values(),
458
                              lock_monitor_cb=self._lock_monitor_cb)
459

    
460
    assert not frozenset(results).intersection(requests)
461

    
462
    return self._CombineResults(results, requests, procedure)
463

    
464

    
465
def _EncodeImportExportIO(ieio, ieioargs):
466
  """Encodes import/export I/O information.
467

468
  """
469
  if ieio == constants.IEIO_RAW_DISK:
470
    assert len(ieioargs) == 1
471
    return (ieioargs[0].ToDict(), )
472

    
473
  if ieio == constants.IEIO_SCRIPT:
474
    assert len(ieioargs) == 2
475
    return (ieioargs[0].ToDict(), ieioargs[1])
476

    
477
  return ieioargs
478

    
479

    
480
class RpcRunner(object):
481
  """RPC runner class.
482

483
  """
484
  def __init__(self, context):
485
    """Initialized the RPC runner.
486

487
    @type context: C{masterd.GanetiContext}
488
    @param context: Ganeti context
489

490
    """
491
    self._cfg = context.cfg
492
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
493
                                              self._cfg.GetNodeInfo,
494
                                              self._cfg.GetAllNodesInfo),
495
                               netutils.GetDaemonPort(constants.NODED),
496
                               lock_monitor_cb=context.glm.AddToLockMonitor)
497

    
498
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
499
    """Convert the given instance to a dict.
500

501
    This is done via the instance's ToDict() method and additionally
502
    we fill the hvparams with the cluster defaults.
503

504
    @type instance: L{objects.Instance}
505
    @param instance: an Instance object
506
    @type hvp: dict or None
507
    @param hvp: a dictionary with overridden hypervisor parameters
508
    @type bep: dict or None
509
    @param bep: a dictionary with overridden backend parameters
510
    @type osp: dict or None
511
    @param osp: a dictionary with overridden os parameters
512
    @rtype: dict
513
    @return: the instance dict, with the hvparams filled with the
514
        cluster defaults
515

516
    """
517
    idict = instance.ToDict()
518
    cluster = self._cfg.GetClusterInfo()
519
    idict["hvparams"] = cluster.FillHV(instance)
520
    if hvp is not None:
521
      idict["hvparams"].update(hvp)
522
    idict["beparams"] = cluster.FillBE(instance)
523
    if bep is not None:
524
      idict["beparams"].update(bep)
525
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
526
    if osp is not None:
527
      idict["osparams"].update(osp)
528
    for nic in idict["nics"]:
529
      nic['nicparams'] = objects.FillDict(
530
        cluster.nicparams[constants.PP_DEFAULT],
531
        nic['nicparams'])
532
    return idict
533

    
534
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
535
    """Helper for making a multi-node call
536

537
    """
538
    body = serializer.DumpJson(args, indent=False)
539
    return self._proc(node_list, procedure, body, read_timeout=read_timeout)
540

    
541
  @staticmethod
542
  def _StaticMultiNodeCall(node_list, procedure, args,
543
                           address_list=None, read_timeout=None):
544
    """Helper for making a multi-node static call
545

546
    """
547
    body = serializer.DumpJson(args, indent=False)
548

    
549
    if address_list is None:
550
      resolver = _SsconfResolver
551
    else:
552
      # Caller provided an address list
553
      resolver = _StaticResolver(address_list)
554

    
555
    proc = _RpcProcessor(resolver,
556
                         netutils.GetDaemonPort(constants.NODED))
557
    return proc(node_list, procedure, body, read_timeout=read_timeout)
558

    
559
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
560
    """Helper for making a single-node call
561

562
    """
563
    body = serializer.DumpJson(args, indent=False)
564
    return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
565

    
566
  @classmethod
567
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
568
    """Helper for making a single-node static call
569

570
    """
571
    body = serializer.DumpJson(args, indent=False)
572
    proc = _RpcProcessor(_SsconfResolver,
573
                         netutils.GetDaemonPort(constants.NODED))
574
    return proc([node], procedure, body, read_timeout=read_timeout)[node]
575

    
576
  #
577
  # Begin RPC calls
578
  #
579

    
580
  @_RpcTimeout(_TMO_URGENT)
581
  def call_bdev_sizes(self, node_list, devices):
582
    """Gets the sizes of requested block devices present on a node
583

584
    This is a multi-node call.
585

586
    """
587
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
588

    
589
  @_RpcTimeout(_TMO_URGENT)
590
  def call_lv_list(self, node_list, vg_name):
591
    """Gets the logical volumes present in a given volume group.
592

593
    This is a multi-node call.
594

595
    """
596
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
597

    
598
  @_RpcTimeout(_TMO_URGENT)
599
  def call_vg_list(self, node_list):
600
    """Gets the volume group list.
601

602
    This is a multi-node call.
603

604
    """
605
    return self._MultiNodeCall(node_list, "vg_list", [])
606

    
607
  @_RpcTimeout(_TMO_NORMAL)
608
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
609
    """Get list of storage units.
610

611
    This is a multi-node call.
612

613
    """
614
    return self._MultiNodeCall(node_list, "storage_list",
615
                               [su_name, su_args, name, fields])
616

    
617
  @_RpcTimeout(_TMO_NORMAL)
618
  def call_storage_modify(self, node, su_name, su_args, name, changes):
619
    """Modify a storage unit.
620

621
    This is a single-node call.
622

623
    """
624
    return self._SingleNodeCall(node, "storage_modify",
625
                                [su_name, su_args, name, changes])
626

    
627
  @_RpcTimeout(_TMO_NORMAL)
628
  def call_storage_execute(self, node, su_name, su_args, name, op):
629
    """Executes an operation on a storage unit.
630

631
    This is a single-node call.
632

633
    """
634
    return self._SingleNodeCall(node, "storage_execute",
635
                                [su_name, su_args, name, op])
636

    
637
  @_RpcTimeout(_TMO_URGENT)
638
  def call_bridges_exist(self, node, bridges_list):
639
    """Checks if a node has all the bridges given.
640

641
    This method checks if all bridges given in the bridges_list are
642
    present on the remote node, so that an instance that uses interfaces
643
    on those bridges can be started.
644

645
    This is a single-node call.
646

647
    """
648
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
649

    
650
  @_RpcTimeout(_TMO_NORMAL)
651
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
652
    """Starts an instance.
653

654
    This is a single-node call.
655

656
    """
657
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
658
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
659

    
660
  @_RpcTimeout(_TMO_NORMAL)
661
  def call_instance_shutdown(self, node, instance, timeout):
662
    """Stops an instance.
663

664
    This is a single-node call.
665

666
    """
667
    return self._SingleNodeCall(node, "instance_shutdown",
668
                                [self._InstDict(instance), timeout])
669

    
670
  @_RpcTimeout(_TMO_NORMAL)
671
  def call_migration_info(self, node, instance):
672
    """Gather the information necessary to prepare an instance migration.
673

674
    This is a single-node call.
675

676
    @type node: string
677
    @param node: the node on which the instance is currently running
678
    @type instance: C{objects.Instance}
679
    @param instance: the instance definition
680

681
    """
682
    return self._SingleNodeCall(node, "migration_info",
683
                                [self._InstDict(instance)])
684

    
685
  @_RpcTimeout(_TMO_NORMAL)
686
  def call_accept_instance(self, node, instance, info, target):
687
    """Prepare a node to accept an instance.
688

689
    This is a single-node call.
690

691
    @type node: string
692
    @param node: the target node for the migration
693
    @type instance: C{objects.Instance}
694
    @param instance: the instance definition
695
    @type info: opaque/hypervisor specific (string/data)
696
    @param info: result for the call_migration_info call
697
    @type target: string
698
    @param target: target hostname (usually ip address) (on the node itself)
699

700
    """
701
    return self._SingleNodeCall(node, "accept_instance",
702
                                [self._InstDict(instance), info, target])
703

    
704
  @_RpcTimeout(_TMO_NORMAL)
705
  def call_finalize_migration(self, node, instance, info, success):
706
    """Finalize any target-node migration specific operation.
707

708
    This is called both in case of a successful migration and in case of error
709
    (in which case it should abort the migration).
710

711
    This is a single-node call.
712

713
    @type node: string
714
    @param node: the target node for the migration
715
    @type instance: C{objects.Instance}
716
    @param instance: the instance definition
717
    @type info: opaque/hypervisor specific (string/data)
718
    @param info: result for the call_migration_info call
719
    @type success: boolean
720
    @param success: whether the migration was a success or a failure
721

722
    """
723
    return self._SingleNodeCall(node, "finalize_migration",
724
                                [self._InstDict(instance), info, success])
725

    
726
  @_RpcTimeout(_TMO_SLOW)
727
  def call_instance_migrate(self, node, instance, target, live):
728
    """Migrate an instance.
729

730
    This is a single-node call.
731

732
    @type node: string
733
    @param node: the node on which the instance is currently running
734
    @type instance: C{objects.Instance}
735
    @param instance: the instance definition
736
    @type target: string
737
    @param target: the target node name
738
    @type live: boolean
739
    @param live: whether the migration should be done live or not (the
740
        interpretation of this parameter is left to the hypervisor)
741

742
    """
743
    return self._SingleNodeCall(node, "instance_migrate",
744
                                [self._InstDict(instance), target, live])
745

    
746
  @_RpcTimeout(_TMO_NORMAL)
747
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
748
    """Reboots an instance.
749

750
    This is a single-node call.
751

752
    """
753
    return self._SingleNodeCall(node, "instance_reboot",
754
                                [self._InstDict(inst), reboot_type,
755
                                 shutdown_timeout])
756

    
757
  @_RpcTimeout(_TMO_1DAY)
758
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
759
    """Installs an OS on the given instance.
760

761
    This is a single-node call.
762

763
    """
764
    return self._SingleNodeCall(node, "instance_os_add",
765
                                [self._InstDict(inst, osp=osparams),
766
                                 reinstall, debug])
767

    
768
  @_RpcTimeout(_TMO_SLOW)
769
  def call_instance_run_rename(self, node, inst, old_name, debug):
770
    """Run the OS rename script for an instance.
771

772
    This is a single-node call.
773

774
    """
775
    return self._SingleNodeCall(node, "instance_run_rename",
776
                                [self._InstDict(inst), old_name, debug])
777

    
778
  @_RpcTimeout(_TMO_URGENT)
779
  def call_instance_info(self, node, instance, hname):
780
    """Returns information about a single instance.
781

782
    This is a single-node call.
783

784
    @type node: list
785
    @param node: the list of nodes to query
786
    @type instance: string
787
    @param instance: the instance name
788
    @type hname: string
789
    @param hname: the hypervisor type of the instance
790

791
    """
792
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
793

    
794
  @_RpcTimeout(_TMO_NORMAL)
795
  def call_instance_migratable(self, node, instance):
796
    """Checks whether the given instance can be migrated.
797

798
    This is a single-node call.
799

800
    @param node: the node to query
801
    @type instance: L{objects.Instance}
802
    @param instance: the instance to check
803

804

805
    """
806
    return self._SingleNodeCall(node, "instance_migratable",
807
                                [self._InstDict(instance)])
808

    
809
  @_RpcTimeout(_TMO_URGENT)
810
  def call_all_instances_info(self, node_list, hypervisor_list):
811
    """Returns information about all instances on the given nodes.
812

813
    This is a multi-node call.
814

815
    @type node_list: list
816
    @param node_list: the list of nodes to query
817
    @type hypervisor_list: list
818
    @param hypervisor_list: the hypervisors to query for instances
819

820
    """
821
    return self._MultiNodeCall(node_list, "all_instances_info",
822
                               [hypervisor_list])
823

    
824
  @_RpcTimeout(_TMO_URGENT)
825
  def call_instance_list(self, node_list, hypervisor_list):
826
    """Returns the list of running instances on a given node.
827

828
    This is a multi-node call.
829

830
    @type node_list: list
831
    @param node_list: the list of nodes to query
832
    @type hypervisor_list: list
833
    @param hypervisor_list: the hypervisors to query for instances
834

835
    """
836
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
837

    
838
  @_RpcTimeout(_TMO_FAST)
839
  def call_node_tcp_ping(self, node, source, target, port, timeout,
840
                         live_port_needed):
841
    """Do a TcpPing on the remote node
842

843
    This is a single-node call.
844

845
    """
846
    return self._SingleNodeCall(node, "node_tcp_ping",
847
                                [source, target, port, timeout,
848
                                 live_port_needed])
849

    
850
  @_RpcTimeout(_TMO_FAST)
851
  def call_node_has_ip_address(self, node, address):
852
    """Checks if a node has the given IP address.
853

854
    This is a single-node call.
855

856
    """
857
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
858

    
859
  @_RpcTimeout(_TMO_URGENT)
860
  def call_node_info(self, node_list, vg_name, hypervisor_type):
861
    """Return node information.
862

863
    This will return memory information and volume group size and free
864
    space.
865

866
    This is a multi-node call.
867

868
    @type node_list: list
869
    @param node_list: the list of nodes to query
870
    @type vg_name: C{string}
871
    @param vg_name: the name of the volume group to ask for disk space
872
        information
873
    @type hypervisor_type: C{str}
874
    @param hypervisor_type: the name of the hypervisor to ask for
875
        memory information
876

877
    """
878
    return self._MultiNodeCall(node_list, "node_info",
879
                               [vg_name, hypervisor_type])
880

    
881
  @_RpcTimeout(_TMO_NORMAL)
882
  def call_etc_hosts_modify(self, node, mode, name, ip):
883
    """Modify hosts file with name
884

885
    @type node: string
886
    @param node: The node to call
887
    @type mode: string
888
    @param mode: The mode to operate. Currently "add" or "remove"
889
    @type name: string
890
    @param name: The host name to be modified
891
    @type ip: string
892
    @param ip: The ip of the entry (just valid if mode is "add")
893

894
    """
895
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
896

    
897
  @_RpcTimeout(_TMO_NORMAL)
898
  def call_node_verify(self, node_list, checkdict, cluster_name):
899
    """Request verification of given parameters.
900

901
    This is a multi-node call.
902

903
    """
904
    return self._MultiNodeCall(node_list, "node_verify",
905
                               [checkdict, cluster_name])
906

    
907
  @classmethod
908
  @_RpcTimeout(_TMO_FAST)
909
  def call_node_start_master(cls, node, start_daemons, no_voting):
910
    """Tells a node to activate itself as a master.
911

912
    This is a single-node call.
913

914
    """
915
    return cls._StaticSingleNodeCall(node, "node_start_master",
916
                                     [start_daemons, no_voting])
917

    
918
  @classmethod
919
  @_RpcTimeout(_TMO_FAST)
920
  def call_node_stop_master(cls, node, stop_daemons):
921
    """Tells a node to demote itself from master status.
922

923
    This is a single-node call.
924

925
    """
926
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
927

    
928
  @classmethod
929
  @_RpcTimeout(_TMO_URGENT)
930
  def call_master_info(cls, node_list):
931
    """Query master info.
932

933
    This is a multi-node call.
934

935
    """
936
    # TODO: should this method query down nodes?
937
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
938

    
939
  @classmethod
940
  @_RpcTimeout(_TMO_URGENT)
941
  def call_version(cls, node_list):
942
    """Query node version.
943

944
    This is a multi-node call.
945

946
    """
947
    return cls._StaticMultiNodeCall(node_list, "version", [])
948

    
949
  @_RpcTimeout(_TMO_NORMAL)
950
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
951
    """Request creation of a given block device.
952

953
    This is a single-node call.
954

955
    """
956
    return self._SingleNodeCall(node, "blockdev_create",
957
                                [bdev.ToDict(), size, owner, on_primary, info])
958

    
959
  @_RpcTimeout(_TMO_SLOW)
960
  def call_blockdev_wipe(self, node, bdev, offset, size):
961
    """Request wipe at given offset with given size of a block device.
962

963
    This is a single-node call.
964

965
    """
966
    return self._SingleNodeCall(node, "blockdev_wipe",
967
                                [bdev.ToDict(), offset, size])
968

    
969
  @_RpcTimeout(_TMO_NORMAL)
970
  def call_blockdev_remove(self, node, bdev):
971
    """Request removal of a given block device.
972

973
    This is a single-node call.
974

975
    """
976
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
977

    
978
  @_RpcTimeout(_TMO_NORMAL)
979
  def call_blockdev_rename(self, node, devlist):
980
    """Request rename of the given block devices.
981

982
    This is a single-node call.
983

984
    """
985
    return self._SingleNodeCall(node, "blockdev_rename",
986
                                [(d.ToDict(), uid) for d, uid in devlist])
987

    
988
  @_RpcTimeout(_TMO_NORMAL)
989
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
990
    """Request a pause/resume of given block device.
991

992
    This is a single-node call.
993

994
    """
995
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
996
                                [[bdev.ToDict() for bdev in disks], pause])
997

    
998
  @_RpcTimeout(_TMO_NORMAL)
999
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1000
    """Request assembling of a given block device.
1001

1002
    This is a single-node call.
1003

1004
    """
1005
    return self._SingleNodeCall(node, "blockdev_assemble",
1006
                                [disk.ToDict(), owner, on_primary, idx])
1007

    
1008
  @_RpcTimeout(_TMO_NORMAL)
1009
  def call_blockdev_shutdown(self, node, disk):
1010
    """Request shutdown of a given block device.
1011

1012
    This is a single-node call.
1013

1014
    """
1015
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1016

    
1017
  @_RpcTimeout(_TMO_NORMAL)
1018
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1019
    """Request adding a list of children to a (mirroring) device.
1020

1021
    This is a single-node call.
1022

1023
    """
1024
    return self._SingleNodeCall(node, "blockdev_addchildren",
1025
                                [bdev.ToDict(),
1026
                                 [disk.ToDict() for disk in ndevs]])
1027

    
1028
  @_RpcTimeout(_TMO_NORMAL)
1029
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1030
    """Request removing a list of children from a (mirroring) device.
1031

1032
    This is a single-node call.
1033

1034
    """
1035
    return self._SingleNodeCall(node, "blockdev_removechildren",
1036
                                [bdev.ToDict(),
1037
                                 [disk.ToDict() for disk in ndevs]])
1038

    
1039
  @_RpcTimeout(_TMO_NORMAL)
1040
  def call_blockdev_getmirrorstatus(self, node, disks):
1041
    """Request status of a (mirroring) device.
1042

1043
    This is a single-node call.
1044

1045
    """
1046
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1047
                                  [dsk.ToDict() for dsk in disks])
1048
    if not result.fail_msg:
1049
      result.payload = [objects.BlockDevStatus.FromDict(i)
1050
                        for i in result.payload]
1051
    return result
1052

    
1053
  @_RpcTimeout(_TMO_NORMAL)
1054
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1055
    """Request status of (mirroring) devices from multiple nodes.
1056

1057
    This is a multi-node call.
1058

1059
    """
1060
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1061
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1062
                                       for name, disks in node_disks.items())])
1063
    for nres in result.values():
1064
      if nres.fail_msg:
1065
        continue
1066

    
1067
      for idx, (success, status) in enumerate(nres.payload):
1068
        if success:
1069
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1070

    
1071
    return result
1072

    
1073
  @_RpcTimeout(_TMO_NORMAL)
1074
  def call_blockdev_find(self, node, disk):
1075
    """Request identification of a given block device.
1076

1077
    This is a single-node call.
1078

1079
    """
1080
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1081
    if not result.fail_msg and result.payload is not None:
1082
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1083
    return result
1084

    
1085
  @_RpcTimeout(_TMO_NORMAL)
1086
  def call_blockdev_close(self, node, instance_name, disks):
1087
    """Closes the given block devices.
1088

1089
    This is a single-node call.
1090

1091
    """
1092
    params = [instance_name, [cf.ToDict() for cf in disks]]
1093
    return self._SingleNodeCall(node, "blockdev_close", params)
1094

    
1095
  @_RpcTimeout(_TMO_NORMAL)
1096
  def call_blockdev_getsize(self, node, disks):
1097
    """Returns the size of the given disks.
1098

1099
    This is a single-node call.
1100

1101
    """
1102
    params = [[cf.ToDict() for cf in disks]]
1103
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1104

    
1105
  @_RpcTimeout(_TMO_NORMAL)
1106
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1107
    """Disconnects the network of the given drbd devices.
1108

1109
    This is a multi-node call.
1110

1111
    """
1112
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1113
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1114

    
1115
  @_RpcTimeout(_TMO_NORMAL)
1116
  def call_drbd_attach_net(self, node_list, nodes_ip,
1117
                           disks, instance_name, multimaster):
1118
    """Disconnects the given drbd devices.
1119

1120
    This is a multi-node call.
1121

1122
    """
1123
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1124
                               [nodes_ip, [cf.ToDict() for cf in disks],
1125
                                instance_name, multimaster])
1126

    
1127
  @_RpcTimeout(_TMO_SLOW)
1128
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1129
    """Waits for the synchronization of drbd devices is complete.
1130

1131
    This is a multi-node call.
1132

1133
    """
1134
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1135
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1136

    
1137
  @_RpcTimeout(_TMO_URGENT)
1138
  def call_drbd_helper(self, node_list):
1139
    """Gets drbd helper.
1140

1141
    This is a multi-node call.
1142

1143
    """
1144
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1145

    
1146
  @classmethod
1147
  @_RpcTimeout(_TMO_NORMAL)
1148
  def call_upload_file(cls, node_list, file_name, address_list=None):
1149
    """Upload a file.
1150

1151
    The node will refuse the operation in case the file is not on the
1152
    approved file list.
1153

1154
    This is a multi-node call.
1155

1156
    @type node_list: list
1157
    @param node_list: the list of node names to upload to
1158
    @type file_name: str
1159
    @param file_name: the filename to upload
1160
    @type address_list: list or None
1161
    @keyword address_list: an optional list of node addresses, in order
1162
        to optimize the RPC speed
1163

1164
    """
1165
    file_contents = utils.ReadFile(file_name)
1166
    data = _Compress(file_contents)
1167
    st = os.stat(file_name)
1168
    getents = runtime.GetEnts()
1169
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1170
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1171
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1172
                                    address_list=address_list)
1173

    
1174
  @classmethod
1175
  @_RpcTimeout(_TMO_NORMAL)
1176
  def call_write_ssconf_files(cls, node_list, values):
1177
    """Write ssconf files.
1178

1179
    This is a multi-node call.
1180

1181
    """
1182
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1183

    
1184
  @_RpcTimeout(_TMO_NORMAL)
1185
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1186
    """Runs OOB.
1187

1188
    This is a single-node call.
1189

1190
    """
1191
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1192
                                                  remote_node, timeout])
1193

    
1194
  @_RpcTimeout(_TMO_FAST)
1195
  def call_os_diagnose(self, node_list):
1196
    """Request a diagnose of OS definitions.
1197

1198
    This is a multi-node call.
1199

1200
    """
1201
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1202

    
1203
  @_RpcTimeout(_TMO_FAST)
1204
  def call_os_get(self, node, name):
1205
    """Returns an OS definition.
1206

1207
    This is a single-node call.
1208

1209
    """
1210
    result = self._SingleNodeCall(node, "os_get", [name])
1211
    if not result.fail_msg and isinstance(result.payload, dict):
1212
      result.payload = objects.OS.FromDict(result.payload)
1213
    return result
1214

    
1215
  @_RpcTimeout(_TMO_FAST)
1216
  def call_os_validate(self, required, nodes, name, checks, params):
1217
    """Run a validation routine for a given OS.
1218

1219
    This is a multi-node call.
1220

1221
    """
1222
    return self._MultiNodeCall(nodes, "os_validate",
1223
                               [required, name, checks, params])
1224

    
1225
  @_RpcTimeout(_TMO_NORMAL)
1226
  def call_hooks_runner(self, node_list, hpath, phase, env):
1227
    """Call the hooks runner.
1228

1229
    Args:
1230
      - op: the OpCode instance
1231
      - env: a dictionary with the environment
1232

1233
    This is a multi-node call.
1234

1235
    """
1236
    params = [hpath, phase, env]
1237
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1238

    
1239
  @_RpcTimeout(_TMO_NORMAL)
1240
  def call_iallocator_runner(self, node, name, idata):
1241
    """Call an iallocator on a remote node
1242

1243
    Args:
1244
      - name: the iallocator name
1245
      - input: the json-encoded input string
1246

1247
    This is a single-node call.
1248

1249
    """
1250
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1251

    
1252
  @_RpcTimeout(_TMO_NORMAL)
1253
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1254
    """Request a snapshot of the given block device.
1255

1256
    This is a single-node call.
1257

1258
    """
1259
    return self._SingleNodeCall(node, "blockdev_grow",
1260
                                [cf_bdev.ToDict(), amount, dryrun])
1261

    
1262
  @_RpcTimeout(_TMO_1DAY)
1263
  def call_blockdev_export(self, node, cf_bdev,
1264
                           dest_node, dest_path, cluster_name):
1265
    """Export a given disk to another node.
1266

1267
    This is a single-node call.
1268

1269
    """
1270
    return self._SingleNodeCall(node, "blockdev_export",
1271
                                [cf_bdev.ToDict(), dest_node, dest_path,
1272
                                 cluster_name])
1273

    
1274
  @_RpcTimeout(_TMO_NORMAL)
1275
  def call_blockdev_snapshot(self, node, cf_bdev):
1276
    """Request a snapshot of the given block device.
1277

1278
    This is a single-node call.
1279

1280
    """
1281
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1282

    
1283
  @_RpcTimeout(_TMO_NORMAL)
1284
  def call_finalize_export(self, node, instance, snap_disks):
1285
    """Request the completion of an export operation.
1286

1287
    This writes the export config file, etc.
1288

1289
    This is a single-node call.
1290

1291
    """
1292
    flat_disks = []
1293
    for disk in snap_disks:
1294
      if isinstance(disk, bool):
1295
        flat_disks.append(disk)
1296
      else:
1297
        flat_disks.append(disk.ToDict())
1298

    
1299
    return self._SingleNodeCall(node, "finalize_export",
1300
                                [self._InstDict(instance), flat_disks])
1301

    
1302
  @_RpcTimeout(_TMO_FAST)
1303
  def call_export_info(self, node, path):
1304
    """Queries the export information in a given path.
1305

1306
    This is a single-node call.
1307

1308
    """
1309
    return self._SingleNodeCall(node, "export_info", [path])
1310

    
1311
  @_RpcTimeout(_TMO_FAST)
1312
  def call_export_list(self, node_list):
1313
    """Gets the stored exports list.
1314

1315
    This is a multi-node call.
1316

1317
    """
1318
    return self._MultiNodeCall(node_list, "export_list", [])
1319

    
1320
  @_RpcTimeout(_TMO_FAST)
1321
  def call_export_remove(self, node, export):
1322
    """Requests removal of a given export.
1323

1324
    This is a single-node call.
1325

1326
    """
1327
    return self._SingleNodeCall(node, "export_remove", [export])
1328

    
1329
  @classmethod
1330
  @_RpcTimeout(_TMO_NORMAL)
1331
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1332
    """Requests a node to clean the cluster information it has.
1333

1334
    This will remove the configuration information from the ganeti data
1335
    dir.
1336

1337
    This is a single-node call.
1338

1339
    """
1340
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1341
                                     [modify_ssh_setup])
1342

    
1343
  @_RpcTimeout(_TMO_FAST)
1344
  def call_node_volumes(self, node_list):
1345
    """Gets all volumes on node(s).
1346

1347
    This is a multi-node call.
1348

1349
    """
1350
    return self._MultiNodeCall(node_list, "node_volumes", [])
1351

    
1352
  @_RpcTimeout(_TMO_FAST)
1353
  def call_node_demote_from_mc(self, node):
1354
    """Demote a node from the master candidate role.
1355

1356
    This is a single-node call.
1357

1358
    """
1359
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1360

    
1361
  @_RpcTimeout(_TMO_NORMAL)
1362
  def call_node_powercycle(self, node, hypervisor):
1363
    """Tries to powercycle a node.
1364

1365
    This is a single-node call.
1366

1367
    """
1368
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1369

    
1370
  @_RpcTimeout(None)
1371
  def call_test_delay(self, node_list, duration):
1372
    """Sleep for a fixed time on given node(s).
1373

1374
    This is a multi-node call.
1375

1376
    """
1377
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1378
                               read_timeout=int(duration + 5))
1379

    
1380
  @_RpcTimeout(_TMO_FAST)
1381
  def call_file_storage_dir_create(self, node, file_storage_dir):
1382
    """Create the given file storage directory.
1383

1384
    This is a single-node call.
1385

1386
    """
1387
    return self._SingleNodeCall(node, "file_storage_dir_create",
1388
                                [file_storage_dir])
1389

    
1390
  @_RpcTimeout(_TMO_FAST)
1391
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1392
    """Remove the given file storage directory.
1393

1394
    This is a single-node call.
1395

1396
    """
1397
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1398
                                [file_storage_dir])
1399

    
1400
  @_RpcTimeout(_TMO_FAST)
1401
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1402
                                   new_file_storage_dir):
1403
    """Rename file storage directory.
1404

1405
    This is a single-node call.
1406

1407
    """
1408
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1409
                                [old_file_storage_dir, new_file_storage_dir])
1410

    
1411
  @classmethod
1412
  @_RpcTimeout(_TMO_URGENT)
1413
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1414
    """Update job queue.
1415

1416
    This is a multi-node call.
1417

1418
    """
1419
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1420
                                    [file_name, _Compress(content)],
1421
                                    address_list=address_list)
1422

    
1423
  @classmethod
1424
  @_RpcTimeout(_TMO_NORMAL)
1425
  def call_jobqueue_purge(cls, node):
1426
    """Purge job queue.
1427

1428
    This is a single-node call.
1429

1430
    """
1431
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1432

    
1433
  @classmethod
1434
  @_RpcTimeout(_TMO_URGENT)
1435
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1436
    """Rename a job queue file.
1437

1438
    This is a multi-node call.
1439

1440
    """
1441
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1442
                                    address_list=address_list)
1443

    
1444
  @_RpcTimeout(_TMO_NORMAL)
1445
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1446
    """Validate the hypervisor params.
1447

1448
    This is a multi-node call.
1449

1450
    @type node_list: list
1451
    @param node_list: the list of nodes to query
1452
    @type hvname: string
1453
    @param hvname: the hypervisor name
1454
    @type hvparams: dict
1455
    @param hvparams: the hypervisor parameters to be validated
1456

1457
    """
1458
    cluster = self._cfg.GetClusterInfo()
1459
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1460
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1461
                               [hvname, hv_full])
1462

    
1463
  @_RpcTimeout(_TMO_NORMAL)
1464
  def call_x509_cert_create(self, node, validity):
1465
    """Creates a new X509 certificate for SSL/TLS.
1466

1467
    This is a single-node call.
1468

1469
    @type validity: int
1470
    @param validity: Validity in seconds
1471

1472
    """
1473
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1474

    
1475
  @_RpcTimeout(_TMO_NORMAL)
1476
  def call_x509_cert_remove(self, node, name):
1477
    """Removes a X509 certificate.
1478

1479
    This is a single-node call.
1480

1481
    @type name: string
1482
    @param name: Certificate name
1483

1484
    """
1485
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1486

    
1487
  @_RpcTimeout(_TMO_NORMAL)
1488
  def call_import_start(self, node, opts, instance, component,
1489
                        dest, dest_args):
1490
    """Starts a listener for an import.
1491

1492
    This is a single-node call.
1493

1494
    @type node: string
1495
    @param node: Node name
1496
    @type instance: C{objects.Instance}
1497
    @param instance: Instance object
1498
    @type component: string
1499
    @param component: which part of the instance is being imported
1500

1501
    """
1502
    return self._SingleNodeCall(node, "import_start",
1503
                                [opts.ToDict(),
1504
                                 self._InstDict(instance), component, dest,
1505
                                 _EncodeImportExportIO(dest, dest_args)])
1506

    
1507
  @_RpcTimeout(_TMO_NORMAL)
1508
  def call_export_start(self, node, opts, host, port,
1509
                        instance, component, source, source_args):
1510
    """Starts an export daemon.
1511

1512
    This is a single-node call.
1513

1514
    @type node: string
1515
    @param node: Node name
1516
    @type instance: C{objects.Instance}
1517
    @param instance: Instance object
1518
    @type component: string
1519
    @param component: which part of the instance is being imported
1520

1521
    """
1522
    return self._SingleNodeCall(node, "export_start",
1523
                                [opts.ToDict(), host, port,
1524
                                 self._InstDict(instance),
1525
                                 component, source,
1526
                                 _EncodeImportExportIO(source, source_args)])
1527

    
1528
  @_RpcTimeout(_TMO_FAST)
1529
  def call_impexp_status(self, node, names):
1530
    """Gets the status of an import or export.
1531

1532
    This is a single-node call.
1533

1534
    @type node: string
1535
    @param node: Node name
1536
    @type names: List of strings
1537
    @param names: Import/export names
1538
    @rtype: List of L{objects.ImportExportStatus} instances
1539
    @return: Returns a list of the state of each named import/export or None if
1540
             a status couldn't be retrieved
1541

1542
    """
1543
    result = self._SingleNodeCall(node, "impexp_status", [names])
1544

    
1545
    if not result.fail_msg:
1546
      decoded = []
1547

    
1548
      for i in result.payload:
1549
        if i is None:
1550
          decoded.append(None)
1551
          continue
1552
        decoded.append(objects.ImportExportStatus.FromDict(i))
1553

    
1554
      result.payload = decoded
1555

    
1556
    return result
1557

    
1558
  @_RpcTimeout(_TMO_NORMAL)
1559
  def call_impexp_abort(self, node, name):
1560
    """Aborts an import or export.
1561

1562
    This is a single-node call.
1563

1564
    @type node: string
1565
    @param node: Node name
1566
    @type name: string
1567
    @param name: Import/export name
1568

1569
    """
1570
    return self._SingleNodeCall(node, "impexp_abort", [name])
1571

    
1572
  @_RpcTimeout(_TMO_NORMAL)
1573
  def call_impexp_cleanup(self, node, name):
1574
    """Cleans up after an import or export.
1575

1576
    This is a single-node call.
1577

1578
    @type node: string
1579
    @param node: Node name
1580
    @type name: string
1581
    @param name: Import/export name
1582

1583
    """
1584
    return self._SingleNodeCall(node, "impexp_cleanup", [name])