Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 1bdcbbab

History | View | Annotate | Download (41.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
# pylint has a bug here, doesn't see this import
46
import ganeti.http.client  # pylint: disable-msg=W0611
47

    
48

    
49
# Module-global HTTP client manager; set by Init() and reset by Shutdown()
_http_manager = None

# Time constants (all in seconds) used for the timeout table
_TMO_URGENT = 60 # one minute
_TMO_FAST = 5 * _TMO_URGENT # five minutes
_TMO_NORMAL = 15 * _TMO_URGENT # 15 minutes
_TMO_SLOW = 60 * _TMO_URGENT # one hour
_TMO_4HRS = 4 * _TMO_SLOW
_TMO_1DAY = 24 * _TMO_SLOW

# Timeout table; it starts out empty and is filled by the _RpcTimeout
# decorator, keyed by procedure name (the call_* name without prefix).
# Guidelines for choosing timeouts:
# - call used during watcher: timeout -> 1min, _TMO_URGENT
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
# - other calls: 15 min, _TMO_NORMAL
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts

_TIMEOUTS = {}
69

    
70

    
71
def Init():
  """Set up the module-global HTTP client manager.

  This has to be called before any RPC function is used. Calling it
  again while a manager is still set trips the assertion below.

  """
  global _http_manager # pylint: disable-msg=W0603

  assert not _http_manager, "RPC module initialized more than once"

  # SSL setup happens first, then the client manager is created
  http.InitSsl()
  manager = http.client.HttpClientManager()

  _http_manager = manager
84

    
85

    
86
def Shutdown():
  """Shut down the module-global HTTP client manager.

  This has to be called before quitting the program. If no manager is
  currently set, nothing is done.

  """
  global _http_manager # pylint: disable-msg=W0603

  if not _http_manager:
    return

  _http_manager.Shutdown()
  # forget the manager so that a later Init() does not trip its assertion
  _http_manager = None
97

    
98

    
99
def _RpcTimeout(secs):
  """Build a decorator recording the timeout of an rpc call_* function.

  The returned decorator requires the function name to start with
  C{call_}; it stores C{secs} in the global timeout table under the
  rest of that name and returns the function itself, unchanged.

  @param secs: the timeout value to store for the decorated call

  """
  prefix = "call_"

  def decorator(f):
    fn_name = f.__name__
    assert fn_name.startswith(prefix)
    # the table is keyed by procedure name, i.e. without the prefix
    procedure = fn_name[len(prefix):]
    _TIMEOUTS[procedure] = secs
    return f

  return decorator
112

    
113

    
114
class RpcResult(object):
  """Holder for the result of an RPC call to a single node.

  In multi-node calls we can't raise an exception just because one
  out of many nodes failed, so each per-node outcome is encapsulated
  in an instance of this class instead.

  @ivar data: the raw result as passed to the constructor, or None if
      the node was offline or the call was flagged as failed
  @ivar payload: the second element of the result for successful
      calls, otherwise None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; an offline result always
      carries a failure message too, so callers that don't care about
      the exact failure mode only need to check C{fail_msg}
  @ivar fail_msg: the error message if the call failed, None otherwise

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.call = call
    self.node = node
    self.offline = offline

    # Defaults for every non-successful outcome; the branches below
    # overwrite them only where there is something to store
    self.data = None
    self.payload = None

    if offline:
      self.fail_msg = "Node is marked offline"
    elif failed:
      self.fail_msg = self._EnsureErr(data)
    else:
      self.data = data
      if not isinstance(data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(data))
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(data))
      else:
        # a well-formed result is a (status, value) pair
        status, value = data
        if status:
          # finally success
          self.fail_msg = None
          self.payload = value
        else:
          self.fail_msg = self._EnsureErr(value)

    for attr in ("call", "data", "fail_msg", "node", "offline", "payload"):
      assert hasattr(self, attr)

  @staticmethod
  def _EnsureErr(val):
    """Return C{val} if it is true, else a generic error text.

    This guarantees that error messages always evaluate to true.

    """
    return val or "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """Raise an exception if this result has failed.

    This is used so that LU code doesn't have to check each result
    itself, but instead can simply call this function. Nothing
    happens for successful results.

    @param msg: prefix for the error message; a false value (e.g.
        None) selects a default message naming the call and the node
    @param prereq: if true, L{errors.OpPrereqError} is raised instead
        of L{errors.OpExecError}
    @param ecode: if not None, passed as second argument to the
        exception
    @raise errors.OpExecError: if the call failed and C{prereq} is false
    @raise errors.OpPrereqError: if the call failed and C{prereq} is true

    """
    if not self.fail_msg:
      return

    if msg:
      text = "%s: %s" % (msg, self.fail_msg)
    else:
      # one could pass None for default message
      text = ("Call '%s' to node '%s' has failed: %s" %
              (self.call, self.node, self.fail_msg))

    if prereq:
      exc_class = errors.OpPrereqError
    else:
      exc_class = errors.OpExecError

    if ecode is None:
      raise exc_class(text)
    raise exc_class(text, ecode)
200

    
201

    
202
class Client:
  """RPC Client class.

  An instance is created for one (remote) method name and an already
  serialized body; nodes are then registered with L{ConnectNode} or
  L{ConnectList}, and L{GetResults} contacts (in parallel) all of
  them and returns a dict of results (key: node name, value: result).

  One current bug is that generic failure is still signaled by
  'False' result, which is not good. This overloading of values can
  cause bugs.

  """
  def __init__(self, procedure, body, port):
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
                                    " timeouts table")
    self.procedure = procedure
    self.body = body
    self.port = port
    # pending requests, keyed by node name
    self.nc = {}

    # the same file is passed as both the key and the certificate path
    cert_file = constants.NODED_CERT_FILE
    self._ssl_params = http.HttpSslParams(ssl_key_path=cert_file,
                                          ssl_cert_path=cert_file)

  def ConnectList(self, node_list, address_list=None, read_timeout=None):
    """Add a list of nodes to the target nodes.

    @type node_list: list
    @param node_list: the list of node names to connect
    @type address_list: list or None
    @keyword address_list: either None or a list with node addresses,
        which must have the same length as the node list
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation

    """
    if address_list is not None:
      assert len(node_list) == len(address_list), \
             "Name and address lists should have the same length"
      targets = zip(node_list, address_list)
    else:
      # no addresses known: let ConnectNode fall back to the names
      targets = [(name, None) for name in node_list]

    for name, address in targets:
      self.ConnectNode(name, address, read_timeout=read_timeout)

  def ConnectNode(self, name, address=None, read_timeout=None):
    """Add a node to the target list.

    @type name: str
    @param name: the node name
    @type address: str
    @keyword address: the node address, if known; otherwise the node
        name is used as address
    @type read_timeout: int
    @param read_timeout: overwrites the read timeout taken from the
        timeout table for this procedure

    """
    target = address
    if target is None:
      target = name

    timeout = read_timeout
    if timeout is None:
      timeout = _TIMEOUTS[self.procedure]

    request = http.client.HttpClientRequest(target, self.port, http.HTTP_PUT,
                                            "/%s" % self.procedure,
                                            post_data=self.body,
                                            ssl_params=self._ssl_params,
                                            ssl_verify_peer=True,
                                            read_timeout=timeout)
    self.nc[name] = request

  def GetResults(self):
    """Call nodes and return results.

    @rtype: dict
    @return: dictionary mapping each connected node name to its
        L{RpcResult}

    """
    assert _http_manager, "RPC module not initialized"

    _http_manager.ExecRequests(self.nc.values())

    results = {}

    for name, req in self.nc.iteritems():
      if req.success and req.resp_status_code == http.HTTP_OK:
        decoded = serializer.LoadJson(req.resp_body)
        results[name] = RpcResult(data=decoded, node=name,
                                  call=self.procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s from node %s: %s",
                      self.procedure, name, msg)
        results[name] = RpcResult(data=msg, failed=True, node=name,
                                  call=self.procedure)

    return results
301

    
302

    
303
def _EncodeImportExportIO(ieio, ieioargs):
  """Encodes import/export I/O information.

  For C{constants.IEIO_RAW_DISK} and C{constants.IEIO_SCRIPT} the
  first argument is replaced by the result of its C{ToDict()} method;
  for any other I/O type the arguments are returned unchanged.

  @param ieio: the import/export I/O type
  @param ieioargs: the arguments belonging to the I/O type
  @return: the encoded arguments

  """
  encoded = ieioargs

  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    encoded = (ieioargs[0].ToDict(), )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    first_dict = ieioargs[0].ToDict()
    encoded = (first_dict, ieioargs[1])

  return encoded
316

    
317

    
318
class RpcRunner(object):
319
  """RPC runner class"""
320

    
321
  def __init__(self, cfg):
    """Set up the rpc runner.

    Besides keeping the configuration object, this looks up the node
    daemon port once and stores it for all later calls.

    @type cfg:  C{config.ConfigWriter}
    @param cfg: the configuration object that will be used to get data
                about the cluster

    """
    self._cfg = cfg
    noded_port = utils.GetDaemonPort(constants.NODED)
    self.port = noded_port
331

    
332
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    The base is the instance's ToDict() result; on top of that the
    hypervisor, backend and OS parameters are replaced by the values
    filled in by the cluster object, optionally updated with the given
    overrides, and the parameters of each NIC are filled from the
    cluster's default NIC parameters.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the parameters filled as
        described above

    """
    result = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()

    hvparams = cluster.FillHV(instance)
    if hvp is not None:
      hvparams.update(hvp)
    result["hvparams"] = hvparams

    beparams = cluster.FillBE(instance)
    if bep is not None:
      beparams.update(bep)
    result["beparams"] = beparams

    osparams = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      osparams.update(osp)
    result["osparams"] = osparams

    for nic_dict in result["nics"]:
      nic_defaults = cluster.nicparams[constants.PP_DEFAULT]
      nic_dict['nicparams'] = objects.FillDict(nic_defaults,
                                               nic_dict['nicparams'])

    return result
367

    
368
  def _ConnectList(self, client, node_list, call, read_timeout=None):
    """Connect a client to a list of nodes, skipping offline ones.

    Nodes known to the configuration are connected via their primary
    IP, unless they are marked offline; unknown nodes are connected
    without an address.

    @type client: L{ganeti.rpc.Client}
    @param client: a C{Client} instance
    @type node_list: list
    @param node_list: the node list we should connect
    @type call: string
    @param call: the name of the remote procedure call, for filling in
        correctly any eventual offline nodes' results
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation
    @rtype: dict
    @return: dictionary mapping the names of the offline nodes to an
        offline L{RpcResult}

    """
    known_nodes = self._cfg.GetAllNodesInfo()
    offline_results = {}
    names = []
    addresses = []

    for node in node_list:
      address = None
      if node in known_nodes:
        info = known_nodes[node]
        if info.offline:
          offline_results[node] = RpcResult(node=node, offline=True,
                                            call=call)
          continue
        address = info.primary_ip
      names.append(node)
      addresses.append(address)

    if names:
      client.ConnectList(names, address_list=addresses,
                         read_timeout=read_timeout)

    return offline_results
401

    
402
  def _ConnectNode(self, client, node, call, read_timeout=None):
    """Connect a client to one node, unless the node is offline.

    @type client: L{ganeti.rpc.Client}
    @param client: a C{Client} instance
    @type node: str
    @param node: the node we should connect
    @type call: string
    @param call: the name of the remote procedure call, for filling in
        correctly any eventual offline nodes' results
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation
    @return: an offline L{RpcResult} if the node is marked offline (the
        client is left untouched in this case), otherwise None

    """
    info = self._cfg.GetNodeInfo(node)

    address = None
    if info is not None:
      if info.offline:
        return RpcResult(node=node, offline=True, call=call)
      address = info.primary_ip

    client.ConnectNode(node, address=address, read_timeout=read_timeout)
    return None
425

    
426
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
    """Helper for making a multi-node call

    The arguments are serialized to JSON, sent to all nodes of the
    list which are not offline, and the results are merged with the
    results generated for the offline nodes.

    @return: dictionary with one entry per node name

    """
    payload = serializer.DumpJson(args, indent=False)
    client = Client(procedure, payload, self.port)
    results = self._ConnectList(client, node_list, procedure,
                                read_timeout=read_timeout)
    live_results = client.GetResults()
    results.update(live_results)
    return results
436

    
437
  @classmethod
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
                           address_list=None, read_timeout=None):
    """Helper for making a multi-node static call

    No configuration object is consulted here: the node daemon port is
    looked up directly and all nodes of the list are contacted.

    @return: the results as returned by L{Client.GetResults}

    """
    payload = serializer.DumpJson(args, indent=False)
    noded_port = utils.GetDaemonPort(constants.NODED)
    client = Client(procedure, payload, noded_port)
    client.ConnectList(node_list, address_list=address_list,
                       read_timeout=read_timeout)
    return client.GetResults()
