Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ a629ecb9

History | View | Annotate | Download (48.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# pylint has a bug here, doesn't see this import
52
import ganeti.http.client  # pylint: disable=W0611
53

    
54

    
55
# Timeout for connecting to nodes (seconds)
56
_RPC_CONNECT_TIMEOUT = 5
57

    
58
_RPC_CLIENT_HEADERS = [
59
  "Content-type: %s" % http.HTTP_APP_JSON,
60
  "Expect:",
61
  ]
62

    
63
# Various time constants for the timeout table
64
_TMO_URGENT = 60 # one minute
65
_TMO_FAST = 5 * 60 # five minutes
66
_TMO_NORMAL = 15 * 60 # 15 minutes
67
_TMO_SLOW = 3600 # one hour
68
_TMO_4HRS = 4 * 3600
69
_TMO_1DAY = 86400
70

    
71
# Timeout table that will be built later by decorators
72
# Guidelines for choosing timeouts:
73
# - call used during watcher: timeout -> 1min, _TMO_URGENT
74
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
75
# - other calls: 15 min, _TMO_NORMAL
76
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
77

    
78
_TIMEOUTS = {
79
}
80

    
81
#: Special value to describe an offline host
82
_OFFLINE = object()
83

    
84

    
85
def Init():
86
  """Initializes the module-global HTTP client manager.
87

88
  Must be called before using any RPC function and while exactly one thread is
89
  running.
90

91
  """
92
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
93
  # one thread running. This check is just a safety measure -- it doesn't
94
  # cover all cases.
95
  assert threading.activeCount() == 1, \
96
         "Found more than one active thread when initializing pycURL"
97

    
98
  logging.info("Using PycURL %s", pycurl.version)
99

    
100
  pycurl.global_init(pycurl.GLOBAL_ALL)
101

    
102

    
103
def Shutdown():
104
  """Stops the module-global HTTP client manager.
105

106
  Must be called before quitting the program and while exactly one thread is
107
  running.
108

109
  """
110
  pycurl.global_cleanup()
111

    
112

    
113
def _ConfigRpcCurl(curl):
114
  noded_cert = str(constants.NODED_CERT_FILE)
115

    
116
  curl.setopt(pycurl.FOLLOWLOCATION, False)
117
  curl.setopt(pycurl.CAINFO, noded_cert)
118
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
119
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
120
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
121
  curl.setopt(pycurl.SSLCERT, noded_cert)
122
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
123
  curl.setopt(pycurl.SSLKEY, noded_cert)
124
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
125

    
126

    
127
# Aliasing this module avoids the following warning by epydoc: "Warning: No
128
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
129
_threading = threading
130

    
131

    
132
class _RpcThreadLocal(_threading.local):
133
  def GetHttpClientPool(self):
134
    """Returns a per-thread HTTP client pool.
135

136
    @rtype: L{http.client.HttpClientPool}
137

138
    """
139
    try:
140
      pool = self.hcp
141
    except AttributeError:
142
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
143
      self.hcp = pool
144

    
145
    return pool
146

    
147

    
148
# Remove module alias (see above)
149
del _threading
150

    
151

    
152
_thread_local = _RpcThreadLocal()
153

    
154

    
155
def _RpcTimeout(secs):
156
  """Timeout decorator.
157

158
  When applied to a rpc call_* function, it updates the global timeout
159
  table with the given function/timeout.
160

161
  """
162
  def decorator(f):
163
    name = f.__name__
164
    assert name.startswith("call_")
165
    _TIMEOUTS[name[len("call_"):]] = secs
166
    return f
167
  return decorator
168

    
169

    
170
def RunWithRPC(fn):
171
  """RPC-wrapper decorator.
172

173
  When applied to a function, it runs it with the RPC system
174
  initialized, and it shutsdown the system afterwards. This means the
175
  function must be called without RPC being initialized.
176

177
  """
178
  def wrapper(*args, **kwargs):
179
    Init()
180
    try:
181
      return fn(*args, **kwargs)
182
    finally:
183
      Shutdown()
184
  return wrapper
185

    
186

    
187
def _Compress(data):
188
  """Compresses a string for transport over RPC.
189

190
  Small amounts of data are not compressed.
191

192
  @type data: str
193
  @param data: Data
194
  @rtype: tuple
195
  @return: Encoded data to send
196

197
  """
198
  # Small amounts of data are not compressed
199
  if len(data) < 512:
200
    return (constants.RPC_ENCODING_NONE, data)
201

    
202
  # Compress with zlib and encode in base64
203
  return (constants.RPC_ENCODING_ZLIB_BASE64,
204
          base64.b64encode(zlib.compress(data, 3)))
205

    
206

    
207
class RpcResult(object):
208
  """RPC Result class.
209

210
  This class holds an RPC result. It is needed since in multi-node
211
  calls we can't raise an exception just because one one out of many
212
  failed, and therefore we use this class to encapsulate the result.
213

214
  @ivar data: the data payload, for successful results, or None
215
  @ivar call: the name of the RPC call
216
  @ivar node: the name of the node to which we made the call
217
  @ivar offline: whether the operation failed because the node was
218
      offline, as opposed to actual failure; offline=True will always
219
      imply failed=True, in order to allow simpler checking if
220
      the user doesn't care about the exact failure mode
221
  @ivar fail_msg: the error message if the call failed
222

223
  """
224
  def __init__(self, data=None, failed=False, offline=False,
225
               call=None, node=None):
226
    self.offline = offline
227
    self.call = call
228
    self.node = node
229

    
230
    if offline:
231
      self.fail_msg = "Node is marked offline"
232
      self.data = self.payload = None
233
    elif failed:
234
      self.fail_msg = self._EnsureErr(data)
235
      self.data = self.payload = None
236
    else:
237
      self.data = data
238
      if not isinstance(self.data, (tuple, list)):
239
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
240
                         type(self.data))
241
        self.payload = None
242
      elif len(data) != 2:
243
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
244
                         "expected 2" % len(self.data))
245
        self.payload = None
246
      elif not self.data[0]:
247
        self.fail_msg = self._EnsureErr(self.data[1])
248
        self.payload = None
249
      else:
250
        # finally success
251
        self.fail_msg = None
252
        self.payload = data[1]
