Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 6abf7f2c

History | View | Annotate | Download (41.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
# pylint has a bug here, doesn't see this import
46
import ganeti.http.client  # pylint: disable-msg=W0611
47

    
48

    
49
# Module level variable
50
# HTTP client manager shared by the whole module; Init() creates it and
# Shutdown() resets it back to None
_http_manager = None
51

    
52
# Various time constants for the timeout table
53
_TMO_URGENT = 60 # one minute
54
_TMO_FAST = 5 * 60 # five minutes
55
_TMO_NORMAL = 15 * 60 # 15 minutes
56
_TMO_SLOW = 3600 # one hour
57
_TMO_4HRS = 4 * 3600
58
_TMO_1DAY = 86400
59

    
60
# Timeout table that will be built later by decorators
61
# Guidelines for choosing timeouts:
62
# - call used during watcher: timeout -> 1min, _TMO_URGENT
63
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
64
# - other calls: 15 min, _TMO_NORMAL
65
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts
66

    
67
_TIMEOUTS = {
68
}
69

    
70

    
71
def Init():
  """Set up the module-wide HTTP client manager.

  This has to run before any RPC function is used. SSL is initialized
  through C{http.InitSsl} and afterwards the manager is created; an
  assertion guards against initializing the module twice.

  """
  global _http_manager # pylint: disable-msg=W0603

  assert not _http_manager, "RPC module initialized more than once"

  # SSL has to be set up before the client manager is created
  http.InitSsl()
  _http_manager = http.client.HttpClientManager()
84

    
85

    
86
def Shutdown():
  """Tear down the module-wide HTTP client manager.

  This has to run before the program exits. It is a no-op when no
  manager is currently set.

  """
  global _http_manager # pylint: disable-msg=W0603

  if not _http_manager:
    # nothing to stop
    return

  _http_manager.Shutdown()
  _http_manager = None
97

    
98

    
99
def _RpcTimeout(secs):
  """Decorator factory registering the timeout of an RPC call.

  The decorated function must be named C{call_<procedure>}; the part of
  the name after the C{call_} prefix is used as key in the global
  timeout table and C{secs} is stored as its value. The function itself
  is returned unmodified.

  @param secs: the timeout to record for the decorated call

  """
  def register(fn):
    prefix = "call_"
    fn_name = fn.__name__
    assert fn_name.startswith(prefix)
    # strip the prefix to obtain the procedure name
    _TIMEOUTS[fn_name[len(prefix):]] = secs
    return fn
  return register
112

    
113

    
114
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the raw data, for results that were not flagged as
      failed or offline, or None
  @ivar payload: the second element of the data pair for successful
      results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed, None otherwise

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    """Builds the result and classifies it as success or failure.

    @param data: for a non-failed call, the C{(status, payload)} pair
        received from the node; for a failed call, the error message
    @param failed: whether the call failed at the transport level
    @param offline: whether the node was skipped because it is offline
    @param call: the name of the RPC call
    @param node: the name of the node

    """
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        # the first element is the status, a false value means the
        # remote side reported an error described by the second one
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    # every code path above must have set all the attributes
    assert hasattr(self, "call")
    assert hasattr(self, "data")
    assert hasattr(self, "fail_msg")
    assert hasattr(self, "node")
    assert hasattr(self, "offline")
    assert hasattr(self, "payload")

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @param msg: prefix for the error message; a false value (e.g. None)
        selects a default message naming the call and the node
    @param prereq: if true, L{errors.OpPrereqError} is raised instead
        of L{errors.OpExecError}
    @param ecode: if not None, passed to the exception as second
        argument after the message
    @raise errors.OpPrereqError: if the call failed and prereq is true
    @raise errors.OpExecError: if the call failed and prereq is false

    """
    if not self.fail_msg:
      return

    if not msg: # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      # the error code itself has to travel with the exception, not the
      # boolean used for selecting the exception class
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args) # pylint: disable-msg=W0142
200

    
201

    
202
class Client:
  """RPC Client class.

  An instance is bound to one remote procedure and one request body.
  Nodes are added via L{ConnectNode} or L{ConnectList}, and
  L{GetResults} then hands all the prepared requests to the
  module-global HTTP manager and returns a dict of results (key: node
  name, value: L{RpcResult}).

  One current bug is that generic failure is still signaled by
  'False' result, which is not good. This overloading of values can
  cause bugs.

  """
  def __init__(self, procedure, body, port):
    """Prepares a client for the given procedure.

    @param procedure: the name of the remote procedure; it must be
        present in the timeouts table
    @param body: the request body sent to every node
    @param port: the port to connect to on the nodes

    """
    assert procedure in _TIMEOUTS, ("New RPC call not declared in the"
                                    " timeouts table")
    self.procedure = procedure
    self.body = body
    self.port = port
    # pending requests, indexed by node name
    self.nc = {}

    # the same file is passed as both key and certificate
    cert_file = constants.NODED_CERT_FILE
    self._ssl_params = http.HttpSslParams(ssl_key_path=cert_file,
                                          ssl_cert_path=cert_file)

  def ConnectList(self, node_list, address_list=None, read_timeout=None):
    """Add a list of nodes to the target nodes.

    @type node_list: list
    @param node_list: the list of node names to connect
    @type address_list: list or None
    @keyword address_list: either None or a list with node addresses,
        which must have the same length as the node list
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation

    """
    if address_list is not None:
      assert len(node_list) == len(address_list), \
             "Name and address lists should have the same length"
    else:
      # no addresses known, ConnectNode falls back to the names
      address_list = [None for _ in node_list]

    for (node_name, node_address) in zip(node_list, address_list):
      self.ConnectNode(node_name, node_address, read_timeout=read_timeout)

  def ConnectNode(self, name, address=None, read_timeout=None):
    """Add a node to the target list.

    @type name: str
    @param name: the node name
    @type address: str
    @keyword address: the node address, if known; the name is used
        when it is None
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation; None selects the value from the timeouts table

    """
    if address is None:
      address = name

    if read_timeout is None:
      read_timeout = _TIMEOUTS[self.procedure]

    request = http.client.HttpClientRequest(address, self.port,
                                            http.HTTP_PUT,
                                            "/%s" % self.procedure,
                                            post_data=self.body,
                                            ssl_params=self._ssl_params,
                                            ssl_verify_peer=True,
                                            read_timeout=read_timeout)
    self.nc[name] = request

  def GetResults(self):
    """Call nodes and return results.

    @rtype: dict
    @return: dictionary of L{RpcResult} objects, indexed by node name

    """
    assert _http_manager, "RPC module not initialized"

    _http_manager.ExecRequests(self.nc.values())

    results = {}

    for name, req in self.nc.iteritems():
      if req.success and req.resp_status_code == http.HTTP_OK:
        result = RpcResult(data=serializer.LoadJson(req.resp_body),
                           node=name, call=self.procedure)
      else:
        # TODO: Better error reporting
        # prefer the request's own error, fall back to the response body
        msg = req.error or req.resp_body

        logging.error("RPC error in %s from node %s: %s",
                      self.procedure, name, msg)
        result = RpcResult(data=msg, failed=True, node=name,
                           call=self.procedure)

      results[name] = result

    return results
301

    
302

    
303
def _EncodeImportExportIO(ieio, ieioargs):
  """Encodes import/export I/O information.

  For C{constants.IEIO_RAW_DISK} and C{constants.IEIO_SCRIPT} the first
  argument is replaced by the result of its C{ToDict()} method; for any
  other I/O type the arguments are returned as they are.

  @param ieio: the I/O type
  @param ieioargs: the arguments belonging to the I/O type
  @return: the encoded arguments

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    encoded = (ieioargs[0].ToDict(), )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    encoded = (ieioargs[0].ToDict(), ieioargs[1])
  else:
    encoded = ieioargs

  return encoded
316

    
317

    
318
class RpcRunner(object):
319
  """RPC runner class"""
320

    
321
  def __init__(self, cfg):
    """Initializes the RPC runner.

    Besides storing the configuration object, the node daemon port is
    looked up once and kept in C{self.port}.

    @type cfg:  C{config.ConfigWriter}
    @param cfg: the configuration object that will be used to get data
                about the cluster

    """
    self._cfg = cfg
    noded_port = utils.GetDaemonPort(constants.NODED)
    self.port = noded_port
331

    
332
  def _InstDict(self, instance, hvp=None, bep=None):
    """Serialize an instance into a dict with all parameters filled.

    The dict comes from the instance's ToDict() method; the hypervisor
    and backend parameters are then replaced by the values filled from
    the cluster (optionally overridden by the caller), and the
    parameters of every NIC are filled from the cluster's default NIC
    parameters.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @rtype: dict
    @return: the instance dict, with the hvparams, beparams and
        nicparams filled with the cluster defaults

    """
    result = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()

    hvparams = cluster.FillHV(instance)
    if hvp is not None:
      # caller-supplied values win over the filled ones
      hvparams.update(hvp)
    result["hvparams"] = hvparams

    beparams = cluster.FillBE(instance)
    if bep is not None:
      beparams.update(bep)
    result["beparams"] = beparams

    for nic_dict in result["nics"]:
      nic_defaults = cluster.nicparams[constants.PP_DEFAULT]
      nic_dict["nicparams"] = objects.FillDict(nic_defaults,
                                               nic_dict["nicparams"])

    return result
362

    
363
  def _ConnectList(self, client, node_list, call, read_timeout=None):
    """Connect a client to a list of nodes, skipping offline ones.

    Nodes known to the configuration are contacted via their primary
    IP, unless they are marked offline, in which case an offline result
    is generated instead of connecting. Nodes unknown to the
    configuration are connected without an address.

    @type client: L{ganeti.rpc.Client}
    @param client: a C{Client} instance
    @type node_list: list
    @param node_list: the node list we should connect
    @type call: string
    @param call: the name of the remote procedure call, for filling in
        correctly any eventual offline nodes' results
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation
    @rtype: dict
    @return: the L{RpcResult} objects of the offline nodes, indexed by
        node name

    """
    known_nodes = self._cfg.GetAllNodesInfo()
    names = []
    addresses = []
    offline_results = {}

    for node in node_list:
      address = None
      if node in known_nodes:
        node_info = known_nodes[node]
        if node_info.offline:
          offline_results[node] = RpcResult(node=node, offline=True,
                                            call=call)
          continue
        address = node_info.primary_ip
      names.append(node)
      addresses.append(address)

    if names:
      client.ConnectList(names, address_list=addresses,
                         read_timeout=read_timeout)

    return offline_results
396

    
397
  def _ConnectNode(self, client, node, call, read_timeout=None):
    """Connect a client to a single node, unless it is offline.

    @type client: L{ganeti.rpc.Client}
    @param client: a C{Client} instance
    @type node: str
    @param node: the node we should connect
    @type call: string
    @param call: the name of the remote procedure call, for filling in
        correctly any eventual offline nodes' results
    @type read_timeout: int
    @param read_timeout: overwrites the default read timeout for the
        given operation
    @return: an offline L{RpcResult} if the node is marked offline in
        the configuration (the client is not touched then), otherwise
        None

    """
    info = self._cfg.GetNodeInfo(node)

    if info is None:
      # node not in the configuration, connect without an address
      address = None
    elif info.offline:
      return RpcResult(node=node, offline=True, call=call)
    else:
      address = info.primary_ip

    client.ConnectNode(node, address=address, read_timeout=read_timeout)
    return None
420

    
421
  def _MultiNodeCall(self, node_list, procedure, args, read_timeout=None):
    """Run a procedure on several nodes, honouring the offline flag.

    @param node_list: the nodes to contact
    @param procedure: the name of the remote procedure
    @param args: the arguments, serialized to JSON for the request body
    @param read_timeout: overwrites the default read timeout
    @rtype: dict
    @return: the results indexed by node name; this includes the
        offline results of the nodes which were skipped

    """
    request_body = serializer.DumpJson(args, indent=False)
    client = Client(procedure, request_body, self.port)
    results = self._ConnectList(client, node_list, procedure,
                                read_timeout=read_timeout)
    # merge the real results into the ones of the skipped nodes
    results.update(client.GetResults())
    return results
431

    
432
  @classmethod
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
                           address_list=None, read_timeout=None):
    """Run a procedure on several nodes without using the configuration.

    The nodes are connected directly through the client, i.e. without
    the offline check done by the non-static variant.

    @param node_list: the nodes to contact
    @param procedure: the name of the remote procedure
    @param args: the arguments, serialized to JSON for the request body
    @param address_list: optional addresses matching the node list
    @param read_timeout: overwrites the default read timeout
    @return: the results as returned by the client's C{GetResults}

    """
    request_body = serializer.DumpJson(args, indent=False)
    noded_port = utils.GetDaemonPort(constants.NODED)
    client = Client(procedure, request_body, noded_port)
    client.ConnectList(node_list, address_list=address_list,
                       read_timeout=read_timeout)
    return client.GetResults()
