Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 9a914f7a

History | View | Annotate | Download (46.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37
import pycurl
38
import threading
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49

    
50
# pylint has a bug here, doesn't see this import
51
import ganeti.http.client  # pylint: disable-msg=W0611
52

    
53

    
54
# Timeout for connecting to nodes (seconds)
55
_RPC_CONNECT_TIMEOUT = 5
56

    
57
_RPC_CLIENT_HEADERS = [
58
  "Content-type: %s" % http.HTTP_APP_JSON,
59
  "Expect:",
60
  ]
61

    
62
# Various time constants for the timeout table
63
_TMO_URGENT = 60 # one minute
64
_TMO_FAST = 5 * 60 # five minutes
65
_TMO_NORMAL = 15 * 60 # 15 minutes
66
_TMO_SLOW = 3600 # one hour
67
_TMO_4HRS = 4 * 3600
68
_TMO_1DAY = 86400
69

    
70
# Timeout table that will be built later by decorators
71
# Guidelines for choosing timeouts:
72
# - call used during watcher: timeout -> 1min, _TMO_URGENT
73
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
74
# - other calls: 15 min, _TMO_NORMAL
75
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
76

    
77
_TIMEOUTS = {
78
}
79

    
80

    
81
def Init():
82
  """Initializes the module-global HTTP client manager.
83

84
  Must be called before using any RPC function and while exactly one thread is
85
  running.
86

87
  """
88
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
89
  # one thread running. This check is just a safety measure -- it doesn't
90
  # cover all cases.
91
  assert threading.activeCount() == 1, \
92
         "Found more than one active thread when initializing pycURL"
93

    
94
  logging.info("Using PycURL %s", pycurl.version)
95

    
96
  pycurl.global_init(pycurl.GLOBAL_ALL)
97

    
98

    
99
def Shutdown():
100
  """Stops the module-global HTTP client manager.
101

102
  Must be called before quitting the program and while exactly one thread is
103
  running.
104

105
  """
106
  pycurl.global_cleanup()
107

    
108

    
109
def _ConfigRpcCurl(curl):
110
  noded_cert = str(constants.NODED_CERT_FILE)
111

    
112
  curl.setopt(pycurl.FOLLOWLOCATION, False)
113
  curl.setopt(pycurl.CAINFO, noded_cert)
114
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
115
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
116
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
117
  curl.setopt(pycurl.SSLCERT, noded_cert)
118
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
119
  curl.setopt(pycurl.SSLKEY, noded_cert)
120
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)
121

    
122

    
123
# Aliasing this module avoids the following warning by epydoc: "Warning: No
124
# information available for ganeti.rpc._RpcThreadLocal's base threading.local"
125
_threading = threading
126

    
127

    
128
class _RpcThreadLocal(_threading.local):
129
  def GetHttpClientPool(self):
130
    """Returns a per-thread HTTP client pool.
131

132
    @rtype: L{http.client.HttpClientPool}
133

134
    """
135
    try:
136
      pool = self.hcp
137
    except AttributeError:
138
      pool = http.client.HttpClientPool(_ConfigRpcCurl)
139
      self.hcp = pool
140

    
141
    return pool
142

    
143

    
144
# Remove module alias (see above)
145
del _threading
146

    
147

    
148
_thread_local = _RpcThreadLocal()
149

    
150

    
151
def _RpcTimeout(secs):
152
  """Timeout decorator.
153

154
  When applied to a rpc call_* function, it updates the global timeout
155
  table with the given function/timeout.
156

157
  """
158
  def decorator(f):
159
    name = f.__name__
160
    assert name.startswith("call_")
161
    _TIMEOUTS[name[len("call_"):]] = secs
162
    return f
163
  return decorator
164

    
165

    
166
def RunWithRPC(fn):
167
  """RPC-wrapper decorator.
168

169
  When applied to a function, it runs it with the RPC system
170
  initialized, and it shutsdown the system afterwards. This means the
171
  function must be called without RPC being initialized.
172

173
  """
174
  def wrapper(*args, **kwargs):
175
    Init()
176
    try:
177
      return fn(*args, **kwargs)
178
    finally:
179
      Shutdown()
180
  return wrapper
181

    
182

    
183
class RpcResult(object):
184
  """RPC Result class.
185

186
  This class holds an RPC result. It is needed since in multi-node
187
  calls we can't raise an exception just because one one out of many
188
  failed, and therefore we use this class to encapsulate the result.
189

190
  @ivar data: the data payload, for successful results, or None
191
  @ivar call: the name of the RPC call
192
  @ivar node: the name of the node to which we made the call
193
  @ivar offline: whether the operation failed because the node was
194
      offline, as opposed to actual failure; offline=True will always
195
      imply failed=True, in order to allow simpler checking if
196
      the user doesn't care about the exact failure mode
197
  @ivar fail_msg: the error message if the call failed
198

199
  """
200
  def __init__(self, data=None, failed=False, offline=False,
201
               call=None, node=None):
202
    self.offline = offline
203
    self.call = call
204
    self.node = node
205

    
206
    if offline:
207
      self.fail_msg = "Node is marked offline"
208
      self.data = self.payload = None
209
    elif failed:
210
      self.fail_msg = self._EnsureErr(data)
211
      self.data = self.payload = None
212
    else:
213
      self.data = data
214
      if not isinstance(self.data, (tuple, list)):
215
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
216
                         type(self.data))
217
        self.payload = None
218
      elif len(data) != 2:
219
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
220
                         "expected 2" % len(self.data))
221
        self.payload = None
222
      elif not self.data[0]:
223
        self.fail_msg = self._EnsureErr(self.data[1])
224
        self.payload = None
225
      else:
226
        # finally success
227
        self.fail_msg = None
228
        self.payload = data[1]
229

    
230
    for attr_name in ["call", "data", "fail_msg",
231
                      "node", "offline", "payload"]:
232
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
233

    
234
  @staticmethod
235
  def _EnsureErr(val):
236
    """Helper to ensure we return a 'True' value for error."""
237
    if val:
238
      return val
239
    else:
240
      return "No error information"
241

    
242
  def Raise(self, msg, prereq=False, ecode=None):
