Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ c46b9782

History | View | Annotate | Download (41.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
# pylint has a bug here, doesn't see this import
46
import ganeti.http.client  # pylint: disable-msg=W0611
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52
# Various time constants for the timeout table
53
_TMO_URGENT = 60 # one minute
54
_TMO_FAST = 5 * 60 # five minutes
55
_TMO_NORMAL = 15 * 60 # 15 minutes
56
_TMO_SLOW = 3600 # one hour
57
_TMO_4HRS = 4 * 3600
58
_TMO_1DAY = 86400
59

    
60
# Timeout table that will be built later by decorators
61
# Guidelines for choosing timeouts:
62
# - call used during watcher: timeout -> 1min, _TMO_URGENT
63
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
64
# - other calls: 15 min, _TMO_NORMAL
65
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
66

    
67
_TIMEOUTS = {
68
}
69

    
70

    
71
def Init():
72
  """Initializes the module-global HTTP client manager.
73

74
  Must be called before using any RPC function.
75

76
  """
77
  global _http_manager # pylint: disable-msg=W0603
78

    
79
  assert not _http_manager, "RPC module initialized more than once"
80

    
81
  http.InitSsl()
82

    
83
  _http_manager = http.client.HttpClientManager()
84

    
85

    
86
def Shutdown():
87
  """Stops the module-global HTTP client manager.
88

89
  Must be called before quitting the program.
90

91
  """
92
  global _http_manager # pylint: disable-msg=W0603
93

    
94
  if _http_manager:
95
    _http_manager.Shutdown()
96
    _http_manager = None
97

    
98

    
99
def _RpcTimeout(secs):
100
  """Timeout decorator.
101

102
  When applied to a rpc call_* function, it updates the global timeout
103
  table with the given function/timeout.
104

105
  """
106
  def decorator(f):
107
    name = f.__name__
108
    assert name.startswith("call_")
109
    _TIMEOUTS[name[len("call_"):]] = secs
110
    return f
111
  return decorator
112

    
113

    
114
class RpcResult(object):
115
  """RPC Result class.
116

117
  This class holds an RPC result. It is needed since in multi-node
118
  calls we can't raise an exception just because one one out of many
119
  failed, and therefore we use this class to encapsulate the result.
120

121
  @ivar data: the data payload, for successful results, or None
122
  @ivar call: the name of the RPC call
123
  @ivar node: the name of the node to which we made the call
124
  @ivar offline: whether the operation failed because the node was
125
      offline, as opposed to actual failure; offline=True will always
126
      imply failed=True, in order to allow simpler checking if
127
      the user doesn't care about the exact failure mode
128
  @ivar fail_msg: the error message if the call failed
129

130
  """
131
  def __init__(self, data=None, failed=False, offline=False,
132
               call=None, node=None):
133
    self.offline = offline
134
    self.call = call
135
    self.node = node
136

    
137
    if offline:
138
      self.fail_msg = "Node is marked offline"
139
      self.data = self.payload = None
140
    elif failed:
141
      self.fail_msg = self._EnsureErr(data)
142
      self.data = self.payload = None
143
    else:
144
      self.data = data
145
      if not isinstance(self.data, (tuple, list)):
146
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
147
                         type(self.data))
148
        self.payload = None
149
      elif len(data) != 2:
150
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
151
                         "expected 2" % len(self.data))
152
        self.payload = None
153
      elif not self.data[0]:
154
        self.fail_msg = self._EnsureErr(self.data[1])
155
        self.payload = None
156
      else:
157
        # finally success
158
        self.fail_msg = None
159
        self.payload = data[1]
160

    
161
    assert hasattr(self, "call")
162
    assert hasattr(self, "data")
163
    assert hasattr(self, "fail_msg")
164
    assert hasattr(self, "node")
165
    assert hasattr(self, "offline")
166
    assert hasattr(self, "payload")
167

    
168
  @staticmethod
169
  def _EnsureErr(val):
170
    """Helper to ensure we return a 'True' value for error."""
171
    if val:
172
      return val
173
    else:
174
      return "No error information"
175

    
176
  def Raise(self, msg, prereq=False, ecode=None):
177
    """If the result has failed, raise an OpExecError.
178

179
    This is used so that LU code doesn't have to check for each
180
    result, but instead can call this function.
181

182
    """
183
    if not self.fail_msg:
184
      return
185

    
186
    if not msg: # one could pass None for default message
187
      msg = ("Call '%s' to node '%s' has failed: %s" %
188
             (self.call, self.node, self.fail_msg))
189
    else:
190
      msg = "%s: %s" % (msg, self.fail_msg)
191
    if prereq:
192
      ec = errors.OpPrereqError
193
    else:
194
      ec = errors.OpExecError
195
    if ecode is not None:
196
      args = (msg, ecode)
197
    else:
198
      args = (msg, )
199
    raise ec(*args) # pylint: disable-msg=W0142
200

    
201

    
202
class Client:
203
  """RPC Client class.
204

205
  This class, given a (remote) method name, a list of parameters and a
206
  list of nodes, will contact (in parallel) all nodes, and return a
207
  dict of results (key: node name, value: result).
208

209
  One current bug is that generic failure is still signaled by
210
  'False' result, which is not good. This overloading of values can
211
  cause bugs.
212

213
  """
214
  def __init__(self, procedure, body, port):
215
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
216
                                    " timeouts table")
217
    self.procedure = procedure
218
    self.body = body
219
    self.port = port
220
    self.nc = {}
221

    
222
    self._ssl_params = \
