Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 596b2459

History | View | Annotate | Download (47.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49

    
50
# pylint has a bug here, doesn't see this import
51
import ganeti.http.client  # pylint: disable=W0611
52

    
53

    
54
# Timeout for connecting to nodes (seconds)
55
_RPC_CONNECT_TIMEOUT = 5
56

    
57
_RPC_CLIENT_HEADERS = [
58
  "Content-type: %s" % http.HTTP_APP_JSON,
59
  "Expect:",
60
  ]
61

    
62
# Various time constants for the timeout table
63
_TMO_URGENT = 60 # one minute
64
_TMO_FAST = 5 * 60 # five minutes
65
_TMO_NORMAL = 15 * 60 # 15 minutes
66
_TMO_SLOW = 3600 # one hour
67
_TMO_4HRS = 4 * 3600
68
_TMO_1DAY = 86400
69

    
70
# Timeout table that will be built later by decorators
71
# Guidelines for choosing timeouts:
72
# - call used during watcher: timeout -> 1min, _TMO_URGENT
73
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
74
# - other calls: 15 min, _TMO_NORMAL
75
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
76

    
77
_TIMEOUTS = {
78
}
79

    
80

    
81
def Init():
82
  """Initializes the module-global HTTP client manager.
83

84
  Must be called before using any RPC function and while exactly one thread is
85
  running.
86

87
  """
88
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
89
  # one thread running. This check is just a safety measure -- it doesn't
90
  # cover all cases.
91
  assert threading.activeCount() == 1, \
92
         "Found more than one active thread when initializing pycURL"
93

    
94
  logging.info("Using PycURL %s", pycurl.version)
95

    
96
  pycurl.global_init(pycurl.GLOBAL_ALL)
97

    
98

    
99
def Shutdown():
100
  """Stops the module-global HTTP client manager.
101

102
  Must be called before quitting the program and while exactly one thread is
103
  running.
104

105
  """
106
  pycurl.global_cleanup()
107

    
108

    
109
def _ConfigRpcCurl(curl):
110
  noded_cert = str(constants.NODED_CERT_FILE)
111

    
112
  curl.setopt(pycurl.FOLLOWLOCATION, False)
113
  curl.setopt(pycurl.CAINFO, noded_cert)
114
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
115
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
116
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
117
  curl.setopt(pycurl.SSLCERT, noded_cert)
118
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
119
  curl.setopt(pycurl.SSLKEY, noded_cert)
120
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
121

    
122

    
123
# Aliasing this module avoids the following warning by epydoc: "Warning: No
124
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
125
_threading = threading
126

    
127

    
128
class _RpcThreadLocal(_threading.local):
129
  def GetHttpClientPool(self):
130
    """Returns a per-thread HTTP client pool.
131

132
    @rtype: L{http.client.HttpClientPool}
133

134
    """
135
    try:
136
      pool = self.hcp
137
    except AttributeError:
138
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
139
      self.hcp = pool
140

    
141
    return pool
142

    
143

    
144
# Remove module alias (see above)
145
del _threading
146

    
147

    
148
_thread_local = _RpcThreadLocal()
149

    
150

    
151
def _RpcTimeout(secs):
152
  """Timeout decorator.
153

154
  When applied to a rpc call_* function, it updates the global timeout
155
  table with the given function/timeout.
156

157
  """
158
  def decorator(f):
159
    name = f.__name__
160
    assert name.startswith("call_")
161
    _TIMEOUTS[name[len("call_"):]] = secs
162
    return f
163
  return decorator
164

    
165

    
166
def RunWithRPC(fn):
167
  """RPC-wrapper decorator.
168

169
  When applied to a function, it runs it with the RPC system
170
  initialized, and it shutsdown the system afterwards. This means the
171
  function must be called without RPC being initialized.
172

173
  """
174
  def wrapper(*args, **kwargs):
175
    Init()
176
    try:
177
      return fn(*args, **kwargs)
178
    finally:
179
      Shutdown()
180
  return wrapper
181

    
182

    
183
class RpcResult(object):
184
  """RPC Result class.
185

186
  This class holds an RPC result. It is needed since in multi-node
187
  calls we can't raise an exception just because one one out of many
188
  failed, and therefore we use this class to encapsulate the result.
189

190
  @ivar data: the data payload, for successful results, or None
191
  @ivar call: the name of the RPC call
192
  @ivar node: the name of the node to which we made the call
193
  @ivar offline: whether the operation failed because the node was
194
      offline, as opposed to actual failure; offline=True will always
195
      imply failed=True, in order to allow simpler checking if
196
      the user doesn't care about the exact failure mode
197
  @ivar fail_msg: the error message if the call failed
198

199
  """
200
  def __init__(self, data=None, failed=False, offline=False,
201
               call=None, node=None):
202
    self.offline = offline
203
    self.call = call
204
    self.node = node
205

    
206
    if offline:
207
      self.fail_msg = "Node is marked offline"
208
      self.data = self.payload = None
209
    elif failed:
210
      self.fail_msg = self._EnsureErr(data)
211
      self.data = self.payload = None
212
    else:
213
      self.data = data
214
      if not isinstance(self.data, (tuple, list)):
215
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
216
                         type(self.data))
217
        self.payload = None
218
      elif len(data) != 2:
219
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
220
                         "expected 2" % len(self.data))
221
        self.payload = None
222
      elif not self.data[0]:
223
        self.fail_msg = self._EnsureErr(self.data[1])
224
        self.payload = None
225
      else:
226
        # finally success
227
        self.fail_msg = None
228
        self.payload = data[1]
229

    
230
    for attr_name in ["call", "data", "fail_msg",
231
                      "node", "offline", "payload"]:
232
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
233

    
234
  @staticmethod
235
  def _EnsureErr(val):
236
    """Helper to ensure we return a 'True' value for error."""
237
    if val:
238
      return val
239
    else:
240
      return "No error information"
241

    
242
  def Raise(self, msg, prereq=False, ecode=None):
243
    """If the result has failed, raise an OpExecError.
244

245
    This is used so that LU code doesn't have to check for each
246
    result, but instead can call this function.
247

248
    """
