Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 6915fe26

History | View | Annotate | Download (47.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50

    
51
# pylint has a bug here, doesn't see this import
52
import ganeti.http.client  # pylint: disable=W0611
53

    
54

    
55
# Timeout for connecting to nodes (seconds)
56
_RPC_CONNECT_TIMEOUT = 5
57

    
58
_RPC_CLIENT_HEADERS = [
59
  "Content-type: %s" % http.HTTP_APP_JSON,
60
  "Expect:",
61
  ]
62

    
63
# Various time constants for the timeout table
64
_TMO_URGENT = 60 # one minute
65
_TMO_FAST = 5 * 60 # five minutes
66
_TMO_NORMAL = 15 * 60 # 15 minutes
67
_TMO_SLOW = 3600 # one hour
68
_TMO_4HRS = 4 * 3600
69
_TMO_1DAY = 86400
70

    
71
# Timeout table that will be built later by decorators
72
# Guidelines for choosing timeouts:
73
# - call used during watcher: timeout -> 1min, _TMO_URGENT
74
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
75
# - other calls: 15 min, _TMO_NORMAL
76
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
77

    
78
_TIMEOUTS = {
79
}
80

    
81
#: Special value to describe an offline host
82
_OFFLINE = object()
83

    
84

    
85
def Init():
86
  """Initializes the module-global HTTP client manager.
87

88
  Must be called before using any RPC function and while exactly one thread is
89
  running.
90

91
  """
92
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
93
  # one thread running. This check is just a safety measure -- it doesn't
94
  # cover all cases.
95
  assert threading.activeCount() == 1, \
96
         "Found more than one active thread when initializing pycURL"
97

    
98
  logging.info("Using PycURL %s", pycurl.version)
99

    
100
  pycurl.global_init(pycurl.GLOBAL_ALL)
101

    
102

    
103
def Shutdown():
104
  """Stops the module-global HTTP client manager.
105

106
  Must be called before quitting the program and while exactly one thread is
107
  running.
108

109
  """
110
  pycurl.global_cleanup()
111

    
112

    
113
def _ConfigRpcCurl(curl):
114
  noded_cert = str(constants.NODED_CERT_FILE)
115

    
116
  curl.setopt(pycurl.FOLLOWLOCATION, False)
117
  curl.setopt(pycurl.CAINFO, noded_cert)
118
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
119
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
120
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
121
  curl.setopt(pycurl.SSLCERT, noded_cert)
122
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
123
  curl.setopt(pycurl.SSLKEY, noded_cert)
124
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
125

    
126

    
127
def _RpcTimeout(secs):
128
  """Timeout decorator.
129

130
  When applied to a rpc call_* function, it updates the global timeout
131
  table with the given function/timeout.
132

133
  """
134
  def decorator(f):
135
    name = f.__name__
136
    assert name.startswith("call_")
137
    _TIMEOUTS[name[len("call_"):]] = secs
138
    return f
139
  return decorator
140

    
141

    
142
def RunWithRPC(fn):
143
  """RPC-wrapper decorator.
144

145
  When applied to a function, it runs it with the RPC system
146
  initialized, and it shutsdown the system afterwards. This means the
147
  function must be called without RPC being initialized.
148

149
  """
150
  def wrapper(*args, **kwargs):
151
    Init()
152
    try:
153
      return fn(*args, **kwargs)
154
    finally:
155
      Shutdown()
156
  return wrapper
157

    
158

    
159
def _Compress(data):
160
  """Compresses a string for transport over RPC.
161

162
  Small amounts of data are not compressed.
163

164
  @type data: str
165
  @param data: Data
166
  @rtype: tuple
167
  @return: Encoded data to send
168

169
  """
170
  # Small amounts of data are not compressed
171
  if len(data) < 512:
172
    return (constants.RPC_ENCODING_NONE, data)
173

    
174
  # Compress with zlib and encode in base64
175
  return (constants.RPC_ENCODING_ZLIB_BASE64,
176
          base64.b64encode(zlib.compress(data, 3)))
177

    
178

    
179
class RpcResult(object):
180
  """RPC Result class.
181

182
  This class holds an RPC result. It is needed since in multi-node
183
  calls we can't raise an exception just because one one out of many
184
  failed, and therefore we use this class to encapsulate the result.
185

186
  @ivar data: the data payload, for successful results, or None
187
  @ivar call: the name of the RPC call
188
  @ivar node: the name of the node to which we made the call
189
  @ivar offline: whether the operation failed because the node was
190
      offline, as opposed to actual failure; offline=True will always
191
      imply failed=True, in order to allow simpler checking if
192
      the user doesn't care about the exact failure mode
193
  @ivar fail_msg: the error message if the call failed
194

195
  """
196
  def __init__(self, data=None, failed=False, offline=False,
197
               call=None, node=None):
198
    self.offline = offline
199
    self.call = call
200
    self.node = node
201

    
202
    if offline:
203
      self.fail_msg = "Node is marked offline"
204
      self.data = self.payload = None
205
    elif failed:
206
      self.fail_msg = self._EnsureErr(data)
207
      self.data = self.payload = None
208
    else:
209
      self.data = data
210
      if not isinstance(self.data, (tuple, list)):
211
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
212
                         type(self.data))
213
        self.payload = None
214
      elif len(data) != 2:
215
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
216
                         "expected 2" % len(self.data))
217
        self.payload = None
218
      elif not self.data[0]:
219
        self.fail_msg = self._EnsureErr(self.data[1])
220
        self.payload = None
221
      else:
222
        # finally success
223
        self.fail_msg = None
224
        self.payload = data[1]
225

    
226
    for attr_name in ["call", "data", "fail_msg",
227
                      "node", "offline", "payload"]:
228
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
229

    
230
  @staticmethod
231
  def _EnsureErr(val):
232
    """Helper to ensure we return a 'True' value for error."""
233
    if val:
234
      return val
235
    else:
236
      return "No error information"
237

    
238
  def Raise(self, msg, prereq=False, ecode=None):
239
    """If the result has failed, raise an OpExecError.
240

241
    This is used so that LU code doesn't have to check for each
242
    result, but instead can call this function.
243

244
    """
245
    if not self.fail_msg:
246
      return
247

    
248
    if not msg: # one could pass None for default message
249
      msg = ("Call '%s' to node '%s' has failed: %s" %
250
             (self.call, self.node, self.fail_msg))
251
    else:
252
      msg = "%s: %s" % (msg, self.fail_msg)
253
    if prereq:
254
      ec = errors.OpPrereqError
255
    else:
256
      ec = errors.OpExecError
257
    if ecode is not None:
258
      args = (msg, ecode)
259
    else:
260
      args = (msg, )
261
    raise ec(*args) # pylint: disable=W0142
262

    
263

    
264
def _SsconfResolver(node_list,
265
                    ssc=ssconf.SimpleStore,
266
                    nslookup_fn=netutils.Hostname.GetIP):
267
  """Return addresses for given node names.
268

269
  @type node_list: list
270
  @param node_list: List of node names
271
  @type ssc: class
272
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
273
  @type nslookup_fn: callable
274
  @param nslookup_fn: function use to do NS lookup
275
  @rtype: list of tuple; (string, string)
276
  @return: List of tuples containing node name and IP address
277

278
  """
279
  ss = ssc()
280
  iplist = ss.GetNodePrimaryIPList()
281
  family = ss.GetPrimaryIPFamily()
282
  ipmap = dict(entry.split() for entry in iplist)
283

    
284
  result = []
285
  for node in node_list:
286
    ip = ipmap.get(node)
287
    if ip is None:
288
      ip = nslookup_fn(node, family=family)
289
    result.append((node, ip))
290

    
291
  return result
292

    
293

    
294
class _StaticResolver:
295
  def __init__(self, addresses):
296
    """Initializes this class.
297

298
    """
299
    self._addresses = addresses
300

    
301
  def __call__(self, hosts):
302
    """Returns static addresses for hosts.
303

304
    """
305
    assert len(hosts) == len(self._addresses)
306
    return zip(hosts, self._addresses)
307

    
308

    
309
def _CheckConfigNode(name, node):
310
  """Checks if a node is online.
311

312
  @type name: string
313
  @param name: Node name
314
  @type node: L{objects.Node} or None
315
  @param node: Node object
316

317
  """
318
  if node is None:
319
    # Depend on DNS for name resolution
320
    ip = name
321
  elif node.offline:
322
    ip = _OFFLINE
323
  else:
324
    ip = node.primary_ip
325
  return (name, ip)
326

    
327

    
328
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
329
  """Calculate node addresses using configuration.
330

331
  """
332
  # Special case for single-host lookups
333
  if len(hosts) == 1:
334
    (name, ) = hosts
335
    return [_CheckConfigNode(name, single_node_fn(name))]
336
  else:
337
    all_nodes = all_nodes_fn()
338
    return [_CheckConfigNode(name, all_nodes.get(name, None))
339
            for name in hosts]
340

    
341

    
342
class _RpcProcessor:
343
  def __init__(self, resolver, port, lock_monitor_cb=None):
344
    """Initializes this class.
345

346
    @param resolver: callable accepting a list of hostnames, returning a list
347
      of tuples containing name and IP address (IP address can be the name or
348
      the special value L{_OFFLINE} to mark offline machines)
349
    @type port: int
350
    @param port: TCP port
351
    @param lock_monitor_cb: Callable for registering with lock monitor
352

353
    """
354
    self._resolver = resolver
355
    self._port = port
356
    self._lock_monitor_cb = lock_monitor_cb
357

    
358
  @staticmethod
359
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
360
    """Prepares requests by sorting offline hosts into separate list.
361

362
    """
363
    results = {}
364
    requests = {}
365

    
366
    for (name, ip) in hosts:
367
      if ip is _OFFLINE:
368
        # Node is marked as offline
369
        results[name] = RpcResult(node=name, offline=True, call=procedure)
370
      else:
371
        requests[name] = \
372
          http.client.HttpClientRequest(str(ip), port,
373
                                        http.HTTP_PUT, str("/%s" % procedure),
374
                                        headers=_RPC_CLIENT_HEADERS,
375
                                        post_data=body,
376
                                        read_timeout=read_timeout,
377
                                        nicename="%s/%s" % (name, procedure),
378
                                        curl_config_fn=_ConfigRpcCurl)
379

    
380
    return (results, requests)
381

    
382
  @staticmethod
383
  def _CombineResults(results, requests, procedure):
384
    """Combines pre-computed results for offline hosts with actual call results.
385

386
    """
387
    for name, req in requests.items():
388
      if req.success and req.resp_status_code == http.HTTP_OK:
389
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
390
                                node=name, call=procedure)
391
      else:
392
        # TODO: Better error reporting
393
        if req.error:
394
          msg = req.error
395
        else:
396
          msg = req.resp_body
397

    
398
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
399
        host_result = RpcResult(data=msg, failed=True, node=name,
400
                                call=procedure)
401

    
402
      results[name] = host_result
403

    
404
    return results
405

    
406
  def __call__(self, hosts, procedure, body, read_timeout=None,
407
               _req_process_fn=http.client.ProcessRequests):
408
    """Makes an RPC request to a number of nodes.
409

410
    @type hosts: sequence
411
    @param hosts: Hostnames
412
    @type procedure: string
413
    @param procedure: Request path
414
    @type body: string
415
    @param body: Request body
416
    @type read_timeout: int or None
417
    @param read_timeout: Read timeout for request
418

419
    """
420
    assert procedure in _TIMEOUTS, "RPC call not declared in the timeouts table"
421

    
422
    if read_timeout is None:
423
      read_timeout = _TIMEOUTS[procedure]
424

    
425
    (results, requests) = \
426
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
427
                            str(body), read_timeout)
428

    
429
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
430

    
431
    assert not frozenset(results).intersection(requests)
432

    
433
    return self._CombineResults(results, requests, procedure)
434

    
435

    
436
def _EncodeImportExportIO(ieio, ieioargs):
437
  """Encodes import/export I/O information.
438

439
  """
440
  if ieio == constants.IEIO_RAW_DISK:
441
    assert len(ieioargs) == 1
442
    return (ieioargs[0].ToDict(), )
443

    
444
  if ieio == constants.IEIO_SCRIPT:
445
    assert len(ieioargs) == 2
446
    return (ieioargs[0].ToDict(), ieioargs[1])
447

    
448
  return ieioargs
449

    
450

    
451
class RpcRunner(object):
452
  """RPC runner class.
453

454
  """
455
  def __init__(self, context):
456
    """Initialized the RPC runner.
457

458
    @type context: C{masterd.GanetiContext}
459
    @param context: Ganeti context
460

461
    """
462
    self._cfg = context.cfg
463
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
464
                                              self._cfg.GetNodeInfo,
465
                                              self._cfg.GetAllNodesInfo),
466
                               netutils.GetDaemonPort(constants.NODED),
467
                               lock_monitor_cb=context.glm.AddToLockMonitor)
468

    
469
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
470
    """Convert the given instance to a dict.
471

472
    This is done via the instance's ToDict() method and additionally
473
    we fill the hvparams with the cluster defaults.
474

475
    @type instance: L{objects.Instance}
476
    @param instance: an Instance object
477
    @type hvp: dict or None
478
    @param hvp: a dictionary with overridden hypervisor parameters
479
    @type bep: dict or None
480
    @param bep: a dictionary with overridden backend parameters
481
    @type osp: dict or None
482
    @param osp: a dictionary with overridden os parameters
483
    @rtype: dict
484
    @return: the instance dict, with the hvparams filled with the
485
        cluster defaults
486

487
    """
488
    idict = instance.ToDict()
489
    cluster = self._cfg.GetClusterInfo()
490
    idict["hvparams"] = cluster.FillHV(instance)
491
    if hvp is not None:
492
      idict["hvparams"].update(hvp)
493
    idict["beparams"] = cluster.FillBE(instance)
494
    if bep is not None:
495
      idict["beparams"].update(bep)
496
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
497
    if osp is not None:
498
      idict["osparams"].update(osp)
499
    for nic in idict["nics"]:
500
      nic['nicparams'] = objects.FillDict(
501
        cluster.nicparams[constants.PP_DEFAULT],
502
        nic['nicparams'])
503
    return idict
504

    
505
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
506
    """Helper for making a multi-node call
507

508
    """
509
    body = serializer.DumpJson(args, indent=False)
510
    return self._proc(node_list, procedure, body, read_timeout=read_timeout)
511

    
512
  @staticmethod
513
  def _StaticMultiNodeCall(node_list, procedure, args,
514
                           address_list=None, read_timeout=None):
515
    """Helper for making a multi-node static call
516

517
    """
518
    body = serializer.DumpJson(args, indent=False)
519

    
520
    if address_list is None:
521
      resolver = _SsconfResolver
522
    else:
523
      # Caller provided an address list
524
      resolver = _StaticResolver(address_list)
525

    
526
    proc = _RpcProcessor(resolver,
527
                         netutils.GetDaemonPort(constants.NODED))
528
    return proc(node_list, procedure, body, read_timeout=read_timeout)
529

    
530
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
531
    """Helper for making a single-node call
532

533
    """
534
    body = serializer.DumpJson(args, indent=False)
535
    return self._proc([node], procedure, body, read_timeout=read_timeout)[node]
536

    
537
  @classmethod
538
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
539
    """Helper for making a single-node static call
540

541
    """
542
    body = serializer.DumpJson(args, indent=False)
543
    proc = _RpcProcessor(_SsconfResolver,
544
                         netutils.GetDaemonPort(constants.NODED))
545
    return proc([node], procedure, body, read_timeout=read_timeout)[node]
546

    
547
  #
548
  # Begin RPC calls
549
  #
550

    
551
  @_RpcTimeout(_TMO_URGENT)
552
  def call_bdev_sizes(self, node_list, devices):
553
    """Gets the sizes of requested block devices present on a node
554

555
    This is a multi-node call.
556

557
    """
558
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
559

    
560
  @_RpcTimeout(_TMO_URGENT)
561
  def call_lv_list(self, node_list, vg_name):
562
    """Gets the logical volumes present in a given volume group.
563

564
    This is a multi-node call.
565

566
    """
567
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
568

    
569
  @_RpcTimeout(_TMO_URGENT)
570
  def call_vg_list(self, node_list):
571
    """Gets the volume group list.
572

573
    This is a multi-node call.
574

575
    """
576
    return self._MultiNodeCall(node_list, "vg_list", [])
577

    
578
  @_RpcTimeout(_TMO_NORMAL)
579
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
580
    """Get list of storage units.
581

582
    This is a multi-node call.
583

584
    """
585
    return self._MultiNodeCall(node_list, "storage_list",
586
                               [su_name, su_args, name, fields])
587

    
588
  @_RpcTimeout(_TMO_NORMAL)
589
  def call_storage_modify(self, node, su_name, su_args, name, changes):
590
    """Modify a storage unit.
591

592
    This is a single-node call.
593

594
    """
595
    return self._SingleNodeCall(node, "storage_modify",
596
                                [su_name, su_args, name, changes])
597

    
598
  @_RpcTimeout(_TMO_NORMAL)
599
  def call_storage_execute(self, node, su_name, su_args, name, op):
600
    """Executes an operation on a storage unit.
601

602
    This is a single-node call.
603

604
    """
605
    return self._SingleNodeCall(node, "storage_execute",
606
                                [su_name, su_args, name, op])
607

    
608
  @_RpcTimeout(_TMO_URGENT)
609
  def call_bridges_exist(self, node, bridges_list):
610
    """Checks if a node has all the bridges given.
611

612
    This method checks if all bridges given in the bridges_list are
613
    present on the remote node, so that an instance that uses interfaces
614
    on those bridges can be started.
615

616
    This is a single-node call.
617

618
    """
619
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
620

    
621
  @_RpcTimeout(_TMO_NORMAL)
622
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
623
    """Starts an instance.
624

625
    This is a single-node call.
626

627
    """
628
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
629
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
630

    
631
  @_RpcTimeout(_TMO_NORMAL)
632
  def call_instance_shutdown(self, node, instance, timeout):
633
    """Stops an instance.
634

635
    This is a single-node call.
636

637
    """
