Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 6a1434d7

History | View | Annotate | Download (47.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# pylint has a bug here, doesn't see this import
52
import ganeti.http.client  # pylint: disable=W0611
53

    
54

    
55
# Timeout for connecting to nodes (seconds)
56
_RPC_CONNECT_TIMEOUT = 5
57

    
58
_RPC_CLIENT_HEADERS = [
59
  "Content-type: %s" % http.HTTP_APP_JSON,
60
  "Expect:",
61
  ]
62

    
63
# Various time constants for the timeout table
64
_TMO_URGENT = 60 # one minute
65
_TMO_FAST = 5 * 60 # five minutes
66
_TMO_NORMAL = 15 * 60 # 15 minutes
67
_TMO_SLOW = 3600 # one hour
68
_TMO_4HRS = 4 * 3600
69
_TMO_1DAY = 86400
70

    
71
# Timeout table that will be built later by decorators
72
# Guidelines for choosing timeouts:
73
# - call used during watcher: timeout -> 1min, _TMO_URGENT
74
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
75
# - other calls: 15 min, _TMO_NORMAL
76
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
77

    
78
_TIMEOUTS = {
79
}
80

    
81
#: Special value to describe an offline host
82
_OFFLINE = object()
83

    
84

    
85
def Init():
86
  """Initializes the module-global HTTP client manager.
87

88
  Must be called before using any RPC function and while exactly one thread is
89
  running.
90

91
  """
92
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
93
  # one thread running. This check is just a safety measure -- it doesn't
94
  # cover all cases.
95
  assert threading.activeCount() == 1, \
96
         "Found more than one active thread when initializing pycURL"
97

    
98
  logging.info("Using PycURL %s", pycurl.version)
99

    
100
  pycurl.global_init(pycurl.GLOBAL_ALL)
101

    
102

    
103
def Shutdown():
104
  """Stops the module-global HTTP client manager.
105

106
  Must be called before quitting the program and while exactly one thread is
107
  running.
108

109
  """
110
  pycurl.global_cleanup()
111

    
112

    
113
def _ConfigRpcCurl(curl):
114
  noded_cert = str(constants.NODED_CERT_FILE)
115

    
116
  curl.setopt(pycurl.FOLLOWLOCATION, False)
117
  curl.setopt(pycurl.CAINFO, noded_cert)
118
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
119
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
120
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
121
  curl.setopt(pycurl.SSLCERT, noded_cert)
122
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
123
  curl.setopt(pycurl.SSLKEY, noded_cert)
124
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
125

    
126

    
127
# Aliasing this module avoids the following warning by epydoc: "Warning: No
128
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
129
_threading = threading
130

    
131

    
132
class _RpcThreadLocal(_threading.local):
133
  def GetHttpClientPool(self):
134
    """Returns a per-thread HTTP client pool.
135

136
    @rtype: L{http.client.HttpClientPool}
137

138
    """
139
    try:
140
      pool = self.hcp
141
    except AttributeError:
142
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
143
      self.hcp = pool
144

    
145
    return pool
146

    
147

    
148
# Remove module alias (see above)
149
del _threading
150

    
151

    
152
_thread_local = _RpcThreadLocal()
153

    
154

    
155
def _RpcTimeout(secs):
156
  """Timeout decorator.
157

158
  When applied to a rpc call_* function, it updates the global timeout
159
  table with the given function/timeout.
160

161
  """
162
  def decorator(f):
163
    name = f.__name__
164
    assert name.startswith("call_")
165
    _TIMEOUTS[name[len("call_"):]] = secs
166
    return f
167
  return decorator
168

    
169

    
170
def RunWithRPC(fn):
171
  """RPC-wrapper decorator.
172

173
  When applied to a function, it runs it with the RPC system
174
  initialized, and it shutsdown the system afterwards. This means the
175
  function must be called without RPC being initialized.
176

177
  """
178
  def wrapper(*args, **kwargs):
179
    Init()
180
    try:
181
      return fn(*args, **kwargs)
182
    finally:
183
      Shutdown()
184
  return wrapper
185

    
186

    
187
def _Compress(data):
188
  """Compresses a string for transport over RPC.
189

190
  Small amounts of data are not compressed.
191

192
  @type data: str
193
  @param data: Data
194
  @rtype: tuple
195
  @return: Encoded data to send
196

197
  """
198
  # Small amounts of data are not compressed
199
  if len(data) < 512:
200
    return (constants.RPC_ENCODING_NONE, data)
201

    
202
  # Compress with zlib and encode in base64
203
  return (constants.RPC_ENCODING_ZLIB_BASE64,
204
          base64.b64encode(zlib.compress(data, 3)))
205

    
206

    
207
class RpcResult(object):
208
  """RPC Result class.
209

210
  This class holds an RPC result. It is needed since in multi-node
211
  calls we can't raise an exception just because one one out of many
212
  failed, and therefore we use this class to encapsulate the result.
213

214
  @ivar data: the data payload, for successful results, or None
215
  @ivar call: the name of the RPC call
216
  @ivar node: the name of the node to which we made the call
217
  @ivar offline: whether the operation failed because the node was
218
      offline, as opposed to actual failure; offline=True will always
219
      imply failed=True, in order to allow simpler checking if
220
      the user doesn't care about the exact failure mode
221
  @ivar fail_msg: the error message if the call failed
222

223
  """
224
  def __init__(self, data=None, failed=False, offline=False,
225
               call=None, node=None):
226
    self.offline = offline
227
    self.call = call
228
    self.node = node
229

    
230
    if offline:
231
      self.fail_msg = "Node is marked offline"
232
      self.data = self.payload = None
233
    elif failed:
234
      self.fail_msg = self._EnsureErr(data)
235
      self.data = self.payload = None
236
    else:
237
      self.data = data
238
      if not isinstance(self.data, (tuple, list)):
239
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
240
                         type(self.data))
241
        self.payload = None
242
      elif len(data) != 2:
243
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
244
                         "expected 2" % len(self.data))
245
        self.payload = None
246
      elif not self.data[0]:
247
        self.fail_msg = self._EnsureErr(self.data[1])
248
        self.payload = None
249
      else:
250
        # finally success
