Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 30474135

History | View | Annotate | Download (47 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49

    
50
# pylint has a bug here, doesn't see this import
51
import ganeti.http.client  # pylint: disable=W0611
52

    
53

    
54
# Timeout for connecting to nodes (seconds)
55
_RPC_CONNECT_TIMEOUT = 5
56

    
57
_RPC_CLIENT_HEADERS = [
58
  "Content-type: %s" % http.HTTP_APP_JSON,
59
  "Expect:",
60
  ]
61

    
62
# Various time constants for the timeout table
63
_TMO_URGENT = 60 # one minute
64
_TMO_FAST = 5 * 60 # five minutes
65
_TMO_NORMAL = 15 * 60 # 15 minutes
66
_TMO_SLOW = 3600 # one hour
67
_TMO_4HRS = 4 * 3600
68
_TMO_1DAY = 86400
69

    
70
# Timeout table that will be built later by decorators
71
# Guidelines for choosing timeouts:
72
# - call used during watcher: timeout -> 1min, _TMO_URGENT
73
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
74
# - other calls: 15 min, _TMO_NORMAL
75
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
76

    
77
_TIMEOUTS = {
78
}
79

    
80

    
81
def Init():
82
  """Initializes the module-global HTTP client manager.
83

84
  Must be called before using any RPC function and while exactly one thread is
85
  running.
86

87
  """
88
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
89
  # one thread running. This check is just a safety measure -- it doesn't
90
  # cover all cases.
91
  assert threading.activeCount() == 1, \
92
         "Found more than one active thread when initializing pycURL"
93

    
94
  logging.info("Using PycURL %s", pycurl.version)
95

    
96
  pycurl.global_init(pycurl.GLOBAL_ALL)
97

    
98

    
99
def Shutdown():
100
  """Stops the module-global HTTP client manager.
101

102
  Must be called before quitting the program and while exactly one thread is
103
  running.
104

105
  """
106
  pycurl.global_cleanup()
107

    
108

    
109
def _ConfigRpcCurl(curl):
110
  noded_cert = str(constants.NODED_CERT_FILE)
111

    
112
  curl.setopt(pycurl.FOLLOWLOCATION, False)
113
  curl.setopt(pycurl.CAINFO, noded_cert)
114
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
115
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
116
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
117
  curl.setopt(pycurl.SSLCERT, noded_cert)
118
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
119
  curl.setopt(pycurl.SSLKEY, noded_cert)
120
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
121

    
122

    
123
# Aliasing this module avoids the following warning by epydoc: "Warning: No
124
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
125
_threading = threading
126

    
127

    
128
class _RpcThreadLocal(_threading.local):
129
  def GetHttpClientPool(self):
130
    """Returns a per-thread HTTP client pool.
131

132
    @rtype: L{http.client.HttpClientPool}
133

134
    """
135
    try:
136
      pool = self.hcp
137
    except AttributeError:
138
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
139
      self.hcp = pool
140

    
141
    return pool
142

    
143

    
144
# Remove module alias (see above)
145
del _threading
146

    
147

    
148
_thread_local = _RpcThreadLocal()
149

    
150

    
151
def _RpcTimeout(secs):
152
  """Timeout decorator.
153

154
  When applied to a rpc call_* function, it updates the global timeout
155
  table with the given function/timeout.
156

157
  """
158
  def decorator(f):
159
    name = f.__name__
160
    assert name.startswith("call_")
161
    _TIMEOUTS[name[len("call_"):]] = secs
162
    return f
163
  return decorator
164

    
165

    
166
def RunWithRPC(fn):
167
  """RPC-wrapper decorator.
168

169
  When applied to a function, it runs it with the RPC system
170
  initialized, and it shutsdown the system afterwards. This means the
171
  function must be called without RPC being initialized.
172

173
  """
174
  def wrapper(*args, **kwargs):
175
    Init()
176
    try:
177
      return fn(*args, **kwargs)
178
    finally:
179
      Shutdown()
180
  return wrapper
181

    
182

    
183
def _Compress(data):
184
  """Compresses a string for transport over RPC.
185

186
  Small amounts of data are not compressed.
187

188
  @type data: str
189
  @param data: Data
190
  @rtype: tuple
191
  @return: Encoded data to send
192

193
  """
194
  # Small amounts of data are not compressed
195
  if len(data) < 512:
196
    return (constants.RPC_ENCODING_NONE, data)
197

    
198
  # Compress with zlib and encode in base64
199
  return (constants.RPC_ENCODING_ZLIB_BASE64,
200
          base64.b64encode(zlib.compress(data, 3)))
201

    
202

    
203
class RpcResult(object):
204
  """RPC Result class.
205

206
  This class holds an RPC result. It is needed since in multi-node
207
  calls we can't raise an exception just because one one out of many
208
  failed, and therefore we use this class to encapsulate the result.
209

210
  @ivar data: the data payload, for successful results, or None
211
  @ivar call: the name of the RPC call
212
  @ivar node: the name of the node to which we made the call
213
  @ivar offline: whether the operation failed because the node was
214
      offline, as opposed to actual failure; offline=True will always
215
      imply failed=True, in order to allow simpler checking if
216
      the user doesn't care about the exact failure mode
217
  @ivar fail_msg: the error message if the call failed
218

219
  """
220
  def __init__(self, data=None, failed=False, offline=False,
221
               call=None, node=None):
222
    self.offline = offline
223
    self.call = call
224
    self.node = node
225

    
226
    if offline:
227
      self.fail_msg = "Node is marked offline"
228
      self.data = self.payload = None
229
    elif failed:
230
      self.fail_msg = self._EnsureErr(data)
231
      self.data = self.payload = None
232
    else:
233
      self.data = data
234
      if not isinstance(self.data, (tuple, list)):
235
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
236
                         type(self.data))
237
        self.payload = None
238
      elif len(data) != 2:
239
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
240
                         "expected 2" % len(self.data))
241
        self.payload = None
242
      elif not self.data[0]:
243
        self.fail_msg = self._EnsureErr(self.data[1])
