Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 6b9b18a2

History | View | Annotate | Download (46.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48

    
49
# pylint has a bug here, doesn't see this import
50
import ganeti.http.client  # pylint: disable-msg=W0611
51

    
52

    
53
# Timeout for connecting to nodes (seconds)
54
_RPC_CONNECT_TIMEOUT = 5
55

    
56
_RPC_CLIENT_HEADERS = [
57
  "Content-type: %s" % http.HTTP_APP_JSON,
58
  "Expect:",
59
  ]
60

    
61
# Various time constants for the timeout table
62
_TMO_URGENT = 60 # one minute
63
_TMO_FAST = 5 * 60 # five minutes
64
_TMO_NORMAL = 15 * 60 # 15 minutes
65
_TMO_SLOW = 3600 # one hour
66
_TMO_4HRS = 4 * 3600
67
_TMO_1DAY = 86400
68

    
69
# Timeout table that will be built later by decorators
70
# Guidelines for choosing timeouts:
71
# - call used during watcher: timeout -> 1min, _TMO_URGENT
72
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
73
# - other calls: 15 min, _TMO_NORMAL
74
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
75

    
76
_TIMEOUTS = {
77
}
78

    
79

    
80
def Init():
81
  """Initializes the module-global HTTP client manager.
82

83
  Must be called before using any RPC function and while exactly one thread is
84
  running.
85

86
  """
87
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
88
  # one thread running. This check is just a safety measure -- it doesn't
89
  # cover all cases.
90
  assert threading.activeCount() == 1, \
91
         "Found more than one active thread when initializing pycURL"
92

    
93
  logging.info("Using PycURL %s", pycurl.version)
94

    
95
  pycurl.global_init(pycurl.GLOBAL_ALL)
96

    
97

    
98
def Shutdown():
99
  """Stops the module-global HTTP client manager.
100

101
  Must be called before quitting the program and while exactly one thread is
102
  running.
103

104
  """
105
  pycurl.global_cleanup()
106

    
107

    
108
def _ConfigRpcCurl(curl):
109
  noded_cert = str(constants.NODED_CERT_FILE)
110

    
111
  curl.setopt(pycurl.FOLLOWLOCATION, False)
112
  curl.setopt(pycurl.CAINFO, noded_cert)
113
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
114
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
115
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
116
  curl.setopt(pycurl.SSLCERT, noded_cert)
117
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
118
  curl.setopt(pycurl.SSLKEY, noded_cert)
119
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
120

    
121

    
122
# Aliasing this module avoids the following warning by epydoc: "Warning: No
123
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
124
_threading = threading
125

    
126

    
127
class _RpcThreadLocal(_threading.local):
128
  def GetHttpClientPool(self):
129
    """Returns a per-thread HTTP client pool.
130

131
    @rtype: L{http.client.HttpClientPool}
132

133
    """
134
    try:
135
      pool = self.hcp
136
    except AttributeError:
137
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
138
      self.hcp = pool
139

    
140
    return pool
141

    
142

    
143
# Remove module alias (see above)
144
del _threading
145

    
146

    
147
_thread_local = _RpcThreadLocal()
148

    
149

    
150
def _RpcTimeout(secs):
151
  """Timeout decorator.
152

153
  When applied to a rpc call_* function, it updates the global timeout
154
  table with the given function/timeout.
155

156
  """
157
  def decorator(f):
158
    name = f.__name__
159
    assert name.startswith("call_")
160
    _TIMEOUTS[name[len("call_"):]] = secs
161
    return f
162
  return decorator
163

    
164

    
165
def RunWithRPC(fn):
166
  """RPC-wrapper decorator.
167

168
  When applied to a function, it runs it with the RPC system
169
  initialized, and it shutsdown the system afterwards. This means the
170
  function must be called without RPC being initialized.
171

172
  """
173
  def wrapper(*args, **kwargs):
174
    Init()
175
    try:
176
      return fn(*args, **kwargs)
177
    finally:
178
      Shutdown()
179
  return wrapper
180

    
181

    
182
class RpcResult(object):
183
  """RPC Result class.
184

185
  This class holds an RPC result. It is needed since in multi-node
186
  calls we can't raise an exception just because one one out of many
187
  failed, and therefore we use this class to encapsulate the result.
188

189
  @ivar data: the data payload, for successful results, or None
190
  @ivar call: the name of the RPC call
191
  @ivar node: the name of the node to which we made the call
192
  @ivar offline: whether the operation failed because the node was
193
      offline, as opposed to actual failure; offline=True will always
194
      imply failed=True, in order to allow simpler checking if
195
      the user doesn't care about the exact failure mode
196
  @ivar fail_msg: the error message if the call failed
197

198
  """
199
  def __init__(self, data=None, failed=False, offline=False,
200
               call=None, node=None):
201
    self.offline = offline
202
    self.call = call
203
    self.node = node
204

    
205
    if offline:
206
      self.fail_msg = "Node is marked offline"
207
      self.data = self.payload = None
208
    elif failed:
209
      self.fail_msg = self._EnsureErr(data)
210
      self.data = self.payload = None
211
    else:
212
      self.data = data
213
      if not isinstance(self.data, (tuple, list)):
214
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
215
                         type(self.data))
216
        self.payload = None
217
      elif len(data) != 2:
218
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
219
                         "expected 2" % len(self.data))
220
        self.payload = None
221
      elif not self.data[0]:
222
        self.fail_msg = self._EnsureErr(self.data[1])
223
        self.payload = None
224
      else:
225
        # finally success
226
        self.fail_msg = None
227
        self.payload = data[1]
228

    
229
    for attr_name in ["call", "data", "fail_msg",
230
                      "node", "offline", "payload"]:
231
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
232

    
233
  @staticmethod
234
  def _EnsureErr(val):
235
    """Helper to ensure we return a 'True' value for error."""
236
    if val:
237
      return val
238
    else:
239
      return "No error information"
240

    
241
  def Raise(self, msg, prereq=False, ecode=None):