251
        self.fail_msg = None
252
        self.payload = data[1]
253

    
254
    for attr_name in ["call", "data", "fail_msg",
255
                      "node", "offline", "payload"]:
256
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
257

    
258
  @staticmethod
259
  def _EnsureErr(val):
260
    """Helper to ensure we return a 'True' value for error."""
261
    if val:
262
      return val
263
    else:
264
      return "No error information"
265

    
266
  def Raise(self, msg, prereq=False, ecode=None):
267
    """If the result has failed, raise an OpExecError.
268

269
    This is used so that LU code doesn't have to check for each
270
    result, but instead can call this function.
271

272
    """
273
    if not self.fail_msg:
274
      return
275

    
276
    if not msg: # one could pass None for default message
277
      msg = ("Call '%s' to node '%s' has failed: %s" %
278
             (self.call, self.node, self.fail_msg))
279
    else:
280
      msg = "%s: %s" % (msg, self.fail_msg)
281
    if prereq:
282
      ec = errors.OpPrereqError
283
    else:
284
      ec = errors.OpExecError
285
    if ecode is not None:
286
      args = (msg, ecode)
287
    else:
288
      args = (msg, )
289
    raise ec(*args) # pylint: disable=W0142
290

    
291

    
292
def _SsconfResolver(node_list,
293
                    ssc=ssconf.SimpleStore,
294
                    nslookup_fn=netutils.Hostname.GetIP):
295
  """Return addresses for given node names.
296

297
  @type node_list: list
298
  @param node_list: List of node names
299
  @type ssc: class
300
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
301
  @type nslookup_fn: callable
302
  @param nslookup_fn: function use to do NS lookup
303
  @rtype: list of tuple; (string, string)
304
  @return: List of tuples containing node name and IP address
305

306
  """
307
  ss = ssc()
308
  iplist = ss.GetNodePrimaryIPList()
309
  family = ss.GetPrimaryIPFamily()
310
  ipmap = dict(entry.split() for entry in iplist)
311

    
312
  result = []
313
  for node in node_list:
314
    ip = ipmap.get(node)
315
    if ip is None:
316
      ip = nslookup_fn(node, family=family)
317
    result.append((node, ip))
318

    
319
  return result
320

    
321

    
322
class _StaticResolver:
323
  def __init__(self, addresses):
324
    """Initializes this class.
325

326
    """
327
    self._addresses = addresses
328

    
329
  def __call__(self, hosts):
330
    """Returns static addresses for hosts.
331

332
    """
333
    assert len(hosts) == len(self._addresses)
334
    return zip(hosts, self._addresses)
335

    
336

    
337
def _CheckConfigNode(name, node):
338
  """Checks if a node is online.
339

340
  @type name: string
341
  @param name: Node name
342
  @type node: L{objects.Node} or None
343
  @param node: Node object
344

345
  """
346
  if node is None:
347
    # Depend on DNS for name resolution
348
    ip = name
349
  elif node.offline:
350
    ip = _OFFLINE
351
  else:
352
    ip = node.primary_ip
353
  return (name, ip)
354

    
355

    
356
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
357
  """Calculate node addresses using configuration.
358

359
  """
360
  # Special case for single-host lookups
361
  if len(hosts) == 1:
362
    (name, ) = hosts
363
    return [_CheckConfigNode(name, single_node_fn(name))]
364
  else:
365
    all_nodes = all_nodes_fn()
366
    return [_CheckConfigNode(name, all_nodes.get(name, None))
367
            for name in hosts]
368

    
369

    
370
class _RpcProcessor:
371
  def __init__(self, resolver, port, lock_monitor_cb=None):
372
    """Initializes this class.
373

374
    @param resolver: callable accepting a list of hostnames, returning a list
375
      of tuples containing name and IP address (IP address can be the name or
376
      the special value L{_OFFLINE} to mark offline machines)
377
    @type port: int
378
    @param port: TCP port
379
    @param lock_monitor_cb: Callable for registering with lock monitor
380

381
    """
382
    self._resolver = resolver
383
    self._port = port
384
    self._lock_monitor_cb = lock_monitor_cb
385

    
386
  @staticmethod
387
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
388
    """Prepares requests by sorting offline hosts into separate list.
389

390
    """
391
    results = {}
392
    requests = {}
393

    
394
    for (name, ip) in hosts:
395
      if ip is _OFFLINE:
396
        # Node is marked as offline
397
        results[name] = RpcResult(node=name, offline=True, call=procedure)
398
      else:
399
        requests[name] = \
400
          http.client.HttpClientRequest(str(ip), port,
401
                                        http.HTTP_PUT, str("/%s" % procedure),
402
                                        headers=_RPC_CLIENT_HEADERS,
403
                                        post_data=body,
404
                                        read_timeout=read_timeout,
405
                                        nicename="%s/%s" % (name, procedure))
406

    
407
    return (results, requests)
408

    
409
  @staticmethod
410
  def _CombineResults(results, requests, procedure):
411
    """Combines pre-computed results for offline hosts with actual call results.
412

413
    """
414
    for name, req in requests.items():
415
      if req.success and req.resp_status_code == http.HTTP_OK:
416
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
417
                                node=name, call=procedure)
418
      else:
419
        # TODO: Better error reporting
420
        if req.error:
421
          msg = req.error
422
        else:
423
          msg = req.resp_body
424

    
425
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
426
        host_result = RpcResult(data=msg, failed=True, node=name,
427
                                call=procedure)
428

    
429
      results[name] = host_result
430

    
431
    return results
432

    
433
  def __call__(self, hosts, procedure, body, read_timeout=None, http_pool=None):
434
    """Makes an RPC request to a number of nodes.
435

436
    @type hosts: sequence
437
    @param hosts: Hostnames
438
    @type procedure: string
439
    @param procedure: Request path
440
    @type body: string
441
    @param body: Request body
442
    @type read_timeout: int or None
443
    @param read_timeout: Read timeout for request
444

445
    """
446
    assert procedure in _TIMEOUTS, "RPC call not declared in the timeouts table"
447

    
448
    if not http_pool:
449
      http_pool = _thread_local.GetHttpClientPool()