223
      http.HttpSslParams(ssl_key_path=constants.NODED_CERT_FILE,
224
                         ssl_cert_path=constants.NODED_CERT_FILE)
225

    
226
  def ConnectList(self, node_list, address_list=None, read_timeout=None):
227
    """Add a list of nodes to the target nodes.
228

229
    @type node_list: list
230
    @param node_list: the list of node names to connect
231
    @type address_list: list or None
232
    @keyword address_list: either None or a list with node addresses,
233
        which must have the same length as the node list
234
    @type read_timeout: int
235
    @param read_timeout: overwrites the default read timeout for the
236
        given operation
237

238
    """
239
    if address_list is None:
240
      address_list = [None for _ in node_list]
241
    else:
242
      assert len(node_list) == len(address_list), \
243
             "Name and address lists should have the same length"
244
    for node, address in zip(node_list, address_list):
245
      self.ConnectNode(node, address, read_timeout=read_timeout)
246

    
247
  def ConnectNode(self, name, address=None, read_timeout=None):
248
    """Add a node to the target list.
249

250
    @type name: str
251
    @param name: the node name
252
    @type address: str
253
    @keyword address: the node address, if known
254

255
    """
256
    if address is None:
257
      address = name
258

    
259
    if read_timeout is None:
260
      read_timeout = _TIMEOUTS[self.procedure]
261

    
262
    self.nc[name] = \
263
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
264
                                    "/%s" % self.procedure,
265
                                    post_data=self.body,
266
                                    ssl_params=self._ssl_params,
267
                                    ssl_verify_peer=True,
268
                                    read_timeout=read_timeout)
269

    
270
  def GetResults(self):
271
    """Call nodes and return results.
272

273
    @rtype: list
274
    @return: List of RPC results
275

276
    """
277
    assert _http_manager, "RPC module not initialized"
278

    
279
    _http_manager.ExecRequests(self.nc.values())
280

    
281
    results = {}
282

    
283
    for name, req in self.nc.iteritems():
284
      if req.success and req.resp_status_code == http.HTTP_OK:
285
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
286
                                  node=name, call=self.procedure)
287
        continue
288

    
289
      # TODO: Better error reporting
290
      if req.error:
291
        msg = req.error
292
      else:
293
        msg = req.resp_body
294

    
295
      logging.error("RPC error in %s from node %s: %s",
296
                    self.procedure, name, msg)
297
      results[name] = RpcResult(data=msg, failed=True, node=name,
298
                                call=self.procedure)
299

    
300
    return results
301

    
302

    
303
def _EncodeImportExportIO(ieio, ieioargs):
304
  """Encodes import/export I/O information.
305

306
  """
307
  if ieio == constants.IEIO_RAW_DISK:
308
    assert len(ieioargs) == 1
309
    return (ieioargs[0].ToDict(), )
310

    
311
  if ieio == constants.IEIO_SCRIPT:
312
    assert len(ieioargs) == 2
313
    return (ieioargs[0].ToDict(), ieioargs[1])
314

    
315
  return ieioargs
316

    
317

    
318
class RpcRunner(object):
319
  """RPC runner class"""
320

    
321
  def __init__(self, cfg):
322
    """Initialized the rpc runner.
323

324
    @type cfg:  C{config.ConfigWriter}
325
    @param cfg: the configuration object that will be used to get data
326
                about the cluster
327

328
    """
329
    self._cfg = cfg
330
    self.port = utils.GetDaemonPort(constants.NODED)
331

    
332
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
333
    """Convert the given instance to a dict.
334

335
    This is done via the instance's ToDict() method and additionally
336
    we fill the hvparams with the cluster defaults.
337

338
    @type instance: L{objects.Instance}
339
    @param instance: an Instance object
340
    @type hvp: dict or None
341
    @param hvp: a dictionary with overridden hypervisor parameters
342
    @type bep: dict or None
343
    @param bep: a dictionary with overridden backend parameters
344
    @type osp: dict or None
345
    @param osp: a dictionary with overriden os parameters
346
    @rtype: dict
347
    @return: the instance dict, with the hvparams filled with the
348
        cluster defaults
349

350
    """
351
    idict = instance.ToDict()
352
    cluster = self._cfg.GetClusterInfo()
353
    idict["hvparams"] = cluster.FillHV(instance)
354
    if hvp is not None:
355
      idict["hvparams"].update(hvp)
356
    idict["beparams"] = cluster.FillBE(instance)
357
    if bep is not None:
358
      idict["beparams"].update(bep)
359
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
360
    if osp is not None:
361
      idict["osparams"].update(osp)
362
    for nic in idict["nics"]:
363
      nic['nicparams'] = objects.FillDict(
364
        cluster.nicparams[constants.PP_DEFAULT],
365
        nic['nicparams'])
366
    return idict
367

    
368
  def _ConnectList(self, client, node_list, call, read_timeout=None):
369
    """Helper for computing node addresses.
370

371
    @type client: L{ganeti.rpc.Client}
372
    @param client: a C{Client} instance
373
    @type node_list: list
374
    @param node_list: the node list we should connect
375
    @type call: string
376
    @param call: the name of the remote procedure call, for filling in
377
        correctly any eventual offline nodes' results
378
    @type read_timeout: int
379
    @param read_timeout: overwrites the default read timeout for the
380
        given operation
381

382
    """
383
    all_nodes = self._cfg.GetAllNodesInfo()
384
    name_list = []
385
    addr_list = []
386
    skip_dict = {}
387
    for node in node_list:
388
      if node in all_nodes:
389
        if all_nodes[node].offline:
390
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
391
          continue
392
        val = all_nodes[node].primary_ip
393
      else:
394
        val = None
395
      addr_list.append(val)
396
      name_list.append(node)
397
    if name_list:
398
      client.ConnectList(name_list, address_list=addr_list,
399
                         read_timeout=read_timeout)
400
    return skip_dict
401

    
402
  def _ConnectNode(self, client, node, call, read_timeout=None):
403
    """Helper for computing one node's address.
404

405
    @type client: L{ganeti.rpc.Client}
406
    @param client: a C{Client} instance
407
    @type node: str
408
    @param node: the node we should connect
409
    @type call: string
410
    @param call: the name of the remote procedure call, for filling in
411
        correctly any eventual offline nodes' results
412
    @type read_timeout: int
413
    @param read_timeout: overwrites the default read timeout for the
414
        given operation
415

416
    """
417
    node_info = self._cfg.GetNodeInfo(node)
418
    if node_info is not None:
419
      if node_info.offline:
420
        return RpcResult(node=node, offline=True, call=call)
421
      addr = node_info.primary_ip
422
    else:
423
      addr = None
424
    client.ConnectNode(node, address=addr, read_timeout=read_timeout)
425

    
426
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
427
    """Helper for making a multi-node call
428

429
    """
430
    body = serializer.DumpJson(args, indent=False)
431
    c = Client(procedure, body, self.port)
432
    skip_dict = self._ConnectList(c, node_list, procedure,
433
                                  read_timeout=read_timeout)
434
    skip_dict.update(c.GetResults())
435
    return skip_dict
436

    
437
  @classmethod
438
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
439
                           address_list=None, read_timeout=None):
440
    """Helper for making a multi-node static call
441

442
    """
443
    body = serializer.DumpJson(args, indent=False)
444
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
445
    c.ConnectList(node_list, address_list=address_list,
446
                  read_timeout=read_timeout)
447
    return c.GetResults()
448

    
449
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
450
    """Helper for making a single-node call
451

452
    """
453
    body = serializer.DumpJson(args, indent=False)
454
    c = Client(procedure, body, self.port)
455
    result = self._ConnectNode(c, node, procedure, read_timeout=read_timeout)
456
    if result is None:
457
      # we did connect, node is not offline
458
      result = c.GetResults()[node]
459
    return result
460

    
461
  @classmethod
462
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
463
    """Helper for making a single-node static call
464

465
    """
466
    body = serializer.DumpJson(args, indent=False)
467
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
468
    c.ConnectNode(node, read_timeout=read_timeout)
469
    return c.GetResults()[node]
470

    
471
  @staticmethod
472
  def _Compress(data):
473
    """Compresses a string for transport over RPC.
474

475
    Small amounts of data are not compressed.
476

477
    @type data: str
478
    @param data: Data
479
    @rtype: tuple
480
    @return: Encoded data to send
481

482
    """
483
    # Small amounts of data are not compressed
484
    if len(data) < 512:
485
      return (constants.RPC_ENCODING_NONE, data)
486

    
487
    # Compress with zlib and encode in base64
488
    return (constants.RPC_ENCODING_ZLIB_BASE64,
489
            base64.b64encode(zlib.compress(data, 3)))
490

    
491
  #
492
  # Begin RPC calls
493
  #
494

    
495
  @_RpcTimeout(_TMO_URGENT)
496
  def call_lv_list(self, node_list, vg_name):
497
    """Gets the logical volumes present in a given volume group.
498

499
    This is a multi-node call.
500

501
    """
502
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
503

    
504
  @_RpcTimeout(_TMO_URGENT)
505
  def call_vg_list(self, node_list):
506
    """Gets the volume group list.
507

508
    This is a multi-node call.
509

510
    """
511
    return self._MultiNodeCall(node_list, "vg_list", [])
512

    
513
  @_RpcTimeout(_TMO_NORMAL)
514
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
515
    """Get list of storage units.
516

517
    This is a multi-node call.
518

519
    """
520
    return self._MultiNodeCall(node_list, "storage_list",
521
                               [su_name, su_args, name, fields])
522

    
523
  @_RpcTimeout(_TMO_NORMAL)
524
  def call_storage_modify(self, node, su_name, su_args, name, changes):
525
    """Modify a storage unit.
526

527
    This is a single-node call.
528

529
    """
530
    return self._SingleNodeCall(node, "storage_modify",
531
                                [su_name, su_args, name, changes])
532

    
533
  @_RpcTimeout(_TMO_NORMAL)
534
  def call_storage_execute(self, node, su_name, su_args, name, op):
535
    """Executes an operation on a storage unit.
536

537
    This is a single-node call.
538

539
    """
540
    return self._SingleNodeCall(node, "storage_execute",
541
                                [su_name, su_args, name, op])
542

    
543
  @_RpcTimeout(_TMO_URGENT)
544
  def call_bridges_exist(self, node, bridges_list):
545
    """Checks if a node has all the bridges given.
546

547
    This method checks if all bridges given in the bridges_list are
548
    present on the remote node, so that an instance that uses interfaces
549
    on those bridges can be started.
550

551
    This is a single-node call.
552

553
    """
554
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
555

    
556
  @_RpcTimeout(_TMO_NORMAL)
557
  def call_instance_start(self, node, instance, hvp, bep):
558
    """Starts an instance.
559

560
    This is a single-node call.
561

562
    """
563
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
564
    return self._SingleNodeCall(node, "instance_start", [idict])