638
    return self._SingleNodeCall(node, "instance_shutdown",
639
                                [self._InstDict(instance), timeout])
640

    
641
  @_RpcTimeout(_TMO_NORMAL)
642
  def call_migration_info(self, node, instance):
643
    """Gather the information necessary to prepare an instance migration.
644

645
    This is a single-node call.
646

647
    @type node: string
648
    @param node: the node on which the instance is currently running
649
    @type instance: C{objects.Instance}
650
    @param instance: the instance definition
651

652
    """
653
    return self._SingleNodeCall(node, "migration_info",
654
                                [self._InstDict(instance)])
655

    
656
  @_RpcTimeout(_TMO_NORMAL)
657
  def call_accept_instance(self, node, instance, info, target):
658
    """Prepare a node to accept an instance.
659

660
    This is a single-node call.
661

662
    @type node: string
663
    @param node: the target node for the migration
664
    @type instance: C{objects.Instance}
665
    @param instance: the instance definition
666
    @type info: opaque/hypervisor specific (string/data)
667
    @param info: result for the call_migration_info call
668
    @type target: string
669
    @param target: target hostname (usually ip address) (on the node itself)
670

671
    """
672
    return self._SingleNodeCall(node, "accept_instance",
673
                                [self._InstDict(instance), info, target])
674

    
675
  @_RpcTimeout(_TMO_NORMAL)
676
  def call_instance_finalize_migration_dst(self, node, instance, info, success):
677
    """Finalize any target-node migration specific operation.
678

679
    This is called both in case of a successful migration and in case of error
680
    (in which case it should abort the migration).
681

682
    This is a single-node call.
683

684
    @type node: string
685
    @param node: the target node for the migration
686
    @type instance: C{objects.Instance}
687
    @param instance: the instance definition
688
    @type info: opaque/hypervisor specific (string/data)
689
    @param info: result for the call_migration_info call
690
    @type success: boolean
691
    @param success: whether the migration was a success or a failure
692

693
    """
694
    return self._SingleNodeCall(node, "instance_finalize_migration_dst",
695
                                [self._InstDict(instance), info, success])
696

    
697
  @_RpcTimeout(_TMO_SLOW)
698
  def call_instance_migrate(self, node, instance, target, live):
699
    """Migrate an instance.
700

701
    This is a single-node call.
702

703
    @type node: string
704
    @param node: the node on which the instance is currently running
705
    @type instance: C{objects.Instance}
706
    @param instance: the instance definition
707
    @type target: string
708
    @param target: the target node name
709
    @type live: boolean
710
    @param live: whether the migration should be done live or not (the
711
        interpretation of this parameter is left to the hypervisor)
712

713
    """
714
    return self._SingleNodeCall(node, "instance_migrate",
715
                                [self._InstDict(instance), target, live])
716

    
717
  @_RpcTimeout(_TMO_SLOW)
718
  def call_instance_finalize_migration_src(self, node, instance, success, live):
719
    """Finalize the instance migration on the source node.
720

721
    This is a single-node call.
722

723
    @type instance: L{objects.Instance}
724
    @param instance: the instance that was migrated
725
    @type success: bool
726
    @param success: whether the migration succeeded or not
727
    @type live: bool
728
    @param live: whether the user requested a live migration or not
729

730
    """
731
    return self._SingleNodeCall(node, "instance_finalize_migration_src",
732
                                [self._InstDict(instance), success, live])
733

    
734
  @_RpcTimeout(_TMO_SLOW)
735
  def call_instance_get_migration_status(self, node, instance):
736
    """Report migration status.
737

738
    This is a single-node call that must be executed on the source node.
739

740
    @type instance: L{objects.Instance}
741
    @param instance: the instance that is being migrated
742
    @rtype: L{objects.MigrationStatus}
743
    @return: the status of the current migration (one of
744
             L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
745
             progress info that can be retrieved from the hypervisor
746

747
    """
748
    result = self._SingleNodeCall(node, "instance_get_migration_status",
749
                                  [self._InstDict(instance)])
750
    if not result.fail_msg and result.payload is not None:
751
      result.payload = objects.MigrationStatus.FromDict(result.payload)
752
    return result
753

    
754
  @_RpcTimeout(_TMO_NORMAL)
755
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
756
    """Reboots an instance.
757

758
    This is a single-node call.
759

760
    """
761
    return self._SingleNodeCall(node, "instance_reboot",
762
                                [self._InstDict(inst), reboot_type,
763
                                 shutdown_timeout])
764

    
765
  @_RpcTimeout(_TMO_1DAY)
766
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
767
    """Installs an OS on the given instance.
768

769
    This is a single-node call.
770

771
    """
772
    return self._SingleNodeCall(node, "instance_os_add",
773
                                [self._InstDict(inst, osp=osparams),
774
                                 reinstall, debug])
775

    
776
  @_RpcTimeout(_TMO_SLOW)
777
  def call_instance_run_rename(self, node, inst, old_name, debug):
778
    """Run the OS rename script for an instance.
779

780
    This is a single-node call.
781

782
    """
783
    return self._SingleNodeCall(node, "instance_run_rename",
784
                                [self._InstDict(inst), old_name, debug])
785

    
786
  @_RpcTimeout(_TMO_URGENT)
787
  def call_instance_info(self, node, instance, hname):
788
    """Returns information about a single instance.
789

790
    This is a single-node call.
791

792
    @type node: list
793
    @param node: the list of nodes to query
794
    @type instance: string
795
    @param instance: the instance name
796
    @type hname: string
797
    @param hname: the hypervisor type of the instance
798

799
    """
800
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
801

    
802
  @_RpcTimeout(_TMO_NORMAL)
803
  def call_instance_migratable(self, node, instance):
804
    """Checks whether the given instance can be migrated.
805

806
    This is a single-node call.
807

808
    @param node: the node to query
809
    @type instance: L{objects.Instance}
810
    @param instance: the instance to check
811

812

813
    """
814
    return self._SingleNodeCall(node, "instance_migratable",
815
                                [self._InstDict(instance)])
816

    
817
  @_RpcTimeout(_TMO_URGENT)
818
  def call_all_instances_info(self, node_list, hypervisor_list):