242
    """If the result has failed, raise an OpExecError.
243

244
    This is used so that LU code doesn't have to check for each
245
    result, but instead can call this function.
246

247
    """
248
    if not self.fail_msg:
249
      return
250

    
251
    if not msg: # one could pass None for default message
252
      msg = ("Call '%s' to node '%s' has failed: %s" %
253
             (self.call, self.node, self.fail_msg))
254
    else:
255
      msg = "%s: %s" % (msg, self.fail_msg)
256
    if prereq:
257
      ec = errors.OpPrereqError
258
    else:
259
      ec = errors.OpExecError
260
    if ecode is not None:
261
      args = (msg, ecode)
262
    else:
263
      args = (msg, )
264
    raise ec(*args) # pylint: disable-msg=W0142
265

    
266

    
267
def _AddressLookup(node_list,
268
                   ssc=ssconf.SimpleStore,
269
                   nslookup_fn=netutils.Hostname.GetIP):
270
  """Return addresses for given node names.
271

272
  @type node_list: list
273
  @param node_list: List of node names
274
  @type ssc: class
275
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
276
  @type nslookup_fn: callable
277
  @param nslookup_fn: function use to do NS lookup
278
  @rtype: list of addresses and/or None's
279
  @returns: List of corresponding addresses, if found
280

281
  """
282
  ss = ssc()
283
  iplist = ss.GetNodePrimaryIPList()
284
  family = ss.GetPrimaryIPFamily()
285
  addresses = []
286
  ipmap = dict(entry.split() for entry in iplist)
287
  for node in node_list:
288
    address = ipmap.get(node)
289
    if address is None:
290
      address = nslookup_fn(node, family=family)
291
    addresses.append(address)
292

    
293
  return addresses
294

    
295

    
296
class Client:
297
  """RPC Client class.
298

299
  This class, given a (remote) method name, a list of parameters and a
300
  list of nodes, will contact (in parallel) all nodes, and return a
301
  dict of results (key: node name, value: result).
302

303
  One current bug is that generic failure is still signaled by
304
  'False' result, which is not good. This overloading of values can
305
  cause bugs.
306

307
  """
308
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
309
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
310
                                    " timeouts table")
311
    self.procedure = procedure
312
    self.body = body
313
    self.port = port
314
    self._request = {}
315
    self._address_lookup_fn = address_lookup_fn
316

    
317
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
318
    """Add a list of nodes to the target nodes.
319

320
    @type node_list: list
321
    @param node_list: the list of node names to connect
322
    @type address_list: list or None
323
    @keyword address_list: either None or a list with node addresses,
324
        which must have the same length as the node list
325
    @type read_timeout: int
326
    @param read_timeout: overwrites default timeout for operation
327

328
    """
329
    if address_list is None:
330
      # Always use IP address instead of node name
331
      address_list = self._address_lookup_fn(node_list)
332

    
333
    assert len(node_list) == len(address_list), \
334
           "Name and address lists must have the same length"
335

    
336
    for node, address in zip(node_list, address_list):
337
      self.ConnectNode(node, address, read_timeout=read_timeout)
338

    
339
  def ConnectNode(self, name, address=None, read_timeout=None):
340
    """Add a node to the target list.
341

342
    @type name: str
343
    @param name: the node name
344
    @type address: str
345
    @param address: the node address, if known
346
    @type read_timeout: int
347
    @param read_timeout: overwrites default timeout for operation
348

349
    """
350
    if address is None:
351
      # Always use IP address instead of node name
352
      address = self._address_lookup_fn([name])[0]
353

    
354
    assert(address is not None)
355

    
356
    if read_timeout is None:
357
      read_timeout = _TIMEOUTS[self.procedure]
358

    
359
    self._request[name] = \
360
      http.client.HttpClientRequest(str(address), self.port,
361
                                    http.HTTP_PUT, str("/%s" % self.procedure),
362
                                    headers=_RPC_CLIENT_HEADERS,
363
                                    post_data=str(self.body),
364
                                    read_timeout=read_timeout)
365

    
366
  def GetResults(self, http_pool=None):
367
    """Call nodes and return results.
368

369
    @rtype: list
370
    @return: List of RPC results
371

372
    """
373
    if not http_pool:
374
      http_pool = _thread_local.GetHttpClientPool()
375

    
376
    http_pool.ProcessRequests(self._request.values())
377

    
378
    results = {}
379

    
380
    for name, req in self._request.iteritems():
381
      if req.success and req.resp_status_code == http.HTTP_OK:
382
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
383
                                  node=name, call=self.procedure)
384
        continue
385

    
386
      # TODO: Better error reporting
387
      if req.error:
388
        msg = req.error
389
      else:
390
        msg = req.resp_body
391

    
392
      logging.error("RPC error in %s from node %s: %s",
393
                    self.procedure, name, msg)
394
      results[name] = RpcResult(data=msg, failed=True, node=name,
395
                                call=self.procedure)
396

    
397
    return results
398

    
399

    
400
def _EncodeImportExportIO(ieio, ieioargs):
401
  """Encodes import/export I/O information.
402

403
  """
404
  if ieio == constants.IEIO_RAW_DISK:
405
    assert len(ieioargs) == 1
406
    return (ieioargs[0].ToDict(), )
407

    
408
  if ieio == constants.IEIO_SCRIPT:
409
    assert len(ieioargs) == 2
410
    return (ieioargs[0].ToDict(), ieioargs[1])
411

    
412
  return ieioargs
413

    
414

    
415
class RpcRunner(object):
416
  """RPC runner class"""
417

    
418
  def __init__(self, cfg):
419
    """Initialized the rpc runner.
420

421
    @type cfg:  C{config.ConfigWriter}
422
    @param cfg: the configuration object that will be used to get data
423
                about the cluster
424

425
    """
426
    self._cfg = cfg
427
    self.port = netutils.GetDaemonPort(constants.NODED)
