Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ fb460cf7

History | View | Annotate | Download (48 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# pylint has a bug here, doesn't see this import
52
import ganeti.http.client  # pylint: disable=W0611
53

    
54

    
55
# Timeout for connecting to nodes (seconds)
56
_RPC_CONNECT_TIMEOUT = 5
57

    
58
_RPC_CLIENT_HEADERS = [
59
  "Content-type: %s" % http.HTTP_APP_JSON,
60
  "Expect:",
61
  ]
62

    
63
# Various time constants for the timeout table
64
_TMO_URGENT = 60 # one minute
65
_TMO_FAST = 5 * 60 # five minutes
66
_TMO_NORMAL = 15 * 60 # 15 minutes
67
_TMO_SLOW = 3600 # one hour
68
_TMO_4HRS = 4 * 3600
69
_TMO_1DAY = 86400
70

    
71
# Timeout table that will be built later by decorators
72
# Guidelines for choosing timeouts:
73
# - call used during watcher: timeout -> 1min, _TMO_URGENT
74
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
75
# - other calls: 15 min, _TMO_NORMAL
76
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
77

    
78
_TIMEOUTS = {
79
}
80

    
81
#: Special value to describe an offline host
82
_OFFLINE = object()
83

    
84

    
85
def Init():
86
  """Initializes the module-global HTTP client manager.
87

88
  Must be called before using any RPC function and while exactly one thread is
89
  running.
90

91
  """
92
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
93
  # one thread running. This check is just a safety measure -- it doesn't
94
  # cover all cases.
95
  assert threading.activeCount() == 1, \
96
         "Found more than one active thread when initializing pycURL"
97

    
98
  logging.info("Using PycURL %s", pycurl.version)
99

    
100
  pycurl.global_init(pycurl.GLOBAL_ALL)
101

    
102

    
103
def Shutdown():
104
  """Stops the module-global HTTP client manager.
105

106
  Must be called before quitting the program and while exactly one thread is
107
  running.
108

109
  """
110
  pycurl.global_cleanup()
111

    
112

    
113
def _ConfigRpcCurl(curl):
114
  noded_cert = str(constants.NODED_CERT_FILE)
115

    
116
  curl.setopt(pycurl.FOLLOWLOCATION, False)
117
  curl.setopt(pycurl.CAINFO, noded_cert)
118
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
119
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
120
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
121
  curl.setopt(pycurl.SSLCERT, noded_cert)
122
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
123
  curl.setopt(pycurl.SSLKEY, noded_cert)
124
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
125

    
126

    
127
# Aliasing this module avoids the following warning by epydoc: "Warning: No
128
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
129
_threading = threading
130

    
131

    
132
class _RpcThreadLocal(_threading.local):
133
  def GetHttpClientPool(self):
134
    """Returns a per-thread HTTP client pool.
135

136
    @rtype: L{http.client.HttpClientPool}
137

138
    """
139
    try:
140
      pool = self.hcp
141
    except AttributeError:
142
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
143
      self.hcp = pool
144

    
145
    return pool
146

    
147

    
148
# Remove module alias (see above)
149
del _threading
150

    
151

    
152
_thread_local = _RpcThreadLocal()
153

    
154

    
155
def _RpcTimeout(secs):
156
  """Timeout decorator.
157

158
  When applied to a rpc call_* function, it updates the global timeout
159
  table with the given function/timeout.
160

161
  """
162
  def decorator(f):
163
    name = f.__name__
164
    assert name.startswith("call_")
165
    _TIMEOUTS[name[len("call_"):]] = secs
166
    return f
167
  return decorator
168

    
169

    
170
def RunWithRPC(fn):
171
  """RPC-wrapper decorator.
172

173
  When applied to a function, it runs it with the RPC system
174
  initialized, and it shutsdown the system afterwards. This means the
175
  function must be called without RPC being initialized.
176

177
  """
178
  def wrapper(*args, **kwargs):
179
    Init()
180
    try:
181
      return fn(*args, **kwargs)
182
    finally:
183
      Shutdown()
184
  return wrapper
185

    
186

    
187
def _Compress(data):
188
  """Compresses a string for transport over RPC.
189

190
  Small amounts of data are not compressed.
191

192
  @type data: str
193
  @param data: Data
194
  @rtype: tuple
195
  @return: Encoded data to send
196

197
  """
198
  # Small amounts of data are not compressed
199
  if len(data) < 512:
200
    return (constants.RPC_ENCODING_NONE, data)
201

    
202
  # Compress with zlib and encode in base64
203
  return (constants.RPC_ENCODING_ZLIB_BASE64,
204
          base64.b64encode(zlib.compress(data, 3)))
205

    
206

    
207
class RpcResult(object):
208
  """RPC Result class.
209

210
  This class holds an RPC result. It is needed since in multi-node
211
  calls we can't raise an exception just because one one out of many
212
  failed, and therefore we use this class to encapsulate the result.
213

214
  @ivar data: the data payload, for successful results, or None
215
  @ivar call: the name of the RPC call
216
  @ivar node: the name of the node to which we made the call
217
  @ivar offline: whether the operation failed because the node was
218
      offline, as opposed to actual failure; offline=True will always
219
      imply failed=True, in order to allow simpler checking if
220
      the user doesn't care about the exact failure mode
221
  @ivar fail_msg: the error message if the call failed
222

223
  """
224
  def __init__(self, data=None, failed=False, offline=False,
225
               call=None, node=None):
226
    self.offline = offline
227
    self.call = call
228
    self.node = node
229

    
230
    if offline:
231
      self.fail_msg = "Node is marked offline"
232
      self.data = self.payload = None
233
    elif failed:
234
      self.fail_msg = self._EnsureErr(data)
235
      self.data = self.payload = None
236
    else:
237
      self.data = data
238
      if not isinstance(self.data, (tuple, list)):
239
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
240
                         type(self.data))
241
        self.payload = None
242
      elif len(data) != 2:
243
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
244
                         "expected 2" % len(self.data))
245
        self.payload = None
246
      elif not self.data[0]:
247
        self.fail_msg = self._EnsureErr(self.data[1])
248
        self.payload = None
249
      else:
250
        # finally success
251
        self.fail_msg = None
252
        self.payload = data[1]