443

    
444
  def _SingleNodeCall(self, node, procedure, args, read_timeout=None):
    """Run a procedure on one node, honouring the offline flag.

    @param node: the node to contact
    @param procedure: the name of the remote procedure
    @param args: the arguments, serialized to JSON for the request body
    @param read_timeout: overwrites the default read timeout
    @return: the offline result if the node is marked offline,
        otherwise the node's entry from the client's results

    """
    request_body = serializer.DumpJson(args, indent=False)
    client = Client(procedure, request_body, self.port)
    offline_result = self._ConnectNode(client, node, procedure,
                                       read_timeout=read_timeout)
    if offline_result is not None:
      # the node is offline, no request was prepared
      return offline_result

    return client.GetResults()[node]
455

    
456
  @classmethod
  def _StaticSingleNodeCall(cls, node, procedure, args, read_timeout=None):
    """Run a procedure on one node without using the configuration.

    @param node: the node to contact
    @param procedure: the name of the remote procedure
    @param args: the arguments, serialized to JSON for the request body
    @param read_timeout: overwrites the default read timeout
    @return: the node's entry from the client's results

    """
    request_body = serializer.DumpJson(args, indent=False)
    noded_port = utils.GetDaemonPort(constants.NODED)
    client = Client(procedure, request_body, noded_port)
    client.ConnectNode(node, read_timeout=read_timeout)
    all_results = client.GetResults()
    return all_results[node]