428

    
429
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
430
    """Convert the given instance to a dict.
431

432
    This is done via the instance's ToDict() method and additionally
433
    we fill the hvparams with the cluster defaults.
434

435
    @type instance: L{objects.Instance}
436
    @param instance: an Instance object
437
    @type hvp: dict or None
438
    @param hvp: a dictionary with overridden hypervisor parameters
439
    @type bep: dict or None
440
    @param bep: a dictionary with overridden backend parameters
441
    @type osp: dict or None
442
    @param osp: a dictionary with overridden os parameters
443
    @rtype: dict
444
    @return: the instance dict, with the hvparams filled with the
445
        cluster defaults
446

447
    """
448
    idict = instance.ToDict()
449
    cluster = self._cfg.GetClusterInfo()
450
    idict["hvparams"] = cluster.FillHV(instance)
451
    if hvp is not None:
452
      idict["hvparams"].update(hvp)
453
    idict["beparams"] = cluster.FillBE(instance)
454
    if bep is not None:
455
      idict["beparams"].update(bep)
456
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
457
    if osp is not None:
458
      idict["osparams"].update(osp)
459
    for nic in idict["nics"]:
460
      nic['nicparams'] = objects.FillDict(
461
        cluster.nicparams[constants.PP_DEFAULT],
462
        nic['nicparams'])
463
    return idict
464

    
465
  def _ConnectList(self, client, node_list, call, read_timeout=None):
466
    """Helper for computing node addresses.
467

468
    @type client: L{ganeti.rpc.Client}
469
    @param client: a C{Client} instance
470
    @type node_list: list
471
    @param node_list: the node list we should connect
472
    @type call: string
473
    @param call: the name of the remote procedure call, for filling in
474
        correctly any eventual offline nodes' results
475
    @type read_timeout: int
476
    @param read_timeout: overwrites the default read timeout for the
477
        given operation
478

479
    """
480
    all_nodes = self._cfg.GetAllNodesInfo()
481
    name_list = []
482
    addr_list = []
483
    skip_dict = {}
484
    for node in node_list:
485
      if node in all_nodes:
486
        if all_nodes[node].offline:
487
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
488
          continue
489
        val = all_nodes[node].primary_ip
490
      else:
491
        val = None
492
      addr_list.append(val)
493
      name_list.append(node)
494
    if name_list:
495
      client.ConnectList(name_list, address_list=addr_list,
496
                         read_timeout=read_timeout)
497
    return skip_dict
498

    
499
  def _ConnectNode(self, client, node, call, read_timeout=None):
500
    """Helper for computing one node's address.
501

502
    @type client: L{ganeti.rpc.Client}
503
    @param client: a C{Client} instance
504
    @type node: str
505
    @param node: the node we should connect
506
    @type call: string
507
    @param call: the name of the remote procedure call, for filling in
508
        correctly any eventual offline nodes' results
509
    @type read_timeout: int
510
    @param read_timeout: overwrites the default read timeout for the
511
        given operation
512

513
    """
514
    node_info = self._cfg.GetNodeInfo(node)
515
    if node_info is not None:
516
      if node_info.offline:
517
        return RpcResult(node=node, offline=True, call=call)
518
      addr = node_info.primary_ip
519
    else:
520
      addr = None
521
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
522

    
523
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
524
    """Helper for making a multi-node call
525

526
    """
527
    body = serializer.DumpJson(args, indent=False)
528
    c = Client(procedure, body, self.port)
529
    skip_dict = self._ConnectList(c, node_list, procedure,
530
                                  read_timeout=read_timeout)
531
    skip_dict.update(c.GetResults())
532
    return skip_dict
533

    
534
  @classmethod
535
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
536
                           address_list=None, read_timeout=None):
537
    """Helper for making a multi-node static call
538

539
    """
540
    body = serializer.DumpJson(args, indent=False)
541
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
542
    c.ConnectList(node_list, address_list=address_list,
543
                  read_timeout=read_timeout)
544
    return c.GetResults()
545

    
546
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
547
    """Helper for making a single-node call
548

549
    """
550
    body = serializer.DumpJson(args, indent=False)
551
    c = Client(procedure, body, self.port)
552
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
553
    if result is None:
554
      # we did connect, node is not offline
555
      result = c.GetResults()[node]
556
    return result
557

    
558
  @classmethod
559
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
560
    """Helper for making a single-node static call
561

562
    """
563
    body = serializer.DumpJson(args, indent=False)
564
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
565
    c.ConnectNode(node, read_timeout=read_timeout)
566
    return c.GetResults()[node]
567

    
568
  @staticmethod
569
  def _Compress(data):
570
    """Compresses a string for transport over RPC.
571

572
    Small amounts of data are not compressed.
573

574
    @type data: str
575
    @param data: Data
576
    @rtype: tuple
577
    @return: Encoded data to send
578

579
    """
580
    # Small amounts of data are not compressed
581
    if len(data) < 512:
582
      return (constants.RPC_ENCODING_NONE, data)
583

    
584
    # Compress with zlib and encode in base64
585
    return (constants.RPC_ENCODING_ZLIB_BASE64,
586
            base64.b64encode(zlib.compress(data, 3)))
587

    
588
  #
589
  # Begin RPC calls
590
  #
591

    
592
  @_RpcTimeout(_TMO_URGENT)
593
  def call_lv_list(self, node_list, vg_name):
594
    """Gets the logical volumes present in a given volume group.
595

596
    This is a multi-node call.
597

598
    """
599
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
600

    
601
  @_RpcTimeout(_TMO_URGENT)
602
  def call_vg_list(self, node_list):
603
    """Gets the volume group list.
604

605
    This is a multi-node call.
606

607
    """
608
    return self._MultiNodeCall(node_list, "vg_list", [])
609

    
610
  @_RpcTimeout(_TMO_NORMAL)
611
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
612
    """Get list of storage units.
613

614
    This is a multi-node call.
615

616
    """
617
    return self._MultiNodeCall(node_list, "storage_list",
618
                               [su_name, su_args, name, fields])
619

    
620
  @_RpcTimeout(_TMO_NORMAL)
621
  def call_storage_modify(self, node, su_name, su_args, name, changes):
