Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ ee8fd7b7

History | View | Annotate | Download (47.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# pylint has a bug here, doesn't see this import
52
import ganeti.http.client  # pylint: disable=W0611
53

    
54

    
55
# Timeout for connecting to nodes (seconds)
56
_RPC_CONNECT_TIMEOUT = 5
57

    
58
_RPC_CLIENT_HEADERS = [
59
  "Content-type: %s" % http.HTTP_APP_JSON,
60
  "Expect:",
61
  ]
62

    
63
# Various time constants for the timeout table
64
_TMO_URGENT = 60 # one minute
65
_TMO_FAST = 5 * 60 # five minutes
66
_TMO_NORMAL = 15 * 60 # 15 minutes
67
_TMO_SLOW = 3600 # one hour
68
_TMO_4HRS = 4 * 3600
69
_TMO_1DAY = 86400
70

    
71
# Timeout table that will be built later by decorators
72
# Guidelines for choosing timeouts:
73
# - call used during watcher: timeout -> 1min, _TMO_URGENT
74
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
75
# - other calls: 15 min, _TMO_NORMAL
76
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
77

    
78
_TIMEOUTS = {
79
}
80

    
81
#: Special value to describe an offline host
82
_OFFLINE = object()
83

    
84

    
85
def Init():
86
  """Initializes the module-global HTTP client manager.
87

88
  Must be called before using any RPC function and while exactly one thread is
89
  running.
90

91
  """
92
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
93
  # one thread running. This check is just a safety measure -- it doesn't
94
  # cover all cases.
95
  assert threading.activeCount() == 1, \
96
         "Found more than one active thread when initializing pycURL"
97

    
98
  logging.info("Using PycURL %s", pycurl.version)
99

    
100
  pycurl.global_init(pycurl.GLOBAL_ALL)
101

    
102

    
103
def Shutdown():
104
  """Stops the module-global HTTP client manager.
105

106
  Must be called before quitting the program and while exactly one thread is
107
  running.
108

109
  """
110
  pycurl.global_cleanup()
111

    
112

    
113
def _ConfigRpcCurl(curl):
114
  noded_cert = str(constants.NODED_CERT_FILE)
115

    
116
  curl.setopt(pycurl.FOLLOWLOCATION, False)
117
  curl.setopt(pycurl.CAINFO, noded_cert)
118
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
119
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
120
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
121
  curl.setopt(pycurl.SSLCERT, noded_cert)
122
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
123
  curl.setopt(pycurl.SSLKEY, noded_cert)
124
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
125

    
126

    
127
def _RpcTimeout(secs):
128
  """Timeout decorator.
129

130
  When applied to a rpc call_* function, it updates the global timeout
131
  table with the given function/timeout.
132

133
  """
134
  def decorator(f):
135
    name = f.__name__
136
    assert name.startswith("call_")
137
    _TIMEOUTS[name[len("call_"):]] = secs
138
    return f
139
  return decorator
140

    
141

    
142
def RunWithRPC(fn):
143
  """RPC-wrapper decorator.
144

145
  When applied to a function, it runs it with the RPC system
146
  initialized, and it shutsdown the system afterwards. This means the
147
  function must be called without RPC being initialized.
148

149
  """
150
  def wrapper(*args, **kwargs):
151
    Init()
152
    try:
153
      return fn(*args, **kwargs)
154
    finally:
155
      Shutdown()
156
  return wrapper
157

    
158

    
159
def _Compress(data):
160
  """Compresses a string for transport over RPC.
161

162
  Small amounts of data are not compressed.
163

164
  @type data: str
165
  @param data: Data
166
  @rtype: tuple
167
  @return: Encoded data to send
168

169
  """
170
  # Small amounts of data are not compressed
171
  if len(data) < 512:
172
    return (constants.RPC_ENCODING_NONE, data)
173

    
174
  # Compress with zlib and encode in base64
175
  return (constants.RPC_ENCODING_ZLIB_BASE64,
176
          base64.b64encode(zlib.compress(data, 3)))
177

    
178

    
179
class RpcResult(object):
180
  """RPC Result class.
181

182
  This class holds an RPC result. It is needed since in multi-node
183
  calls we can't raise an exception just because one one out of many
184
  failed, and therefore we use this class to encapsulate the result.
185

186
  @ivar data: the data payload, for successful results, or None
187
  @ivar call: the name of the RPC call
188
  @ivar node: the name of the node to which we made the call
189
  @ivar offline: whether the operation failed because the node was
190
      offline, as opposed to actual failure; offline=True will always
191
      imply failed=True, in order to allow simpler checking if
192
      the user doesn't care about the exact failure mode
193
  @ivar fail_msg: the error message if the call failed
194

195
  """
196
  def __init__(self, data=None, failed=False, offline=False,
197
               call=None, node=None):
198
    self.offline = offline
199
    self.call = call
200
    self.node = node
201

    
202
    if offline:
203
      self.fail_msg = "Node is marked offline"
204
      self.data = self.payload = None
205
    elif failed:
206
      self.fail_msg = self._EnsureErr(data)
207
      self.data = self.payload = None
208
    else:
209
      self.data = data
210
      if not isinstance(self.data, (tuple, list)):
211
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
212
                         type(self.data))
213
        self.payload = None
214
      elif len(data) != 2:
215
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
216
                         "expected 2" % len(self.data))
217
        self.payload = None
218
      elif not self.data[0]:
219
        self.fail_msg = self._EnsureErr(self.data[1])
220
        self.payload = None
221
      else:
222
        # finally success
223
        self.fail_msg = None
224
        self.payload = data[1]
225

    
226
    for attr_name in ["call", "data", "fail_msg",
227
                      "node", "offline", "payload"]:
228
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
229

    
230
  @staticmethod
231
  def _EnsureErr(val):
232
    """Helper to ensure we return a 'True' value for error."""
233
    if val:
234
      return val
235
    else:
236
      return "No error information"
237

    
238
  def Raise(self, msg, prereq=False, ecode=None):
239
    """If the result has failed, raise an OpExecError.
240

241
    This is used so that LU code doesn't have to check for each
242
    result, but instead can call this function.
243

244
    """