465

    
466
  @staticmethod
  def _Compress(data):
    """Encode a string for transport over RPC, compressing large ones.

    Data shorter than 512 characters is passed through untouched;
    anything else is compressed with zlib (level 3) and then encoded
    in base64.

    @type data: str
    @param data: Data
    @rtype: tuple
    @return: pair of encoding constant and encoded data

    """
    if len(data) >= 512:
      # Compress with zlib and encode in base64
      return (constants.RPC_ENCODING_ZLIB_BASE64,
              base64.b64encode(zlib.compress(data, 3)))

    # Small amounts of data are not compressed
    return (constants.RPC_ENCODING_NONE, data)
485

    
486
  #
487
  # Begin RPC calls
488
  #
489

    
490
  @_RpcTimeout(_TMO_URGENT)
  def call_lv_list(self, node_list, vg_name):
    """Lists the logical volumes of a volume group.

    This is a multi-node call.

    @param node_list: the nodes to query
    @param vg_name: the volume group whose logical volumes are listed

    """
    params = [vg_name]
    return self._MultiNodeCall(node_list, "lv_list", params)
498

    
499
  @_RpcTimeout(_TMO_URGENT)
  def call_vg_list(self, node_list):
    """Retrieves the list of volume groups.

    This is a multi-node call; the procedure takes no arguments.

    @param node_list: the nodes to query

    """
    params = []
    return self._MultiNodeCall(node_list, "vg_list", params)
507

    
508
  @_RpcTimeout(_TMO_NORMAL)
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
    """Retrieves the list of storage units.

    This is a multi-node call. All arguments except the node list are
    forwarded unchanged, in signature order.

    """
    params = [su_name, su_args, name, fields]
    return self._MultiNodeCall(node_list, "storage_list", params)
517

    
518
  @_RpcTimeout(_TMO_NORMAL)
  def call_storage_modify(self, node, su_name, su_args, name, changes):
    """Modifies a storage unit.

    This is a single-node call. All arguments except the node are
    forwarded unchanged, in signature order.

    """
    params = [su_name, su_args, name, changes]
    return self._SingleNodeCall(node, "storage_modify", params)
527

    
528
  @_RpcTimeout(_TMO_NORMAL)
  def call_storage_execute(self, node, su_name, su_args, name, op):
    """Runs an operation on a storage unit.

    This is a single-node call. All arguments except the node are
    forwarded unchanged, in signature order.

    """
    params = [su_name, su_args, name, op]
    return self._SingleNodeCall(node, "storage_execute", params)
537

    
538
  @_RpcTimeout(_TMO_URGENT)
  def call_bridges_exist(self, node, bridges_list):
    """Checks whether all the given bridges exist on a node.

    The check is meant to verify that all bridges given in the
    bridges_list are present on the remote node, so that an instance
    using interfaces on those bridges can be started.

    This is a single-node call.

    @param node: the node to check
    @param bridges_list: the bridges that must be present

    """
    params = [bridges_list]
    return self._SingleNodeCall(node, "bridges_exist", params)
550

    
551
  @_RpcTimeout(_TMO_NORMAL)
  def call_instance_start(self, node, instance, hvp, bep):
    """Starts an instance.

    This is a single-node call.

    @param node: the node on which to start the instance
    @param instance: the instance, sent as a filled dict
    @param hvp: overrides applied to the filled hypervisor parameters
    @param bep: overrides applied to the filled backend parameters

    """
    params = [self._InstDict(instance, hvp=hvp, bep=bep)]
    return self._SingleNodeCall(node, "instance_start", params)
560

    
561
  @_RpcTimeout(_TMO_NORMAL)
  def call_instance_shutdown(self, node, instance, timeout):
    """Stops an instance.

    This is a single-node call.

    @param node: the node to contact
    @param instance: the instance, sent as a filled dict
    @param timeout: forwarded unchanged to the node

    """
    inst_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_shutdown",
                                [inst_dict, timeout])