565

    
566
  @_RpcTimeout(_TMO_NORMAL)
567
  def call_instance_shutdown(self, node, instance, timeout):
568
    """Stops an instance.
569

570
    This is a single-node call.
571

572
    """
573
    return self._SingleNodeCall(node, "instance_shutdown",
574
                                [self._InstDict(instance), timeout])
575

    
576
  @_RpcTimeout(_TMO_NORMAL)
577
  def call_migration_info(self, node, instance):
578
    """Gather the information necessary to prepare an instance migration.
579

580
    This is a single-node call.
581

582
    @type node: string
583
    @param node: the node on which the instance is currently running
584
    @type instance: C{objects.Instance}
585
    @param instance: the instance definition
586

587
    """
588
    return self._SingleNodeCall(node, "migration_info",
589
                                [self._InstDict(instance)])
590

    
591
  @_RpcTimeout(_TMO_NORMAL)
592
  def call_accept_instance(self, node, instance, info, target):
593
    """Prepare a node to accept an instance.
594

595
    This is a single-node call.
596

597
    @type node: string
598
    @param node: the target node for the migration
599
    @type instance: C{objects.Instance}
600
    @param instance: the instance definition
601
    @type info: opaque/hypervisor specific (string/data)
602
    @param info: result for the call_migration_info call
603
    @type target: string
604
    @param target: target hostname (usually ip address) (on the node itself)
605

606
    """
607
    return self._SingleNodeCall(node, "accept_instance",
608
                                [self._InstDict(instance), info, target])
609

    
610
  @_RpcTimeout(_TMO_NORMAL)
611
  def call_finalize_migration(self, node, instance, info, success):
612
    """Finalize any target-node migration specific operation.
613

614
    This is called both in case of a successful migration and in case of error
615
    (in which case it should abort the migration).
616

617
    This is a single-node call.
618

619
    @type node: string
620
    @param node: the target node for the migration
621
    @type instance: C{objects.Instance}
622
    @param instance: the instance definition
623
    @type info: opaque/hypervisor specific (string/data)
624
    @param info: result for the call_migration_info call
625
    @type success: boolean
626
    @param success: whether the migration was a success or a failure
627

628
    """
629
    return self._SingleNodeCall(node, "finalize_migration",
630
                                [self._InstDict(instance), info, success])
631

    
632
  @_RpcTimeout(_TMO_SLOW)
633
  def call_instance_migrate(self, node, instance, target, live):
634
    """Migrate an instance.
635

636
    This is a single-node call.
637

638
    @type node: string
639
    @param node: the node on which the instance is currently running
640
    @type instance: C{objects.Instance}
641
    @param instance: the instance definition
642
    @type target: string
643
    @param target: the target node name
644
    @type live: boolean
645
    @param live: whether the migration should be done live or not (the
646
        interpretation of this parameter is left to the hypervisor)
647

648
    """
649
    return self._SingleNodeCall(node, "instance_migrate",
650
                                [self._InstDict(instance), target, live])
651

    
652
  @_RpcTimeout(_TMO_NORMAL)
653
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
654
    """Reboots an instance.
655

656
    This is a single-node call.
657

658
    """
659
    return self._SingleNodeCall(node, "instance_reboot",
660
                                [self._InstDict(inst), reboot_type,
661
                                 shutdown_timeout])
662

    
663
  @_RpcTimeout(_TMO_1DAY)
664
  def call_instance_os_add(self, node, inst, reinstall, debug):
665
    """Installs an OS on the given instance.
666

667
    This is a single-node call.
668

669
    """
670
    return self._SingleNodeCall(node, "instance_os_add",
671
                                [self._InstDict(inst), reinstall, debug])
672

    
673
  @_RpcTimeout(_TMO_SLOW)
674
  def call_instance_run_rename(self, node, inst, old_name, debug):
675
    """Run the OS rename script for an instance.
676

677
    This is a single-node call.
678

679
    """
680
    return self._SingleNodeCall(node, "instance_run_rename",
681
                                [self._InstDict(inst), old_name, debug])
682

    
683
  @_RpcTimeout(_TMO_URGENT)
684
  def call_instance_info(self, node, instance, hname):
685
    """Returns information about a single instance.
686

687
    This is a single-node call.
688

689
    @type node: list
690
    @param node: the list of nodes to query
691
    @type instance: string
692
    @param instance: the instance name
693
    @type hname: string
694
    @param hname: the hypervisor type of the instance
695

696
    """
697
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
698

    
699
  @_RpcTimeout(_TMO_NORMAL)
700
  def call_instance_migratable(self, node, instance):
701
    """Checks whether the given instance can be migrated.
702

703
    This is a single-node call.
704

705
    @param node: the node to query
706
    @type instance: L{objects.Instance}
707
    @param instance: the instance to check
708

709

710
    """
711
    return self._SingleNodeCall(node, "instance_migratable",
712
                                [self._InstDict(instance)])
713

    
714
  @_RpcTimeout(_TMO_URGENT)
715
  def call_all_instances_info(self, node_list, hypervisor_list):
716
    """Returns information about all instances on the given nodes.
717

718
    This is a multi-node call.
719

720
    @type node_list: list
721
    @param node_list: the list of nodes to query
722
    @type hypervisor_list: list
723
    @param hypervisor_list: the hypervisors to query for instances
724

725
    """
726
    return self._MultiNodeCall(node_list, "all_instances_info",
727
                               [hypervisor_list])
728

    
729
  @_RpcTimeout(_TMO_URGENT)
730
  def call_instance_list(self, node_list, hypervisor_list):
