Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 2be7273c

History | View | Annotate | Download (46.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48

    
49
# pylint has a bug here, doesn't see this import
50
import ganeti.http.client  # pylint: disable-msg=W0611
51

    
52

    
53
# Timeout for connecting to nodes (seconds)
54
_RPC_CONNECT_TIMEOUT = 5
55

    
56
_RPC_CLIENT_HEADERS = [
57
  "Content-type: %s" % http.HTTP_APP_JSON,
58
  "Expect:",
59
  ]
60

    
61
# Various time constants for the timeout table
62
_TMO_URGENT = 60 # one minute
63
_TMO_FAST = 5 * 60 # five minutes
64
_TMO_NORMAL = 15 * 60 # 15 minutes
65
_TMO_SLOW = 3600 # one hour
66
_TMO_4HRS = 4 * 3600
67
_TMO_1DAY = 86400
68

    
69
# Timeout table that will be built later by decorators
70
# Guidelines for choosing timeouts:
71
# - call used during watcher: timeout -> 1min, _TMO_URGENT
72
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73
# - other calls: 15 min, _TMO_NORMAL
74
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75

    
76
_TIMEOUTS = {
77
}
78

    
79

    
80
def Init():
81
  """Initializes the module-global HTTP client manager.
82

83
  Must be called before using any RPC function and while exactly one thread is
84
  running.
85

86
  """
87
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88
  # one thread running. This check is just a safety measure -- it doesn't
89
  # cover all cases.
90
  assert threading.activeCount() == 1, \
91
         "Found more than one active thread when initializing pycURL"
92

    
93
  logging.info("Using PycURL %s", pycurl.version)
94

    
95
  pycurl.global_init(pycurl.GLOBAL_ALL)
96

    
97

    
98
def Shutdown():
99
  """Stops the module-global HTTP client manager.
100

101
  Must be called before quitting the program and while exactly one thread is
102
  running.
103

104
  """
105
  pycurl.global_cleanup()
106

    
107

    
108
def _ConfigRpcCurl(curl):
109
  noded_cert = str(constants.NODED_CERT_FILE)
110

    
111
  curl.setopt(pycurl.FOLLOWLOCATION, False)
112
  curl.setopt(pycurl.CAINFO, noded_cert)
113
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
115
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116
  curl.setopt(pycurl.SSLCERT, noded_cert)
117
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118
  curl.setopt(pycurl.SSLKEY, noded_cert)
119
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120

    
121

    
122
# Aliasing this module avoids the following warning by epydoc: "Warning: No
123
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124
_threading = threading
125

    
126

    
127
class _RpcThreadLocal(_threading.local):
128
  def GetHttpClientPool(self):
129
    """Returns a per-thread HTTP client pool.
130

131
    @rtype: L{http.client.HttpClientPool}
132

133
    """
134
    try:
135
      pool = self.hcp
136
    except AttributeError:
137
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
138
      self.hcp = pool
139

    
140
    return pool
141

    
142

    
143
# Remove module alias (see above)
144
del _threading
145

    
146

    
147
_thread_local = _RpcThreadLocal()
148

    
149

    
150
def _RpcTimeout(secs):
151
  """Timeout decorator.
152

153
  When applied to a rpc call_* function, it updates the global timeout
154
  table with the given function/timeout.
155

156
  """
157
  def decorator(f):
158
    name = f.__name__
159
    assert name.startswith("call_")
160
    _TIMEOUTS[name[len("call_"):]] = secs
161
    return f
162
  return decorator
163

    
164

    
165
def RunWithRPC(fn):
166
  """RPC-wrapper decorator.
167

168
  When applied to a function, it runs it with the RPC system
169
  initialized, and it shutsdown the system afterwards. This means the
170
  function must be called without RPC being initialized.
171

172
  """
173
  def wrapper(*args, **kwargs):
174
    Init()
175
    try:
176
      return fn(*args, **kwargs)
177
    finally:
178
      Shutdown()
179
  return wrapper
180

    
181

    
182
class RpcResult(object):
183
  """RPC Result class.
184

185
  This class holds an RPC result. It is needed since in multi-node
186
  calls we can't raise an exception just because one one out of many
187
  failed, and therefore we use this class to encapsulate the result.
188

189
  @ivar data: the data payload, for successful results, or None
190
  @ivar call: the name of the RPC call
191
  @ivar node: the name of the node to which we made the call
192
  @ivar offline: whether the operation failed because the node was
193
      offline, as opposed to actual failure; offline=True will always
194
      imply failed=True, in order to allow simpler checking if
195
      the user doesn't care about the exact failure mode
196
  @ivar fail_msg: the error message if the call failed
197

198
  """
199
  def __init__(self, data=None, failed=False, offline=False,
200
               call=None, node=None):
201
    self.offline = offline
202
    self.call = call
203
    self.node = node
204

    
205
    if offline:
206
      self.fail_msg = "Node is marked offline"
207
      self.data = self.payload = None
208
    elif failed:
209
      self.fail_msg = self._EnsureErr(data)
210
      self.data = self.payload = None
211
    else:
212
      self.data = data
213
      if not isinstance(self.data, (tuple, list)):
214
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215
                         type(self.data))
216
        self.payload = None
217
      elif len(data) != 2:
218
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
219
                         "expected 2" % len(self.data))
220
        self.payload = None
221
      elif not self.data[0]:
222
        self.fail_msg = self._EnsureErr(self.data[1])
223
        self.payload = None
224
      else:
225
        # finally success
226
        self.fail_msg = None
227
        self.payload = data[1]
228

    
229
    for attr_name in ["call", "data", "fail_msg",
230
                      "node", "offline", "payload"]:
231
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
232

    
233
  @staticmethod
234
  def _EnsureErr(val):
235
    """Helper to ensure we return a 'True' value for error."""
236
    if val:
237
      return val
238
    else:
239
      return "No error information"
240

    
241
  def Raise(self, msg, prereq=False, ecode=None):
242
    """If the result has failed, raise an OpExecError.
243

244
    This is used so that LU code doesn't have to check for each
245
    result, but instead can call this function.
246

247
    """