448

    
449
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
    """Helper for making a single-node call

    @return: the offline result if the node is marked offline,
        otherwise the result of the call for this node

    """
    payload = serializer.DumpJson(args, indent=False)
    client = Client(procedure, payload, self.port)
    offline_result = self._ConnectNode(client, node, procedure,
                                       read_timeout=read_timeout)
    if offline_result is not None:
      # node is offline, nothing was connected
      return offline_result

    return client.GetResults()[node]
460

    
461
  @classmethod
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
    """Helper for making a single-node static call

    No configuration object is consulted here: the node daemon port is
    looked up directly and the node is connected by name only.

    @return: the result of the call for this node

    """
    payload = serializer.DumpJson(args, indent=False)
    noded_port = utils.GetDaemonPort(constants.NODED)
    client = Client(procedure, payload, noded_port)
    client.ConnectNode(node, read_timeout=read_timeout)
    all_results = client.GetResults()
    return all_results[node]
470

    
471
  @staticmethod
  def _Compress(data):
    """Encodes a string for transport over RPC.

    Data shorter than 512 is passed through as is; anything else is
    compressed with zlib (level 3) and then encoded in base64.

    @type data: str
    @param data: Data
    @rtype: tuple
    @return: tuple of (encoding constant, encoded data) to send

    """
    if len(data) >= 512:
      packed = zlib.compress(data, 3)
      return (constants.RPC_ENCODING_ZLIB_BASE64, base64.b64encode(packed))

    # Small amounts of data are not compressed
    return (constants.RPC_ENCODING_NONE, data)