253

    
254
    for attr_name in ["call", "data", "fail_msg",
255
                      "node", "offline", "payload"]:
256
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
257

    
258
  @staticmethod
259
  def _EnsureErr(val):
260
    """Helper to ensure we return a 'True' value for error."""
261
    if val:
262
      return val
263
    else:
264
      return "No error information"
265

    
266
  def Raise(self, msg, prereq=False, ecode=None):
267
    """If the result has failed, raise an OpExecError.
268

269
    This is used so that LU code doesn't have to check for each
270
    result, but instead can call this function.
271

272
    """
273
    if not self.fail_msg:
274
      return
275

    
276
    if not msg: # one could pass None for default message
277
      msg = ("Call '%s' to node '%s' has failed: %s" %
278
             (self.call, self.node, self.fail_msg))
279
    else:
280
      msg = "%s: %s" % (msg, self.fail_msg)
281
    if prereq:
282
      ec = errors.OpPrereqError
283
    else:
284
      ec = errors.OpExecError
285
    if ecode is not None:
286
      args = (msg, ecode)
287
    else:
288
      args = (msg, )
289
    raise ec(*args) # pylint: disable=W0142
290

    
291

    
292
def _SsconfResolver(node_list,
293
                    ssc=ssconf.SimpleStore,
294
                    nslookup_fn=netutils.Hostname.GetIP):
295
  """Return addresses for given node names.
296

297
  @type node_list: list
298
  @param node_list: List of node names
299
  @type ssc: class
300
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
301
  @type nslookup_fn: callable
302
  @param nslookup_fn: function use to do NS lookup
303
  @rtype: list of tuple; (string, string)
304
  @return: List of tuples containing node name and IP address
305

306
  """
307
  ss = ssc()
308
  iplist = ss.GetNodePrimaryIPList()
309
  family = ss.GetPrimaryIPFamily()
310
  ipmap = dict(entry.split() for entry in iplist)
311

    
312
  result = []
313
  for node in node_list:
314
    ip = ipmap.get(node)
315
    if ip is None:
316
      ip = nslookup_fn(node, family=family)
317
    result.append((node, ip))
318

    
319
  return result
320

    
321

    
322
class _StaticResolver:
323
  def __init__(self, addresses):
324
    """Initializes this class.
325

326
    """
327
    self._addresses = addresses
328

    
329
  def __call__(self, hosts):
330
    """Returns static addresses for hosts.
331

332
    """
333
    assert len(hosts) == len(self._addresses)
334
    return zip(hosts, self._addresses)
335

    
336

    
337
def _CheckConfigNode(name, node):
338
  """Checks if a node is online.
339

340
  @type name: string
341
  @param name: Node name
342
  @type node: L{objects.Node} or None
343
  @param node: Node object
344

345
  """
346
  if node is None:
347
    # Depend on DNS for name resolution
348
    ip = name
349
  elif node.offline:
350
    ip = _OFFLINE
351
  else:
352
    ip = node.primary_ip
353
  return (name, ip)
354

    
355

    
356
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
357
  """Calculate node addresses using configuration.
358

359
  """
360
  # Special case for single-host lookups
361
  if len(hosts) == 1:
362
    (name, ) = hosts
363
    return [_CheckConfigNode(name, single_node_fn(name))]
364
  else:
365
    all_nodes = all_nodes_fn()
366
    return [_CheckConfigNode(name, all_nodes.get(name, None))
367
            for name in hosts]
368

    
369

    
370
class _RpcProcessor:
371
  def __init__(self, resolver, port, lock_monitor_cb=None):
372
    """Initializes this class.
373

374
    @param resolver: callable accepting a list of hostnames, returning a list
375
      of tuples containing name and IP address (IP address can be the name or
376
      the special value L{_OFFLINE} to mark offline machines)
377
    @type port: int
378
    @param port: TCP port
379
    @param lock_monitor_cb: Callable for registering with lock monitor
380

381
    """
382
    self._resolver = resolver
383
    self._port = port
384
    self._lock_monitor_cb = lock_monitor_cb
385

    
386
  @staticmethod
387
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
388
    """Prepares requests by sorting offline hosts into separate list.
389

390
    """
391
    results = {}
392
    requests = {}
393

    
394
    for (name, ip) in hosts:
395
      if ip is _OFFLINE:
396
        # Node is marked as offline
397
        results[name] = RpcResult(node=name, offline=True, call=procedure)
398
      else:
399
        requests[name] = \
400
          http.client.HttpClientRequest(str(ip), port,
401
                                        http.HTTP_PUT, str("/%s" % procedure),
402
                                        headers=_RPC_CLIENT_HEADERS,
403
                                        post_data=body,
404
                                        read_timeout=read_timeout,
405
                                        nicename="%s/%s" % (name, procedure))
406

    
407
    return (results, requests)
408

    
409
  @staticmethod
410
  def _CombineResults(results, requests, procedure):
411
    """Combines pre-computed results for offline hosts with actual call results.
412

413
    """
414
    for name, req in requests.items():
415
      if req.success and req.resp_status_code == http.HTTP_OK:
416
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
417
                                node=name, call=procedure)
418
      else:
419
        # TODO: Better error reporting
420
        if req.error:
421
          msg = req.error
422
        else:
423
          msg = req.resp_body
424

    
425
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
426
        host_result = RpcResult(data=msg, failed=True, node=name,
427
                                call=procedure)
428

    
429
      results[name] = host_result
430

    
431
    return results
432

    
433
  def __call__(self, hosts, procedure, body, read_timeout=None, http_pool=None):
434
    """Makes an RPC request to a number of nodes.
435

436
    @type hosts: sequence
437
    @param hosts: Hostnames
438
    @type procedure: string
439
    @param procedure: Request path
440
    @type body: string
441
    @param body: Request body
442
    @type read_timeout: int or None
443
    @param read_timeout: Read timeout for request
444

445
    """
446
    assert procedure in _TIMEOUTS, "RPC call not declared in the timeouts table"
447

    
448
    if not http_pool:
449
      http_pool = _thread_local.GetHttpClientPool()
450

    
451
    if read_timeout is None:
452
      read_timeout = _TIMEOUTS[procedure]
453

    
454
    (results, requests) = \
455
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
456
                            str(body), read_timeout)
