Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ c06e0c83

History | View | Annotate | Download (47.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49

    
50
# pylint has a bug here, doesn't see this import
51
import ganeti.http.client  # pylint: disable=W0611
52

    
53

    
54
# Timeout for connecting to nodes (seconds)
55
_RPC_CONNECT_TIMEOUT = 5
56

    
57
_RPC_CLIENT_HEADERS = [
58
  "Content-type: %s" % http.HTTP_APP_JSON,
59
  "Expect:",
60
  ]
61

    
62
# Various time constants for the timeout table
63
_TMO_URGENT = 60 # one minute
64
_TMO_FAST = 5 * 60 # five minutes
65
_TMO_NORMAL = 15 * 60 # 15 minutes
66
_TMO_SLOW = 3600 # one hour
67
_TMO_4HRS = 4 * 3600
68
_TMO_1DAY = 86400
69

    
70
# Timeout table that will be built later by decorators
71
# Guidelines for choosing timeouts:
72
# - call used during watcher: timeout -> 1min, _TMO_URGENT
73
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
74
# - other calls: 15 min, _TMO_NORMAL
75
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
76

    
77
_TIMEOUTS = {
78
}
79

    
80

    
81
def Init():
82
  """Initializes the module-global HTTP client manager.
83

84
  Must be called before using any RPC function and while exactly one thread is
85
  running.
86

87
  """
88
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
89
  # one thread running. This check is just a safety measure -- it doesn't
90
  # cover all cases.
91
  assert threading.activeCount() == 1, \
92
         "Found more than one active thread when initializing pycURL"
93

    
94
  logging.info("Using PycURL %s", pycurl.version)
95

    
96
  pycurl.global_init(pycurl.GLOBAL_ALL)
97

    
98

    
99
def Shutdown():
100
  """Stops the module-global HTTP client manager.
101

102
  Must be called before quitting the program and while exactly one thread is
103
  running.
104

105
  """
106
  pycurl.global_cleanup()
107

    
108

    
109
def _ConfigRpcCurl(curl):
110
  noded_cert = str(constants.NODED_CERT_FILE)
111

    
112
  curl.setopt(pycurl.FOLLOWLOCATION, False)
113
  curl.setopt(pycurl.CAINFO, noded_cert)
114
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
115
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
116
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
117
  curl.setopt(pycurl.SSLCERT, noded_cert)
118
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
119
  curl.setopt(pycurl.SSLKEY, noded_cert)
120
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
121

    
122

    
123
# Aliasing this module avoids the following warning by epydoc: "Warning: No
124
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
125
_threading = threading
126

    
127

    
128
class _RpcThreadLocal(_threading.local):
129
  def GetHttpClientPool(self):
130
    """Returns a per-thread HTTP client pool.
131

132
    @rtype: L{http.client.HttpClientPool}
133

134
    """
135
    try:
136
      pool = self.hcp
137
    except AttributeError:
138
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
139
      self.hcp = pool
140

    
141
    return pool
142

    
143

    
144
# Remove module alias (see above)
145
del _threading
146

    
147

    
148
_thread_local = _RpcThreadLocal()
149

    
150

    
151
def _RpcTimeout(secs):
152
  """Timeout decorator.
153

154
  When applied to a rpc call_* function, it updates the global timeout
155
  table with the given function/timeout.
156

157
  """
158
  def decorator(f):
159
    name = f.__name__
160
    assert name.startswith("call_")
161
    _TIMEOUTS[name[len("call_"):]] = secs
162
    return f
163
  return decorator
164

    
165

    
166
def RunWithRPC(fn):
167
  """RPC-wrapper decorator.
168

169
  When applied to a function, it runs it with the RPC system
170
  initialized, and it shutsdown the system afterwards. This means the
171
  function must be called without RPC being initialized.
172

173
  """
174
  def wrapper(*args, **kwargs):
175
    Init()
176
    try:
177
      return fn(*args, **kwargs)
178
    finally:
179
      Shutdown()
180
  return wrapper
181

    
182

    
183
class RpcResult(object):
184
  """RPC Result class.
185

186
  This class holds an RPC result. It is needed since in multi-node
187
  calls we can't raise an exception just because one one out of many
188
  failed, and therefore we use this class to encapsulate the result.
189

190
  @ivar data: the data payload, for successful results, or None
191
  @ivar call: the name of the RPC call
192
  @ivar node: the name of the node to which we made the call
193
  @ivar offline: whether the operation failed because the node was
194
      offline, as opposed to actual failure; offline=True will always
195
      imply failed=True, in order to allow simpler checking if
196
      the user doesn't care about the exact failure mode
197
  @ivar fail_msg: the error message if the call failed
198

199
  """
200
  def __init__(self, data=None, failed=False, offline=False,
201
               call=None, node=None):
202
    self.offline = offline
203
    self.call = call
204
    self.node = node
205

    
206
    if offline:
207
      self.fail_msg = "Node is marked offline"
208
      self.data = self.payload = None
209
    elif failed:
210
      self.fail_msg = self._EnsureErr(data)
211
      self.data = self.payload = None
212
    else:
213
      self.data = data
214
      if not isinstance(self.data, (tuple, list)):
215
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
216
                         type(self.data))
217
        self.payload = None
218
      elif len(data) != 2:
219
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
220
                         "expected 2" % len(self.data))
221
        self.payload = None
222
      elif not self.data[0]:
223
        self.fail_msg = self._EnsureErr(self.data[1])
224
        self.payload = None
225
      else:
226
        # finally success
227
        self.fail_msg = None
228
        self.payload = data[1]
229

    
230
    for attr_name in ["call", "data", "fail_msg",
231
                      "node", "offline", "payload"]:
232
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
233

    
234
  @staticmethod
235
  def _EnsureErr(val):
236
    """Helper to ensure we return a 'True' value for error."""
237
    if val:
238
      return val
239
    else:
240
      return "No error information"
241

    
242
  def Raise(self, msg, prereq=False, ecode=None):
243
    """If the result has failed, raise an OpExecError.
244

245
    This is used so that LU code doesn't have to check for each
246
    result, but instead can call this function.
247

248
    """