450

    
451
    if read_timeout is None:
452
      read_timeout = _TIMEOUTS[procedure]
453

    
454
    (results, requests) = \
455
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
456
                            str(body), read_timeout)
457

    
458
    http_pool.ProcessRequests(requests.values(),
459
                              lock_monitor_cb=self._lock_monitor_cb)
460

    
461
    assert not frozenset(results).intersection(requests)
462

    
463
    return self._CombineResults(results, requests, procedure)
464

    
465

    
466
def _EncodeImportExportIO(ieio, ieioargs):
467
  """Encodes import/export I/O information.
468

469
  """
470
  if ieio == constants.IEIO_RAW_DISK:
471
    assert len(ieioargs) == 1
472
    return (ieioargs[0].ToDict(), )
473

    
474
  if ieio == constants.IEIO_SCRIPT:
475
    assert len(ieioargs) == 2
476
    return (ieioargs[0].ToDict(), ieioargs[1])
477

    
478
  return ieioargs
479

    
480

    
481
class RpcRunner(object):
482
  """RPC runner class.
483

484
  """
485
  def __init__(self, context):
486
    """Initialized the RPC runner.
487

488
    @type context: C{masterd.GanetiContext}
489
    @param context: Ganeti context
490

491
    """
492
    self._cfg = context.cfg
493
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
494
                                              self._cfg.GetNodeInfo,
495
                                              self._cfg.GetAllNodesInfo),
496
                               netutils.GetDaemonPort(constants.NODED),
497
                               lock_monitor_cb=context.glm.AddToLockMonitor)
498

    
499
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
500
    """Convert the given instance to a dict.
501

502
    This is done via the instance's ToDict() method and additionally
503
    we fill the hvparams with the cluster defaults.
504

505
    @type instance: L{objects.Instance}
506
    @param instance: an Instance object
507
    @type hvp: dict or None
508
    @param hvp: a dictionary with overridden hypervisor parameters
509
    @type bep: dict or None
510
    @param bep: a dictionary with overridden backend parameters
511
    @type osp: dict or None
512
    @param osp: a dictionary with overridden os parameters
513
    @rtype: dict
514
    @return: the instance dict, with the hvparams filled with the
515
        cluster defaults
516

517
    """
518
    idict = instance.ToDict()
519
    cluster = self._cfg.GetClusterInfo()
520
    idict["hvparams"] = cluster.FillHV(instance)
521
    if hvp is not None:
522
      idict["hvparams"].update(hvp)
523
    idict["beparams"] = cluster.FillBE(instance)
524
    if bep is not None:
525
      idict["beparams"].update(bep)
526
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
527
    if osp is not None:
528
      idict["osparams"].update(osp)
529
    for nic in idict["nics"]:
530
      nic['nicparams'] = objects.FillDict(
531
        cluster.nicparams[constants.PP_DEFAULT],
532
        nic['nicparams'])
533
    return idict
534

    
535
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
536
    """Helper for making a multi-node call
537

538
    """
539
    body = serializer.DumpJson(args, indent=False)
540
    return self._proc(node_list, procedure, body, read_timeout=read_timeout)
541

    
542
  @staticmethod
543
  def _StaticMultiNodeCall(node_list, procedure, args,
544
                           address_list=None, read_timeout=None):
545
    """Helper for making a multi-node static call
546

547
    """
548
    body = serializer.DumpJson(args, indent=False)
549

    
550
    if address_list is None:
551
      resolver = _SsconfResolver
552
    else:
553
      # Caller provided an address list
554
      resolver = _StaticResolver(address_list)
555

    
556
    proc = _RpcProcessor(resolver,
557
                         netutils.GetDaemonPort(constants.NODED))
558
    return proc(node_list, procedure, body, read_timeout=read_timeout)
559

    
560
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
561
    """Helper for making a single-node call
562

563
    """
564
    body = serializer.DumpJson(args, indent=False)
565
    return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
566

    
567
  @classmethod
568
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
569
    """Helper for making a single-node static call
570

571
    """
572
    body = serializer.DumpJson(args, indent=False)
573
    proc = _RpcProcessor(_SsconfResolver,
574
                         netutils.GetDaemonPort(constants.NODED))
575
    return proc([node], procedure, body, read_timeout=read_timeout)[node]
576

    
577
  #
578
  # Begin RPC calls
579
  #
580

    
581
  @_RpcTimeout(_TMO_URGENT)
582
  def call_bdev_sizes(self, node_list, devices):
583
    """Gets the sizes of requested block devices present on a node
584

585
    This is a multi-node call.
586

587
    """
588
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
589

    
590
  @_RpcTimeout(_TMO_URGENT)
591
  def call_lv_list(self, node_list, vg_name):
592
    """Gets the logical volumes present in a given volume group.
593

594
    This is a multi-node call.
595

596
    """
597
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
598

    
599
  @_RpcTimeout(_TMO_URGENT)
600
  def call_vg_list(self, node_list):
601
    """Gets the volume group list.
602

603
    This is a multi-node call.
604

605
    """
606
    return self._MultiNodeCall(node_list, "vg_list", [])
607

    
608
  @_RpcTimeout(_TMO_NORMAL)
609
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
610
    """Get list of storage units.
611

612
    This is a multi-node call.
613

614
    """
615
    return self._MultiNodeCall(node_list, "storage_list",
616
                               [su_name, su_args, name, fields])
617

    
618
  @_RpcTimeout(_TMO_NORMAL)
619
  def call_storage_modify(self, node, su_name, su_args, name, changes):
620
    """Modify a storage unit.
621

622
    This is a single-node call.
623

624
    """
625
    return self._SingleNodeCall(node, "storage_modify",
626
                                [su_name, su_args, name, changes])
627

    
628
  @_RpcTimeout(_TMO_NORMAL)
629
  def call_storage_execute(self, node, su_name, su_args, name, op):
630
    """Executes an operation on a storage unit.
631

632
    This is a single-node call.
633

634
    """
635
    return self._SingleNodeCall(node, "storage_execute",
636
                                [su_name, su_args, name, op])
637

    
638
  @_RpcTimeout(_TMO_URGENT)
639
  def call_bridges_exist(self, node, bridges_list):