245
    if not self.fail_msg:
246
      return
247

    
248
    if not msg: # one could pass None for default message
249
      msg = ("Call '%s' to node '%s' has failed: %s" %
250
             (self.call, self.node, self.fail_msg))
251
    else:
252
      msg = "%s: %s" % (msg, self.fail_msg)
253
    if prereq:
254
      ec = errors.OpPrereqError
255
    else:
256
      ec = errors.OpExecError
257
    if ecode is not None:
258
      args = (msg, ecode)
259
    else:
260
      args = (msg, )
261
    raise ec(*args) # pylint: disable=W0142
262

    
263

    
264
def _SsconfResolver(node_list,
265
                    ssc=ssconf.SimpleStore,
266
                    nslookup_fn=netutils.Hostname.GetIP):
267
  """Return addresses for given node names.
268

269
  @type node_list: list
270
  @param node_list: List of node names
271
  @type ssc: class
272
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
273
  @type nslookup_fn: callable
274
  @param nslookup_fn: function use to do NS lookup
275
  @rtype: list of tuple; (string, string)
276
  @return: List of tuples containing node name and IP address
277

278
  """
279
  ss = ssc()
280
  iplist = ss.GetNodePrimaryIPList()
281
  family = ss.GetPrimaryIPFamily()
282
  ipmap = dict(entry.split() for entry in iplist)
283

    
284
  result = []
285
  for node in node_list:
286
    ip = ipmap.get(node)
287
    if ip is None:
288
      ip = nslookup_fn(node, family=family)
289
    result.append((node, ip))
290

    
291
  return result
292

    
293

    
294
class _StaticResolver:
295
  def __init__(self, addresses):
296
    """Initializes this class.
297

298
    """
299
    self._addresses = addresses
300

    
301
  def __call__(self, hosts):
302
    """Returns static addresses for hosts.
303

304
    """
305
    assert len(hosts) == len(self._addresses)
306
    return zip(hosts, self._addresses)
307

    
308

    
309
def _CheckConfigNode(name, node):
310
  """Checks if a node is online.
311

312
  @type name: string
313
  @param name: Node name
314
  @type node: L{objects.Node} or None
315
  @param node: Node object
316

317
  """
318
  if node is None:
319
    # Depend on DNS for name resolution
320
    ip = name
321
  elif node.offline:
322
    ip = _OFFLINE
323
  else:
324
    ip = node.primary_ip
325
  return (name, ip)
326

    
327

    
328
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
329
  """Calculate node addresses using configuration.
330

331
  """
332
  # Special case for single-host lookups
333
  if len(hosts) == 1:
334
    (name, ) = hosts
335
    return [_CheckConfigNode(name, single_node_fn(name))]
336
  else:
337
    all_nodes = all_nodes_fn()
338
    return [_CheckConfigNode(name, all_nodes.get(name, None))
339
            for name in hosts]
340

    
341

    
342
class _RpcProcessor:
343
  def __init__(self, resolver, port, lock_monitor_cb=None):
344
    """Initializes this class.
345

346
    @param resolver: callable accepting a list of hostnames, returning a list
347
      of tuples containing name and IP address (IP address can be the name or
348
      the special value L{_OFFLINE} to mark offline machines)
349
    @type port: int
350
    @param port: TCP port
351
    @param lock_monitor_cb: Callable for registering with lock monitor
352

353
    """
354
    self._resolver = resolver
355
    self._port = port
356
    self._lock_monitor_cb = lock_monitor_cb
357

    
358
  @staticmethod
359
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
360
    """Prepares requests by sorting offline hosts into separate list.
361

362
    """
363
    results = {}
364
    requests = {}
365

    
366
    for (name, ip) in hosts:
367
      if ip is _OFFLINE:
368
        # Node is marked as offline
369
        results[name] = RpcResult(node=name, offline=True, call=procedure)
370
      else:
371
        requests[name] = \
372
          http.client.HttpClientRequest(str(ip), port,
373
                                        http.HTTP_PUT, str("/%s" % procedure),
374
                                        headers=_RPC_CLIENT_HEADERS,
375
                                        post_data=body,
376
                                        read_timeout=read_timeout,
377
                                        nicename="%s/%s" % (name, procedure),
378
                                        curl_config_fn=_ConfigRpcCurl)
379

    
380
    return (results, requests)
381

    
382
  @staticmethod
383
  def _CombineResults(results, requests, procedure):
384
    """Combines pre-computed results for offline hosts with actual call results.
385

386
    """
387
    for name, req in requests.items():
388
      if req.success and req.resp_status_code == http.HTTP_OK:
389
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
390
                                node=name, call=procedure)
391
      else:
392
        # TODO: Better error reporting
393
        if req.error:
394
          msg = req.error
395
        else:
396
          msg = req.resp_body
397

    
398
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
399
        host_result = RpcResult(data=msg, failed=True, node=name,
400
                                call=procedure)
401

    
402
      results[name] = host_result
403

    
404
    return results
405

    
406
  def __call__(self, hosts, procedure, body, read_timeout=None,
407
               _req_process_fn=http.client.ProcessRequests):
408
    """Makes an RPC request to a number of nodes.
409

410
    @type hosts: sequence
411
    @param hosts: Hostnames
412
    @type procedure: string
413
    @param procedure: Request path
414
    @type body: string
415
    @param body: Request body
416
    @type read_timeout: int or None
417
    @param read_timeout: Read timeout for request
418

419
    """
420
    assert procedure in _TIMEOUTS, "RPC call not declared in the timeouts table"
421

    
422
    if read_timeout is None:
423
      read_timeout = _TIMEOUTS[procedure]
424

    
425
    (results, requests) = \
426
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
427
                            str(body), read_timeout)
428

    
429
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
430

    
431
    assert not frozenset(results).intersection(requests)
432

    
433
    return self._CombineResults(results, requests, procedure)
434

    
435

    
436
def _EncodeImportExportIO(ieio, ieioargs):
437
  """Encodes import/export I/O information.
438

439
  """
440
  if ieio == constants.IEIO_RAW_DISK:
