Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 83e7af18

History | View | Annotate | Download (47.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# pylint has a bug here, doesn't see this import
52
import ganeti.http.client  # pylint: disable=W0611
53

    
54

    
55
# Timeout for connecting to nodes (seconds)
56
_RPC_CONNECT_TIMEOUT = 5
57

    
58
_RPC_CLIENT_HEADERS = [
59
  "Content-type: %s" % http.HTTP_APP_JSON,
60
  "Expect:",
61
  ]
62

    
63
# Various time constants for the timeout table
64
_TMO_URGENT = 60 # one minute
65
_TMO_FAST = 5 * 60 # five minutes
66
_TMO_NORMAL = 15 * 60 # 15 minutes
67
_TMO_SLOW = 3600 # one hour
68
_TMO_4HRS = 4 * 3600
69
_TMO_1DAY = 86400
70

    
71
# Timeout table that will be built later by decorators
72
# Guidelines for choosing timeouts:
73
# - call used during watcher: timeout -> 1min, _TMO_URGENT
74
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
75
# - other calls: 15 min, _TMO_NORMAL
76
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
77

    
78
_TIMEOUTS = {
79
}
80

    
81
#: Special value to describe an offline host
82
_OFFLINE = object()
83

    
84

    
85
def Init():
86
  """Initializes the module-global HTTP client manager.
87

88
  Must be called before using any RPC function and while exactly one thread is
89
  running.
90

91
  """
92
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
93
  # one thread running. This check is just a safety measure -- it doesn't
94
  # cover all cases.
95
  assert threading.activeCount() == 1, \
96
         "Found more than one active thread when initializing pycURL"
97

    
98
  logging.info("Using PycURL %s", pycurl.version)
99

    
100
  pycurl.global_init(pycurl.GLOBAL_ALL)
101

    
102

    
103
def Shutdown():
104
  """Stops the module-global HTTP client manager.
105

106
  Must be called before quitting the program and while exactly one thread is
107
  running.
108

109
  """
110
  pycurl.global_cleanup()
111

    
112

    
113
def _ConfigRpcCurl(curl):
114
  noded_cert = str(constants.NODED_CERT_FILE)
115

    
116
  curl.setopt(pycurl.FOLLOWLOCATION, False)
117
  curl.setopt(pycurl.CAINFO, noded_cert)
118
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
119
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
120
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
121
  curl.setopt(pycurl.SSLCERT, noded_cert)
122
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
123
  curl.setopt(pycurl.SSLKEY, noded_cert)
124
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
125

    
126

    
127
def _RpcTimeout(secs):
128
  """Timeout decorator.
129

130
  When applied to a rpc call_* function, it updates the global timeout
131
  table with the given function/timeout.
132

133
  """
134
  def decorator(f):
135
    name = f.__name__
136
    assert name.startswith("call_")
137
    _TIMEOUTS[name[len("call_"):]] = secs
138
    return f
139
  return decorator
140

    
141

    
142
def RunWithRPC(fn):
143
  """RPC-wrapper decorator.
144

145
  When applied to a function, it runs it with the RPC system
146
  initialized, and it shutsdown the system afterwards. This means the
147
  function must be called without RPC being initialized.
148

149
  """
150
  def wrapper(*args, **kwargs):
151
    Init()
152
    try:
153
      return fn(*args, **kwargs)
154
    finally:
155
      Shutdown()
156
  return wrapper
157

    
158

    
159
def _Compress(data):
160
  """Compresses a string for transport over RPC.
161

162
  Small amounts of data are not compressed.
163

164
  @type data: str
165
  @param data: Data
166
  @rtype: tuple
167
  @return: Encoded data to send
168

169
  """
170
  # Small amounts of data are not compressed
171
  if len(data) < 512:
172
    return (constants.RPC_ENCODING_NONE, data)
173

    
174
  # Compress with zlib and encode in base64
175
  return (constants.RPC_ENCODING_ZLIB_BASE64,
176
          base64.b64encode(zlib.compress(data, 3)))
177

    
178

    
179
class RpcResult(object):
180
  """RPC Result class.
181

182
  This class holds an RPC result. It is needed since in multi-node
183
  calls we can't raise an exception just because one one out of many
184
  failed, and therefore we use this class to encapsulate the result.
185

186
  @ivar data: the data payload, for successful results, or None
187
  @ivar call: the name of the RPC call
188
  @ivar node: the name of the node to which we made the call
189
  @ivar offline: whether the operation failed because the node was
190
      offline, as opposed to actual failure; offline=True will always
191
      imply failed=True, in order to allow simpler checking if
192
      the user doesn't care about the exact failure mode
193
  @ivar fail_msg: the error message if the call failed
194

195
  """
196
  def __init__(self, data=None, failed=False, offline=False,
197
               call=None, node=None):
198
    self.offline = offline
199
    self.call = call
200
    self.node = node
201

    
202
    if offline:
203
      self.fail_msg = "Node is marked offline"
204
      self.data = self.payload = None
205
    elif failed:
206
      self.fail_msg = self._EnsureErr(data)
207
      self.data = self.payload = None
208
    else:
209
      self.data = data
210
      if not isinstance(self.data, (tuple, list)):
211
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
212
                         type(self.data))
213
        self.payload = None
214
      elif len(data) != 2:
215
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
216
                         "expected 2" % len(self.data))
217
        self.payload = None
218
      elif not self.data[0]:
219
        self.fail_msg = self._EnsureErr(self.data[1])
220
        self.payload = None
221
      else:
222
        # finally success
223
        self.fail_msg = None
224
        self.payload = data[1]
225

    
226
    for attr_name in ["call", "data", "fail_msg",
227
                      "node", "offline", "payload"]:
228
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
229

    
230
  @staticmethod
231
  def _EnsureErr(val):
232
    """Helper to ensure we return a 'True' value for error."""
233
    if val:
234
      return val
235
    else:
236
      return "No error information"
237

    
238
  def Raise(self, msg, prereq=False, ecode=None):
239
    """If the result has failed, raise an OpExecError.
240

241
    This is used so that LU code doesn't have to check for each
242
    result, but instead can call this function.
243

244
    """