249
    if not self.fail_msg:
250
      return
251

    
252
    if not msg: # one could pass None for default message
253
      msg = ("Call '%s' to node '%s' has failed: %s" %
254
             (self.call, self.node, self.fail_msg))
255
    else:
256
      msg = "%s: %s" % (msg, self.fail_msg)
257
    if prereq:
258
      ec = errors.OpPrereqError
259
    else:
260
      ec = errors.OpExecError
261
    if ecode is not None:
262
      args = (msg, ecode)
263
    else:
264
      args = (msg, )
265
    raise ec(*args) # pylint: disable=W0142
266

    
267

    
268
def _AddressLookup(node_list,
269
                   ssc=ssconf.SimpleStore,
270
                   nslookup_fn=netutils.Hostname.GetIP):
271
  """Return addresses for given node names.
272

273
  @type node_list: list
274
  @param node_list: List of node names
275
  @type ssc: class
276
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
277
  @type nslookup_fn: callable
278
  @param nslookup_fn: function use to do NS lookup
279
  @rtype: list of addresses and/or None's
280
  @returns: List of corresponding addresses, if found
281

282
  """
283
  ss = ssc()
284
  iplist = ss.GetNodePrimaryIPList()
285
  family = ss.GetPrimaryIPFamily()
286
  addresses = []
287
  ipmap = dict(entry.split() for entry in iplist)
288
  for node in node_list:
289
    address = ipmap.get(node)
290
    if address is None:
291
      address = nslookup_fn(node, family=family)
292
    addresses.append(address)
293

    
294
  return addresses
295

    
296

    
297
class Client:
298
  """RPC Client class.
299

300
  This class, given a (remote) method name, a list of parameters and a
301
  list of nodes, will contact (in parallel) all nodes, and return a
302
  dict of results (key: node name, value: result).
303

304
  One current bug is that generic failure is still signaled by
305
  'False' result, which is not good. This overloading of values can
306
  cause bugs.
307

308
  """
309
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
310
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
311
                                    " timeouts table")
312
    self.procedure = procedure
313
    self.body = body
314
    self.port = port
315
    self._request = {}
316
    self._address_lookup_fn = address_lookup_fn
317

    
318
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
319
    """Add a list of nodes to the target nodes.
320

321
    @type node_list: list
322
    @param node_list: the list of node names to connect
323
    @type address_list: list or None
324
    @keyword address_list: either None or a list with node addresses,
325
        which must have the same length as the node list
326
    @type read_timeout: int
327
    @param read_timeout: overwrites default timeout for operation
328

329
    """
330
    if address_list is None:
331
      # Always use IP address instead of node name
332
      address_list = self._address_lookup_fn(node_list)
333

    
334
    assert len(node_list) == len(address_list), \
335
           "Name and address lists must have the same length"
336

    
337
    for node, address in zip(node_list, address_list):
338
      self.ConnectNode(node, address, read_timeout=read_timeout)
339

    
340
  def ConnectNode(self, name, address=None, read_timeout=None):
341
    """Add a node to the target list.
342

343
    @type name: str
344
    @param name: the node name
345
    @type address: str
346
    @param address: the node address, if known
347
    @type read_timeout: int
348
    @param read_timeout: overwrites default timeout for operation
349

350
    """
351
    if address is None:
352
      # Always use IP address instead of node name
353
      address = self._address_lookup_fn([name])[0]
354

    
355
    assert(address is not None)
356

    
357
    if read_timeout is None:
358
      read_timeout = _TIMEOUTS[self.procedure]
359

    
360
    self._request[name] = \
361
      http.client.HttpClientRequest(str(address), self.port,
362
                                    http.HTTP_PUT, str("/%s" % self.procedure),
363
                                    headers=_RPC_CLIENT_HEADERS,
364
                                    post_data=str(self.body),
365
                                    read_timeout=read_timeout)
366

    
367
  def GetResults(self, http_pool=None):
368
    """Call nodes and return results.
369

370
    @rtype: list
371
    @return: List of RPC results
372

373
    """
374
    if not http_pool:
375
      http_pool = _thread_local.GetHttpClientPool()
376

    
377
    http_pool.ProcessRequests(self._request.values())
378

    
379
    results = {}
380

    
381
    for name, req in self._request.iteritems():
382
      if req.success and req.resp_status_code == http.HTTP_OK:
383
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
384
                                  node=name, call=self.procedure)
385
        continue
386

    
387
      # TODO: Better error reporting
388
      if req.error:
389
        msg = req.error
390
      else:
391
        msg = req.resp_body
392

    
393
      logging.error("RPC error in %s from node %s: %s",
394
                    self.procedure, name, msg)
395
      results[name] = RpcResult(data=msg, failed=True, node=name,
396
                                call=self.procedure)
397

    
398
    return results
399

    
400

    
401
def _EncodeImportExportIO(ieio, ieioargs):
402
  """Encodes import/export I/O information.
403

404
  """
405
  if ieio == constants.IEIO_RAW_DISK:
406
    assert len(ieioargs) == 1
407
    return (ieioargs[0].ToDict(), )
408

    
409
  if ieio == constants.IEIO_SCRIPT:
410
    assert len(ieioargs) == 2
411
    return (ieioargs[0].ToDict(), ieioargs[1])
412

    
413
  return ieioargs
414

    
415

    
416
class RpcRunner(object):
417
  """RPC runner class"""
418

    
419
  def __init__(self, cfg):
420
    """Initialized the rpc runner.
421

422
    @type cfg:  C{config.ConfigWriter}
423
    @param cfg: the configuration object that will be used to get data
424
                about the cluster
425

426
    """
427
    self._cfg = cfg
428
    self.port = netutils.GetDaemonPort(constants.NODED)
429

    
430
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
431
    """Convert the given instance to a dict.
432

433
    This is done via the instance's ToDict() method and additionally
434
    we fill the hvparams with the cluster defaults.
435

436
    @type instance: L{objects.Instance}
437
    @param instance: an Instance object
438
    @type hvp: dict or None
439
    @param hvp: a dictionary with overridden hypervisor parameters
440
    @type bep: dict or None
441
    @param bep: a dictionary with overridden backend parameters
442
    @type osp: dict or None
443
    @param osp: a dictionary with overridden os parameters
444
    @rtype: dict
445
    @return: the instance dict, with the hvparams filled with the
446
        cluster defaults
447

448
    """