441
    assert len(ieioargs) == 1
442
    return (ieioargs[0].ToDict(), )
443

    
444
  if ieio == constants.IEIO_SCRIPT:
445
    assert len(ieioargs) == 2
446
    return (ieioargs[0].ToDict(), ieioargs[1])
447

    
448
  return ieioargs
449

    
450

    
451
class RpcRunner(object):
452
  """RPC runner class.
453

454
  """
455
  def __init__(self, context):
456
    """Initialized the RPC runner.
457

458
    @type context: C{masterd.GanetiContext}
459
    @param context: Ganeti context
460

461
    """
462
    self._cfg = context.cfg
463
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
464
                                              self._cfg.GetNodeInfo,
465
                                              self._cfg.GetAllNodesInfo),
466
                               netutils.GetDaemonPort(constants.NODED),
467
                               lock_monitor_cb=context.glm.AddToLockMonitor)
468

    
469
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
470
    """Convert the given instance to a dict.
471

472
    This is done via the instance's ToDict() method and additionally
473
    we fill the hvparams with the cluster defaults.
474

475
    @type instance: L{objects.Instance}
476
    @param instance: an Instance object
477
    @type hvp: dict or None
478
    @param hvp: a dictionary with overridden hypervisor parameters
479
    @type bep: dict or None
480
    @param bep: a dictionary with overridden backend parameters
481
    @type osp: dict or None
482
    @param osp: a dictionary with overridden os parameters
483
    @rtype: dict
484
    @return: the instance dict, with the hvparams filled with the
485
        cluster defaults
486

487
    """
488
    idict = instance.ToDict()
489
    cluster = self._cfg.GetClusterInfo()
490
    idict["hvparams"] = cluster.FillHV(instance)
491
    if hvp is not None:
492
      idict["hvparams"].update(hvp)
493
    idict["beparams"] = cluster.FillBE(instance)
494
    if bep is not None:
495
      idict["beparams"].update(bep)
496
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
497
    if osp is not None:
498
      idict["osparams"].update(osp)
499
    for nic in idict["nics"]:
500
      nic['nicparams'] = objects.FillDict(
501
        cluster.nicparams[constants.PP_DEFAULT],
502
        nic['nicparams'])
503
    return idict
504

    
505
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
506
    """Helper for making a multi-node call
507

508
    """
509
    body = serializer.DumpJson(args, indent=False)
510
    return self._proc(node_list, procedure, body, read_timeout=read_timeout)
511

    
512
  @staticmethod
513
  def _StaticMultiNodeCall(node_list, procedure, args,
514
                           address_list=None, read_timeout=None):
515
    """Helper for making a multi-node static call
516

517
    """
518
    body = serializer.DumpJson(args, indent=False)
519

    
520
    if address_list is None:
521
      resolver = _SsconfResolver
522
    else:
523
      # Caller provided an address list
524
      resolver = _StaticResolver(address_list)
525

    
526
    proc = _RpcProcessor(resolver,
527
                         netutils.GetDaemonPort(constants.NODED))
528
    return proc(node_list, procedure, body, read_timeout=read_timeout)
529

    
530
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
531
    """Helper for making a single-node call
532

533
    """
534
    body = serializer.DumpJson(args, indent=False)
535
    return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
536

    
537
  @classmethod
538
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
539
    """Helper for making a single-node static call
540

541
    """
542
    body = serializer.DumpJson(args, indent=False)
543
    proc = _RpcProcessor(_SsconfResolver,
544
                         netutils.GetDaemonPort(constants.NODED))
545
    return proc([node], procedure, body, read_timeout=read_timeout)[node]
546

    
547
  #
548
  # Begin RPC calls
549
  #
550

    
551
  @_RpcTimeout(_TMO_URGENT)
552
  def call_bdev_sizes(self, node_list, devices):
553
    """Gets the sizes of requested block devices present on a node
554

555
    This is a multi-node call.
556

557
    """
558
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
559

    
560
  @_RpcTimeout(_TMO_URGENT)
561
  def call_lv_list(self, node_list, vg_name):
562
    """Gets the logical volumes present in a given volume group.
563

564
    This is a multi-node call.
565

566
    """
567
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
568

    
569
  @_RpcTimeout(_TMO_URGENT)
570
  def call_vg_list(self, node_list):
571
    """Gets the volume group list.
572

573
    This is a multi-node call.
574

575
    """
576
    return self._MultiNodeCall(node_list, "vg_list", [])
577

    
578
  @_RpcTimeout(_TMO_NORMAL)
579
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
580
    """Get list of storage units.
581

582
    This is a multi-node call.
583

584
    """
585
    return self._MultiNodeCall(node_list, "storage_list",
586
                               [su_name, su_args, name, fields])
587

    
588
  @_RpcTimeout(_TMO_NORMAL)
589
  def call_storage_modify(self, node, su_name, su_args, name, changes):
590
    """Modify a storage unit.
591

592
    This is a single-node call.
593

594
    """
595
    return self._SingleNodeCall(node, "storage_modify",
596
                                [su_name, su_args, name, changes])
597

    
598
  @_RpcTimeout(_TMO_NORMAL)
599
  def call_storage_execute(self, node, su_name, su_args, name, op):
600
    """Executes an operation on a storage unit.
601

602
    This is a single-node call.
603

604
    """
605
    return self._SingleNodeCall(node, "storage_execute",
606
                                [su_name, su_args, name, op])
607

    
608
  @_RpcTimeout(_TMO_URGENT)
609
  def call_bridges_exist(self, node, bridges_list):
610
    """Checks if a node has all the bridges given.
611

612
    This method checks if all bridges given in the bridges_list are
613
    present on the remote node, so that an instance that uses interfaces
614
    on those bridges can be started.
615

616
    This is a single-node call.
617

618
    """
619
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
620

    
621
  @_RpcTimeout(_TMO_NORMAL)
622
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
623
    """Starts an instance.
624

625
    This is a single-node call.
626

627
    """
628
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
629
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
630

    
631
  @_RpcTimeout(_TMO_NORMAL)
632
  def call_instance_shutdown(self, node, instance, timeout):