243
    """If the result has failed, raise an OpExecError.
244

245
    This is used so that LU code doesn't have to check for each
246
    result, but instead can call this function.
247

248
    """
249
    if not self.fail_msg:
250
      return
251

    
252
    if not msg: # one could pass None for default message
253
      msg = ("Call '%s' to node '%s' has failed: %s" %
254
             (self.call, self.node, self.fail_msg))
255
    else:
256
      msg = "%s: %s" % (msg, self.fail_msg)
257
    if prereq:
258
      ec = errors.OpPrereqError
259
    else:
260
      ec = errors.OpExecError
261
    if ecode is not None:
262
      args = (msg, ecode)
263
    else:
264
      args = (msg, )
265
    raise ec(*args) # pylint: disable-msg=W0142
266

    
267

    
268
def _AddressLookup(node_list,
269
                   ssc=ssconf.SimpleStore,
270
                   nslookup_fn=netutils.Hostname.GetIP):
271
  """Return addresses for given node names.
272

273
  @type node_list: list
274
  @param node_list: List of node names
275
  @type ssc: class
276
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
277
  @type nslookup_fn: callable
278
  @param nslookup_fn: function use to do NS lookup
279
  @rtype: list of addresses and/or None's
280
  @returns: List of corresponding addresses, if found
281

282
  """
283
  ss = ssc()
284
  iplist = ss.GetNodePrimaryIPList()
285
  family = ss.GetPrimaryIPFamily()
286
  addresses = []
287
  ipmap = dict(entry.split() for entry in iplist)
288
  for node in node_list:
289
    address = ipmap.get(node)
290
    if address is None:
291
      address = nslookup_fn(node, family=family)
292
    addresses.append(address)
293

    
294
  return addresses
295

    
296

    
297
class Client:
298
  """RPC Client class.
299

300
  This class, given a (remote) method name, a list of parameters and a
301
  list of nodes, will contact (in parallel) all nodes, and return a
302
  dict of results (key: node name, value: result).
303

304
  One current bug is that generic failure is still signaled by
305
  'False' result, which is not good. This overloading of values can
306
  cause bugs.
307

308
  """
309
  def __init__(self, procedure, body, port, address_lookup_fn=_AddressLookup):
310
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
311
                                    " timeouts table")
312
    self.procedure = procedure
313
    self.body = body
314
    self.port = port
315
    self._request = {}
316
    self._address_lookup_fn = address_lookup_fn
317

    
318
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
319
    """Add a list of nodes to the target nodes.
320

321
    @type node_list: list
322
    @param node_list: the list of node names to connect
323
    @type address_list: list or None
324
    @keyword address_list: either None or a list with node addresses,
325
        which must have the same length as the node list
326
    @type read_timeout: int
327
    @param read_timeout: overwrites default timeout for operation
328

329
    """
330
    if address_list is None:
331
      # Always use IP address instead of node name
332
      address_list = self._address_lookup_fn(node_list)
333

    
334
    assert len(node_list) == len(address_list), \
335
           "Name and address lists must have the same length"
336

    
337
    for node, address in zip(node_list, address_list):
338
      self.ConnectNode(node, address, read_timeout=read_timeout)
339

    
340
  def ConnectNode(self, name, address=None, read_timeout=None):
341
    """Add a node to the target list.
342

343
    @type name: str
344
    @param name: the node name
345
    @type address: str
346
    @param address: the node address, if known
347
    @type read_timeout: int
348
    @param read_timeout: overwrites default timeout for operation
349

350
    """
351
    if address is None:
352
      # Always use IP address instead of node name
353
      address = self._address_lookup_fn([name])[0]
354

    
355
    assert(address is not None)
356

    
357
    if read_timeout is None:
358
      read_timeout = _TIMEOUTS[self.procedure]
359

    
360
    self._request[name] = \
361
      http.client.HttpClientRequest(str(address), self.port,
362
                                    http.HTTP_PUT, str("/%s" % self.procedure),
363
                                    headers=_RPC_CLIENT_HEADERS,
364
                                    post_data=str(self.body),
365
                                    read_timeout=read_timeout)
366

    
367
  def GetResults(self, http_pool=None):
368
    """Call nodes and return results.
369

370
    @rtype: list
371
    @return: List of RPC results
372

373
    """
374
    if not http_pool:
375
      http_pool = _thread_local.GetHttpClientPool()
376

    
377
    http_pool.ProcessRequests(self._request.values())
378

    
379
    results = {}
380

    
381
    for name, req in self._request.iteritems():
382
      if req.success and req.resp_status_code == http.HTTP_OK:
383
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
384
                                  node=name, call=self.procedure)
385
        continue
386

    
387
      # TODO: Better error reporting
388
      if req.error:
389
        msg = req.error
390
      else:
391
        msg = req.resp_body
392

    
393
      logging.error("RPC error in %s from node %s: %s",
394
                    self.procedure, name, msg)
395
      results[name] = RpcResult(data=msg, failed=True, node=name,
396
                                call=self.procedure)
397

    
398
    return results
399

    
400

    
401
def _EncodeImportExportIO(ieio, ieioargs):
402
  """Encodes import/export I/O information.
403

404
  """
405
  if ieio == constants.IEIO_RAW_DISK:
406
    assert len(ieioargs) == 1
407
    return (ieioargs[0].ToDict(), )
408

    
409
  if ieio == constants.IEIO_SCRIPT:
410
    assert len(ieioargs) == 2
411
    return (ieioargs[0].ToDict(), ieioargs[1])
412

    
413
  return ieioargs
414

    
415

    
416
class RpcRunner(object):
417
  """RPC runner class"""
418

    
419
  def __init__(self, cfg):
420
    """Initialized the rpc runner.
421

422
    @type cfg:  C{config.ConfigWriter}
423
    @param cfg: the configuration object that will be used to get data
424
                about the cluster
425

426
    """
427
    self._cfg = cfg
428
    self.port = netutils.GetDaemonPort(constants.NODED)