640
    """Checks if a node has all the bridges given.
641

642
    This method checks if all bridges given in the bridges_list are
643
    present on the remote node, so that an instance that uses interfaces
644
    on those bridges can be started.
645

646
    This is a single-node call.
647

648
    """
649
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
650

    
651
  @_RpcTimeout(_TMO_NORMAL)
652
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
653
    """Starts an instance.
654

655
    This is a single-node call.
656

657
    """
658
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
659
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
660

    
661
  @_RpcTimeout(_TMO_NORMAL)
662
  def call_instance_shutdown(self, node, instance, timeout):
663
    """Stops an instance.
664

665
    This is a single-node call.
666

667
    """
668
    return self._SingleNodeCall(node, "instance_shutdown",
669
                                [self._InstDict(instance), timeout])
670

    
671
  @_RpcTimeout(_TMO_NORMAL)
672
  def call_migration_info(self, node, instance):
673
    """Gather the information necessary to prepare an instance migration.
674

675
    This is a single-node call.
676

677
    @type node: string
678
    @param node: the node on which the instance is currently running
679
    @type instance: C{objects.Instance}
680
    @param instance: the instance definition
681

682
    """
683
    return self._SingleNodeCall(node, "migration_info",
684
                                [self._InstDict(instance)])
685

    
686
  @_RpcTimeout(_TMO_NORMAL)
687
  def call_accept_instance(self, node, instance, info, target):
688
    """Prepare a node to accept an instance.
689

690
    This is a single-node call.
691

692
    @type node: string
693
    @param node: the target node for the migration
694
    @type instance: C{objects.Instance}
695
    @param instance: the instance definition
696
    @type info: opaque/hypervisor specific (string/data)
697
    @param info: result for the call_migration_info call
698
    @type target: string
699
    @param target: target hostname (usually ip address) (on the node itself)
700

701
    """
702
    return self._SingleNodeCall(node, "accept_instance",
703
                                [self._InstDict(instance), info, target])
704

    
705
  @_RpcTimeout(_TMO_NORMAL)
706
  def call_instance_finalize_migration_dst(self, node, instance, info, success):
707
    """Finalize any target-node migration specific operation.
708

709
    This is called both in case of a successful migration and in case of error
710
    (in which case it should abort the migration).
711

712
    This is a single-node call.
713

714
    @type node: string
715
    @param node: the target node for the migration
716
    @type instance: C{objects.Instance}
717
    @param instance: the instance definition
718
    @type info: opaque/hypervisor specific (string/data)
719
    @param info: result for the call_migration_info call
720
    @type success: boolean
721
    @param success: whether the migration was a success or a failure
722

723
    """
724
    return self._SingleNodeCall(node, "instance_finalize_migration_dst",
725
                                [self._InstDict(instance), info, success])
726

    
727
  @_RpcTimeout(_TMO_SLOW)
728
  def call_instance_migrate(self, node, instance, target, live):
729
    """Migrate an instance.
730

731
    This is a single-node call.
732

733
    @type node: string
734
    @param node: the node on which the instance is currently running
735
    @type instance: C{objects.Instance}
736
    @param instance: the instance definition
737
    @type target: string
738
    @param target: the target node name
739
    @type live: boolean
740
    @param live: whether the migration should be done live or not (the
741
        interpretation of this parameter is left to the hypervisor)
742

743
    """
744
    return self._SingleNodeCall(node, "instance_migrate",
745
                                [self._InstDict(instance), target, live])
746

    
747
  @_RpcTimeout(_TMO_SLOW)
748
  def call_instance_finalize_migration_src(self, node, instance, success, live):
749
    """Finalize the instance migration on the source node.
750

751
    This is a single-node call.
752

753
    @type instance: L{objects.Instance}
754
    @param instance: the instance that was migrated
755
    @type success: bool
756
    @param success: whether the migration succeeded or not
757
    @type live: bool
758
    @param live: whether the user requested a live migration or not
759

760
    """
761
    return self._SingleNodeCall(node, "instance_finalize_migration_src",
762
                                [self._InstDict(instance), success, live])
763

    
764
  @_RpcTimeout(_TMO_SLOW)
765
  def call_instance_get_migration_status(self, node, instance):
766
    """Report migration status.
767

768
    This is a single-node call that must be executed on the source node.
769

770
    @type instance: L{objects.Instance}
771
    @param instance: the instance that is being migrated
772
    @rtype: L{objects.MigrationStatus}
773
    @return: the status of the current migration (one of
774
             L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
775
             progress info that can be retrieved from the hypervisor
776

777
    """
778
    result = self._SingleNodeCall(node, "instance_get_migration_status",
779
                                  [self._InstDict(instance)])
780
    if not result.fail_msg and result.payload is not None:
781
      result.payload = objects.MigrationStatus.FromDict(result.payload)
782
    return result
783

    
784
  @_RpcTimeout(_TMO_NORMAL)
785
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
786
    """Reboots an instance.
787

788
    This is a single-node call.
789

790
    """
791
    return self._SingleNodeCall(node, "instance_reboot",
792
                                [self._InstDict(inst), reboot_type,
793
                                 shutdown_timeout])
794

    
795
  @_RpcTimeout(_TMO_1DAY)
796
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
797
    """Installs an OS on the given instance.
798

799
    This is a single-node call.
800

801
    """
802
    return self._SingleNodeCall(node, "instance_os_add",
803
                                [self._InstDict(inst, osp=osparams),
804
                                 reinstall, debug])
805

    
806
  @_RpcTimeout(_TMO_SLOW)
807
  def call_instance_run_rename(self, node, inst, old_name, debug):
808
    """Run the OS rename script for an instance.
809

810
    This is a single-node call.
811

812
    """
813
    return self._SingleNodeCall(node, "instance_run_rename",
814
                                [self._InstDict(inst), old_name, debug])
815

    
816
  @_RpcTimeout(_TMO_URGENT)
817
  def call_instance_info(self, node, instance, hname):
818
    """Returns information about a single instance.
819

820
    This is a single-node call.
821

822
    @type node: list
823
    @param node: the list of nodes to query
824
    @type instance: string
825
    @param instance: the instance name
826
    @type hname: string
827
    @param hname: the hypervisor type of the instance
828

829
    """