244
        self.payload = None
245
      else:
246
        # finally success
247
        self.fail_msg = None
248
        self.payload = data[1]
249

    
250
    for attr_name in ["call", "data", "fail_msg",
251
                      "node", "offline", "payload"]:
252
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
253

    
254
  @staticmethod
255
  def _EnsureErr(val):
256
    """Helper to ensure we return a 'True' value for error."""
257
    if val:
258
      return val
259
    else:
260
      return "No error information"
261

    
262
  def Raise(self, msg, prereq=False, ecode=None):
263
    """If the result has failed, raise an OpExecError.
264

265
    This is used so that LU code doesn't have to check for each
266
    result, but instead can call this function.
267

268
    """
269
    if not self.fail_msg:
270
      return
271

    
272
    if not msg: # one could pass None for default message
273
      msg = ("Call '%s' to node '%s' has failed: %s" %
274
             (self.call, self.node, self.fail_msg))
275
    else:
276
      msg = "%s: %s" % (msg, self.fail_msg)
277
    if prereq:
278
      ec = errors.OpPrereqError
279
    else:
280
      ec = errors.OpExecError
281
    if ecode is not None:
282
      args = (msg, ecode)
283
    else:
284
      args = (msg, )
285
    raise ec(*args) # pylint: disable=W0142
286

    
287

    
288
def _AddressLookup(node_list,
289
                   ssc=ssconf.SimpleStore,
290
                   nslookup_fn=netutils.Hostname.GetIP):
291
  """Return addresses for given node names.
292

293
  @type node_list: list
294
  @param node_list: List of node names
295
  @type ssc: class
296
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
297
  @type nslookup_fn: callable
298
  @param nslookup_fn: function use to do NS lookup
299
  @rtype: list of addresses and/or None's
300
  @returns: List of corresponding addresses, if found
301

302
  """
303
  ss = ssc()
304
  iplist = ss.GetNodePrimaryIPList()
305
  family = ss.GetPrimaryIPFamily()
306
  addresses = []
307
  ipmap = dict(entry.split() for entry in iplist)
308
  for node in node_list:
309
    address = ipmap.get(node)
310
    if address is None:
311
      address = nslookup_fn(node, family=family)
312
    addresses.append(address)
313

    
314
  return addresses
315

    
316

    
317
class Client:
318
  """RPC Client class.
319

320
  This class, given a (remote) method name, a list of parameters and a
321
  list of nodes, will contact (in parallel) all nodes, and return a
322
  dict of results (key: node name, value: result).
323

324
  One current bug is that generic failure is still signaled by
325
  'False' result, which is not good. This overloading of values can
326
  cause bugs.
327

328
  """
329
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
330
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
331
                                    " timeouts table")
332
    self.procedure = procedure
333
    self.body = body
334
    self.port = port
335
    self._request = {}
336
    self._address_lookup_fn = address_lookup_fn
337

    
338
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
339
    """Add a list of nodes to the target nodes.
340

341
    @type node_list: list
342
    @param node_list: the list of node names to connect
343
    @type address_list: list or None
344
    @keyword address_list: either None or a list with node addresses,
345
        which must have the same length as the node list
346
    @type read_timeout: int
347
    @param read_timeout: overwrites default timeout for operation
348

349
    """
350
    if address_list is None:
351
      # Always use IP address instead of node name
352
      address_list = self._address_lookup_fn(node_list)
353

    
354
    assert len(node_list) == len(address_list), \
355
           "Name and address lists must have the same length"
356

    
357
    for node, address in zip(node_list, address_list):
358
      self.ConnectNode(node, address, read_timeout=read_timeout)
359

    
360
  def ConnectNode(self, name, address=None, read_timeout=None):
361
    """Add a node to the target list.
362

363
    @type name: str
364
    @param name: the node name
365
    @type address: str
366
    @param address: the node address, if known
367
    @type read_timeout: int
368
    @param read_timeout: overwrites default timeout for operation
369

370
    """
371
    if address is None:
372
      # Always use IP address instead of node name
373
      address = self._address_lookup_fn([name])[0]
374

    
375
    assert(address is not None)
376

    
377
    if read_timeout is None:
378
      read_timeout = _TIMEOUTS[self.procedure]
379

    
380
    self._request[name] = \
381
      http.client.HttpClientRequest(str(address), self.port,
382
                                    http.HTTP_PUT, str("/%s" % self.procedure),
383
                                    headers=_RPC_CLIENT_HEADERS,
384
                                    post_data=str(self.body),
385
                                    read_timeout=read_timeout)
386

    
387
  def GetResults(self, http_pool=None):
388
    """Call nodes and return results.
389

390
    @rtype: list
391
    @return: List of RPC results
392

393
    """
394
    if not http_pool:
395
      http_pool = _thread_local.GetHttpClientPool()
396

    
397
    http_pool.ProcessRequests(self._request.values())
398

    
399
    results = {}
400

    
401
    for name, req in self._request.iteritems():
402
      if req.success and req.resp_status_code == http.HTTP_OK:
403
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
404
                                  node=name, call=self.procedure)
405
        continue
406

    
407
      # TODO: Better error reporting
408
      if req.error:
409
        msg = req.error
410
      else:
411
        msg = req.resp_body
412

    
413
      logging.error("RPC error in %s from node %s: %s",
414
                    self.procedure, name, msg)
415
      results[name] = RpcResult(data=msg, failed=True, node=name,
416
                                call=self.procedure)
417

    
418
    return results
419

    
420

    
421
def _EncodeImportExportIO(ieio, ieioargs):
422
  """Encodes import/export I/O information.
423

424
  """
425
  if ieio == constants.IEIO_RAW_DISK:
426
    assert len(ieioargs) == 1
427
    return (ieioargs[0].ToDict(), )
428

    
429
  if ieio == constants.IEIO_SCRIPT:
430
    assert len(ieioargs) == 2
431
    return (ieioargs[0].ToDict(), ieioargs[1])
432

    
433
  return ieioargs