253

    
254
    for attr_name in ["call", "data", "fail_msg",
255
                      "node", "offline", "payload"]:
256
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
257

    
258
  @staticmethod
259
  def _EnsureErr(val):
260
    """Helper to ensure we return a 'True' value for error."""
261
    if val:
262
      return val
263
    else:
264
      return "No error information"
265

    
266
  def Raise(self, msg, prereq=False, ecode=None):
267
    """If the result has failed, raise an OpExecError.
268

269
    This is used so that LU code doesn't have to check for each
270
    result, but instead can call this function.
271

272
    """
273
    if not self.fail_msg:
274
      return
275

    
276
    if not msg: # one could pass None for default message
277
      msg = ("Call '%s' to node '%s' has failed: %s" %
278
             (self.call, self.node, self.fail_msg))
279
    else:
280
      msg = "%s: %s" % (msg, self.fail_msg)
281
    if prereq:
282
      ec = errors.OpPrereqError
283
    else:
284
      ec = errors.OpExecError
285
    if ecode is not None:
286
      args = (msg, ecode)
287
    else:
288
      args = (msg, )
289
    raise ec(*args) # pylint: disable=W0142
290

    
291

    
292
def _SsconfResolver(node_list,
293
                    ssc=ssconf.SimpleStore,
294
                    nslookup_fn=netutils.Hostname.GetIP):
295
  """Return addresses for given node names.
296

297
  @type node_list: list
298
  @param node_list: List of node names
299
  @type ssc: class
300
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
301
  @type nslookup_fn: callable
302
  @param nslookup_fn: function use to do NS lookup
303
  @rtype: list of tuple; (string, string)
304
  @return: List of tuples containing node name and IP address
305

306
  """
307
  ss = ssc()
308
  iplist = ss.GetNodePrimaryIPList()
309
  family = ss.GetPrimaryIPFamily()
310
  ipmap = dict(entry.split() for entry in iplist)
311

    
312
  result = []
313
  for node in node_list:
314
    ip = ipmap.get(node)
315
    if ip is None:
316
      ip = nslookup_fn(node, family=family)
317
    result.append((node, ip))
318

    
319
  return result
320

    
321

    
322
class _StaticResolver:
323
  def __init__(self, addresses):
324
    """Initializes this class.
325

326
    """
327
    self._addresses = addresses
328

    
329
  def __call__(self, hosts):
330
    """Returns static addresses for hosts.
331

332
    """
333
    assert len(hosts) == len(self._addresses)
334
    return zip(hosts, self._addresses)
335

    
336

    
337
def _CheckConfigNode(name, node):
338
  """Checks if a node is online.
339

340
  @type name: string
341
  @param name: Node name
342
  @type node: L{objects.Node} or None
343
  @param node: Node object
344

345
  """
346
  if node is None:
347
    # Depend on DNS for name resolution
348
    ip = name
349
  elif node.offline:
350
    ip = _OFFLINE
351
  else:
352
    ip = node.primary_ip
353
  return (name, ip)
354

    
355

    
356
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
357
  """Calculate node addresses using configuration.
358

359
  """
360
  # Special case for single-host lookups
361
  if len(hosts) == 1:
362
    (name, ) = hosts
363
    return [_CheckConfigNode(name, single_node_fn(name))]
364
  else:
365
    all_nodes = all_nodes_fn()
366
    return [_CheckConfigNode(name, all_nodes.get(name, None))
367
            for name in hosts]
368

    
369

    
370
class _RpcProcessor:
371
  def __init__(self, resolver, port, lock_monitor_cb=None):
372
    """Initializes this class.
373

374
    @param resolver: callable accepting a list of hostnames, returning a list
375
      of tuples containing name and IP address (IP address can be the name or
376
      the special value L{_OFFLINE} to mark offline machines)
377
    @type port: int
378
    @param port: TCP port
379
    @param lock_monitor_cb: Callable for registering with lock monitor
380

381
    """
382
    self._resolver = resolver
383
    self._port = port
384
    self._lock_monitor_cb = lock_monitor_cb
385

    
386
  @staticmethod
387
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
388
    """Prepares requests by sorting offline hosts into separate list.
389

390
    """
391
    results = {}
392
    requests = {}
393

    
394
    for (name, ip) in hosts:
395
      if ip is _OFFLINE:
396
        # Node is marked as offline
397
        results[name] = RpcResult(node=name, offline=True, call=procedure)
398
      else:
399
        requests[name] = \
400
          http.client.HttpClientRequest(str(ip), port,
401
                                        http.HTTP_PUT, str("/%s" % procedure),
402
                                        headers=_RPC_CLIENT_HEADERS,
403
                                        post_data=body,
404
                                        read_timeout=read_timeout,
405
                                        nicename="%s/%s" % (name, procedure))
406

    
407
    return (results, requests)
408

    
409
  @staticmethod
410
  def _CombineResults(results, requests, procedure):
411
    """Combines pre-computed results for offline hosts with actual call results.
412

413
    """
414
    for name, req in requests.items():
415
      if req.success and req.resp_status_code == http.HTTP_OK:
416
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
417
                                node=name, call=procedure)
418
      else:
419
        # TODO: Better error reporting
420
        if req.error:
421
          msg = req.error
422
        else:
423
          msg = req.resp_body
424

    
425
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
426
        host_result = RpcResult(data=msg, failed=True, node=name,
427
                                call=procedure)
428

    
429
      results[name] = host_result
430

    
431
    return results
432

    
433
  def __call__(self, hosts, procedure, body, read_timeout=None, http_pool=None):
434
    """Makes an RPC request to a number of nodes.
435

436
    @type hosts: sequence
437
    @param hosts: Hostnames
438
    @type procedure: string
439
    @param procedure: Request path
440
    @type body: string
441
    @param body: Request body
442
    @type read_timeout: int or None
443
    @param read_timeout: Read timeout for request
444

445
    """
446
    assert procedure in _TIMEOUTS, "RPC call not declared in the timeouts table"
447

    
448
    if not http_pool:
449
      http_pool = _thread_local.GetHttpClientPool()
450

    
451
    if read_timeout is None:
452
      read_timeout = _TIMEOUTS[procedure]
453

    
454
    (results, requests) = \