830
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
831

    
832
  @_RpcTimeout(_TMO_NORMAL)
833
  def call_instance_migratable(self, node, instance):
834
    """Checks whether the given instance can be migrated.
835

836
    This is a single-node call.
837

838
    @param node: the node to query
839
    @type instance: L{objects.Instance}
840
    @param instance: the instance to check
841

842

843
    """
844
    return self._SingleNodeCall(node, "instance_migratable",
845
                                [self._InstDict(instance)])
846

    
847
  @_RpcTimeout(_TMO_URGENT)
848
  def call_all_instances_info(self, node_list, hypervisor_list):
849
    """Returns information about all instances on the given nodes.
850

851
    This is a multi-node call.
852

853
    @type node_list: list
854
    @param node_list: the list of nodes to query
855
    @type hypervisor_list: list
856
    @param hypervisor_list: the hypervisors to query for instances
857

858
    """
859
    return self._MultiNodeCall(node_list, "all_instances_info",
860
                               [hypervisor_list])
861

    
862
  @_RpcTimeout(_TMO_URGENT)
863
  def call_instance_list(self, node_list, hypervisor_list):
864
    """Returns the list of running instances on a given node.
865

866
    This is a multi-node call.
867

868
    @type node_list: list
869
    @param node_list: the list of nodes to query
870
    @type hypervisor_list: list
871
    @param hypervisor_list: the hypervisors to query for instances
872

873
    """
874
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
875

    
876
  @_RpcTimeout(_TMO_FAST)
877
  def call_node_tcp_ping(self, node, source, target, port, timeout,
878
                         live_port_needed):
879
    """Do a TcpPing on the remote node
880

881
    This is a single-node call.
882

883
    """
884
    return self._SingleNodeCall(node, "node_tcp_ping",
885
                                [source, target, port, timeout,
886
                                 live_port_needed])
887

    
888
  @_RpcTimeout(_TMO_FAST)
889
  def call_node_has_ip_address(self, node, address):
890
    """Checks if a node has the given IP address.
891

892
    This is a single-node call.
893

894
    """
895
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
896

    
897
  @_RpcTimeout(_TMO_URGENT)
898
  def call_node_info(self, node_list, vg_name, hypervisor_type):
899
    """Return node information.
900

901
    This will return memory information and volume group size and free
902
    space.
903

904
    This is a multi-node call.
905

906
    @type node_list: list
907
    @param node_list: the list of nodes to query
908
    @type vg_name: C{string}
909
    @param vg_name: the name of the volume group to ask for disk space
910
        information
911
    @type hypervisor_type: C{str}
912
    @param hypervisor_type: the name of the hypervisor to ask for
913
        memory information
914

915
    """
916
    return self._MultiNodeCall(node_list, "node_info",
917
                               [vg_name, hypervisor_type])
918

    
919
  @_RpcTimeout(_TMO_NORMAL)
920
  def call_etc_hosts_modify(self, node, mode, name, ip):
921
    """Modify hosts file with name
922

923
    @type node: string
924
    @param node: The node to call
925
    @type mode: string
926
    @param mode: The mode to operate. Currently "add" or "remove"
927
    @type name: string
928
    @param name: The host name to be modified
929
    @type ip: string
930
    @param ip: The ip of the entry (just valid if mode is "add")
931

932
    """
933
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
934

    
935
  @_RpcTimeout(_TMO_NORMAL)
936
  def call_node_verify(self, node_list, checkdict, cluster_name):
937
    """Request verification of given parameters.
938

939
    This is a multi-node call.
940

941
    """
942
    return self._MultiNodeCall(node_list, "node_verify",
943
                               [checkdict, cluster_name])
944

    
945
  @classmethod
946
  @_RpcTimeout(_TMO_FAST)
947
  def call_node_start_master(cls, node, start_daemons, no_voting):
948
    """Tells a node to activate itself as a master.
949

950
    This is a single-node call.
951

952
    """
953
    return cls._StaticSingleNodeCall(node, "node_start_master",
954
                                     [start_daemons, no_voting])
955

    
956
  @classmethod
957
  @_RpcTimeout(_TMO_FAST)
958
  def call_node_stop_master(cls, node, stop_daemons):
959
    """Tells a node to demote itself from master status.
960

961
    This is a single-node call.
962

963
    """
964
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
965

    
966
  @classmethod
967
  @_RpcTimeout(_TMO_URGENT)
968
  def call_master_info(cls, node_list):
969
    """Query master info.
970

971
    This is a multi-node call.
972

973
    """
974
    # TODO: should this method query down nodes?
975
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
976

    
977
  @classmethod
978
  @_RpcTimeout(_TMO_URGENT)
979
  def call_version(cls, node_list):
980
    """Query node version.
981

982
    This is a multi-node call.
983

984
    """
985
    return cls._StaticMultiNodeCall(node_list, "version", [])
986

    
987
  @_RpcTimeout(_TMO_NORMAL)
988
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
989
    """Request creation of a given block device.
990

991
    This is a single-node call.
992

993
    """
994
    return self._SingleNodeCall(node, "blockdev_create",
995
                                [bdev.ToDict(), size, owner, on_primary, info])
996

    
997
  @_RpcTimeout(_TMO_SLOW)
998
  def call_blockdev_wipe(self, node, bdev, offset, size):
999
    """Request wipe at given offset with given size of a block device.
1000

1001
    This is a single-node call.
1002

1003
    """
1004
    return self._SingleNodeCall(node, "blockdev_wipe",
1005
                                [bdev.ToDict(), offset, size])
1006

    
1007
  @_RpcTimeout(_TMO_NORMAL)
1008
  def call_blockdev_remove(self, node, bdev):
1009
    """Request removal of a given block device.
1010

1011
    This is a single-node call.
1012

1013
    """
1014
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
1015

    
1016
  @_RpcTimeout(_TMO_NORMAL)
1017
  def call_blockdev_rename(self, node, devlist):
1018
    """Request rename of the given block devices.
1019

1020
    This is a single-node call.
1021

1022
    """