731
    """Returns the list of running instances on a given node.
732

733
    This is a multi-node call.
734

735
    @type node_list: list
736
    @param node_list: the list of nodes to query
737
    @type hypervisor_list: list
738
    @param hypervisor_list: the hypervisors to query for instances
739

740
    """
741
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
742

    
743
  @_RpcTimeout(_TMO_FAST)
744
  def call_node_tcp_ping(self, node, source, target, port, timeout,
745
                         live_port_needed):
746
    """Do a TcpPing on the remote node
747

748
    This is a single-node call.
749

750
    """
751
    return self._SingleNodeCall(node, "node_tcp_ping",
752
                                [source, target, port, timeout,
753
                                 live_port_needed])
754

    
755
  @_RpcTimeout(_TMO_FAST)
756
  def call_node_has_ip_address(self, node, address):
757
    """Checks if a node has the given IP address.
758

759
    This is a single-node call.
760

761
    """
762
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
763

    
764
  @_RpcTimeout(_TMO_URGENT)
765
  def call_node_info(self, node_list, vg_name, hypervisor_type):
766
    """Return node information.
767

768
    This will return memory information and volume group size and free
769
    space.
770

771
    This is a multi-node call.
772

773
    @type node_list: list
774
    @param node_list: the list of nodes to query
775
    @type vg_name: C{string}
776
    @param vg_name: the name of the volume group to ask for disk space
777
        information
778
    @type hypervisor_type: C{str}
779
    @param hypervisor_type: the name of the hypervisor to ask for
780
        memory information
781

782
    """
783
    return self._MultiNodeCall(node_list, "node_info",
784
                               [vg_name, hypervisor_type])
785

    
786
  @_RpcTimeout(_TMO_NORMAL)
787
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
788
    """Add a node to the cluster.
789

790
    This is a single-node call.
791

792
    """
793
    return self._SingleNodeCall(node, "node_add",
794
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
795

    
796
  @_RpcTimeout(_TMO_NORMAL)
797
  def call_node_verify(self, node_list, checkdict, cluster_name):
798
    """Request verification of given parameters.
799

800
    This is a multi-node call.
801

802
    """
803
    return self._MultiNodeCall(node_list, "node_verify",
804
                               [checkdict, cluster_name])
805

    
806
  @classmethod
807
  @_RpcTimeout(_TMO_FAST)
808
  def call_node_start_master(cls, node, start_daemons, no_voting):
809
    """Tells a node to activate itself as a master.
810

811
    This is a single-node call.
812

813
    """
814
    return cls._StaticSingleNodeCall(node, "node_start_master",
815
                                     [start_daemons, no_voting])
816

    
817
  @classmethod
818
  @_RpcTimeout(_TMO_FAST)
819
  def call_node_stop_master(cls, node, stop_daemons):
820
    """Tells a node to demote itself from master status.
821

822
    This is a single-node call.
823

824
    """
825
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
826

    
827
  @classmethod
828
  @_RpcTimeout(_TMO_URGENT)
829
  def call_master_info(cls, node_list):
830
    """Query master info.
831

832
    This is a multi-node call.
833

834
    """
835
    # TODO: should this method query down nodes?
836
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
837

    
838
  @classmethod
839
  @_RpcTimeout(_TMO_URGENT)
840
  def call_version(cls, node_list):
841
    """Query node version.
842

843
    This is a multi-node call.
844

845
    """
846
    return cls._StaticMultiNodeCall(node_list, "version", [])
847

    
848
  @_RpcTimeout(_TMO_NORMAL)
849
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
850
    """Request creation of a given block device.
851

852
    This is a single-node call.
853

854
    """
855
    return self._SingleNodeCall(node, "blockdev_create",
856
                                [bdev.ToDict(), size, owner, on_primary, info])
857

    
858
  @_RpcTimeout(_TMO_NORMAL)
859
  def call_blockdev_remove(self, node, bdev):
860
    """Request removal of a given block device.
861

862
    This is a single-node call.
863

864
    """
865
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
866

    
867
  @_RpcTimeout(_TMO_NORMAL)
868
  def call_blockdev_rename(self, node, devlist):
869
    """Request rename of the given block devices.
870

871
    This is a single-node call.
872

873
    """
874
    return self._SingleNodeCall(node, "blockdev_rename",
875
                                [(d.ToDict(), uid) for d, uid in devlist])
876

    
877
  @_RpcTimeout(_TMO_NORMAL)
878
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
879
    """Request assembling of a given block device.
880

881
    This is a single-node call.
882

883
    """
884
    return self._SingleNodeCall(node, "blockdev_assemble",
885
                                [disk.ToDict(), owner, on_primary])
886

    
887
  @_RpcTimeout(_TMO_NORMAL)
888
  def call_blockdev_shutdown(self, node, disk):
889
    """Request shutdown of a given block device.
890

891
    This is a single-node call.
892

893
    """
894
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
895

    
896
  @_RpcTimeout(_TMO_NORMAL)
897
  def call_blockdev_addchildren(self, node, bdev, ndevs):
898
    """Request adding a list of children to a (mirroring) device.
899

900
    This is a single-node call.
901

902
    """
903
    return self._SingleNodeCall(node, "blockdev_addchildren",
904
                                [bdev.ToDict(),
905
                                 [disk.ToDict() for disk in ndevs]])
906

    
907
  @_RpcTimeout(_TMO_NORMAL)
908
  def call_blockdev_removechildren(self, node, bdev, ndevs):
909
    """Request removing a list of children from a (mirroring) device.
910

911
    This is a single-node call.
912

913
    """
914
    return self._SingleNodeCall(node, "blockdev_removechildren",
915
                                [bdev.ToDict(),
916
                                 [disk.ToDict() for disk in ndevs]])
917

    
918
  @_RpcTimeout(_TMO_NORMAL)
919
  def call_blockdev_getmirrorstatus(self, node, disks):
920
    """Request status of a (mirroring) device.
921

922
    This is a single-node call.
923

924
    """
925
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
926
                                  [dsk.ToDict() for dsk in disks])