248
    if not self.fail_msg:
249
      return
250

    
251
    if not msg: # one could pass None for default message
252
      msg = ("Call '%s' to node '%s' has failed: %s" %
253
             (self.call, self.node, self.fail_msg))
254
    else:
255
      msg = "%s: %s" % (msg, self.fail_msg)
256
    if prereq:
257
      ec = errors.OpPrereqError
258
    else:
259
      ec = errors.OpExecError
260
    if ecode is not None:
261
      args = (msg, ecode)
262
    else:
263
      args = (msg, )
264
    raise ec(*args) # pylint: disable-msg=W0142
265

    
266

    
267
def _AddressLookup(node_list,
268
                   ssc=ssconf.SimpleStore,
269
                   nslookup_fn=netutils.Hostname.GetIP):
270
  """Return addresses for given node names.
271

272
  @type node_list: list
273
  @param node_list: List of node names
274
  @type ssc: class
275
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
276
  @type nslookup_fn: callable
277
  @param nslookup_fn: function use to do NS lookup
278
  @rtype: list of addresses and/or None's
279
  @returns: List of corresponding addresses, if found
280

281
  """
282
  ss = ssc()
283
  iplist = ss.GetNodePrimaryIPList()
284
  family = ss.GetPrimaryIPFamily()
285
  addresses = []
286
  ipmap = dict(entry.split() for entry in iplist)
287
  for node in node_list:
288
    address = ipmap.get(node)
289
    if address is None:
290
      address = nslookup_fn(node, family=family)
291
    addresses.append(address)
292

    
293
  return addresses
294

    
295

    
296
class Client:
297
  """RPC Client class.
298

299
  This class, given a (remote) method name, a list of parameters and a
300
  list of nodes, will contact (in parallel) all nodes, and return a
301
  dict of results (key: node name, value: result).
302

303
  One current bug is that generic failure is still signaled by
304
  'False' result, which is not good. This overloading of values can
305
  cause bugs.
306

307
  """
308
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
309
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
310
                                    " timeouts table")
311
    self.procedure = procedure
312
    self.body = body
313
    self.port = port
314
    self._request = {}
315
    self._address_lookup_fn = address_lookup_fn
316

    
317
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
318
    """Add a list of nodes to the target nodes.
319

320
    @type node_list: list
321
    @param node_list: the list of node names to connect
322
    @type address_list: list or None
323
    @keyword address_list: either None or a list with node addresses,
324
        which must have the same length as the node list
325
    @type read_timeout: int
326
    @param read_timeout: overwrites default timeout for operation
327

328
    """
329
    if address_list is None:
330
      # Always use IP address instead of node name
331
      address_list = self._address_lookup_fn(node_list)
332

    
333
    assert len(node_list) == len(address_list), \
334
           "Name and address lists must have the same length"
335

    
336
    for node, address in zip(node_list, address_list):
337
      self.ConnectNode(node, address, read_timeout=read_timeout)
338

    
339
  def ConnectNode(self, name, address=None, read_timeout=None):
340
    """Add a node to the target list.
341

342
    @type name: str
343
    @param name: the node name
344
    @type address: str
345
    @param address: the node address, if known
346
    @type read_timeout: int
347
    @param read_timeout: overwrites default timeout for operation
348

349
    """
350
    if address is None:
351
      # Always use IP address instead of node name
352
      address = self._address_lookup_fn([name])[0]
353

    
354
    assert(address is not None)
355

    
356
    if read_timeout is None:
357
      read_timeout = _TIMEOUTS[self.procedure]
358

    
359
    self._request[name] = \
360
      http.client.HttpClientRequest(str(address), self.port,
361
                                    http.HTTP_PUT, str("/%s" % self.procedure),
362
                                    headers=_RPC_CLIENT_HEADERS,
363
                                    post_data=str(self.body),
364
                                    read_timeout=read_timeout)
365

    
366
  def GetResults(self, http_pool=None):
367
    """Call nodes and return results.
368

369
    @rtype: list
370
    @return: List of RPC results
371

372
    """
373
    if not http_pool:
374
      http_pool = _thread_local.GetHttpClientPool()
375

    
376
    http_pool.ProcessRequests(self._request.values())
377

    
378
    results = {}
379

    
380
    for name, req in self._request.iteritems():
381
      if req.success and req.resp_status_code == http.HTTP_OK:
382
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
383
                                  node=name, call=self.procedure)
384
        continue
385

    
386
      # TODO: Better error reporting
387
      if req.error:
388
        msg = req.error
389
      else:
390
        msg = req.resp_body
391

    
392
      logging.error("RPC error in %s from node %s: %s",
393
                    self.procedure, name, msg)
394
      results[name] = RpcResult(data=msg, failed=True, node=name,
395
                                call=self.procedure)
396

    
397
    return results
398

    
399

    
400
def _EncodeImportExportIO(ieio, ieioargs):
401
  """Encodes import/export I/O information.
402

403
  """
404
  if ieio == constants.IEIO_RAW_DISK:
405
    assert len(ieioargs) == 1
406
    return (ieioargs[0].ToDict(), )
407

    
408
  if ieio == constants.IEIO_SCRIPT:
409
    assert len(ieioargs) == 2
410
    return (ieioargs[0].ToDict(), ieioargs[1])
411

    
412
  return ieioargs
413

    
414

    
415
class RpcRunner(object):
416
  """RPC runner class"""
417

    
418
  def __init__(self, cfg):
419
    """Initialized the rpc runner.
420

421
    @type cfg:  C{config.ConfigWriter}
422
    @param cfg: the configuration object that will be used to get data
423
                about the cluster
424

425
    """
426
    self._cfg = cfg
427
    self.port = netutils.GetDaemonPort(constants.NODED)