1023
    return self._SingleNodeCall(node, "blockdev_rename",
1024
                                [(d.ToDict(), uid) for d, uid in devlist])
1025

    
1026
  @_RpcTimeout(_TMO_NORMAL)
1027
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
1028
    """Request a pause/resume of given block device.
1029

1030
    This is a single-node call.
1031

1032
    """
1033
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1034
                                [[bdev.ToDict() for bdev in disks], pause])
1035

    
1036
  @_RpcTimeout(_TMO_NORMAL)
1037
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1038
    """Request assembling of a given block device.
1039

1040
    This is a single-node call.
1041

1042
    """
1043
    return self._SingleNodeCall(node, "blockdev_assemble",
1044
                                [disk.ToDict(), owner, on_primary, idx])
1045

    
1046
  @_RpcTimeout(_TMO_NORMAL)
1047
  def call_blockdev_shutdown(self, node, disk):
1048
    """Request shutdown of a given block device.
1049

1050
    This is a single-node call.
1051

1052
    """
1053
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1054

    
1055
  @_RpcTimeout(_TMO_NORMAL)
1056
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1057
    """Request adding a list of children to a (mirroring) device.
1058

1059
    This is a single-node call.
1060

1061
    """
1062
    return self._SingleNodeCall(node, "blockdev_addchildren",
1063
                                [bdev.ToDict(),
1064
                                 [disk.ToDict() for disk in ndevs]])
1065

    
1066
  @_RpcTimeout(_TMO_NORMAL)
1067
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1068
    """Request removing a list of children from a (mirroring) device.
1069

1070
    This is a single-node call.
1071

1072
    """
1073
    return self._SingleNodeCall(node, "blockdev_removechildren",
1074
                                [bdev.ToDict(),
1075
                                 [disk.ToDict() for disk in ndevs]])
1076

    
1077
  @_RpcTimeout(_TMO_NORMAL)
1078
  def call_blockdev_getmirrorstatus(self, node, disks):
1079
    """Request status of a (mirroring) device.
1080

1081
    This is a single-node call.
1082

1083
    """
1084
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1085
                                  [dsk.ToDict() for dsk in disks])
1086
    if not result.fail_msg:
1087
      result.payload = [objects.BlockDevStatus.FromDict(i)
1088
                        for i in result.payload]
1089
    return result
1090

    
1091
  @_RpcTimeout(_TMO_NORMAL)
1092
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1093
    """Request status of (mirroring) devices from multiple nodes.
1094

1095
    This is a multi-node call.
1096

1097
    """
1098
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1099
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1100
                                       for name, disks in node_disks.items())])
1101
    for nres in result.values():
1102
      if nres.fail_msg:
1103
        continue
1104

    
1105
      for idx, (success, status) in enumerate(nres.payload):
1106
        if success:
1107
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1108

    
1109
    return result
1110

    
1111
  @_RpcTimeout(_TMO_NORMAL)
1112
  def call_blockdev_find(self, node, disk):
1113
    """Request identification of a given block device.
1114

1115
    This is a single-node call.
1116

1117
    """
1118
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1119
    if not result.fail_msg and result.payload is not None:
1120
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1121
    return result
1122

    
1123
  @_RpcTimeout(_TMO_NORMAL)
1124
  def call_blockdev_close(self, node, instance_name, disks):
1125
    """Closes the given block devices.
1126

1127
    This is a single-node call.
1128

1129
    """
1130
    params = [instance_name, [cf.ToDict() for cf in disks]]
1131
    return self._SingleNodeCall(node, "blockdev_close", params)
1132

    
1133
  @_RpcTimeout(_TMO_NORMAL)
1134
  def call_blockdev_getsize(self, node, disks):
1135
    """Returns the size of the given disks.
1136

1137
    This is a single-node call.
1138

1139
    """
1140
    params = [[cf.ToDict() for cf in disks]]
1141
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1142

    
1143
  @_RpcTimeout(_TMO_NORMAL)
1144
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1145
    """Disconnects the network of the given drbd devices.
1146

1147
    This is a multi-node call.
1148

1149
    """
1150
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1151
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1152

    
1153
  @_RpcTimeout(_TMO_NORMAL)
1154
  def call_drbd_attach_net(self, node_list, nodes_ip,
1155
                           disks, instance_name, multimaster):
1156
    """Disconnects the given drbd devices.
1157

1158
    This is a multi-node call.
1159

1160
    """
1161
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1162
                               [nodes_ip, [cf.ToDict() for cf in disks],
1163
                                instance_name, multimaster])
1164

    
1165
  @_RpcTimeout(_TMO_SLOW)
1166
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1167
    """Waits for the synchronization of drbd devices is complete.
1168

1169
    This is a multi-node call.
1170

1171
    """
1172
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1173
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1174

    
1175
  @_RpcTimeout(_TMO_URGENT)
1176
  def call_drbd_helper(self, node_list):
1177
    """Gets drbd helper.
1178

1179
    This is a multi-node call.
1180

1181
    """
1182
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1183

    
1184
  @classmethod
1185
  @_RpcTimeout(_TMO_NORMAL)
1186
  def call_upload_file(cls, node_list, file_name, address_list=None):
1187
    """Upload a file.
1188

1189
    The node will refuse the operation in case the file is not on the
1190
    approved file list.
1191

1192
    This is a multi-node call.
1193

1194
    @type node_list: list
1195
    @param node_list: the list of node names to upload to
1196
    @type file_name: str
1197
    @param file_name: the filename to upload
1198
    @type address_list: list or None
1199
    @keyword address_list: an optional list of node addresses, in order
1200
        to optimize the RPC speed
1201

1202
    """
1203
    file_contents = utils.ReadFile(file_name)
1204
    data = _Compress(file_contents)
1205
    st = os.stat(file_name)
1206
    getents = runtime.GetEnts()
1207
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1208
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1209
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1210
                                    address_list=address_list)
1211

    
1212
  @classmethod
1213
  @_RpcTimeout(_TMO_NORMAL)
1214
  def call_write_ssconf_files(cls, node_list, values):
1215
    """Write ssconf files.
1216

1217
    This is a multi-node call.
1218

1219
    """
1220
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1221

    
1222
  @_RpcTimeout(_TMO_NORMAL)