819
    """Returns information about all instances on the given nodes.
820

821
    This is a multi-node call.
822

823
    @type node_list: list
824
    @param node_list: the list of nodes to query
825
    @type hypervisor_list: list
826
    @param hypervisor_list: the hypervisors to query for instances
827

828
    """
829
    return self._MultiNodeCall(node_list, "all_instances_info",
830
                               [hypervisor_list])
831

    
832
  @_RpcTimeout(_TMO_URGENT)
833
  def call_instance_list(self, node_list, hypervisor_list):
834
    """Returns the list of running instances on a given node.
835

836
    This is a multi-node call.
837

838
    @type node_list: list
839
    @param node_list: the list of nodes to query
840
    @type hypervisor_list: list
841
    @param hypervisor_list: the hypervisors to query for instances
842

843
    """
844
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
845

    
846
  @_RpcTimeout(_TMO_FAST)
847
  def call_node_tcp_ping(self, node, source, target, port, timeout,
848
                         live_port_needed):
849
    """Do a TcpPing on the remote node
850

851
    This is a single-node call.
852

853
    """
854
    return self._SingleNodeCall(node, "node_tcp_ping",
855
                                [source, target, port, timeout,
856
                                 live_port_needed])
857

    
858
  @_RpcTimeout(_TMO_FAST)
859
  def call_node_has_ip_address(self, node, address):
860
    """Checks if a node has the given IP address.
861

862
    This is a single-node call.
863

864
    """
865
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
866

    
867
  @_RpcTimeout(_TMO_URGENT)
868
  def call_node_info(self, node_list, vg_name, hypervisor_type):
869
    """Return node information.
870

871
    This will return memory information and volume group size and free
872
    space.
873

874
    This is a multi-node call.
875

876
    @type node_list: list
877
    @param node_list: the list of nodes to query
878
    @type vg_name: C{string}
879
    @param vg_name: the name of the volume group to ask for disk space
880
        information
881
    @type hypervisor_type: C{str}
882
    @param hypervisor_type: the name of the hypervisor to ask for
883
        memory information
884

885
    """
886
    return self._MultiNodeCall(node_list, "node_info",
887
                               [vg_name, hypervisor_type])
888

    
889
  @_RpcTimeout(_TMO_NORMAL)
890
  def call_etc_hosts_modify(self, node, mode, name, ip):
891
    """Modify hosts file with name
892

893
    @type node: string
894
    @param node: The node to call
895
    @type mode: string
896
    @param mode: The mode to operate. Currently "add" or "remove"
897
    @type name: string
898
    @param name: The host name to be modified
899
    @type ip: string
900
    @param ip: The ip of the entry (just valid if mode is "add")
901

902
    """
903
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
904

    
905
  @_RpcTimeout(_TMO_NORMAL)
906
  def call_node_verify(self, node_list, checkdict, cluster_name):
907
    """Request verification of given parameters.
908

909
    This is a multi-node call.
910

911
    """
912
    return self._MultiNodeCall(node_list, "node_verify",
913
                               [checkdict, cluster_name])
914

    
915
  @classmethod
916
  @_RpcTimeout(_TMO_FAST)
917
  def call_node_start_master_daemons(cls, node, no_voting):
918
    """Starts master daemons on a node.
919

920
    This is a single-node call.
921

922
    """
923
    return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
924
                                     [no_voting])
925

    
926
  @classmethod
927
  @_RpcTimeout(_TMO_FAST)
928
  def call_node_activate_master_ip(cls, node):
929
    """Activates master IP on a node.
930

931
    This is a single-node call.
932

933
    """
934
    return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
935

    
936
  @classmethod
937
  @_RpcTimeout(_TMO_FAST)
938
  def call_node_stop_master(cls, node):
939
    """Deactivates master IP and stops master daemons on a node.
940

941
    This is a single-node call.
942

943
    """
944
    return cls._StaticSingleNodeCall(node, "node_stop_master", [])
945

    
946
  @classmethod
947
  @_RpcTimeout(_TMO_FAST)
948
  def call_node_deactivate_master_ip(cls, node):
949
    """Deactivates master IP on a node.
950

951
    This is a single-node call.
952

953
    """
954
    return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
955

    
956
  @classmethod
957
  @_RpcTimeout(_TMO_FAST)
958
  def call_node_change_master_netmask(cls, node, netmask):
959
    """Change master IP netmask.
960

961
    This is a single-node call.
962

963
    """
964
    return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
965
                  [netmask])
966

    
967
  @classmethod
968
  @_RpcTimeout(_TMO_URGENT)
969
  def call_master_info(cls, node_list):
970
    """Query master info.
971

972
    This is a multi-node call.
973

974
    """
975
    # TODO: should this method query down nodes?
976
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
977

    
978
  @classmethod
979
  @_RpcTimeout(_TMO_URGENT)
980
  def call_version(cls, node_list):
981
    """Query node version.
982

983
    This is a multi-node call.
984

985
    """
986
    return cls._StaticMultiNodeCall(node_list, "version", [])
987

    
988
  @_RpcTimeout(_TMO_NORMAL)
989
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
990
    """Request creation of a given block device.
991

992
    This is a single-node call.
993

994
    """
995
    return self._SingleNodeCall(node, "blockdev_create",
996
                                [bdev.ToDict(), size, owner, on_primary, info])
997

    
998
  @_RpcTimeout(_TMO_SLOW)
999
  def call_blockdev_wipe(self, node, bdev, offset, size):
1000
    """Request wipe at given offset with given size of a block device.
1001

1002
    This is a single-node call.
1003

1004
    """
1005
    return self._SingleNodeCall(node, "blockdev_wipe",
1006
                                [bdev.ToDict(), offset, size])
1007

    
1008
  @_RpcTimeout(_TMO_NORMAL)
1009
  def call_blockdev_remove(self, node, bdev):
1010
    """Request removal of a given block device.
1011

1012
    This is a single-node call.
1013

1014
    """
1015
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
1016

    
1017
  @_RpcTimeout(_TMO_NORMAL)
1018
  def call_blockdev_rename(self, node, devlist):
1019
    """Request rename of the given block devices.
1020

1021
    This is a single-node call.
1022

1023
    """