633
    """Stops an instance.
634

635
    This is a single-node call.
636

637
    """
638
    return self._SingleNodeCall(node, "instance_shutdown",
639
                                [self._InstDict(instance), timeout])
640

    
641
  @_RpcTimeout(_TMO_NORMAL)
642
  def call_migration_info(self, node, instance):
643
    """Gather the information necessary to prepare an instance migration.
644

645
    This is a single-node call.
646

647
    @type node: string
648
    @param node: the node on which the instance is currently running
649
    @type instance: C{objects.Instance}
650
    @param instance: the instance definition
651

652
    """
653
    return self._SingleNodeCall(node, "migration_info",
654
                                [self._InstDict(instance)])
655

    
656
  @_RpcTimeout(_TMO_NORMAL)
657
  def call_accept_instance(self, node, instance, info, target):
658
    """Prepare a node to accept an instance.
659

660
    This is a single-node call.
661

662
    @type node: string
663
    @param node: the target node for the migration
664
    @type instance: C{objects.Instance}
665
    @param instance: the instance definition
666
    @type info: opaque/hypervisor specific (string/data)
667
    @param info: result for the call_migration_info call
668
    @type target: string
669
    @param target: target hostname (usually ip address) (on the node itself)
670

671
    """
672
    return self._SingleNodeCall(node, "accept_instance",
673
                                [self._InstDict(instance), info, target])
674

    
675
  @_RpcTimeout(_TMO_NORMAL)
676
  def call_instance_finalize_migration_dst(self, node, instance, info, success):
677
    """Finalize any target-node migration specific operation.
678

679
    This is called both in case of a successful migration and in case of error
680
    (in which case it should abort the migration).
681

682
    This is a single-node call.
683

684
    @type node: string
685
    @param node: the target node for the migration
686
    @type instance: C{objects.Instance}
687
    @param instance: the instance definition
688
    @type info: opaque/hypervisor specific (string/data)
689
    @param info: result for the call_migration_info call
690
    @type success: boolean
691
    @param success: whether the migration was a success or a failure
692

693
    """
694
    return self._SingleNodeCall(node, "instance_finalize_migration_dst",
695
                                [self._InstDict(instance), info, success])
696

    
697
  @_RpcTimeout(_TMO_SLOW)
698
  def call_instance_migrate(self, node, instance, target, live):
699
    """Migrate an instance.
700

701
    This is a single-node call.
702

703
    @type node: string
704
    @param node: the node on which the instance is currently running
705
    @type instance: C{objects.Instance}
706
    @param instance: the instance definition
707
    @type target: string
708
    @param target: the target node name
709
    @type live: boolean
710
    @param live: whether the migration should be done live or not (the
711
        interpretation of this parameter is left to the hypervisor)
712

713
    """
714
    return self._SingleNodeCall(node, "instance_migrate",
715
                                [self._InstDict(instance), target, live])
716

    
717
  @_RpcTimeout(_TMO_SLOW)
718
  def call_instance_finalize_migration_src(self, node, instance, success, live):
719
    """Finalize the instance migration on the source node.
720

721
    This is a single-node call.
722

723
    @type instance: L{objects.Instance}
724
    @param instance: the instance that was migrated
725
    @type success: bool
726
    @param success: whether the migration succeeded or not
727
    @type live: bool
728
    @param live: whether the user requested a live migration or not
729

730
    """
731
    return self._SingleNodeCall(node, "instance_finalize_migration_src",
732
                                [self._InstDict(instance), success, live])
733

    
734
  @_RpcTimeout(_TMO_SLOW)
735
  def call_instance_get_migration_status(self, node, instance):
736
    """Report migration status.
737

738
    This is a single-node call that must be executed on the source node.
739

740
    @type instance: L{objects.Instance}
741
    @param instance: the instance that is being migrated
742
    @rtype: L{objects.MigrationStatus}
743
    @return: the status of the current migration (one of
744
             L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
745
             progress info that can be retrieved from the hypervisor
746

747
    """
748
    result = self._SingleNodeCall(node, "instance_get_migration_status",
749
                                  [self._InstDict(instance)])
750
    if not result.fail_msg and result.payload is not None:
751
      result.payload = objects.MigrationStatus.FromDict(result.payload)
752
    return result
753

    
754
  @_RpcTimeout(_TMO_NORMAL)
755
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
756
    """Reboots an instance.
757

758
    This is a single-node call.
759

760
    """
761
    return self._SingleNodeCall(node, "instance_reboot",
762
                                [self._InstDict(inst), reboot_type,
763
                                 shutdown_timeout])
764

    
765
  @_RpcTimeout(_TMO_1DAY)
766
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
767
    """Installs an OS on the given instance.
768

769
    This is a single-node call.
770

771
    """
772
    return self._SingleNodeCall(node, "instance_os_add",
773
                                [self._InstDict(inst, osp=osparams),
774
                                 reinstall, debug])
775

    
776
  @_RpcTimeout(_TMO_SLOW)
777
  def call_instance_run_rename(self, node, inst, old_name, debug):
778
    """Run the OS rename script for an instance.
779

780
    This is a single-node call.
781

782
    """
783
    return self._SingleNodeCall(node, "instance_run_rename",
784
                                [self._InstDict(inst), old_name, debug])
785

    
786
  @_RpcTimeout(_TMO_URGENT)
787
  def call_instance_info(self, node, instance, hname):
788
    """Returns information about a single instance.
789

790
    This is a single-node call.
791

792
    @type node: list
793
    @param node: the list of nodes to query
794
    @type instance: string
795
    @param instance: the instance name
796
    @type hname: string
797
    @param hname: the hypervisor type of the instance
798

799
    """
800
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
801

    
802
  @_RpcTimeout(_TMO_NORMAL)
803
  def call_instance_migratable(self, node, instance):
804
    """Checks whether the given instance can be migrated.
805

806
    This is a single-node call.
807

808
    @param node: the node to query
809
    @type instance: L{objects.Instance}
810
    @param instance: the instance to check
811

812

813
    """
814
    return self._SingleNodeCall(node, "instance_migratable",
815
                                [self._InstDict(instance)])