490

    
491
  #
492
  # Begin RPC calls
493
  #
494

    
495
  @_RpcTimeout(_TMO_URGENT)
  def call_lv_list(self, node_list, vg_name):
    """Query the logical volumes present in a given volume group.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param vg_name: the name of the volume group

    """
    rpc_args = [vg_name]
    return self._MultiNodeCall(node_list, "lv_list", rpc_args)
503

    
504
  @_RpcTimeout(_TMO_URGENT)
  def call_vg_list(self, node_list):
    """Query the volume group list.

    This is a multi-node call; the remote procedure takes no arguments.

    @param node_list: the nodes to query

    """
    rpc_args = []
    return self._MultiNodeCall(node_list, "vg_list", rpc_args)
512

    
513
  @_RpcTimeout(_TMO_NORMAL)
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
    """Query the list of storage units.

    This is a multi-node call; all arguments besides the node list are
    passed on unchanged to the remote procedure.

    @param node_list: the nodes to query

    """
    rpc_args = [su_name, su_args, name, fields]
    return self._MultiNodeCall(node_list, "storage_list", rpc_args)
522

    
523
  @_RpcTimeout(_TMO_NORMAL)
  def call_storage_modify(self, node, su_name, su_args, name, changes):
    """Request the modification of a storage unit.

    This is a single-node call; all arguments besides the node are
    passed on unchanged to the remote procedure.

    @param node: the node to contact

    """
    rpc_args = [su_name, su_args, name, changes]
    return self._SingleNodeCall(node, "storage_modify", rpc_args)
532

    
533
  @_RpcTimeout(_TMO_NORMAL)
  def call_storage_execute(self, node, su_name, su_args, name, op):
    """Request the execution of an operation on a storage unit.

    This is a single-node call; all arguments besides the node are
    passed on unchanged to the remote procedure.

    @param node: the node to contact

    """
    rpc_args = [su_name, su_args, name, op]
    return self._SingleNodeCall(node, "storage_execute", rpc_args)
542

    
543
  @_RpcTimeout(_TMO_URGENT)
  def call_bridges_exist(self, node, bridges_list):
    """Check whether a node has all the given bridges.

    The remote node verifies that every bridge of the bridges_list is
    present, so that an instance which uses interfaces on those
    bridges can be started there.

    This is a single-node call.

    @param node: the node to check
    @param bridges_list: the bridges which should be present

    """
    rpc_args = [bridges_list]
    return self._SingleNodeCall(node, "bridges_exist", rpc_args)
555

    
556
  @_RpcTimeout(_TMO_NORMAL)
  def call_instance_start(self, node, instance, hvp, bep):
    """Request the start of an instance.

    This is a single-node call.

    @param node: the node to contact
    @param instance: the instance, sent as dict with C{hvp} and C{bep}
        applied as hypervisor and backend parameter overrides

    """
    inst_dict = self._InstDict(instance, hvp=hvp, bep=bep)
    rpc_args = [inst_dict]
    return self._SingleNodeCall(node, "instance_start", rpc_args)
565

    
566
  @_RpcTimeout(_TMO_NORMAL)
  def call_instance_shutdown(self, node, instance, timeout):
    """Request the stop of an instance.

    This is a single-node call.

    @param node: the node to contact
    @param instance: the instance, sent as dict
    @param timeout: passed on unchanged to the remote procedure

    """
    inst_dict = self._InstDict(instance)
    rpc_args = [inst_dict, timeout]
    return self._SingleNodeCall(node, "instance_shutdown", rpc_args)
575

    
576
  @_RpcTimeout(_TMO_NORMAL)
  def call_migration_info(self, node, instance):
    """Collect the information needed to prepare an instance migration.

    This is a single-node call.

    @type node: string
    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition

    """
    inst_dict = self._InstDict(instance)
    rpc_args = [inst_dict]
    return self._SingleNodeCall(node, "migration_info", rpc_args)