457

    
458
    http_pool.ProcessRequests(requests.values(),
459
                              lock_monitor_cb=self._lock_monitor_cb)
460

    
461
    assert not frozenset(results).intersection(requests)
462

    
463
    return self._CombineResults(results, requests, procedure)
464

    
465

    
466
def _EncodeImportExportIO(ieio, ieioargs):
467
  """Encodes import/export I/O information.
468

469
  """
470
  if ieio == constants.IEIO_RAW_DISK:
471
    assert len(ieioargs) == 1
472
    return (ieioargs[0].ToDict(), )
473

    
474
  if ieio == constants.IEIO_SCRIPT:
475
    assert len(ieioargs) == 2
476
    return (ieioargs[0].ToDict(), ieioargs[1])
477

    
478
  return ieioargs
479

    
480

    
481
class RpcRunner(object):
482
  """RPC runner class.
483

484
  """
485
  def __init__(self, context):
486
    """Initialized the RPC runner.
487

488
    @type context: C{masterd.GanetiContext}
489
    @param context: Ganeti context
490

491
    """
492
    self._cfg = context.cfg
493
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
494
                                              self._cfg.GetNodeInfo,
495
                                              self._cfg.GetAllNodesInfo),
496
                               netutils.GetDaemonPort(constants.NODED),
497
                               lock_monitor_cb=context.glm.AddToLockMonitor)
498

    
499
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
500
    """Convert the given instance to a dict.
501

502
    This is done via the instance's ToDict() method and additionally
503
    we fill the hvparams with the cluster defaults.
504

505
    @type instance: L{objects.Instance}
506
    @param instance: an Instance object
507
    @type hvp: dict or None
508
    @param hvp: a dictionary with overridden hypervisor parameters
509
    @type bep: dict or None
510
    @param bep: a dictionary with overridden backend parameters
511
    @type osp: dict or None
512
    @param osp: a dictionary with overridden os parameters
513
    @rtype: dict
514
    @return: the instance dict, with the hvparams filled with the
515
        cluster defaults
516

517
    """
518
    idict = instance.ToDict()
519
    cluster = self._cfg.GetClusterInfo()
520
    idict["hvparams"] = cluster.FillHV(instance)
521
    if hvp is not None:
522
      idict["hvparams"].update(hvp)
523
    idict["beparams"] = cluster.FillBE(instance)
524
    if bep is not None:
525
      idict["beparams"].update(bep)
526
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
527
    if osp is not None:
528
      idict["osparams"].update(osp)
529
    for nic in idict["nics"]:
530
      nic['nicparams'] = objects.FillDict(
531
        cluster.nicparams[constants.PP_DEFAULT],
532
        nic['nicparams'])
533
    return idict
534

    
535
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
536
    """Helper for making a multi-node call
537

538
    """
539
    body = serializer.DumpJson(args, indent=False)
540
    return self._proc(node_list, procedure, body, read_timeout=read_timeout)
541

    
542
  @staticmethod
543
  def _StaticMultiNodeCall(node_list, procedure, args,
544
                           address_list=None, read_timeout=None):
545
    """Helper for making a multi-node static call
546

547
    """
548
    body = serializer.DumpJson(args, indent=False)
549

    
550
    if address_list is None:
551
      resolver = _SsconfResolver
552
    else:
553
      # Caller provided an address list
554
      resolver = _StaticResolver(address_list)
555

    
556
    proc = _RpcProcessor(resolver,
557
                         netutils.GetDaemonPort(constants.NODED))
558
    return proc(node_list, procedure, body, read_timeout=read_timeout)
559

    
560
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
561
    """Helper for making a single-node call
562

563
    """
564
    body = serializer.DumpJson(args, indent=False)
565
    return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
566

    
567
  @classmethod
568
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
569
    """Helper for making a single-node static call
570

571
    """
572
    body = serializer.DumpJson(args, indent=False)
573
    proc = _RpcProcessor(_SsconfResolver,
574
                         netutils.GetDaemonPort(constants.NODED))
575
    return proc([node], procedure, body, read_timeout=read_timeout)[node]
576

    
577
  #
578
  # Begin RPC calls
579
  #
580

    
581
  @_RpcTimeout(_TMO_URGENT)
582
  def call_bdev_sizes(self, node_list, devices):
583
    """Gets the sizes of requested block devices present on a node
584

585
    This is a multi-node call.
586

587
    """
588
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
589

    
590
  @_RpcTimeout(_TMO_URGENT)
591
  def call_lv_list(self, node_list, vg_name):
592
    """Gets the logical volumes present in a given volume group.
593

594
    This is a multi-node call.
595

596
    """
597
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
598

    
599
  @_RpcTimeout(_TMO_URGENT)
600
  def call_vg_list(self, node_list):
601
    """Gets the volume group list.
602

603
    This is a multi-node call.
604

605
    """
606
    return self._MultiNodeCall(node_list, "vg_list", [])
607

    
608
  @_RpcTimeout(_TMO_NORMAL)
609
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
610
    """Get list of storage units.
611

612
    This is a multi-node call.
613

614
    """
615
    return self._MultiNodeCall(node_list, "storage_list",
616
                               [su_name, su_args, name, fields])
617

    
618
  @_RpcTimeout(_TMO_NORMAL)
619
  def call_storage_modify(self, node, su_name, su_args, name, changes):
620
    """Modify a storage unit.
621

622
    This is a single-node call.
623

624
    """
625
    return self._SingleNodeCall(node, "storage_modify",
626
                                [su_name, su_args, name, changes])
627

    
628
  @_RpcTimeout(_TMO_NORMAL)
629
  def call_storage_execute(self, node, su_name, su_args, name, op):
630
    """Executes an operation on a storage unit.
631

632
    This is a single-node call.
633

634
    """
635
    return self._SingleNodeCall(node, "storage_execute",
636
                                [su_name, su_args, name, op])
637

    
638
  @_RpcTimeout(_TMO_URGENT)
639
  def call_bridges_exist(self, node, bridges_list):
640
    """Checks if a node has all the bridges given.
641

642
    This method checks if all bridges given in the bridges_list are
643
    present on the remote node, so that an instance that uses interfaces
644
    on those bridges can be started.
645

646
    This is a single-node call.
647

648
    """
649
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
650

    
651
  @_RpcTimeout(_TMO_NORMAL)