245
    if not self.fail_msg:
246
      return
247

    
248
    if not msg: # one could pass None for default message
249
      msg = ("Call '%s' to node '%s' has failed: %s" %
250
             (self.call, self.node, self.fail_msg))
251
    else:
252
      msg = "%s: %s" % (msg, self.fail_msg)
253
    if prereq:
254
      ec = errors.OpPrereqError
255
    else:
256
      ec = errors.OpExecError
257
    if ecode is not None:
258
      args = (msg, ecode)
259
    else:
260
      args = (msg, )
261
    raise ec(*args) # pylint: disable=W0142
262

    
263

    
264
def _SsconfResolver(node_list,
265
                    ssc=ssconf.SimpleStore,
266
                    nslookup_fn=netutils.Hostname.GetIP):
267
  """Return addresses for given node names.
268

269
  @type node_list: list
270
  @param node_list: List of node names
271
  @type ssc: class
272
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
273
  @type nslookup_fn: callable
274
  @param nslookup_fn: function use to do NS lookup
275
  @rtype: list of tuple; (string, string)
276
  @return: List of tuples containing node name and IP address
277

278
  """
279
  ss = ssc()
280
  iplist = ss.GetNodePrimaryIPList()
281
  family = ss.GetPrimaryIPFamily()
282
  ipmap = dict(entry.split() for entry in iplist)
283

    
284
  result = []
285
  for node in node_list:
286
    ip = ipmap.get(node)
287
    if ip is None:
288
      ip = nslookup_fn(node, family=family)
289
    result.append((node, ip))
290

    
291
  return result
292

    
293

    
294
class _StaticResolver:
295
  def __init__(self, addresses):
296
    """Initializes this class.
297

298
    """
299
    self._addresses = addresses
300

    
301
  def __call__(self, hosts):
302
    """Returns static addresses for hosts.
303

304
    """
305
    assert len(hosts) == len(self._addresses)
306
    return zip(hosts, self._addresses)
307

    
308

    
309
def _CheckConfigNode(name, node):
310
  """Checks if a node is online.
311

312
  @type name: string
313
  @param name: Node name
314
  @type node: L{objects.Node} or None
315
  @param node: Node object
316

317
  """
318
  if node is None:
319
    # Depend on DNS for name resolution
320
    ip = name
321
  elif node.offline:
322
    ip = _OFFLINE
323
  else:
324
    ip = node.primary_ip
325
  return (name, ip)
326

    
327

    
328
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
329
  """Calculate node addresses using configuration.
330

331
  """
332
  # Special case for single-host lookups
333
  if len(hosts) == 1:
334
    (name, ) = hosts
335
    return [_CheckConfigNode(name, single_node_fn(name))]
336
  else:
337
    all_nodes = all_nodes_fn()
338
    return [_CheckConfigNode(name, all_nodes.get(name, None))
339
            for name in hosts]
340

    
341

    
342
class _RpcProcessor:
343
  def __init__(self, resolver, port, lock_monitor_cb=None):
344
    """Initializes this class.
345

346
    @param resolver: callable accepting a list of hostnames, returning a list
347
      of tuples containing name and IP address (IP address can be the name or
348
      the special value L{_OFFLINE} to mark offline machines)
349
    @type port: int
350
    @param port: TCP port
351
    @param lock_monitor_cb: Callable for registering with lock monitor
352

353
    """
354
    self._resolver = resolver
355
    self._port = port
356
    self._lock_monitor_cb = lock_monitor_cb
357

    
358
  @staticmethod
359
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
360
    """Prepares requests by sorting offline hosts into separate list.
361

362
    """
363
    results = {}
364
    requests = {}
365

    
366
    for (name, ip) in hosts:
367
      if ip is _OFFLINE:
368
        # Node is marked as offline
369
        results[name] = RpcResult(node=name, offline=True, call=procedure)
370
      else:
371
        requests[name] = \
372
          http.client.HttpClientRequest(str(ip), port,
373
                                        http.HTTP_PUT, str("/%s" % procedure),
374
                                        headers=_RPC_CLIENT_HEADERS,
375
                                        post_data=body,
376
                                        read_timeout=read_timeout,
377
                                        nicename="%s/%s" % (name, procedure),
378
                                        curl_config_fn=_ConfigRpcCurl)
379

    
380
    return (results, requests)
381

    
382
  @staticmethod
383
  def _CombineResults(results, requests, procedure):
384
    """Combines pre-computed results for offline hosts with actual call results.
385

386
    """
387
    for name, req in requests.items():
388
      if req.success and req.resp_status_code == http.HTTP_OK:
389
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
390
                                node=name, call=procedure)
391
      else:
392
        # TODO: Better error reporting
393
        if req.error:
394
          msg = req.error
395
        else:
396
          msg = req.resp_body
397

    
398
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
399
        host_result = RpcResult(data=msg, failed=True, node=name,
400
                                call=procedure)
401

    
402
      results[name] = host_result
403

    
404
    return results
405

    
406
  def __call__(self, hosts, procedure, body, read_timeout=None,
407
               _req_process_fn=http.client.ProcessRequests):
408
    """Makes an RPC request to a number of nodes.
409

410
    @type hosts: sequence
411
    @param hosts: Hostnames
412
    @type procedure: string
413
    @param procedure: Request path
414
    @type body: string
415
    @param body: Request body
416
    @type read_timeout: int or None
417
    @param read_timeout: Read timeout for request
418

419
    """
420
    if read_timeout is None:
421
      read_timeout = _TIMEOUTS.get(procedure, None)
422

    
423
    assert read_timeout is not None, \
424
      "Missing RPC read timeout for procedure '%s'" % procedure
425

    
426
    (results, requests) = \
427
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
428
                            str(body), read_timeout)
429

    
430
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
431

    
432
    assert not frozenset(results).intersection(requests)
433

    
434
    return self._CombineResults(results, requests, procedure)
435

    
436

    
437
def _EncodeImportExportIO(ieio, ieioargs):
438
  """Encodes import/export I/O information.
439

440
  """
441
  if ieio == constants.IEIO_RAW_DISK:
442
    assert len(ieioargs) == 1
443
    return (ieioargs[0].ToDict(), )
444

    
445
  if ieio == constants.IEIO_SCRIPT:
446
    assert len(ieioargs) == 2
447
    return (ieioargs[0].ToDict(), ieioargs[1])
448

    
449
  return ieioargs
450

    
451

    
452
class RpcRunner(object):
453
  """RPC runner class.
454

455
  """
456
  def __init__(self, context):
457
    """Initialized the RPC runner.
458

459
    @type context: C{masterd.GanetiContext}
460
    @param context: Ganeti context
461

462
    """
463
    self._cfg = context.cfg
464
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
465
                                              self._cfg.GetNodeInfo,
466
                                              self._cfg.GetAllNodesInfo),
467
                               netutils.GetDaemonPort(constants.NODED),
468
                               lock_monitor_cb=context.glm.AddToLockMonitor)
469

    
470
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
471
    """Convert the given instance to a dict.
472

473
    This is done via the instance's ToDict() method and additionally
474
    we fill the hvparams with the cluster defaults.
475

476
    @type instance: L{objects.Instance}
477
    @param instance: an Instance object
478
    @type hvp: dict or None
479
    @param hvp: a dictionary with overridden hypervisor parameters
480
    @type bep: dict or None
481
    @param bep: a dictionary with overridden backend parameters
482
    @type osp: dict or None
483
    @param osp: a dictionary with overridden os parameters
484
    @rtype: dict
485
    @return: the instance dict, with the hvparams filled with the
486
        cluster defaults
487

488
    """
489
    idict = instance.ToDict()
490
    cluster = self._cfg.GetClusterInfo()
491
    idict["hvparams"] = cluster.FillHV(instance)
492
    if hvp is not None:
493
      idict["hvparams"].update(hvp)
494
    idict["beparams"] = cluster.FillBE(instance)
495
    if bep is not None:
496
      idict["beparams"].update(bep)
497
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
498
    if osp is not None:
499
      idict["osparams"].update(osp)
500
    for nic in idict["nics"]:
501
      nic['nicparams'] = objects.FillDict(
502
        cluster.nicparams[constants.PP_DEFAULT],
503
        nic['nicparams'])
504
    return idict
505

    
506
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
507
    """Helper for making a multi-node call
508

509
    """
510
    body = serializer.DumpJson(args, indent=False)
511
    return self._proc(node_list, procedure, body, read_timeout=read_timeout)
512

    
513
  @staticmethod
514
  def _StaticMultiNodeCall(node_list, procedure, args,
515
                           address_list=None, read_timeout=None):
516
    """Helper for making a multi-node static call
517

518
    """
519
    body = serializer.DumpJson(args, indent=False)
520

    
521
    if address_list is None:
522
      resolver = _SsconfResolver
523
    else:
524
      # Caller provided an address list
525
      resolver = _StaticResolver(address_list)
526

    
527
    proc = _RpcProcessor(resolver,
528
                         netutils.GetDaemonPort(constants.NODED))
529
    return proc(node_list, procedure, body, read_timeout=read_timeout)
530

    
531
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
532
    """Helper for making a single-node call
533

534
    """
535
    body = serializer.DumpJson(args, indent=False)
536
    return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
537

    
538
  @classmethod
539
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
540
    """Helper for making a single-node static call
541

542
    """
543
    body = serializer.DumpJson(args, indent=False)
544
    proc = _RpcProcessor(_SsconfResolver,
545
                         netutils.GetDaemonPort(constants.NODED))
546
    return proc([node], procedure, body, read_timeout=read_timeout)[node]
547

    
548
  #
549
  # Begin RPC calls
550
  #
551

    
552
  @_RpcTimeout(_TMO_URGENT)
553
  def call_bdev_sizes(self, node_list, devices):
554
    """Gets the sizes of requested block devices present on a node
555

556
    This is a multi-node call.
557

558
    """
559
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
560

    
561
  @_RpcTimeout(_TMO_URGENT)
562
  def call_lv_list(self, node_list, vg_name):
563
    """Gets the logical volumes present in a given volume group.
564

565
    This is a multi-node call.
566

567
    """
568
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
569

    
570
  @_RpcTimeout(_TMO_URGENT)
571
  def call_vg_list(self, node_list):
572
    """Gets the volume group list.
573

574
    This is a multi-node call.
575

576
    """
577
    return self._MultiNodeCall(node_list, "vg_list", [])
578

    
579
  @_RpcTimeout(_TMO_NORMAL)
580
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
581
    """Get list of storage units.
582

583
    This is a multi-node call.
584

585
    """
586
    return self._MultiNodeCall(node_list, "storage_list",
587
                               [su_name, su_args, name, fields])
588

    
589
  @_RpcTimeout(_TMO_NORMAL)
590
  def call_storage_modify(self, node, su_name, su_args, name, changes):
591
    """Modify a storage unit.
592

593
    This is a single-node call.
594

595
    """
596
    return self._SingleNodeCall(node, "storage_modify",
597
                                [su_name, su_args, name, changes])
598

    
599
  @_RpcTimeout(_TMO_NORMAL)
600
  def call_storage_execute(self, node, su_name, su_args, name, op):
601
    """Executes an operation on a storage unit.
602

603
    This is a single-node call.
604

605
    """
606
    return self._SingleNodeCall(node, "storage_execute",
607
                                [su_name, su_args, name, op])
608

    
609
  @_RpcTimeout(_TMO_URGENT)
610
  def call_bridges_exist(self, node, bridges_list):
611
    """Checks if a node has all the bridges given.
612

613
    This method checks if all bridges given in the bridges_list are
614
    present on the remote node, so that an instance that uses interfaces
615
    on those bridges can be started.
616

617
    This is a single-node call.
618

619
    """
620
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
621

    
622
  @_RpcTimeout(_TMO_NORMAL)
623
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
624
    """Starts an instance.
625

626
    This is a single-node call.
627

628
    """
629
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
630
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
631

    
632
  @_RpcTimeout(_TMO_NORMAL)
633
  def call_instance_shutdown(self, node, instance, timeout):