622
    """Modify a storage unit.
623

624
    This is a single-node call.
625

626
    """
627
    return self._SingleNodeCall(node, "storage_modify",
628
                                [su_name, su_args, name, changes])
629

    
630
  @_RpcTimeout(_TMO_NORMAL)
631
  def call_storage_execute(self, node, su_name, su_args, name, op):
632
    """Executes an operation on a storage unit.
633

634
    This is a single-node call.
635

636
    """
637
    return self._SingleNodeCall(node, "storage_execute",
638
                                [su_name, su_args, name, op])
639

    
640
  @_RpcTimeout(_TMO_URGENT)
641
  def call_bridges_exist(self, node, bridges_list):
642
    """Checks if a node has all the bridges given.
643

644
    This method checks if all bridges given in the bridges_list are
645
    present on the remote node, so that an instance that uses interfaces
646
    on those bridges can be started.
647

648
    This is a single-node call.
649

650
    """
651
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
652

    
653
  @_RpcTimeout(_TMO_NORMAL)
654
  def call_instance_start(self, node, instance, hvp, bep):
655
    """Starts an instance.
656

657
    This is a single-node call.
658

659
    """
660
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
661
    return self._SingleNodeCall(node, "instance_start", [idict])
662

    
663
  @_RpcTimeout(_TMO_NORMAL)
664
  def call_instance_shutdown(self, node, instance, timeout):
665
    """Stops an instance.
666

667
    This is a single-node call.
668

669
    """
670
    return self._SingleNodeCall(node, "instance_shutdown",
671
                                [self._InstDict(instance), timeout])
672

    
673
  @_RpcTimeout(_TMO_NORMAL)
674
  def call_migration_info(self, node, instance):
675
    """Gather the information necessary to prepare an instance migration.
676

677
    This is a single-node call.
678

679
    @type node: string
680
    @param node: the node on which the instance is currently running
681
    @type instance: C{objects.Instance}
682
    @param instance: the instance definition
683

684
    """
685
    return self._SingleNodeCall(node, "migration_info",
686
                                [self._InstDict(instance)])
687

    
688
  @_RpcTimeout(_TMO_NORMAL)
689
  def call_accept_instance(self, node, instance, info, target):
690
    """Prepare a node to accept an instance.
691

692
    This is a single-node call.
693

694
    @type node: string
695
    @param node: the target node for the migration
696
    @type instance: C{objects.Instance}
697
    @param instance: the instance definition
698
    @type info: opaque/hypervisor specific (string/data)
699
    @param info: result for the call_migration_info call
700
    @type target: string
701
    @param target: target hostname (usually ip address) (on the node itself)
702

703
    """
704
    return self._SingleNodeCall(node, "accept_instance",
705
                                [self._InstDict(instance), info, target])
706

    
707
  @_RpcTimeout(_TMO_NORMAL)
708
  def call_finalize_migration(self, node, instance, info, success):
709
    """Finalize any target-node migration specific operation.
710

711
    This is called both in case of a successful migration and in case of error
712
    (in which case it should abort the migration).
713

714
    This is a single-node call.
715

716
    @type node: string
717
    @param node: the target node for the migration
718
    @type instance: C{objects.Instance}
719
    @param instance: the instance definition
720
    @type info: opaque/hypervisor specific (string/data)
721
    @param info: result for the call_migration_info call
722
    @type success: boolean
723
    @param success: whether the migration was a success or a failure
724

725
    """
726
    return self._SingleNodeCall(node, "finalize_migration",
727
                                [self._InstDict(instance), info, success])
728

    
729
  @_RpcTimeout(_TMO_SLOW)
730
  def call_instance_migrate(self, node, instance, target, live):
731
    """Migrate an instance.
732

733
    This is a single-node call.
734

735
    @type node: string
736
    @param node: the node on which the instance is currently running
737
    @type instance: C{objects.Instance}
738
    @param instance: the instance definition
739
    @type target: string
740
    @param target: the target node name
741
    @type live: boolean
742
    @param live: whether the migration should be done live or not (the
743
        interpretation of this parameter is left to the hypervisor)
744

745
    """
746
    return self._SingleNodeCall(node, "instance_migrate",
747
                                [self._InstDict(instance), target, live])
748

    
749
  @_RpcTimeout(_TMO_NORMAL)
750
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
751
    """Reboots an instance.
752

753
    This is a single-node call.
754

755
    """
756
    return self._SingleNodeCall(node, "instance_reboot",
757
                                [self._InstDict(inst), reboot_type,
758
                                 shutdown_timeout])
759

    
760
  @_RpcTimeout(_TMO_1DAY)
761
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
762
    """Installs an OS on the given instance.
763

764
    This is a single-node call.
765

766
    """
767
    return self._SingleNodeCall(node, "instance_os_add",
768
                                [self._InstDict(inst, osp=osparams),
769
                                 reinstall, debug])
770

    
771
  @_RpcTimeout(_TMO_SLOW)
772
  def call_instance_run_rename(self, node, inst, old_name, debug):
773
    """Run the OS rename script for an instance.
774

775
    This is a single-node call.
776

777
    """
778
    return self._SingleNodeCall(node, "instance_run_rename",
779
                                [self._InstDict(inst), old_name, debug])
780

    
781
  @_RpcTimeout(_TMO_URGENT)
782
  def call_instance_info(self, node, instance, hname):
783
    """Returns information about a single instance.
784

785
    This is a single-node call.
786

787
    @type node: list
788
    @param node: the list of nodes to query
789
    @type instance: string
790
    @param instance: the instance name
791
    @type hname: string
792
    @param hname: the hypervisor type of the instance
793

794
    """
795
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
796

    
797
  @_RpcTimeout(_TMO_NORMAL)
798
  def call_instance_migratable(self, node, instance):
799
    """Checks whether the given instance can be migrated.
800

801
    This is a single-node call.
802

803
    @param node: the node to query
804
    @type instance: L{objects.Instance}
805
    @param instance: the instance to check
806

807

808
    """