455
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
456
                            str(body), read_timeout)
457

    
458
    http_pool.ProcessRequests(requests.values(),
459
                              lock_monitor_cb=self._lock_monitor_cb)
460

    
461
    assert not frozenset(results).intersection(requests)
462

    
463
    return self._CombineResults(results, requests, procedure)
464

    
465

    
466
def _EncodeImportExportIO(ieio, ieioargs):
467
  """Encodes import/export I/O information.
468

469
  """
470
  if ieio == constants.IEIO_RAW_DISK:
471
    assert len(ieioargs) == 1
472
    return (ieioargs[0].ToDict(), )
473

    
474
  if ieio == constants.IEIO_SCRIPT:
475
    assert len(ieioargs) == 2
476
    return (ieioargs[0].ToDict(), ieioargs[1])
477

    
478
  return ieioargs
479

    
480

    
481
class RpcRunner(object):
482
  """RPC runner class.
483

484
  """
485
  def __init__(self, context):
486
    """Initialized the RPC runner.
487

488
    @type context: C{masterd.GanetiContext}
489
    @param context: Ganeti context
490

491
    """
492
    self._cfg = context.cfg
493
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
494
                                              self._cfg.GetNodeInfo,
495
                                              self._cfg.GetAllNodesInfo),
496
                               netutils.GetDaemonPort(constants.NODED),
497
                               lock_monitor_cb=context.glm.AddToLockMonitor)
498

    
499
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
500
    """Convert the given instance to a dict.
501

502
    This is done via the instance's ToDict() method and additionally
503
    we fill the hvparams with the cluster defaults.
504

505
    @type instance: L{objects.Instance}
506
    @param instance: an Instance object
507
    @type hvp: dict or None
508
    @param hvp: a dictionary with overridden hypervisor parameters
509
    @type bep: dict or None
510
    @param bep: a dictionary with overridden backend parameters
511
    @type osp: dict or None
512
    @param osp: a dictionary with overridden os parameters
513
    @rtype: dict
514
    @return: the instance dict, with the hvparams filled with the
515
        cluster defaults
516

517
    """
518
    idict = instance.ToDict()
519
    cluster = self._cfg.GetClusterInfo()
520
    idict["hvparams"] = cluster.FillHV(instance)
521
    if hvp is not None:
522
      idict["hvparams"].update(hvp)
523
    idict["beparams"] = cluster.FillBE(instance)
524
    if bep is not None:
525
      idict["beparams"].update(bep)
526
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
527
    if osp is not None:
528
      idict["osparams"].update(osp)
529
    for nic in idict["nics"]:
530
      nic['nicparams'] = objects.FillDict(
531
        cluster.nicparams[constants.PP_DEFAULT],
532
        nic['nicparams'])
533
    return idict
534

    
535
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
536
    """Helper for making a multi-node call
537

538
    """
539
    body = serializer.DumpJson(args, indent=False)
540
    return self._proc(node_list, procedure, body, read_timeout=read_timeout)
541

    
542
  @staticmethod
543
  def _StaticMultiNodeCall(node_list, procedure, args,
544
                           address_list=None, read_timeout=None):
545
    """Helper for making a multi-node static call
546

547
    """
548
    body = serializer.DumpJson(args, indent=False)
549

    
550
    if address_list is None:
551
      resolver = _SsconfResolver
552
    else:
553
      # Caller provided an address list
554
      resolver = _StaticResolver(address_list)
555

    
556
    proc = _RpcProcessor(resolver,
557
                         netutils.GetDaemonPort(constants.NODED))
558
    return proc(node_list, procedure, body, read_timeout=read_timeout)
559

    
560
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
561
    """Helper for making a single-node call
562

563
    """
564
    body = serializer.DumpJson(args, indent=False)
565
    return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
566

    
567
  @classmethod
568
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
569
    """Helper for making a single-node static call
570

571
    """
572
    body = serializer.DumpJson(args, indent=False)
573
    proc = _RpcProcessor(_SsconfResolver,
574
                         netutils.GetDaemonPort(constants.NODED))
575
    return proc([node], procedure, body, read_timeout=read_timeout)[node]
576

    
577
  #
578
  # Begin RPC calls
579
  #
580

    
581
  @_RpcTimeout(_TMO_URGENT)
582
  def call_bdev_sizes(self, node_list, devices):
583
    """Gets the sizes of requested block devices present on a node
584

585
    This is a multi-node call.
586

587
    """
588
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
589

    
590
  @_RpcTimeout(_TMO_URGENT)
591
  def call_lv_list(self, node_list, vg_name):
592
    """Gets the logical volumes present in a given volume group.
593

594
    This is a multi-node call.
595

596
    """
597
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
598

    
599
  @_RpcTimeout(_TMO_URGENT)
600
  def call_vg_list(self, node_list):
601
    """Gets the volume group list.
602

603
    This is a multi-node call.
604

605
    """
606
    return self._MultiNodeCall(node_list, "vg_list", [])
607

    
608
  @_RpcTimeout(_TMO_NORMAL)
609
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
610
    """Get list of storage units.
611

612
    This is a multi-node call.
613

614
    """
615
    return self._MultiNodeCall(node_list, "storage_list",
616
                               [su_name, su_args, name, fields])
617

    
618
  @_RpcTimeout(_TMO_NORMAL)
619
  def call_storage_modify(self, node, su_name, su_args, name, changes):
620
    """Modify a storage unit.
621

622
    This is a single-node call.
623

624
    """
625
    return self._SingleNodeCall(node, "storage_modify",
626
                                [su_name, su_args, name, changes])
627

    
628
  @_RpcTimeout(_TMO_NORMAL)
629
  def call_storage_execute(self, node, su_name, su_args, name, op):
630
    """Executes an operation on a storage unit.
631

632
    This is a single-node call.
633

634
    """
635
    return self._SingleNodeCall(node, "storage_execute",
636
                                [su_name, su_args, name, op])
637

    
638
  @_RpcTimeout(_TMO_URGENT)
639
  def call_bridges_exist(self, node, bridges_list):
640
    """Checks if a node has all the bridges given.
641

642
    This method checks if all bridges given in the bridges_list are
643
    present on the remote node, so that an instance that uses interfaces
644
    on those bridges can be started.
645

646
    This is a single-node call.
647

648
    """