434

    
435

    
436
class RpcRunner(object):
437
  """RPC runner class.
438

439
  """
440
  def __init__(self, context):
441
    """Initialized the RPC runner.
442

443
    @type context: C{masterd.GanetiContext}
444
    @param context: Ganeti context
445

446
    """
447
    self._cfg = context.cfg
448
    self.port = netutils.GetDaemonPort(constants.NODED)
449

    
450
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
451
    """Convert the given instance to a dict.
452

453
    This is done via the instance's ToDict() method and additionally
454
    we fill the hvparams with the cluster defaults.
455

456
    @type instance: L{objects.Instance}
457
    @param instance: an Instance object
458
    @type hvp: dict or None
459
    @param hvp: a dictionary with overridden hypervisor parameters
460
    @type bep: dict or None
461
    @param bep: a dictionary with overridden backend parameters
462
    @type osp: dict or None
463
    @param osp: a dictionary with overridden os parameters
464
    @rtype: dict
465
    @return: the instance dict, with the hvparams filled with the
466
        cluster defaults
467

468
    """
469
    idict = instance.ToDict()
470
    cluster = self._cfg.GetClusterInfo()
471
    idict["hvparams"] = cluster.FillHV(instance)
472
    if hvp is not None:
473
      idict["hvparams"].update(hvp)
474
    idict["beparams"] = cluster.FillBE(instance)
475
    if bep is not None:
476
      idict["beparams"].update(bep)
477
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
478
    if osp is not None:
479
      idict["osparams"].update(osp)
480
    for nic in idict["nics"]:
481
      nic['nicparams'] = objects.FillDict(
482
        cluster.nicparams[constants.PP_DEFAULT],
483
        nic['nicparams'])
484
    return idict
485

    
486
  def _ConnectList(self, client, node_list, call, read_timeout=None):
487
    """Helper for computing node addresses.
488

489
    @type client: L{ganeti.rpc.Client}
490
    @param client: a C{Client} instance
491
    @type node_list: list
492
    @param node_list: the node list we should connect
493
    @type call: string
494
    @param call: the name of the remote procedure call, for filling in
495
        correctly any eventual offline nodes' results
496
    @type read_timeout: int
497
    @param read_timeout: overwrites the default read timeout for the
498
        given operation
499

500
    """
501
    all_nodes = self._cfg.GetAllNodesInfo()
502
    name_list = []
503
    addr_list = []
504
    skip_dict = {}
505
    for node in node_list:
506
      if node in all_nodes:
507
        if all_nodes[node].offline:
508
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
509
          continue
510
        val = all_nodes[node].primary_ip
511
      else:
512
        val = None
513
      addr_list.append(val)
514
      name_list.append(node)
515
    if name_list:
516
      client.ConnectList(name_list, address_list=addr_list,
517
                         read_timeout=read_timeout)
518
    return skip_dict
519

    
520
  def _ConnectNode(self, client, node, call, read_timeout=None):
521
    """Helper for computing one node's address.
522

523
    @type client: L{ganeti.rpc.Client}
524
    @param client: a C{Client} instance
525
    @type node: str
526
    @param node: the node we should connect
527
    @type call: string
528
    @param call: the name of the remote procedure call, for filling in
529
        correctly any eventual offline nodes' results
530
    @type read_timeout: int
531
    @param read_timeout: overwrites the default read timeout for the
532
        given operation
533

534
    """
535
    node_info = self._cfg.GetNodeInfo(node)
536
    if node_info is not None:
537
      if node_info.offline:
538
        return RpcResult(node=node, offline=True, call=call)
539
      addr = node_info.primary_ip
540
    else:
541
      addr = None
542
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
543

    
544
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
545
    """Helper for making a multi-node call
546

547
    """
548
    body = serializer.DumpJson(args, indent=False)
549
    c = Client(procedure, body, self.port)
550
    skip_dict = self._ConnectList(c, node_list, procedure,
551
                                  read_timeout=read_timeout)
552
    skip_dict.update(c.GetResults())
553
    return skip_dict
554

    
555
  @classmethod
556
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
557
                           address_list=None, read_timeout=None):
558
    """Helper for making a multi-node static call
559

560
    """
561
    body = serializer.DumpJson(args, indent=False)
562
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
563
    c.ConnectList(node_list, address_list=address_list,
564
                  read_timeout=read_timeout)
565
    return c.GetResults()
566

    
567
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
568
    """Helper for making a single-node call
569

570
    """
571
    body = serializer.DumpJson(args, indent=False)
572
    c = Client(procedure, body, self.port)
573
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
574
    if result is None:
575
      # we did connect, node is not offline
576
      result = c.GetResults()[node]
577
    return result
578

    
579
  @classmethod
580
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
581
    """Helper for making a single-node static call
582

583
    """
584
    body = serializer.DumpJson(args, indent=False)
585
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
586
    c.ConnectNode(node, read_timeout=read_timeout)
587
    return c.GetResults()[node]
588

    
589
  #
590
  # Begin RPC calls
591
  #
592

    
593
  @_RpcTimeout(_TMO_URGENT)
594
  def call_bdev_sizes(self, node_list, devices):
595
    """Gets the sizes of requested block devices present on a node
596

597
    This is a multi-node call.
598

599
    """
600
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
601

    
602
  @_RpcTimeout(_TMO_URGENT)
603
  def call_lv_list(self, node_list, vg_name):
604
    """Gets the logical volumes present in a given volume group.
605

606
    This is a multi-node call.
607

608
    """
609
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
610

    
611
  @_RpcTimeout(_TMO_URGENT)
612
  def call_vg_list(self, node_list):
613
    """Gets the volume group list.
614

615
    This is a multi-node call.
616

617
    """
618
    return self._MultiNodeCall(node_list, "vg_list", [])
619

    
620
  @_RpcTimeout(_TMO_NORMAL)
621
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
622
    """Get list of storage units.
623

624
    This is a multi-node call.
625

626
    """
627
    return self._MultiNodeCall(node_list, "storage_list",
628
                               [su_name, su_args, name, fields])