249
    if not self.fail_msg:
250
      return
251

    
252
    if not msg: # one could pass None for default message
253
      msg = ("Call '%s' to node '%s' has failed: %s" %
254
             (self.call, self.node, self.fail_msg))
255
    else:
256
      msg = "%s: %s" % (msg, self.fail_msg)
257
    if prereq:
258
      ec = errors.OpPrereqError
259
    else:
260
      ec = errors.OpExecError
261
    if ecode is not None:
262
      args = (msg, ecode)
263
    else:
264
      args = (msg, )
265
    raise ec(*args) # pylint: disable=W0142
266

    
267

    
268
def _AddressLookup(node_list,
269
                   ssc=ssconf.SimpleStore,
270
                   nslookup_fn=netutils.Hostname.GetIP):
271
  """Return addresses for given node names.
272

273
  @type node_list: list
274
  @param node_list: List of node names
275
  @type ssc: class
276
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
277
  @type nslookup_fn: callable
278
  @param nslookup_fn: function use to do NS lookup
279
  @rtype: list of addresses and/or None's
280
  @returns: List of corresponding addresses, if found
281

282
  """
283
  ss = ssc()
284
  iplist = ss.GetNodePrimaryIPList()
285
  family = ss.GetPrimaryIPFamily()
286
  addresses = []
287
  ipmap = dict(entry.split() for entry in iplist)
288
  for node in node_list:
289
    address = ipmap.get(node)
290
    if address is None:
291
      address = nslookup_fn(node, family=family)
292
    addresses.append(address)
293

    
294
  return addresses
295

    
296

    
297
class Client:
298
  """RPC Client class.
299

300
  This class, given a (remote) method name, a list of parameters and a
301
  list of nodes, will contact (in parallel) all nodes, and return a
302
  dict of results (key: node name, value: result).
303

304
  One current bug is that generic failure is still signaled by
305
  'False' result, which is not good. This overloading of values can
306
  cause bugs.
307

308
  """
309
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
310
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
311
                                    " timeouts table")
312
    self.procedure = procedure
313
    self.body = body
314
    self.port = port
315
    self._request = {}
316
    self._address_lookup_fn = address_lookup_fn
317

    
318
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
319
    """Add a list of nodes to the target nodes.
320

321
    @type node_list: list
322
    @param node_list: the list of node names to connect
323
    @type address_list: list or None
324
    @keyword address_list: either None or a list with node addresses,
325
        which must have the same length as the node list
326
    @type read_timeout: int
327
    @param read_timeout: overwrites default timeout for operation
328

329
    """
330
    if address_list is None:
331
      # Always use IP address instead of node name
332
      address_list = self._address_lookup_fn(node_list)
333

    
334
    assert len(node_list) == len(address_list), \
335
           "Name and address lists must have the same length"
336

    
337
    for node, address in zip(node_list, address_list):
338
      self.ConnectNode(node, address, read_timeout=read_timeout)
339

    
340
  def ConnectNode(self, name, address=None, read_timeout=None):
341
    """Add a node to the target list.
342

343
    @type name: str
344
    @param name: the node name
345
    @type address: str
346
    @param address: the node address, if known
347
    @type read_timeout: int
348
    @param read_timeout: overwrites default timeout for operation
349

350
    """
351
    if address is None:
352
      # Always use IP address instead of node name
353
      address = self._address_lookup_fn([name])[0]
354

    
355
    assert(address is not None)
356

    
357
    if read_timeout is None:
358
      read_timeout = _TIMEOUTS[self.procedure]
359

    
360
    self._request[name] = \
361
      http.client.HttpClientRequest(str(address), self.port,
362
                                    http.HTTP_PUT, str("/%s" % self.procedure),
363
                                    headers=_RPC_CLIENT_HEADERS,
364
                                    post_data=str(self.body),
365
                                    read_timeout=read_timeout)
366

    
367
  def GetResults(self, http_pool=None):
368
    """Call nodes and return results.
369

370
    @rtype: list
371
    @return: List of RPC results
372

373
    """
374
    if not http_pool:
375
      http_pool = http.client.HttpClientPool(_ConfigRpcCurl)
376

    
377
    http_pool.ProcessRequests(self._request.values())
378

    
379
    results = {}
380

    
381
    for name, req in self._request.iteritems():
382
      if req.success and req.resp_status_code == http.HTTP_OK:
383
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
384
                                  node=name, call=self.procedure)
385
        continue
386

    
387
      # TODO: Better error reporting
388
      if req.error:
389
        msg = req.error
390
      else:
391
        msg = req.resp_body
392

    
393
      logging.error("RPC error in %s from node %s: %s",
394
                    self.procedure, name, msg)
395
      results[name] = RpcResult(data=msg, failed=True, node=name,
396
                                call=self.procedure)
397

    
398
    return results
399

    
400

    
401
def _EncodeImportExportIO(ieio, ieioargs):
402
  """Encodes import/export I/O information.
403

404
  """
405
  if ieio == constants.IEIO_RAW_DISK:
406
    assert len(ieioargs) == 1
407
    return (ieioargs[0].ToDict(), )
408

    
409
  if ieio == constants.IEIO_SCRIPT:
410
    assert len(ieioargs) == 2
411
    return (ieioargs[0].ToDict(), ieioargs[1])
412

    
413
  return ieioargs
414

    
415

    
416
class RpcRunner(object):
417
  """RPC runner class"""
418

    
419
  def __init__(self, cfg):
420
    """Initialized the rpc runner.
421

422
    @type cfg:  C{config.ConfigWriter}
423
    @param cfg: the configuration object that will be used to get data
424
                about the cluster
425

426
    """
427
    self._cfg = cfg
428
    self.port = netutils.GetDaemonPort(constants.NODED)