590

    
591
  @_RpcTimeout(_TMO_NORMAL)
  def call_accept_instance(self, node, instance, info, target):
    """Tell a node to prepare for accepting an instance.

    This is a single-node call.

    @type node: string
    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type target: string
    @param target: target hostname (usually ip address) (on the node itself)

    """
    inst_dict = self._InstDict(instance)
    rpc_args = [inst_dict, info, target]
    return self._SingleNodeCall(node, "accept_instance", rpc_args)
609

    
610
  @_RpcTimeout(_TMO_NORMAL)
  def call_finalize_migration(self, node, instance, info, success):
    """Finish any target-node migration specific operation.

    This is called after a successful migration as well as after a
    failed one (in which case it should abort the migration).

    This is a single-node call.

    @type node: string
    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type success: boolean
    @param success: whether the migration was a success or a failure

    """
    inst_dict = self._InstDict(instance)
    rpc_args = [inst_dict, info, success]
    return self._SingleNodeCall(node, "finalize_migration", rpc_args)
631

    
632
  @_RpcTimeout(_TMO_SLOW)
  def call_instance_migrate(self, node, instance, target, live):
    """Request the migration of an instance.

    This is a single-node call.

    @type node: string
    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type target: string
    @param target: the target node name
    @type live: boolean
    @param live: whether the migration should be done live or not (the
        interpretation of this parameter is left to the hypervisor)

    """
    inst_dict = self._InstDict(instance)
    rpc_args = [inst_dict, target, live]
    return self._SingleNodeCall(node, "instance_migrate", rpc_args)
651

    
652
  @_RpcTimeout(_TMO_NORMAL)
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
    """Request the reboot of an instance.

    This is a single-node call.

    @param node: the node to contact
    @param inst: the instance, sent as dict
    @param reboot_type: passed on unchanged to the remote procedure
    @param shutdown_timeout: passed on unchanged to the remote procedure

    """
    inst_dict = self._InstDict(inst)
    rpc_args = [inst_dict, reboot_type, shutdown_timeout]
    return self._SingleNodeCall(node, "instance_reboot", rpc_args)
662

    
663
  @_RpcTimeout(_TMO_1DAY)
  def call_instance_os_add(self, node, inst, reinstall, debug):
    """Request the installation of an OS on the given instance.

    This is a single-node call.

    @param node: the node to contact
    @param inst: the instance, sent as dict
    @param reinstall: passed on unchanged to the remote procedure
    @param debug: passed on unchanged to the remote procedure

    """
    inst_dict = self._InstDict(inst)
    rpc_args = [inst_dict, reinstall, debug]
    return self._SingleNodeCall(node, "instance_os_add", rpc_args)
672

    
673
  @_RpcTimeout(_TMO_SLOW)
  def call_instance_run_rename(self, node, inst, old_name, debug):
    """Request a run of the OS rename script for an instance.

    This is a single-node call.

    @param node: the node to contact
    @param inst: the instance, sent as dict
    @param old_name: passed on unchanged to the remote procedure
    @param debug: passed on unchanged to the remote procedure

    """
    inst_dict = self._InstDict(inst)
    rpc_args = [inst_dict, old_name, debug]
    return self._SingleNodeCall(node, "instance_run_rename", rpc_args)
682

    
683
  @_RpcTimeout(_TMO_URGENT)
  def call_instance_info(self, node, instance, hname):
    """Query information about a single instance.

    This is a single-node call.

    @param node: the node to query
    @type instance: string
    @param instance: the instance name
    @type hname: string
    @param hname: the hypervisor type of the instance

    """
    rpc_args = [instance, hname]
    return self._SingleNodeCall(node, "instance_info", rpc_args)
698

    
699
  @_RpcTimeout(_TMO_NORMAL)
  def call_instance_migratable(self, node, instance):
    """Ask a node whether the given instance can be migrated.

    This is a single-node call.

    @param node: the node to query
    @type instance: L{objects.Instance}
    @param instance: the instance to check

    """
    inst_dict = self._InstDict(instance)
    rpc_args = [inst_dict]
    return self._SingleNodeCall(node, "instance_migratable", rpc_args)
713

    
714
  @_RpcTimeout(_TMO_URGENT)
  def call_all_instances_info(self, node_list, hypervisor_list):
    """Query information about all instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    rpc_args = [hypervisor_list]
    return self._MultiNodeCall(node_list, "all_instances_info", rpc_args)
728

    
729
  @_RpcTimeout(_TMO_URGENT)
  def call_instance_list(self, node_list, hypervisor_list):
    """Query the list of running instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    rpc_args = [hypervisor_list]
    return self._MultiNodeCall(node_list, "instance_list", rpc_args)
742

    
743
  @_RpcTimeout(_TMO_FAST)
  def call_node_tcp_ping(self, node, source, target, port, timeout,
                         live_port_needed):
    """Request a TcpPing from the remote node

    This is a single-node call; all arguments besides the node are
    passed on unchanged to the remote procedure.

    @param node: the node which should do the ping

    """
    rpc_args = [source, target, port, timeout, live_port_needed]
    return self._SingleNodeCall(node, "node_tcp_ping", rpc_args)
754

    
755
  @_RpcTimeout(_TMO_FAST)
  def call_node_has_ip_address(self, node, address):
    """Ask a node whether it has the given IP address.

    This is a single-node call.

    @param node: the node to query
    @param address: the IP address to look for

    """
    rpc_args = [address]
    return self._SingleNodeCall(node, "node_has_ip_address", rpc_args)
763

    
764
  @_RpcTimeout(_TMO_URGENT)
  def call_node_info(self, node_list, vg_name, hypervisor_type):
    """Query node information.

    The nodes report memory information as well as the size and the
    free space of the volume group.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type vg_name: C{string}
    @param vg_name: the name of the volume group to ask for disk space
        information
    @type hypervisor_type: C{str}
    @param hypervisor_type: the name of the hypervisor to ask for
        memory information

    """
    rpc_args = [vg_name, hypervisor_type]
    return self._MultiNodeCall(node_list, "node_info", rpc_args)
785

    
786
  @_RpcTimeout(_TMO_NORMAL)
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
    """Request the addition of a node to the cluster.

    This is a single-node call; all arguments besides the node are
    passed on unchanged to the remote procedure.

    @param node: the node to contact

    """
    rpc_args = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
    return self._SingleNodeCall(node, "node_add", rpc_args)