649
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
650

    
651
  @_RpcTimeout(_TMO_NORMAL)
652
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
653
    """Starts an instance.
654

655
    This is a single-node call.
656

657
    """
658
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
659
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
660

    
661
  @_RpcTimeout(_TMO_NORMAL)
662
  def call_instance_shutdown(self, node, instance, timeout):
663
    """Stops an instance.
664

665
    This is a single-node call.
666

667
    """
668
    return self._SingleNodeCall(node, "instance_shutdown",
669
                                [self._InstDict(instance), timeout])
670

    
671
  @_RpcTimeout(_TMO_NORMAL)
672
  def call_migration_info(self, node, instance):
673
    """Gather the information necessary to prepare an instance migration.
674

675
    This is a single-node call.
676

677
    @type node: string
678
    @param node: the node on which the instance is currently running
679
    @type instance: C{objects.Instance}
680
    @param instance: the instance definition
681

682
    """
683
    return self._SingleNodeCall(node, "migration_info",
684
                                [self._InstDict(instance)])
685

    
686
  @_RpcTimeout(_TMO_NORMAL)
687
  def call_accept_instance(self, node, instance, info, target):
688
    """Prepare a node to accept an instance.
689

690
    This is a single-node call.
691

692
    @type node: string
693
    @param node: the target node for the migration
694
    @type instance: C{objects.Instance}
695
    @param instance: the instance definition
696
    @type info: opaque/hypervisor specific (string/data)
697
    @param info: result for the call_migration_info call
698
    @type target: string
699
    @param target: target hostname (usually ip address) (on the node itself)
700

701
    """
702
    return self._SingleNodeCall(node, "accept_instance",
703
                                [self._InstDict(instance), info, target])
704

    
705
  @_RpcTimeout(_TMO_NORMAL)
706
  def call_instance_finalize_migration_dst(self, node, instance, info, success):
707
    """Finalize any target-node migration specific operation.
708

709
    This is called both in case of a successful migration and in case of error
710
    (in which case it should abort the migration).
711

712
    This is a single-node call.
713

714
    @type node: string
715
    @param node: the target node for the migration
716
    @type instance: C{objects.Instance}
717
    @param instance: the instance definition
718
    @type info: opaque/hypervisor specific (string/data)
719
    @param info: result for the call_migration_info call
720
    @type success: boolean
721
    @param success: whether the migration was a success or a failure
722

723
    """
724
    return self._SingleNodeCall(node, "instance_finalize_migration_dst",
725
                                [self._InstDict(instance), info, success])
726

    
727
  @_RpcTimeout(_TMO_SLOW)
728
  def call_instance_migrate(self, node, instance, target, live):
729
    """Migrate an instance.
730

731
    This is a single-node call.
732

733
    @type node: string
734
    @param node: the node on which the instance is currently running
735
    @type instance: C{objects.Instance}
736
    @param instance: the instance definition
737
    @type target: string
738
    @param target: the target node name
739
    @type live: boolean
740
    @param live: whether the migration should be done live or not (the
741
        interpretation of this parameter is left to the hypervisor)
742

743
    """
744
    return self._SingleNodeCall(node, "instance_migrate",
745
                                [self._InstDict(instance), target, live])
746

    
747
  @_RpcTimeout(_TMO_SLOW)
748
  def call_instance_finalize_migration_src(self, node, instance, success, live):
749
    """Finalize the instance migration on the source node.
750

751
    This is a single-node call.
752

753
    @type instance: L{objects.Instance}
754
    @param instance: the instance that was migrated
755
    @type success: bool
756
    @param success: whether the migration succeeded or not
757
    @type live: bool
758
    @param live: whether the user requested a live migration or not
759

760
    """
761
    return self._SingleNodeCall(node, "instance_finalize_migration_src",
762
                                [self._InstDict(instance), success, live])
763

    
764
  @_RpcTimeout(_TMO_SLOW)
765
  def call_instance_get_migration_status(self, node, instance):
766
    """Report migration status.
767

768
    This is a single-node call that must be executed on the source node.
769

770
    @type instance: L{objects.Instance}
771
    @param instance: the instance that is being migrated
772
    @rtype: L{objects.MigrationStatus}
773
    @return: the status of the current migration (one of
774
             L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
775
             progress info that can be retrieved from the hypervisor
776

777
    """
778
    result = self._SingleNodeCall(node, "instance_get_migration_status",
779
                                  [self._InstDict(instance)])
780
    if not result.fail_msg and result.payload is not None:
781
      result.payload = objects.MigrationStatus.FromDict(result.payload)
782
    return result
783

    
784
  @_RpcTimeout(_TMO_NORMAL)
785
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
786
    """Reboots an instance.
787

788
    This is a single-node call.
789

790
    """
791
    return self._SingleNodeCall(node, "instance_reboot",
792
                                [self._InstDict(inst), reboot_type,
793
                                 shutdown_timeout])
794

    
795
  @_RpcTimeout(_TMO_1DAY)
796
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
797
    """Installs an OS on the given instance.
798

799
    This is a single-node call.
800

801
    """
802
    return self._SingleNodeCall(node, "instance_os_add",
803
                                [self._InstDict(inst, osp=osparams),
804
                                 reinstall, debug])
805

    
806
  @_RpcTimeout(_TMO_SLOW)
807
  def call_instance_run_rename(self, node, inst, old_name, debug):
808
    """Run the OS rename script for an instance.
809

810
    This is a single-node call.
811

812
    """
813
    return self._SingleNodeCall(node, "instance_run_rename",
814
                                [self._InstDict(inst), old_name, debug])
815

    
816
  @_RpcTimeout(_TMO_URGENT)
817
  def call_instance_info(self, node, instance, hname):
818
    """Returns information about a single instance.
819

820
    This is a single-node call.
821

822
    @type node: list
823
    @param node: the list of nodes to query
824
    @type instance: string
825
    @param instance: the instance name
826
    @type hname: string
827
    @param hname: the hypervisor type of the instance
828

829
    """
830
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
831

    
832
  @_RpcTimeout(_TMO_NORMAL)
833
  def call_instance_migratable(self, node, instance):