816

    
817
  @_RpcTimeout(_TMO_URGENT)
818
  def call_all_instances_info(self, node_list, hypervisor_list):
819
    """Returns information about all instances on the given nodes.
820

821
    This is a multi-node call.
822

823
    @type node_list: list
824
    @param node_list: the list of nodes to query
825
    @type hypervisor_list: list
826
    @param hypervisor_list: the hypervisors to query for instances
827

828
    """
829
    return self._MultiNodeCall(node_list, "all_instances_info",
830
                               [hypervisor_list])
831

    
832
  @_RpcTimeout(_TMO_URGENT)
833
  def call_instance_list(self, node_list, hypervisor_list):
834
    """Returns the list of running instances on a given node.
835

836
    This is a multi-node call.
837

838
    @type node_list: list
839
    @param node_list: the list of nodes to query
840
    @type hypervisor_list: list
841
    @param hypervisor_list: the hypervisors to query for instances
842

843
    """
844
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
845

    
846
  @_RpcTimeout(_TMO_FAST)
847
  def call_node_has_ip_address(self, node, address):
848
    """Checks if a node has the given IP address.
849

850
    This is a single-node call.
851

852
    """
853
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
854

    
855
  @_RpcTimeout(_TMO_URGENT)
856
  def call_node_info(self, node_list, vg_name, hypervisor_type):
857
    """Return node information.
858

859
    This will return memory information and volume group size and free
860
    space.
861

862
    This is a multi-node call.
863

864
    @type node_list: list
865
    @param node_list: the list of nodes to query
866
    @type vg_name: C{string}
867
    @param vg_name: the name of the volume group to ask for disk space
868
        information
869
    @type hypervisor_type: C{str}
870
    @param hypervisor_type: the name of the hypervisor to ask for
871
        memory information
872

873
    """
874
    return self._MultiNodeCall(node_list, "node_info",
875
                               [vg_name, hypervisor_type])
876

    
877
  @_RpcTimeout(_TMO_NORMAL)
878
  def call_etc_hosts_modify(self, node, mode, name, ip):
879
    """Modify hosts file with name
880

881
    @type node: string
882
    @param node: The node to call
883
    @type mode: string
884
    @param mode: The mode to operate. Currently "add" or "remove"
885
    @type name: string
886
    @param name: The host name to be modified
887
    @type ip: string
888
    @param ip: The ip of the entry (just valid if mode is "add")
889

890
    """
891
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
892

    
893
  @_RpcTimeout(_TMO_NORMAL)
894
  def call_node_verify(self, node_list, checkdict, cluster_name):
895
    """Request verification of given parameters.
896

897
    This is a multi-node call.
898

899
    """
900
    return self._MultiNodeCall(node_list, "node_verify",
901
                               [checkdict, cluster_name])
902

    
903
  @classmethod
904
  @_RpcTimeout(_TMO_FAST)
905
  def call_node_start_master_daemons(cls, node, no_voting):
906
    """Starts master daemons on a node.
907

908
    This is a single-node call.
909

910
    """
911
    return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
912
                                     [no_voting])
913

    
914
  @classmethod
915
  @_RpcTimeout(_TMO_FAST)
916
  def call_node_activate_master_ip(cls, node):
917
    """Activates master IP on a node.
918

919
    This is a single-node call.
920

921
    """
922
    return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
923

    
924
  @classmethod
925
  @_RpcTimeout(_TMO_FAST)
926
  def call_node_stop_master(cls, node):
927
    """Deactivates master IP and stops master daemons on a node.
928

929
    This is a single-node call.
930

931
    """
932
    return cls._StaticSingleNodeCall(node, "node_stop_master", [])
933

    
934
  @classmethod
935
  @_RpcTimeout(_TMO_FAST)
936
  def call_node_deactivate_master_ip(cls, node):
937
    """Deactivates master IP on a node.
938

939
    This is a single-node call.
940

941
    """
942
    return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
943

    
944
  @classmethod
945
  @_RpcTimeout(_TMO_FAST)
946
  def call_node_change_master_netmask(cls, node, netmask):
947
    """Change master IP netmask.
948

949
    This is a single-node call.
950

951
    """
952
    return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
953
                  [netmask])
954

    
955
  @classmethod
956
  @_RpcTimeout(_TMO_URGENT)
957
  def call_master_info(cls, node_list):
958
    """Query master info.
959

960
    This is a multi-node call.
961

962
    """
963
    # TODO: should this method query down nodes?
964
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
965

    
966
  @classmethod
967
  @_RpcTimeout(_TMO_URGENT)
968
  def call_version(cls, node_list):
969
    """Query node version.
970

971
    This is a multi-node call.
972

973
    """
974
    return cls._StaticMultiNodeCall(node_list, "version", [])
975

    
976
  @_RpcTimeout(_TMO_NORMAL)
977
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
978
    """Request creation of a given block device.
979

980
    This is a single-node call.
981

982
    """
983
    return self._SingleNodeCall(node, "blockdev_create",
984
                                [bdev.ToDict(), size, owner, on_primary, info])
985

    
986
  @_RpcTimeout(_TMO_SLOW)
987
  def call_blockdev_wipe(self, node, bdev, offset, size):
988
    """Request wipe at given offset with given size of a block device.
989

990
    This is a single-node call.
991

992
    """
993
    return self._SingleNodeCall(node, "blockdev_wipe",
994
                                [bdev.ToDict(), offset, size])
995

    
996
  @_RpcTimeout(_TMO_NORMAL)
997
  def call_blockdev_remove(self, node, bdev):
998
    """Request removal of a given block device.
999

1000
    This is a single-node call.
1001

1002
    """
1003
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
1004

    
1005
  @_RpcTimeout(_TMO_NORMAL)
1006
  def call_blockdev_rename(self, node, devlist):
1007
    """Request rename of the given block devices.
1008

1009
    This is a single-node call.
1010

1011
    """
1012
    return self._SingleNodeCall(node, "blockdev_rename",
1013
                                [(d.ToDict(), uid) for d, uid in devlist])
1014

    
1015
  @_RpcTimeout(_TMO_NORMAL)