428

    
429
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
430
    """Convert the given instance to a dict.
431

432
    This is done via the instance's ToDict() method and additionally
433
    we fill the hvparams with the cluster defaults.
434

435
    @type instance: L{objects.Instance}
436
    @param instance: an Instance object
437
    @type hvp: dict or None
438
    @param hvp: a dictionary with overridden hypervisor parameters
439
    @type bep: dict or None
440
    @param bep: a dictionary with overridden backend parameters
441
    @type osp: dict or None
442
    @param osp: a dictionary with overridden os parameters
443
    @rtype: dict
444
    @return: the instance dict, with the hvparams filled with the
445
        cluster defaults
446

447
    """
448
    idict = instance.ToDict()
449
    cluster = self._cfg.GetClusterInfo()
450
    idict["hvparams"] = cluster.FillHV(instance)
451
    if hvp is not None:
452
      idict["hvparams"].update(hvp)
453
    idict["beparams"] = cluster.FillBE(instance)
454
    if bep is not None:
455
      idict["beparams"].update(bep)
456
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
457
    if osp is not None:
458
      idict["osparams"].update(osp)
459
    for nic in idict["nics"]:
460
      nic['nicparams'] = objects.FillDict(
461
        cluster.nicparams[constants.PP_DEFAULT],
462
        nic['nicparams'])
463
    return idict
464

    
465
  def _ConnectList(self, client, node_list, call, read_timeout=None):
466
    """Helper for computing node addresses.
467

468
    @type client: L{ganeti.rpc.Client}
469
    @param client: a C{Client} instance
470
    @type node_list: list
471
    @param node_list: the node list we should connect
472
    @type call: string
473
    @param call: the name of the remote procedure call, for filling in
474
        correctly any eventual offline nodes' results
475
    @type read_timeout: int
476
    @param read_timeout: overwrites the default read timeout for the
477
        given operation
478

479
    """
480
    all_nodes = self._cfg.GetAllNodesInfo()
481
    name_list = []
482
    addr_list = []
483
    skip_dict = {}
484
    for node in node_list:
485
      if node in all_nodes:
486
        if all_nodes[node].offline:
487
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
488
          continue
489
        val = all_nodes[node].primary_ip
490
      else:
491
        val = None
492
      addr_list.append(val)
493
      name_list.append(node)
494
    if name_list:
495
      client.ConnectList(name_list, address_list=addr_list,
496
                         read_timeout=read_timeout)
497
    return skip_dict
498

    
499
  def _ConnectNode(self, client, node, call, read_timeout=None):
500
    """Helper for computing one node's address.
501

502
    @type client: L{ganeti.rpc.Client}
503
    @param client: a C{Client} instance
504
    @type node: str
505
    @param node: the node we should connect
506
    @type call: string
507
    @param call: the name of the remote procedure call, for filling in
508
        correctly any eventual offline nodes' results
509
    @type read_timeout: int
510
    @param read_timeout: overwrites the default read timeout for the
511
        given operation
512

513
    """
514
    node_info = self._cfg.GetNodeInfo(node)
515
    if node_info is not None:
516
      if node_info.offline:
517
        return RpcResult(node=node, offline=True, call=call)
518
      addr = node_info.primary_ip
519
    else:
520
      addr = None
521
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
522

    
523
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
524
    """Helper for making a multi-node call
525

526
    """
527
    body = serializer.DumpJson(args, indent=False)
528
    c = Client(procedure, body, self.port)
529
    skip_dict = self._ConnectList(c, node_list, procedure,
530
                                  read_timeout=read_timeout)
531
    skip_dict.update(c.GetResults())
532
    return skip_dict
533

    
534
  @classmethod
535
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
536
                           address_list=None, read_timeout=None):
537
    """Helper for making a multi-node static call
538

539
    """
540
    body = serializer.DumpJson(args, indent=False)
541
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
542
    c.ConnectList(node_list, address_list=address_list,
543
                  read_timeout=read_timeout)
544
    return c.GetResults()
545

    
546
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
547
    """Helper for making a single-node call
548

549
    """
550
    body = serializer.DumpJson(args, indent=False)
551
    c = Client(procedure, body, self.port)
552
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
553
    if result is None:
554
      # we did connect, node is not offline
555
      result = c.GetResults()[node]
556
    return result
557

    
558
  @classmethod
559
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
560
    """Helper for making a single-node static call
561

562
    """
563
    body = serializer.DumpJson(args, indent=False)
564
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
565
    c.ConnectNode(node, read_timeout=read_timeout)
566
    return c.GetResults()[node]
567

    
568
  @staticmethod
569
  def _Compress(data):
570
    """Compresses a string for transport over RPC.
571

572
    Small amounts of data are not compressed.
573

574
    @type data: str
575
    @param data: Data
576
    @rtype: tuple
577
    @return: Encoded data to send
578

579
    """
580
    # Small amounts of data are not compressed
581
    if len(data) < 512:
582
      return (constants.RPC_ENCODING_NONE, data)
583

    
584
    # Compress with zlib and encode in base64
585
    return (constants.RPC_ENCODING_ZLIB_BASE64,
586
            base64.b64encode(zlib.compress(data, 3)))
587

    
588
  #
589
  # Begin RPC calls
590
  #
591

    
592
  @_RpcTimeout(_TMO_URGENT)
593
  def call_bdev_sizes(self, node_list, devices):
594
    """Gets the sizes of requested block devices present on a node
595

596
    This is a multi-node call.
597

598
    """
599
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
600

    
601
  @_RpcTimeout(_TMO_URGENT)
602
  def call_lv_list(self, node_list, vg_name):
603
    """Gets the logical volumes present in a given volume group.
604

605
    This is a multi-node call.
606

607
    """
608
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
609

    
610
  @_RpcTimeout(_TMO_URGENT)
611
  def call_vg_list(self, node_list):
612
    """Gets the volume group list.
613

614
    This is a multi-node call.
615

616
    """
617
    return self._MultiNodeCall(node_list, "vg_list", [])
618

    
619
  @_RpcTimeout(_TMO_NORMAL)
620
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
621
    """Get list of storage units.
622

623
    This is a multi-node call.
624

625
    """
626
    return self._MultiNodeCall(node_list, "storage_list",
627
                               [su_name, su_args, name, fields])