652
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
653
    """Starts an instance.
654

655
    This is a single-node call.
656

657
    """
658
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
659
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
660

    
661
  @_RpcTimeout(_TMO_NORMAL)
662
  def call_instance_shutdown(self, node, instance, timeout):
663
    """Stops an instance.
664

665
    This is a single-node call.
666

667
    """
668
    return self._SingleNodeCall(node, "instance_shutdown",
669
                                [self._InstDict(instance), timeout])
670

    
671
  @_RpcTimeout(_TMO_NORMAL)
672
  def call_migration_info(self, node, instance):
673
    """Gather the information necessary to prepare an instance migration.
674

675
    This is a single-node call.
676

677
    @type node: string
678
    @param node: the node on which the instance is currently running
679
    @type instance: C{objects.Instance}
680
    @param instance: the instance definition
681

682
    """
683
    return self._SingleNodeCall(node, "migration_info",
684
                                [self._InstDict(instance)])
685

    
686
  @_RpcTimeout(_TMO_NORMAL)
687
  def call_accept_instance(self, node, instance, info, target):
688
    """Prepare a node to accept an instance.
689

690
    This is a single-node call.
691

692
    @type node: string
693
    @param node: the target node for the migration
694
    @type instance: C{objects.Instance}
695
    @param instance: the instance definition
696
    @type info: opaque/hypervisor specific (string/data)
697
    @param info: result for the call_migration_info call
698
    @type target: string
699
    @param target: target hostname (usually ip address) (on the node itself)
700

701
    """
702
    return self._SingleNodeCall(node, "accept_instance",
703
                                [self._InstDict(instance), info, target])
704

    
705
  @_RpcTimeout(_TMO_NORMAL)
706
  def call_instance_finalize_migration_dst(self, node, instance, info, success):
707
    """Finalize any target-node migration specific operation.
708

709
    This is called both in case of a successful migration and in case of error
710
    (in which case it should abort the migration).
711

712
    This is a single-node call.
713

714
    @type node: string
715
    @param node: the target node for the migration
716
    @type instance: C{objects.Instance}
717
    @param instance: the instance definition
718
    @type info: opaque/hypervisor specific (string/data)
719
    @param info: result for the call_migration_info call
720
    @type success: boolean
721
    @param success: whether the migration was a success or a failure
722

723
    """
724
    return self._SingleNodeCall(node, "instance_finalize_migration_dst",
725
                                [self._InstDict(instance), info, success])
726

    
727
  @_RpcTimeout(_TMO_SLOW)
728
  def call_instance_migrate(self, node, instance, target, live):
729
    """Migrate an instance.
730

731
    This is a single-node call.
732

733
    @type node: string
734
    @param node: the node on which the instance is currently running
735
    @type instance: C{objects.Instance}
736
    @param instance: the instance definition
737
    @type target: string
738
    @param target: the target node name
739
    @type live: boolean
740
    @param live: whether the migration should be done live or not (the
741
        interpretation of this parameter is left to the hypervisor)
742

743
    """
744
    return self._SingleNodeCall(node, "instance_migrate",
745
                                [self._InstDict(instance), target, live])
746

    
747
  @_RpcTimeout(_TMO_SLOW)
748
  def call_instance_finalize_migration_src(self, node, instance, success, live):
749
    """Finalize the instance migration on the source node.
750

751
    This is a single-node call.
752

753
    @type instance: L{objects.Instance}
754
    @param instance: the instance that was migrated
755
    @type success: bool
756
    @param success: whether the migration succeeded or not
757
    @type live: bool
758
    @param live: whether the user requested a live migration or not
759

760
    """
761
    return self._SingleNodeCall(node, "instance_finalize_migration_src",
762
                                [self._InstDict(instance), success, live])
763

    
764
  @_RpcTimeout(_TMO_SLOW)
765
  def call_instance_get_migration_status(self, node, instance):
766
    """Report migration status.
767

768
    This is a single-node call that must be executed on the source node.
769

770
    @type instance: L{objects.Instance}
771
    @param instance: the instance that is being migrated
772
    @rtype: L{objects.MigrationStatus}
773
    @return: the status of the current migration (one of
774
             L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
775
             progress info that can be retrieved from the hypervisor
776

777
    """
778
    result = self._SingleNodeCall(node, "instance_get_migration_status",
779
                                  [self._InstDict(instance)])
780
    if not result.fail_msg and result.payload is not None:
781
      result.payload = objects.MigrationStatus.FromDict(result.payload)
782
    return result
783

    
784
  @_RpcTimeout(_TMO_NORMAL)
785
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
786
    """Reboots an instance.
787

788
    This is a single-node call.
789

790
    """
791
    return self._SingleNodeCall(node, "instance_reboot",
792
                                [self._InstDict(inst), reboot_type,
793
                                 shutdown_timeout])
794

    
795
  @_RpcTimeout(_TMO_1DAY)
796
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
797
    """Installs an OS on the given instance.
798

799
    This is a single-node call.
800

801
    """
802
    return self._SingleNodeCall(node, "instance_os_add",
803
                                [self._InstDict(inst, osp=osparams),
804
                                 reinstall, debug])
805

    
806
  @_RpcTimeout(_TMO_SLOW)
807
  def call_instance_run_rename(self, node, inst, old_name, debug):
808
    """Run the OS rename script for an instance.
809

810
    This is a single-node call.
811

812
    """
813
    return self._SingleNodeCall(node, "instance_run_rename",
814
                                [self._InstDict(inst), old_name, debug])
815

    
816
  @_RpcTimeout(_TMO_URGENT)
817
  def call_instance_info(self, node, instance, hname):
818
    """Returns information about a single instance.
819

820
    This is a single-node call.
821

822
    @type node: list
823
    @param node: the list of nodes to query
824
    @type instance: string
825
    @param instance: the instance name
826
    @type hname: string
827
    @param hname: the hypervisor type of the instance
828

829
    """
830
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
831

    
832
  @_RpcTimeout(_TMO_NORMAL)
833
  def call_instance_migratable(self, node, instance):
834
    """Checks whether the given instance can be migrated.
835

836
    This is a single-node call.
837

838
    @param node: the node to query
839
    @type instance: L{objects.Instance}
840
    @param instance: the instance to check
841

842

843
    """