834
    """Checks whether the given instance can be migrated.
835

836
    This is a single-node call.
837

838
    @param node: the node to query
839
    @type instance: L{objects.Instance}
840
    @param instance: the instance to check
841

842

843
    """
844
    return self._SingleNodeCall(node, "instance_migratable",
845
                                [self._InstDict(instance)])
846

    
847
  @_RpcTimeout(_TMO_URGENT)
848
  def call_all_instances_info(self, node_list, hypervisor_list):
849
    """Returns information about all instances on the given nodes.
850

851
    This is a multi-node call.
852

853
    @type node_list: list
854
    @param node_list: the list of nodes to query
855
    @type hypervisor_list: list
856
    @param hypervisor_list: the hypervisors to query for instances
857

858
    """
859
    return self._MultiNodeCall(node_list, "all_instances_info",
860
                               [hypervisor_list])
861

    
862
  @_RpcTimeout(_TMO_URGENT)
863
  def call_instance_list(self, node_list, hypervisor_list):
864
    """Returns the list of running instances on a given node.
865

866
    This is a multi-node call.
867

868
    @type node_list: list
869
    @param node_list: the list of nodes to query
870
    @type hypervisor_list: list
871
    @param hypervisor_list: the hypervisors to query for instances
872

873
    """
874
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
875

    
876
  @_RpcTimeout(_TMO_FAST)
877
  def call_node_tcp_ping(self, node, source, target, port, timeout,
878
                         live_port_needed):
879
    """Do a TcpPing on the remote node
880

881
    This is a single-node call.
882

883
    """
884
    return self._SingleNodeCall(node, "node_tcp_ping",
885
                                [source, target, port, timeout,
886
                                 live_port_needed])
887

    
888
  @_RpcTimeout(_TMO_FAST)
889
  def call_node_has_ip_address(self, node, address):
890
    """Checks if a node has the given IP address.
891

892
    This is a single-node call.
893

894
    """
895
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
896

    
897
  @_RpcTimeout(_TMO_URGENT)
898
  def call_node_info(self, node_list, vg_name, hypervisor_type):
899
    """Return node information.
900

901
    This will return memory information and volume group size and free
902
    space.
903

904
    This is a multi-node call.
905

906
    @type node_list: list
907
    @param node_list: the list of nodes to query
908
    @type vg_name: C{string}
909
    @param vg_name: the name of the volume group to ask for disk space
910
        information
911
    @type hypervisor_type: C{str}
912
    @param hypervisor_type: the name of the hypervisor to ask for
913
        memory information
914

915
    """
916
    return self._MultiNodeCall(node_list, "node_info",
917
                               [vg_name, hypervisor_type])
918

    
919
  @_RpcTimeout(_TMO_NORMAL)
920
  def call_etc_hosts_modify(self, node, mode, name, ip):
921
    """Modify hosts file with name
922

923
    @type node: string
924
    @param node: The node to call
925
    @type mode: string
926
    @param mode: The mode to operate. Currently "add" or "remove"
927
    @type name: string
928
    @param name: The host name to be modified
929
    @type ip: string
930
    @param ip: The ip of the entry (just valid if mode is "add")
931

932
    """
933
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
934

    
935
  @_RpcTimeout(_TMO_NORMAL)
936
  def call_node_verify(self, node_list, checkdict, cluster_name):
937
    """Request verification of given parameters.
938

939
    This is a multi-node call.
940

941
    """
942
    return self._MultiNodeCall(node_list, "node_verify",
943
                               [checkdict, cluster_name])
944

    
945
  @classmethod
946
  @_RpcTimeout(_TMO_FAST)
947
  def call_node_start_master_daemons(cls, node, no_voting):
948
    """Starts master daemons on a node.
949

950
    This is a single-node call.
951

952
    """
953
    return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
954
                                     [no_voting])
955

    
956
  @classmethod
957
  @_RpcTimeout(_TMO_FAST)
958
  def call_node_activate_master_ip(cls, node):
959
    """Activates master IP on a node.
960

961
    This is a single-node call.
962

963
    """
964
    return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
965

    
966
  @classmethod
967
  @_RpcTimeout(_TMO_FAST)
968
  def call_node_stop_master(cls, node):
969
    """Deactivates master IP and stops master daemons on a node.
970

971
    This is a single-node call.
972

973
    """
974
    return cls._StaticSingleNodeCall(node, "node_stop_master", [])
975

    
976
  @classmethod
977
  @_RpcTimeout(_TMO_FAST)
978
  def call_node_deactivate_master_ip(cls, node):
979
    """Deactivates master IP on a node.
980

981
    This is a single-node call.
982

983
    """
984
    return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
985

    
986
  @classmethod
987
  @_RpcTimeout(_TMO_URGENT)
988
  def call_master_info(cls, node_list):
989
    """Query master info.
990

991
    This is a multi-node call.
992

993
    """
994
    # TODO: should this method query down nodes?
995
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
996

    
997
  @classmethod
998
  @_RpcTimeout(_TMO_URGENT)
999
  def call_version(cls, node_list):
1000
    """Query node version.
1001

1002
    This is a multi-node call.
1003

1004
    """
1005
    return cls._StaticMultiNodeCall(node_list, "version", [])
1006

    
1007
  @_RpcTimeout(_TMO_NORMAL)
1008
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
1009
    """Request creation of a given block device.
1010

1011
    This is a single-node call.
1012

1013
    """
1014
    return self._SingleNodeCall(node, "blockdev_create",
1015
                                [bdev.ToDict(), size, owner, on_primary, info])
1016

    
1017
  @_RpcTimeout(_TMO_SLOW)
1018
  def call_blockdev_wipe(self, node, bdev, offset, size):
1019
    """Request wipe at given offset with given size of a block device.
1020

1021
    This is a single-node call.
1022

1023
    """
1024
    return self._SingleNodeCall(node, "blockdev_wipe",
1025
                                [bdev.ToDict(), offset, size])
1026

    
1027
  @_RpcTimeout(_TMO_NORMAL)
1028
  def call_blockdev_remove(self, node, bdev):
1029
    """Request removal of a given block device.
1030

1031
    This is a single-node call.
1032

1033
    """
1034
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
1035

    
1036
  @_RpcTimeout(_TMO_NORMAL)