1016
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
1017
    """Request a pause/resume of given block device.
1018

1019
    This is a single-node call.
1020

1021
    """
1022
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1023
                                [[bdev.ToDict() for bdev in disks], pause])
1024

    
1025
  @_RpcTimeout(_TMO_NORMAL)
1026
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1027
    """Request assembling of a given block device.
1028

1029
    This is a single-node call.
1030

1031
    """
1032
    return self._SingleNodeCall(node, "blockdev_assemble",
1033
                                [disk.ToDict(), owner, on_primary, idx])
1034

    
1035
  @_RpcTimeout(_TMO_NORMAL)
1036
  def call_blockdev_shutdown(self, node, disk):
1037
    """Request shutdown of a given block device.
1038

1039
    This is a single-node call.
1040

1041
    """
1042
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1043

    
1044
  @_RpcTimeout(_TMO_NORMAL)
1045
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1046
    """Request adding a list of children to a (mirroring) device.
1047

1048
    This is a single-node call.
1049

1050
    """
1051
    return self._SingleNodeCall(node, "blockdev_addchildren",
1052
                                [bdev.ToDict(),
1053
                                 [disk.ToDict() for disk in ndevs]])
1054

    
1055
  @_RpcTimeout(_TMO_NORMAL)
1056
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1057
    """Request removing a list of children from a (mirroring) device.
1058

1059
    This is a single-node call.
1060

1061
    """
1062
    return self._SingleNodeCall(node, "blockdev_removechildren",
1063
                                [bdev.ToDict(),
1064
                                 [disk.ToDict() for disk in ndevs]])
1065

    
1066
  @_RpcTimeout(_TMO_NORMAL)
1067
  def call_blockdev_getmirrorstatus(self, node, disks):
1068
    """Request status of a (mirroring) device.
1069

1070
    This is a single-node call.
1071

1072
    """
1073
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1074
                                  [dsk.ToDict() for dsk in disks])
1075
    if not result.fail_msg:
1076
      result.payload = [objects.BlockDevStatus.FromDict(i)
1077
                        for i in result.payload]
1078
    return result
1079

    
1080
  @_RpcTimeout(_TMO_NORMAL)
1081
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1082
    """Request status of (mirroring) devices from multiple nodes.
1083

1084
    This is a multi-node call.
1085

1086
    """
1087
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1088
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1089
                                       for name, disks in node_disks.items())])
1090
    for nres in result.values():
1091
      if nres.fail_msg:
1092
        continue
1093

    
1094
      for idx, (success, status) in enumerate(nres.payload):
1095
        if success:
1096
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1097

    
1098
    return result
1099

    
1100
  @_RpcTimeout(_TMO_NORMAL)
1101
  def call_blockdev_find(self, node, disk):
1102
    """Request identification of a given block device.
1103

1104
    This is a single-node call.
1105

1106
    """
1107
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1108
    if not result.fail_msg and result.payload is not None:
1109
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1110
    return result
1111

    
1112
  @_RpcTimeout(_TMO_NORMAL)
1113
  def call_blockdev_close(self, node, instance_name, disks):
1114
    """Closes the given block devices.
1115

1116
    This is a single-node call.
1117

1118
    """
1119
    params = [instance_name, [cf.ToDict() for cf in disks]]
1120
    return self._SingleNodeCall(node, "blockdev_close", params)
1121

    
1122
  @_RpcTimeout(_TMO_NORMAL)
1123
  def call_blockdev_getsize(self, node, disks):
1124
    """Returns the size of the given disks.
1125

1126
    This is a single-node call.
1127

1128
    """
1129
    params = [[cf.ToDict() for cf in disks]]
1130
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1131

    
1132
  @_RpcTimeout(_TMO_NORMAL)
1133
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1134
    """Disconnects the network of the given drbd devices.
1135

1136
    This is a multi-node call.
1137

1138
    """
1139
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1140
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1141

    
1142
  @_RpcTimeout(_TMO_NORMAL)
1143
  def call_drbd_attach_net(self, node_list, nodes_ip,
1144
                           disks, instance_name, multimaster):
1145
    """Disconnects the given drbd devices.
1146

1147
    This is a multi-node call.
1148

1149
    """
1150
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1151
                               [nodes_ip, [cf.ToDict() for cf in disks],
1152
                                instance_name, multimaster])
1153

    
1154
  @_RpcTimeout(_TMO_SLOW)
1155
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1156
    """Waits for the synchronization of drbd devices is complete.
1157

1158
    This is a multi-node call.
1159

1160
    """
1161
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1162
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1163

    
1164
  @_RpcTimeout(_TMO_URGENT)
1165
  def call_drbd_helper(self, node_list):
1166
    """Gets drbd helper.
1167

1168
    This is a multi-node call.
1169

1170
    """
1171
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1172

    
1173
  @classmethod
1174
  @_RpcTimeout(_TMO_NORMAL)
1175
  def call_upload_file(cls, node_list, file_name, address_list=None):
1176
    """Upload a file.
1177

1178
    The node will refuse the operation in case the file is not on the
1179
    approved file list.
1180

1181
    This is a multi-node call.
1182

1183
    @type node_list: list
1184
    @param node_list: the list of node names to upload to
1185
    @type file_name: str
1186
    @param file_name: the filename to upload
1187
    @type address_list: list or None
1188
    @keyword address_list: an optional list of node addresses, in order
1189
        to optimize the RPC speed
1190

1191
    """
1192
    file_contents = utils.ReadFile(file_name)
1193
    data = _Compress(file_contents)
1194
    st = os.stat(file_name)
1195
    getents = runtime.GetEnts()
1196
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1197
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1198
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1199
                                    address_list=address_list)
1200

    
1201
  @classmethod
1202
  @_RpcTimeout(_TMO_NORMAL)
1203
  def call_write_ssconf_files(cls, node_list, values):
1204
    """Write ssconf files.
1205

1206
    This is a multi-node call.
1207

1208
    """
1209
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1210

    
1211
  @_RpcTimeout(_TMO_NORMAL)
1212
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1213
    """Runs OOB.
1214

1215
    This is a single-node call.
1216

1217
    """