927
    if not result.fail_msg:
928
      result.payload = [objects.BlockDevStatus.FromDict(i)
929
                        for i in result.payload]
930
    return result
931

    
932
  @_RpcTimeout(_TMO_NORMAL)
933
  def call_blockdev_find(self, node, disk):
934
    """Request identification of a given block device.
935

936
    This is a single-node call.
937

938
    """
939
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
940
    if not result.fail_msg and result.payload is not None:
941
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
942
    return result
943

    
944
  @_RpcTimeout(_TMO_NORMAL)
945
  def call_blockdev_close(self, node, instance_name, disks):
946
    """Closes the given block devices.
947

948
    This is a single-node call.
949

950
    """
951
    params = [instance_name, [cf.ToDict() for cf in disks]]
952
    return self._SingleNodeCall(node, "blockdev_close", params)
953

    
954
  @_RpcTimeout(_TMO_NORMAL)
955
  def call_blockdev_getsizes(self, node, disks):
956
    """Returns the size of the given disks.
957

958
    This is a single-node call.
959

960
    """
961
    params = [[cf.ToDict() for cf in disks]]
962
    return self._SingleNodeCall(node, "blockdev_getsize", params)
963

    
964
  @_RpcTimeout(_TMO_NORMAL)
965
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
966
    """Disconnects the network of the given drbd devices.
967

968
    This is a multi-node call.
969

970
    """
971
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
972
                               [nodes_ip, [cf.ToDict() for cf in disks]])
973

    
974
  @_RpcTimeout(_TMO_NORMAL)
975
  def call_drbd_attach_net(self, node_list, nodes_ip,
976
                           disks, instance_name, multimaster):
977
    """Disconnects the given drbd devices.
978

979
    This is a multi-node call.
980

981
    """
982
    return self._MultiNodeCall(node_list, "drbd_attach_net",
983
                               [nodes_ip, [cf.ToDict() for cf in disks],
984
                                instance_name, multimaster])
985

    
986
  @_RpcTimeout(_TMO_SLOW)
987
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
988
    """Waits for the synchronization of drbd devices is complete.
989

990
    This is a multi-node call.
991

992
    """
993
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
994
                               [nodes_ip, [cf.ToDict() for cf in disks]])
995

    
996
  @_RpcTimeout(_TMO_URGENT)
997
  def call_drbd_helper(self, node_list):
998
    """Gets drbd helper.
999

1000
    This is a multi-node call.
1001

1002
    """
1003
    return self._MultiNodeCall(node_list, "drbd_helper", [])
1004

    
1005
  @classmethod
1006
  @_RpcTimeout(_TMO_NORMAL)
1007
  def call_upload_file(cls, node_list, file_name, address_list=None):
1008
    """Upload a file.
1009

1010
    The node will refuse the operation in case the file is not on the
1011
    approved file list.
1012

1013
    This is a multi-node call.
1014

1015
    @type node_list: list
1016
    @param node_list: the list of node names to upload to
1017
    @type file_name: str
1018
    @param file_name: the filename to upload
1019
    @type address_list: list or None
1020
    @keyword address_list: an optional list of node addresses, in order
1021
        to optimize the RPC speed
1022

1023
    """
1024
    file_contents = utils.ReadFile(file_name)
1025
    data = cls._Compress(file_contents)
1026
    st = os.stat(file_name)
1027
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
1028
              st.st_atime, st.st_mtime]
1029
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
1030
                                    address_list=address_list)
1031

    
1032
  @classmethod
1033
  @_RpcTimeout(_TMO_NORMAL)
1034
  def call_write_ssconf_files(cls, node_list, values):
1035
    """Write ssconf files.
1036

1037
    This is a multi-node call.
1038

1039
    """
1040
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
1041

    
1042
  @_RpcTimeout(_TMO_FAST)
1043
  def call_os_diagnose(self, node_list):
1044
    """Request a diagnose of OS definitions.
1045

1046
    This is a multi-node call.
1047

1048
    """
1049
    return self._MultiNodeCall(node_list, "os_diagnose", [])
1050

    
1051
  @_RpcTimeout(_TMO_FAST)
1052
  def call_os_get(self, node, name):
1053
    """Returns an OS definition.
1054

1055
    This is a single-node call.
1056

1057
    """
1058
    result = self._SingleNodeCall(node, "os_get", [name])
1059
    if not result.fail_msg and isinstance(result.payload, dict):
1060
      result.payload = objects.OS.FromDict(result.payload)
1061
    return result
1062

    
1063
  @_RpcTimeout(_TMO_FAST)
1064
  def call_os_validate(self, required, nodes, name, checks, params):
1065
    """Run a validation routine for a given OS.
1066

1067
    This is a multi-node call.
1068

1069
    """
1070
    return self._MultiNodeCall(nodes, "os_validate",
1071
                               [required, name, checks, params])
1072

    
1073
  @_RpcTimeout(_TMO_NORMAL)
1074
  def call_hooks_runner(self, node_list, hpath, phase, env):