570

    
571
  @_RpcTimeout(_TMO_NORMAL)
  def call_migration_info(self, node, instance):
    """Collects the data needed for preparing an instance migration.

    This is a single-node call.

    @type node: string
    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition

    """
    params = [self._InstDict(instance)]
    return self._SingleNodeCall(node, "migration_info", params)
585

    
586
  @_RpcTimeout(_TMO_NORMAL)
  def call_accept_instance(self, node, instance, info, target):
    """Prepares a node for accepting an instance.

    This is a single-node call.

    @type node: string
    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type target: string
    @param target: target hostname (usually ip address) (on the node itself)

    """
    inst_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "accept_instance",
                                [inst_dict, info, target])
604

    
605
  @_RpcTimeout(_TMO_NORMAL)
  def call_finalize_migration(self, node, instance, info, success):
    """Finishes the migration-specific operations on the target node.

    This has to be called after a successful migration as well as after
    a failed one (in which case it should abort the migration).

    This is a single-node call.

    @type node: string
    @param node: the target node for the migration
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type info: opaque/hypervisor specific (string/data)
    @param info: result for the call_migration_info call
    @type success: boolean
    @param success: whether the migration was a success or a failure

    """
    inst_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "finalize_migration",
                                [inst_dict, info, success])
626

    
627
  @_RpcTimeout(_TMO_SLOW)
  def call_instance_migrate(self, node, instance, target, live):
    """Migrates an instance.

    This is a single-node call.

    @type node: string
    @param node: the node on which the instance is currently running
    @type instance: C{objects.Instance}
    @param instance: the instance definition
    @type target: string
    @param target: the target node name
    @type live: boolean
    @param live: whether the migration should be done live or not (the
        interpretation of this parameter is left to the hypervisor)

    """
    inst_dict = self._InstDict(instance)
    return self._SingleNodeCall(node, "instance_migrate",
                                [inst_dict, target, live])
646

    
647
  @_RpcTimeout(_TMO_NORMAL)
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
    """Reboots an instance.

    This is a single-node call.

    @param node: the node to contact
    @param inst: the instance, sent as a filled dict
    @param reboot_type: forwarded unchanged to the node
    @param shutdown_timeout: forwarded unchanged to the node

    """
    inst_dict = self._InstDict(inst)
    params = [inst_dict, reboot_type, shutdown_timeout]
    return self._SingleNodeCall(node, "instance_reboot", params)
657

    
658
  @_RpcTimeout(_TMO_1DAY)
  def call_instance_os_add(self, node, inst, reinstall, debug):
    """Installs an OS on an instance.

    This is a single-node call.

    @param node: the node to contact
    @param inst: the instance, sent as a filled dict
    @param reinstall: forwarded unchanged to the node
    @param debug: forwarded unchanged to the node

    """
    inst_dict = self._InstDict(inst)
    return self._SingleNodeCall(node, "instance_os_add",
                                [inst_dict, reinstall, debug])
667

    
668
  @_RpcTimeout(_TMO_SLOW)
  def call_instance_run_rename(self, node, inst, old_name, debug):
    """Runs the OS rename script for an instance.

    This is a single-node call.

    @param node: the node to contact
    @param inst: the instance, sent as a filled dict
    @param old_name: forwarded unchanged to the node
    @param debug: forwarded unchanged to the node

    """
    inst_dict = self._InstDict(inst)
    return self._SingleNodeCall(node, "instance_run_rename",
                                [inst_dict, old_name, debug])
677

    
678
  @_RpcTimeout(_TMO_URGENT)
  def call_instance_info(self, node, instance, hname):
    """Retrieves information about one instance.

    This is a single-node call.

    @param node: the node to query
    @type instance: string
    @param instance: the instance name
    @type hname: string
    @param hname: the hypervisor type of the instance

    """
    params = [instance, hname]
    return self._SingleNodeCall(node, "instance_info", params)
693

    
694
  @_RpcTimeout(_TMO_NORMAL)
  def call_instance_migratable(self, node, instance):
    """Checks if an instance can be migrated.

    This is a single-node call.

    @param node: the node to query
    @type instance: L{objects.Instance}
    @param instance: the instance to check

    """
    params = [self._InstDict(instance)]
    return self._SingleNodeCall(node, "instance_migratable", params)