795

    
796
  @_RpcTimeout(_TMO_NORMAL)
  def call_node_verify(self, node_list, checkdict, cluster_name):
    """Ask nodes to verify the given parameters.

    This is a multi-node call; C{checkdict} and C{cluster_name} are
    passed on unchanged to the remote procedure.

    @param node_list: the nodes to contact

    """
    rpc_args = [checkdict, cluster_name]
    return self._MultiNodeCall(node_list, "node_verify", rpc_args)
805

    
806
  @classmethod
  @_RpcTimeout(_TMO_FAST)
  def call_node_start_master(cls, node, start_daemons, no_voting):
    """Tell a node to activate itself as a master.

    This is a single-node call, made without a configuration object.

    @param node: the node to contact
    @param start_daemons: passed on unchanged to the remote procedure
    @param no_voting: passed on unchanged to the remote procedure

    """
    rpc_args = [start_daemons, no_voting]
    return cls._StaticSingleNodeCall(node, "node_start_master", rpc_args)
816

    
817
  @classmethod
  @_RpcTimeout(_TMO_FAST)
  def call_node_stop_master(cls, node, stop_daemons):
    """Tell a node to demote itself from master status.

    This is a single-node call, made without a configuration object.

    @param node: the node to contact
    @param stop_daemons: passed on unchanged to the remote procedure

    """
    rpc_args = [stop_daemons]
    return cls._StaticSingleNodeCall(node, "node_stop_master", rpc_args)
826

    
827
  @classmethod
  @_RpcTimeout(_TMO_URGENT)
  def call_master_info(cls, node_list):
    """Ask nodes for the master info.

    This is a multi-node call, made without a configuration object;
    the remote procedure takes no arguments.

    @param node_list: the nodes to query

    """
    # TODO: should this method query down nodes?
    rpc_args = []
    return cls._StaticMultiNodeCall(node_list, "master_info", rpc_args)
837

    
838
  @classmethod
  @_RpcTimeout(_TMO_URGENT)
  def call_version(cls, node_list):
    """Ask nodes for their version.

    This is a multi-node call, made without a configuration object;
    the remote procedure takes no arguments.

    @param node_list: the nodes to query

    """
    rpc_args = []
    return cls._StaticMultiNodeCall(node_list, "version", rpc_args)
847

    
848
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
    """Ask a node to create a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the block device, sent as the result of its ToDict()
        method; the remaining arguments are passed on unchanged

    """
    bdev_dict = bdev.ToDict()
    rpc_args = [bdev_dict, size, owner, on_primary, info]
    return self._SingleNodeCall(node, "blockdev_create", rpc_args)
857

    
858
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_remove(self, node, bdev):
    """Ask a node to remove a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the block device, sent as the result of its ToDict()
        method

    """
    bdev_dict = bdev.ToDict()
    return self._SingleNodeCall(node, "blockdev_remove", [bdev_dict])
866

    
867
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_rename(self, node, devlist):
    """Ask a node to rename the given block devices.

    This is a single-node call.

    @param node: the node to contact
    @param devlist: iterable of (device, uid) pairs; each device is
        sent as the result of its ToDict() method, the uid unchanged

    """
    renames = []
    for (dev, dev_uid) in devlist:
      renames.append((dev.ToDict(), dev_uid))
    return self._SingleNodeCall(node, "blockdev_rename", renames)
876

    
877
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
    """Ask a node to assemble a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param disk: the block device, sent as the result of its ToDict()
        method; the remaining arguments are passed on unchanged

    """
    disk_dict = disk.ToDict()
    rpc_args = [disk_dict, owner, on_primary]
    return self._SingleNodeCall(node, "blockdev_assemble", rpc_args)
886

    
887
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_shutdown(self, node, disk):
    """Ask a node to shut down a given block device.

    This is a single-node call.

    @param node: the node to contact
    @param disk: the block device, sent as the result of its ToDict()
        method

    """
    disk_dict = disk.ToDict()
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk_dict])
895

    
896
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_addchildren(self, node, bdev, ndevs):
    """Ask a node to add a list of children to a (mirroring) device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the parent device, sent as the result of its ToDict()
        method
    @param ndevs: the children to add, each sent as the result of its
        ToDict() method

    """
    parent_dict = bdev.ToDict()
    child_dicts = [child.ToDict() for child in ndevs]
    return self._SingleNodeCall(node, "blockdev_addchildren",
                                [parent_dict, child_dicts])
906

    
907
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_removechildren(self, node, bdev, ndevs):
    """Ask a node to remove a list of children from a (mirroring) device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the parent device, sent as the result of its ToDict()
        method
    @param ndevs: the children to remove, each sent as the result of
        its ToDict() method

    """
    parent_dict = bdev.ToDict()
    child_dicts = [child.ToDict() for child in ndevs]
    return self._SingleNodeCall(node, "blockdev_removechildren",
                                [parent_dict, child_dicts])
917

    
918
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_getmirrorstatus(self, node, disks):
    """Ask a node for the status of (mirroring) devices.

    This is a single-node call.

    @param node: the node the request is sent to
    @param disks: iterable of devices; each one is sent in its
        C{ToDict()} form
    @return: the call result; when it carries no failure message, its
        payload entries have been converted with
        L{objects.BlockDevStatus.FromDict}

    """
    serialized = [dsk.ToDict() for dsk in disks]
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
                                  serialized)

    # Failed calls are handed back untouched
    if result.fail_msg:
      return result

    statuses = []
    for entry in result.payload:
      statuses.append(objects.BlockDevStatus.FromDict(entry))
    result.payload = statuses
    return result
931

    
932
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_find(self, node, disk):
    """Ask a node to identify a block device.

    This is a single-node call.

    @param node: the node the request is sent to
    @param disk: the block device, sent in its C{ToDict()} form
    @return: the call result; a payload other than C{None} on a
        successful call has been converted with
        L{objects.BlockDevStatus.FromDict}

    """
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])

    # Nothing to decode for failed calls or an empty (None) payload
    if result.fail_msg or result.payload is None:
      return result

    result.payload = objects.BlockDevStatus.FromDict(result.payload)
    return result