634
    """Stops an instance.
635

636
    This is a single-node call.
637

638
    """
639
    return self._SingleNodeCall(node, "instance_shutdown",
640
                                [self._InstDict(instance), timeout])
641

    
642
  @_RpcTimeout(_TMO_NORMAL)
643
  def call_migration_info(self, node, instance):
644
    """Gather the information necessary to prepare an instance migration.
645

646
    This is a single-node call.
647

648
    @type node: string
649
    @param node: the node on which the instance is currently running
650
    @type instance: C{objects.Instance}
651
    @param instance: the instance definition
652

653
    """
654
    return self._SingleNodeCall(node, "migration_info",
655
                                [self._InstDict(instance)])
656

    
657
  @_RpcTimeout(_TMO_NORMAL)
658
  def call_accept_instance(self, node, instance, info, target):
659
    """Prepare a node to accept an instance.
660

661
    This is a single-node call.
662

663
    @type node: string
664
    @param node: the target node for the migration
665
    @type instance: C{objects.Instance}
666
    @param instance: the instance definition
667
    @type info: opaque/hypervisor specific (string/data)
668
    @param info: result for the call_migration_info call
669
    @type target: string
670
    @param target: target hostname (usually ip address) (on the node itself)
671

672
    """
673
    return self._SingleNodeCall(node, "accept_instance",
674
                                [self._InstDict(instance), info, target])
675

    
676
  @_RpcTimeout(_TMO_NORMAL)
677
  def call_instance_finalize_migration_dst(self, node, instance, info, success):
678
    """Finalize any target-node migration specific operation.
679

680
    This is called both in case of a successful migration and in case of error
681
    (in which case it should abort the migration).
682

683
    This is a single-node call.
684

685
    @type node: string
686
    @param node: the target node for the migration
687
    @type instance: C{objects.Instance}
688
    @param instance: the instance definition
689
    @type info: opaque/hypervisor specific (string/data)
690
    @param info: result for the call_migration_info call
691
    @type success: boolean
692
    @param success: whether the migration was a success or a failure
693

694
    """
695
    return self._SingleNodeCall(node, "instance_finalize_migration_dst",
696
                                [self._InstDict(instance), info, success])
697

    
698
  @_RpcTimeout(_TMO_SLOW)
699
  def call_instance_migrate(self, node, instance, target, live):
700
    """Migrate an instance.
701

702
    This is a single-node call.
703

704
    @type node: string
705
    @param node: the node on which the instance is currently running
706
    @type instance: C{objects.Instance}
707
    @param instance: the instance definition
708
    @type target: string
709
    @param target: the target node name
710
    @type live: boolean
711
    @param live: whether the migration should be done live or not (the
712
        interpretation of this parameter is left to the hypervisor)
713

714
    """
715
    return self._SingleNodeCall(node, "instance_migrate",
716
                                [self._InstDict(instance), target, live])
717

    
718
  @_RpcTimeout(_TMO_SLOW)
719
  def call_instance_finalize_migration_src(self, node, instance, success, live):
720
    """Finalize the instance migration on the source node.
721

722
    This is a single-node call.
723

724
    @type instance: L{objects.Instance}
725
    @param instance: the instance that was migrated
726
    @type success: bool
727
    @param success: whether the migration succeeded or not
728
    @type live: bool
729
    @param live: whether the user requested a live migration or not
730

731
    """
732
    return self._SingleNodeCall(node, "instance_finalize_migration_src",
733
                                [self._InstDict(instance), success, live])
734

    
735
  @_RpcTimeout(_TMO_SLOW)
736
  def call_instance_get_migration_status(self, node, instance):
737
    """Report migration status.
738

739
    This is a single-node call that must be executed on the source node.
740

741
    @type instance: L{objects.Instance}
742
    @param instance: the instance that is being migrated
743
    @rtype: L{objects.MigrationStatus}
744
    @return: the status of the current migration (one of
745
             L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
746
             progress info that can be retrieved from the hypervisor
747

748
    """
749
    result = self._SingleNodeCall(node, "instance_get_migration_status",
750
                                  [self._InstDict(instance)])
751
    if not result.fail_msg and result.payload is not None:
752
      result.payload = objects.MigrationStatus.FromDict(result.payload)
753
    return result
754

    
755
  @_RpcTimeout(_TMO_NORMAL)
756
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
757
    """Reboots an instance.
758

759
    This is a single-node call.
760

761
    """
762
    return self._SingleNodeCall(node, "instance_reboot",
763
                                [self._InstDict(inst), reboot_type,
764
                                 shutdown_timeout])
765

    
766
  @_RpcTimeout(_TMO_1DAY)
767
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
768
    """Installs an OS on the given instance.
769

770
    This is a single-node call.
771

772
    """
773
    return self._SingleNodeCall(node, "instance_os_add",
774
                                [self._InstDict(inst, osp=osparams),
775
                                 reinstall, debug])
776

    
777
  @_RpcTimeout(_TMO_SLOW)
778
  def call_instance_run_rename(self, node, inst, old_name, debug):
779
    """Run the OS rename script for an instance.
780

781
    This is a single-node call.
782

783
    """
784
    return self._SingleNodeCall(node, "instance_run_rename",
785
                                [self._InstDict(inst), old_name, debug])
786

    
787
  @_RpcTimeout(_TMO_URGENT)
788
  def call_instance_info(self, node, instance, hname):
789
    """Returns information about a single instance.
790

791
    This is a single-node call.
792

793
    @type node: list
794
    @param node: the list of nodes to query
795
    @type instance: string
796
    @param instance: the instance name
797
    @type hname: string
798
    @param hname: the hypervisor type of the instance
799

800
    """
801
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
802

    
803
  @_RpcTimeout(_TMO_NORMAL)
804
  def call_instance_migratable(self, node, instance):
805
    """Checks whether the given instance can be migrated.
806

807
    This is a single-node call.
808

809
    @param node: the node to query
810
    @type instance: L{objects.Instance}
811
    @param instance: the instance to check
812

813

814
    """
815
    return self._SingleNodeCall(node, "instance_migratable",
816
                                [self._InstDict(instance)])