429

    
430
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
431
    """Convert the given instance to a dict.
432

433
    This is done via the instance's ToDict() method and additionally
434
    we fill the hvparams with the cluster defaults.
435

436
    @type instance: L{objects.Instance}
437
    @param instance: an Instance object
438
    @type hvp: dict or None
439
    @param hvp: a dictionary with overridden hypervisor parameters
440
    @type bep: dict or None
441
    @param bep: a dictionary with overridden backend parameters
442
    @type osp: dict or None
443
    @param osp: a dictionary with overridden os parameters
444
    @rtype: dict
445
    @return: the instance dict, with the hvparams filled with the
446
        cluster defaults
447

448
    """
449
    idict = instance.ToDict()
450
    cluster = self._cfg.GetClusterInfo()
451
    idict["hvparams"] = cluster.FillHV(instance)
452
    if hvp is not None:
453
      idict["hvparams"].update(hvp)
454
    idict["beparams"] = cluster.FillBE(instance)
455
    if bep is not None:
456
      idict["beparams"].update(bep)
457
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
458
    if osp is not None:
459
      idict["osparams"].update(osp)
460
    for nic in idict["nics"]:
461
      nic['nicparams'] = objects.FillDict(
462
        cluster.nicparams[constants.PP_DEFAULT],
463
        nic['nicparams'])
464
    return idict
465

    
466
  def _ConnectList(self, client, node_list, call, read_timeout=None):
467
    """Helper for computing node addresses.
468

469
    @type client: L{ganeti.rpc.Client}
470
    @param client: a C{Client} instance
471
    @type node_list: list
472
    @param node_list: the node list we should connect
473
    @type call: string
474
    @param call: the name of the remote procedure call, for filling in
475
        correctly any eventual offline nodes' results
476
    @type read_timeout: int
477
    @param read_timeout: overwrites the default read timeout for the
478
        given operation
479

480
    """
481
    all_nodes = self._cfg.GetAllNodesInfo()
482
    name_list = []
483
    addr_list = []
484
    skip_dict = {}
485
    for node in node_list:
486
      if node in all_nodes:
487
        if all_nodes[node].offline:
488
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
489
          continue
490
        val = all_nodes[node].primary_ip
491
      else:
492
        val = None
493
      addr_list.append(val)
494
      name_list.append(node)
495
    if name_list:
496
      client.ConnectList(name_list, address_list=addr_list,
497
                         read_timeout=read_timeout)
498
    return skip_dict
499

    
500
  def _ConnectNode(self, client, node, call, read_timeout=None):
501
    """Helper for computing one node's address.
502

503
    @type client: L{ganeti.rpc.Client}
504
    @param client: a C{Client} instance
505
    @type node: str
506
    @param node: the node we should connect
507
    @type call: string
508
    @param call: the name of the remote procedure call, for filling in
509
        correctly any eventual offline nodes' results
510
    @type read_timeout: int
511
    @param read_timeout: overwrites the default read timeout for the
512
        given operation
513

514
    """
515
    node_info = self._cfg.GetNodeInfo(node)
516
    if node_info is not None:
517
      if node_info.offline:
518
        return RpcResult(node=node, offline=True, call=call)
519
      addr = node_info.primary_ip
520
    else:
521
      addr = None
522
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
523

    
524
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
525
    """Helper for making a multi-node call
526

527
    """
528
    body = serializer.DumpJson(args, indent=False)
529
    c = Client(procedure, body, self.port)
530
    skip_dict = self._ConnectList(c, node_list, procedure,
531
                                  read_timeout=read_timeout)
532
    skip_dict.update(c.GetResults())
533
    return skip_dict
534

    
535
  @classmethod
536
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
537
                           address_list=None, read_timeout=None):
538
    """Helper for making a multi-node static call
539

540
    """
541
    body = serializer.DumpJson(args, indent=False)
542
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
543
    c.ConnectList(node_list, address_list=address_list,
544
                  read_timeout=read_timeout)
545
    return c.GetResults()
546

    
547
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
548
    """Helper for making a single-node call
549

550
    """
551
    body = serializer.DumpJson(args, indent=False)
552
    c = Client(procedure, body, self.port)
553
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
554
    if result is None:
555
      # we did connect, node is not offline
556
      result = c.GetResults()[node]
557
    return result
558

    
559
  @classmethod
560
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
561
    """Helper for making a single-node static call
562

563
    """
564
    body = serializer.DumpJson(args, indent=False)
565
    c = Client(procedure, body, netutils.GetDaemonPort(constants.NODED))
566
    c.ConnectNode(node, read_timeout=read_timeout)
567
    return c.GetResults()[node]
568

    
569
  @staticmethod
570
  def _Compress(data):
571
    """Compresses a string for transport over RPC.
572

573
    Small amounts of data are not compressed.
574

575
    @type data: str
576
    @param data: Data
577
    @rtype: tuple
578
    @return: Encoded data to send
579

580
    """
581
    # Small amounts of data are not compressed
582
    if len(data) < 512:
583
      return (constants.RPC_ENCODING_NONE, data)
584

    
585
    # Compress with zlib and encode in base64
586
    return (constants.RPC_ENCODING_ZLIB_BASE64,
587
            base64.b64encode(zlib.compress(data, 3)))
588

    
589
  #
590
  # Begin RPC calls
591
  #
592

    
593
  @_RpcTimeout(_TMO_URGENT)
594
  def call_lv_list(self, node_list, vg_name):
595
    """Gets the logical volumes present in a given volume group.
596

597
    This is a multi-node call.
598

599
    """
600
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
601

    
602
  @_RpcTimeout(_TMO_URGENT)
603
  def call_vg_list(self, node_list):
604
    """Gets the volume group list.
605

606
    This is a multi-node call.
607

608
    """
609
    return self._MultiNodeCall(node_list, "vg_list", [])
610

    
611
  @_RpcTimeout(_TMO_NORMAL)
612
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
613
    """Get list of storage units.
614

615
    This is a multi-node call.
616

617
    """
618
    return self._MultiNodeCall(node_list, "storage_list",
619
                               [su_name, su_args, name, fields])
620

    
621
  @_RpcTimeout(_TMO_NORMAL)
622
  def call_storage_modify(self, node, su_name, su_args, name, changes):
623
    """Modify a storage unit.
624

625
    This is a single-node call.
626

627
    """
628
    return self._SingleNodeCall(node, "storage_modify",
629
                                [su_name, su_args, name, changes])