708

    
709
  @_RpcTimeout(_TMO_URGENT)
  def call_all_instances_info(self, node_list, hypervisor_list):
    """Retrieves information about all instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    params = [hypervisor_list]
    return self._MultiNodeCall(node_list, "all_instances_info", params)
723

    
724
  @_RpcTimeout(_TMO_URGENT)
  def call_instance_list(self, node_list, hypervisor_list):
    """Retrieves the list of running instances on the given nodes.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hypervisor_list: list
    @param hypervisor_list: the hypervisors to query for instances

    """
    params = [hypervisor_list]
    return self._MultiNodeCall(node_list, "instance_list", params)
737

    
738
  @_RpcTimeout(_TMO_FAST)
  def call_node_tcp_ping(self, node, source, target, port, timeout,
                         live_port_needed):
    """Runs a TcpPing on the remote node.

    This is a single-node call. All arguments except the node are
    forwarded unchanged, in signature order.

    """
    params = [source, target, port, timeout, live_port_needed]
    return self._SingleNodeCall(node, "node_tcp_ping", params)
749

    
750
  @_RpcTimeout(_TMO_FAST)
  def call_node_has_ip_address(self, node, address):
    """Checks whether a node owns the given IP address.

    This is a single-node call.

    @param node: the node to query
    @param address: the IP address to look for

    """
    params = [address]
    return self._SingleNodeCall(node, "node_has_ip_address", params)
758

    
759
  @_RpcTimeout(_TMO_URGENT)
  def call_node_info(self, node_list, vg_name, hypervisor_type):
    """Retrieves node information.

    The information consists of memory data and of the volume group
    size and free space.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type vg_name: C{string}
    @param vg_name: the name of the volume group to ask for disk space
        information
    @type hypervisor_type: C{str}
    @param hypervisor_type: the name of the hypervisor to ask for
        memory information

    """
    params = [vg_name, hypervisor_type]
    return self._MultiNodeCall(node_list, "node_info", params)
780

    
781
  @_RpcTimeout(_TMO_NORMAL)
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
    """Adds a node to the cluster.

    This is a single-node call. All arguments except the node are
    forwarded unchanged, in signature order.

    """
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
    return self._SingleNodeCall(node, "node_add", params)
790

    
791
  @_RpcTimeout(_TMO_NORMAL)
  def call_node_verify(self, node_list, checkdict, cluster_name):
    """Asks the nodes to verify the given parameters.

    This is a multi-node call.

    @param node_list: the nodes to contact
    @param checkdict: forwarded unchanged to the nodes
    @param cluster_name: forwarded unchanged to the nodes

    """
    params = [checkdict, cluster_name]
    return self._MultiNodeCall(node_list, "node_verify", params)
800

    
801
  @classmethod
  @_RpcTimeout(_TMO_FAST)
  def call_node_start_master(cls, node, start_daemons, no_voting):
    """Asks a node to activate itself as master.

    This is a single-node call, made through the static helper (i.e.
    without consulting the configuration).

    @param node: the node to contact
    @param start_daemons: forwarded unchanged to the node
    @param no_voting: forwarded unchanged to the node

    """
    params = [start_daemons, no_voting]
    return cls._StaticSingleNodeCall(node, "node_start_master", params)
811

    
812
  @classmethod
  @_RpcTimeout(_TMO_FAST)
  def call_node_stop_master(cls, node, stop_daemons):
    """Asks a node to demote itself from master status.

    This is a single-node call, made through the static helper (i.e.
    without consulting the configuration).

    @param node: the node to contact
    @param stop_daemons: forwarded unchanged to the node

    """
    params = [stop_daemons]
    return cls._StaticSingleNodeCall(node, "node_stop_master", params)
821

    
822
  @classmethod
  @_RpcTimeout(_TMO_URGENT)
  def call_master_info(cls, node_list):
    """Queries the master information.

    This is a multi-node call, made through the static helper (i.e.
    without consulting the configuration); the procedure takes no
    arguments.

    @param node_list: the nodes to query

    """
    # TODO: should this method query down nodes?
    params = []
    return cls._StaticMultiNodeCall(node_list, "master_info", params)
832

    
833
  @classmethod
  @_RpcTimeout(_TMO_URGENT)
  def call_version(cls, node_list):
    """Queries the node version.

    This is a multi-node call, made through the static helper (i.e.
    without consulting the configuration); the procedure takes no
    arguments.

    @param node_list: the nodes to query

    """
    params = []
    return cls._StaticMultiNodeCall(node_list, "version", params)
842

    
843
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
    """Requests the creation of a block device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the block device, sent as the result of its ToDict()
    @param size: forwarded unchanged to the node
    @param owner: forwarded unchanged to the node
    @param on_primary: forwarded unchanged to the node
    @param info: forwarded unchanged to the node

    """
    params = [bdev.ToDict(), size, owner, on_primary, info]
    return self._SingleNodeCall(node, "blockdev_create", params)
852

    
853
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_remove(self, node, bdev):
    """Requests the removal of a block device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the block device, sent as the result of its ToDict()

    """
    params = [bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_remove", params)
861

    
862
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_rename(self, node, devlist):
    """Requests the rename of the given block devices.

    This is a single-node call.

    @param node: the node to contact
    @param devlist: pairs whose first element is a device (sent as the
        result of its ToDict()) and whose second element is forwarded
        unchanged

    """
    params = [(dev.ToDict(), unique_id) for (dev, unique_id) in devlist]
    return self._SingleNodeCall(node, "blockdev_rename", params)
871

    
872
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
    """Requests the assembly of a block device.

    This is a single-node call.

    @param node: the node to contact
    @param disk: the disk, sent as the result of its ToDict()
    @param owner: forwarded unchanged to the node
    @param on_primary: forwarded unchanged to the node

    """
    params = [disk.ToDict(), owner, on_primary]
    return self._SingleNodeCall(node, "blockdev_assemble", params)
881

    
882
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_shutdown(self, node, disk):
    """Requests the shutdown of a block device.

    This is a single-node call.

    @param node: the node to contact
    @param disk: the disk, sent as the result of its ToDict()

    """
    params = [disk.ToDict()]
    return self._SingleNodeCall(node, "blockdev_shutdown", params)
890

    
891
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_addchildren(self, node, bdev, ndevs):
    """Requests adding a list of children to a (mirroring) device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the parent device, sent as the result of its ToDict()
    @param ndevs: the children, each sent as the result of its ToDict()

    """
    parent = bdev.ToDict()
    children = [child.ToDict() for child in ndevs]
    return self._SingleNodeCall(node, "blockdev_addchildren",
                                [parent, children])
901

    
902
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_removechildren(self, node, bdev, ndevs):
    """Requests removing a list of children from a (mirroring) device.

    This is a single-node call.

    @param node: the node to contact
    @param bdev: the parent device, sent as the result of its ToDict()
    @param ndevs: the children, each sent as the result of its ToDict()

    """
    parent = bdev.ToDict()
    children = [child.ToDict() for child in ndevs]
    return self._SingleNodeCall(node, "blockdev_removechildren",
                                [parent, children])
912

    
913
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_getmirrorstatus(self, node, disks):
    """Query the status of (mirroring) devices on a node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param disks: iterable of disk objects; the list of their C{ToDict}
        forms is used directly as the RPC argument list
    @return: the RPC result; when C{fail_msg} is empty its payload has
        been replaced by a list of L{objects.BlockDevStatus} instances

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
                                  disk_dicts)

    # Leave failed results untouched.
    if result.fail_msg:
      return result

    result.payload = [objects.BlockDevStatus.FromDict(entry)
                      for entry in result.payload]
    return result
926

    
927
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_find(self, node, disk):
    """Ask a node to identify a block device.

    This is a single-node call.

    @param node: the node the request is sent to
    @param disk: disk object, serialized through its C{ToDict} method
    @return: the RPC result; a non-C{None} payload of a successful call
        is replaced by an L{objects.BlockDevStatus} instance

    """
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])

    # Nothing to decode on failure or when the payload is None.
    if result.fail_msg or result.payload is None:
      return result

    result.payload = objects.BlockDevStatus.FromDict(result.payload)
    return result