629

    
630
  @_RpcTimeout(_TMO_NORMAL)
631
  def call_storage_modify(self, node, su_name, su_args, name, changes):
632
    """Modify a storage unit.
633

634
    This is a single-node call.
635

636
    """
637
    return self._SingleNodeCall(node, "storage_modify",
638
                                [su_name, su_args, name, changes])
639

    
640
  @_RpcTimeout(_TMO_NORMAL)
641
  def call_storage_execute(self, node, su_name, su_args, name, op):
642
    """Executes an operation on a storage unit.
643

644
    This is a single-node call.
645

646
    """
647
    return self._SingleNodeCall(node, "storage_execute",
648
                                [su_name, su_args, name, op])
649

    
650
  @_RpcTimeout(_TMO_URGENT)
651
  def call_bridges_exist(self, node, bridges_list):
652
    """Checks if a node has all the bridges given.
653

654
    This method checks if all bridges given in the bridges_list are
655
    present on the remote node, so that an instance that uses interfaces
656
    on those bridges can be started.
657

658
    This is a single-node call.
659

660
    """
661
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
662

    
663
  @_RpcTimeout(_TMO_NORMAL)
664
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
665
    """Starts an instance.
666

667
    This is a single-node call.
668

669
    """
670
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
671
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
672

    
673
  @_RpcTimeout(_TMO_NORMAL)
674
  def call_instance_shutdown(self, node, instance, timeout):
675
    """Stops an instance.
676

677
    This is a single-node call.
678

679
    """
680
    return self._SingleNodeCall(node, "instance_shutdown",
681
                                [self._InstDict(instance), timeout])
682

    
683
  @_RpcTimeout(_TMO_NORMAL)
684
  def call_migration_info(self, node, instance):
685
    """Gather the information necessary to prepare an instance migration.
686

687
    This is a single-node call.
688

689
    @type node: string
690
    @param node: the node on which the instance is currently running
691
    @type instance: C{objects.Instance}
692
    @param instance: the instance definition
693

694
    """
695
    return self._SingleNodeCall(node, "migration_info",
696
                                [self._InstDict(instance)])
697

    
698
  @_RpcTimeout(_TMO_NORMAL)
699
  def call_accept_instance(self, node, instance, info, target):
700
    """Prepare a node to accept an instance.
701

702
    This is a single-node call.
703

704
    @type node: string
705
    @param node: the target node for the migration
706
    @type instance: C{objects.Instance}
707
    @param instance: the instance definition
708
    @type info: opaque/hypervisor specific (string/data)
709
    @param info: result for the call_migration_info call
710
    @type target: string
711
    @param target: target hostname (usually ip address) (on the node itself)
712

713
    """
714
    return self._SingleNodeCall(node, "accept_instance",
715
                                [self._InstDict(instance), info, target])
716

    
717
  @_RpcTimeout(_TMO_NORMAL)
718
  def call_finalize_migration(self, node, instance, info, success):
719
    """Finalize any target-node migration specific operation.
720

721
    This is called both in case of a successful migration and in case of error
722
    (in which case it should abort the migration).
723

724
    This is a single-node call.
725

726
    @type node: string
727
    @param node: the target node for the migration
728
    @type instance: C{objects.Instance}
729
    @param instance: the instance definition
730
    @type info: opaque/hypervisor specific (string/data)
731
    @param info: result for the call_migration_info call
732
    @type success: boolean
733
    @param success: whether the migration was a success or a failure
734

735
    """
736
    return self._SingleNodeCall(node, "finalize_migration",
737
                                [self._InstDict(instance), info, success])
738

    
739
  @_RpcTimeout(_TMO_SLOW)
740
  def call_instance_migrate(self, node, instance, target, live):
741
    """Migrate an instance.
742

743
    This is a single-node call.
744

745
    @type node: string
746
    @param node: the node on which the instance is currently running
747
    @type instance: C{objects.Instance}
748
    @param instance: the instance definition
749
    @type target: string
750
    @param target: the target node name
751
    @type live: boolean
752
    @param live: whether the migration should be done live or not (the
753
        interpretation of this parameter is left to the hypervisor)
754

755
    """
756
    return self._SingleNodeCall(node, "instance_migrate",
757
                                [self._InstDict(instance), target, live])
758

    
759
  @_RpcTimeout(_TMO_NORMAL)
760
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
761
    """Reboots an instance.
762

763
    This is a single-node call.
764

765
    """
766
    return self._SingleNodeCall(node, "instance_reboot",
767
                                [self._InstDict(inst), reboot_type,
768
                                 shutdown_timeout])
769

    
770
  @_RpcTimeout(_TMO_1DAY)
771
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
772
    """Installs an OS on the given instance.
773

774
    This is a single-node call.
775

776
    """
777
    return self._SingleNodeCall(node, "instance_os_add",
778
                                [self._InstDict(inst, osp=osparams),
779
                                 reinstall, debug])
780

    
781
  @_RpcTimeout(_TMO_SLOW)
782
  def call_instance_run_rename(self, node, inst, old_name, debug):
783
    """Run the OS rename script for an instance.
784

785
    This is a single-node call.
786

787
    """
788
    return self._SingleNodeCall(node, "instance_run_rename",
789
                                [self._InstDict(inst), old_name, debug])
790

    
791
  @_RpcTimeout(_TMO_URGENT)
792
  def call_instance_info(self, node, instance, hname):
793
    """Returns information about a single instance.
794

795
    This is a single-node call.
796

797
    @type node: list
798
    @param node: the list of nodes to query
799
    @type instance: string
800
    @param instance: the instance name
801
    @type hname: string
802
    @param hname: the hypervisor type of the instance
803

804
    """
805
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
806

    
807
  @_RpcTimeout(_TMO_NORMAL)
808
  def call_instance_migratable(self, node, instance):
809
    """Checks whether the given instance can be migrated.
810

811
    This is a single-node call.
812

813
    @param node: the node to query
814
    @type instance: L{objects.Instance}
815
    @param instance: the instance to check
816

817

818
    """