809
    return self._SingleNodeCall(node, "instance_migratable",
810
                                [self._InstDict(instance)])
811

    
812
  @_RpcTimeout(_TMO_URGENT)
813
  def call_all_instances_info(self, node_list, hypervisor_list):
814
    """Returns information about all instances on the given nodes.
815

816
    This is a multi-node call.
817

818
    @type node_list: list
819
    @param node_list: the list of nodes to query
820
    @type hypervisor_list: list
821
    @param hypervisor_list: the hypervisors to query for instances
822

823
    """
824
    return self._MultiNodeCall(node_list, "all_instances_info",
825
                               [hypervisor_list])
826

    
827
  @_RpcTimeout(_TMO_URGENT)
828
  def call_instance_list(self, node_list, hypervisor_list):
829
    """Returns the list of running instances on a given node.
830

831
    This is a multi-node call.
832

833
    @type node_list: list
834
    @param node_list: the list of nodes to query
835
    @type hypervisor_list: list
836
    @param hypervisor_list: the hypervisors to query for instances
837

838
    """
839
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
840

    
841
  @_RpcTimeout(_TMO_FAST)
842
  def call_node_tcp_ping(self, node, source, target, port, timeout,
843
                         live_port_needed):
844
    """Do a TcpPing on the remote node
845

846
    This is a single-node call.
847

848
    """
849
    return self._SingleNodeCall(node, "node_tcp_ping",
850
                                [source, target, port, timeout,
851
                                 live_port_needed])
852

    
853
  @_RpcTimeout(_TMO_FAST)
854
  def call_node_has_ip_address(self, node, address):
855
    """Checks if a node has the given IP address.
856

857
    This is a single-node call.
858

859
    """
860
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
861

    
862
  @_RpcTimeout(_TMO_URGENT)
863
  def call_node_info(self, node_list, vg_name, hypervisor_type):
864
    """Return node information.
865

866
    This will return memory information and volume group size and free
867
    space.
868

869
    This is a multi-node call.
870

871
    @type node_list: list
872
    @param node_list: the list of nodes to query
873
    @type vg_name: C{string}
874
    @param vg_name: the name of the volume group to ask for disk space
875
        information
876
    @type hypervisor_type: C{str}
877
    @param hypervisor_type: the name of the hypervisor to ask for
878
        memory information
879

880
    """
881
    return self._MultiNodeCall(node_list, "node_info",
882
                               [vg_name, hypervisor_type])
883

    
884
  @_RpcTimeout(_TMO_NORMAL)
885
  def call_etc_hosts_modify(self, node, mode, name, ip):
886
    """Modify hosts file with name
887

888
    @type node: string
889
    @param node: The node to call
890
    @type mode: string
891
    @param mode: The mode to operate. Currently "add" or "remove"
892
    @type name: string
893
    @param name: The host name to be modified
894
    @type ip: string
895
    @param ip: The ip of the entry (just valid if mode is "add")
896

897
    """
898
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
899

    
900
  @_RpcTimeout(_TMO_NORMAL)
901
  def call_node_verify(self, node_list, checkdict, cluster_name):
902
    """Request verification of given parameters.
903

904
    This is a multi-node call.
905

906
    """
907
    return self._MultiNodeCall(node_list, "node_verify",
908
                               [checkdict, cluster_name])
909

    
910
  @classmethod
911
  @_RpcTimeout(_TMO_FAST)
912
  def call_node_start_master(cls, node, start_daemons, no_voting):
913
    """Tells a node to activate itself as a master.
914

915
    This is a single-node call.
916

917
    """
918
    return cls._StaticSingleNodeCall(node, "node_start_master",
919
                                     [start_daemons, no_voting])
920

    
921
  @classmethod
922
  @_RpcTimeout(_TMO_FAST)
923
  def call_node_stop_master(cls, node, stop_daemons):
924
    """Tells a node to demote itself from master status.
925

926
    This is a single-node call.
927

928
    """
929
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
930

    
931
  @classmethod
932
  @_RpcTimeout(_TMO_URGENT)
933
  def call_master_info(cls, node_list):
934
    """Query master info.
935

936
    This is a multi-node call.
937

938
    """
939
    # TODO: should this method query down nodes?
940
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
941

    
942
  @classmethod
943
  @_RpcTimeout(_TMO_URGENT)
944
  def call_version(cls, node_list):
945
    """Query node version.
946

947
    This is a multi-node call.
948

949
    """
950
    return cls._StaticMultiNodeCall(node_list, "version", [])
951

    
952
  @_RpcTimeout(_TMO_NORMAL)
953
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
954
    """Request creation of a given block device.
955

956
    This is a single-node call.
957

958
    """
959
    return self._SingleNodeCall(node, "blockdev_create",
960
                                [bdev.ToDict(), size, owner, on_primary, info])
961

    
962
  @_RpcTimeout(_TMO_SLOW)
963
  def call_blockdev_wipe(self, node, bdev, offset, size):
964
    """Request wipe at given offset with given size of a block device.
965

966
    This is a single-node call.
967

968
    """
969
    return self._SingleNodeCall(node, "blockdev_wipe",
970
                                [bdev.ToDict(), offset, size])
971

    
972
  @_RpcTimeout(_TMO_NORMAL)
973
  def call_blockdev_remove(self, node, bdev):
974
    """Request removal of a given block device.
975

976
    This is a single-node call.
977

978
    """
979
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
980

    
981
  @_RpcTimeout(_TMO_NORMAL)
982
  def call_blockdev_rename(self, node, devlist):
983
    """Request rename of the given block devices.
984

985
    This is a single-node call.
986

987
    """
988
    return self._SingleNodeCall(node, "blockdev_rename",
989
                                [(d.ToDict(), uid) for d, uid in devlist])
990

    
991
  @_RpcTimeout(_TMO_NORMAL)
992
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
993
    """Request a pause/resume of given block device.
994

995
    This is a single-node call.
996

997
    """
998
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
999
                                [[bdev.ToDict() for bdev in disks], pause])