938

    
939
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_close(self, node, instance_name, disks):
    """Ask a node to close the given block devices.

    This is a single-node call.

    @param node: the node the request is sent to
    @param instance_name: sent unchanged as the first RPC argument
    @param disks: iterable of disk objects, each serialized through
        C{ToDict}
    @return: the result of C{_SingleNodeCall}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._SingleNodeCall(node, "blockdev_close",
                                [instance_name, disk_dicts])
948

    
949
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_getsizes(self, node, disks):
    """Query the sizes of the given disks on a node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param disks: iterable of disk objects, each serialized through
        C{ToDict}; the resulting list is the single RPC argument
    @return: the result of C{_SingleNodeCall}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    # The remote procedure name is singular ("blockdev_getsize").
    return self._SingleNodeCall(node, "blockdev_getsize", [disk_dicts])
958

    
959
  @_RpcTimeout(_TMO_NORMAL)
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
    """Ask nodes to disconnect the network of the given drbd devices.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param nodes_ip: sent unchanged as the first RPC argument
    @param disks: iterable of disk objects, each serialized through
        C{ToDict}
    @return: the result of C{_MultiNodeCall}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
                               [nodes_ip, disk_dicts])
968

    
969
  @_RpcTimeout(_TMO_NORMAL)
  def call_drbd_attach_net(self, node_list, nodes_ip,
                           disks, instance_name, multimaster):
    """Attaches the network of the given drbd devices.

    This is the counterpart of L{call_drbd_disconnect_net}; it issues the
    C{drbd_attach_net} remote procedure.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param nodes_ip: sent unchanged as the first RPC argument
    @param disks: iterable of disk objects, each serialized through
        C{ToDict}
    @param instance_name: sent unchanged as the third RPC argument
    @param multimaster: sent unchanged as the fourth RPC argument
    @return: the result of C{_MultiNodeCall}

    """
    disk_dicts = [cf.ToDict() for cf in disks]
    return self._MultiNodeCall(node_list, "drbd_attach_net",
                               [nodes_ip, disk_dicts,
                                instance_name, multimaster])
980

    
981
  @_RpcTimeout(_TMO_SLOW)
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
    """Waits until the synchronization of drbd devices is complete.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param nodes_ip: sent unchanged as the first RPC argument
    @param disks: iterable of disk objects, each serialized through
        C{ToDict}
    @return: the result of C{_MultiNodeCall}

    """
    disk_dicts = [disk.ToDict() for disk in disks]
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
                               [nodes_ip, disk_dicts])
990

    
991
  @classmethod
  @_RpcTimeout(_TMO_NORMAL)
  def call_upload_file(cls, node_list, file_name, address_list=None):
    """Upload a local file to a set of nodes.

    The node will refuse the operation in case the file is not on the
    approved file list.

    The file contents are passed through C{cls._Compress} and sent
    together with the file's mode, owner, group, atime and mtime as
    reported by C{os.stat}.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of node names to upload to
    @type file_name: str
    @param file_name: the filename to upload
    @type address_list: list or None
    @keyword address_list: an optional list of node addresses, in order
        to optimize the RPC speed
    @return: the result of C{_StaticMultiNodeCall}

    """
    payload = cls._Compress(utils.ReadFile(file_name))
    # NOTE(review): the stat happens after the read, so the metadata may
    # describe a newer version of the file than the payload if the file
    # is modified in between - confirm whether this matters to callers.
    file_stat = os.stat(file_name)
    metadata = [file_stat.st_mode, file_stat.st_uid, file_stat.st_gid,
                file_stat.st_atime, file_stat.st_mtime]
    return cls._StaticMultiNodeCall(node_list, "upload_file",
                                    [file_name, payload] + metadata,
                                    address_list=address_list)
1017

    
1018
  @classmethod
  @_RpcTimeout(_TMO_NORMAL)
  def call_write_ssconf_files(cls, node_list, values):
    """Ask nodes to write the ssconf files.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param values: sent unchanged as the only RPC argument
    @return: the result of C{_StaticMultiNodeCall}

    """
    rpc_args = [values]
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files",
                                    rpc_args)
1027

    
1028
  @_RpcTimeout(_TMO_FAST)
  def call_os_diagnose(self, node_list):
    """Ask nodes for a diagnose of their OS definitions.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @return: the result of C{_MultiNodeCall}

    """
    no_args = []
    return self._MultiNodeCall(node_list, "os_diagnose", no_args)
1036

    
1037
  @_RpcTimeout(_TMO_FAST)
  def call_os_get(self, node, name):
    """Fetch an OS definition from a node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param name: sent unchanged as the only RPC argument
    @return: the RPC result; when the call succeeded and the payload is
        a dict, the payload is replaced by an L{objects.OS} instance

    """
    result = self._SingleNodeCall(node, "os_get", [name])

    # Only a dict payload of a successful call is decoded.
    if result.fail_msg or not isinstance(result.payload, dict):
      return result

    result.payload = objects.OS.FromDict(result.payload)
    return result
1048

    
1049
  @_RpcTimeout(_TMO_NORMAL)
  def call_hooks_runner(self, node_list, hpath, phase, env):
    """Call the hooks runner.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param hpath: sent unchanged as the first RPC argument
    @param phase: sent unchanged as the second RPC argument
    @type env: dict
    @param env: a dictionary with the environment
    @return: the result of C{_MultiNodeCall}

    """
    params = [hpath, phase, env]
    return self._MultiNodeCall(node_list, "hooks_runner", params)
1062

    
1063
  @_RpcTimeout(_TMO_NORMAL)
  def call_iallocator_runner(self, node, name, idata):
    """Call an iallocator on a remote node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param name: the iallocator name
    @param idata: the json-encoded input string
    @return: the result of C{_SingleNodeCall}

    """
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
1075

    
1076
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_grow(self, node, cf_bdev, amount):
    """Request growing of the given block device.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: the block device, serialized through C{ToDict}
    @param amount: sent unchanged as the second RPC argument
        (presumably the size increase; units are not visible here)
    @return: the result of C{_SingleNodeCall}

    """
    return self._SingleNodeCall(node, "blockdev_grow",
                                [cf_bdev.ToDict(), amount])
1085

    
1086
  @_RpcTimeout(_TMO_1DAY)
  def call_blockdev_export(self, node, cf_bdev,
                           dest_node, dest_path, cluster_name):
    """Ask a node to export a disk to another node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: the disk to export, serialized through C{ToDict}
    @param dest_node: sent unchanged as the second RPC argument
    @param dest_path: sent unchanged as the third RPC argument
    @param cluster_name: sent unchanged as the fourth RPC argument
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [cf_bdev.ToDict(), dest_node, dest_path, cluster_name]
    return self._SingleNodeCall(node, "blockdev_export", rpc_args)