819
    return self._SingleNodeCall(node, "instance_migratable",
820
                                [self._InstDict(instance)])
821

    
822
  @_RpcTimeout(_TMO_URGENT)
823
  def call_all_instances_info(self, node_list, hypervisor_list):
824
    """Returns information about all instances on the given nodes.
825

826
    This is a multi-node call.
827

828
    @type node_list: list
829
    @param node_list: the list of nodes to query
830
    @type hypervisor_list: list
831
    @param hypervisor_list: the hypervisors to query for instances
832

833
    """
834
    return self._MultiNodeCall(node_list, "all_instances_info",
835
                               [hypervisor_list])
836

    
837
  @_RpcTimeout(_TMO_URGENT)
838
  def call_instance_list(self, node_list, hypervisor_list):
839
    """Returns the list of running instances on a given node.
840

841
    This is a multi-node call.
842

843
    @type node_list: list
844
    @param node_list: the list of nodes to query
845
    @type hypervisor_list: list
846
    @param hypervisor_list: the hypervisors to query for instances
847

848
    """
849
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
850

    
851
  @_RpcTimeout(_TMO_FAST)
852
  def call_node_tcp_ping(self, node, source, target, port, timeout,
853
                         live_port_needed):
854
    """Do a TcpPing on the remote node
855

856
    This is a single-node call.
857

858
    """
859
    return self._SingleNodeCall(node, "node_tcp_ping",
860
                                [source, target, port, timeout,
861
                                 live_port_needed])
862

    
863
  @_RpcTimeout(_TMO_FAST)
864
  def call_node_has_ip_address(self, node, address):
865
    """Checks if a node has the given IP address.
866

867
    This is a single-node call.
868

869
    """
870
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
871

    
872
  @_RpcTimeout(_TMO_URGENT)
873
  def call_node_info(self, node_list, vg_name, hypervisor_type):
874
    """Return node information.
875

876
    This will return memory information and volume group size and free
877
    space.
878

879
    This is a multi-node call.
880

881
    @type node_list: list
882
    @param node_list: the list of nodes to query
883
    @type vg_name: C{string}
884
    @param vg_name: the name of the volume group to ask for disk space
885
        information
886
    @type hypervisor_type: C{str}
887
    @param hypervisor_type: the name of the hypervisor to ask for
888
        memory information
889

890
    """
891
    return self._MultiNodeCall(node_list, "node_info",
892
                               [vg_name, hypervisor_type])
893

    
894
  @_RpcTimeout(_TMO_NORMAL)
895
  def call_etc_hosts_modify(self, node, mode, name, ip):
896
    """Modify hosts file with name
897

898
    @type node: string
899
    @param node: The node to call
900
    @type mode: string
901
    @param mode: The mode to operate. Currently "add" or "remove"
902
    @type name: string
903
    @param name: The host name to be modified
904
    @type ip: string
905
    @param ip: The ip of the entry (just valid if mode is "add")
906

907
    """
908
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
909

    
910
  @_RpcTimeout(_TMO_NORMAL)
911
  def call_node_verify(self, node_list, checkdict, cluster_name):
912
    """Request verification of given parameters.
913

914
    This is a multi-node call.
915

916
    """
917
    return self._MultiNodeCall(node_list, "node_verify",
918
                               [checkdict, cluster_name])
919

    
920
  @classmethod
921
  @_RpcTimeout(_TMO_FAST)
922
  def call_node_start_master(cls, node, start_daemons, no_voting):
923
    """Tells a node to activate itself as a master.
924

925
    This is a single-node call.
926

927
    """
928
    return cls._StaticSingleNodeCall(node, "node_start_master",
929
                                     [start_daemons, no_voting])
930

    
931
  @classmethod
932
  @_RpcTimeout(_TMO_FAST)
933
  def call_node_stop_master(cls, node, stop_daemons):
934
    """Tells a node to demote itself from master status.
935

936
    This is a single-node call.
937

938
    """
939
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
940

    
941
  @classmethod
942
  @_RpcTimeout(_TMO_URGENT)
943
  def call_master_info(cls, node_list):
944
    """Query master info.
945

946
    This is a multi-node call.
947

948
    """
949
    # TODO: should this method query down nodes?
950
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
951

    
952
  @classmethod
953
  @_RpcTimeout(_TMO_URGENT)
954
  def call_version(cls, node_list):
955
    """Query node version.
956

957
    This is a multi-node call.
958

959
    """
960
    return cls._StaticMultiNodeCall(node_list, "version", [])
961

    
962
  @_RpcTimeout(_TMO_NORMAL)
963
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
964
    """Request creation of a given block device.
965

966
    This is a single-node call.
967

968
    """
969
    return self._SingleNodeCall(node, "blockdev_create",
970
                                [bdev.ToDict(), size, owner, on_primary, info])
971

    
972
  @_RpcTimeout(_TMO_SLOW)
973
  def call_blockdev_wipe(self, node, bdev, offset, size):
974
    """Request wipe at given offset with given size of a block device.
975

976
    This is a single-node call.
977

978
    """
979
    return self._SingleNodeCall(node, "blockdev_wipe",
980
                                [bdev.ToDict(), offset, size])
981

    
982
  @_RpcTimeout(_TMO_NORMAL)
983
  def call_blockdev_remove(self, node, bdev):
984
    """Request removal of a given block device.
985

986
    This is a single-node call.
987

988
    """
989
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
990

    
991
  @_RpcTimeout(_TMO_NORMAL)
992
  def call_blockdev_rename(self, node, devlist):
993
    """Request rename of the given block devices.
994

995
    This is a single-node call.
996

997
    """
998
    return self._SingleNodeCall(node, "blockdev_rename",
999
                                [(d.ToDict(), uid) for d, uid in devlist])
1000

    
1001
  @_RpcTimeout(_TMO_NORMAL)
1002
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
1003
    """Request a pause/resume of given block device.
1004

1005
    This is a single-node call.
1006

1007
    """
1008
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1009
                                [[bdev.ToDict() for bdev in disks], pause])