630

    
631
  @_RpcTimeout(_TMO_NORMAL)
632
  def call_storage_execute(self, node, su_name, su_args, name, op):
633
    """Executes an operation on a storage unit.
634

635
    This is a single-node call.
636

637
    """
638
    return self._SingleNodeCall(node, "storage_execute",
639
                                [su_name, su_args, name, op])
640

    
641
  @_RpcTimeout(_TMO_URGENT)
642
  def call_bridges_exist(self, node, bridges_list):
643
    """Checks if a node has all the bridges given.
644

645
    This method checks if all bridges given in the bridges_list are
646
    present on the remote node, so that an instance that uses interfaces
647
    on those bridges can be started.
648

649
    This is a single-node call.
650

651
    """
652
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
653

    
654
  @_RpcTimeout(_TMO_NORMAL)
655
  def call_instance_start(self, node, instance, hvp, bep):
656
    """Starts an instance.
657

658
    This is a single-node call.
659

660
    """
661
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
662
    return self._SingleNodeCall(node, "instance_start", [idict])
663

    
664
  @_RpcTimeout(_TMO_NORMAL)
665
  def call_instance_shutdown(self, node, instance, timeout):
666
    """Stops an instance.
667

668
    This is a single-node call.
669

670
    """
671
    return self._SingleNodeCall(node, "instance_shutdown",
672
                                [self._InstDict(instance), timeout])
673

    
674
  @_RpcTimeout(_TMO_NORMAL)
675
  def call_migration_info(self, node, instance):
676
    """Gather the information necessary to prepare an instance migration.
677

678
    This is a single-node call.
679

680
    @type node: string
681
    @param node: the node on which the instance is currently running
682
    @type instance: C{objects.Instance}
683
    @param instance: the instance definition
684

685
    """
686
    return self._SingleNodeCall(node, "migration_info",
687
                                [self._InstDict(instance)])
688

    
689
  @_RpcTimeout(_TMO_NORMAL)
690
  def call_accept_instance(self, node, instance, info, target):
691
    """Prepare a node to accept an instance.
692

693
    This is a single-node call.
694

695
    @type node: string
696
    @param node: the target node for the migration
697
    @type instance: C{objects.Instance}
698
    @param instance: the instance definition
699
    @type info: opaque/hypervisor specific (string/data)
700
    @param info: result for the call_migration_info call
701
    @type target: string
702
    @param target: target hostname (usually ip address) (on the node itself)
703

704
    """
705
    return self._SingleNodeCall(node, "accept_instance",
706
                                [self._InstDict(instance), info, target])
707

    
708
  @_RpcTimeout(_TMO_NORMAL)
709
  def call_finalize_migration(self, node, instance, info, success):
710
    """Finalize any target-node migration specific operation.
711

712
    This is called both in case of a successful migration and in case of error
713
    (in which case it should abort the migration).
714

715
    This is a single-node call.
716

717
    @type node: string
718
    @param node: the target node for the migration
719
    @type instance: C{objects.Instance}
720
    @param instance: the instance definition
721
    @type info: opaque/hypervisor specific (string/data)
722
    @param info: result for the call_migration_info call
723
    @type success: boolean
724
    @param success: whether the migration was a success or a failure
725

726
    """
727
    return self._SingleNodeCall(node, "finalize_migration",
728
                                [self._InstDict(instance), info, success])
729

    
730
  @_RpcTimeout(_TMO_SLOW)
731
  def call_instance_migrate(self, node, instance, target, live):
732
    """Migrate an instance.
733

734
    This is a single-node call.
735

736
    @type node: string
737
    @param node: the node on which the instance is currently running
738
    @type instance: C{objects.Instance}
739
    @param instance: the instance definition
740
    @type target: string
741
    @param target: the target node name
742
    @type live: boolean
743
    @param live: whether the migration should be done live or not (the
744
        interpretation of this parameter is left to the hypervisor)
745

746
    """
747
    return self._SingleNodeCall(node, "instance_migrate",
748
                                [self._InstDict(instance), target, live])
749

    
750
  @_RpcTimeout(_TMO_NORMAL)
751
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
752
    """Reboots an instance.
753

754
    This is a single-node call.
755

756
    """
757
    return self._SingleNodeCall(node, "instance_reboot",
758
                                [self._InstDict(inst), reboot_type,
759
                                 shutdown_timeout])
760

    
761
  @_RpcTimeout(_TMO_1DAY)
762
  def call_instance_os_add(self, node, inst, reinstall, debug, osparams=None):
763
    """Installs an OS on the given instance.
764

765
    This is a single-node call.
766

767
    """
768
    return self._SingleNodeCall(node, "instance_os_add",
769
                                [self._InstDict(inst, osp=osparams),
770
                                 reinstall, debug])
771

    
772
  @_RpcTimeout(_TMO_SLOW)
773
  def call_instance_run_rename(self, node, inst, old_name, debug):
774
    """Run the OS rename script for an instance.
775

776
    This is a single-node call.
777

778
    """
779
    return self._SingleNodeCall(node, "instance_run_rename",
780
                                [self._InstDict(inst), old_name, debug])
781

    
782
  @_RpcTimeout(_TMO_URGENT)
783
  def call_instance_info(self, node, instance, hname):
784
    """Returns information about a single instance.
785

786
    This is a single-node call.
787

788
    @type node: list
789
    @param node: the list of nodes to query
790
    @type instance: string
791
    @param instance: the instance name
792
    @type hname: string
793
    @param hname: the hypervisor type of the instance
794

795
    """
796
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
797

    
798
  @_RpcTimeout(_TMO_NORMAL)
799
  def call_instance_migratable(self, node, instance):
800
    """Checks whether the given instance can be migrated.
801

802
    This is a single-node call.
803

804
    @param node: the node to query
805
    @type instance: L{objects.Instance}
806
    @param instance: the instance to check
807

808

809
    """
810
    return self._SingleNodeCall(node, "instance_migratable",
811
                                [self._InstDict(instance)])
812

    
813
  @_RpcTimeout(_TMO_URGENT)
814
  def call_all_instances_info(self, node_list, hypervisor_list):