628

    
629
  @_RpcTimeout(_TMO_NORMAL)
630
  def call_storage_modify(self, node, su_name, su_args, name, changes):
631
    """Modify a storage unit.
632

633
    This is a single-node call.
634

635
    """
636
    return self._SingleNodeCall(node, "storage_modify",
637
                                [su_name, su_args, name, changes])
638

    
639
  @_RpcTimeout(_TMO_NORMAL)
640
  def call_storage_execute(self, node, su_name, su_args, name, op):
641
    """Executes an operation on a storage unit.
642

643
    This is a single-node call.
644

645
    """
646
    return self._SingleNodeCall(node, "storage_execute",
647
                                [su_name, su_args, name, op])
648

    
649
  @_RpcTimeout(_TMO_URGENT)
650
  def call_bridges_exist(self, node, bridges_list):
651
    """Checks if a node has all the bridges given.
652

653
    This method checks if all bridges given in the bridges_list are
654
    present on the remote node, so that an instance that uses interfaces
655
    on those bridges can be started.
656

657
    This is a single-node call.
658

659
    """
660
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
661

    
662
  @_RpcTimeout(_TMO_NORMAL)
663
  def call_instance_start(self, node, instance, hvp, bep):
664
    """Starts an instance.
665

666
    This is a single-node call.
667

668
    """
669
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
670
    return self._SingleNodeCall(node, "instance_start", [idict])
671

    
672
  @_RpcTimeout(_TMO_NORMAL)
673
  def call_instance_shutdown(self, node, instance, timeout):
674
    """Stops an instance.
675

676
    This is a single-node call.
677

678
    """
679
    return self._SingleNodeCall(node, "instance_shutdown",
680
                                [self._InstDict(instance), timeout])
681

    
682
  @_RpcTimeout(_TMO_NORMAL)
683
  def call_migration_info(self, node, instance):
684
    """Gather the information necessary to prepare an instance migration.
685

686
    This is a single-node call.
687

688
    @type node: string
689
    @param node: the node on which the instance is currently running
690
    @type instance: C{objects.Instance}
691
    @param instance: the instance definition
692

693
    """
694
    return self._SingleNodeCall(node, "migration_info",
695
                                [self._InstDict(instance)])
696

    
697
  @_RpcTimeout(_TMO_NORMAL)
698
  def call_accept_instance(self, node, instance, info, target):
699
    """Prepare a node to accept an instance.
700

701
    This is a single-node call.
702

703
    @type node: string
704
    @param node: the target node for the migration
705
    @type instance: C{objects.Instance}
706
    @param instance: the instance definition
707
    @type info: opaque/hypervisor specific (string/data)
708
    @param info: result for the call_migration_info call
709
    @type target: string
710
    @param target: target hostname (usually ip address) (on the node itself)
711

712
    """
713
    return self._SingleNodeCall(node, "accept_instance",
714
                                [self._InstDict(instance), info, target])
715

    
716
  @_RpcTimeout(_TMO_NORMAL)
717
  def call_finalize_migration(self, node, instance, info, success):
718
    """Finalize any target-node migration specific operation.
719

720
    This is called both in case of a successful migration and in case of error
721
    (in which case it should abort the migration).
722

723
    This is a single-node call.
724

725
    @type node: string
726
    @param node: the target node for the migration
727
    @type instance: C{objects.Instance}
728
    @param instance: the instance definition
729
    @type info: opaque/hypervisor specific (string/data)
730
    @param info: result for the call_migration_info call
731
    @type success: boolean
732
    @param success: whether the migration was a success or a failure
733

734
    """
735
    return self._SingleNodeCall(node, "finalize_migration",
736
                                [self._InstDict(instance), info, success])
737

    
738
  @_RpcTimeout(_TMO_SLOW)
739
  def call_instance_migrate(self, node, instance, target, live):
740
    """Migrate an instance.
741

742
    This is a single-node call.
743

744
    @type node: string
745
    @param node: the node on which the instance is currently running
746
    @type instance: C{objects.Instance}
747
    @param instance: the instance definition
748
    @type target: string
749
    @param target: the target node name
750
    @type live: boolean
751
    @param live: whether the migration should be done live or not (the
752
        interpretation of this parameter is left to the hypervisor)
753

754
    """
755
    return self._SingleNodeCall(node, "instance_migrate",
756
                                [self._InstDict(instance), target, live])
757

    
758
  @_RpcTimeout(_TMO_NORMAL)
759
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
760
    """Reboots an instance.
761

762
    This is a single-node call.
763

764
    """
765
    return self._SingleNodeCall(node, "instance_reboot",
766
                                [self._InstDict(inst), reboot_type,
767
                                 shutdown_timeout])
768

    
769
  @_RpcTimeout(_TMO_1DAY)
770
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
771
    """Installs an OS on the given instance.
772

773
    This is a single-node call.
774

775
    """
776
    return self._SingleNodeCall(node, "instance_os_add",
777
                                [self._InstDict(inst, osp=osparams),
778
                                 reinstall, debug])
779

    
780
  @_RpcTimeout(_TMO_SLOW)
781
  def call_instance_run_rename(self, node, inst, old_name, debug):
782
    """Run the OS rename script for an instance.
783

784
    This is a single-node call.
785

786
    """
787
    return self._SingleNodeCall(node, "instance_run_rename",
788
                                [self._InstDict(inst), old_name, debug])
789

    
790
  @_RpcTimeout(_TMO_URGENT)
791
  def call_instance_info(self, node, instance, hname):
792
    """Returns information about a single instance.
793

794
    This is a single-node call.
795

796
    @type node: list
797
    @param node: the list of nodes to query
798
    @type instance: string
799
    @param instance: the instance name
800
    @type hname: string
801
    @param hname: the hypervisor type of the instance
802

803
    """
804
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
805

    
806
  @_RpcTimeout(_TMO_NORMAL)
807
  def call_instance_migratable(self, node, instance):