817

    
818
  @_RpcTimeout(_TMO_URGENT)
819
  def call_all_instances_info(self, node_list, hypervisor_list):
820
    """Returns information about all instances on the given nodes.
821

822
    This is a multi-node call.
823

824
    @type node_list: list
825
    @param node_list: the list of nodes to query
826
    @type hypervisor_list: list
827
    @param hypervisor_list: the hypervisors to query for instances
828

829
    """
830
    return self._MultiNodeCall(node_list, "all_instances_info",
831
                               [hypervisor_list])
832

    
833
  @_RpcTimeout(_TMO_URGENT)
834
  def call_instance_list(self, node_list, hypervisor_list):
835
    """Returns the list of running instances on a given node.
836

837
    This is a multi-node call.
838

839
    @type node_list: list
840
    @param node_list: the list of nodes to query
841
    @type hypervisor_list: list
842
    @param hypervisor_list: the hypervisors to query for instances
843

844
    """
845
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
846

    
847
  @_RpcTimeout(_TMO_FAST)
848
  def call_node_has_ip_address(self, node, address):
849
    """Checks if a node has the given IP address.
850

851
    This is a single-node call.
852

853
    """
854
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
855

    
856
  @_RpcTimeout(_TMO_URGENT)
857
  def call_node_info(self, node_list, vg_name, hypervisor_type):
858
    """Return node information.
859

860
    This will return memory information and volume group size and free
861
    space.
862

863
    This is a multi-node call.
864

865
    @type node_list: list
866
    @param node_list: the list of nodes to query
867
    @type vg_name: C{string}
868
    @param vg_name: the name of the volume group to ask for disk space
869
        information
870
    @type hypervisor_type: C{str}
871
    @param hypervisor_type: the name of the hypervisor to ask for
872
        memory information
873

874
    """
875
    return self._MultiNodeCall(node_list, "node_info",
876
                               [vg_name, hypervisor_type])
877

    
878
  @_RpcTimeout(_TMO_NORMAL)
879
  def call_etc_hosts_modify(self, node, mode, name, ip):
880
    """Modify hosts file with name
881

882
    @type node: string
883
    @param node: The node to call
884
    @type mode: string
885
    @param mode: The mode to operate. Currently "add" or "remove"
886
    @type name: string
887
    @param name: The host name to be modified
888
    @type ip: string
889
    @param ip: The ip of the entry (just valid if mode is "add")
890

891
    """
892
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
893

    
894
  @_RpcTimeout(_TMO_NORMAL)
895
  def call_node_verify(self, node_list, checkdict, cluster_name):
896
    """Request verification of given parameters.
897

898
    This is a multi-node call.
899

900
    """
901
    return self._MultiNodeCall(node_list, "node_verify",
902
                               [checkdict, cluster_name])
903

    
904
  @classmethod
905
  @_RpcTimeout(_TMO_FAST)
906
  def call_node_start_master_daemons(cls, node, no_voting):
907
    """Starts master daemons on a node.
908

909
    This is a single-node call.
910

911
    """
912
    return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
913
                                     [no_voting])
914

    
915
  @classmethod
916
  @_RpcTimeout(_TMO_FAST)
917
  def call_node_activate_master_ip(cls, node):
918
    """Activates master IP on a node.
919

920
    This is a single-node call.
921

922
    """
923
    return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
924

    
925
  @classmethod
926
  @_RpcTimeout(_TMO_FAST)
927
  def call_node_stop_master(cls, node):
928
    """Deactivates master IP and stops master daemons on a node.
929

930
    This is a single-node call.
931

932
    """
933
    return cls._StaticSingleNodeCall(node, "node_stop_master", [])
934

    
935
  @classmethod
936
  @_RpcTimeout(_TMO_FAST)
937
  def call_node_deactivate_master_ip(cls, node):
938
    """Deactivates master IP on a node.
939

940
    This is a single-node call.
941

942
    """
943
    return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
944

    
945
  @classmethod
946
  @_RpcTimeout(_TMO_FAST)
947
  def call_node_change_master_netmask(cls, node, netmask):
948
    """Change master IP netmask.
949

950
    This is a single-node call.
951

952
    """
953
    return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
954
                  [netmask])
955

    
956
  @classmethod
957
  @_RpcTimeout(_TMO_URGENT)
958
  def call_master_info(cls, node_list):
959
    """Query master info.
960

961
    This is a multi-node call.
962

963
    """
964
    # TODO: should this method query down nodes?
965
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
966

    
967
  @classmethod
968
  @_RpcTimeout(_TMO_URGENT)
969
  def call_version(cls, node_list):
970
    """Query node version.
971

972
    This is a multi-node call.
973

974
    """
975
    return cls._StaticMultiNodeCall(node_list, "version", [])
976

    
977
  @_RpcTimeout(_TMO_NORMAL)
978
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
979
    """Request creation of a given block device.
980

981
    This is a single-node call.
982

983
    """
984
    return self._SingleNodeCall(node, "blockdev_create",
985
                                [bdev.ToDict(), size, owner, on_primary, info])
986

    
987
  @_RpcTimeout(_TMO_SLOW)
988
  def call_blockdev_wipe(self, node, bdev, offset, size):
989
    """Request wipe at given offset with given size of a block device.
990

991
    This is a single-node call.
992

993
    """
994
    return self._SingleNodeCall(node, "blockdev_wipe",
995
                                [bdev.ToDict(), offset, size])
996

    
997
  @_RpcTimeout(_TMO_NORMAL)
998
  def call_blockdev_remove(self, node, bdev):
999
    """Request removal of a given block device.
1000

1001
    This is a single-node call.
1002

1003
    """
1004
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
1005

    
1006
  @_RpcTimeout(_TMO_NORMAL)
1007
  def call_blockdev_rename(self, node, devlist):
1008
    """Request rename of the given block devices.
1009

1010
    This is a single-node call.
1011

1012
    """
1013
    return self._SingleNodeCall(node, "blockdev_rename",
1014
                                [[(d.ToDict(), uid) for d, uid in devlist]])
1015

    
1016
  @_RpcTimeout(_TMO_NORMAL)