1024
    return self._SingleNodeCall(node, "blockdev_rename",
1025
                                [(d.ToDict(), uid) for d, uid in devlist])
1026

    
1027
  @_RpcTimeout(_TMO_NORMAL)
1028
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
1029
    """Request a pause/resume of given block device.
1030

1031
    This is a single-node call.
1032

1033
    """
1034
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1035
                                [[bdev.ToDict() for bdev in disks], pause])
1036

    
1037
  @_RpcTimeout(_TMO_NORMAL)
1038
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1039
    """Request assembling of a given block device.
1040

1041
    This is a single-node call.
1042

1043
    """
1044
    return self._SingleNodeCall(node, "blockdev_assemble",
1045
                                [disk.ToDict(), owner, on_primary, idx])
1046

    
1047
  @_RpcTimeout(_TMO_NORMAL)
1048
  def call_blockdev_shutdown(self, node, disk):
1049
    """Request shutdown of a given block device.
1050

1051
    This is a single-node call.
1052

1053
    """
1054
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1055

    
1056
  @_RpcTimeout(_TMO_NORMAL)
1057
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1058
    """Request adding a list of children to a (mirroring) device.
1059

1060
    This is a single-node call.
1061

1062
    """
1063
    return self._SingleNodeCall(node, "blockdev_addchildren",
1064
                                [bdev.ToDict(),
1065
                                 [disk.ToDict() for disk in ndevs]])
1066

    
1067
  @_RpcTimeout(_TMO_NORMAL)
1068
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1069
    """Request removing a list of children from a (mirroring) device.
1070

1071
    This is a single-node call.
1072

1073
    """
1074
    return self._SingleNodeCall(node, "blockdev_removechildren",
1075
                                [bdev.ToDict(),
1076
                                 [disk.ToDict() for disk in ndevs]])
1077

    
1078
  @_RpcTimeout(_TMO_NORMAL)
1079
  def call_blockdev_getmirrorstatus(self, node, disks):
1080
    """Request status of a (mirroring) device.
1081

1082
    This is a single-node call.
1083

1084
    """
1085
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1086
                                  [dsk.ToDict() for dsk in disks])
1087
    if not result.fail_msg:
1088
      result.payload = [objects.BlockDevStatus.FromDict(i)
1089
                        for i in result.payload]
1090
    return result
1091

    
1092
  @_RpcTimeout(_TMO_NORMAL)
1093
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1094
    """Request status of (mirroring) devices from multiple nodes.
1095

1096
    This is a multi-node call.
1097

1098
    """
1099
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1100
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1101
                                       for name, disks in node_disks.items())])
1102
    for nres in result.values():
1103
      if nres.fail_msg:
1104
        continue
1105

    
1106
      for idx, (success, status) in enumerate(nres.payload):
1107
        if success:
1108
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1109

    
1110
    return result
1111

    
1112
  @_RpcTimeout(_TMO_NORMAL)
1113
  def call_blockdev_find(self, node, disk):
1114
    """Request identification of a given block device.
1115

1116
    This is a single-node call.
1117

1118
    """
1119
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1120
    if not result.fail_msg and result.payload is not None:
1121
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1122
    return result
1123

    
1124
  @_RpcTimeout(_TMO_NORMAL)
1125
  def call_blockdev_close(self, node, instance_name, disks):
1126
    """Closes the given block devices.
1127

1128
    This is a single-node call.
1129

1130
    """
1131
    params = [instance_name, [cf.ToDict() for cf in disks]]
1132
    return self._SingleNodeCall(node, "blockdev_close", params)
1133

    
1134
  @_RpcTimeout(_TMO_NORMAL)
1135
  def call_blockdev_getsize(self, node, disks):
1136
    """Returns the size of the given disks.
1137

1138
    This is a single-node call.
1139

1140
    """
1141
    params = [[cf.ToDict() for cf in disks]]
1142
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1143

    
1144
  @_RpcTimeout(_TMO_NORMAL)
1145
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1146
    """Disconnects the network of the given drbd devices.
1147

1148
    This is a multi-node call.
1149

1150
    """
1151
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1152
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1153

    
1154
  @_RpcTimeout(_TMO_NORMAL)
1155
  def call_drbd_attach_net(self, node_list, nodes_ip,
1156
                           disks, instance_name, multimaster):
1157
    """Disconnects the given drbd devices.
1158

1159
    This is a multi-node call.
1160

1161
    """
1162
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1163
                               [nodes_ip, [cf.ToDict() for cf in disks],
1164
                                instance_name, multimaster])
1165

    
1166
  @_RpcTimeout(_TMO_SLOW)
1167
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1168
    """Waits for the synchronization of drbd devices is complete.
1169

1170
    This is a multi-node call.
1171

1172
    """
1173
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1174
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1175

    
1176
  @_RpcTimeout(_TMO_URGENT)
1177
  def call_drbd_helper(self, node_list):
1178
    """Gets drbd helper.
1179

1180
    This is a multi-node call.
1181

1182
    """
1183
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1184

    
1185
  @classmethod
1186
  @_RpcTimeout(_TMO_NORMAL)
1187
  def call_upload_file(cls, node_list, file_name, address_list=None):
1188
    """Upload a file.
1189

1190
    The node will refuse the operation in case the file is not on the
1191
    approved file list.
1192

1193
    This is a multi-node call.
1194

1195
    @type node_list: list
1196
    @param node_list: the list of node names to upload to
1197
    @type file_name: str
1198
    @param file_name: the filename to upload
1199
    @type address_list: list or None
1200
    @keyword address_list: an optional list of node addresses, in order
1201
        to optimize the RPC speed
1202

1203
    """
1204
    file_contents = utils.ReadFile(file_name)
1205
    data = _Compress(file_contents)
1206
    st = os.stat(file_name)
1207
    getents = runtime.GetEnts()
1208
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1209
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1210
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1211
                                    address_list=address_list)
1212

    
1213
  @classmethod
1214
  @_RpcTimeout(_TMO_NORMAL)
1215
  def call_write_ssconf_files(cls, node_list, values):
1216
    """Write ssconf files.
1217

1218
    This is a multi-node call.
1219

1220
    """