815
    """Returns information about all instances on the given nodes.
816

817
    This is a multi-node call.
818

819
    @type node_list: list
820
    @param node_list: the list of nodes to query
821
    @type hypervisor_list: list
822
    @param hypervisor_list: the hypervisors to query for instances
823

824
    """
825
    return self._MultiNodeCall(node_list, "all_instances_info",
826
                               [hypervisor_list])
827

    
828
  @_RpcTimeout(_TMO_URGENT)
829
  def call_instance_list(self, node_list, hypervisor_list):
830
    """Returns the list of running instances on a given node.
831

832
    This is a multi-node call.
833

834
    @type node_list: list
835
    @param node_list: the list of nodes to query
836
    @type hypervisor_list: list
837
    @param hypervisor_list: the hypervisors to query for instances
838

839
    """
840
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
841

    
842
  @_RpcTimeout(_TMO_FAST)
843
  def call_node_tcp_ping(self, node, source, target, port, timeout,
844
                         live_port_needed):
845
    """Do a TcpPing on the remote node
846

847
    This is a single-node call.
848

849
    """
850
    return self._SingleNodeCall(node, "node_tcp_ping",
851
                                [source, target, port, timeout,
852
                                 live_port_needed])
853

    
854
  @_RpcTimeout(_TMO_FAST)
855
  def call_node_has_ip_address(self, node, address):
856
    """Checks if a node has the given IP address.
857

858
    This is a single-node call.
859

860
    """
861
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
862

    
863
  @_RpcTimeout(_TMO_URGENT)
864
  def call_node_info(self, node_list, vg_name, hypervisor_type):
865
    """Return node information.
866

867
    This will return memory information and volume group size and free
868
    space.
869

870
    This is a multi-node call.
871

872
    @type node_list: list
873
    @param node_list: the list of nodes to query
874
    @type vg_name: C{string}
875
    @param vg_name: the name of the volume group to ask for disk space
876
        information
877
    @type hypervisor_type: C{str}
878
    @param hypervisor_type: the name of the hypervisor to ask for
879
        memory information
880

881
    """
882
    return self._MultiNodeCall(node_list, "node_info",
883
                               [vg_name, hypervisor_type])
884

    
885
  @_RpcTimeout(_TMO_NORMAL)
886
  def call_etc_hosts_modify(self, node, mode, name, ip):
887
    """Modify hosts file with name
888

889
    @type node: string
890
    @param node: The node to call
891
    @type mode: string
892
    @param mode: The mode to operate. Currently "add" or "remove"
893
    @type name: string
894
    @param name: The host name to be modified
895
    @type ip: string
896
    @param ip: The ip of the entry (just valid if mode is "add")
897

898
    """
899
    return self._SingleNodeCall(node, "etc_hosts_modify", [mode, name, ip])
900

    
901
  @_RpcTimeout(_TMO_NORMAL)
902
  def call_node_verify(self, node_list, checkdict, cluster_name):
903
    """Request verification of given parameters.
904

905
    This is a multi-node call.
906

907
    """
908
    return self._MultiNodeCall(node_list, "node_verify",
909
                               [checkdict, cluster_name])
910

    
911
  @classmethod
912
  @_RpcTimeout(_TMO_FAST)
913
  def call_node_start_master(cls, node, start_daemons, no_voting):
914
    """Tells a node to activate itself as a master.
915

916
    This is a single-node call.
917

918
    """
919
    return cls._StaticSingleNodeCall(node, "node_start_master",
920
                                     [start_daemons, no_voting])
921

    
922
  @classmethod
923
  @_RpcTimeout(_TMO_FAST)
924
  def call_node_stop_master(cls, node, stop_daemons):
925
    """Tells a node to demote itself from master status.
926

927
    This is a single-node call.
928

929
    """
930
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
931

    
932
  @classmethod
933
  @_RpcTimeout(_TMO_URGENT)
934
  def call_master_info(cls, node_list):
935
    """Query master info.
936

937
    This is a multi-node call.
938

939
    """
940
    # TODO: should this method query down nodes?
941
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
942

    
943
  @classmethod
944
  @_RpcTimeout(_TMO_URGENT)
945
  def call_version(cls, node_list):
946
    """Query node version.
947

948
    This is a multi-node call.
949

950
    """
951
    return cls._StaticMultiNodeCall(node_list, "version", [])
952

    
953
  @_RpcTimeout(_TMO_NORMAL)
954
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
955
    """Request creation of a given block device.
956

957
    This is a single-node call.
958

959
    """
960
    return self._SingleNodeCall(node, "blockdev_create",
961
                                [bdev.ToDict(), size, owner, on_primary, info])
962

    
963
  @_RpcTimeout(_TMO_SLOW)
964
  def call_blockdev_wipe(self, node, bdev, offset, size):
965
    """Request wipe at given offset with given size of a block device.
966

967
    This is a single-node call.
968

969
    """
970
    return self._SingleNodeCall(node, "blockdev_wipe",
971
                                [bdev.ToDict(), offset, size])
972

    
973
  @_RpcTimeout(_TMO_NORMAL)
974
  def call_blockdev_remove(self, node, bdev):
975
    """Request removal of a given block device.
976

977
    This is a single-node call.
978

979
    """
980
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
981

    
982
  @_RpcTimeout(_TMO_NORMAL)
983
  def call_blockdev_rename(self, node, devlist):
984
    """Request rename of the given block devices.
985

986
    This is a single-node call.
987

988
    """
989
    return self._SingleNodeCall(node, "blockdev_rename",
990
                                [(d.ToDict(), uid) for d, uid in devlist])
991

    
992
  @_RpcTimeout(_TMO_NORMAL)
993
  def call_blockdev_pause_resume_sync(self, node, disks, pause):
994
    """Request a pause/resume of given block device.
995

996
    This is a single-node call.
997

998
    """
999
    return self._SingleNodeCall(node, "blockdev_pause_resume_sync",
1000
                                [[bdev.ToDict() for bdev in disks], pause])
1001

    
1002
  @_RpcTimeout(_TMO_NORMAL)
1003
  def call_blockdev_assemble(self, node, disk, owner, on_primary, idx):
1004
    """Request assembling of a given block device.
1005

1006
    This is a single-node call.
1007

1008
    """