449
    idict = instance.ToDict()
450
    cluster = self._cfg.GetClusterInfo()
451
    idict["hvparams"] = cluster.FillHV(instance)
452
    if hvp is not None:
453
      idict["hvparams"].update(hvp)
454
    idict["beparams"] = cluster.FillBE(instance)
455
    if bep is not None:
456
      idict["beparams"].update(bep)
457
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
458
    if osp is not None:
459
      idict["osparams"].update(osp)
460
    for nic in idict["nics"]:
461
      nic['nicparams'] = objects.FillDict(
462
        cluster.nicparams[constants.PP_DEFAULT],
463
        nic['nicparams'])
464
    return idict
465

    
466
  def _ConnectList(self, client, node_list, call, read_timeout=None):
467
    """Helper for computing node addresses.
468

469
    @type client: L{ganeti.rpc.Client}
470
    @param client: a C{Client} instance
471
    @type node_list: list
472
    @param node_list: the node list we should connect
473
    @type call: string
474
    @param call: the name of the remote procedure call, for filling in
475
        correctly any eventual offline nodes' results
476
    @type read_timeout: int
477
    @param read_timeout: overwrites the default read timeout for the
478
        given operation
479

480
    """
481
    all_nodes = self._cfg.GetAllNodesInfo()
482
    name_list = []
483
    addr_list = []
484
    skip_dict = {}
485
    for node in node_list:
486
      if node in all_nodes:
487
        if all_nodes[node].offline:
488
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
489
          continue
490
        val = all_nodes[node].primary_ip
491
      else:
492
        val = None
493
      addr_list.append(val)
494
      name_list.append(node)
495
    if name_list:
496
      client.ConnectList(name_list, address_list=addr_list,
497
                         read_timeout=read_timeout)
498
    return skip_dict
499

    
500
  def _ConnectNode(self, client, node, call, read_timeout=None):
501
    """Helper for computing one node's address.
502

503
    @type client: L{ganeti.rpc.Client}
504
    @param client: a C{Client} instance
505
    @type node: str
506
    @param node: the node we should connect
507
    @type call: string
508
    @param call: the name of the remote procedure call, for filling in
509
        correctly any eventual offline nodes' results
510
    @type read_timeout: int
511
    @param read_timeout: overwrites the default read timeout for the
512
        given operation
513

514
    """
515
    node_info = self._cfg.GetNodeInfo(node)
516
    if node_info is not None:
517
      if node_info.offline:
518
        return RpcResult(node=node, offline=True, call=call)
519
      addr = node_info.primary_ip
520
    else:
521
      addr = None
522
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
523

    
524
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
525
    """Helper for making a multi-node call
526

527
    """
528
    body = serializer.DumpJson(args, indent=False)
529
    c = Client(procedure, body, self.port)
530
    skip_dict = self._ConnectList(c, node_list, procedure,
531
                                  read_timeout=read_timeout)
532
    skip_dict.update(c.GetResults())
533
    return skip_dict
534

    
535
  @classmethod
536
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
537
                           address_list=None, read_timeout=None):
538
    """Helper for making a multi-node static call
539

540
    """
541
    body = serializer.DumpJson(args, indent=False)
542
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
543
    c.ConnectList(node_list, address_list=address_list,
544
                  read_timeout=read_timeout)
545
    return c.GetResults()
546

    
547
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
548
    """Helper for making a single-node call
549

550
    """
551
    body = serializer.DumpJson(args, indent=False)
552
    c = Client(procedure, body, self.port)
553
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
554
    if result is None:
555
      # we did connect, node is not offline
556
      result = c.GetResults()[node]
557
    return result
558

    
559
  @classmethod
560
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
561
    """Helper for making a single-node static call
562

563
    """
564
    body = serializer.DumpJson(args, indent=False)
565
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
566
    c.ConnectNode(node, read_timeout=read_timeout)
567
    return c.GetResults()[node]
568

    
569
  @staticmethod
570
  def _Compress(data):
571
    """Compresses a string for transport over RPC.
572

573
    Small amounts of data are not compressed.
574

575
    @type data: str
576
    @param data: Data
577
    @rtype: tuple
578
    @return: Encoded data to send
579

580
    """
581
    # Small amounts of data are not compressed
582
    if len(data) < 512:
583
      return (constants.RPC_ENCODING_NONE, data)
584

    
585
    # Compress with zlib and encode in base64
586
    return (constants.RPC_ENCODING_ZLIB_BASE64,
587
            base64.b64encode(zlib.compress(data, 3)))
588

    
589
  #
590
  # Begin RPC calls
591
  #
592

    
593
  @_RpcTimeout(_TMO_URGENT)
594
  def call_bdev_sizes(self, node_list, devices):
595
    """Gets the sizes of requested block devices present on a node
596

597
    This is a multi-node call.
598

599
    """
600
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
601

    
602
  @_RpcTimeout(_TMO_URGENT)
603
  def call_lv_list(self, node_list, vg_name):
604
    """Gets the logical volumes present in a given volume group.
605

606
    This is a multi-node call.
607

608
    """
609
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
610

    
611
  @_RpcTimeout(_TMO_URGENT)
612
  def call_vg_list(self, node_list):
613
    """Gets the volume group list.
614

615
    This is a multi-node call.
616

617
    """
618
    return self._MultiNodeCall(node_list, "vg_list", [])
619

    
620
  @_RpcTimeout(_TMO_NORMAL)
621
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
622
    """Get list of storage units.
623

624
    This is a multi-node call.
625

626
    """
627
    return self._MultiNodeCall(node_list, "storage_list",
628
                               [su_name, su_args, name, fields])
629

    
630
  @_RpcTimeout(_TMO_NORMAL)
631
  def call_storage_modify(self, node, su_name, su_args, name, changes):