1221
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1222

    
1223
  @_RpcTimeout(_TMO_NORMAL)
1224
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1225
    """Runs OOB.
1226

1227
    This is a single-node call.
1228

1229
    """
1230
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1231
                                                  remote_node, timeout])
1232

    
1233
  @_RpcTimeout(_TMO_FAST)
1234
  def call_os_diagnose(self, node_list):
1235
    """Request a diagnose of OS definitions.
1236

1237
    This is a multi-node call.
1238

1239
    """
1240
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1241

    
1242
  @_RpcTimeout(_TMO_FAST)
1243
  def call_os_get(self, node, name):
1244
    """Returns an OS definition.
1245

1246
    This is a single-node call.
1247

1248
    """
1249
    result = self._SingleNodeCall(node, "os_get", [name])
1250
    if not result.fail_msg and isinstance(result.payload, dict):
1251
      result.payload = objects.OS.FromDict(result.payload)
1252
    return result
1253

    
1254
  @_RpcTimeout(_TMO_FAST)
1255
  def call_os_validate(self, required, nodes, name, checks, params):
1256
    """Run a validation routine for a given OS.
1257

1258
    This is a multi-node call.
1259

1260
    """
1261
    return self._MultiNodeCall(nodes, "os_validate",
1262
                               [required, name, checks, params])
1263

    
1264
  @_RpcTimeout(_TMO_NORMAL)
1265
  def call_hooks_runner(self, node_list, hpath, phase, env):
1266
    """Call the hooks runner.
1267

1268
    Args:
1269
      - op: the OpCode instance
1270
      - env: a dictionary with the environment
1271

1272
    This is a multi-node call.
1273

1274
    """
1275
    params = [hpath, phase, env]
1276
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1277

    
1278
  @_RpcTimeout(_TMO_NORMAL)
1279
  def call_iallocator_runner(self, node, name, idata):
1280
    """Call an iallocator on a remote node
1281

1282
    Args:
1283
      - name: the iallocator name
1284
      - input: the json-encoded input string
1285

1286
    This is a single-node call.
1287

1288
    """
1289
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1290

    
1291
  @_RpcTimeout(_TMO_NORMAL)
1292
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1293
    """Request a snapshot of the given block device.
1294

1295
    This is a single-node call.
1296

1297
    """
1298
    return self._SingleNodeCall(node, "blockdev_grow",
1299
                                [cf_bdev.ToDict(), amount, dryrun])
1300

    
1301
  @_RpcTimeout(_TMO_1DAY)
1302
  def call_blockdev_export(self, node, cf_bdev,
1303
                           dest_node, dest_path, cluster_name):
1304
    """Export a given disk to another node.
1305

1306
    This is a single-node call.
1307

1308
    """
1309
    return self._SingleNodeCall(node, "blockdev_export",
1310
                                [cf_bdev.ToDict(), dest_node, dest_path,
1311
                                 cluster_name])
1312

    
1313
  @_RpcTimeout(_TMO_NORMAL)
1314
  def call_blockdev_snapshot(self, node, cf_bdev):
1315
    """Request a snapshot of the given block device.
1316

1317
    This is a single-node call.
1318

1319
    """
1320
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1321

    
1322
  @_RpcTimeout(_TMO_NORMAL)
1323
  def call_finalize_export(self, node, instance, snap_disks):
1324
    """Request the completion of an export operation.
1325

1326
    This writes the export config file, etc.
1327

1328
    This is a single-node call.
1329

1330
    """
1331
    flat_disks = []
1332
    for disk in snap_disks:
1333
      if isinstance(disk, bool):
1334
        flat_disks.append(disk)
1335
      else:
1336
        flat_disks.append(disk.ToDict())
1337

    
1338
    return self._SingleNodeCall(node, "finalize_export",
1339
                                [self._InstDict(instance), flat_disks])
1340

    
1341
  @_RpcTimeout(_TMO_FAST)
1342
  def call_export_info(self, node, path):
1343
    """Queries the export information in a given path.
1344

1345
    This is a single-node call.
1346

1347
    """
1348
    return self._SingleNodeCall(node, "export_info", [path])
1349

    
1350
  @_RpcTimeout(_TMO_FAST)
1351
  def call_export_list(self, node_list):
1352
    """Gets the stored exports list.
1353

1354
    This is a multi-node call.
1355

1356
    """
1357
    return self._MultiNodeCall(node_list, "export_list", [])
1358

    
1359
  @_RpcTimeout(_TMO_FAST)
1360
  def call_export_remove(self, node, export):
1361
    """Requests removal of a given export.
1362

1363
    This is a single-node call.
1364

1365
    """
1366
    return self._SingleNodeCall(node, "export_remove", [export])
1367

    
1368
  @classmethod
1369
  @_RpcTimeout(_TMO_NORMAL)
1370
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1371
    """Requests a node to clean the cluster information it has.
1372

1373
    This will remove the configuration information from the ganeti data
1374
    dir.
1375

1376
    This is a single-node call.
1377

1378
    """
1379
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1380
                                     [modify_ssh_setup])
1381

    
1382
  @_RpcTimeout(_TMO_FAST)
1383
  def call_node_volumes(self, node_list):
1384
    """Gets all volumes on node(s).
1385

1386
    This is a multi-node call.
1387

1388
    """
1389
    return self._MultiNodeCall(node_list, "node_volumes", [])
1390

    
1391
  @_RpcTimeout(_TMO_FAST)
1392
  def call_node_demote_from_mc(self, node):
1393
    """Demote a node from the master candidate role.
1394

1395
    This is a single-node call.
1396

1397
    """
1398
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1399

    
1400
  @_RpcTimeout(_TMO_NORMAL)
1401
  def call_node_powercycle(self, node, hypervisor):
1402
    """Tries to powercycle a node.
1403

1404
    This is a single-node call.
1405

1406
    """
1407
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1408

    
1409
  @_RpcTimeout(None)
1410
  def call_test_delay(self, node_list, duration):
1411
    """Sleep for a fixed time on given node(s).
1412

1413
    This is a multi-node call.
1414

1415
    """
1416
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1417
                               read_timeout=int(duration + 5))
1418

    
1419
  @_RpcTimeout(_TMO_FAST)