1010

    
1011
  @_RpcTimeout(_TMO_NORMAL)
1012
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1013
    """Request assembling of a given block device.
1014

1015
    This is a single-node call.
1016

1017
    """
1018
    return self._SingleNodeCall(node, "blockdev_assemble",
1019
                                [disk.ToDict(), owner, on_primary, idx])
1020

    
1021
  @_RpcTimeout(_TMO_NORMAL)
1022
  def call_blockdev_shutdown(self, node, disk):
1023
    """Request shutdown of a given block device.
1024

1025
    This is a single-node call.
1026

1027
    """
1028
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1029

    
1030
  @_RpcTimeout(_TMO_NORMAL)
1031
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1032
    """Request adding a list of children to a (mirroring) device.
1033

1034
    This is a single-node call.
1035

1036
    """
1037
    return self._SingleNodeCall(node, "blockdev_addchildren",
1038
                                [bdev.ToDict(),
1039
                                 [disk.ToDict() for disk in ndevs]])
1040

    
1041
  @_RpcTimeout(_TMO_NORMAL)
1042
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1043
    """Request removing a list of children from a (mirroring) device.
1044

1045
    This is a single-node call.
1046

1047
    """
1048
    return self._SingleNodeCall(node, "blockdev_removechildren",
1049
                                [bdev.ToDict(),
1050
                                 [disk.ToDict() for disk in ndevs]])
1051

    
1052
  @_RpcTimeout(_TMO_NORMAL)
1053
  def call_blockdev_getmirrorstatus(self, node, disks):
1054
    """Request status of a (mirroring) device.
1055

1056
    This is a single-node call.
1057

1058
    """
1059
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1060
                                  [dsk.ToDict() for dsk in disks])
1061
    if not result.fail_msg:
1062
      result.payload = [objects.BlockDevStatus.FromDict(i)
1063
                        for i in result.payload]
1064
    return result
1065

    
1066
  @_RpcTimeout(_TMO_NORMAL)
1067
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1068
    """Request status of (mirroring) devices from multiple nodes.
1069

1070
    This is a multi-node call.
1071

1072
    """
1073
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1074
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1075
                                       for name, disks in node_disks.items())])
1076
    for nres in result.values():
1077
      if nres.fail_msg:
1078
        continue
1079

    
1080
      for idx, (success, status) in enumerate(nres.payload):
1081
        if success:
1082
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1083

    
1084
    return result
1085

    
1086
  @_RpcTimeout(_TMO_NORMAL)
1087
  def call_blockdev_find(self, node, disk):
1088
    """Request identification of a given block device.
1089

1090
    This is a single-node call.
1091

1092
    """
1093
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1094
    if not result.fail_msg and result.payload is not None:
1095
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1096
    return result
1097

    
1098
  @_RpcTimeout(_TMO_NORMAL)
1099
  def call_blockdev_close(self, node, instance_name, disks):
1100
    """Closes the given block devices.
1101

1102
    This is a single-node call.
1103

1104
    """
1105
    params = [instance_name, [cf.ToDict() for cf in disks]]
1106
    return self._SingleNodeCall(node, "blockdev_close", params)
1107

    
1108
  @_RpcTimeout(_TMO_NORMAL)
1109
  def call_blockdev_getsize(self, node, disks):
1110
    """Returns the size of the given disks.
1111

1112
    This is a single-node call.
1113

1114
    """
1115
    params = [[cf.ToDict() for cf in disks]]
1116
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1117

    
1118
  @_RpcTimeout(_TMO_NORMAL)
1119
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1120
    """Disconnects the network of the given drbd devices.
1121

1122
    This is a multi-node call.
1123

1124
    """
1125
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1126
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1127

    
1128
  @_RpcTimeout(_TMO_NORMAL)
1129
  def call_drbd_attach_net(self, node_list, nodes_ip,
1130
                           disks, instance_name, multimaster):
1131
    """Disconnects the given drbd devices.
1132

1133
    This is a multi-node call.
1134

1135
    """
1136
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1137
                               [nodes_ip, [cf.ToDict() for cf in disks],
1138
                                instance_name, multimaster])
1139

    
1140
  @_RpcTimeout(_TMO_SLOW)
1141
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1142
    """Waits for the synchronization of drbd devices is complete.
1143

1144
    This is a multi-node call.
1145

1146
    """
1147
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1148
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1149

    
1150
  @_RpcTimeout(_TMO_URGENT)
1151
  def call_drbd_helper(self, node_list):
1152
    """Gets drbd helper.
1153

1154
    This is a multi-node call.
1155

1156
    """
1157
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1158

    
1159
  @classmethod
1160
  @_RpcTimeout(_TMO_NORMAL)
1161
  def call_upload_file(cls, node_list, file_name, address_list=None):
1162
    """Upload a file.
1163

1164
    The node will refuse the operation in case the file is not on the
1165
    approved file list.
1166

1167
    This is a multi-node call.
1168

1169
    @type node_list: list
1170
    @param node_list: the list of node names to upload to
1171
    @type file_name: str
1172
    @param file_name: the filename to upload
1173
    @type address_list: list or None
1174
    @keyword address_list: an optional list of node addresses, in order
1175
        to optimize the RPC speed
1176

1177
    """
1178
    file_contents = utils.ReadFile(file_name)
1179
    data = _Compress(file_contents)
1180
    st = os.stat(file_name)
1181
    getents = runtime.GetEnts()
1182
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1183
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1184
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1185
                                    address_list=address_list)
1186

    
1187
  @classmethod
1188
  @_RpcTimeout(_TMO_NORMAL)
1189
  def call_write_ssconf_files(cls, node_list, values):
1190
    """Write ssconf files.
1191

1192
    This is a multi-node call.
1193

1194
    """
1195
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1196

    
1197
  @_RpcTimeout(_TMO_NORMAL)
1198
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1199
    """Runs OOB.
1200

1201
    This is a single-node call.
1202

1203
    """
1204
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1205
                                                  remote_node, timeout])
1206

    
1207
  @_RpcTimeout(_TMO_FAST)