632
    """Modify a storage unit.
633

634
    This is a single-node call.
635

636
    """
637
    return self._SingleNodeCall(node, "storage_modify",
638
                                [su_name, su_args, name, changes])
639

    
640
  @_RpcTimeout(_TMO_NORMAL)
641
  def call_storage_execute(self, node, su_name, su_args, name, op):
642
    """Executes an operation on a storage unit.
643

644
    This is a single-node call.
645

646
    """
647
    return self._SingleNodeCall(node, "storage_execute",
648
                                [su_name, su_args, name, op])
649

    
650
  @_RpcTimeout(_TMO_URGENT)
651
  def call_bridges_exist(self, node, bridges_list):
652
    """Checks if a node has all the bridges given.
653

654
    This method checks if all bridges given in the bridges_list are
655
    present on the remote node, so that an instance that uses interfaces
656
    on those bridges can be started.
657

658
    This is a single-node call.
659

660
    """
661
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
662

    
663
  @_RpcTimeout(_TMO_NORMAL)
664
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
665
    """Starts an instance.
666

667
    This is a single-node call.
668

669
    """
670
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
671
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
672

    
673
  @_RpcTimeout(_TMO_NORMAL)
674
  def call_instance_shutdown(self, node, instance, timeout):
675
    """Stops an instance.
676

677
    This is a single-node call.
678

679
    """
680
    return self._SingleNodeCall(node, "instance_shutdown",
681
                                [self._InstDict(instance), timeout])
682

    
683
  @_RpcTimeout(_TMO_NORMAL)
684
  def call_migration_info(self, node, instance):
685
    """Gather the information necessary to prepare an instance migration.
686

687
    This is a single-node call.
688

689
    @type node: string
690
    @param node: the node on which the instance is currently running
691
    @type instance: C{objects.Instance}
692
    @param instance: the instance definition
693

694
    """
695
    return self._SingleNodeCall(node, "migration_info",
696
                                [self._InstDict(instance)])
697

    
698
  @_RpcTimeout(_TMO_NORMAL)
699
  def call_accept_instance(self, node, instance, info, target):
700
    """Prepare a node to accept an instance.
701

702
    This is a single-node call.
703

704
    @type node: string
705
    @param node: the target node for the migration
706
    @type instance: C{objects.Instance}
707
    @param instance: the instance definition
708
    @type info: opaque/hypervisor specific (string/data)
709
    @param info: result for the call_migration_info call
710
    @type target: string
711
    @param target: target hostname (usually ip address) (on the node itself)
712

713
    """
714
    return self._SingleNodeCall(node, "accept_instance",
715
                                [self._InstDict(instance), info, target])
716

    
717
  @_RpcTimeout(_TMO_NORMAL)
718
  def call_finalize_migration(self, node, instance, info, success):
719
    """Finalize any target-node migration specific operation.
720

721
    This is called both in case of a successful migration and in case of error
722
    (in which case it should abort the migration).
723

724
    This is a single-node call.
725

726
    @type node: string
727
    @param node: the target node for the migration
728
    @type instance: C{objects.Instance}
729
    @param instance: the instance definition
730
    @type info: opaque/hypervisor specific (string/data)
731
    @param info: result for the call_migration_info call
732
    @type success: boolean
733
    @param success: whether the migration was a success or a failure
734

735
    """
736
    return self._SingleNodeCall(node, "finalize_migration",
737
                                [self._InstDict(instance), info, success])
738

    
739
  @_RpcTimeout(_TMO_SLOW)
740
  def call_instance_migrate(self, node, instance, target, live):
741
    """Migrate an instance.
742

743
    This is a single-node call.
744

745
    @type node: string
746
    @param node: the node on which the instance is currently running
747
    @type instance: C{objects.Instance}
748
    @param instance: the instance definition
749
    @type target: string
750
    @param target: the target node name
751
    @type live: boolean
752
    @param live: whether the migration should be done live or not (the
753
        interpretation of this parameter is left to the hypervisor)
754

755
    """
756
    return self._SingleNodeCall(node, "instance_migrate",
757
                                [self._InstDict(instance), target, live])
758

    
759
  @_RpcTimeout(_TMO_NORMAL)
760
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
761
    """Reboots an instance.
762

763
    This is a single-node call.
764

765
    """
766
    return self._SingleNodeCall(node, "instance_reboot",
767
                                [self._InstDict(inst), reboot_type,
768
                                 shutdown_timeout])
769

    
770
  @_RpcTimeout(_TMO_1DAY)
771
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
772
    """Installs an OS on the given instance.
773

774
    This is a single-node call.
775

776
    """
777
    return self._SingleNodeCall(node, "instance_os_add",
778
                                [self._InstDict(inst, osp=osparams),
779
                                 reinstall, debug])
780

    
781
  @_RpcTimeout(_TMO_SLOW)
782
  def call_instance_run_rename(self, node, inst, old_name, debug):
783
    """Run the OS rename script for an instance.
784

785
    This is a single-node call.
786

787
    """
788
    return self._SingleNodeCall(node, "instance_run_rename",
789
                                [self._InstDict(inst), old_name, debug])
790

    
791
  @_RpcTimeout(_TMO_URGENT)
792
  def call_instance_info(self, node, instance, hname):
793
    """Returns information about a single instance.
794

795
    This is a single-node call.
796

797
    @type node: list
798
    @param node: the list of nodes to query
799
    @type instance: string
800
    @param instance: the instance name
801
    @type hname: string
802
    @param hname: the hypervisor type of the instance
803

804
    """
805
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
806

    
807
  @_RpcTimeout(_TMO_NORMAL)
808
  def call_instance_migratable(self, node, instance):
809
    """Checks whether the given instance can be migrated.
810

811
    This is a single-node call.
812

813
    @param node: the node to query
814
    @type instance: L{objects.Instance}
815
    @param instance: the instance to check
816

817

818
    """
819
    return self._SingleNodeCall(node, "instance_migratable",
820
                                [self._InstDict(instance)])
821

    
822
  @_RpcTimeout(_TMO_URGENT)
823
  def call_all_instances_info(self, node_list, hypervisor_list):