429

    
430
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
431
    """Convert the given instance to a dict.
432

433
    This is done via the instance's ToDict() method and additionally
434
    we fill the hvparams with the cluster defaults.
435

436
    @type instance: L{objects.Instance}
437
    @param instance: an Instance object
438
    @type hvp: dict or None
439
    @param hvp: a dictionary with overridden hypervisor parameters
440
    @type bep: dict or None
441
    @param bep: a dictionary with overridden backend parameters
442
    @type osp: dict or None
443
    @param osp: a dictionary with overridden os parameters
444
    @rtype: dict
445
    @return: the instance dict, with the hvparams filled with the
446
        cluster defaults
447

448
    """
449
    idict = instance.ToDict()
450
    cluster = self._cfg.GetClusterInfo()
451
    idict["hvparams"] = cluster.FillHV(instance)
452
    if hvp is not None:
453
      idict["hvparams"].update(hvp)
454
    idict["beparams"] = cluster.FillBE(instance)
455
    if bep is not None:
456
      idict["beparams"].update(bep)
457
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
458
    if osp is not None:
459
      idict["osparams"].update(osp)
460
    for nic in idict["nics"]:
461
      nic['nicparams'] = objects.FillDict(
462
        cluster.nicparams[constants.PP_DEFAULT],
463
        nic['nicparams'])
464
    return idict
465

    
466
  def _ConnectList(self, client, node_list, call, read_timeout=None):
467
    """Helper for computing node addresses.
468

469
    @type client: L{ganeti.rpc.Client}
470
    @param client: a C{Client} instance
471
    @type node_list: list
472
    @param node_list: the node list we should connect
473
    @type call: string
474
    @param call: the name of the remote procedure call, for filling in
475
        correctly any eventual offline nodes' results
476
    @type read_timeout: int
477
    @param read_timeout: overwrites the default read timeout for the
478
        given operation
479

480
    """
481
    all_nodes = self._cfg.GetAllNodesInfo()
482
    name_list = []
483
    addr_list = []
484
    skip_dict = {}
485
    for node in node_list:
486
      if node in all_nodes:
487
        if all_nodes[node].offline:
488
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
489
          continue
490
        val = all_nodes[node].primary_ip
491
      else:
492
        val = None
493
      addr_list.append(val)
494
      name_list.append(node)
495
    if name_list:
496
      client.ConnectList(name_list, address_list=addr_list,
497
                         read_timeout=read_timeout)
498
    return skip_dict
499

    
500
  def _ConnectNode(self, client, node, call, read_timeout=None):
501
    """Helper for computing one node's address.
502

503
    @type client: L{ganeti.rpc.Client}
504
    @param client: a C{Client} instance
505
    @type node: str
506
    @param node: the node we should connect
507
    @type call: string
508
    @param call: the name of the remote procedure call, for filling in
509
        correctly any eventual offline nodes' results
510
    @type read_timeout: int
511
    @param read_timeout: overwrites the default read timeout for the
512
        given operation
513

514
    """
515
    node_info = self._cfg.GetNodeInfo(node)
516
    if node_info is not None:
517
      if node_info.offline:
518
        return RpcResult(node=node, offline=True, call=call)
519
      addr = node_info.primary_ip
520
    else:
521
      addr = None
522
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
523

    
524
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
525
    """Helper for making a multi-node call
526

527
    """
528
    body = serializer.DumpJson(args, indent=False)
529
    c = Client(procedure, body, self.port)
530
    skip_dict = self._ConnectList(c, node_list, procedure,
531
                                  read_timeout=read_timeout)
532
    skip_dict.update(c.GetResults())
533
    return skip_dict
534

    
535
  @classmethod
536
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
537
                           address_list=None, read_timeout=None):
538
    """Helper for making a multi-node static call
539

540
    """
541
    body = serializer.DumpJson(args, indent=False)
542
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
543
    c.ConnectList(node_list, address_list=address_list,
544
                  read_timeout=read_timeout)
545
    return c.GetResults()
546

    
547
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
548
    """Helper for making a single-node call
549

550
    """
551
    body = serializer.DumpJson(args, indent=False)
552
    c = Client(procedure, body, self.port)
553
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
554
    if result is None:
555
      # we did connect, node is not offline
556
      result = c.GetResults()[node]
557
    return result
558

    
559
  @classmethod
560
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
561
    """Helper for making a single-node static call
562

563
    """
564
    body = serializer.DumpJson(args, indent=False)
565
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
566
    c.ConnectNode(node, read_timeout=read_timeout)
567
    return c.GetResults()[node]
568

    
569
  @staticmethod
570
  def _Compress(data):
571
    """Compresses a string for transport over RPC.
572

573
    Small amounts of data are not compressed.
574

575
    @type data: str
576
    @param data: Data
577
    @rtype: tuple
578
    @return: Encoded data to send
579

580
    """
581
    # Small amounts of data are not compressed
582
    if len(data) < 512:
583
      return (constants.RPC_ENCODING_NONE, data)
584

    
585
    # Compress with zlib and encode in base64
586
    return (constants.RPC_ENCODING_ZLIB_BASE64,
587
            base64.b64encode(zlib.compress(data, 3)))
588

    
589
  #
590
  # Begin RPC calls
591
  #
592

    
593
  @_RpcTimeout(_TMO_URGENT)
594
  def call_bdev_sizes(self, node_list, devices):
595
    """Gets the sizes of requested block devices present on a node
596

597
    This is a multi-node call.
598

599
    """
600
    return self._MultiNodeCall(node_list, "bdev_sizes", [devices])
601

    
602
  @_RpcTimeout(_TMO_URGENT)
603
  def call_lv_list(self, node_list, vg_name):
604
    """Gets the logical volumes present in a given volume group.
605

606
    This is a multi-node call.
607

608
    """
609
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
610

    
611
  @_RpcTimeout(_TMO_URGENT)
612
  def call_vg_list(self, node_list):
613
    """Gets the volume group list.
614

615
    This is a multi-node call.
616

617
    """
618
    return self._MultiNodeCall(node_list, "vg_list", [])
619

    
620
  @_RpcTimeout(_TMO_NORMAL)
621
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
622
    """Get list of storage units.
623

624
    This is a multi-node call.
625

626
    """
627
    return self._MultiNodeCall(node_list, "storage_list",
628
                               [su_name, su_args, name, fields])