1420
  def call_file_storage_dir_create(self, node, file_storage_dir):
1421
    """Create the given file storage directory.
1422

1423
    This is a single-node call.
1424

1425
    """
1426
    return self._SingleNodeCall(node, "file_storage_dir_create",
1427
                                [file_storage_dir])
1428

    
1429
  @_RpcTimeout(_TMO_FAST)
1430
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1431
    """Remove the given file storage directory.
1432

1433
    This is a single-node call.
1434

1435
    """
1436
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1437
                                [file_storage_dir])
1438

    
1439
  @_RpcTimeout(_TMO_FAST)
1440
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1441
                                   new_file_storage_dir):
1442
    """Rename file storage directory.
1443

1444
    This is a single-node call.
1445

1446
    """
1447
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1448
                                [old_file_storage_dir, new_file_storage_dir])
1449

    
1450
  @classmethod
1451
  @_RpcTimeout(_TMO_URGENT)
1452
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1453
    """Update job queue.
1454

1455
    This is a multi-node call.
1456

1457
    """
1458
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1459
                                    [file_name, _Compress(content)],
1460
                                    address_list=address_list)
1461

    
1462
  @classmethod
1463
  @_RpcTimeout(_TMO_NORMAL)
1464
  def call_jobqueue_purge(cls, node):
1465
    """Purge job queue.
1466

1467
    This is a single-node call.
1468

1469
    """
1470
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1471

    
1472
  @classmethod
1473
  @_RpcTimeout(_TMO_URGENT)
1474
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1475
    """Rename a job queue file.
1476

1477
    This is a multi-node call.
1478

1479
    """
1480
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1481
                                    address_list=address_list)
1482

    
1483
  @_RpcTimeout(_TMO_NORMAL)
1484
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1485
    """Validate the hypervisor params.
1486

1487
    This is a multi-node call.
1488

1489
    @type node_list: list
1490
    @param node_list: the list of nodes to query
1491
    @type hvname: string
1492
    @param hvname: the hypervisor name
1493
    @type hvparams: dict
1494
    @param hvparams: the hypervisor parameters to be validated
1495

1496
    """
1497
    cluster = self._cfg.GetClusterInfo()
1498
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1499
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1500
                               [hvname, hv_full])
1501

    
1502
  @_RpcTimeout(_TMO_NORMAL)
1503
  def call_x509_cert_create(self, node, validity):
1504
    """Creates a new X509 certificate for SSL/TLS.
1505

1506
    This is a single-node call.
1507

1508
    @type validity: int
1509
    @param validity: Validity in seconds
1510

1511
    """
1512
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1513

    
1514
  @_RpcTimeout(_TMO_NORMAL)
1515
  def call_x509_cert_remove(self, node, name):
1516
    """Removes a X509 certificate.
1517

1518
    This is a single-node call.
1519

1520
    @type name: string
1521
    @param name: Certificate name
1522

1523
    """
1524
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1525

    
1526
  @_RpcTimeout(_TMO_NORMAL)
1527
  def call_import_start(self, node, opts, instance, component,
1528
                        dest, dest_args):
1529
    """Starts a listener for an import.
1530

1531
    This is a single-node call.
1532

1533
    @type node: string
1534
    @param node: Node name
1535
    @type instance: C{objects.Instance}
1536
    @param instance: Instance object
1537
    @type component: string
1538
    @param component: which part of the instance is being imported
1539

1540
    """
1541
    return self._SingleNodeCall(node, "import_start",
1542
                                [opts.ToDict(),
1543
                                 self._InstDict(instance), component, dest,
1544
                                 _EncodeImportExportIO(dest, dest_args)])
1545

    
1546
  @_RpcTimeout(_TMO_NORMAL)
1547
  def call_export_start(self, node, opts, host, port,
1548
                        instance, component, source, source_args):
1549
    """Starts an export daemon.
1550

1551
    This is a single-node call.
1552

1553
    @type node: string
1554
    @param node: Node name
1555
    @type instance: C{objects.Instance}
1556
    @param instance: Instance object
1557
    @type component: string
1558
    @param component: which part of the instance is being imported
1559

1560
    """
1561
    return self._SingleNodeCall(node, "export_start",
1562
                                [opts.ToDict(), host, port,
1563
                                 self._InstDict(instance),
1564
                                 component, source,
1565
                                 _EncodeImportExportIO(source, source_args)])
1566

    
1567
  @_RpcTimeout(_TMO_FAST)
1568
  def call_impexp_status(self, node, names):
1569
    """Gets the status of an import or export.
1570

1571
    This is a single-node call.
1572

1573
    @type node: string
1574
    @param node: Node name
1575
    @type names: List of strings
1576
    @param names: Import/export names
1577
    @rtype: List of L{objects.ImportExportStatus} instances
1578
    @return: Returns a list of the state of each named import/export or None if
1579
             a status couldn't be retrieved
1580

1581
    """
1582
    result = self._SingleNodeCall(node, "impexp_status", [names])
1583

    
1584
    if not result.fail_msg:
1585
      decoded = []
1586

    
1587
      for i in result.payload:
1588
        if i is None:
1589
          decoded.append(None)
1590
          continue
1591
        decoded.append(objects.ImportExportStatus.FromDict(i))
1592

    
1593
      result.payload = decoded
1594

    
1595
    return result
1596

    
1597
  @_RpcTimeout(_TMO_NORMAL)
1598
  def call_impexp_abort(self, node, name):
1599
    """Aborts an import or export.
1600

1601
    This is a single-node call.
1602

1603
    @type node: string
1604
    @param node: Node name
1605
    @type name: string
1606
    @param name: Import/export name
1607

1608
    """
1609
    return self._SingleNodeCall(node, "impexp_abort", [name])
1610

    
1611
  @_RpcTimeout(_TMO_NORMAL)
1612
  def call_impexp_cleanup(self, node, name):
1613
    """Cleans up after an import or export.
1614

1615
    This is a single-node call.
1616

1617
    @type node: string
1618
    @param node: Node name
1619
    @type name: string
1620
    @param name: Import/export name
1621

1622
    """
1623
    return self._SingleNodeCall(node, "impexp_cleanup", [name])