1208
  def call_os_diagnose(self, node_list):
1209
    """Request a diagnose of OS definitions.
1210

1211
    This is a multi-node call.
1212

1213
    """
1214
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1215

    
1216
  @_RpcTimeout(_TMO_FAST)
1217
  def call_os_get(self, node, name):
1218
    """Returns an OS definition.
1219

1220
    This is a single-node call.
1221

1222
    """
1223
    result = self._SingleNodeCall(node, "os_get", [name])
1224
    if not result.fail_msg and isinstance(result.payload, dict):
1225
      result.payload = objects.OS.FromDict(result.payload)
1226
    return result
1227

    
1228
  @_RpcTimeout(_TMO_FAST)
1229
  def call_os_validate(self, required, nodes, name, checks, params):
1230
    """Run a validation routine for a given OS.
1231

1232
    This is a multi-node call.
1233

1234
    """
1235
    return self._MultiNodeCall(nodes, "os_validate",
1236
                               [required, name, checks, params])
1237

    
1238
  @_RpcTimeout(_TMO_NORMAL)
1239
  def call_hooks_runner(self, node_list, hpath, phase, env):
1240
    """Call the hooks runner.
1241

1242
    Args:
1243
      - op: the OpCode instance
1244
      - env: a dictionary with the environment
1245

1246
    This is a multi-node call.
1247

1248
    """
1249
    params = [hpath, phase, env]
1250
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1251

    
1252
  @_RpcTimeout(_TMO_NORMAL)
1253
  def call_iallocator_runner(self, node, name, idata):
1254
    """Call an iallocator on a remote node
1255

1256
    Args:
1257
      - name: the iallocator name
1258
      - input: the json-encoded input string
1259

1260
    This is a single-node call.
1261

1262
    """
1263
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1264

    
1265
  @_RpcTimeout(_TMO_NORMAL)
1266
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1267
    """Request a snapshot of the given block device.
1268

1269
    This is a single-node call.
1270

1271
    """
1272
    return self._SingleNodeCall(node, "blockdev_grow",
1273
                                [cf_bdev.ToDict(), amount, dryrun])
1274

    
1275
  @_RpcTimeout(_TMO_1DAY)
1276
  def call_blockdev_export(self, node, cf_bdev,
1277
                           dest_node, dest_path, cluster_name):
1278
    """Export a given disk to another node.
1279

1280
    This is a single-node call.
1281

1282
    """
1283
    return self._SingleNodeCall(node, "blockdev_export",
1284
                                [cf_bdev.ToDict(), dest_node, dest_path,
1285
                                 cluster_name])
1286

    
1287
  @_RpcTimeout(_TMO_NORMAL)
1288
  def call_blockdev_snapshot(self, node, cf_bdev):
1289
    """Request a snapshot of the given block device.
1290

1291
    This is a single-node call.
1292

1293
    """
1294
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1295

    
1296
  @_RpcTimeout(_TMO_NORMAL)
1297
  def call_finalize_export(self, node, instance, snap_disks):
1298
    """Request the completion of an export operation.
1299

1300
    This writes the export config file, etc.
1301

1302
    This is a single-node call.
1303

1304
    """
1305
    flat_disks = []
1306
    for disk in snap_disks:
1307
      if isinstance(disk, bool):
1308
        flat_disks.append(disk)
1309
      else:
1310
        flat_disks.append(disk.ToDict())
1311

    
1312
    return self._SingleNodeCall(node, "finalize_export",
1313
                                [self._InstDict(instance), flat_disks])
1314

    
1315
  @_RpcTimeout(_TMO_FAST)
1316
  def call_export_info(self, node, path):
1317
    """Queries the export information in a given path.
1318

1319
    This is a single-node call.
1320

1321
    """
1322
    return self._SingleNodeCall(node, "export_info", [path])
1323

    
1324
  @_RpcTimeout(_TMO_FAST)
1325
  def call_export_list(self, node_list):
1326
    """Gets the stored exports list.
1327

1328
    This is a multi-node call.
1329

1330
    """
1331
    return self._MultiNodeCall(node_list, "export_list", [])
1332

    
1333
  @_RpcTimeout(_TMO_FAST)
1334
  def call_export_remove(self, node, export):
1335
    """Requests removal of a given export.
1336

1337
    This is a single-node call.
1338

1339
    """
1340
    return self._SingleNodeCall(node, "export_remove", [export])
1341

    
1342
  @classmethod
1343
  @_RpcTimeout(_TMO_NORMAL)
1344
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1345
    """Requests a node to clean the cluster information it has.
1346

1347
    This will remove the configuration information from the ganeti data
1348
    dir.
1349

1350
    This is a single-node call.
1351

1352
    """
1353
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1354
                                     [modify_ssh_setup])
1355

    
1356
  @_RpcTimeout(_TMO_FAST)
1357
  def call_node_volumes(self, node_list):
1358
    """Gets all volumes on node(s).
1359

1360
    This is a multi-node call.
1361

1362
    """
1363
    return self._MultiNodeCall(node_list, "node_volumes", [])
1364

    
1365
  @_RpcTimeout(_TMO_FAST)
1366
  def call_node_demote_from_mc(self, node):
1367
    """Demote a node from the master candidate role.
1368

1369
    This is a single-node call.
1370

1371
    """
1372
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1373

    
1374
  @_RpcTimeout(_TMO_NORMAL)
1375
  def call_node_powercycle(self, node, hypervisor):
1376
    """Tries to powercycle a node.
1377

1378
    This is a single-node call.
1379

1380
    """
1381
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1382

    
1383
  @_RpcTimeout(None)
1384
  def call_test_delay(self, node_list, duration):
1385
    """Sleep for a fixed time on given node(s).
1386

1387
    This is a multi-node call.
1388

1389
    """
1390
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1391
                               read_timeout=int(duration + 5))
1392

    
1393
  @_RpcTimeout(_TMO_FAST)
1394
  def call_file_storage_dir_create(self, node, file_storage_dir):
1395
    """Create the given file storage directory.
1396

1397
    This is a single-node call.
1398

1399
    """