844
    return self._SingleNodeCall(node, "instance_migratable",
845
                                [self._InstDict(instance)])
846

    
847
  @_RpcTimeout(_TMO_URGENT)
848
  def call_all_instances_info(self, node_list, hypervisor_list):
849
    """Returns information about all instances on the given nodes.
850

851
    This is a multi-node call.
852

853
    @type node_list: list
854
    @param node_list: the list of nodes to query
855
    @type hypervisor_list: list
856
    @param hypervisor_list: the hypervisors to query for instances
857

858
    """
859
    return self._MultiNodeCall(node_list, "all_instances_info",
860
                               [hypervisor_list])
861

    
862
  @_RpcTimeout(_TMO_URGENT)
863
  def call_instance_list(self, node_list, hypervisor_list):
864
    """Returns the list of running instances on a given node.
865

866
    This is a multi-node call.
867

868
    @type node_list: list
869
    @param node_list: the list of nodes to query
870
    @type hypervisor_list: list
871
    @param hypervisor_list: the hypervisors to query for instances
872

873
    """
874
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
875

    
876
  @_RpcTimeout(_TMO_FAST)
877
  def call_node_tcp_ping(self, node, source, target, port, timeout,
878
                         live_port_needed):
879
    """Do a TcpPing on the remote node
880

881
    This is a single-node call.
882

883
    """
884
    return self._SingleNodeCall(node, "node_tcp_ping",
885
                                [source, target, port, timeout,
886
                                 live_port_needed])
887

    
888
  @_RpcTimeout(_TMO_FAST)
889
  def call_node_has_ip_address(self, node, address):
890
    """Checks if a node has the given IP address.
891

892
    This is a single-node call.
893

894
    """
895
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
896

    
897
  @_RpcTimeout(_TMO_URGENT)
898
  def call_node_info(self, node_list, vg_name, hypervisor_type):
899
    """Return node information.
900

901
    This will return memory information and volume group size and free
902
    space.
903

904
    This is a multi-node call.
905

906
    @type node_list: list
907
    @param node_list: the list of nodes to query
908
    @type vg_name: C{string}
909
    @param vg_name: the name of the volume group to ask for disk space
910
        information
911
    @type hypervisor_type: C{str}
912
    @param hypervisor_type: the name of the hypervisor to ask for
913
        memory information
914

915
    """
916
    return self._MultiNodeCall(node_list, "node_info",
917
                               [vg_name, hypervisor_type])
918

    
919
  @_RpcTimeout(_TMO_NORMAL)
920
  def call_etc_hosts_modify(self, node, mode, name, ip):
921
    """Modify hosts file with name
922

923
    @type node: string
924
    @param node: The node to call
925
    @type mode: string
926
    @param mode: The mode to operate. Currently "add" or "remove"
927
    @type name: string
928
    @param name: The host name to be modified
929
    @type ip: string
930
    @param ip: The ip of the entry (just valid if mode is "add")
931

932
    """
933
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
934

    
935
  @_RpcTimeout(_TMO_NORMAL)
936
  def call_node_verify(self, node_list, checkdict, cluster_name):
937
    """Request verification of given parameters.
938

939
    This is a multi-node call.
940

941
    """
942
    return self._MultiNodeCall(node_list, "node_verify",
943
                               [checkdict, cluster_name])
944

    
945
  @classmethod
946
  @_RpcTimeout(_TMO_FAST)
947
  def call_node_start_master_daemons(cls, node, no_voting):
948
    """Starts master daemons on a node.
949

950
    This is a single-node call.
951

952
    """
953
    return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
954
                                     [no_voting])
955

    
956
  @classmethod
957
  @_RpcTimeout(_TMO_FAST)
958
  def call_node_activate_master_ip(cls, node):
959
    """Activates master IP on a node.
960

961
    This is a single-node call.
962

963
    """
964
    return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
965

    
966
  @classmethod
967
  @_RpcTimeout(_TMO_FAST)
968
  def call_node_stop_master(cls, node):
969
    """Deactivates master IP and stops master daemons on a node.
970

971
    This is a single-node call.
972

973
    """
974
    return cls._StaticSingleNodeCall(node, "node_stop_master", [])
975

    
976
  @classmethod
977
  @_RpcTimeout(_TMO_FAST)
978
  def call_node_deactivate_master_ip(cls, node):
979
    """Deactivates master IP on a node.
980

981
    This is a single-node call.
982

983
    """
984
    return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
985

    
986
  @classmethod
987
  @_RpcTimeout(_TMO_FAST)
988
  def call_node_change_master_netmask(cls, node, netmask):
989
    """Change master IP netmask.
990

991
    This is a single-node call.
992

993
    """
994
    return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
995
                  [netmask])
996

    
997
  @classmethod
998
  @_RpcTimeout(_TMO_URGENT)
999
  def call_master_info(cls, node_list):
1000
    """Query master info.
1001

1002
    This is a multi-node call.
1003

1004
    """
1005
    # TODO: should this method query down nodes?
1006
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
1007

    
1008
  @classmethod
1009
  @_RpcTimeout(_TMO_URGENT)
1010
  def call_version(cls, node_list):
1011
    """Query node version.
1012

1013
    This is a multi-node call.
1014

1015
    """
1016
    return cls._StaticMultiNodeCall(node_list, "version", [])
1017

    
1018
  @_RpcTimeout(_TMO_NORMAL)
1019
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
1020
    """Request creation of a given block device.
1021

1022
    This is a single-node call.
1023

1024
    """
1025
    return self._SingleNodeCall(node, "blockdev_create",
1026
                                [bdev.ToDict(), size, owner, on_primary, info])
1027

    
1028
  @_RpcTimeout(_TMO_SLOW)
1029
  def call_blockdev_wipe(self, node, bdev, offset, size):
1030
    """Request wipe at given offset with given size of a block device.
1031

1032
    This is a single-node call.
1033

1034
    """
1035
    return self._SingleNodeCall(node, "blockdev_wipe",
1036
                                [bdev.ToDict(), offset, size])
1037

    
1038
  @_RpcTimeout(_TMO_NORMAL)
1039
  def call_blockdev_remove(self, node, bdev):
1040
    """Request removal of a given block device.
1041

1042
    This is a single-node call.
1043

1044
    """