629

    
630
  @_RpcTimeout(_TMO_NORMAL)
631
  def call_storage_modify(self, node, su_name, su_args, name, changes):
632
    """Modify a storage unit.
633

634
    This is a single-node call.
635

636
    """
637
    return self._SingleNodeCall(node, "storage_modify",
638
                                [su_name, su_args, name, changes])
639

    
640
  @_RpcTimeout(_TMO_NORMAL)
641
  def call_storage_execute(self, node, su_name, su_args, name, op):
642
    """Executes an operation on a storage unit.
643

644
    This is a single-node call.
645

646
    """
647
    return self._SingleNodeCall(node, "storage_execute",
648
                                [su_name, su_args, name, op])
649

    
650
  @_RpcTimeout(_TMO_URGENT)
651
  def call_bridges_exist(self, node, bridges_list):
652
    """Checks if a node has all the bridges given.
653

654
    This method checks if all bridges given in the bridges_list are
655
    present on the remote node, so that an instance that uses interfaces
656
    on those bridges can be started.
657

658
    This is a single-node call.
659

660
    """
661
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
662

    
663
  @_RpcTimeout(_TMO_NORMAL)
664
  def call_instance_start(self, node, instance, hvp, bep, startup_paused):
665
    """Starts an instance.
666

667
    This is a single-node call.
668

669
    """
670
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
671
    return self._SingleNodeCall(node, "instance_start", [idict, startup_paused])
672

    
673
  @_RpcTimeout(_TMO_NORMAL)
674
  def call_instance_shutdown(self, node, instance, timeout):
675
    """Stops an instance.
676

677
    This is a single-node call.
678

679
    """
680
    return self._SingleNodeCall(node, "instance_shutdown",
681
                                [self._InstDict(instance), timeout])
682

    
683
  @_RpcTimeout(_TMO_NORMAL)
684
  def call_migration_info(self, node, instance):
685
    """Gather the information necessary to prepare an instance migration.
686

687
    This is a single-node call.
688

689
    @type node: string
690
    @param node: the node on which the instance is currently running
691
    @type instance: C{objects.Instance}
692
    @param instance: the instance definition
693

694
    """
695
    return self._SingleNodeCall(node, "migration_info",
696
                                [self._InstDict(instance)])
697

    
698
  @_RpcTimeout(_TMO_NORMAL)
699
  def call_accept_instance(self, node, instance, info, target):
700
    """Prepare a node to accept an instance.
701

702
    This is a single-node call.
703

704
    @type node: string
705
    @param node: the target node for the migration
706
    @type instance: C{objects.Instance}
707
    @param instance: the instance definition
708
    @type info: opaque/hypervisor specific (string/data)
709
    @param info: result for the call_migration_info call
710
    @type target: string
711
    @param target: target hostname (usually ip address) (on the node itself)
712

713
    """
714
    return self._SingleNodeCall(node, "accept_instance",
715
                                [self._InstDict(instance), info, target])
716

    
717
  @_RpcTimeout(_TMO_NORMAL)
718
  def call_finalize_migration(self, node, instance, info, success):
719
    """Finalize any target-node migration specific operation.
720

721
    This is called both in case of a successful migration and in case of error
722
    (in which case it should abort the migration).
723

724
    This is a single-node call.
725

726
    @type node: string
727
    @param node: the target node for the migration
728
    @type instance: C{objects.Instance}
729
    @param instance: the instance definition
730
    @type info: opaque/hypervisor specific (string/data)
731
    @param info: result for the call_migration_info call
732
    @type success: boolean
733
    @param success: whether the migration was a success or a failure
734

735
    """
736
    return self._SingleNodeCall(node, "finalize_migration",
737
                                [self._InstDict(instance), info, success])
738

    
739
  @_RpcTimeout(_TMO_SLOW)
740
  def call_instance_migrate(self, node, instance, target, live):
741
    """Migrate an instance.
742

743
    This is a single-node call.
744

745
    @type node: string
746
    @param node: the node on which the instance is currently running
747
    @type instance: C{objects.Instance}
748
    @param instance: the instance definition
749
    @type target: string
750
    @param target: the target node name
751
    @type live: boolean
752
    @param live: whether the migration should be done live or not (the
753
        interpretation of this parameter is left to the hypervisor)
754

755
    """
756
    return self._SingleNodeCall(node, "instance_migrate",
757
                                [self._InstDict(instance), target, live])
758

    
759
  @_RpcTimeout(_TMO_NORMAL)
760
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
761
    """Reboots an instance.
762

763
    This is a single-node call.
764

765
    """
766
    return self._SingleNodeCall(node, "instance_reboot",
767
                                [self._InstDict(inst), reboot_type,
768
                                 shutdown_timeout])
769

    
770
  @_RpcTimeout(_TMO_1DAY)
771
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
772
    """Installs an OS on the given instance.
773

774
    This is a single-node call.
775

776
    """
777
    return self._SingleNodeCall(node, "instance_os_add",
778
                                [self._InstDict(inst, osp=osparams),
779
                                 reinstall, debug])
780

    
781
  @_RpcTimeout(_TMO_SLOW)
782
  def call_instance_run_rename(self, node, inst, old_name, debug):
783
    """Run the OS rename script for an instance.
784

785
    This is a single-node call.
786

787
    """
788
    return self._SingleNodeCall(node, "instance_run_rename",
789
                                [self._InstDict(inst), old_name, debug])
790

    
791
  @_RpcTimeout(_TMO_URGENT)
792
  def call_instance_info(self, node, instance, hname):
793
    """Returns information about a single instance.
794

795
    This is a single-node call.
796

797
    @type node: list
798
    @param node: the list of nodes to query
799
    @type instance: string
800
    @param instance: the instance name
801
    @type hname: string
802
    @param hname: the hypervisor type of the instance
803

804
    """
805
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
806

    
807
  @_RpcTimeout(_TMO_NORMAL)
808
  def call_instance_migratable(self, node, instance):
809
    """Checks whether the given instance can be migrated.
810

811
    This is a single-node call.
812

813
    @param node: the node to query
814
    @type instance: L{objects.Instance}
815
    @param instance: the instance to check
816

817

818
    """