1037
  def call_blockdev_rename(self, node, devlist):
1038
    """Request rename of the given block devices.
1039

1040
    This is a single-node call.
1041

1042
    """
1043
    return self._SingleNodeCall(node, "blockdev_rename",
1044
                                [(d.ToDict(), uid) for d, uid in devlist])
1045

    
1046
  @_RpcTimeout(_TMO_NORMAL)
1047
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
1048
    """Request a pause/resume of given block device.
1049

1050
    This is a single-node call.
1051

1052
    """
1053
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1054
                                [[bdev.ToDict() for bdev in disks], pause])
1055

    
1056
  @_RpcTimeout(_TMO_NORMAL)
1057
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1058
    """Request assembling of a given block device.
1059

1060
    This is a single-node call.
1061

1062
    """
1063
    return self._SingleNodeCall(node, "blockdev_assemble",
1064
                                [disk.ToDict(), owner, on_primary, idx])
1065

    
1066
  @_RpcTimeout(_TMO_NORMAL)
1067
  def call_blockdev_shutdown(self, node, disk):
1068
    """Request shutdown of a given block device.
1069

1070
    This is a single-node call.
1071

1072
    """
1073
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1074

    
1075
  @_RpcTimeout(_TMO_NORMAL)
1076
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1077
    """Request adding a list of children to a (mirroring) device.
1078

1079
    This is a single-node call.
1080

1081
    """
1082
    return self._SingleNodeCall(node, "blockdev_addchildren",
1083
                                [bdev.ToDict(),
1084
                                 [disk.ToDict() for disk in ndevs]])
1085

    
1086
  @_RpcTimeout(_TMO_NORMAL)
1087
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1088
    """Request removing a list of children from a (mirroring) device.
1089

1090
    This is a single-node call.
1091

1092
    """
1093
    return self._SingleNodeCall(node, "blockdev_removechildren",
1094
                                [bdev.ToDict(),
1095
                                 [disk.ToDict() for disk in ndevs]])
1096

    
1097
  @_RpcTimeout(_TMO_NORMAL)
1098
  def call_blockdev_getmirrorstatus(self, node, disks):
1099
    """Request status of a (mirroring) device.
1100

1101
    This is a single-node call.
1102

1103
    """
1104
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1105
                                  [dsk.ToDict() for dsk in disks])
1106
    if not result.fail_msg:
1107
      result.payload = [objects.BlockDevStatus.FromDict(i)
1108
                        for i in result.payload]
1109
    return result
1110

    
1111
  @_RpcTimeout(_TMO_NORMAL)
1112
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1113
    """Request status of (mirroring) devices from multiple nodes.
1114

1115
    This is a multi-node call.
1116

1117
    """
1118
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1119
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1120
                                       for name, disks in node_disks.items())])
1121
    for nres in result.values():
1122
      if nres.fail_msg:
1123
        continue
1124

    
1125
      for idx, (success, status) in enumerate(nres.payload):
1126
        if success:
1127
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1128

    
1129
    return result
1130

    
1131
  @_RpcTimeout(_TMO_NORMAL)
1132
  def call_blockdev_find(self, node, disk):
1133
    """Request identification of a given block device.
1134

1135
    This is a single-node call.
1136

1137
    """
1138
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1139
    if not result.fail_msg and result.payload is not None:
1140
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1141
    return result
1142

    
1143
  @_RpcTimeout(_TMO_NORMAL)
1144
  def call_blockdev_close(self, node, instance_name, disks):
1145
    """Closes the given block devices.
1146

1147
    This is a single-node call.
1148

1149
    """
1150
    params = [instance_name, [cf.ToDict() for cf in disks]]
1151
    return self._SingleNodeCall(node, "blockdev_close", params)
1152

    
1153
  @_RpcTimeout(_TMO_NORMAL)
1154
  def call_blockdev_getsize(self, node, disks):
1155
    """Returns the size of the given disks.
1156

1157
    This is a single-node call.
1158

1159
    """
1160
    params = [[cf.ToDict() for cf in disks]]
1161
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1162

    
1163
  @_RpcTimeout(_TMO_NORMAL)
1164
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1165
    """Disconnects the network of the given drbd devices.
1166

1167
    This is a multi-node call.
1168

1169
    """
1170
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1171
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1172

    
1173
  @_RpcTimeout(_TMO_NORMAL)
1174
  def call_drbd_attach_net(self, node_list, nodes_ip,
1175
                           disks, instance_name, multimaster):
1176
    """Disconnects the given drbd devices.
1177

1178
    This is a multi-node call.
1179

1180
    """
1181
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1182
                               [nodes_ip, [cf.ToDict() for cf in disks],
1183
                                instance_name, multimaster])
1184

    
1185
  @_RpcTimeout(_TMO_SLOW)
1186
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1187
    """Waits for the synchronization of drbd devices is complete.
1188

1189
    This is a multi-node call.
1190

1191
    """
1192
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1193
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1194

    
1195
  @_RpcTimeout(_TMO_URGENT)
1196
  def call_drbd_helper(self, node_list):
1197
    """Gets drbd helper.
1198

1199
    This is a multi-node call.
1200

1201
    """
1202
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1203

    
1204
  @classmethod
1205
  @_RpcTimeout(_TMO_NORMAL)
1206
  def call_upload_file(cls, node_list, file_name, address_list=None):
1207
    """Upload a file.
1208

1209
    The node will refuse the operation in case the file is not on the
1210
    approved file list.
1211

1212
    This is a multi-node call.
1213

1214
    @type node_list: list
1215
    @param node_list: the list of node names to upload to
1216
    @type file_name: str
1217
    @param file_name: the filename to upload
1218
    @type address_list: list or None
1219
    @keyword address_list: an optional list of node addresses, in order
1220
        to optimize the RPC speed
1221

1222
    """
1223
    file_contents = utils.ReadFile(file_name)
1224
    data = _Compress(file_contents)
1225
    st = os.stat(file_name)
1226
    getents = runtime.GetEnts()
1227
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1228
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1229
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1230
                                    address_list=address_list)
1231

    
1232
  @classmethod
1233
  @_RpcTimeout(_TMO_NORMAL)
1234
  def call_write_ssconf_files(cls, node_list, values):