808
    """Checks whether the given instance can be migrated.
809

810
    This is a single-node call.
811

812
    @param node: the node to query
813
    @type instance: L{objects.Instance}
814
    @param instance: the instance to check
815

816

817
    """
818
    return self._SingleNodeCall(node, "instance_migratable",
819
                                [self._InstDict(instance)])
820

    
821
  @_RpcTimeout(_TMO_URGENT)
822
  def call_all_instances_info(self, node_list, hypervisor_list):
823
    """Returns information about all instances on the given nodes.
824

825
    This is a multi-node call.
826

827
    @type node_list: list
828
    @param node_list: the list of nodes to query
829
    @type hypervisor_list: list
830
    @param hypervisor_list: the hypervisors to query for instances
831

832
    """
833
    return self._MultiNodeCall(node_list, "all_instances_info",
834
                               [hypervisor_list])
835

    
836
  @_RpcTimeout(_TMO_URGENT)
837
  def call_instance_list(self, node_list, hypervisor_list):
838
    """Returns the list of running instances on a given node.
839

840
    This is a multi-node call.
841

842
    @type node_list: list
843
    @param node_list: the list of nodes to query
844
    @type hypervisor_list: list
845
    @param hypervisor_list: the hypervisors to query for instances
846

847
    """
848
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
849

    
850
  @_RpcTimeout(_TMO_FAST)
851
  def call_node_tcp_ping(self, node, source, target, port, timeout,
852
                         live_port_needed):
853
    """Do a TcpPing on the remote node
854

855
    This is a single-node call.
856

857
    """
858
    return self._SingleNodeCall(node, "node_tcp_ping",
859
                                [source, target, port, timeout,
860
                                 live_port_needed])
861

    
862
  @_RpcTimeout(_TMO_FAST)
863
  def call_node_has_ip_address(self, node, address):
864
    """Checks if a node has the given IP address.
865

866
    This is a single-node call.
867

868
    """
869
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
870

    
871
  @_RpcTimeout(_TMO_URGENT)
872
  def call_node_info(self, node_list, vg_name, hypervisor_type):
873
    """Return node information.
874

875
    This will return memory information and volume group size and free
876
    space.
877

878
    This is a multi-node call.
879

880
    @type node_list: list
881
    @param node_list: the list of nodes to query
882
    @type vg_name: C{string}
883
    @param vg_name: the name of the volume group to ask for disk space
884
        information
885
    @type hypervisor_type: C{str}
886
    @param hypervisor_type: the name of the hypervisor to ask for
887
        memory information
888

889
    """
890
    return self._MultiNodeCall(node_list, "node_info",
891
                               [vg_name, hypervisor_type])
892

    
893
  @_RpcTimeout(_TMO_NORMAL)
894
  def call_etc_hosts_modify(self, node, mode, name, ip):
895
    """Modify hosts file with name
896

897
    @type node: string
898
    @param node: The node to call
899
    @type mode: string
900
    @param mode: The mode to operate. Currently "add" or "remove"
901
    @type name: string
902
    @param name: The host name to be modified
903
    @type ip: string
904
    @param ip: The ip of the entry (just valid if mode is "add")
905

906
    """
907
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
908

    
909
  @_RpcTimeout(_TMO_NORMAL)
910
  def call_node_verify(self, node_list, checkdict, cluster_name):
911
    """Request verification of given parameters.
912

913
    This is a multi-node call.
914

915
    """
916
    return self._MultiNodeCall(node_list, "node_verify",
917
                               [checkdict, cluster_name])
918

    
919
  @classmethod
920
  @_RpcTimeout(_TMO_FAST)
921
  def call_node_start_master(cls, node, start_daemons, no_voting):
922
    """Tells a node to activate itself as a master.
923

924
    This is a single-node call.
925

926
    """
927
    return cls._StaticSingleNodeCall(node, "node_start_master",
928
                                     [start_daemons, no_voting])
929

    
930
  @classmethod
931
  @_RpcTimeout(_TMO_FAST)
932
  def call_node_stop_master(cls, node, stop_daemons):
933
    """Tells a node to demote itself from master status.
934

935
    This is a single-node call.
936

937
    """
938
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
939

    
940
  @classmethod
941
  @_RpcTimeout(_TMO_URGENT)
942
  def call_master_info(cls, node_list):
943
    """Query master info.
944

945
    This is a multi-node call.
946

947
    """
948
    # TODO: should this method query down nodes?
949
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
950

    
951
  @classmethod
952
  @_RpcTimeout(_TMO_URGENT)
953
  def call_version(cls, node_list):
954
    """Query node version.
955

956
    This is a multi-node call.
957

958
    """
959
    return cls._StaticMultiNodeCall(node_list, "version", [])
960

    
961
  @_RpcTimeout(_TMO_NORMAL)
962
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
963
    """Request creation of a given block device.
964

965
    This is a single-node call.
966

967
    """
968
    return self._SingleNodeCall(node, "blockdev_create",
969
                                [bdev.ToDict(), size, owner, on_primary, info])
970

    
971
  @_RpcTimeout(_TMO_SLOW)
972
  def call_blockdev_wipe(self, node, bdev, offset, size):
973
    """Request wipe at given offset with given size of a block device.
974

975
    This is a single-node call.
976

977
    """
978
    return self._SingleNodeCall(node, "blockdev_wipe",
979
                                [bdev.ToDict(), offset, size])
980

    
981
  @_RpcTimeout(_TMO_NORMAL)
982
  def call_blockdev_remove(self, node, bdev):
983
    """Request removal of a given block device.
984

985
    This is a single-node call.
986

987
    """
988
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
989

    
990
  @_RpcTimeout(_TMO_NORMAL)
991
  def call_blockdev_rename(self, node, devlist):
992
    """Request rename of the given block devices.
993

994
    This is a single-node call.
995

996
    """
997
    return self._SingleNodeCall(node, "blockdev_rename",
998
                                [(d.ToDict(), uid) for d, uid in devlist])
999

    
1000
  @_RpcTimeout(_TMO_NORMAL)
1001
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
1002
    """Request a pause/resume of given block device.
1003

1004
    This is a single-node call.
1005

1006
    """