819
    return self._SingleNodeCall(node, "instance_migratable",
820
                                [self._InstDict(instance)])
821

    
822
  @_RpcTimeout(_TMO_URGENT)
823
  def call_all_instances_info(self, node_list, hypervisor_list):
824
    """Returns information about all instances on the given nodes.
825

826
    This is a multi-node call.
827

828
    @type node_list: list
829
    @param node_list: the list of nodes to query
830
    @type hypervisor_list: list
831
    @param hypervisor_list: the hypervisors to query for instances
832

833
    """
834
    return self._MultiNodeCall(node_list, "all_instances_info",
835
                               [hypervisor_list])
836

    
837
  @_RpcTimeout(_TMO_URGENT)
838
  def call_instance_list(self, node_list, hypervisor_list):
839
    """Returns the list of running instances on a given node.
840

841
    This is a multi-node call.
842

843
    @type node_list: list
844
    @param node_list: the list of nodes to query
845
    @type hypervisor_list: list
846
    @param hypervisor_list: the hypervisors to query for instances
847

848
    """
849
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
850

    
851
  @_RpcTimeout(_TMO_FAST)
852
  def call_node_tcp_ping(self, node, source, target, port, timeout,
853
                         live_port_needed):
854
    """Do a TcpPing on the remote node
855

856
    This is a single-node call.
857

858
    """
859
    return self._SingleNodeCall(node, "node_tcp_ping",
860
                                [source, target, port, timeout,
861
                                 live_port_needed])
862

    
863
  @_RpcTimeout(_TMO_FAST)
864
  def call_node_has_ip_address(self, node, address):
865
    """Checks if a node has the given IP address.
866

867
    This is a single-node call.
868

869
    """
870
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
871

    
872
  @_RpcTimeout(_TMO_URGENT)
873
  def call_node_info(self, node_list, vg_name, hypervisor_type):
874
    """Return node information.
875

876
    This will return memory information and volume group size and free
877
    space.
878

879
    This is a multi-node call.
880

881
    @type node_list: list
882
    @param node_list: the list of nodes to query
883
    @type vg_name: C{string}
884
    @param vg_name: the name of the volume group to ask for disk space
885
        information
886
    @type hypervisor_type: C{str}
887
    @param hypervisor_type: the name of the hypervisor to ask for
888
        memory information
889

890
    """
891
    return self._MultiNodeCall(node_list, "node_info",
892
                               [vg_name, hypervisor_type])
893

    
894
  @_RpcTimeout(_TMO_NORMAL)
895
  def call_etc_hosts_modify(self, node, mode, name, ip):
896
    """Modify hosts file with name
897

898
    @type node: string
899
    @param node: The node to call
900
    @type mode: string
901
    @param mode: The mode to operate. Currently "add" or "remove"
902
    @type name: string
903
    @param name: The host name to be modified
904
    @type ip: string
905
    @param ip: The ip of the entry (just valid if mode is "add")
906

907
    """
908
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
909

    
910
  @_RpcTimeout(_TMO_NORMAL)
911
  def call_node_verify(self, node_list, checkdict, cluster_name):
912
    """Request verification of given parameters.
913

914
    This is a multi-node call.
915

916
    """
917
    return self._MultiNodeCall(node_list, "node_verify",
918
                               [checkdict, cluster_name])
919

    
920
  @classmethod
921
  @_RpcTimeout(_TMO_FAST)
922
  def call_node_start_master(cls, node, start_daemons, no_voting):
923
    """Tells a node to activate itself as a master.
924

925
    This is a single-node call.
926

927
    """
928
    return cls._StaticSingleNodeCall(node, "node_start_master",
929
                                     [start_daemons, no_voting])
930

    
931
  @classmethod
932
  @_RpcTimeout(_TMO_FAST)
933
  def call_node_stop_master(cls, node, stop_daemons):
934
    """Tells a node to demote itself from master status.
935

936
    This is a single-node call.
937

938
    """
939
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
940

    
941
  @classmethod
942
  @_RpcTimeout(_TMO_URGENT)
943
  def call_master_info(cls, node_list):
944
    """Query master info.
945

946
    This is a multi-node call.
947

948
    """
949
    # TODO: should this method query down nodes?
950
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
951

    
952
  @classmethod
953
  @_RpcTimeout(_TMO_URGENT)
954
  def call_version(cls, node_list):
955
    """Query node version.
956

957
    This is a multi-node call.
958

959
    """
960
    return cls._StaticMultiNodeCall(node_list, "version", [])
961

    
962
  @_RpcTimeout(_TMO_NORMAL)
963
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
964
    """Request creation of a given block device.
965

966
    This is a single-node call.
967

968
    """
969
    return self._SingleNodeCall(node, "blockdev_create",
970
                                [bdev.ToDict(), size, owner, on_primary, info])
971

    
972
  @_RpcTimeout(_TMO_SLOW)
973
  def call_blockdev_wipe(self, node, bdev, offset, size):
974
    """Request wipe at given offset with given size of a block device.
975

976
    This is a single-node call.
977

978
    """
979
    return self._SingleNodeCall(node, "blockdev_wipe",
980
                                [bdev.ToDict(), offset, size])
981

    
982
  @_RpcTimeout(_TMO_NORMAL)
983
  def call_blockdev_remove(self, node, bdev):
984
    """Request removal of a given block device.
985

986
    This is a single-node call.
987

988
    """
989
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
990

    
991
  @_RpcTimeout(_TMO_NORMAL)
992
  def call_blockdev_rename(self, node, devlist):
993
    """Request rename of the given block devices.
994

995
    This is a single-node call.
996

997
    """
998
    return self._SingleNodeCall(node, "blockdev_rename",
999
                                [(d.ToDict(), uid) for d, uid in devlist])
1000

    
1001
  @_RpcTimeout(_TMO_NORMAL)
1002
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
1003
    """Request a pause/resume of given block device.
1004

1005
    This is a single-node call.
1006

1007
    """
1008
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1009
                                [[bdev.ToDict() for bdev in disks], pause])