1235
    """Write ssconf files.
1236

1237
    This is a multi-node call.
1238

1239
    """
1240
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1241

    
1242
  @_RpcTimeout(_TMO_NORMAL)
1243
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1244
    """Runs OOB.
1245

1246
    This is a single-node call.
1247

1248
    """
1249
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1250
                                                  remote_node, timeout])
1251

    
1252
  @_RpcTimeout(_TMO_FAST)
1253
  def call_os_diagnose(self, node_list):
1254
    """Request a diagnose of OS definitions.
1255

1256
    This is a multi-node call.
1257

1258
    """
1259
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1260

    
1261
  @_RpcTimeout(_TMO_FAST)
1262
  def call_os_get(self, node, name):
1263
    """Returns an OS definition.
1264

1265
    This is a single-node call.
1266

1267
    """
1268
    result = self._SingleNodeCall(node, "os_get", [name])
1269
    if not result.fail_msg and isinstance(result.payload, dict):
1270
      result.payload = objects.OS.FromDict(result.payload)
1271
    return result
1272

    
1273
  @_RpcTimeout(_TMO_FAST)
1274
  def call_os_validate(self, required, nodes, name, checks, params):
1275
    """Run a validation routine for a given OS.
1276

1277
    This is a multi-node call.
1278

1279
    """
1280
    return self._MultiNodeCall(nodes, "os_validate",
1281
                               [required, name, checks, params])
1282

    
1283
  @_RpcTimeout(_TMO_NORMAL)
1284
  def call_hooks_runner(self, node_list, hpath, phase, env):
1285
    """Call the hooks runner.
1286

1287
    Args:
1288
      - op: the OpCode instance
1289
      - env: a dictionary with the environment
1290

1291
    This is a multi-node call.
1292

1293
    """
1294
    params = [hpath, phase, env]
1295
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1296

    
1297
  @_RpcTimeout(_TMO_NORMAL)
1298
  def call_iallocator_runner(self, node, name, idata):
1299
    """Call an iallocator on a remote node
1300

1301
    Args:
1302
      - name: the iallocator name
1303
      - input: the json-encoded input string
1304

1305
    This is a single-node call.
1306

1307
    """
1308
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1309

    
1310
  @_RpcTimeout(_TMO_NORMAL)
1311
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1312
    """Request a snapshot of the given block device.
1313

1314
    This is a single-node call.
1315

1316
    """
1317
    return self._SingleNodeCall(node, "blockdev_grow",
1318
                                [cf_bdev.ToDict(), amount, dryrun])
1319

    
1320
  @_RpcTimeout(_TMO_1DAY)
1321
  def call_blockdev_export(self, node, cf_bdev,
1322
                           dest_node, dest_path, cluster_name):
1323
    """Export a given disk to another node.
1324

1325
    This is a single-node call.
1326

1327
    """
1328
    return self._SingleNodeCall(node, "blockdev_export",
1329
                                [cf_bdev.ToDict(), dest_node, dest_path,
1330
                                 cluster_name])
1331

    
1332
  @_RpcTimeout(_TMO_NORMAL)
1333
  def call_blockdev_snapshot(self, node, cf_bdev):
1334
    """Request a snapshot of the given block device.
1335

1336
    This is a single-node call.
1337

1338
    """
1339
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1340

    
1341
  @_RpcTimeout(_TMO_NORMAL)
1342
  def call_finalize_export(self, node, instance, snap_disks):
1343
    """Request the completion of an export operation.
1344

1345
    This writes the export config file, etc.
1346

1347
    This is a single-node call.
1348

1349
    """
1350
    flat_disks = []
1351
    for disk in snap_disks:
1352
      if isinstance(disk, bool):
1353
        flat_disks.append(disk)
1354
      else:
1355
        flat_disks.append(disk.ToDict())
1356

    
1357
    return self._SingleNodeCall(node, "finalize_export",
1358
                                [self._InstDict(instance), flat_disks])
1359

    
1360
  @_RpcTimeout(_TMO_FAST)
1361
  def call_export_info(self, node, path):
1362
    """Queries the export information in a given path.
1363

1364
    This is a single-node call.
1365

1366
    """
1367
    return self._SingleNodeCall(node, "export_info", [path])
1368

    
1369
  @_RpcTimeout(_TMO_FAST)
1370
  def call_export_list(self, node_list):
1371
    """Gets the stored exports list.
1372

1373
    This is a multi-node call.
1374

1375
    """
1376
    return self._MultiNodeCall(node_list, "export_list", [])
1377

    
1378
  @_RpcTimeout(_TMO_FAST)
1379
  def call_export_remove(self, node, export):
1380
    """Requests removal of a given export.
1381

1382
    This is a single-node call.
1383

1384
    """
1385
    return self._SingleNodeCall(node, "export_remove", [export])
1386

    
1387
  @classmethod
1388
  @_RpcTimeout(_TMO_NORMAL)
1389
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1390
    """Requests a node to clean the cluster information it has.
1391

1392
    This will remove the configuration information from the ganeti data
1393
    dir.
1394

1395
    This is a single-node call.
1396

1397
    """
1398
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1399
                                     [modify_ssh_setup])
1400

    
1401
  @_RpcTimeout(_TMO_FAST)
1402
  def call_node_volumes(self, node_list):
1403
    """Gets all volumes on node(s).
1404

1405
    This is a multi-node call.
1406

1407
    """
1408
    return self._MultiNodeCall(node_list, "node_volumes", [])
1409

    
1410
  @_RpcTimeout(_TMO_FAST)
1411
  def call_node_demote_from_mc(self, node):
1412
    """Demote a node from the master candidate role.
1413

1414
    This is a single-node call.
1415

1416
    """
1417
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1418

    
1419
  @_RpcTimeout(_TMO_NORMAL)
1420
  def call_node_powercycle(self, node, hypervisor):
1421
    """Tries to powercycle a node.
1422

1423
    This is a single-node call.
1424

1425
    """
1426
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1427

    
1428
  @_RpcTimeout(None)
1429
  def call_test_delay(self, node_list, duration):
1430
    """Sleep for a fixed time on given node(s).
1431

1432
    This is a multi-node call.
1433

1434
    """
1435
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1436
                               read_timeout=int(duration + 5))