1218
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1219
                                                  remote_node, timeout])
1220

    
1221
  @_RpcTimeout(_TMO_FAST)
1222
  def call_os_diagnose(self, node_list):
1223
    """Request a diagnose of OS definitions.
1224

1225
    This is a multi-node call.
1226

1227
    """
1228
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1229

    
1230
  @_RpcTimeout(_TMO_FAST)
1231
  def call_os_get(self, node, name):
1232
    """Returns an OS definition.
1233

1234
    This is a single-node call.
1235

1236
    """
1237
    result = self._SingleNodeCall(node, "os_get", [name])
1238
    if not result.fail_msg and isinstance(result.payload, dict):
1239
      result.payload = objects.OS.FromDict(result.payload)
1240
    return result
1241

    
1242
  @_RpcTimeout(_TMO_FAST)
1243
  def call_os_validate(self, required, nodes, name, checks, params):
1244
    """Run a validation routine for a given OS.
1245

1246
    This is a multi-node call.
1247

1248
    """
1249
    return self._MultiNodeCall(nodes, "os_validate",
1250
                               [required, name, checks, params])
1251

    
1252
  @_RpcTimeout(_TMO_NORMAL)
1253
  def call_hooks_runner(self, node_list, hpath, phase, env):
1254
    """Call the hooks runner.
1255

1256
    Args:
1257
      - op: the OpCode instance
1258
      - env: a dictionary with the environment
1259

1260
    This is a multi-node call.
1261

1262
    """
1263
    params = [hpath, phase, env]
1264
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1265

    
1266
  @_RpcTimeout(_TMO_NORMAL)
1267
  def call_iallocator_runner(self, node, name, idata):
1268
    """Call an iallocator on a remote node
1269

1270
    Args:
1271
      - name: the iallocator name
1272
      - input: the json-encoded input string
1273

1274
    This is a single-node call.
1275

1276
    """
1277
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1278

    
1279
  @_RpcTimeout(_TMO_NORMAL)
1280
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1281
    """Request a snapshot of the given block device.
1282

1283
    This is a single-node call.
1284

1285
    """
1286
    return self._SingleNodeCall(node, "blockdev_grow",
1287
                                [cf_bdev.ToDict(), amount, dryrun])
1288

    
1289
  @_RpcTimeout(_TMO_1DAY)
1290
  def call_blockdev_export(self, node, cf_bdev,
1291
                           dest_node, dest_path, cluster_name):
1292
    """Export a given disk to another node.
1293

1294
    This is a single-node call.
1295

1296
    """
1297
    return self._SingleNodeCall(node, "blockdev_export",
1298
                                [cf_bdev.ToDict(), dest_node, dest_path,
1299
                                 cluster_name])
1300

    
1301
  @_RpcTimeout(_TMO_NORMAL)
1302
  def call_blockdev_snapshot(self, node, cf_bdev):
1303
    """Request a snapshot of the given block device.
1304

1305
    This is a single-node call.
1306

1307
    """
1308
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1309

    
1310
  @_RpcTimeout(_TMO_NORMAL)
1311
  def call_finalize_export(self, node, instance, snap_disks):
1312
    """Request the completion of an export operation.
1313

1314
    This writes the export config file, etc.
1315

1316
    This is a single-node call.
1317

1318
    """
1319
    flat_disks = []
1320
    for disk in snap_disks:
1321
      if isinstance(disk, bool):
1322
        flat_disks.append(disk)
1323
      else:
1324
        flat_disks.append(disk.ToDict())
1325

    
1326
    return self._SingleNodeCall(node, "finalize_export",
1327
                                [self._InstDict(instance), flat_disks])
1328

    
1329
  @_RpcTimeout(_TMO_FAST)
1330
  def call_export_info(self, node, path):
1331
    """Queries the export information in a given path.
1332

1333
    This is a single-node call.
1334

1335
    """
1336
    return self._SingleNodeCall(node, "export_info", [path])
1337

    
1338
  @_RpcTimeout(_TMO_FAST)
1339
  def call_export_list(self, node_list):
1340
    """Gets the stored exports list.
1341

1342
    This is a multi-node call.
1343

1344
    """
1345
    return self._MultiNodeCall(node_list, "export_list", [])
1346

    
1347
  @_RpcTimeout(_TMO_FAST)
1348
  def call_export_remove(self, node, export):
1349
    """Requests removal of a given export.
1350

1351
    This is a single-node call.
1352

1353
    """
1354
    return self._SingleNodeCall(node, "export_remove", [export])
1355

    
1356
  @classmethod
1357
  @_RpcTimeout(_TMO_NORMAL)
1358
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1359
    """Requests a node to clean the cluster information it has.
1360

1361
    This will remove the configuration information from the ganeti data
1362
    dir.
1363

1364
    This is a single-node call.
1365

1366
    """
1367
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1368
                                     [modify_ssh_setup])
1369

    
1370
  @_RpcTimeout(_TMO_FAST)
1371
  def call_node_volumes(self, node_list):
1372
    """Gets all volumes on node(s).
1373

1374
    This is a multi-node call.
1375

1376
    """
1377
    return self._MultiNodeCall(node_list, "node_volumes", [])
1378

    
1379
  @_RpcTimeout(_TMO_FAST)
1380
  def call_node_demote_from_mc(self, node):
1381
    """Demote a node from the master candidate role.
1382

1383
    This is a single-node call.
1384

1385
    """
1386
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1387

    
1388
  @_RpcTimeout(_TMO_NORMAL)
1389
  def call_node_powercycle(self, node, hypervisor):
1390
    """Tries to powercycle a node.
1391

1392
    This is a single-node call.
1393

1394
    """
1395
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1396

    
1397
  @_RpcTimeout(None)
1398
  def call_test_delay(self, node_list, duration):
1399
    """Sleep for a fixed time on given node(s).
1400

1401
    This is a multi-node call.
1402

1403
    """
1404
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1405
                               read_timeout=int(duration + 5))
1406

    
1407
  @_RpcTimeout(_TMO_FAST)
1408
  def call_file_storage_dir_create(self, node, file_storage_dir):