1075
    """Call the hooks runner.
1076

1077
    Args:
1078
      - op: the OpCode instance
1079
      - env: a dictionary with the environment
1080

1081
    This is a multi-node call.
1082

1083
    """
1084
    params = [hpath, phase, env]
1085
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1086

    
1087
  @_RpcTimeout(_TMO_NORMAL)
1088
  def call_iallocator_runner(self, node, name, idata):
1089
    """Call an iallocator on a remote node
1090

1091
    Args:
1092
      - name: the iallocator name
1093
      - input: the json-encoded input string
1094

1095
    This is a single-node call.
1096

1097
    """
1098
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1099

    
1100
  @_RpcTimeout(_TMO_NORMAL)
1101
  def call_blockdev_grow(self, node, cf_bdev, amount):
1102
    """Request a snapshot of the given block device.
1103

1104
    This is a single-node call.
1105

1106
    """
1107
    return self._SingleNodeCall(node, "blockdev_grow",
1108
                                [cf_bdev.ToDict(), amount])
1109

    
1110
  @_RpcTimeout(_TMO_1DAY)
1111
  def call_blockdev_export(self, node, cf_bdev,
1112
                           dest_node, dest_path, cluster_name):
1113
    """Export a given disk to another node.
1114

1115
    This is a single-node call.
1116

1117
    """
1118
    return self._SingleNodeCall(node, "blockdev_export",
1119
                                [cf_bdev.ToDict(), dest_node, dest_path,
1120
                                 cluster_name])
1121

    
1122
  @_RpcTimeout(_TMO_NORMAL)
1123
  def call_blockdev_snapshot(self, node, cf_bdev):
1124
    """Request a snapshot of the given block device.
1125

1126
    This is a single-node call.
1127

1128
    """
1129
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
1130

    
1131
  @_RpcTimeout(_TMO_NORMAL)
1132
  def call_finalize_export(self, node, instance, snap_disks):
1133
    """Request the completion of an export operation.
1134

1135
    This writes the export config file, etc.
1136

1137
    This is a single-node call.
1138

1139
    """
1140
    flat_disks = []
1141
    for disk in snap_disks:
1142
      if isinstance(disk, bool):
1143
        flat_disks.append(disk)
1144
      else:
1145
        flat_disks.append(disk.ToDict())
1146

    
1147
    return self._SingleNodeCall(node, "finalize_export",
1148
                                [self._InstDict(instance), flat_disks])
1149

    
1150
  @_RpcTimeout(_TMO_FAST)
1151
  def call_export_info(self, node, path):
1152
    """Queries the export information in a given path.
1153

1154
    This is a single-node call.
1155

1156
    """
1157
    return self._SingleNodeCall(node, "export_info", [path])
1158

    
1159
  @_RpcTimeout(_TMO_FAST)
1160
  def call_export_list(self, node_list):
1161
    """Gets the stored exports list.
1162

1163
    This is a multi-node call.
1164

1165
    """
1166
    return self._MultiNodeCall(node_list, "export_list", [])
1167

    
1168
  @_RpcTimeout(_TMO_FAST)
1169
  def call_export_remove(self, node, export):
1170
    """Requests removal of a given export.
1171

1172
    This is a single-node call.
1173

1174
    """
1175
    return self._SingleNodeCall(node, "export_remove", [export])
1176

    
1177
  @classmethod
1178
  @_RpcTimeout(_TMO_NORMAL)
1179
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1180
    """Requests a node to clean the cluster information it has.
1181

1182
    This will remove the configuration information from the ganeti data
1183
    dir.
1184

1185
    This is a single-node call.
1186

1187
    """
1188
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1189
                                     [modify_ssh_setup])
1190

    
1191
  @_RpcTimeout(_TMO_FAST)
1192
  def call_node_volumes(self, node_list):
1193
    """Gets all volumes on node(s).
1194

1195
    This is a multi-node call.
1196

1197
    """
1198
    return self._MultiNodeCall(node_list, "node_volumes", [])
1199

    
1200
  @_RpcTimeout(_TMO_FAST)
1201
  def call_node_demote_from_mc(self, node):
1202
    """Demote a node from the master candidate role.
1203

1204
    This is a single-node call.
1205

1206
    """
1207
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1208

    
1209
  @_RpcTimeout(_TMO_NORMAL)
1210
  def call_node_powercycle(self, node, hypervisor):
1211
    """Tries to powercycle a node.
1212

1213
    This is a single-node call.
1214

1215
    """
1216
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1217

    
1218
  @_RpcTimeout(None)
1219
  def call_test_delay(self, node_list, duration):
1220
    """Sleep for a fixed time on given node(s).
1221

1222
    This is a multi-node call.
1223

1224
    """
1225
    return self._MultiNodeCall(node_list, "test_delay", [duration],
1226
                               read_timeout=int(duration + 5))
1227

    
1228
  @_RpcTimeout(_TMO_FAST)
1229
  def call_file_storage_dir_create(self, node, file_storage_dir):
1230
    """Create the given file storage directory.
1231

1232
    This is a single-node call.
1233

1234
    """
1235
    return self._SingleNodeCall(node, "file_storage_dir_create",
1236
                                [file_storage_dir])
1237

    
1238
  @_RpcTimeout(_TMO_FAST)
1239
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1240
    """Remove the given file storage directory.
1241

1242
    This is a single-node call.
1243

1244
    """
1245
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1246
                                [file_storage_dir])
1247

    
1248
  @_RpcTimeout(_TMO_FAST)