1437

    
1438
  @_RpcTimeout(_TMO_FAST)
1439
  def call_file_storage_dir_create(self, node, file_storage_dir):
1440
    """Create the given file storage directory.
1441

1442
    This is a single-node call.
1443

1444
    """
1445
    return self._SingleNodeCall(node, "file_storage_dir_create",
1446
                                [file_storage_dir])
1447

    
1448
  @_RpcTimeout(_TMO_FAST)
1449
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1450
    """Remove the given file storage directory.
1451

1452
    This is a single-node call.
1453

1454
    """
1455
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1456
                                [file_storage_dir])
1457

    
1458
  @_RpcTimeout(_TMO_FAST)
1459
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1460
                                   new_file_storage_dir):
1461
    """Rename file storage directory.
1462

1463
    This is a single-node call.
1464

1465
    """
1466
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1467
                                [old_file_storage_dir, new_file_storage_dir])
1468

    
1469
  @classmethod
1470
  @_RpcTimeout(_TMO_URGENT)
1471
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1472
    """Update job queue.
1473

1474
    This is a multi-node call.
1475

1476
    """
1477
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1478
                                    [file_name, _Compress(content)],
1479
                                    address_list=address_list)
1480

    
1481
  @classmethod
1482
  @_RpcTimeout(_TMO_NORMAL)
1483
  def call_jobqueue_purge(cls, node):
1484
    """Purge job queue.
1485

1486
    This is a single-node call.
1487

1488
    """
1489
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1490

    
1491
  @classmethod
1492
  @_RpcTimeout(_TMO_URGENT)
1493
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1494
    """Rename a job queue file.
1495

1496
    This is a multi-node call.
1497

1498
    """
1499
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1500
                                    address_list=address_list)
1501

    
1502
  @_RpcTimeout(_TMO_NORMAL)
1503
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1504
    """Validate the hypervisor params.
1505

1506
    This is a multi-node call.
1507

1508
    @type node_list: list
1509
    @param node_list: the list of nodes to query
1510
    @type hvname: string
1511
    @param hvname: the hypervisor name
1512
    @type hvparams: dict
1513
    @param hvparams: the hypervisor parameters to be validated
1514

1515
    """
1516
    cluster = self._cfg.GetClusterInfo()
1517
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1518
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1519
                               [hvname, hv_full])
1520

    
1521
  @_RpcTimeout(_TMO_NORMAL)
1522
  def call_x509_cert_create(self, node, validity):
1523
    """Creates a new X509 certificate for SSL/TLS.
1524

1525
    This is a single-node call.
1526

1527
    @type validity: int
1528
    @param validity: Validity in seconds
1529

1530
    """
1531
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1532

    
1533
  @_RpcTimeout(_TMO_NORMAL)
1534
  def call_x509_cert_remove(self, node, name):
1535
    """Removes a X509 certificate.
1536

1537
    This is a single-node call.
1538

1539
    @type name: string
1540
    @param name: Certificate name
1541

1542
    """
1543
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1544

    
1545
  @_RpcTimeout(_TMO_NORMAL)
1546
  def call_import_start(self, node, opts, instance, component,
1547
                        dest, dest_args):
1548
    """Starts a listener for an import.
1549

1550
    This is a single-node call.
1551

1552
    @type node: string
1553
    @param node: Node name
1554
    @type instance: C{objects.Instance}
1555
    @param instance: Instance object
1556
    @type component: string
1557
    @param component: which part of the instance is being imported
1558

1559
    """
1560
    return self._SingleNodeCall(node, "import_start",
1561
                                [opts.ToDict(),
1562
                                 self._InstDict(instance), component, dest,
1563
                                 _EncodeImportExportIO(dest, dest_args)])
1564

    
1565
  @_RpcTimeout(_TMO_NORMAL)
1566
  def call_export_start(self, node, opts, host, port,
1567
                        instance, component, source, source_args):
1568
    """Starts an export daemon.
1569

1570
    This is a single-node call.
1571

1572
    @type node: string
1573
    @param node: Node name
1574
    @type instance: C{objects.Instance}
1575
    @param instance: Instance object
1576
    @type component: string
1577
    @param component: which part of the instance is being imported
1578

1579
    """
1580
    return self._SingleNodeCall(node, "export_start",
1581
                                [opts.ToDict(), host, port,
1582
                                 self._InstDict(instance),
1583
                                 component, source,
1584
                                 _EncodeImportExportIO(source, source_args)])
1585

    
1586
  @_RpcTimeout(_TMO_FAST)
1587
  def call_impexp_status(self, node, names):
1588
    """Gets the status of an import or export.
1589

1590
    This is a single-node call.
1591

1592
    @type node: string
1593
    @param node: Node name
1594
    @type names: List of strings
1595
    @param names: Import/export names
1596
    @rtype: List of L{objects.ImportExportStatus} instances
1597
    @return: Returns a list of the state of each named import/export or None if
1598
             a status couldn't be retrieved
1599

1600
    """
1601
    result = self._SingleNodeCall(node, "impexp_status", [names])
1602

    
1603
    if not result.fail_msg:
1604
      decoded = []
1605

    
1606
      for i in result.payload:
1607
        if i is None:
1608
          decoded.append(None)
1609
          continue
1610
        decoded.append(objects.ImportExportStatus.FromDict(i))
1611

    
1612
      result.payload = decoded
1613

    
1614
    return result
1615

    
1616
  @_RpcTimeout(_TMO_NORMAL)
1617
  def call_impexp_abort(self, node, name):
1618
    """Aborts an import or export.
1619

1620
    This is a single-node call.
1621

1622
    @type node: string
1623
    @param node: Node name
1624
    @type name: string
1625
    @param name: Import/export name
1626

1627
    """
1628
    return self._SingleNodeCall(node, "impexp_abort", [name])
1629

    
1630
  @_RpcTimeout(_TMO_NORMAL)
1631
  def call_impexp_cleanup(self, node, name):
1632
    """Cleans up after an import or export.
1633

1634
    This is a single-node call.
1635

1636
    @type node: string
1637
    @param node: Node name
1638
    @type name: string
1639
    @param name: Import/export name
1640

1641
    """
1642
    return self._SingleNodeCall(node, "impexp_cleanup", [name])