1007
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1008
                                [[bdev.ToDict() for bdev in disks], pause])
1009

    
1010
  @_RpcTimeout(_TMO_NORMAL)
1011
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1012
    """Request assembling of a given block device.
1013

1014
    This is a single-node call.
1015

1016
    """
1017
    return self._SingleNodeCall(node, "blockdev_assemble",
1018
                                [disk.ToDict(), owner, on_primary, idx])
1019

    
1020
  @_RpcTimeout(_TMO_NORMAL)
1021
  def call_blockdev_shutdown(self, node, disk):
1022
    """Request shutdown of a given block device.
1023

1024
    This is a single-node call.
1025

1026
    """
1027
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1028

    
1029
  @_RpcTimeout(_TMO_NORMAL)
1030
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1031
    """Request adding a list of children to a (mirroring) device.
1032

1033
    This is a single-node call.
1034

1035
    """
1036
    return self._SingleNodeCall(node, "blockdev_addchildren",
1037
                                [bdev.ToDict(),
1038
                                 [disk.ToDict() for disk in ndevs]])
1039

    
1040
  @_RpcTimeout(_TMO_NORMAL)
1041
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1042
    """Request removing a list of children from a (mirroring) device.
1043

1044
    This is a single-node call.
1045

1046
    """
1047
    return self._SingleNodeCall(node, "blockdev_removechildren",
1048
                                [bdev.ToDict(),
1049
                                 [disk.ToDict() for disk in ndevs]])
1050

    
1051
  @_RpcTimeout(_TMO_NORMAL)
1052
  def call_blockdev_getmirrorstatus(self, node, disks):
1053
    """Request status of a (mirroring) device.
1054

1055
    This is a single-node call.
1056

1057
    """
1058
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1059
                                  [dsk.ToDict() for dsk in disks])
1060
    if not result.fail_msg:
1061
      result.payload = [objects.BlockDevStatus.FromDict(i)
1062
                        for i in result.payload]
1063
    return result
1064

    
1065
  @_RpcTimeout(_TMO_NORMAL)
1066
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1067
    """Request status of (mirroring) devices from multiple nodes.
1068

1069
    This is a multi-node call.
1070

1071
    """
1072
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1073
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1074
                                       for name, disks in node_disks.items())])
1075
    for nres in result.values():
1076
      if nres.fail_msg:
1077
        continue
1078

    
1079
      for idx, (success, status) in enumerate(nres.payload):
1080
        if success:
1081
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1082

    
1083
    return result
1084

    
1085
  @_RpcTimeout(_TMO_NORMAL)
1086
  def call_blockdev_find(self, node, disk):
1087
    """Request identification of a given block device.
1088

1089
    This is a single-node call.
1090

1091
    """
1092
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1093
    if not result.fail_msg and result.payload is not None:
1094
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1095
    return result
1096

    
1097
  @_RpcTimeout(_TMO_NORMAL)
1098
  def call_blockdev_close(self, node, instance_name, disks):
1099
    """Closes the given block devices.
1100

1101
    This is a single-node call.
1102

1103
    """
1104
    params = [instance_name, [cf.ToDict() for cf in disks]]
1105
    return self._SingleNodeCall(node, "blockdev_close", params)
1106

    
1107
  @_RpcTimeout(_TMO_NORMAL)
1108
  def call_blockdev_getsize(self, node, disks):
1109
    """Returns the size of the given disks.
1110

1111
    This is a single-node call.
1112

1113
    """
1114
    params = [[cf.ToDict() for cf in disks]]
1115
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1116

    
1117
  @_RpcTimeout(_TMO_NORMAL)
1118
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1119
    """Disconnects the network of the given drbd devices.
1120

1121
    This is a multi-node call.
1122

1123
    """
1124
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1125
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1126

    
1127
  @_RpcTimeout(_TMO_NORMAL)
1128
  def call_drbd_attach_net(self, node_list, nodes_ip,
1129
                           disks, instance_name, multimaster):
1130
    """Disconnects the given drbd devices.
1131

1132
    This is a multi-node call.
1133

1134
    """
1135
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1136
                               [nodes_ip, [cf.ToDict() for cf in disks],
1137
                                instance_name, multimaster])
1138

    
1139
  @_RpcTimeout(_TMO_SLOW)
1140
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1141
    """Waits for the synchronization of drbd devices is complete.
1142

1143
    This is a multi-node call.
1144

1145
    """
1146
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1147
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1148

    
1149
  @_RpcTimeout(_TMO_URGENT)
1150
  def call_drbd_helper(self, node_list):
1151
    """Gets drbd helper.
1152

1153
    This is a multi-node call.
1154

1155
    """
1156
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1157

    
1158
  @classmethod
1159
  @_RpcTimeout(_TMO_NORMAL)
1160
  def call_upload_file(cls, node_list, file_name, address_list=None):
1161
    """Upload a file.
1162

1163
    The node will refuse the operation in case the file is not on the
1164
    approved file list.
1165

1166
    This is a multi-node call.
1167

1168
    @type node_list: list
1169
    @param node_list: the list of node names to upload to
1170
    @type file_name: str
1171
    @param file_name: the filename to upload
1172
    @type address_list: list or None
1173
    @keyword address_list: an optional list of node addresses, in order
1174
        to optimize the RPC speed
1175

1176
    """
1177
    file_contents = utils.ReadFile(file_name)
1178
    data = cls._Compress(file_contents)
1179
    st = os.stat(file_name)
1180
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1181
              st.st_atime, st.st_mtime]
1182
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1183
                                    address_list=address_list)
1184

    
1185
  @classmethod
1186
  @_RpcTimeout(_TMO_NORMAL)
1187
  def call_write_ssconf_files(cls, node_list, values):
1188
    """Write ssconf files.
1189

1190
    This is a multi-node call.
1191

1192
    """
1193
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1194

    
1195
  @_RpcTimeout(_TMO_NORMAL)
1196
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1197
    """Runs OOB.
1198

1199
    This is a single-node call.
1200

1201
    """