824
    """Returns information about all instances on the given nodes.
825

826
    This is a multi-node call.
827

828
    @type node_list: list
829
    @param node_list: the list of nodes to query
830
    @type hypervisor_list: list
831
    @param hypervisor_list: the hypervisors to query for instances
832

833
    """
834
    return self._MultiNodeCall(node_list, "all_instances_info",
835
                               [hypervisor_list])
836

    
837
  @_RpcTimeout(_TMO_URGENT)
838
  def call_instance_list(self, node_list, hypervisor_list):
839
    """Returns the list of running instances on a given node.
840

841
    This is a multi-node call.
842

843
    @type node_list: list
844
    @param node_list: the list of nodes to query
845
    @type hypervisor_list: list
846
    @param hypervisor_list: the hypervisors to query for instances
847

848
    """
849
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
850

    
851
  @_RpcTimeout(_TMO_FAST)
852
  def call_node_tcp_ping(self, node, source, target, port, timeout,
853
                         live_port_needed):
854
    """Do a TcpPing on the remote node
855

856
    This is a single-node call.
857

858
    """
859
    return self._SingleNodeCall(node, "node_tcp_ping",
860
                                [source, target, port, timeout,
861
                                 live_port_needed])
862

    
863
  @_RpcTimeout(_TMO_FAST)
864
  def call_node_has_ip_address(self, node, address):
865
    """Checks if a node has the given IP address.
866

867
    This is a single-node call.
868

869
    """
870
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
871

    
872
  @_RpcTimeout(_TMO_URGENT)
873
  def call_node_info(self, node_list, vg_name, hypervisor_type):
874
    """Return node information.
875

876
    This will return memory information and volume group size and free
877
    space.
878

879
    This is a multi-node call.
880

881
    @type node_list: list
882
    @param node_list: the list of nodes to query
883
    @type vg_name: C{string}
884
    @param vg_name: the name of the volume group to ask for disk space
885
        information
886
    @type hypervisor_type: C{str}
887
    @param hypervisor_type: the name of the hypervisor to ask for
888
        memory information
889

890
    """
891
    return self._MultiNodeCall(node_list, "node_info",
892
                               [vg_name, hypervisor_type])
893

    
894
  @_RpcTimeout(_TMO_NORMAL)
895
  def call_etc_hosts_modify(self, node, mode, name, ip):
896
    """Modify hosts file with name
897

898
    @type node: string
899
    @param node: The node to call
900
    @type mode: string
901
    @param mode: The mode to operate. Currently "add" or "remove"
902
    @type name: string
903
    @param name: The host name to be modified
904
    @type ip: string
905
    @param ip: The ip of the entry (just valid if mode is "add")
906

907
    """
908
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
909

    
910
  @_RpcTimeout(_TMO_NORMAL)
911
  def call_node_verify(self, node_list, checkdict, cluster_name):
912
    """Request verification of given parameters.
913

914
    This is a multi-node call.
915

916
    """
917
    return self._MultiNodeCall(node_list, "node_verify",
918
                               [checkdict, cluster_name])
919

    
920
  @classmethod
921
  @_RpcTimeout(_TMO_FAST)
922
  def call_node_start_master_daemons(cls, node, no_voting):
923
    """Starts master daemons on a node.
924

925
    This is a single-node call.
926

927
    """
928
    return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
929
                                     [no_voting])
930

    
931
  @classmethod
932
  @_RpcTimeout(_TMO_FAST)
933
  def call_node_activate_master_ip(cls, node):
934
    """Activates master IP on a node.
935

936
    This is a single-node call.
937

938
    """
939
    return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
940

    
941
  @classmethod
942
  @_RpcTimeout(_TMO_FAST)
943
  def call_node_stop_master(cls, node):
944
    """Deactivates master IP and stops master daemons on a node.
945

946
    This is a single-node call.
947

948
    """
949
    return cls._StaticSingleNodeCall(node, "node_stop_master", [])
950

    
951
  @classmethod
952
  @_RpcTimeout(_TMO_FAST)
953
  def call_node_deactivate_master_ip(cls, node):
954
    """Deactivates master IP on a node.
955

956
    This is a single-node call.
957

958
    """
959
    return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
960

    
961
  @classmethod
962
  @_RpcTimeout(_TMO_URGENT)
963
  def call_master_info(cls, node_list):
964
    """Query master info.
965

966
    This is a multi-node call.
967

968
    """
969
    # TODO: should this method query down nodes?
970
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
971

    
972
  @classmethod
973
  @_RpcTimeout(_TMO_URGENT)
974
  def call_version(cls, node_list):
975
    """Query node version.
976

977
    This is a multi-node call.
978

979
    """
980
    return cls._StaticMultiNodeCall(node_list, "version", [])
981

    
982
  @_RpcTimeout(_TMO_NORMAL)
983
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
984
    """Request creation of a given block device.
985

986
    This is a single-node call.
987

988
    """
989
    return self._SingleNodeCall(node, "blockdev_create",
990
                                [bdev.ToDict(), size, owner, on_primary, info])
991

    
992
  @_RpcTimeout(_TMO_SLOW)
993
  def call_blockdev_wipe(self, node, bdev, offset, size):
994
    """Request wipe at given offset with given size of a block device.
995

996
    This is a single-node call.
997

998
    """
999
    return self._SingleNodeCall(node, "blockdev_wipe",
1000
                                [bdev.ToDict(), offset, size])
1001

    
1002
  @_RpcTimeout(_TMO_NORMAL)
1003
  def call_blockdev_remove(self, node, bdev):
1004
    """Request removal of a given block device.
1005

1006
    This is a single-node call.
1007

1008
    """
1009
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
1010

    
1011
  @_RpcTimeout(_TMO_NORMAL)
1012
  def call_blockdev_rename(self, node, devlist):
1013
    """Request rename of the given block devices.
1014

1015
    This is a single-node call.
1016

1017
    """
1018
    return self._SingleNodeCall(node, "blockdev_rename",
1019
                                [(d.ToDict(), uid) for d, uid in devlist])
1020

    
1021
  @_RpcTimeout(_TMO_NORMAL)