1045
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
1046

    
1047
  @_RpcTimeout(_TMO_NORMAL)
1048
  def call_blockdev_rename(self, node, devlist):
1049
    """Request rename of the given block devices.
1050

1051
    This is a single-node call.
1052

1053
    """
1054
    return self._SingleNodeCall(node, "blockdev_rename",
1055
                                [(d.ToDict(), uid) for d, uid in devlist])
1056

    
1057
  @_RpcTimeout(_TMO_NORMAL)
1058
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
1059
    """Request a pause/resume of given block device.
1060

1061
    This is a single-node call.
1062

1063
    """
1064
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1065
                                [[bdev.ToDict() for bdev in disks], pause])
1066

    
1067
  @_RpcTimeout(_TMO_NORMAL)
1068
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1069
    """Request assembling of a given block device.
1070

1071
    This is a single-node call.
1072

1073
    """
1074
    return self._SingleNodeCall(node, "blockdev_assemble",
1075
                                [disk.ToDict(), owner, on_primary, idx])
1076

    
1077
  @_RpcTimeout(_TMO_NORMAL)
1078
  def call_blockdev_shutdown(self, node, disk):
1079
    """Request shutdown of a given block device.
1080

1081
    This is a single-node call.
1082

1083
    """
1084
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1085

    
1086
  @_RpcTimeout(_TMO_NORMAL)
1087
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1088
    """Request adding a list of children to a (mirroring) device.
1089

1090
    This is a single-node call.
1091

1092
    """
1093
    return self._SingleNodeCall(node, "blockdev_addchildren",
1094
                                [bdev.ToDict(),
1095
                                 [disk.ToDict() for disk in ndevs]])
1096

    
1097
  @_RpcTimeout(_TMO_NORMAL)
1098
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1099
    """Request removing a list of children from a (mirroring) device.
1100

1101
    This is a single-node call.
1102

1103
    """
1104
    return self._SingleNodeCall(node, "blockdev_removechildren",
1105
                                [bdev.ToDict(),
1106
                                 [disk.ToDict() for disk in ndevs]])
1107

    
1108
  @_RpcTimeout(_TMO_NORMAL)
1109
  def call_blockdev_getmirrorstatus(self, node, disks):
1110
    """Request status of a (mirroring) device.
1111

1112
    This is a single-node call.
1113

1114
    """
1115
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1116
                                  [dsk.ToDict() for dsk in disks])
1117
    if not result.fail_msg:
1118
      result.payload = [objects.BlockDevStatus.FromDict(i)
1119
                        for i in result.payload]
1120
    return result
1121

    
1122
  @_RpcTimeout(_TMO_NORMAL)
1123
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1124
    """Request status of (mirroring) devices from multiple nodes.
1125

1126
    This is a multi-node call.
1127

1128
    """
1129
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1130
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1131
                                       for name, disks in node_disks.items())])
1132
    for nres in result.values():
1133
      if nres.fail_msg:
1134
        continue
1135

    
1136
      for idx, (success, status) in enumerate(nres.payload):
1137
        if success:
1138
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1139

    
1140
    return result
1141

    
1142
  @_RpcTimeout(_TMO_NORMAL)
1143
  def call_blockdev_find(self, node, disk):
1144
    """Request identification of a given block device.
1145

1146
    This is a single-node call.
1147

1148
    """
1149
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1150
    if not result.fail_msg and result.payload is not None:
1151
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1152
    return result
1153

    
1154
  @_RpcTimeout(_TMO_NORMAL)
1155
  def call_blockdev_close(self, node, instance_name, disks):
1156
    """Closes the given block devices.
1157

1158
    This is a single-node call.
1159

1160
    """
1161
    params = [instance_name, [cf.ToDict() for cf in disks]]
1162
    return self._SingleNodeCall(node, "blockdev_close", params)
1163

    
1164
  @_RpcTimeout(_TMO_NORMAL)
1165
  def call_blockdev_getsize(self, node, disks):
1166
    """Returns the size of the given disks.
1167

1168
    This is a single-node call.
1169

1170
    """
1171
    params = [[cf.ToDict() for cf in disks]]
1172
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1173

    
1174
  @_RpcTimeout(_TMO_NORMAL)
1175
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1176
    """Disconnects the network of the given drbd devices.
1177

1178
    This is a multi-node call.
1179

1180
    """
1181
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1182
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1183

    
1184
  @_RpcTimeout(_TMO_NORMAL)
1185
  def call_drbd_attach_net(self, node_list, nodes_ip,
1186
                           disks, instance_name, multimaster):
1187
    """Disconnects the given drbd devices.
1188

1189
    This is a multi-node call.
1190

1191
    """
1192
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1193
                               [nodes_ip, [cf.ToDict() for cf in disks],
1194
                                instance_name, multimaster])
1195

    
1196
  @_RpcTimeout(_TMO_SLOW)
1197
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1198
    """Waits for the synchronization of drbd devices is complete.
1199

1200
    This is a multi-node call.
1201

1202
    """
1203
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1204
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1205

    
1206
  @_RpcTimeout(_TMO_URGENT)
1207
  def call_drbd_helper(self, node_list):
1208
    """Gets drbd helper.
1209

1210
    This is a multi-node call.
1211

1212
    """
1213
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1214

    
1215
  @classmethod
1216
  @_RpcTimeout(_TMO_NORMAL)
1217
  def call_upload_file(cls, node_list, file_name, address_list=None):
1218
    """Upload a file.
1219

1220
    The node will refuse the operation in case the file is not on the
1221
    approved file list.
1222

1223
    This is a multi-node call.
1224

1225
    @type node_list: list
1226
    @param node_list: the list of node names to upload to
1227
    @type file_name: str
1228
    @param file_name: the filename to upload
1229
    @type address_list: list or None
1230
    @keyword address_list: an optional list of node addresses, in order
1231
        to optimize the RPC speed
1232

1233
    """
1234
    file_contents = utils.ReadFile(file_name)
1235
    data = _Compress(file_contents)
1236
    st = os.stat(file_name)
1237
    getents = runtime.GetEnts()
1238
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1239
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1240
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1241
                                    address_list=address_list)
1242

    
1243
  @classmethod
1244
  @_RpcTimeout(_TMO_NORMAL)