1202
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1203
                                                  remote_node, timeout])
1204

    
1205
  @_RpcTimeout(_TMO_FAST)
1206
  def call_os_diagnose(self, node_list):
1207
    """Request a diagnose of OS definitions.
1208

1209
    This is a multi-node call.
1210

1211
    """
1212
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1213

    
1214
  @_RpcTimeout(_TMO_FAST)
1215
  def call_os_get(self, node, name):
1216
    """Returns an OS definition.
1217

1218
    This is a single-node call.
1219

1220
    """
1221
    result = self._SingleNodeCall(node, "os_get", [name])
1222
    if not result.fail_msg and isinstance(result.payload, dict):
1223
      result.payload = objects.OS.FromDict(result.payload)
1224
    return result
1225

    
1226
  @_RpcTimeout(_TMO_FAST)
1227
  def call_os_validate(self, required, nodes, name, checks, params):
1228
    """Run a validation routine for a given OS.
1229

1230
    This is a multi-node call.
1231

1232
    """
1233
    return self._MultiNodeCall(nodes, "os_validate",
1234
                               [required, name, checks, params])
1235

    
1236
  @_RpcTimeout(_TMO_NORMAL)
1237
  def call_hooks_runner(self, node_list, hpath, phase, env):
1238
    """Call the hooks runner.
1239

1240
    Args:
1241
      - op: the OpCode instance
1242
      - env: a dictionary with the environment
1243

1244
    This is a multi-node call.
1245

1246
    """
1247
    params = [hpath, phase, env]
1248
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1249

    
1250
  @_RpcTimeout(_TMO_NORMAL)
1251
  def call_iallocator_runner(self, node, name, idata):
1252
    """Call an iallocator on a remote node
1253

1254
    Args:
1255
      - name: the iallocator name
1256
      - input: the json-encoded input string
1257

1258
    This is a single-node call.
1259

1260
    """
1261
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1262

    
1263
  @_RpcTimeout(_TMO_NORMAL)
1264
  def call_blockdev_grow(self, node, cf_bdev, amount):
1265
    """Request a snapshot of the given block device.
1266

1267
    This is a single-node call.
1268

1269
    """
1270
    return self._SingleNodeCall(node, "blockdev_grow",
1271
                                [cf_bdev.ToDict(), amount])
1272

    
1273
  @_RpcTimeout(_TMO_1DAY)
1274
  def call_blockdev_export(self, node, cf_bdev,
1275
                           dest_node, dest_path, cluster_name):
1276
    """Export a given disk to another node.
1277

1278
    This is a single-node call.
1279

1280
    """
1281
    return self._SingleNodeCall(node, "blockdev_export",
1282
                                [cf_bdev.ToDict(), dest_node, dest_path,
1283
                                 cluster_name])
1284

    
1285
  @_RpcTimeout(_TMO_NORMAL)
1286
  def call_blockdev_snapshot(self, node, cf_bdev):
1287
    """Request a snapshot of the given block device.
1288

1289
    This is a single-node call.
1290

1291
    """
1292
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1293

    
1294
  @_RpcTimeout(_TMO_NORMAL)
1295
  def call_finalize_export(self, node, instance, snap_disks):
1296
    """Request the completion of an export operation.
1297

1298
    This writes the export config file, etc.
1299

1300
    This is a single-node call.
1301

1302
    """
1303
    flat_disks = []
1304
    for disk in snap_disks:
1305
      if isinstance(disk, bool):
1306
        flat_disks.append(disk)
1307
      else:
1308
        flat_disks.append(disk.ToDict())
1309

    
1310
    return self._SingleNodeCall(node, "finalize_export",
1311
                                [self._InstDict(instance), flat_disks])
1312

    
1313
  @_RpcTimeout(_TMO_FAST)
1314
  def call_export_info(self, node, path):
1315
    """Queries the export information in a given path.
1316

1317
    This is a single-node call.
1318

1319
    """
1320
    return self._SingleNodeCall(node, "export_info", [path])
1321

    
1322
  @_RpcTimeout(_TMO_FAST)
1323
  def call_export_list(self, node_list):
1324
    """Gets the stored exports list.
1325

1326
    This is a multi-node call.
1327

1328
    """
1329
    return self._MultiNodeCall(node_list, "export_list", [])
1330

    
1331
  @_RpcTimeout(_TMO_FAST)
1332
  def call_export_remove(self, node, export):
1333
    """Requests removal of a given export.
1334

1335
    This is a single-node call.
1336

1337
    """
1338
    return self._SingleNodeCall(node, "export_remove", [export])
1339

    
1340
  @classmethod
1341
  @_RpcTimeout(_TMO_NORMAL)
1342
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1343
    """Requests a node to clean the cluster information it has.
1344

1345
    This will remove the configuration information from the ganeti data
1346
    dir.
1347

1348
    This is a single-node call.
1349

1350
    """
1351
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1352
                                     [modify_ssh_setup])
1353

    
1354
  @_RpcTimeout(_TMO_FAST)
1355
  def call_node_volumes(self, node_list):
1356
    """Gets all volumes on node(s).
1357

1358
    This is a multi-node call.
1359

1360
    """
1361
    return self._MultiNodeCall(node_list, "node_volumes", [])
1362

    
1363
  @_RpcTimeout(_TMO_FAST)
1364
  def call_node_demote_from_mc(self, node):
1365
    """Demote a node from the master candidate role.
1366

1367
    This is a single-node call.
1368

1369
    """
1370
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1371

    
1372
  @_RpcTimeout(_TMO_NORMAL)
1373
  def call_node_powercycle(self, node, hypervisor):
1374
    """Tries to powercycle a node.
1375

1376
    This is a single-node call.
1377

1378
    """
1379
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1380

    
1381
  @_RpcTimeout(None)
1382
  def call_test_delay(self, node_list, duration):
1383
    """Sleep for a fixed time on given node(s).
1384

1385
    This is a multi-node call.
1386

1387
    """
1388
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1389
                               read_timeout=int(duration + 5))