1249
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1250
                                   new_file_storage_dir):
1251
    """Rename file storage directory.
1252

1253
    This is a single-node call.
1254

1255
    """
1256
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1257
                                [old_file_storage_dir, new_file_storage_dir])
1258

    
1259
  @classmethod
1260
  @_RpcTimeout(_TMO_FAST)
1261
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1262
    """Update job queue.
1263

1264
    This is a multi-node call.
1265

1266
    """
1267
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1268
                                    [file_name, cls._Compress(content)],
1269
                                    address_list=address_list)
1270

    
1271
  @classmethod
1272
  @_RpcTimeout(_TMO_NORMAL)
1273
  def call_jobqueue_purge(cls, node):
1274
    """Purge job queue.
1275

1276
    This is a single-node call.
1277

1278
    """
1279
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1280

    
1281
  @classmethod
1282
  @_RpcTimeout(_TMO_FAST)
1283
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1284
    """Rename a job queue file.
1285

1286
    This is a multi-node call.
1287

1288
    """
1289
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1290
                                    address_list=address_list)
1291

    
1292
  @_RpcTimeout(_TMO_NORMAL)
1293
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1294
    """Validate the hypervisor params.
1295

1296
    This is a multi-node call.
1297

1298
    @type node_list: list
1299
    @param node_list: the list of nodes to query
1300
    @type hvname: string
1301
    @param hvname: the hypervisor name
1302
    @type hvparams: dict
1303
    @param hvparams: the hypervisor parameters to be validated
1304

1305
    """
1306
    cluster = self._cfg.GetClusterInfo()
1307
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1308
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1309
                               [hvname, hv_full])
1310

    
1311
  @_RpcTimeout(_TMO_NORMAL)
1312
  def call_x509_cert_create(self, node, validity):
1313
    """Creates a new X509 certificate for SSL/TLS.
1314

1315
    This is a single-node call.
1316

1317
    @type validity: int
1318
    @param validity: Validity in seconds
1319

1320
    """
1321
    return self._SingleNodeCall(node, "x509_cert_create", [validity])
1322

    
1323
  @_RpcTimeout(_TMO_NORMAL)
1324
  def call_x509_cert_remove(self, node, name):
1325
    """Removes a X509 certificate.
1326

1327
    This is a single-node call.
1328

1329
    @type name: string
1330
    @param name: Certificate name
1331

1332
    """
1333
    return self._SingleNodeCall(node, "x509_cert_remove", [name])
1334

    
1335
  @_RpcTimeout(_TMO_NORMAL)
1336
  def call_import_start(self, node, opts, instance, dest, dest_args):
1337
    """Starts a listener for an import.
1338

1339
    This is a single-node call.
1340

1341
    @type node: string
1342
    @param node: Node name
1343
    @type instance: C{objects.Instance}
1344
    @param instance: Instance object
1345

1346
    """
1347
    return self._SingleNodeCall(node, "import_start",
1348
                                [opts.ToDict(),
1349
                                 self._InstDict(instance), dest,
1350
                                 _EncodeImportExportIO(dest, dest_args)])
1351

    
1352
  @_RpcTimeout(_TMO_NORMAL)
1353
  def call_export_start(self, node, opts, host, port,
1354
                        instance, source, source_args):
1355
    """Starts an export daemon.
1356

1357
    This is a single-node call.
1358

1359
    @type node: string
1360
    @param node: Node name
1361
    @type instance: C{objects.Instance}
1362
    @param instance: Instance object
1363

1364
    """
1365
    return self._SingleNodeCall(node, "export_start",
1366
                                [opts.ToDict(), host, port,
1367
                                 self._InstDict(instance), source,
1368
                                 _EncodeImportExportIO(source, source_args)])
1369

    
1370
  @_RpcTimeout(_TMO_FAST)
1371
  def call_impexp_status(self, node, names):
1372
    """Gets the status of an import or export.
1373

1374
    This is a single-node call.
1375

1376
    @type node: string
1377
    @param node: Node name
1378
    @type names: List of strings
1379
    @param names: Import/export names
1380
    @rtype: List of L{objects.ImportExportStatus} instances
1381
    @return: Returns a list of the state of each named import/export or None if
1382
             a status couldn't be retrieved
1383

1384
    """
1385
    result = self._SingleNodeCall(node, "impexp_status", [names])
1386

    
1387
    if not result.fail_msg:
1388
      decoded = []
1389

    
1390
      for i in result.payload:
1391
        if i is None:
1392
          decoded.append(None)
1393
          continue
1394
        decoded.append(objects.ImportExportStatus.FromDict(i))
1395

    
1396
      result.payload = decoded
1397

    
1398
    return result
1399

    
1400
  @_RpcTimeout(_TMO_NORMAL)
1401
  def call_impexp_abort(self, node, name):
1402
    """Aborts an import or export.
1403

1404
    This is a single-node call.
1405

1406
    @type node: string
1407
    @param node: Node name
1408
    @type name: string
1409
    @param name: Import/export name
1410

1411
    """
1412
    return self._SingleNodeCall(node, "impexp_abort", [name])
1413

    
1414
  @_RpcTimeout(_TMO_NORMAL)
1415
  def call_impexp_cleanup(self, node, name):
1416
    """Cleans up after an import or export.
1417

1418
    This is a single-node call.
1419

1420
    @type node: string
1421
    @param node: Node name
1422
    @type name: string
1423
    @param name: Import/export name
1424

1425
    """
1426
    return self._SingleNodeCall(node, "impexp_cleanup", [name])