1245
  def call_write_ssconf_files(cls, node_list, values):
1246
    """Write ssconf files.
1247

1248
    This is a multi-node call.
1249

1250
    """
1251
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1252

    
1253
  @_RpcTimeout(_TMO_NORMAL)
1254
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1255
    """Runs OOB.
1256

1257
    This is a single-node call.
1258

1259
    """
1260
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1261
                                                  remote_node, timeout])
1262

    
1263
  @_RpcTimeout(_TMO_FAST)
1264
  def call_os_diagnose(self, node_list):
1265
    """Request a diagnose of OS definitions.
1266

1267
    This is a multi-node call.
1268

1269
    """
1270
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1271

    
1272
  @_RpcTimeout(_TMO_FAST)
1273
  def call_os_get(self, node, name):
1274
    """Returns an OS definition.
1275

1276
    This is a single-node call.
1277

1278
    """
1279
    result = self._SingleNodeCall(node, "os_get", [name])
1280
    if not result.fail_msg and isinstance(result.payload, dict):
1281
      result.payload = objects.OS.FromDict(result.payload)
1282
    return result
1283

    
1284
  @_RpcTimeout(_TMO_FAST)
1285
  def call_os_validate(self, required, nodes, name, checks, params):
1286
    """Run a validation routine for a given OS.
1287

1288
    This is a multi-node call.
1289

1290
    """
1291
    return self._MultiNodeCall(nodes, "os_validate",
1292
                               [required, name, checks, params])
1293

    
1294
  @_RpcTimeout(_TMO_NORMAL)
1295
  def call_hooks_runner(self, node_list, hpath, phase, env):
1296
    """Call the hooks runner.
1297

1298
    Args:
1299
      - op: the OpCode instance
1300
      - env: a dictionary with the environment
1301

1302
    This is a multi-node call.
1303

1304
    """
1305
    params = [hpath, phase, env]
1306
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1307

    
1308
  @_RpcTimeout(_TMO_NORMAL)
1309
  def call_iallocator_runner(self, node, name, idata):
1310
    """Call an iallocator on a remote node
1311

1312
    Args:
1313
      - name: the iallocator name
1314
      - input: the json-encoded input string
1315

1316
    This is a single-node call.
1317

1318
    """
1319
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1320

    
1321
  @_RpcTimeout(_TMO_NORMAL)
1322
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1323
    """Request a snapshot of the given block device.
1324

1325
    This is a single-node call.
1326

1327
    """
1328
    return self._SingleNodeCall(node, "blockdev_grow",
1329
                                [cf_bdev.ToDict(), amount, dryrun])
1330

    
1331
  @_RpcTimeout(_TMO_1DAY)
1332
  def call_blockdev_export(self, node, cf_bdev,
1333
                           dest_node, dest_path, cluster_name):
1334
    """Export a given disk to another node.
1335

1336
    This is a single-node call.
1337

1338
    """
1339
    return self._SingleNodeCall(node, "blockdev_export",
1340
                                [cf_bdev.ToDict(), dest_node, dest_path,
1341
                                 cluster_name])
1342

    
1343
  @_RpcTimeout(_TMO_NORMAL)
1344
  def call_blockdev_snapshot(self, node, cf_bdev):
1345
    """Request a snapshot of the given block device.
1346

1347
    This is a single-node call.
1348

1349
    """
1350
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1351

    
1352
  @_RpcTimeout(_TMO_NORMAL)
1353
  def call_finalize_export(self, node, instance, snap_disks):
1354
    """Request the completion of an export operation.
1355

1356
    This writes the export config file, etc.
1357

1358
    This is a single-node call.
1359

1360
    """
1361
    flat_disks = []
1362
    for disk in snap_disks:
1363
      if isinstance(disk, bool):
1364
        flat_disks.append(disk)
1365
      else:
1366
        flat_disks.append(disk.ToDict())
1367

    
1368
    return self._SingleNodeCall(node, "finalize_export",
1369
                                [self._InstDict(instance), flat_disks])
1370

    
1371
  @_RpcTimeout(_TMO_FAST)
1372
  def call_export_info(self, node, path):
1373
    """Queries the export information in a given path.
1374

1375
    This is a single-node call.
1376

1377
    """
1378
    return self._SingleNodeCall(node, "export_info", [path])
1379

    
1380
  @_RpcTimeout(_TMO_FAST)
1381
  def call_export_list(self, node_list):
1382
    """Gets the stored exports list.
1383

1384
    This is a multi-node call.
1385

1386
    """
1387
    return self._MultiNodeCall(node_list, "export_list", [])
1388

    
1389
  @_RpcTimeout(_TMO_FAST)
1390
  def call_export_remove(self, node, export):
1391
    """Requests removal of a given export.
1392

1393
    This is a single-node call.
1394

1395
    """
1396
    return self._SingleNodeCall(node, "export_remove", [export])
1397

    
1398
  @classmethod
1399
  @_RpcTimeout(_TMO_NORMAL)
1400
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1401
    """Requests a node to clean the cluster information it has.
1402

1403
    This will remove the configuration information from the ganeti data
1404
    dir.
1405

1406
    This is a single-node call.
1407

1408
    """
1409
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1410
                                     [modify_ssh_setup])
1411

    
1412
  @_RpcTimeout(_TMO_FAST)
1413
  def call_node_volumes(self, node_list):
1414
    """Gets all volumes on node(s).
1415

1416
    This is a multi-node call.
1417

1418
    """
1419
    return self._MultiNodeCall(node_list, "node_volumes", [])
1420

    
1421
  @_RpcTimeout(_TMO_FAST)
1422
  def call_node_demote_from_mc(self, node):
1423
    """Demote a node from the master candidate role.
1424

1425
    This is a single-node call.
1426

1427
    """
1428
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1429

    
1430
  @_RpcTimeout(_TMO_NORMAL)
1431
  def call_node_powercycle(self, node, hypervisor):
1432
    """Tries to powercycle a node.
1433

1434
    This is a single-node call.
1435

1436
    """
1437
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1438

    
1439
  @_RpcTimeout(None)
1440
  def call_test_delay(self, node_list, duration):
1441
    """Sleep for a fixed time on given node(s).
1442

1443
    This is a multi-node call.
1444

1445
    """