1097

    
1098
  @_RpcTimeout(_TMO_NORMAL)
  def call_blockdev_snapshot(self, node, cf_bdev):
    """Ask a node to snapshot the given block device.

    This is a single-node call.

    @param node: the node the request is sent to
    @param cf_bdev: the block device, serialized through C{ToDict}
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [cf_bdev.ToDict()]
    return self._SingleNodeCall(node, "blockdev_snapshot", rpc_args)
1106

    
1107
  @_RpcTimeout(_TMO_NORMAL)
  def call_finalize_export(self, node, instance, snap_disks):
    """Ask a node to complete an export operation.

    This writes the export config file, etc.

    This is a single-node call.

    @param node: the node the request is sent to
    @param instance: the instance, converted through C{self._InstDict}
    @param snap_disks: iterable whose entries are either booleans (sent
        as-is) or disk objects (serialized through C{ToDict})
    @return: the result of C{_SingleNodeCall}

    """
    # Boolean entries are forwarded untouched; everything else is a disk.
    flat_disks = [entry if isinstance(entry, bool) else entry.ToDict()
                  for entry in snap_disks]
    return self._SingleNodeCall(node, "finalize_export",
                                [self._InstDict(instance), flat_disks])
1125

    
1126
  @_RpcTimeout(_TMO_FAST)
  def call_export_info(self, node, path):
    """Query a node for the export information in a given path.

    This is a single-node call.

    @param node: the node the request is sent to
    @param path: sent unchanged as the only RPC argument
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [path]
    return self._SingleNodeCall(node, "export_info", rpc_args)
1134

    
1135
  @_RpcTimeout(_TMO_FAST)
  def call_export_list(self, node_list):
    """Fetch the stored exports list from nodes.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @return: the result of C{_MultiNodeCall}

    """
    no_args = []
    return self._MultiNodeCall(node_list, "export_list", no_args)
1143

    
1144
  @_RpcTimeout(_TMO_FAST)
  def call_export_remove(self, node, export):
    """Ask a node to remove a given export.

    This is a single-node call.

    @param node: the node the request is sent to
    @param export: sent unchanged as the only RPC argument
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [export]
    return self._SingleNodeCall(node, "export_remove", rpc_args)
1152

    
1153
  @classmethod
  @_RpcTimeout(_TMO_NORMAL)
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
    """Ask a node to clean the cluster information it has.

    This will remove the configuration information from the ganeti data
    dir.

    This is a single-node call.

    @param node: the node the request is sent to
    @param modify_ssh_setup: sent unchanged as the only RPC argument
    @return: the result of C{_StaticSingleNodeCall}

    """
    rpc_args = [modify_ssh_setup]
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", rpc_args)
1166

    
1167
  @_RpcTimeout(_TMO_FAST)
  def call_node_volumes(self, node_list):
    """Fetch all volumes on the given node(s).

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @return: the result of C{_MultiNodeCall}

    """
    no_args = []
    return self._MultiNodeCall(node_list, "node_volumes", no_args)
1175

    
1176
  @_RpcTimeout(_TMO_FAST)
  def call_node_demote_from_mc(self, node):
    """Ask a node to demote itself from the master candidate role.

    This is a single-node call.

    @param node: the node the request is sent to
    @return: the result of C{_SingleNodeCall}

    """
    no_args = []
    return self._SingleNodeCall(node, "node_demote_from_mc", no_args)
1184

    
1185
  @_RpcTimeout(_TMO_NORMAL)
  def call_node_powercycle(self, node, hypervisor):
    """Try to powercycle a node.

    This is a single-node call.

    @param node: the node the request is sent to
    @param hypervisor: sent unchanged as the only RPC argument
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [hypervisor]
    return self._SingleNodeCall(node, "node_powercycle", rpc_args)
1193

    
1194
  @_RpcTimeout(None)
  def call_test_delay(self, node_list, duration):
    """Make the given node(s) sleep for a fixed time.

    The read timeout of the call is derived from the requested duration
    (duration plus 5, converted to an integer) rather than taken from
    the timeout decorator, which is given C{None} here.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param duration: the sleep time, sent unchanged as the only RPC
        argument
    @return: the result of C{_MultiNodeCall}

    """
    timeout = int(duration + 5)
    return self._MultiNodeCall(node_list, "test_delay", [duration],
                               read_timeout=timeout)