1017
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
1018
    """Request a pause/resume of given block device.
1019

1020
    This is a single-node call.
1021

1022
    """
1023
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1024
                                [[bdev.ToDict() for bdev in disks], pause])
1025

    
1026
  @_RpcTimeout(_TMO_NORMAL)
1027
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1028
    """Request assembling of a given block device.
1029

1030
    This is a single-node call.
1031

1032
    """
1033
    return self._SingleNodeCall(node, "blockdev_assemble",
1034
                                [disk.ToDict(), owner, on_primary, idx])
1035

    
1036
  @_RpcTimeout(_TMO_NORMAL)
1037
  def call_blockdev_shutdown(self, node, disk):
1038
    """Request shutdown of a given block device.
1039

1040
    This is a single-node call.
1041

1042
    """
1043
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1044

    
1045
  @_RpcTimeout(_TMO_NORMAL)
1046
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1047
    """Request adding a list of children to a (mirroring) device.
1048

1049
    This is a single-node call.
1050

1051
    """
1052
    return self._SingleNodeCall(node, "blockdev_addchildren",
1053
                                [bdev.ToDict(),
1054
                                 [disk.ToDict() for disk in ndevs]])
1055

    
1056
  @_RpcTimeout(_TMO_NORMAL)
1057
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1058
    """Request removing a list of children from a (mirroring) device.
1059

1060
    This is a single-node call.
1061

1062
    """
1063
    return self._SingleNodeCall(node, "blockdev_removechildren",
1064
                                [bdev.ToDict(),
1065
                                 [disk.ToDict() for disk in ndevs]])
1066

    
1067
  @_RpcTimeout(_TMO_NORMAL)
1068
  def call_blockdev_getmirrorstatus(self, node, disks):
1069
    """Request status of a (mirroring) device.
1070

1071
    This is a single-node call.
1072

1073
    """
1074
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1075
                                  [dsk.ToDict() for dsk in disks])
1076
    if not result.fail_msg:
1077
      result.payload = [objects.BlockDevStatus.FromDict(i)
1078
                        for i in result.payload]
1079
    return result
1080

    
1081
  @_RpcTimeout(_TMO_NORMAL)
1082
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1083
    """Request status of (mirroring) devices from multiple nodes.
1084

1085
    This is a multi-node call.
1086

1087
    """
1088
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1089
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1090
                                       for name, disks in node_disks.items())])
1091
    for nres in result.values():
1092
      if nres.fail_msg:
1093
        continue
1094

    
1095
      for idx, (success, status) in enumerate(nres.payload):
1096
        if success:
1097
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1098

    
1099
    return result
1100

    
1101
  @_RpcTimeout(_TMO_NORMAL)
1102
  def call_blockdev_find(self, node, disk):
1103
    """Request identification of a given block device.
1104

1105
    This is a single-node call.
1106

1107
    """
1108
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1109
    if not result.fail_msg and result.payload is not None:
1110
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1111
    return result
1112

    
1113
  @_RpcTimeout(_TMO_NORMAL)
1114
  def call_blockdev_close(self, node, instance_name, disks):
1115
    """Closes the given block devices.
1116

1117
    This is a single-node call.
1118

1119
    """
1120
    params = [instance_name, [cf.ToDict() for cf in disks]]
1121
    return self._SingleNodeCall(node, "blockdev_close", params)
1122

    
1123
  @_RpcTimeout(_TMO_NORMAL)
1124
  def call_blockdev_getsize(self, node, disks):
1125
    """Returns the size of the given disks.
1126

1127
    This is a single-node call.
1128

1129
    """
1130
    params = [[cf.ToDict() for cf in disks]]
1131
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1132

    
1133
  @_RpcTimeout(_TMO_NORMAL)
1134
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1135
    """Disconnects the network of the given drbd devices.
1136

1137
    This is a multi-node call.
1138

1139
    """
1140
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1141
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1142

    
1143
  @_RpcTimeout(_TMO_NORMAL)
1144
  def call_drbd_attach_net(self, node_list, nodes_ip,
1145
                           disks, instance_name, multimaster):
1146
    """Disconnects the given drbd devices.
1147

1148
    This is a multi-node call.
1149

1150
    """
1151
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1152
                               [nodes_ip, [cf.ToDict() for cf in disks],
1153
                                instance_name, multimaster])
1154

    
1155
  @_RpcTimeout(_TMO_SLOW)
1156
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1157
    """Waits for the synchronization of drbd devices is complete.
1158

1159
    This is a multi-node call.
1160

1161
    """
1162
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1163
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1164

    
1165
  @_RpcTimeout(_TMO_URGENT)
1166
  def call_drbd_helper(self, node_list):
1167
    """Gets drbd helper.
1168

1169
    This is a multi-node call.
1170

1171
    """
1172
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1173

    
1174
  @classmethod
1175
  @_RpcTimeout(_TMO_NORMAL)
1176
  def call_upload_file(cls, node_list, file_name, address_list=None):
1177
    """Upload a file.
1178

1179
    The node will refuse the operation in case the file is not on the
1180
    approved file list.
1181

1182
    This is a multi-node call.
1183

1184
    @type node_list: list
1185
    @param node_list: the list of node names to upload to
1186
    @type file_name: str
1187
    @param file_name: the filename to upload
1188
    @type address_list: list or None
1189
    @keyword address_list: an optional list of node addresses, in order
1190
        to optimize the RPC speed
1191

1192
    """
1193
    file_contents = utils.ReadFile(file_name)
1194
    data = _Compress(file_contents)
1195
    st = os.stat(file_name)
1196
    getents = runtime.GetEnts()
1197
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1198
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1199
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1200
                                    address_list=address_list)
1201

    
1202
  @classmethod
1203
  @_RpcTimeout(_TMO_NORMAL)
1204
  def call_write_ssconf_files(cls, node_list, values):
1205
    """Write ssconf files.
1206

1207
    This is a multi-node call.
1208

1209
    """
1210
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1211

    
1212
  @_RpcTimeout(_TMO_NORMAL)
1213
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1214
    """Runs OOB.
1215

1216
    This is a single-node call.
1217

1218
    """