1446
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1447
                               read_timeout=int(duration + 5))
1448

    
1449
  @_RpcTimeout(_TMO_FAST)
1450
  def call_file_storage_dir_create(self, node, file_storage_dir):
1451
    """Create the given file storage directory.
1452

1453
    This is a single-node call.
1454

1455
    """
1456
    return self._SingleNodeCall(node, "file_storage_dir_create",
1457
                                [file_storage_dir])
1458

    
1459
  @_RpcTimeout(_TMO_FAST)
1460
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1461
    """Remove the given file storage directory.
1462

1463
    This is a single-node call.
1464

1465
    """
1466
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1467
                                [file_storage_dir])
1468

    
1469
  @_RpcTimeout(_TMO_FAST)
1470
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1471
                                   new_file_storage_dir):
1472
    """Rename file storage directory.
1473

1474
    This is a single-node call.
1475

1476
    """
1477
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1478
                                [old_file_storage_dir, new_file_storage_dir])
1479

    
1480
  @classmethod
1481
  @_RpcTimeout(_TMO_URGENT)
1482
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1483
    """Update job queue.
1484

1485
    This is a multi-node call.
1486

1487
    """
1488
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1489
                                    [file_name, _Compress(content)],
1490
                                    address_list=address_list)
1491

    
1492
  @classmethod
1493
  @_RpcTimeout(_TMO_NORMAL)
1494
  def call_jobqueue_purge(cls, node):
1495
    """Purge job queue.
1496

1497
    This is a single-node call.
1498

1499
    """
1500
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1501

    
1502
  @classmethod
1503
  @_RpcTimeout(_TMO_URGENT)
1504
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1505
    """Rename a job queue file.
1506

1507
    This is a multi-node call.
1508

1509
    """
1510
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1511
                                    address_list=address_list)
1512

    
1513
  @_RpcTimeout(_TMO_NORMAL)
1514
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1515
    """Validate the hypervisor params.
1516

1517
    This is a multi-node call.
1518

1519
    @type node_list: list
1520
    @param node_list: the list of nodes to query
1521
    @type hvname: string
1522
    @param hvname: the hypervisor name
1523
    @type hvparams: dict
1524
    @param hvparams: the hypervisor parameters to be validated
1525

1526
    """
1527
    cluster = self._cfg.GetClusterInfo()
1528
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1529
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1530
                               [hvname, hv_full])
1531

    
1532
  @_RpcTimeout(_TMO_NORMAL)
1533
  def call_x509_cert_create(self, node, validity):
1534
    """Creates a new X509 certificate for SSL/TLS.
1535

1536
    This is a single-node call.
1537

1538
    @type validity: int
1539
    @param validity: Validity in seconds
1540

1541
    """
1542
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1543

    
1544
  @_RpcTimeout(_TMO_NORMAL)
1545
  def call_x509_cert_remove(self, node, name):
1546
    """Removes a X509 certificate.
1547

1548
    This is a single-node call.
1549

1550
    @type name: string
1551
    @param name: Certificate name
1552

1553
    """
1554
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1555

    
1556
  @_RpcTimeout(_TMO_NORMAL)
1557
  def call_import_start(self, node, opts, instance, component,
1558
                        dest, dest_args):
1559
    """Starts a listener for an import.
1560

1561
    This is a single-node call.
1562

1563
    @type node: string
1564
    @param node: Node name
1565
    @type instance: C{objects.Instance}
1566
    @param instance: Instance object
1567
    @type component: string
1568
    @param component: which part of the instance is being imported
1569

1570
    """
1571
    return self._SingleNodeCall(node, "import_start",
1572
                                [opts.ToDict(),
1573
                                 self._InstDict(instance), component, dest,
1574
                                 _EncodeImportExportIO(dest, dest_args)])
1575

    
1576
  @_RpcTimeout(_TMO_NORMAL)
1577
  def call_export_start(self, node, opts, host, port,
1578
                        instance, component, source, source_args):
1579
    """Starts an export daemon.
1580

1581
    This is a single-node call.
1582

1583
    @type node: string
1584
    @param node: Node name
1585
    @type instance: C{objects.Instance}
1586
    @param instance: Instance object
1587
    @type component: string
1588
    @param component: which part of the instance is being imported
1589

1590
    """
1591
    return self._SingleNodeCall(node, "export_start",
1592
                                [opts.ToDict(), host, port,
1593
                                 self._InstDict(instance),
1594
                                 component, source,
1595
                                 _EncodeImportExportIO(source, source_args)])
1596

    
1597
  @_RpcTimeout(_TMO_FAST)
1598
  def call_impexp_status(self, node, names):
1599
    """Gets the status of an import or export.
1600

1601
    This is a single-node call.
1602

1603
    @type node: string
1604
    @param node: Node name
1605
    @type names: List of strings
1606
    @param names: Import/export names
1607
    @rtype: List of L{objects.ImportExportStatus} instances
1608
    @return: Returns a list of the state of each named import/export or None if
1609
             a status couldn't be retrieved
1610

1611
    """
1612
    result = self._SingleNodeCall(node, "impexp_status", [names])
1613

    
1614
    if not result.fail_msg:
1615
      decoded = []
1616

    
1617
      for i in result.payload:
1618
        if i is None:
1619
          decoded.append(None)
1620
          continue
1621
        decoded.append(objects.ImportExportStatus.FromDict(i))
1622

    
1623
      result.payload = decoded
1624

    
1625
    return result
1626

    
1627
  @_RpcTimeout(_TMO_NORMAL)
1628
  def call_impexp_abort(self, node, name):
1629
    """Aborts an import or export.
1630

1631
    This is a single-node call.
1632

1633
    @type node: string
1634
    @param node: Node name
1635
    @type name: string
1636
    @param name: Import/export name
1637

1638
    """
1639
    return self._SingleNodeCall(node, "impexp_abort", [name])
1640

    
1641
  @_RpcTimeout(_TMO_NORMAL)
1642
  def call_impexp_cleanup(self, node, name):
1643
    """Cleans up after an import or export.
1644

1645
    This is a single-node call.
1646

1647
    @type node: string
1648
    @param node: Node name
1649
    @type name: string
1650
    @param name: Import/export name
1651

1652
    """
1653
    return self._SingleNodeCall(node, "impexp_cleanup", [name])