1009
    return self._SingleNodeCall(node, "blockdev_assemble",
1010
                                [disk.ToDict(), owner, on_primary, idx])
1011

    
1012
  @_RpcTimeout(_TMO_NORMAL)
1013
  def call_blockdev_shutdown(self, node, disk):
1014
    """Request shutdown of a given block device.
1015

1016
    This is a single-node call.
1017

1018
    """
1019
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
1020

    
1021
  @_RpcTimeout(_TMO_NORMAL)
1022
  def call_blockdev_addchildren(self, node, bdev, ndevs):
1023
    """Request adding a list of children to a (mirroring) device.
1024

1025
    This is a single-node call.
1026

1027
    """
1028
    return self._SingleNodeCall(node, "blockdev_addchildren",
1029
                                [bdev.ToDict(),
1030
                                 [disk.ToDict() for disk in ndevs]])
1031

    
1032
  @_RpcTimeout(_TMO_NORMAL)
1033
  def call_blockdev_removechildren(self, node, bdev, ndevs):
1034
    """Request removing a list of children from a (mirroring) device.
1035

1036
    This is a single-node call.
1037

1038
    """
1039
    return self._SingleNodeCall(node, "blockdev_removechildren",
1040
                                [bdev.ToDict(),
1041
                                 [disk.ToDict() for disk in ndevs]])
1042

    
1043
  @_RpcTimeout(_TMO_NORMAL)
1044
  def call_blockdev_getmirrorstatus(self, node, disks):
1045
    """Request status of a (mirroring) device.
1046

1047
    This is a single-node call.
1048

1049
    """
1050
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
1051
                                  [dsk.ToDict() for dsk in disks])
1052
    if not result.fail_msg:
1053
      result.payload = [objects.BlockDevStatus.FromDict(i)
1054
                        for i in result.payload]
1055
    return result
1056

    
1057
  @_RpcTimeout(_TMO_NORMAL)
1058
  def call_blockdev_getmirrorstatus_multi(self, node_list, node_disks):
1059
    """Request status of (mirroring) devices from multiple nodes.
1060

1061
    This is a multi-node call.
1062

1063
    """
1064
    result = self._MultiNodeCall(node_list, "blockdev_getmirrorstatus_multi",
1065
                                 [dict((name, [dsk.ToDict() for dsk in disks])
1066
                                       for name, disks in node_disks.items())])
1067
    for nres in result.values():
1068
      if nres.fail_msg:
1069
        continue
1070

    
1071
      for idx, (success, status) in enumerate(nres.payload):
1072
        if success:
1073
          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
1074

    
1075
    return result
1076

    
1077
  @_RpcTimeout(_TMO_NORMAL)
1078
  def call_blockdev_find(self, node, disk):
1079
    """Request identification of a given block device.
1080

1081
    This is a single-node call.
1082

1083
    """
1084
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
1085
    if not result.fail_msg and result.payload is not None:
1086
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
1087
    return result
1088

    
1089
  @_RpcTimeout(_TMO_NORMAL)
1090
  def call_blockdev_close(self, node, instance_name, disks):
1091
    """Closes the given block devices.
1092

1093
    This is a single-node call.
1094

1095
    """
1096
    params = [instance_name, [cf.ToDict() for cf in disks]]
1097
    return self._SingleNodeCall(node, "blockdev_close", params)
1098

    
1099
  @_RpcTimeout(_TMO_NORMAL)
1100
  def call_blockdev_getsize(self, node, disks):
1101
    """Returns the size of the given disks.
1102

1103
    This is a single-node call.
1104

1105
    """
1106
    params = [[cf.ToDict() for cf in disks]]
1107
    return self._SingleNodeCall(node, "blockdev_getsize", params)
1108

    
1109
  @_RpcTimeout(_TMO_NORMAL)
1110
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
1111
    """Disconnects the network of the given drbd devices.
1112

1113
    This is a multi-node call.
1114

1115
    """
1116
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
1117
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1118

    
1119
  @_RpcTimeout(_TMO_NORMAL)
1120
  def call_drbd_attach_net(self, node_list, nodes_ip,
1121
                           disks, instance_name, multimaster):
1122
    """Disconnects the given drbd devices.
1123

1124
    This is a multi-node call.
1125

1126
    """
1127
    return self._MultiNodeCall(node_list, "drbd_attach_net",
1128
                               [nodes_ip, [cf.ToDict() for cf in disks],
1129
                                instance_name, multimaster])
1130

    
1131
  @_RpcTimeout(_TMO_SLOW)
1132
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
1133
    """Waits for the synchronization of drbd devices is complete.
1134

1135
    This is a multi-node call.
1136

1137
    """
1138
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
1139
                               [nodes_ip, [cf.ToDict() for cf in disks]])
1140

    
1141
  @_RpcTimeout(_TMO_URGENT)
1142
  def call_drbd_helper(self, node_list):
1143
    """Gets drbd helper.
1144

1145
    This is a multi-node call.
1146

1147
    """
1148
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1149

    
1150
  @classmethod
1151
  @_RpcTimeout(_TMO_NORMAL)
1152
  def call_upload_file(cls, node_list, file_name, address_list=None):
1153
    """Upload a file.
1154

1155
    The node will refuse the operation in case the file is not on the
1156
    approved file list.
1157

1158
    This is a multi-node call.
1159

1160
    @type node_list: list
1161
    @param node_list: the list of node names to upload to
1162
    @type file_name: str
1163
    @param file_name: the filename to upload
1164
    @type address_list: list or None
1165
    @keyword address_list: an optional list of node addresses, in order
1166
        to optimize the RPC speed
1167

1168
    """
1169
    file_contents = utils.ReadFile(file_name)
1170
    data = cls._Compress(file_contents)
1171
    st = os.stat(file_name)
1172
    getents = runtime.GetEnts()
1173
    params = [file_name, data, st.st_mode, getents.LookupUid(st.st_uid),
1174
              getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
1175
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1176
                                    address_list=address_list)
1177

    
1178
  @classmethod
1179
  @_RpcTimeout(_TMO_NORMAL)
1180
  def call_write_ssconf_files(cls, node_list, values):
1181
    """Write ssconf files.
1182

1183
    This is a multi-node call.
1184

1185
    """