1203

    
1204
  @_RpcTimeout(_TMO_FAST)
  def call_file_storage_dir_create(self, node, file_storage_dir):
    """Ask a node to create the given file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param file_storage_dir: sent unchanged as the only RPC argument
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_create", rpc_args)
1213

    
1214
  @_RpcTimeout(_TMO_FAST)
  def call_file_storage_dir_remove(self, node, file_storage_dir):
    """Ask a node to remove the given file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param file_storage_dir: sent unchanged as the only RPC argument
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_remove", rpc_args)
1223

    
1224
  @_RpcTimeout(_TMO_FAST)
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
                                   new_file_storage_dir):
    """Ask a node to rename a file storage directory.

    This is a single-node call.

    @param node: the node the request is sent to
    @param old_file_storage_dir: sent unchanged as the first RPC argument
    @param new_file_storage_dir: sent unchanged as the second RPC
        argument
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [old_file_storage_dir, new_file_storage_dir]
    return self._SingleNodeCall(node, "file_storage_dir_rename", rpc_args)
1234

    
1235
  @classmethod
  @_RpcTimeout(_TMO_FAST)
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
    """Send a job queue file update to nodes.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param address_list: forwarded as the C{address_list} keyword of
        C{_StaticMultiNodeCall}
    @param file_name: sent unchanged as the first RPC argument
    @param content: passed through C{cls._Compress} before being sent
    @return: the result of C{_StaticMultiNodeCall}

    """
    rpc_args = [file_name, cls._Compress(content)]
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update", rpc_args,
                                    address_list=address_list)
1246

    
1247
  @classmethod
  @_RpcTimeout(_TMO_NORMAL)
  def call_jobqueue_purge(cls, node):
    """Ask a node to purge its job queue.

    This is a single-node call.

    @param node: the node the request is sent to
    @return: the result of C{_StaticSingleNodeCall}

    """
    no_args = []
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", no_args)
1256

    
1257
  @classmethod
  @_RpcTimeout(_TMO_FAST)
  def call_jobqueue_rename(cls, node_list, address_list, rename):
    """Ask nodes to rename a job queue file.

    This is a multi-node call.

    @param node_list: the nodes the request is sent to
    @param address_list: forwarded as the C{address_list} keyword of
        C{_StaticMultiNodeCall}
    @param rename: used directly as the RPC argument list (it is not
        wrapped in another list)
    @return: the result of C{_StaticMultiNodeCall}

    """
    # "rename" already is the complete argument list for the procedure.
    rpc_args = rename
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rpc_args,
                                    address_list=address_list)
1267

    
1268
  @_RpcTimeout(_TMO_NORMAL)
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
    """Ask nodes to validate hypervisor parameters.

    The parameters sent are the result of C{objects.FillDict} applied to
    the cluster's stored parameters for C{hvname} (an empty dict if there
    are none) and the given C{hvparams}.

    This is a multi-node call.

    @type node_list: list
    @param node_list: the list of nodes to query
    @type hvname: string
    @param hvname: the hypervisor name
    @type hvparams: dict
    @param hvparams: the hypervisor parameters to be validated
    @return: the result of C{_MultiNodeCall}

    """
    cluster_hvparams = self._cfg.GetClusterInfo().hvparams
    cluster_defaults = cluster_hvparams.get(hvname, {})
    filled = objects.FillDict(cluster_defaults, hvparams)
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
                               [hvname, filled])
1286

    
1287
  @_RpcTimeout(_TMO_NORMAL)
  def call_x509_cert_create(self, node, validity):
    """Ask a node to create a new X509 certificate for SSL/TLS.

    This is a single-node call.

    @param node: the node the request is sent to
    @type validity: int
    @param validity: Validity in seconds
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [validity]
    return self._SingleNodeCall(node, "x509_cert_create", rpc_args)
1298

    
1299
  @_RpcTimeout(_TMO_NORMAL)
  def call_x509_cert_remove(self, node, name):
    """Ask a node to remove an X509 certificate.

    This is a single-node call.

    @param node: the node the request is sent to
    @type name: string
    @param name: Certificate name
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "x509_cert_remove", rpc_args)
1310

    
1311
  @_RpcTimeout(_TMO_NORMAL)
  def call_import_start(self, node, opts, instance, dest, dest_args):
    """Ask a node to start a listener for an import.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @param opts: options object, serialized through its C{ToDict} method
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param dest: sent unchanged; also selects how C{dest_args} is encoded
    @param dest_args: encoded with C{_EncodeImportExportIO} before being
        sent
    @return: the result of C{_SingleNodeCall}

    """
    # Arguments are built in the order in which they are sent.
    opts_dict = opts.ToDict()
    inst_dict = self._InstDict(instance)
    encoded_args = _EncodeImportExportIO(dest, dest_args)
    return self._SingleNodeCall(node, "import_start",
                                [opts_dict, inst_dict, dest, encoded_args])
1327

    
1328
  @_RpcTimeout(_TMO_NORMAL)
  def call_export_start(self, node, opts, host, port,
                        instance, source, source_args):
    """Ask a node to start an export daemon.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @param opts: options object, serialized through its C{ToDict} method
    @param host: sent unchanged as the second RPC argument
    @param port: sent unchanged as the third RPC argument
    @type instance: C{objects.Instance}
    @param instance: Instance object
    @param source: sent unchanged; also selects how C{source_args} is
        encoded
    @param source_args: encoded with C{_EncodeImportExportIO} before
        being sent
    @return: the result of C{_SingleNodeCall}

    """
    # Arguments are built in the order in which they are sent.
    opts_dict = opts.ToDict()
    inst_dict = self._InstDict(instance)
    encoded_args = _EncodeImportExportIO(source, source_args)
    return self._SingleNodeCall(node, "export_start",
                                [opts_dict, host, port, inst_dict, source,
                                 encoded_args])
1345

    
1346
  @_RpcTimeout(_TMO_FAST)
  def call_impexp_status(self, node, names):
    """Query a node for the status of imports or exports.

    This is a single-node call.

    On success the payload entries are decoded in place: C{None} entries
    are kept, all others are turned into L{objects.ImportExportStatus}
    instances. A failed result is returned unmodified.

    @type node: string
    @param node: Node name
    @type names: List of strings
    @param names: Import/export names
    @rtype: List of L{objects.ImportExportStatus} instances
    @return: Returns a list of the state of each named import/export or None if
             a status couldn't be retrieved

    """
    result = self._SingleNodeCall(node, "impexp_status", [names])

    if result.fail_msg:
      return result

    result.payload = [None if entry is None
                      else objects.ImportExportStatus.FromDict(entry)
                      for entry in result.payload]
    return result
1375

    
1376
  @_RpcTimeout(_TMO_NORMAL)
  def call_impexp_abort(self, node, name):
    """Ask a node to abort an import or export.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @type name: string
    @param name: Import/export name
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "impexp_abort", rpc_args)
1389

    
1390
  @_RpcTimeout(_TMO_NORMAL)
  def call_impexp_cleanup(self, node, name):
    """Ask a node to clean up after an import or export.

    This is a single-node call.

    @type node: string
    @param node: Node name
    @type name: string
    @param name: Import/export name
    @return: the result of C{_SingleNodeCall}

    """
    rpc_args = [name]
    return self._SingleNodeCall(node, "impexp_cleanup", rpc_args)