1022
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
1023
    """Request a pause/resume of given block device.
1024

1025
    This is a single-node call.
1026

1027
    """
1028
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1029
                                [[bdev.ToDict() for bdev in disks], pause])
1030

    
1031
  @_RpcTimeout(_TMO_NORMAL)
1032
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1033
    """Request assembling of a given block device.
1034

1035
    This is a single-node call.
1036

1037
    """
1038
    return self._SingleNodeCall(node, "blockdev_assemble",
1039
                                [disk.ToDict(), owner, on_primary, idx])
1040

    
1041
  @_RpcTimeout(_TMO_NORMAL)
1042
  def call_blockdev_shutdown(self, node, disk):
1043
    """Request shutdown of a given block device.
1044

1045
    This is a single-node call.
1046

1047
    """
1048
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1049

    
1050
  @_RpcTimeout(_TMO_NORMAL)
1051
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1052
    """Request adding a list of children to a (mirroring) device.
1053

1054
    This is a single-node call.
1055

1056
    """
1057
    return self._SingleNodeCall(node, "blockdev_addchildren",
1058
                                [bdev.ToDict(),
1059
                                 [disk.ToDict() for disk in ndevs]])
1060

    
1061
  @_RpcTimeout(_TMO_NORMAL)
1062
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1063
    """Request removing a list of children from a (mirroring) device.
1064

1065
    This is a single-node call.
1066

1067
    """
1068
    return self._SingleNodeCall(node, "blockdev_removechildren",
1069
                                [bdev.ToDict(),
1070
                                 [disk.ToDict() for disk in ndevs]])
1071

    
1072
  @_RpcTimeout(_TMO_NORMAL)
1073
  def call_blockdev_getmirrorstatus(self, node, disks):
1074
    """Request status of a (mirroring) device.
1075

1076
    This is a single-node call.
1077

1078
    """
1079
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1080
                                  [dsk.ToDict() for dsk in disks])
1081
    if not result.fail_msg:
1082
      result.payload = [objects.BlockDevStatus.FromDict(i)
1083
                        for i in result.payload]
1084
    return result
1085

    
1086
  @_RpcTimeout(_TMO_NORMAL)
1087
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1088
    """Request status of (mirroring) devices from multiple nodes.
1089

1090
    This is a multi-node call.
1091

1092
    """
1093
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1094
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1095
                                       for name, disks in node_disks.items())])
1096
    for nres in result.values():
1097
      if nres.fail_msg:
1098
        continue
1099

    
1100
      for idx, (success, status) in enumerate(nres.payload):
1101
        if success:
1102
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1103

    
1104
    return result
1105

    
1106
  @_RpcTimeout(_TMO_NORMAL)
1107
  def call_blockdev_find(self, node, disk):
1108
    """Request identification of a given block device.
1109

1110
    This is a single-node call.
1111

1112
    """
1113
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1114
    if not result.fail_msg and result.payload is not None:
1115
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1116
    return result
1117

    
1118
  @_RpcTimeout(_TMO_NORMAL)
1119
  def call_blockdev_close(self, node, instance_name, disks):
1120
    """Closes the given block devices.
1121

1122
    This is a single-node call.
1123

1124
    """
1125
    params = [instance_name, [cf.ToDict() for cf in disks]]
1126
    return self._SingleNodeCall(node, "blockdev_close", params)
1127

    
1128
  @_RpcTimeout(_TMO_NORMAL)
1129
  def call_blockdev_getsize(self, node, disks):
1130
    """Returns the size of the given disks.
1131

1132
    This is a single-node call.
1133

1134
    """
1135
    params = [[cf.ToDict() for cf in disks]]
1136
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1137

    
1138
  @_RpcTimeout(_TMO_NORMAL)
1139
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1140
    """Disconnects the network of the given drbd devices.
1141

1142
    This is a multi-node call.
1143

1144
    """
1145
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1146
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1147

    
1148
  @_RpcTimeout(_TMO_NORMAL)
1149
  def call_drbd_attach_net(self, node_list, nodes_ip,
1150
                           disks, instance_name, multimaster):
1151
    """Disconnects the given drbd devices.
1152

1153
    This is a multi-node call.
1154

1155
    """
1156
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1157
                               [nodes_ip, [cf.ToDict() for cf in disks],
1158
                                instance_name, multimaster])
1159

    
1160
  @_RpcTimeout(_TMO_SLOW)
1161
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1162
    """Waits for the synchronization of drbd devices is complete.
1163

1164
    This is a multi-node call.
1165

1166
    """
1167
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1168
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1169

    
1170
  @_RpcTimeout(_TMO_URGENT)
1171
  def call_drbd_helper(self, node_list):
1172
    """Gets drbd helper.
1173

1174
    This is a multi-node call.
1175

1176
    """
1177
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1178

    
1179
  @classmethod
1180
  @_RpcTimeout(_TMO_NORMAL)
1181
  def call_upload_file(cls, node_list, file_name, address_list=None):
1182
    """Upload a file.
1183

1184
    The node will refuse the operation in case the file is not on the
1185
    approved file list.
1186

1187
    This is a multi-node call.
1188

1189
    @type node_list: list
1190
    @param node_list: the list of node names to upload to
1191
    @type file_name: str
1192
    @param file_name: the filename to upload
1193
    @type address_list: list or None
1194
    @keyword address_list: an optional list of node addresses, in order
1195
        to optimize the RPC speed
1196

1197
    """
1198
    file_contents = utils.ReadFile(file_name)
1199
    data = cls._Compress(file_contents)
1200
    st = os.stat(file_name)
1201
    getents = runtime.GetEnts()
1202
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1203
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1204
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1205
                                    address_list=address_list)
1206

    
1207
  @classmethod
1208
  @_RpcTimeout(_TMO_NORMAL)
1209
  def call_write_ssconf_files(cls, node_list, values):
1210
    """Write ssconf files.
1211

1212
    This is a multi-node call.
1213

1214
    """
1215
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1216

    
1217
  @_RpcTimeout(_TMO_NORMAL)
1218
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1219
    """Runs OOB.
1220

1221
    This is a single-node call.
1222

1223
    """