1010

    
1011
  @_RpcTimeout(_TMO_NORMAL)
1012
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1013
    """Request assembling of a given block device.
1014

1015
    This is a single-node call.
1016

1017
    """
1018
    return self._SingleNodeCall(node, "blockdev_assemble",
1019
                                [disk.ToDict(), owner, on_primary, idx])
1020

    
1021
  @_RpcTimeout(_TMO_NORMAL)
1022
  def call_blockdev_shutdown(self, node, disk):
1023
    """Request shutdown of a given block device.
1024

1025
    This is a single-node call.
1026

1027
    """
1028
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1029

    
1030
  @_RpcTimeout(_TMO_NORMAL)
1031
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1032
    """Request adding a list of children to a (mirroring) device.
1033

1034
    This is a single-node call.
1035

1036
    """
1037
    return self._SingleNodeCall(node, "blockdev_addchildren",
1038
                                [bdev.ToDict(),
1039
                                 [disk.ToDict() for disk in ndevs]])
1040

    
1041
  @_RpcTimeout(_TMO_NORMAL)
1042
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1043
    """Request removing a list of children from a (mirroring) device.
1044

1045
    This is a single-node call.
1046

1047
    """
1048
    return self._SingleNodeCall(node, "blockdev_removechildren",
1049
                                [bdev.ToDict(),
1050
                                 [disk.ToDict() for disk in ndevs]])
1051

    
1052
  @_RpcTimeout(_TMO_NORMAL)
1053
  def call_blockdev_getmirrorstatus(self, node, disks):
1054
    """Request status of a (mirroring) device.
1055

1056
    This is a single-node call.
1057

1058
    """
1059
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1060
                                  [dsk.ToDict() for dsk in disks])
1061
    if not result.fail_msg:
1062
      result.payload = [objects.BlockDevStatus.FromDict(i)
1063
                        for i in result.payload]
1064
    return result
1065

    
1066
  @_RpcTimeout(_TMO_NORMAL)
1067
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1068
    """Request status of (mirroring) devices from multiple nodes.
1069

1070
    This is a multi-node call.
1071

1072
    """
1073
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1074
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1075
                                       for name, disks in node_disks.items())])
1076
    for nres in result.values():
1077
      if nres.fail_msg:
1078
        continue
1079

    
1080
      for idx, (success, status) in enumerate(nres.payload):
1081
        if success:
1082
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1083

    
1084
    return result
1085

    
1086
  @_RpcTimeout(_TMO_NORMAL)
1087
  def call_blockdev_find(self, node, disk):
1088
    """Request identification of a given block device.
1089

1090
    This is a single-node call.
1091

1092
    """
1093
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1094
    if not result.fail_msg and result.payload is not None:
1095
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1096
    return result
1097

    
1098
  @_RpcTimeout(_TMO_NORMAL)
1099
  def call_blockdev_close(self, node, instance_name, disks):
1100
    """Closes the given block devices.
1101

1102
    This is a single-node call.
1103

1104
    """
1105
    params = [instance_name, [cf.ToDict() for cf in disks]]
1106
    return self._SingleNodeCall(node, "blockdev_close", params)
1107

    
1108
  @_RpcTimeout(_TMO_NORMAL)
1109
  def call_blockdev_getsize(self, node, disks):
1110
    """Returns the size of the given disks.
1111

1112
    This is a single-node call.
1113

1114
    """
1115
    params = [[cf.ToDict() for cf in disks]]
1116
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1117

    
1118
  @_RpcTimeout(_TMO_NORMAL)
1119
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1120
    """Disconnects the network of the given drbd devices.
1121

1122
    This is a multi-node call.
1123

1124
    """
1125
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1126
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1127

    
1128
  @_RpcTimeout(_TMO_NORMAL)
1129
  def call_drbd_attach_net(self, node_list, nodes_ip,
1130
                           disks, instance_name, multimaster):
1131
    """Disconnects the given drbd devices.
1132

1133
    This is a multi-node call.
1134

1135
    """
1136
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1137
                               [nodes_ip, [cf.ToDict() for cf in disks],
1138
                                instance_name, multimaster])
1139

    
1140
  @_RpcTimeout(_TMO_SLOW)
1141
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1142
    """Waits for the synchronization of drbd devices is complete.
1143

1144
    This is a multi-node call.
1145

1146
    """
1147
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1148
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1149

    
1150
  @_RpcTimeout(_TMO_URGENT)
1151
  def call_drbd_helper(self, node_list):
1152
    """Gets drbd helper.
1153

1154
    This is a multi-node call.
1155

1156
    """
1157
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1158

    
1159
  @classmethod
1160
  @_RpcTimeout(_TMO_NORMAL)
1161
  def call_upload_file(cls, node_list, file_name, address_list=None):
1162
    """Upload a file.
1163

1164
    The node will refuse the operation in case the file is not on the
1165
    approved file list.
1166

1167
    This is a multi-node call.
1168

1169
    @type node_list: list
1170
    @param node_list: the list of node names to upload to
1171
    @type file_name: str
1172
    @param file_name: the filename to upload
1173
    @type address_list: list or None
1174
    @keyword address_list: an optional list of node addresses, in order
1175
        to optimize the RPC speed
1176

1177
    """
1178
    file_contents = utils.ReadFile(file_name)
1179
    data = cls._Compress(file_contents)
1180
    st = os.stat(file_name)
1181
    getents = runtime.GetEnts()
1182
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1183
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1184
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1185
                                    address_list=address_list)
1186

    
1187
  @classmethod
1188
  @_RpcTimeout(_TMO_NORMAL)
1189
  def call_write_ssconf_files(cls, node_list, values):
1190
    """Write ssconf files.
1191

1192
    This is a multi-node call.
1193

1194
    """
1195
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1196

    
1197
  @_RpcTimeout(_TMO_NORMAL)
1198
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1199
    """Runs OOB.
1200

1201
    This is a single-node call.
1202

1203
    """
1204
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1205
                                                  remote_node, timeout])
1206

    
1207
  @_RpcTimeout(_TMO_FAST)