1223
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1224
    """Runs OOB.
1225

1226
    This is a single-node call.
1227

1228
    """
1229
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1230
                                                  remote_node, timeout])
1231

    
1232
  @_RpcTimeout(_TMO_FAST)
1233
  def call_os_diagnose(self, node_list):
1234
    """Request a diagnose of OS definitions.
1235

1236
    This is a multi-node call.
1237

1238
    """
1239
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1240

    
1241
  @_RpcTimeout(_TMO_FAST)
1242
  def call_os_get(self, node, name):
1243
    """Returns an OS definition.
1244

1245
    This is a single-node call.
1246

1247
    """
1248
    result = self._SingleNodeCall(node, "os_get", [name])
1249
    if not result.fail_msg and isinstance(result.payload, dict):
1250
      result.payload = objects.OS.FromDict(result.payload)
1251
    return result
1252

    
1253
  @_RpcTimeout(_TMO_FAST)
1254
  def call_os_validate(self, required, nodes, name, checks, params):
1255
    """Run a validation routine for a given OS.
1256

1257
    This is a multi-node call.
1258

1259
    """
1260
    return self._MultiNodeCall(nodes, "os_validate",
1261
                               [required, name, checks, params])
1262

    
1263
  @_RpcTimeout(_TMO_NORMAL)
1264
  def call_hooks_runner(self, node_list, hpath, phase, env):
1265
    """Call the hooks runner.
1266

1267
    Args:
1268
      - op: the OpCode instance
1269
      - env: a dictionary with the environment
1270

1271
    This is a multi-node call.
1272

1273
    """
1274
    params = [hpath, phase, env]
1275
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1276

    
1277
  @_RpcTimeout(_TMO_NORMAL)
1278
  def call_iallocator_runner(self, node, name, idata):
1279
    """Call an iallocator on a remote node
1280

1281
    Args:
1282
      - name: the iallocator name
1283
      - input: the json-encoded input string
1284

1285
    This is a single-node call.
1286

1287
    """
1288
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1289

    
1290
  @_RpcTimeout(_TMO_NORMAL)
1291
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1292
    """Request a snapshot of the given block device.
1293

1294
    This is a single-node call.
1295

1296
    """
1297
    return self._SingleNodeCall(node, "blockdev_grow",
1298
                                [cf_bdev.ToDict(), amount, dryrun])
1299

    
1300
  @_RpcTimeout(_TMO_1DAY)
1301
  def call_blockdev_export(self, node, cf_bdev,
1302
                           dest_node, dest_path, cluster_name):
1303
    """Export a given disk to another node.
1304

1305
    This is a single-node call.
1306

1307
    """
1308
    return self._SingleNodeCall(node, "blockdev_export",
1309
                                [cf_bdev.ToDict(), dest_node, dest_path,
1310
                                 cluster_name])
1311

    
1312
  @_RpcTimeout(_TMO_NORMAL)
1313
  def call_blockdev_snapshot(self, node, cf_bdev):
1314
    """Request a snapshot of the given block device.
1315

1316
    This is a single-node call.
1317

1318
    """
1319
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1320

    
1321
  @_RpcTimeout(_TMO_NORMAL)
1322
  def call_finalize_export(self, node, instance, snap_disks):
1323
    """Request the completion of an export operation.
1324

1325
    This writes the export config file, etc.
1326

1327
    This is a single-node call.
1328

1329
    """
1330
    flat_disks = []
1331
    for disk in snap_disks:
1332
      if isinstance(disk, bool):
1333
        flat_disks.append(disk)
1334
      else:
1335
        flat_disks.append(disk.ToDict())
1336

    
1337
    return self._SingleNodeCall(node, "finalize_export",
1338
                                [self._InstDict(instance), flat_disks])
1339

    
1340
  @_RpcTimeout(_TMO_FAST)
1341
  def call_export_info(self, node, path):
1342
    """Queries the export information in a given path.
1343

1344
    This is a single-node call.
1345

1346
    """
1347
    return self._SingleNodeCall(node, "export_info", [path])
1348

    
1349
  @_RpcTimeout(_TMO_FAST)
1350
  def call_export_list(self, node_list):
1351
    """Gets the stored exports list.
1352

1353
    This is a multi-node call.
1354

1355
    """
1356
    return self._MultiNodeCall(node_list, "export_list", [])
1357

    
1358
  @_RpcTimeout(_TMO_FAST)
1359
  def call_export_remove(self, node, export):
1360
    """Requests removal of a given export.
1361

1362
    This is a single-node call.
1363

1364
    """
1365
    return self._SingleNodeCall(node, "export_remove", [export])
1366

    
1367
  @classmethod
1368
  @_RpcTimeout(_TMO_NORMAL)
1369
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1370
    """Requests a node to clean the cluster information it has.
1371

1372
    This will remove the configuration information from the ganeti data
1373
    dir.
1374

1375
    This is a single-node call.
1376

1377
    """
1378
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1379
                                     [modify_ssh_setup])
1380

    
1381
  @_RpcTimeout(_TMO_FAST)
1382
  def call_node_volumes(self, node_list):
1383
    """Gets all volumes on node(s).
1384

1385
    This is a multi-node call.
1386

1387
    """
1388
    return self._MultiNodeCall(node_list, "node_volumes", [])
1389

    
1390
  @_RpcTimeout(_TMO_FAST)
1391
  def call_node_demote_from_mc(self, node):
1392
    """Demote a node from the master candidate role.
1393

1394
    This is a single-node call.
1395

1396
    """
1397
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1398

    
1399
  @_RpcTimeout(_TMO_NORMAL)
1400
  def call_node_powercycle(self, node, hypervisor):
1401
    """Tries to powercycle a node.
1402

1403
    This is a single-node call.
1404

1405
    """
1406
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1407

    
1408
  @_RpcTimeout(None)
1409
  def call_test_delay(self, node_list, duration):
1410
    """Sleep for a fixed time on given node(s).
1411

1412
    This is a multi-node call.
1413

1414
    """
1415
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1416
                               read_timeout=int(duration + 5))
1417

    
1418
  @_RpcTimeout(_TMO_FAST)