943

    
944
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_close(self, node, instance_name, disks):
    """Ask a node to close the given block devices.

    This is a single-node call.

    @param node: the node the request is sent to
    @param instance_name: passed unchanged as the first remote argument
    @param disks: iterable of devices; each one is sent in its
        C{ToDict()} form
    @return: the result of the "blockdev_close" single-node call

    """
    disk_dicts = [dev.ToDict() for dev in disks]
    return self._SingleNodeCall(node, "blockdev_close",
                                [instance_name, disk_dicts])
953

    
954
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_getsizes(self, node, disks):
    """Ask a node for the sizes of the given disks.

    This is a single-node call.

    @param node: the node the request is sent to
    @param disks: iterable of disks; the list of their C{ToDict()}
        forms is the only remote argument
    @return: the result of the "blockdev_getsize" single-node call

    """
    disk_dicts = [dev.ToDict() for dev in disks]
    # Note: the remote procedure name is singular, unlike this method
    return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
963

    
964
  @_RpcTimeout(_TMO_NORMAL)
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
    """Disconnect the network of the given drbd devices.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param nodes_ip: passed unchanged as the first remote argument
    @param disks: iterable of devices; each one is sent in its
        C{ToDict()} form
    @return: the result of the "drbd_disconnect_net" multi-node call

    """
    disk_dicts = [dev.ToDict() for dev in disks]
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                               [nodes_ip, disk_dicts])
973

    
974
  @_RpcTimeout(_TMO_NORMAL)
  def call_drbd_attach_net(self, node_list, nodes_ip,
                           disks, instance_name, multimaster):
    """Request network attachment of the given drbd devices.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param nodes_ip: passed unchanged as the first remote argument
    @param disks: iterable of devices; each one is sent in its
        C{ToDict()} form
    @param instance_name: passed unchanged as the third remote argument
    @param multimaster: passed unchanged as the fourth remote argument
    @return: the result of the "drbd_attach_net" multi-node call

    """
    disk_dicts = [dev.ToDict() for dev in disks]
    params = [nodes_ip, disk_dicts, instance_name, multimaster]
    return self._MultiNodeCall(node_list, "drbd_attach_net", params)
985

    
986
  @_RpcTimeout(_TMO_SLOW)
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
    """Wait until the synchronization of drbd devices is complete.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param nodes_ip: passed unchanged as the first remote argument
    @param disks: iterable of devices; each one is sent in its
        C{ToDict()} form
    @return: the result of the "drbd_wait_sync" multi-node call

    """
    disk_dicts = [dev.ToDict() for dev in disks]
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
                               [nodes_ip, disk_dicts])
995

    
996
  @classmethod
  @_RpcTimeout(_TMO_NORMAL)
  def call_upload_file(cls, node_list, file_name, address_list=None):
    """Upload a local file to a list of nodes.

    The file contents are sent in compressed form together with the
    file's mode, owner, group, access time and modification time. The
    node will refuse the operation in case the file is not on the
    approved file list.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of node names to upload to
    @type file_name: str
    @param file_name: the filename to upload
    @type address_list: list or None
    @keyword address_list: an optional list of node addresses, in order
        to optimize the RPC speed
    @return: the result of the "upload_file" multi-node call

    """
    # Contents are read before the metadata, as a separate step
    payload = cls._Compress(utils.ReadFile(file_name))
    info = os.stat(file_name)
    metadata = [info.st_mode, info.st_uid, info.st_gid,
                info.st_atime, info.st_mtime]
    return cls._StaticMultiNodeCall(node_list, "upload_file",
                                    [file_name, payload] + metadata,
                                    address_list=address_list)
1022

    
1023
  @classmethod
  @_RpcTimeout(_TMO_NORMAL)
  def call_write_ssconf_files(cls, node_list, values):
    """Ask nodes to write the ssconf files.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param values: passed unchanged as the only remote argument
    @return: the result of the "write_ssconf_files" multi-node call

    """
    params = [values]
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", params)
1032

    
1033
  @_RpcTimeout(_TMO_FAST)
  def call_os_diagnose(self, node_list):
    """Ask nodes for a diagnose of their OS definitions.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @return: the result of the "os_diagnose" multi-node call

    """
    no_params = []
    return self._MultiNodeCall(node_list, "os_diagnose", no_params)
1041

    
1042
  @_RpcTimeout(_TMO_FAST)
  def call_os_get(self, node, name):
    """Fetch an OS definition from a node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param name: passed unchanged as the only remote argument
    @return: the call result; a dictionary payload on a successful call
        has been converted with L{objects.OS.FromDict}

    """
    result = self._SingleNodeCall(node, "os_get", [name])

    # Only successful calls carrying a dict payload are decoded
    if result.fail_msg or not isinstance(result.payload, dict):
      return result

    result.payload = objects.OS.FromDict(result.payload)
    return result
1053

    
1054
  @_RpcTimeout(_TMO_FAST)
  def call_os_validate(self, required, nodes, name, checks, params):
    """Run a validation routine for a given OS.

    This is a multi-node call.

    @param required: passed unchanged as the first remote argument
    @param nodes: the nodes the request is sent to
    @param name: passed unchanged as the second remote argument
    @param checks: passed unchanged as the third remote argument
    @param params: passed unchanged as the fourth remote argument
    @return: the result of the "os_validate" multi-node call

    """
    # Note: "nodes" selects the targets and is not part of the arguments
    rpc_args = [required, name, checks, params]
    return self._MultiNodeCall(nodes, "os_validate", rpc_args)
1063

    
1064
  @_RpcTimeout(_TMO_NORMAL)
  def call_hooks_runner(self, node_list, hpath, phase, env):
    """Call the hooks runner.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param hpath: passed unchanged as the first remote argument
    @param phase: passed unchanged as the second remote argument
    @param env: a dictionary with the environment, passed unchanged as
        the third remote argument
    @return: the result of the "hooks_runner" multi-node call

    """
    return self._MultiNodeCall(node_list, "hooks_runner",
                               [hpath, phase, env])
1077

    
1078
  @_RpcTimeout(_TMO_NORMAL)
  def call_iallocator_runner(self, node, name, idata):
    """Call an iallocator on a remote node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param name: the iallocator name
    @param idata: the json-encoded input string
    @return: the result of the "iallocator_runner" single-node call

    """
    params = [name, idata]
    return self._SingleNodeCall(node, "iallocator_runner", params)