1390

    
1391
  @_RpcTimeout(_TMO_FAST)
1392
  def call_file_storage_dir_create(self, node, file_storage_dir):
1393
    """Create the given file storage directory.
1394

1395
    This is a single-node call.
1396

1397
    """
1398
    return self._SingleNodeCall(node, "file_storage_dir_create",
1399
                                [file_storage_dir])
1400

    
1401
  @_RpcTimeout(_TMO_FAST)
1402
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1403
    """Remove the given file storage directory.
1404

1405
    This is a single-node call.
1406

1407
    """
1408
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1409
                                [file_storage_dir])
1410

    
1411
  @_RpcTimeout(_TMO_FAST)
1412
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1413
                                   new_file_storage_dir):
1414
    """Rename file storage directory.
1415

1416
    This is a single-node call.
1417

1418
    """
1419
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1420
                                [old_file_storage_dir, new_file_storage_dir])
1421

    
1422
  @classmethod
1423
  @_RpcTimeout(_TMO_URGENT)
1424
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1425
    """Update job queue.
1426

1427
    This is a multi-node call.
1428

1429
    """
1430
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1431
                                    [file_name, cls._Compress(content)],
1432
                                    address_list=address_list)
1433

    
1434
  @classmethod
1435
  @_RpcTimeout(_TMO_NORMAL)
1436
  def call_jobqueue_purge(cls, node):
1437
    """Purge job queue.
1438

1439
    This is a single-node call.
1440

1441
    """
1442
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1443

    
1444
  @classmethod
1445
  @_RpcTimeout(_TMO_URGENT)
1446
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1447
    """Rename a job queue file.
1448

1449
    This is a multi-node call.
1450

1451
    """
1452
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1453
                                    address_list=address_list)
1454

    
1455
  @_RpcTimeout(_TMO_NORMAL)
1456
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1457
    """Validate the hypervisor params.
1458

1459
    This is a multi-node call.
1460

1461
    @type node_list: list
1462
    @param node_list: the list of nodes to query
1463
    @type hvname: string
1464
    @param hvname: the hypervisor name
1465
    @type hvparams: dict
1466
    @param hvparams: the hypervisor parameters to be validated
1467

1468
    """
1469
    cluster = self._cfg.GetClusterInfo()
1470
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1471
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1472
                               [hvname, hv_full])
1473

    
1474
  @_RpcTimeout(_TMO_NORMAL)
1475
  def call_x509_cert_create(self, node, validity):
1476
    """Creates a new X509 certificate for SSL/TLS.
1477

1478
    This is a single-node call.
1479

1480
    @type validity: int
1481
    @param validity: Validity in seconds
1482

1483
    """
1484
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1485

    
1486
  @_RpcTimeout(_TMO_NORMAL)
1487
  def call_x509_cert_remove(self, node, name):
1488
    """Removes a X509 certificate.
1489

1490
    This is a single-node call.
1491

1492
    @type name: string
1493
    @param name: Certificate name
1494

1495
    """
1496
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1497

    
1498
  @_RpcTimeout(_TMO_NORMAL)
1499
  def call_import_start(self, node, opts, instance, dest, dest_args):
1500
    """Starts a listener for an import.
1501

1502
    This is a single-node call.
1503

1504
    @type node: string
1505
    @param node: Node name
1506
    @type instance: C{objects.Instance}
1507
    @param instance: Instance object
1508

1509
    """
1510
    return self._SingleNodeCall(node, "import_start",
1511
                                [opts.ToDict(),
1512
                                 self._InstDict(instance), dest,
1513
                                 _EncodeImportExportIO(dest, dest_args)])
1514

    
1515
  @_RpcTimeout(_TMO_NORMAL)
1516
  def call_export_start(self, node, opts, host, port,
1517
                        instance, source, source_args):
1518
    """Starts an export daemon.
1519

1520
    This is a single-node call.
1521

1522
    @type node: string
1523
    @param node: Node name
1524
    @type instance: C{objects.Instance}
1525
    @param instance: Instance object
1526

1527
    """
1528
    return self._SingleNodeCall(node, "export_start",
1529
                                [opts.ToDict(), host, port,
1530
                                 self._InstDict(instance), source,
1531
                                 _EncodeImportExportIO(source, source_args)])
1532

    
1533
  @_RpcTimeout(_TMO_FAST)
1534
  def call_impexp_status(self, node, names):
1535
    """Gets the status of an import or export.
1536

1537
    This is a single-node call.
1538

1539
    @type node: string
1540
    @param node: Node name
1541
    @type names: List of strings
1542
    @param names: Import/export names
1543
    @rtype: List of L{objects.ImportExportStatus} instances
1544
    @return: Returns a list of the state of each named import/export or None if
1545
             a status couldn't be retrieved
1546

1547
    """
1548
    result = self._SingleNodeCall(node, "impexp_status", [names])
1549

    
1550
    if not result.fail_msg:
1551
      decoded = []
1552

    
1553
      for i in result.payload:
1554
        if i is None:
1555
          decoded.append(None)
1556
          continue
1557
        decoded.append(objects.ImportExportStatus.FromDict(i))
1558

    
1559
      result.payload = decoded
1560

    
1561
    return result
1562

    
1563
  @_RpcTimeout(_TMO_NORMAL)
1564
  def call_impexp_abort(self, node, name):
1565
    """Aborts an import or export.
1566

1567
    This is a single-node call.
1568

1569
    @type node: string
1570
    @param node: Node name
1571
    @type name: string
1572
    @param name: Import/export name
1573

1574
    """
1575
    return self._SingleNodeCall(node, "impexp_abort", [name])
1576

    
1577
  @_RpcTimeout(_TMO_NORMAL)
1578
  def call_impexp_cleanup(self, node, name):
1579
    """Cleans up after an import or export.
1580

1581
    This is a single-node call.
1582

1583
    @type node: string
1584
    @param node: Node name
1585
    @type name: string
1586
    @param name: Import/export name
1587

1588
    """
1589
    return self._SingleNodeCall(node, "impexp_cleanup", [name])