1219
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1220
                                                  remote_node, timeout])
1221

    
1222
  @_RpcTimeout(_TMO_FAST)
1223
  def call_os_diagnose(self, node_list):
1224
    """Request a diagnose of OS definitions.
1225

1226
    This is a multi-node call.
1227

1228
    """
1229
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1230

    
1231
  @_RpcTimeout(_TMO_FAST)
1232
  def call_os_get(self, node, name):
1233
    """Returns an OS definition.
1234

1235
    This is a single-node call.
1236

1237
    """
1238
    result = self._SingleNodeCall(node, "os_get", [name])
1239
    if not result.fail_msg and isinstance(result.payload, dict):
1240
      result.payload = objects.OS.FromDict(result.payload)
1241
    return result
1242

    
1243
  @_RpcTimeout(_TMO_FAST)
1244
  def call_os_validate(self, nodes, required, name, checks, params):
1245
    """Run a validation routine for a given OS.
1246

1247
    This is a multi-node call.
1248

1249
    """
1250
    return self._MultiNodeCall(nodes, "os_validate",
1251
                               [required, name, checks, params])
1252

    
1253
  @_RpcTimeout(_TMO_NORMAL)
1254
  def call_hooks_runner(self, node_list, hpath, phase, env):
1255
    """Call the hooks runner.
1256

1257
    Args:
1258
      - op: the OpCode instance
1259
      - env: a dictionary with the environment
1260

1261
    This is a multi-node call.
1262

1263
    """
1264
    params = [hpath, phase, env]
1265
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1266

    
1267
  @_RpcTimeout(_TMO_NORMAL)
1268
  def call_iallocator_runner(self, node, name, idata):
1269
    """Call an iallocator on a remote node
1270

1271
    Args:
1272
      - name: the iallocator name
1273
      - input: the json-encoded input string
1274

1275
    This is a single-node call.
1276

1277
    """
1278
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1279

    
1280
  @_RpcTimeout(_TMO_NORMAL)
1281
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1282
    """Request a snapshot of the given block device.
1283

1284
    This is a single-node call.
1285

1286
    """
1287
    return self._SingleNodeCall(node, "blockdev_grow",
1288
                                [cf_bdev.ToDict(), amount, dryrun])
1289

    
1290
  @_RpcTimeout(_TMO_1DAY)
1291
  def call_blockdev_export(self, node, cf_bdev,
1292
                           dest_node, dest_path, cluster_name):
1293
    """Export a given disk to another node.
1294

1295
    This is a single-node call.
1296

1297
    """
1298
    return self._SingleNodeCall(node, "blockdev_export",
1299
                                [cf_bdev.ToDict(), dest_node, dest_path,
1300
                                 cluster_name])
1301

    
1302
  @_RpcTimeout(_TMO_NORMAL)
1303
  def call_blockdev_snapshot(self, node, cf_bdev):
1304
    """Request a snapshot of the given block device.
1305

1306
    This is a single-node call.
1307

1308
    """
1309
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1310

    
1311
  @_RpcTimeout(_TMO_NORMAL)
1312
  def call_finalize_export(self, node, instance, snap_disks):
1313
    """Request the completion of an export operation.
1314

1315
    This writes the export config file, etc.
1316

1317
    This is a single-node call.
1318

1319
    """
1320
    flat_disks = []
1321
    for disk in snap_disks:
1322
      if isinstance(disk, bool):
1323
        flat_disks.append(disk)
1324
      else:
1325
        flat_disks.append(disk.ToDict())
1326

    
1327
    return self._SingleNodeCall(node, "finalize_export",
1328
                                [self._InstDict(instance), flat_disks])
1329

    
1330
  @_RpcTimeout(_TMO_FAST)
1331
  def call_export_info(self, node, path):
1332
    """Queries the export information in a given path.
1333

1334
    This is a single-node call.
1335

1336
    """
1337
    return self._SingleNodeCall(node, "export_info", [path])
1338

    
1339
  @_RpcTimeout(_TMO_FAST)
1340
  def call_export_list(self, node_list):
1341
    """Gets the stored exports list.
1342

1343
    This is a multi-node call.
1344

1345
    """
1346
    return self._MultiNodeCall(node_list, "export_list", [])
1347

    
1348
  @_RpcTimeout(_TMO_FAST)
1349
  def call_export_remove(self, node, export):
1350
    """Requests removal of a given export.
1351

1352
    This is a single-node call.
1353

1354
    """
1355
    return self._SingleNodeCall(node, "export_remove", [export])
1356

    
1357
  @classmethod
1358
  @_RpcTimeout(_TMO_NORMAL)
1359
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1360
    """Requests a node to clean the cluster information it has.
1361

1362
    This will remove the configuration information from the ganeti data
1363
    dir.
1364

1365
    This is a single-node call.
1366

1367
    """
1368
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1369
                                     [modify_ssh_setup])
1370

    
1371
  @_RpcTimeout(_TMO_FAST)
1372
  def call_node_volumes(self, node_list):
1373
    """Gets all volumes on node(s).
1374

1375
    This is a multi-node call.
1376

1377
    """
1378
    return self._MultiNodeCall(node_list, "node_volumes", [])
1379

    
1380
  @_RpcTimeout(_TMO_FAST)
1381
  def call_node_demote_from_mc(self, node):
1382
    """Demote a node from the master candidate role.
1383

1384
    This is a single-node call.
1385

1386
    """
1387
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1388

    
1389
  @_RpcTimeout(_TMO_NORMAL)
1390
  def call_node_powercycle(self, node, hypervisor):
1391
    """Tries to powercycle a node.
1392

1393
    This is a single-node call.
1394

1395
    """
1396
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1397

    
1398
  @_RpcTimeout(None)
1399
  def call_test_delay(self, node_list, duration):
1400
    """Sleep for a fixed time on given node(s).
1401

1402
    This is a multi-node call.
1403

1404
    """
1405
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1406
                               read_timeout=int(duration + 5))
1407

    
1408
  @_RpcTimeout(_TMO_FAST)
1409
  def call_file_storage_dir_create(self, node, file_storage_dir):