1409
    """Create the given file storage directory.
1410

1411
    This is a single-node call.
1412

1413
    """
1414
    return self._SingleNodeCall(node, "file_storage_dir_create",
1415
                                [file_storage_dir])
1416

    
1417
  @_RpcTimeout(_TMO_FAST)
1418
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1419
    """Remove the given file storage directory.
1420

1421
    This is a single-node call.
1422

1423
    """
1424
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1425
                                [file_storage_dir])
1426

    
1427
  @_RpcTimeout(_TMO_FAST)
1428
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1429
                                   new_file_storage_dir):
1430
    """Rename file storage directory.
1431

1432
    This is a single-node call.
1433

1434
    """
1435
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1436
                                [old_file_storage_dir, new_file_storage_dir])
1437

    
1438
  @classmethod
1439
  @_RpcTimeout(_TMO_URGENT)
1440
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1441
    """Update job queue.
1442

1443
    This is a multi-node call.
1444

1445
    """
1446
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1447
                                    [file_name, _Compress(content)],
1448
                                    address_list=address_list)
1449

    
1450
  @classmethod
1451
  @_RpcTimeout(_TMO_NORMAL)
1452
  def call_jobqueue_purge(cls, node):
1453
    """Purge job queue.
1454

1455
    This is a single-node call.
1456

1457
    """
1458
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1459

    
1460
  @classmethod
1461
  @_RpcTimeout(_TMO_URGENT)
1462
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1463
    """Rename a job queue file.
1464

1465
    This is a multi-node call.
1466

1467
    """
1468
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1469
                                    address_list=address_list)
1470

    
1471
  @_RpcTimeout(_TMO_NORMAL)
1472
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1473
    """Validate the hypervisor params.
1474

1475
    This is a multi-node call.
1476

1477
    @type node_list: list
1478
    @param node_list: the list of nodes to query
1479
    @type hvname: string
1480
    @param hvname: the hypervisor name
1481
    @type hvparams: dict
1482
    @param hvparams: the hypervisor parameters to be validated
1483

1484
    """
1485
    cluster = self._cfg.GetClusterInfo()
1486
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1487
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1488
                               [hvname, hv_full])
1489

    
1490
  @_RpcTimeout(_TMO_NORMAL)
1491
  def call_x509_cert_create(self, node, validity):
1492
    """Creates a new X509 certificate for SSL/TLS.
1493

1494
    This is a single-node call.
1495

1496
    @type validity: int
1497
    @param validity: Validity in seconds
1498

1499
    """
1500
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1501

    
1502
  @_RpcTimeout(_TMO_NORMAL)
1503
  def call_x509_cert_remove(self, node, name):
1504
    """Removes a X509 certificate.
1505

1506
    This is a single-node call.
1507

1508
    @type name: string
1509
    @param name: Certificate name
1510

1511
    """
1512
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1513

    
1514
  @_RpcTimeout(_TMO_NORMAL)
1515
  def call_import_start(self, node, opts, instance, component,
1516
                        dest, dest_args):
1517
    """Starts a listener for an import.
1518

1519
    This is a single-node call.
1520

1521
    @type node: string
1522
    @param node: Node name
1523
    @type instance: C{objects.Instance}
1524
    @param instance: Instance object
1525
    @type component: string
1526
    @param component: which part of the instance is being imported
1527

1528
    """
1529
    return self._SingleNodeCall(node, "import_start",
1530
                                [opts.ToDict(),
1531
                                 self._InstDict(instance), component, dest,
1532
                                 _EncodeImportExportIO(dest, dest_args)])
1533

    
1534
  @_RpcTimeout(_TMO_NORMAL)
1535
  def call_export_start(self, node, opts, host, port,
1536
                        instance, component, source, source_args):
1537
    """Starts an export daemon.
1538

1539
    This is a single-node call.
1540

1541
    @type node: string
1542
    @param node: Node name
1543
    @type instance: C{objects.Instance}
1544
    @param instance: Instance object
1545
    @type component: string
1546
    @param component: which part of the instance is being imported
1547

1548
    """
1549
    return self._SingleNodeCall(node, "export_start",
1550
                                [opts.ToDict(), host, port,
1551
                                 self._InstDict(instance),
1552
                                 component, source,
1553
                                 _EncodeImportExportIO(source, source_args)])
1554

    
1555
  @_RpcTimeout(_TMO_FAST)
1556
  def call_impexp_status(self, node, names):
1557
    """Gets the status of an import or export.
1558

1559
    This is a single-node call.
1560

1561
    @type node: string
1562
    @param node: Node name
1563
    @type names: List of strings
1564
    @param names: Import/export names
1565
    @rtype: List of L{objects.ImportExportStatus} instances
1566
    @return: Returns a list of the state of each named import/export or None if
1567
             a status couldn't be retrieved
1568

1569
    """
1570
    result = self._SingleNodeCall(node, "impexp_status", [names])
1571

    
1572
    if not result.fail_msg:
1573
      decoded = []
1574

    
1575
      for i in result.payload:
1576
        if i is None:
1577
          decoded.append(None)
1578
          continue
1579
        decoded.append(objects.ImportExportStatus.FromDict(i))
1580

    
1581
      result.payload = decoded
1582

    
1583
    return result
1584

    
1585
  @_RpcTimeout(_TMO_NORMAL)
1586
  def call_impexp_abort(self, node, name):
1587
    """Aborts an import or export.
1588

1589
    This is a single-node call.
1590

1591
    @type node: string
1592
    @param node: Node name
1593
    @type name: string
1594
    @param name: Import/export name
1595

1596
    """
1597
    return self._SingleNodeCall(node, "impexp_abort", [name])
1598

    
1599
  @_RpcTimeout(_TMO_NORMAL)
1600
  def call_impexp_cleanup(self, node, name):
1601
    """Cleans up after an import or export.
1602

1603
    This is a single-node call.
1604

1605
    @type node: string
1606
    @param node: Node name
1607
    @type name: string
1608
    @param name: Import/export name
1609

1610
    """
1611
    return self._SingleNodeCall(node, "impexp_cleanup", [name])