1186
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1187

    
1188
  @_RpcTimeout(_TMO_NORMAL)
1189
  def call_run_oob(self, node, oob_program, command, remote_node, timeout):
1190
    """Runs OOB.
1191

1192
    This is a single-node call.
1193

1194
    """
1195
    return self._SingleNodeCall(node, "run_oob", [oob_program, command,
1196
                                                  remote_node, timeout])
1197

    
1198
  @_RpcTimeout(_TMO_FAST)
1199
  def call_os_diagnose(self, node_list):
1200
    """Request a diagnose of OS definitions.
1201

1202
    This is a multi-node call.
1203

1204
    """
1205
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1206

    
1207
  @_RpcTimeout(_TMO_FAST)
1208
  def call_os_get(self, node, name):
1209
    """Returns an OS definition.
1210

1211
    This is a single-node call.
1212

1213
    """
1214
    result = self._SingleNodeCall(node, "os_get", [name])
1215
    if not result.fail_msg and isinstance(result.payload, dict):
1216
      result.payload = objects.OS.FromDict(result.payload)
1217
    return result
1218

    
1219
  @_RpcTimeout(_TMO_FAST)
1220
  def call_os_validate(self, required, nodes, name, checks, params):
1221
    """Run a validation routine for a given OS.
1222

1223
    This is a multi-node call.
1224

1225
    """
1226
    return self._MultiNodeCall(nodes, "os_validate",
1227
                               [required, name, checks, params])
1228

    
1229
  @_RpcTimeout(_TMO_NORMAL)
1230
  def call_hooks_runner(self, node_list, hpath, phase, env):
1231
    """Call the hooks runner.
1232

1233
    Args:
1234
      - op: the OpCode instance
1235
      - env: a dictionary with the environment
1236

1237
    This is a multi-node call.
1238

1239
    """
1240
    params = [hpath, phase, env]
1241
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1242

    
1243
  @_RpcTimeout(_TMO_NORMAL)
1244
  def call_iallocator_runner(self, node, name, idata):
1245
    """Call an iallocator on a remote node
1246

1247
    Args:
1248
      - name: the iallocator name
1249
      - input: the json-encoded input string
1250

1251
    This is a single-node call.
1252

1253
    """
1254
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1255

    
1256
  @_RpcTimeout(_TMO_NORMAL)
1257
  def call_blockdev_grow(self, node, cf_bdev, amount):
1258
    """Request a snapshot of the given block device.
1259

1260
    This is a single-node call.
1261

1262
    """
1263
    return self._SingleNodeCall(node, "blockdev_grow",
1264
                                [cf_bdev.ToDict(), amount])
1265

    
1266
  @_RpcTimeout(_TMO_1DAY)
1267
  def call_blockdev_export(self, node, cf_bdev,
1268
                           dest_node, dest_path, cluster_name):
1269
    """Export a given disk to another node.
1270

1271
    This is a single-node call.
1272

1273
    """
1274
    return self._SingleNodeCall(node, "blockdev_export",
1275
                                [cf_bdev.ToDict(), dest_node, dest_path,
1276
                                 cluster_name])
1277

    
1278
  @_RpcTimeout(_TMO_NORMAL)
1279
  def call_blockdev_snapshot(self, node, cf_bdev):
1280
    """Request a snapshot of the given block device.
1281

1282
    This is a single-node call.
1283

1284
    """
1285
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1286

    
1287
  @_RpcTimeout(_TMO_NORMAL)
1288
  def call_finalize_export(self, node, instance, snap_disks):
1289
    """Request the completion of an export operation.
1290

1291
    This writes the export config file, etc.
1292

1293
    This is a single-node call.
1294

1295
    """
1296
    flat_disks = []
1297
    for disk in snap_disks:
1298
      if isinstance(disk, bool):
1299
        flat_disks.append(disk)
1300
      else:
1301
        flat_disks.append(disk.ToDict())
1302

    
1303
    return self._SingleNodeCall(node, "finalize_export",
1304
                                [self._InstDict(instance), flat_disks])
1305

    
1306
  @_RpcTimeout(_TMO_FAST)
1307
  def call_export_info(self, node, path):
1308
    """Queries the export information in a given path.
1309

1310
    This is a single-node call.
1311

1312
    """
1313
    return self._SingleNodeCall(node, "export_info", [path])
1314

    
1315
  @_RpcTimeout(_TMO_FAST)
1316
  def call_export_list(self, node_list):
1317
    """Gets the stored exports list.
1318

1319
    This is a multi-node call.
1320

1321
    """
1322
    return self._MultiNodeCall(node_list, "export_list", [])
1323

    
1324
  @_RpcTimeout(_TMO_FAST)
1325
  def call_export_remove(self, node, export):
1326
    """Requests removal of a given export.
1327

1328
    This is a single-node call.
1329

1330
    """
1331
    return self._SingleNodeCall(node, "export_remove", [export])
1332

    
1333
  @classmethod
1334
  @_RpcTimeout(_TMO_NORMAL)
1335
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1336
    """Requests a node to clean the cluster information it has.
1337

1338
    This will remove the configuration information from the ganeti data
1339
    dir.
1340

1341
    This is a single-node call.
1342

1343
    """
1344
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1345
                                     [modify_ssh_setup])
1346

    
1347
  @_RpcTimeout(_TMO_FAST)
1348
  def call_node_volumes(self, node_list):
1349
    """Gets all volumes on node(s).
1350

1351
    This is a multi-node call.
1352

1353
    """
1354
    return self._MultiNodeCall(node_list, "node_volumes", [])
1355

    
1356
  @_RpcTimeout(_TMO_FAST)
1357
  def call_node_demote_from_mc(self, node):
1358
    """Demote a node from the master candidate role.
1359

1360
    This is a single-node call.
1361

1362
    """
1363
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1364

    
1365
  @_RpcTimeout(_TMO_NORMAL)
1366
  def call_node_powercycle(self, node, hypervisor):
1367
    """Tries to powercycle a node.
1368

1369
    This is a single-node call.
1370

1371
    """
1372
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1373

    
1374
  @_RpcTimeout(None)
1375
  def call_test_delay(self, node_list, duration):