1090

    
1091
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_grow(self, node, cf_bdev, amount):
    """Ask a node to grow the given block device.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: the block device, sent in its C{ToDict()} form
    @param amount: passed unchanged as the second remote argument
    @return: the result of the "blockdev_grow" single-node call

    """
    params = [cf_bdev.ToDict(), amount]
    return self._SingleNodeCall(node, "blockdev_grow", params)
1100

    
1101
  @_RpcTimeout(_TMO_1DAY)
  def call_blockdev_export(self, node, cf_bdev,
                           dest_node, dest_path, cluster_name):
    """Ask a node to export a disk to another node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: the disk to export, sent in its C{ToDict()} form
    @param dest_node: passed unchanged as the second remote argument
    @param dest_path: passed unchanged as the third remote argument
    @param cluster_name: passed unchanged as the fourth remote argument
    @return: the result of the "blockdev_export" single-node call

    """
    params = [cf_bdev.ToDict(), dest_node, dest_path, cluster_name]
    return self._SingleNodeCall(node, "blockdev_export", params)
1112

    
1113
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_snapshot(self, node, cf_bdev):
    """Ask a node to snapshot the given block device.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: the block device, sent in its C{ToDict()} form
    @return: the result of the "blockdev_snapshot" single-node call

    """
    params = [cf_bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_snapshot", params)
1121

    
1122
  @_RpcTimeout(_TMO_NORMAL)
  def call_finalize_export(self, node, instance, snap_disks):
    """Request the completion of an export operation.

    This writes the export config file, etc.

    This is a single-node call.

    @param node: the node the request is sent to
    @param instance: the instance, encoded via C{self._InstDict}
    @param snap_disks: iterable whose entries are either booleans, which
        are passed on as they are, or objects sent in their C{ToDict()}
        form
    @return: the result of the "finalize_export" single-node call

    """
    def _Flatten(entry):
      """Returns booleans unchanged and serialises everything else.

      """
      if isinstance(entry, bool):
        return entry
      return entry.ToDict()

    flat_disks = [_Flatten(entry) for entry in snap_disks]
    params = [self._InstDict(instance), flat_disks]
    return self._SingleNodeCall(node, "finalize_export", params)
1140

    
1141
  @_RpcTimeout(_TMO_FAST)
  def call_export_info(self, node, path):
    """Query the export information in a given path.

    This is a single-node call.

    @param node: the node the request is sent to
    @param path: passed unchanged as the only remote argument
    @return: the result of the "export_info" single-node call

    """
    params = [path]
    return self._SingleNodeCall(node, "export_info", params)
1149

    
1150
  @_RpcTimeout(_TMO_FAST)
  def call_export_list(self, node_list):
    """Get the stored exports list.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @return: the result of the "export_list" multi-node call

    """
    no_params = []
    return self._MultiNodeCall(node_list, "export_list", no_params)
1158

    
1159
  @_RpcTimeout(_TMO_FAST)
  def call_export_remove(self, node, export):
    """Request the removal of a given export.

    This is a single-node call.

    @param node: the node the request is sent to
    @param export: passed unchanged as the only remote argument
    @return: the result of the "export_remove" single-node call

    """
    params = [export]
    return self._SingleNodeCall(node, "export_remove", params)
1167

    
1168
  @classmethod
  @_RpcTimeout(_TMO_NORMAL)
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
    """Request a node to clean the cluster information it has.

    This will remove the configuration information from the ganeti data
    dir.

    This is a single-node call.

    @param node: the node the request is sent to
    @param modify_ssh_setup: passed unchanged as the only remote argument
    @return: the result of the "node_leave_cluster" single-node call

    """
    params = [modify_ssh_setup]
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", params)
1181

    
1182
  @_RpcTimeout(_TMO_FAST)
  def call_node_volumes(self, node_list):
    """Get all volumes on node(s).

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @return: the result of the "node_volumes" multi-node call

    """
    no_params = []
    return self._MultiNodeCall(node_list, "node_volumes", no_params)
1190

    
1191
  @_RpcTimeout(_TMO_FAST)
  def call_node_demote_from_mc(self, node):
    """Demote a node from the master candidate role.

    This is a single-node call.

    @param node: the node the request is sent to
    @return: the result of the "node_demote_from_mc" single-node call

    """
    no_params = []
    return self._SingleNodeCall(node, "node_demote_from_mc", no_params)
1199

    
1200
  @_RpcTimeout(_TMO_NORMAL)
  def call_node_powercycle(self, node, hypervisor):
    """Try to powercycle a node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param hypervisor: passed unchanged as the only remote argument
    @return: the result of the "node_powercycle" single-node call

    """
    params = [hypervisor]
    return self._SingleNodeCall(node, "node_powercycle", params)
1208

    
1209
  @_RpcTimeout(None)
  def call_test_delay(self, node_list, duration):
    """Sleep for a fixed time on given node(s).

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param duration: passed unchanged as the only remote argument; it
        also determines the read timeout used for the call
    @return: the result of the "test_delay" multi-node call

    """
    params = [duration]
    # The read timeout is the duration plus five, truncated to an integer
    timeout = int(duration + 5)
    return self._MultiNodeCall(node_list, "test_delay", params,
                               read_timeout=timeout)
1218

    
1219
  @_RpcTimeout(_TMO_FAST)
  def call_file_storage_dir_create(self, node, file_storage_dir):
    """Ask a node to create the given file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param file_storage_dir: passed unchanged as the only remote argument
    @return: the result of the "file_storage_dir_create" single-node call

    """
    params = [file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_create", params)
1228

    
1229
  @_RpcTimeout(_TMO_FAST)
  def call_file_storage_dir_remove(self, node, file_storage_dir):
    """Ask a node to remove the given file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param file_storage_dir: passed unchanged as the only remote argument
    @return: the result of the "file_storage_dir_remove" single-node call

    """
    params = [file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_remove", params)
1238

    
1239
  @_RpcTimeout(_TMO_FAST)
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                   new_file_storage_dir):
    """Ask a node to rename a file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param old_file_storage_dir: passed unchanged as the first remote
        argument
    @param new_file_storage_dir: passed unchanged as the second remote
        argument
    @return: the result of the "file_storage_dir_rename" single-node call

    """
    params = [old_file_storage_dir, new_file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_rename", params)
1249

    
1250
  @classmethod
  @_RpcTimeout(_TMO_FAST)
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Update a job queue file on the given nodes.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param address_list: handed to the multi-node call as its
        C{address_list} keyword argument
    @param file_name: passed unchanged as the first remote argument
    @param content: sent after being processed by C{cls._Compress}
    @return: the result of the "jobqueue_update" multi-node call

    """
    params = [file_name, cls._Compress(content)]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update", params,
                                    address_list=address_list)
1261

    
1262
  @classmethod
  @_RpcTimeout(_TMO_NORMAL)
  def call_jobqueue_purge(cls, node):
    """Purge the job queue on a node.

    This is a single-node call.

    @param node: the node the request is sent to
    @return: the result of the "jobqueue_purge" single-node call

    """
    no_params = []
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", no_params)
1271

    
1272
  @classmethod
  @_RpcTimeout(_TMO_FAST)
  def call_jobqueue_rename(cls, node_list, address_list, rename):
    """Rename a job queue file.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param address_list: handed to the multi-node call as its
        C{address_list} keyword argument
    @param rename: used directly as the remote argument list (it is not
        wrapped in another list)
    @return: the result of the "jobqueue_rename" multi-node call

    """
    # "rename" itself is the parameter list of the remote procedure
    params = rename
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", params,
                                    address_list=address_list)
1282

    
1283
  @_RpcTimeout(_TMO_NORMAL)
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
    """Validate the hypervisor params.

    The cluster-level parameters stored for the hypervisor (or an empty
    dictionary if there are none) are combined with C{hvparams} through
    L{objects.FillDict}; the combined result is what gets sent.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hvname: string
    @param hvname: the hypervisor name
    @type hvparams: dict
    @param hvparams: the hypervisor parameters to be validated
    @return: the result of the "hypervisor_validate_params" multi-node
        call

    """
    cluster_defaults = self._cfg.GetClusterInfo().hvparams.get(hvname, {})
    # NOTE(review): presumably hvparams overrides the cluster defaults;
    # confirm against objects.FillDict
    filled = objects.FillDict(cluster_defaults, hvparams)
    params = [hvname, filled]
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                               params)
1301

    
1302
  @_RpcTimeout(_TMO_NORMAL)
  def call_x509_cert_create(self, node, validity):
    """Create a new X509 certificate for SSL/TLS.

    This is a single-node call.

    @param node: the node the request is sent to
    @type validity: int
    @param validity: Validity in seconds
    @return: the result of the "x509_cert_create" single-node call

    """
    params = [validity]
    return self._SingleNodeCall(node, "x509_cert_create", params)
1313

    
1314
  @_RpcTimeout(_TMO_NORMAL)
  def call_x509_cert_remove(self, node, name):
    """Remove a X509 certificate.

    This is a single-node call.

    @param node: the node the request is sent to
    @type name: string
    @param name: Certificate name
    @return: the result of the "x509_cert_remove" single-node call

    """
    params = [name]
    return self._SingleNodeCall(node, "x509_cert_remove", params)
1325

    
1326
  @_RpcTimeout(_TMO_NORMAL)
  def call_import_start(self, node, opts, instance, dest, dest_args):
    """Start a listener for an import.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @param opts: import options, sent in their C{ToDict()} form
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param dest: passed unchanged as the third remote argument
    @param dest_args: encoded together with C{dest} by
        C{_EncodeImportExportIO} to form the fourth remote argument
    @return: the result of the "import_start" single-node call

    """
    # Built in the same order as the remote argument list
    opts_dict = opts.ToDict()
    inst_dict = self._InstDict(instance)
    io_args = _EncodeImportExportIO(dest, dest_args)
    params = [opts_dict, inst_dict, dest, io_args]
    return self._SingleNodeCall(node, "import_start", params)
1342

    
1343
  @_RpcTimeout(_TMO_NORMAL)
  def call_export_start(self, node, opts, host, port,
                        instance, source, source_args):
    """Start an export daemon.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @param opts: export options, sent in their C{ToDict()} form
    @param host: passed unchanged as the second remote argument
    @param port: passed unchanged as the third remote argument
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param source: passed unchanged as the fifth remote argument
    @param source_args: encoded together with C{source} by
        C{_EncodeImportExportIO} to form the sixth remote argument
    @return: the result of the "export_start" single-node call

    """
    # Built in the same order as the remote argument list
    opts_dict = opts.ToDict()
    inst_dict = self._InstDict(instance)
    io_args = _EncodeImportExportIO(source, source_args)
    params = [opts_dict, host, port, inst_dict, source, io_args]
    return self._SingleNodeCall(node, "export_start", params)
1360

    
1361
  @_RpcTimeout(_TMO_FAST)
  def call_impexp_status(self, node, names):
    """Get the status of imports or exports.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @type names: List of strings
    @param names: Import/export names
    @rtype: List of L{objects.ImportExportStatus} instances
    @return: Returns a list of the state of each named import/export or None if
             a status couldn't be retrieved

    """
    def _Decode(entry):
      """Keeps C{None} entries and converts all others to status objects.

      """
      if entry is None:
        return None
      return objects.ImportExportStatus.FromDict(entry)

    result = self._SingleNodeCall(node, "impexp_status", [names])

    # The payload of a failed call is left as received
    if result.fail_msg:
      return result

    result.payload = [_Decode(entry) for entry in result.payload]
    return result
1390

    
1391
  @_RpcTimeout(_TMO_NORMAL)
  def call_impexp_abort(self, node, name):
    """Abort an import or export.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @type name: string
    @param name: Import/export name
    @return: the result of the "impexp_abort" single-node call

    """
    params = [name]
    return self._SingleNodeCall(node, "impexp_abort", params)
1404

    
1405
  @_RpcTimeout(_TMO_NORMAL)
  def call_impexp_cleanup(self, node, name):
    """Clean up after an import or export.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @type name: string
    @param name: Import/export name
    @return: the result of the "impexp_cleanup" single-node call

    """
    params = [name]
    return self._SingleNodeCall(node, "impexp_cleanup", params)