1224
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1225
                                                  remote_node, timeout])
1226

    
1227
  @_RpcTimeout(_TMO_FAST)
1228
  def call_os_diagnose(self, node_list):
1229
    """Request a diagnose of OS definitions.
1230

1231
    This is a multi-node call.
1232

1233
    """
1234
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1235

    
1236
  @_RpcTimeout(_TMO_FAST)
1237
  def call_os_get(self, node, name):
1238
    """Returns an OS definition.
1239

1240
    This is a single-node call.
1241

1242
    """
1243
    result = self._SingleNodeCall(node, "os_get", [name])
1244
    if not result.fail_msg and isinstance(result.payload, dict):
1245
      result.payload = objects.OS.FromDict(result.payload)
1246
    return result
1247

    
1248
  @_RpcTimeout(_TMO_FAST)
1249
  def call_os_validate(self, required, nodes, name, checks, params):
1250
    """Run a validation routine for a given OS.
1251

1252
    This is a multi-node call.
1253

1254
    """
1255
    return self._MultiNodeCall(nodes, "os_validate",
1256
                               [required, name, checks, params])
1257

    
1258
  @_RpcTimeout(_TMO_NORMAL)
1259
  def call_hooks_runner(self, node_list, hpath, phase, env):
1260
    """Call the hooks runner.
1261

1262
    Args:
1263
      - op: the OpCode instance
1264
      - env: a dictionary with the environment
1265

1266
    This is a multi-node call.
1267

1268
    """
1269
    params = [hpath, phase, env]
1270
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1271

    
1272
  @_RpcTimeout(_TMO_NORMAL)
1273
  def call_iallocator_runner(self, node, name, idata):
1274
    """Call an iallocator on a remote node
1275

1276
    Args:
1277
      - name: the iallocator name
1278
      - input: the json-encoded input string
1279

1280
    This is a single-node call.
1281

1282
    """
1283
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1284

    
1285
  @_RpcTimeout(_TMO_NORMAL)
1286
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1287
    """Request a snapshot of the given block device.
1288

1289
    This is a single-node call.
1290

1291
    """
1292
    return self._SingleNodeCall(node, "blockdev_grow",
1293
                                [cf_bdev.ToDict(), amount, dryrun])
1294

    
1295
  @_RpcTimeout(_TMO_1DAY)
1296
  def call_blockdev_export(self, node, cf_bdev,
1297
                           dest_node, dest_path, cluster_name):
1298
    """Export a given disk to another node.
1299

1300
    This is a single-node call.
1301

1302
    """
1303
    return self._SingleNodeCall(node, "blockdev_export",
1304
                                [cf_bdev.ToDict(), dest_node, dest_path,
1305
                                 cluster_name])
1306

    
1307
  @_RpcTimeout(_TMO_NORMAL)
1308
  def call_blockdev_snapshot(self, node, cf_bdev):
1309
    """Request a snapshot of the given block device.
1310

1311
    This is a single-node call.
1312

1313
    """
1314
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1315

    
1316
  @_RpcTimeout(_TMO_NORMAL)
1317
  def call_finalize_export(self, node, instance, snap_disks):
1318
    """Request the completion of an export operation.
1319

1320
    This writes the export config file, etc.
1321

1322
    This is a single-node call.
1323

1324
    """
1325
    flat_disks = []
1326
    for disk in snap_disks:
1327
      if isinstance(disk, bool):
1328
        flat_disks.append(disk)
1329
      else:
1330
        flat_disks.append(disk.ToDict())
1331

    
1332
    return self._SingleNodeCall(node, "finalize_export",
1333
                                [self._InstDict(instance), flat_disks])
1334

    
1335
  @_RpcTimeout(_TMO_FAST)
1336
  def call_export_info(self, node, path):
1337
    """Queries the export information in a given path.
1338

1339
    This is a single-node call.
1340

1341
    """
1342
    return self._SingleNodeCall(node, "export_info", [path])
1343

    
1344
  @_RpcTimeout(_TMO_FAST)
1345
  def call_export_list(self, node_list):
1346
    """Gets the stored exports list.
1347

1348
    This is a multi-node call.
1349

1350
    """
1351
    return self._MultiNodeCall(node_list, "export_list", [])
1352

    
1353
  @_RpcTimeout(_TMO_FAST)
1354
  def call_export_remove(self, node, export):
1355
    """Requests removal of a given export.
1356

1357
    This is a single-node call.
1358

1359
    """
1360
    return self._SingleNodeCall(node, "export_remove", [export])
1361

    
1362
  @classmethod
1363
  @_RpcTimeout(_TMO_NORMAL)
1364
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1365
    """Requests a node to clean the cluster information it has.
1366

1367
    This will remove the configuration information from the ganeti data
1368
    dir.
1369

1370
    This is a single-node call.
1371

1372
    """
1373
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1374
                                     [modify_ssh_setup])
1375

    
1376
  @_RpcTimeout(_TMO_FAST)
1377
  def call_node_volumes(self, node_list):
1378
    """Gets all volumes on node(s).
1379

1380
    This is a multi-node call.
1381

1382
    """
1383
    return self._MultiNodeCall(node_list, "node_volumes", [])
1384

    
1385
  @_RpcTimeout(_TMO_FAST)
1386
  def call_node_demote_from_mc(self, node):
1387
    """Demote a node from the master candidate role.
1388

1389
    This is a single-node call.
1390

1391
    """
1392
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1393

    
1394
  @_RpcTimeout(_TMO_NORMAL)
1395
  def call_node_powercycle(self, node, hypervisor):
1396
    """Tries to powercycle a node.
1397

1398
    This is a single-node call.
1399

1400
    """
1401
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1402

    
1403
  @_RpcTimeout(None)
1404
  def call_test_delay(self, node_list, duration):
1405
    """Sleep for a fixed time on given node(s).
1406

1407
    This is a multi-node call.
1408

1409
    """
1410
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1411
                               read_timeout=int(duration + 5))
1412

    
1413
  @_RpcTimeout(_TMO_FAST)
1414
  def call_file_storage_dir_create(self, node, file_storage_dir):