1000

    
1001
  @_RpcTimeout(_TMO_NORMAL)
1002
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1003
    """Request assembling of a given block device.
1004

1005
    This is a single-node call.
1006

1007
    """
1008
    return self._SingleNodeCall(node, "blockdev_assemble",
1009
                                [disk.ToDict(), owner, on_primary, idx])
1010

    
1011
  @_RpcTimeout(_TMO_NORMAL)
1012
  def call_blockdev_shutdown(self, node, disk):
1013
    """Request shutdown of a given block device.
1014

1015
    This is a single-node call.
1016

1017
    """
1018
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1019

    
1020
  @_RpcTimeout(_TMO_NORMAL)
1021
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1022
    """Request adding a list of children to a (mirroring) device.
1023

1024
    This is a single-node call.
1025

1026
    """
1027
    return self._SingleNodeCall(node, "blockdev_addchildren",
1028
                                [bdev.ToDict(),
1029
                                 [disk.ToDict() for disk in ndevs]])
1030

    
1031
  @_RpcTimeout(_TMO_NORMAL)
1032
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1033
    """Request removing a list of children from a (mirroring) device.
1034

1035
    This is a single-node call.
1036

1037
    """
1038
    return self._SingleNodeCall(node, "blockdev_removechildren",
1039
                                [bdev.ToDict(),
1040
                                 [disk.ToDict() for disk in ndevs]])
1041

    
1042
  @_RpcTimeout(_TMO_NORMAL)
1043
  def call_blockdev_getmirrorstatus(self, node, disks):
1044
    """Request status of a (mirroring) device.
1045

1046
    This is a single-node call.
1047

1048
    """
1049
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1050
                                  [dsk.ToDict() for dsk in disks])
1051
    if not result.fail_msg:
1052
      result.payload = [objects.BlockDevStatus.FromDict(i)
1053
                        for i in result.payload]
1054
    return result
1055

    
1056
  @_RpcTimeout(_TMO_NORMAL)
1057
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1058
    """Request status of (mirroring) devices from multiple nodes.
1059

1060
    This is a multi-node call.
1061

1062
    """
1063
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1064
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1065
                                       for name, disks in node_disks.items())])
1066
    for nres in result.values():
1067
      if nres.fail_msg:
1068
        continue
1069

    
1070
      for idx, (success, status) in enumerate(nres.payload):
1071
        if success:
1072
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1073

    
1074
    return result
1075

    
1076
  @_RpcTimeout(_TMO_NORMAL)
1077
  def call_blockdev_find(self, node, disk):
1078
    """Request identification of a given block device.
1079

1080
    This is a single-node call.
1081

1082
    """
1083
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1084
    if not result.fail_msg and result.payload is not None:
1085
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1086
    return result
1087

    
1088
  @_RpcTimeout(_TMO_NORMAL)
1089
  def call_blockdev_close(self, node, instance_name, disks):
1090
    """Closes the given block devices.
1091

1092
    This is a single-node call.
1093

1094
    """
1095
    params = [instance_name, [cf.ToDict() for cf in disks]]
1096
    return self._SingleNodeCall(node, "blockdev_close", params)
1097

    
1098
  @_RpcTimeout(_TMO_NORMAL)
1099
  def call_blockdev_getsize(self, node, disks):
1100
    """Returns the size of the given disks.
1101

1102
    This is a single-node call.
1103

1104
    """
1105
    params = [[cf.ToDict() for cf in disks]]
1106
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1107

    
1108
  @_RpcTimeout(_TMO_NORMAL)
1109
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1110
    """Disconnects the network of the given drbd devices.
1111

1112
    This is a multi-node call.
1113

1114
    """
1115
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1116
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1117

    
1118
  @_RpcTimeout(_TMO_NORMAL)
1119
  def call_drbd_attach_net(self, node_list, nodes_ip,
1120
                           disks, instance_name, multimaster):
1121
    """Disconnects the given drbd devices.
1122

1123
    This is a multi-node call.
1124

1125
    """
1126
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1127
                               [nodes_ip, [cf.ToDict() for cf in disks],
1128
                                instance_name, multimaster])
1129

    
1130
  @_RpcTimeout(_TMO_SLOW)
1131
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1132
    """Waits for the synchronization of drbd devices is complete.
1133

1134
    This is a multi-node call.
1135

1136
    """
1137
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1138
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1139

    
1140
  @_RpcTimeout(_TMO_URGENT)
1141
  def call_drbd_helper(self, node_list):
1142
    """Gets drbd helper.
1143

1144
    This is a multi-node call.
1145

1146
    """
1147
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1148

    
1149
  @classmethod
1150
  @_RpcTimeout(_TMO_NORMAL)
1151
  def call_upload_file(cls, node_list, file_name, address_list=None):
1152
    """Upload a file.
1153

1154
    The node will refuse the operation in case the file is not on the
1155
    approved file list.
1156

1157
    This is a multi-node call.
1158

1159
    @type node_list: list
1160
    @param node_list: the list of node names to upload to
1161
    @type file_name: str
1162
    @param file_name: the filename to upload
1163
    @type address_list: list or None
1164
    @keyword address_list: an optional list of node addresses, in order
1165
        to optimize the RPC speed
1166

1167
    """
1168
    file_contents = utils.ReadFile(file_name)
1169
    data = cls._Compress(file_contents)
1170
    st = os.stat(file_name)
1171
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1172
              st.st_atime, st.st_mtime]
1173
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1174
                                    address_list=address_list)
1175

    
1176
  @classmethod
1177
  @_RpcTimeout(_TMO_NORMAL)
1178
  def call_write_ssconf_files(cls, node_list, values):
1179
    """Write ssconf files.
1180

1181
    This is a multi-node call.
1182

1183
    """
1184
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1185

    
1186
  @_RpcTimeout(_TMO_NORMAL)
1187
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1188
    """Runs OOB.
1189

1190
    This is a single-node call.
1191

1192
    """
1193
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1194
                                                  remote_node, timeout])