1400
    return self._SingleNodeCall(node, "file_storage_dir_create",
1401
                                [file_storage_dir])
1402

    
1403
  @_RpcTimeout(_TMO_FAST)
1404
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1405
    """Remove the given file storage directory.
1406

1407
    This is a single-node call.
1408

1409
    """
1410
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1411
                                [file_storage_dir])
1412

    
1413
  @_RpcTimeout(_TMO_FAST)
1414
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1415
                                   new_file_storage_dir):
1416
    """Rename file storage directory.
1417

1418
    This is a single-node call.
1419

1420
    """
1421
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1422
                                [old_file_storage_dir, new_file_storage_dir])
1423

    
1424
  @classmethod
1425
  @_RpcTimeout(_TMO_URGENT)
1426
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1427
    """Update job queue.
1428

1429
    This is a multi-node call.
1430

1431
    """
1432
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1433
                                    [file_name, _Compress(content)],
1434
                                    address_list=address_list)
1435

    
1436
  @classmethod
1437
  @_RpcTimeout(_TMO_NORMAL)
1438
  def call_jobqueue_purge(cls, node):
1439
    """Purge job queue.
1440

1441
    This is a single-node call.
1442

1443
    """
1444
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1445

    
1446
  @classmethod
1447
  @_RpcTimeout(_TMO_URGENT)
1448
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1449
    """Rename a job queue file.
1450

1451
    This is a multi-node call.
1452

1453
    """
1454
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1455
                                    address_list=address_list)
1456

    
1457
  @_RpcTimeout(_TMO_NORMAL)
1458
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1459
    """Validate the hypervisor params.
1460

1461
    This is a multi-node call.
1462

1463
    @type node_list: list
1464
    @param node_list: the list of nodes to query
1465
    @type hvname: string
1466
    @param hvname: the hypervisor name
1467
    @type hvparams: dict
1468
    @param hvparams: the hypervisor parameters to be validated
1469

1470
    """
1471
    cluster = self._cfg.GetClusterInfo()
1472
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1473
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1474
                               [hvname, hv_full])
1475

    
1476
  @_RpcTimeout(_TMO_NORMAL)
1477
  def call_x509_cert_create(self, node, validity):
1478
    """Creates a new X509 certificate for SSL/TLS.
1479

1480
    This is a single-node call.
1481

1482
    @type validity: int
1483
    @param validity: Validity in seconds
1484

1485
    """
1486
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1487

    
1488
  @_RpcTimeout(_TMO_NORMAL)
1489
  def call_x509_cert_remove(self, node, name):
1490
    """Removes a X509 certificate.
1491

1492
    This is a single-node call.
1493

1494
    @type name: string
1495
    @param name: Certificate name
1496

1497
    """
1498
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1499

    
1500
  @_RpcTimeout(_TMO_NORMAL)
1501
  def call_import_start(self, node, opts, instance, component,
1502
                        dest, dest_args):
1503
    """Starts a listener for an import.
1504

1505
    This is a single-node call.
1506

1507
    @type node: string
1508
    @param node: Node name
1509
    @type instance: C{objects.Instance}
1510
    @param instance: Instance object
1511
    @type component: string
1512
    @param component: which part of the instance is being imported
1513

1514
    """
1515
    return self._SingleNodeCall(node, "import_start",
1516
                                [opts.ToDict(),
1517
                                 self._InstDict(instance), component, dest,
1518
                                 _EncodeImportExportIO(dest, dest_args)])
1519

    
1520
  @_RpcTimeout(_TMO_NORMAL)
1521
  def call_export_start(self, node, opts, host, port,
1522
                        instance, component, source, source_args):
1523
    """Starts an export daemon.
1524

1525
    This is a single-node call.
1526

1527
    @type node: string
1528
    @param node: Node name
1529
    @type instance: C{objects.Instance}
1530
    @param instance: Instance object
1531
    @type component: string
1532
    @param component: which part of the instance is being imported
1533

1534
    """
1535
    return self._SingleNodeCall(node, "export_start",
1536
                                [opts.ToDict(), host, port,
1537
                                 self._InstDict(instance),
1538
                                 component, source,
1539
                                 _EncodeImportExportIO(source, source_args)])
1540

    
1541
  @_RpcTimeout(_TMO_FAST)
1542
  def call_impexp_status(self, node, names):
1543
    """Gets the status of an import or export.
1544

1545
    This is a single-node call.
1546

1547
    @type node: string
1548
    @param node: Node name
1549
    @type names: List of strings
1550
    @param names: Import/export names
1551
    @rtype: List of L{objects.ImportExportStatus} instances
1552
    @return: Returns a list of the state of each named import/export or None if
1553
             a status couldn't be retrieved
1554

1555
    """
1556
    result = self._SingleNodeCall(node, "impexp_status", [names])
1557

    
1558
    if not result.fail_msg:
1559
      decoded = []
1560

    
1561
      for i in result.payload:
1562
        if i is None:
1563
          decoded.append(None)
1564
          continue
1565
        decoded.append(objects.ImportExportStatus.FromDict(i))
1566

    
1567
      result.payload = decoded
1568

    
1569
    return result
1570

    
1571
  @_RpcTimeout(_TMO_NORMAL)
1572
  def call_impexp_abort(self, node, name):
1573
    """Aborts an import or export.
1574

1575
    This is a single-node call.
1576

1577
    @type node: string
1578
    @param node: Node name
1579
    @type name: string
1580
    @param name: Import/export name
1581

1582
    """
1583
    return self._SingleNodeCall(node, "impexp_abort", [name])
1584

    
1585
  @_RpcTimeout(_TMO_NORMAL)
1586
  def call_impexp_cleanup(self, node, name):
1587
    """Cleans up after an import or export.
1588

1589
    This is a single-node call.
1590

1591
    @type node: string
1592
    @param node: Node name
1593
    @type name: string
1594
    @param name: Import/export name
1595

1596
    """
1597
    return self._SingleNodeCall(node, "impexp_cleanup", [name])