1410
    """Create the given file storage directory.
1411

1412
    This is a single-node call.
1413

1414
    """
1415
    return self._SingleNodeCall(node, "file_storage_dir_create",
1416
                                [file_storage_dir])
1417

    
1418
  @_RpcTimeout(_TMO_FAST)
1419
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1420
    """Remove the given file storage directory.
1421

1422
    This is a single-node call.
1423

1424
    """
1425
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1426
                                [file_storage_dir])
1427

    
1428
  @_RpcTimeout(_TMO_FAST)
1429
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1430
                                   new_file_storage_dir):
1431
    """Rename file storage directory.
1432

1433
    This is a single-node call.
1434

1435
    """
1436
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1437
                                [old_file_storage_dir, new_file_storage_dir])
1438

    
1439
  @classmethod
1440
  @_RpcTimeout(_TMO_URGENT)
1441
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1442
    """Update job queue.
1443

1444
    This is a multi-node call.
1445

1446
    """
1447
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1448
                                    [file_name, _Compress(content)],
1449
                                    address_list=address_list)
1450

    
1451
  @classmethod
1452
  @_RpcTimeout(_TMO_NORMAL)
1453
  def call_jobqueue_purge(cls, node):
1454
    """Purge job queue.
1455

1456
    This is a single-node call.
1457

1458
    """
1459
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1460

    
1461
  @classmethod
1462
  @_RpcTimeout(_TMO_URGENT)
1463
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1464
    """Rename a job queue file.
1465

1466
    This is a multi-node call.
1467

1468
    """
1469
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1470
                                    address_list=address_list)
1471

    
1472
  @_RpcTimeout(_TMO_NORMAL)
1473
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1474
    """Validate the hypervisor params.
1475

1476
    This is a multi-node call.
1477

1478
    @type node_list: list
1479
    @param node_list: the list of nodes to query
1480
    @type hvname: string
1481
    @param hvname: the hypervisor name
1482
    @type hvparams: dict
1483
    @param hvparams: the hypervisor parameters to be validated
1484

1485
    """
1486
    cluster = self._cfg.GetClusterInfo()
1487
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1488
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1489
                               [hvname, hv_full])
1490

    
1491
  @_RpcTimeout(_TMO_NORMAL)
1492
  def call_x509_cert_create(self, node, validity):
1493
    """Creates a new X509 certificate for SSL/TLS.
1494

1495
    This is a single-node call.
1496

1497
    @type validity: int
1498
    @param validity: Validity in seconds
1499

1500
    """
1501
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1502

    
1503
  @_RpcTimeout(_TMO_NORMAL)
1504
  def call_x509_cert_remove(self, node, name):
1505
    """Removes a X509 certificate.
1506

1507
    This is a single-node call.
1508

1509
    @type name: string
1510
    @param name: Certificate name
1511

1512
    """
1513
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1514

    
1515
  @_RpcTimeout(_TMO_NORMAL)
1516
  def call_import_start(self, node, opts, instance, component,
1517
                        dest, dest_args):
1518
    """Starts a listener for an import.
1519

1520
    This is a single-node call.
1521

1522
    @type node: string
1523
    @param node: Node name
1524
    @type instance: C{objects.Instance}
1525
    @param instance: Instance object
1526
    @type component: string
1527
    @param component: which part of the instance is being imported
1528

1529
    """
1530
    return self._SingleNodeCall(node, "import_start",
1531
                                [opts.ToDict(),
1532
                                 self._InstDict(instance), component, dest,
1533
                                 _EncodeImportExportIO(dest, dest_args)])
1534

    
1535
  @_RpcTimeout(_TMO_NORMAL)
1536
  def call_export_start(self, node, opts, host, port,
1537
                        instance, component, source, source_args):
1538
    """Starts an export daemon.
1539

1540
    This is a single-node call.
1541

1542
    @type node: string
1543
    @param node: Node name
1544
    @type instance: C{objects.Instance}
1545
    @param instance: Instance object
1546
    @type component: string
1547
    @param component: which part of the instance is being imported
1548

1549
    """
1550
    return self._SingleNodeCall(node, "export_start",
1551
                                [opts.ToDict(), host, port,
1552
                                 self._InstDict(instance),
1553
                                 component, source,
1554
                                 _EncodeImportExportIO(source, source_args)])
1555

    
1556
  @_RpcTimeout(_TMO_FAST)
1557
  def call_impexp_status(self, node, names):
1558
    """Gets the status of an import or export.
1559

1560
    This is a single-node call.
1561

1562
    @type node: string
1563
    @param node: Node name
1564
    @type names: List of strings
1565
    @param names: Import/export names
1566
    @rtype: List of L{objects.ImportExportStatus} instances
1567
    @return: Returns a list of the state of each named import/export or None if
1568
             a status couldn't be retrieved
1569

1570
    """
1571
    result = self._SingleNodeCall(node, "impexp_status", [names])
1572

    
1573
    if not result.fail_msg:
1574
      decoded = []
1575

    
1576
      for i in result.payload:
1577
        if i is None:
1578
          decoded.append(None)
1579
          continue
1580
        decoded.append(objects.ImportExportStatus.FromDict(i))
1581

    
1582
      result.payload = decoded
1583

    
1584
    return result
1585

    
1586
  @_RpcTimeout(_TMO_NORMAL)
1587
  def call_impexp_abort(self, node, name):
1588
    """Aborts an import or export.
1589

1590
    This is a single-node call.
1591

1592
    @type node: string
1593
    @param node: Node name
1594
    @type name: string
1595
    @param name: Import/export name
1596

1597
    """
1598
    return self._SingleNodeCall(node, "impexp_abort", [name])
1599

    
1600
  @_RpcTimeout(_TMO_NORMAL)
1601
  def call_impexp_cleanup(self, node, name):
1602
    """Cleans up after an import or export.
1603

1604
    This is a single-node call.
1605

1606
    @type node: string
1607
    @param node: Node name
1608
    @type name: string
1609
    @param name: Import/export name
1610

1611
    """
1612
    return self._SingleNodeCall(node, "impexp_cleanup", [name])