1208
  def call_os_diagnose(self, node_list):
1209
    """Request a diagnose of OS definitions.
1210

1211
    This is a multi-node call.
1212

1213
    """
1214
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1215

    
1216
  @_RpcTimeout(_TMO_FAST)
1217
  def call_os_get(self, node, name):
1218
    """Returns an OS definition.
1219

1220
    This is a single-node call.
1221

1222
    """
1223
    result = self._SingleNodeCall(node, "os_get", [name])
1224
    if not result.fail_msg and isinstance(result.payload, dict):
1225
      result.payload = objects.OS.FromDict(result.payload)
1226
    return result
1227

    
1228
  @_RpcTimeout(_TMO_FAST)
1229
  def call_os_validate(self, required, nodes, name, checks, params):
1230
    """Run a validation routine for a given OS.
1231

1232
    This is a multi-node call.
1233

1234
    """
1235
    return self._MultiNodeCall(nodes, "os_validate",
1236
                               [required, name, checks, params])
1237

    
1238
  @_RpcTimeout(_TMO_NORMAL)
1239
  def call_hooks_runner(self, node_list, hpath, phase, env):
1240
    """Call the hooks runner.
1241

1242
    Args:
1243
      - op: the OpCode instance
1244
      - env: a dictionary with the environment
1245

1246
    This is a multi-node call.
1247

1248
    """
1249
    params = [hpath, phase, env]
1250
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1251

    
1252
  @_RpcTimeout(_TMO_NORMAL)
1253
  def call_iallocator_runner(self, node, name, idata):
1254
    """Call an iallocator on a remote node
1255

1256
    Args:
1257
      - name: the iallocator name
1258
      - input: the json-encoded input string
1259

1260
    This is a single-node call.
1261

1262
    """
1263
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1264

    
1265
  @_RpcTimeout(_TMO_NORMAL)
1266
  def call_blockdev_grow(self, node, cf_bdev, amount, dryrun):
1267
    """Request a snapshot of the given block device.
1268

1269
    This is a single-node call.
1270

1271
    """
1272
    return self._SingleNodeCall(node, "blockdev_grow",
1273
                                [cf_bdev.ToDict(), amount, dryrun])
1274

    
1275
  @_RpcTimeout(_TMO_1DAY)
1276
  def call_blockdev_export(self, node, cf_bdev,
1277
                           dest_node, dest_path, cluster_name):
1278
    """Export a given disk to another node.
1279

1280
    This is a single-node call.
1281

1282
    """
1283
    return self._SingleNodeCall(node, "blockdev_export",
1284
                                [cf_bdev.ToDict(), dest_node, dest_path,
1285
                                 cluster_name])
1286

    
1287
  @_RpcTimeout(_TMO_NORMAL)
1288
  def call_blockdev_snapshot(self, node, cf_bdev):
1289
    """Request a snapshot of the given block device.
1290

1291
    This is a single-node call.
1292

1293
    """
1294
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1295

    
1296
  @_RpcTimeout(_TMO_NORMAL)
1297
  def call_finalize_export(self, node, instance, snap_disks):
1298
    """Request the completion of an export operation.
1299

1300
    This writes the export config file, etc.
1301

1302
    This is a single-node call.
1303

1304
    """
1305
    flat_disks = []
1306
    for disk in snap_disks:
1307
      if isinstance(disk, bool):
1308
        flat_disks.append(disk)
1309
      else:
1310
        flat_disks.append(disk.ToDict())
1311

    
1312
    return self._SingleNodeCall(node, "finalize_export",
1313
                                [self._InstDict(instance), flat_disks])
1314

    
1315
  @_RpcTimeout(_TMO_FAST)
1316
  def call_export_info(self, node, path):
1317
    """Queries the export information in a given path.
1318

1319
    This is a single-node call.
1320

1321
    """
1322
    return self._SingleNodeCall(node, "export_info", [path])
1323

    
1324
  @_RpcTimeout(_TMO_FAST)
1325
  def call_export_list(self, node_list):
1326
    """Gets the stored exports list.
1327

1328
    This is a multi-node call.
1329

1330
    """
1331
    return self._MultiNodeCall(node_list, "export_list", [])
1332

    
1333
  @_RpcTimeout(_TMO_FAST)
1334
  def call_export_remove(self, node, export):
1335
    """Requests removal of a given export.
1336

1337
    This is a single-node call.
1338

1339
    """
1340
    return self._SingleNodeCall(node, "export_remove", [export])
1341

    
1342
  @classmethod
1343
  @_RpcTimeout(_TMO_NORMAL)
1344
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1345
    """Requests a node to clean the cluster information it has.
1346

1347
    This will remove the configuration information from the ganeti data
1348
    dir.
1349

1350
    This is a single-node call.
1351

1352
    """
1353
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1354
                                     [modify_ssh_setup])
1355

    
1356
  @_RpcTimeout(_TMO_FAST)
1357
  def call_node_volumes(self, node_list):
1358
    """Gets all volumes on node(s).
1359

1360
    This is a multi-node call.
1361

1362
    """
1363
    return self._MultiNodeCall(node_list, "node_volumes", [])
1364

    
1365
  @_RpcTimeout(_TMO_FAST)
1366
  def call_node_demote_from_mc(self, node):
1367
    """Demote a node from the master candidate role.
1368

1369
    This is a single-node call.
1370

1371
    """
1372
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1373

    
1374
  @_RpcTimeout(_TMO_NORMAL)
1375
  def call_node_powercycle(self, node, hypervisor):
1376
    """Tries to powercycle a node.
1377

1378
    This is a single-node call.
1379

1380
    """
1381
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1382

    
1383
  @_RpcTimeout(None)
1384
  def call_test_delay(self, node_list, duration):
1385
    """Sleep for a fixed time on given node(s).
1386

1387
    This is a multi-node call.
1388

1389
    """
1390
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1391
                               read_timeout=int(duration + 5))
1392

    
1393
  @_RpcTimeout(_TMO_FAST)
1394
  def call_file_storage_dir_create(self, node, file_storage_dir):