1195

    
1196
  @_RpcTimeout(_TMO_FAST)
1197
  def call_os_diagnose(self, node_list):
1198
    """Request a diagnose of OS definitions.
1199

1200
    This is a multi-node call.
1201

1202
    """
1203
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1204

    
1205
  @_RpcTimeout(_TMO_FAST)
1206
  def call_os_get(self, node, name):
1207
    """Returns an OS definition.
1208

1209
    This is a single-node call.
1210

1211
    """
1212
    result = self._SingleNodeCall(node, "os_get", [name])
1213
    if not result.fail_msg and isinstance(result.payload, dict):
1214
      result.payload = objects.OS.FromDict(result.payload)
1215
    return result
1216

    
1217
  @_RpcTimeout(_TMO_FAST)
1218
  def call_os_validate(self, required, nodes, name, checks, params):
1219
    """Run a validation routine for a given OS.
1220

1221
    This is a multi-node call.
1222

1223
    """
1224
    return self._MultiNodeCall(nodes, "os_validate",
1225
                               [required, name, checks, params])
1226

    
1227
  @_RpcTimeout(_TMO_NORMAL)
1228
  def call_hooks_runner(self, node_list, hpath, phase, env):
1229
    """Call the hooks runner.
1230

1231
    Args:
1232
      - op: the OpCode instance
1233
      - env: a dictionary with the environment
1234

1235
    This is a multi-node call.
1236

1237
    """
1238
    params = [hpath, phase, env]
1239
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1240

    
1241
  @_RpcTimeout(_TMO_NORMAL)
1242
  def call_iallocator_runner(self, node, name, idata):
1243
    """Call an iallocator on a remote node
1244

1245
    Args:
1246
      - name: the iallocator name
1247
      - input: the json-encoded input string
1248

1249
    This is a single-node call.
1250

1251
    """
1252
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1253

    
1254
  @_RpcTimeout(_TMO_NORMAL)
1255
  def call_blockdev_grow(self, node, cf_bdev, amount):
1256
    """Request a snapshot of the given block device.
1257

1258
    This is a single-node call.
1259

1260
    """
1261
    return self._SingleNodeCall(node, "blockdev_grow",
1262
                                [cf_bdev.ToDict(), amount])
1263

    
1264
  @_RpcTimeout(_TMO_1DAY)
1265
  def call_blockdev_export(self, node, cf_bdev,
1266
                           dest_node, dest_path, cluster_name):
1267
    """Export a given disk to another node.
1268

1269
    This is a single-node call.
1270

1271
    """
1272
    return self._SingleNodeCall(node, "blockdev_export",
1273
                                [cf_bdev.ToDict(), dest_node, dest_path,
1274
                                 cluster_name])
1275

    
1276
  @_RpcTimeout(_TMO_NORMAL)
1277
  def call_blockdev_snapshot(self, node, cf_bdev):
1278
    """Request a snapshot of the given block device.
1279

1280
    This is a single-node call.
1281

1282
    """
1283
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1284

    
1285
  @_RpcTimeout(_TMO_NORMAL)
1286
  def call_finalize_export(self, node, instance, snap_disks):
1287
    """Request the completion of an export operation.
1288

1289
    This writes the export config file, etc.
1290

1291
    This is a single-node call.
1292

1293
    """
1294
    flat_disks = []
1295
    for disk in snap_disks:
1296
      if isinstance(disk, bool):
1297
        flat_disks.append(disk)
1298
      else:
1299
        flat_disks.append(disk.ToDict())
1300

    
1301
    return self._SingleNodeCall(node, "finalize_export",
1302
                                [self._InstDict(instance), flat_disks])
1303

    
1304
  @_RpcTimeout(_TMO_FAST)
1305
  def call_export_info(self, node, path):
1306
    """Queries the export information in a given path.
1307

1308
    This is a single-node call.
1309

1310
    """
1311
    return self._SingleNodeCall(node, "export_info", [path])
1312

    
1313
  @_RpcTimeout(_TMO_FAST)
1314
  def call_export_list(self, node_list):
1315
    """Gets the stored exports list.
1316

1317
    This is a multi-node call.
1318

1319
    """
1320
    return self._MultiNodeCall(node_list, "export_list", [])
1321

    
1322
  @_RpcTimeout(_TMO_FAST)
1323
  def call_export_remove(self, node, export):
1324
    """Requests removal of a given export.
1325

1326
    This is a single-node call.
1327

1328
    """
1329
    return self._SingleNodeCall(node, "export_remove", [export])
1330

    
1331
  @classmethod
1332
  @_RpcTimeout(_TMO_NORMAL)
1333
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1334
    """Requests a node to clean the cluster information it has.
1335

1336
    This will remove the configuration information from the ganeti data
1337
    dir.
1338

1339
    This is a single-node call.
1340

1341
    """
1342
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1343
                                     [modify_ssh_setup])
1344

    
1345
  @_RpcTimeout(_TMO_FAST)
1346
  def call_node_volumes(self, node_list):
1347
    """Gets all volumes on node(s).
1348

1349
    This is a multi-node call.
1350

1351
    """
1352
    return self._MultiNodeCall(node_list, "node_volumes", [])
1353

    
1354
  @_RpcTimeout(_TMO_FAST)
1355
  def call_node_demote_from_mc(self, node):
1356
    """Demote a node from the master candidate role.
1357

1358
    This is a single-node call.
1359

1360
    """
1361
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1362

    
1363
  @_RpcTimeout(_TMO_NORMAL)
1364
  def call_node_powercycle(self, node, hypervisor):
1365
    """Tries to powercycle a node.
1366

1367
    This is a single-node call.
1368

1369
    """
1370
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1371

    
1372
  @_RpcTimeout(None)
1373
  def call_test_delay(self, node_list, duration):
1374
    """Sleep for a fixed time on given node(s).
1375

1376
    This is a multi-node call.
1377

1378
    """
1379
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1380
                               read_timeout=int(duration + 5))
1381

    
1382
  @_RpcTimeout(_TMO_FAST)