1376
    """Sleep for a fixed time on given node(s).
1377

1378
    This is a multi-node call.
1379

1380
    """
1381
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1382
                               read_timeout=int(duration + 5))
1383

    
1384
  @_RpcTimeout(_TMO_FAST)
1385
  def call_file_storage_dir_create(self, node, file_storage_dir):
1386
    """Create the given file storage directory.
1387

1388
    This is a single-node call.
1389

1390
    """
1391
    return self._SingleNodeCall(node, "file_storage_dir_create",
1392
                                [file_storage_dir])
1393

    
1394
  @_RpcTimeout(_TMO_FAST)
1395
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1396
    """Remove the given file storage directory.
1397

1398
    This is a single-node call.
1399

1400
    """
1401
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1402
                                [file_storage_dir])
1403

    
1404
  @_RpcTimeout(_TMO_FAST)
1405
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1406
                                   new_file_storage_dir):
1407
    """Rename file storage directory.
1408

1409
    This is a single-node call.
1410

1411
    """
1412
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1413
                                [old_file_storage_dir, new_file_storage_dir])
1414

    
1415
  @classmethod
1416
  @_RpcTimeout(_TMO_URGENT)
1417
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1418
    """Update job queue.
1419

1420
    This is a multi-node call.
1421

1422
    """
1423
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1424
                                    [file_name, cls._Compress(content)],
1425
                                    address_list=address_list)
1426

    
1427
  @classmethod
1428
  @_RpcTimeout(_TMO_NORMAL)
1429
  def call_jobqueue_purge(cls, node):
1430
    """Purge job queue.
1431

1432
    This is a single-node call.
1433

1434
    """
1435
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1436

    
1437
  @classmethod
1438
  @_RpcTimeout(_TMO_URGENT)
1439
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1440
    """Rename a job queue file.
1441

1442
    This is a multi-node call.
1443

1444
    """
1445
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1446
                                    address_list=address_list)
1447

    
1448
  @_RpcTimeout(_TMO_NORMAL)
1449
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1450
    """Validate the hypervisor params.
1451

1452
    This is a multi-node call.
1453

1454
    @type node_list: list
1455
    @param node_list: the list of nodes to query
1456
    @type hvname: string
1457
    @param hvname: the hypervisor name
1458
    @type hvparams: dict
1459
    @param hvparams: the hypervisor parameters to be validated
1460

1461
    """
1462
    cluster = self._cfg.GetClusterInfo()
1463
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1464
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1465
                               [hvname, hv_full])
1466

    
1467
  @_RpcTimeout(_TMO_NORMAL)
1468
  def call_x509_cert_create(self, node, validity):
1469
    """Creates a new X509 certificate for SSL/TLS.
1470

1471
    This is a single-node call.
1472

1473
    @type validity: int
1474
    @param validity: Validity in seconds
1475

1476
    """
1477
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1478

    
1479
  @_RpcTimeout(_TMO_NORMAL)
1480
  def call_x509_cert_remove(self, node, name):
1481
    """Removes a X509 certificate.
1482

1483
    This is a single-node call.
1484

1485
    @type name: string
1486
    @param name: Certificate name
1487

1488
    """
1489
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1490

    
1491
  @_RpcTimeout(_TMO_NORMAL)
1492
  def call_import_start(self, node, opts, instance, dest, dest_args):
1493
    """Starts a listener for an import.
1494

1495
    This is a single-node call.
1496

1497
    @type node: string
1498
    @param node: Node name
1499
    @type instance: C{objects.Instance}
1500
    @param instance: Instance object
1501

1502
    """
1503
    return self._SingleNodeCall(node, "import_start",
1504
                                [opts.ToDict(),
1505
                                 self._InstDict(instance), dest,
1506
                                 _EncodeImportExportIO(dest, dest_args)])
1507

    
1508
  @_RpcTimeout(_TMO_NORMAL)
1509
  def call_export_start(self, node, opts, host, port,
1510
                        instance, source, source_args):
1511
    """Starts an export daemon.
1512

1513
    This is a single-node call.
1514

1515
    @type node: string
1516
    @param node: Node name
1517
    @type instance: C{objects.Instance}
1518
    @param instance: Instance object
1519

1520
    """
1521
    return self._SingleNodeCall(node, "export_start",
1522
                                [opts.ToDict(), host, port,
1523
                                 self._InstDict(instance), source,
1524
                                 _EncodeImportExportIO(source, source_args)])
1525

    
1526
  @_RpcTimeout(_TMO_FAST)
1527
  def call_impexp_status(self, node, names):
1528
    """Gets the status of an import or export.
1529

1530
    This is a single-node call.
1531

1532
    @type node: string
1533
    @param node: Node name
1534
    @type names: List of strings
1535
    @param names: Import/export names
1536
    @rtype: List of L{objects.ImportExportStatus} instances
1537
    @return: Returns a list of the state of each named import/export or None if
1538
             a status couldn't be retrieved
1539

1540
    """
1541
    result = self._SingleNodeCall(node, "impexp_status", [names])
1542

    
1543
    if not result.fail_msg:
1544
      decoded = []
1545

    
1546
      for i in result.payload:
1547
        if i is None:
1548
          decoded.append(None)
1549
          continue
1550
        decoded.append(objects.ImportExportStatus.FromDict(i))
1551

    
1552
      result.payload = decoded
1553

    
1554
    return result
1555

    
1556
  @_RpcTimeout(_TMO_NORMAL)
1557
  def call_impexp_abort(self, node, name):
1558
    """Aborts an import or export.
1559

1560
    This is a single-node call.
1561

1562
    @type node: string
1563
    @param node: Node name
1564
    @type name: string
1565
    @param name: Import/export name
1566

1567
    """
1568
    return self._SingleNodeCall(node, "impexp_abort", [name])
1569

    
1570
  @_RpcTimeout(_TMO_NORMAL)
1571
  def call_impexp_cleanup(self, node, name):
1572
    """Cleans up after an import or export.
1573

1574
    This is a single-node call.
1575

1576
    @type node: string
1577
    @param node: Node name
1578
    @type name: string
1579
    @param name: Import/export name
1580

1581
    """
1582
    return self._SingleNodeCall(node, "impexp_cleanup", [name])