1395
    """Create the given file storage directory.
1396

1397
    This is a single-node call.
1398

1399
    """
1400
    return self._SingleNodeCall(node, "file_storage_dir_create",
1401
                                [file_storage_dir])
1402

    
1403
  @_RpcTimeout(_TMO_FAST)
1404
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1405
    """Remove the given file storage directory.
1406

1407
    This is a single-node call.
1408

1409
    """
1410
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1411
                                [file_storage_dir])
1412

    
1413
  @_RpcTimeout(_TMO_FAST)
1414
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1415
                                   new_file_storage_dir):
1416
    """Rename file storage directory.
1417

1418
    This is a single-node call.
1419

1420
    """
1421
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1422
                                [old_file_storage_dir, new_file_storage_dir])
1423

    
1424
  @classmethod
1425
  @_RpcTimeout(_TMO_URGENT)
1426
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1427
    """Update job queue.
1428

1429
    This is a multi-node call.
1430

1431
    """
1432
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1433
                                    [file_name, cls._Compress(content)],
1434
                                    address_list=address_list)
1435

    
1436
  @classmethod
1437
  @_RpcTimeout(_TMO_NORMAL)
1438
  def call_jobqueue_purge(cls, node):
1439
    """Purge job queue.
1440

1441
    This is a single-node call.
1442

1443
    """
1444
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1445

    
1446
  @classmethod
1447
  @_RpcTimeout(_TMO_URGENT)
1448
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1449
    """Rename a job queue file.
1450

1451
    This is a multi-node call.
1452

1453
    """
1454
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1455
                                    address_list=address_list)
1456

    
1457
  @_RpcTimeout(_TMO_NORMAL)
1458
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1459
    """Validate the hypervisor params.
1460

1461
    This is a multi-node call.
1462

1463
    @type node_list: list
1464
    @param node_list: the list of nodes to query
1465
    @type hvname: string
1466
    @param hvname: the hypervisor name
1467
    @type hvparams: dict
1468
    @param hvparams: the hypervisor parameters to be validated
1469

1470
    """
1471
    cluster = self._cfg.GetClusterInfo()
1472
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1473
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1474
                               [hvname, hv_full])
1475

    
1476
  @_RpcTimeout(_TMO_NORMAL)
1477
  def call_x509_cert_create(self, node, validity):
1478
    """Creates a new X509 certificate for SSL/TLS.
1479

1480
    This is a single-node call.
1481

1482
    @type validity: int
1483
    @param validity: Validity in seconds
1484

1485
    """
1486
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1487

    
1488
  @_RpcTimeout(_TMO_NORMAL)
1489
  def call_x509_cert_remove(self, node, name):
1490
    """Removes a X509 certificate.
1491

1492
    This is a single-node call.
1493

1494
    @type name: string
1495
    @param name: Certificate name
1496

1497
    """
1498
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1499

    
1500
  @_RpcTimeout(_TMO_NORMAL)
1501
  def call_import_start(self, node, opts, instance, component,
1502
                        dest, dest_args):
1503
    """Starts a listener for an import.
1504

1505
    This is a single-node call.
1506

1507
    @type node: string
1508
    @param node: Node name
1509
    @type instance: C{objects.Instance}
1510
    @param instance: Instance object
1511
    @type component: string
1512
    @param component: which part of the instance is being imported
1513

1514
    """
1515
    return self._SingleNodeCall(node, "import_start",
1516
                                [opts.ToDict(),
1517
                                 self._InstDict(instance), component, dest,
1518
                                 _EncodeImportExportIO(dest, dest_args)])
1519

    
1520
  @_RpcTimeout(_TMO_NORMAL)
1521
  def call_export_start(self, node, opts, host, port,
1522
                        instance, component, source, source_args):
1523
    """Starts an export daemon.
1524

1525
    This is a single-node call.
1526

1527
    @type node: string
1528
    @param node: Node name
1529
    @type instance: C{objects.Instance}
1530
    @param instance: Instance object
1531
    @type component: string
1532
    @param component: which part of the instance is being imported
1533

1534
    """
1535
    return self._SingleNodeCall(node, "export_start",
1536
                                [opts.ToDict(), host, port,
1537
                                 self._InstDict(instance),
1538
                                 component, source,
1539
                                 _EncodeImportExportIO(source, source_args)])
1540

    
1541
  @_RpcTimeout(_TMO_FAST)
1542
  def call_impexp_status(self, node, names):
1543
    """Gets the status of an import or export.
1544

1545
    This is a single-node call.
1546

1547
    @type node: string
1548
    @param node: Node name
1549
    @type names: List of strings
1550
    @param names: Import/export names
1551
    @rtype: List of L{objects.ImportExportStatus} instances
1552
    @return: Returns a list of the state of each named import/export or None if
1553
             a status couldn't be retrieved
1554

1555
    """
1556
    result = self._SingleNodeCall(node, "impexp_status", [names])
1557

    
1558
    if not result.fail_msg:
1559
      decoded = []
1560

    
1561
      for i in result.payload:
1562
        if i is None:
1563
          decoded.append(None)
1564
          continue
1565
        decoded.append(objects.ImportExportStatus.FromDict(i))
1566

    
1567
      result.payload = decoded
1568

    
1569
    return result
1570

    
1571
  @_RpcTimeout(_TMO_NORMAL)
1572
  def call_impexp_abort(self, node, name):
1573
    """Aborts an import or export.
1574

1575
    This is a single-node call.
1576

1577
    @type node: string
1578
    @param node: Node name
1579
    @type name: string
1580
    @param name: Import/export name
1581

1582
    """
1583
    return self._SingleNodeCall(node, "impexp_abort", [name])
1584

    
1585
  @_RpcTimeout(_TMO_NORMAL)
1586
  def call_impexp_cleanup(self, node, name):
1587
    """Cleans up after an import or export.
1588

1589
    This is a single-node call.
1590

1591
    @type node: string
1592
    @param node: Node name
1593
    @type name: string
1594
    @param name: Import/export name
1595

1596
    """
1597
    return self._SingleNodeCall(node, "impexp_cleanup", [name])