1419
  def call_file_storage_dir_create(self, node, file_storage_dir):
1420
    """Create the given file storage directory.
1421

1422
    This is a single-node call.
1423

1424
    """
1425
    return self._SingleNodeCall(node, "file_storage_dir_create",
1426
                                [file_storage_dir])
1427

    
1428
  @_RpcTimeout(_TMO_FAST)
1429
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1430
    """Remove the given file storage directory.
1431

1432
    This is a single-node call.
1433

1434
    """
1435
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1436
                                [file_storage_dir])
1437

    
1438
  @_RpcTimeout(_TMO_FAST)
1439
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1440
                                   new_file_storage_dir):
1441
    """Rename file storage directory.
1442

1443
    This is a single-node call.
1444

1445
    """
1446
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1447
                                [old_file_storage_dir, new_file_storage_dir])
1448

    
1449
  @classmethod
1450
  @_RpcTimeout(_TMO_URGENT)
1451
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1452
    """Update job queue.
1453

1454
    This is a multi-node call.
1455

1456
    """
1457
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1458
                                    [file_name, _Compress(content)],
1459
                                    address_list=address_list)
1460

    
1461
  @classmethod
1462
  @_RpcTimeout(_TMO_NORMAL)
1463
  def call_jobqueue_purge(cls, node):
1464
    """Purge job queue.
1465

1466
    This is a single-node call.
1467

1468
    """
1469
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1470

    
1471
  @classmethod
1472
  @_RpcTimeout(_TMO_URGENT)
1473
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1474
    """Rename a job queue file.
1475

1476
    This is a multi-node call.
1477

1478
    """
1479
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1480
                                    address_list=address_list)
1481

    
1482
  @_RpcTimeout(_TMO_NORMAL)
1483
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1484
    """Validate the hypervisor params.
1485

1486
    This is a multi-node call.
1487

1488
    @type node_list: list
1489
    @param node_list: the list of nodes to query
1490
    @type hvname: string
1491
    @param hvname: the hypervisor name
1492
    @type hvparams: dict
1493
    @param hvparams: the hypervisor parameters to be validated
1494

1495
    """
1496
    cluster = self._cfg.GetClusterInfo()
1497
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1498
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1499
                               [hvname, hv_full])
1500

    
1501
  @_RpcTimeout(_TMO_NORMAL)
1502
  def call_x509_cert_create(self, node, validity):
1503
    """Creates a new X509 certificate for SSL/TLS.
1504

1505
    This is a single-node call.
1506

1507
    @type validity: int
1508
    @param validity: Validity in seconds
1509

1510
    """
1511
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1512

    
1513
  @_RpcTimeout(_TMO_NORMAL)
1514
  def call_x509_cert_remove(self, node, name):
1515
    """Removes a X509 certificate.
1516

1517
    This is a single-node call.
1518

1519
    @type name: string
1520
    @param name: Certificate name
1521

1522
    """
1523
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1524

    
1525
  @_RpcTimeout(_TMO_NORMAL)
1526
  def call_import_start(self, node, opts, instance, component,
1527
                        dest, dest_args):
1528
    """Starts a listener for an import.
1529

1530
    This is a single-node call.
1531

1532
    @type node: string
1533
    @param node: Node name
1534
    @type instance: C{objects.Instance}
1535
    @param instance: Instance object
1536
    @type component: string
1537
    @param component: which part of the instance is being imported
1538

1539
    """
1540
    return self._SingleNodeCall(node, "import_start",
1541
                                [opts.ToDict(),
1542
                                 self._InstDict(instance), component, dest,
1543
                                 _EncodeImportExportIO(dest, dest_args)])
1544

    
1545
  @_RpcTimeout(_TMO_NORMAL)
1546
  def call_export_start(self, node, opts, host, port,
1547
                        instance, component, source, source_args):
1548
    """Starts an export daemon.
1549

1550
    This is a single-node call.
1551

1552
    @type node: string
1553
    @param node: Node name
1554
    @type instance: C{objects.Instance}
1555
    @param instance: Instance object
1556
    @type component: string
1557
    @param component: which part of the instance is being imported
1558

1559
    """
1560
    return self._SingleNodeCall(node, "export_start",
1561
                                [opts.ToDict(), host, port,
1562
                                 self._InstDict(instance),
1563
                                 component, source,
1564
                                 _EncodeImportExportIO(source, source_args)])
1565

    
1566
  @_RpcTimeout(_TMO_FAST)
1567
  def call_impexp_status(self, node, names):
1568
    """Gets the status of an import or export.
1569

1570
    This is a single-node call.
1571

1572
    @type node: string
1573
    @param node: Node name
1574
    @type names: List of strings
1575
    @param names: Import/export names
1576
    @rtype: List of L{objects.ImportExportStatus} instances
1577
    @return: Returns a list of the state of each named import/export or None if
1578
             a status couldn't be retrieved
1579

1580
    """
1581
    result = self._SingleNodeCall(node, "impexp_status", [names])
1582

    
1583
    if not result.fail_msg:
1584
      decoded = []
1585

    
1586
      for i in result.payload:
1587
        if i is None:
1588
          decoded.append(None)
1589
          continue
1590
        decoded.append(objects.ImportExportStatus.FromDict(i))
1591

    
1592
      result.payload = decoded
1593

    
1594
    return result
1595

    
1596
  @_RpcTimeout(_TMO_NORMAL)
1597
  def call_impexp_abort(self, node, name):
1598
    """Aborts an import or export.
1599

1600
    This is a single-node call.
1601

1602
    @type node: string
1603
    @param node: Node name
1604
    @type name: string
1605
    @param name: Import/export name
1606

1607
    """
1608
    return self._SingleNodeCall(node, "impexp_abort", [name])
1609

    
1610
  @_RpcTimeout(_TMO_NORMAL)
1611
  def call_impexp_cleanup(self, node, name):
1612
    """Cleans up after an import or export.
1613

1614
    This is a single-node call.
1615

1616
    @type node: string
1617
    @param node: Node name
1618
    @type name: string
1619
    @param name: Import/export name
1620

1621
    """
1622
    return self._SingleNodeCall(node, "impexp_cleanup", [name])