1415
    """Create the given file storage directory.
1416

1417
    This is a single-node call.
1418

1419
    """
1420
    return self._SingleNodeCall(node, "file_storage_dir_create",
1421
                                [file_storage_dir])
1422

    
1423
  @_RpcTimeout(_TMO_FAST)
1424
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1425
    """Remove the given file storage directory.
1426

1427
    This is a single-node call.
1428

1429
    """
1430
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1431
                                [file_storage_dir])
1432

    
1433
  @_RpcTimeout(_TMO_FAST)
1434
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1435
                                   new_file_storage_dir):
1436
    """Rename file storage directory.
1437

1438
    This is a single-node call.
1439

1440
    """
1441
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1442
                                [old_file_storage_dir, new_file_storage_dir])
1443

    
1444
  @classmethod
1445
  @_RpcTimeout(_TMO_URGENT)
1446
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1447
    """Update job queue.
1448

1449
    This is a multi-node call.
1450

1451
    """
1452
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1453
                                    [file_name, cls._Compress(content)],
1454
                                    address_list=address_list)
1455

    
1456
  @classmethod
1457
  @_RpcTimeout(_TMO_NORMAL)
1458
  def call_jobqueue_purge(cls, node):
1459
    """Purge job queue.
1460

1461
    This is a single-node call.
1462

1463
    """
1464
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1465

    
1466
  @classmethod
1467
  @_RpcTimeout(_TMO_URGENT)
1468
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1469
    """Rename a job queue file.
1470

1471
    This is a multi-node call.
1472

1473
    """
1474
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1475
                                    address_list=address_list)
1476

    
1477
  @_RpcTimeout(_TMO_NORMAL)
1478
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1479
    """Validate the hypervisor params.
1480

1481
    This is a multi-node call.
1482

1483
    @type node_list: list
1484
    @param node_list: the list of nodes to query
1485
    @type hvname: string
1486
    @param hvname: the hypervisor name
1487
    @type hvparams: dict
1488
    @param hvparams: the hypervisor parameters to be validated
1489

1490
    """
1491
    cluster = self._cfg.GetClusterInfo()
1492
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1493
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1494
                               [hvname, hv_full])
1495

    
1496
  @_RpcTimeout(_TMO_NORMAL)
1497
  def call_x509_cert_create(self, node, validity):
1498
    """Creates a new X509 certificate for SSL/TLS.
1499

1500
    This is a single-node call.
1501

1502
    @type validity: int
1503
    @param validity: Validity in seconds
1504

1505
    """
1506
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1507

    
1508
  @_RpcTimeout(_TMO_NORMAL)
1509
  def call_x509_cert_remove(self, node, name):
1510
    """Removes a X509 certificate.
1511

1512
    This is a single-node call.
1513

1514
    @type name: string
1515
    @param name: Certificate name
1516

1517
    """
1518
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1519

    
1520
  @_RpcTimeout(_TMO_NORMAL)
1521
  def call_import_start(self, node, opts, instance, component,
1522
                        dest, dest_args):
1523
    """Starts a listener for an import.
1524

1525
    This is a single-node call.
1526

1527
    @type node: string
1528
    @param node: Node name
1529
    @type instance: C{objects.Instance}
1530
    @param instance: Instance object
1531
    @type component: string
1532
    @param component: which part of the instance is being imported
1533

1534
    """
1535
    return self._SingleNodeCall(node, "import_start",
1536
                                [opts.ToDict(),
1537
                                 self._InstDict(instance), component, dest,
1538
                                 _EncodeImportExportIO(dest, dest_args)])
1539

    
1540
  @_RpcTimeout(_TMO_NORMAL)
1541
  def call_export_start(self, node, opts, host, port,
1542
                        instance, component, source, source_args):
1543
    """Starts an export daemon.
1544

1545
    This is a single-node call.
1546

1547
    @type node: string
1548
    @param node: Node name
1549
    @type instance: C{objects.Instance}
1550
    @param instance: Instance object
1551
    @type component: string
1552
    @param component: which part of the instance is being imported
1553

1554
    """
1555
    return self._SingleNodeCall(node, "export_start",
1556
                                [opts.ToDict(), host, port,
1557
                                 self._InstDict(instance),
1558
                                 component, source,
1559
                                 _EncodeImportExportIO(source, source_args)])
1560

    
1561
  @_RpcTimeout(_TMO_FAST)
1562
  def call_impexp_status(self, node, names):
1563
    """Gets the status of an import or export.
1564

1565
    This is a single-node call.
1566

1567
    @type node: string
1568
    @param node: Node name
1569
    @type names: List of strings
1570
    @param names: Import/export names
1571
    @rtype: List of L{objects.ImportExportStatus} instances
1572
    @return: Returns a list of the state of each named import/export or None if
1573
             a status couldn't be retrieved
1574

1575
    """
1576
    result = self._SingleNodeCall(node, "impexp_status", [names])
1577

    
1578
    if not result.fail_msg:
1579
      decoded = []
1580

    
1581
      for i in result.payload:
1582
        if i is None:
1583
          decoded.append(None)
1584
          continue
1585
        decoded.append(objects.ImportExportStatus.FromDict(i))
1586

    
1587
      result.payload = decoded
1588

    
1589
    return result
1590

    
1591
  @_RpcTimeout(_TMO_NORMAL)
1592
  def call_impexp_abort(self, node, name):
1593
    """Aborts an import or export.
1594

1595
    This is a single-node call.
1596

1597
    @type node: string
1598
    @param node: Node name
1599
    @type name: string
1600
    @param name: Import/export name
1601

1602
    """
1603
    return self._SingleNodeCall(node, "impexp_abort", [name])
1604

    
1605
  @_RpcTimeout(_TMO_NORMAL)
1606
  def call_impexp_cleanup(self, node, name):
1607
    """Cleans up after an import or export.
1608

1609
    This is a single-node call.
1610

1611
    @type node: string
1612
    @param node: Node name
1613
    @type name: string
1614
    @param name: Import/export name
1615

1616
    """
1617
    return self._SingleNodeCall(node, "impexp_cleanup", [name])