1383
  def call_file_storage_dir_create(self, node, file_storage_dir):
1384
    """Create the given file storage directory.
1385

1386
    This is a single-node call.
1387

1388
    """
1389
    return self._SingleNodeCall(node, "file_storage_dir_create",
1390
                                [file_storage_dir])
1391

    
1392
  @_RpcTimeout(_TMO_FAST)
1393
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1394
    """Remove the given file storage directory.
1395

1396
    This is a single-node call.
1397

1398
    """
1399
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1400
                                [file_storage_dir])
1401

    
1402
  @_RpcTimeout(_TMO_FAST)
1403
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1404
                                   new_file_storage_dir):
1405
    """Rename file storage directory.
1406

1407
    This is a single-node call.
1408

1409
    """
1410
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1411
                                [old_file_storage_dir, new_file_storage_dir])
1412

    
1413
  @classmethod
1414
  @_RpcTimeout(_TMO_URGENT)
1415
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1416
    """Update job queue.
1417

1418
    This is a multi-node call.
1419

1420
    """
1421
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1422
                                    [file_name, cls._Compress(content)],
1423
                                    address_list=address_list)
1424

    
1425
  @classmethod
1426
  @_RpcTimeout(_TMO_NORMAL)
1427
  def call_jobqueue_purge(cls, node):
1428
    """Purge job queue.
1429

1430
    This is a single-node call.
1431

1432
    """
1433
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1434

    
1435
  @classmethod
1436
  @_RpcTimeout(_TMO_URGENT)
1437
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1438
    """Rename a job queue file.
1439

1440
    This is a multi-node call.
1441

1442
    """
1443
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1444
                                    address_list=address_list)
1445

    
1446
  @_RpcTimeout(_TMO_NORMAL)
1447
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1448
    """Validate the hypervisor params.
1449

1450
    This is a multi-node call.
1451

1452
    @type node_list: list
1453
    @param node_list: the list of nodes to query
1454
    @type hvname: string
1455
    @param hvname: the hypervisor name
1456
    @type hvparams: dict
1457
    @param hvparams: the hypervisor parameters to be validated
1458

1459
    """
1460
    cluster = self._cfg.GetClusterInfo()
1461
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1462
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1463
                               [hvname, hv_full])
1464

    
1465
  @_RpcTimeout(_TMO_NORMAL)
1466
  def call_x509_cert_create(self, node, validity):
1467
    """Creates a new X509 certificate for SSL/TLS.
1468

1469
    This is a single-node call.
1470

1471
    @type validity: int
1472
    @param validity: Validity in seconds
1473

1474
    """
1475
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1476

    
1477
  @_RpcTimeout(_TMO_NORMAL)
1478
  def call_x509_cert_remove(self, node, name):
1479
    """Removes a X509 certificate.
1480

1481
    This is a single-node call.
1482

1483
    @type name: string
1484
    @param name: Certificate name
1485

1486
    """
1487
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1488

    
1489
  @_RpcTimeout(_TMO_NORMAL)
1490
  def call_import_start(self, node, opts, instance, dest, dest_args):
1491
    """Starts a listener for an import.
1492

1493
    This is a single-node call.
1494

1495
    @type node: string
1496
    @param node: Node name
1497
    @type instance: C{objects.Instance}
1498
    @param instance: Instance object
1499

1500
    """
1501
    return self._SingleNodeCall(node, "import_start",
1502
                                [opts.ToDict(),
1503
                                 self._InstDict(instance), dest,
1504
                                 _EncodeImportExportIO(dest, dest_args)])
1505

    
1506
  @_RpcTimeout(_TMO_NORMAL)
1507
  def call_export_start(self, node, opts, host, port,
1508
                        instance, source, source_args):
1509
    """Starts an export daemon.
1510

1511
    This is a single-node call.
1512

1513
    @type node: string
1514
    @param node: Node name
1515
    @type instance: C{objects.Instance}
1516
    @param instance: Instance object
1517

1518
    """
1519
    return self._SingleNodeCall(node, "export_start",
1520
                                [opts.ToDict(), host, port,
1521
                                 self._InstDict(instance), source,
1522
                                 _EncodeImportExportIO(source, source_args)])
1523

    
1524
  @_RpcTimeout(_TMO_FAST)
1525
  def call_impexp_status(self, node, names):
1526
    """Gets the status of an import or export.
1527

1528
    This is a single-node call.
1529

1530
    @type node: string
1531
    @param node: Node name
1532
    @type names: List of strings
1533
    @param names: Import/export names
1534
    @rtype: List of L{objects.ImportExportStatus} instances
1535
    @return: Returns a list of the state of each named import/export or None if
1536
             a status couldn't be retrieved
1537

1538
    """
1539
    result = self._SingleNodeCall(node, "impexp_status", [names])
1540

    
1541
    if not result.fail_msg:
1542
      decoded = []
1543

    
1544
      for i in result.payload:
1545
        if i is None:
1546
          decoded.append(None)
1547
          continue
1548
        decoded.append(objects.ImportExportStatus.FromDict(i))
1549

    
1550
      result.payload = decoded
1551

    
1552
    return result
1553

    
1554
  @_RpcTimeout(_TMO_NORMAL)
1555
  def call_impexp_abort(self, node, name):
1556
    """Aborts an import or export.
1557

1558
    This is a single-node call.
1559

1560
    @type node: string
1561
    @param node: Node name
1562
    @type name: string
1563
    @param name: Import/export name
1564

1565
    """
1566
    return self._SingleNodeCall(node, "impexp_abort", [name])
1567

    
1568
  @_RpcTimeout(_TMO_NORMAL)
1569
  def call_impexp_cleanup(self, node, name):
1570
    """Cleans up after an import or export.
1571

1572
    This is a single-node call.
1573

1574
    @type node: string
1575
    @param node: Node name
1576
    @type name: string
1577
    @param name: Import/export name
1578

1579
    """
1580
    return self._SingleNodeCall(node, "impexp_cleanup", [name])