Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 6906a9d8

History | View | Annotate | Download (32.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36
import zlib
37
import base64
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45

    
46
import ganeti.http.client
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  _http_manager = http.client.HttpClientManager()
64

    
65

    
66
def Shutdown():
67
  """Stops the module-global HTTP client manager.
68

69
  Must be called before quitting the program.
70

71
  """
72
  global _http_manager
73

    
74
  if _http_manager:
75
    _http_manager.Shutdown()
76
    _http_manager = None
77

    
78

    
79
class RpcResult(object):
80
  """RPC Result class.
81

82
  This class holds an RPC result. It is needed since in multi-node
83
  calls we can't raise an exception just because one one out of many
84
  failed, and therefore we use this class to encapsulate the result.
85

86
  @ivar data: the data payload, for successfull results, or None
87
  @type failed: boolean
88
  @ivar failed: whether the operation failed at RPC level (not
89
      application level on the remote node)
90
  @ivar call: the name of the RPC call
91
  @ivar node: the name of the node to which we made the call
92
  @ivar offline: whether the operation failed because the node was
93
      offline, as opposed to actual failure; offline=True will always
94
      imply failed=True, in order to allow simpler checking if
95
      the user doesn't care about the exact failure mode
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.error = "Node is marked offline"
107
      self.data = None
108
    elif failed:
109
      self.error = data
110
      self.data = None
111
    else:
112
      self.data = data
113
      self.error = None
114

    
115
  def Raise(self):
116
    """If the result has failed, raise an OpExecError.
117

118
    This is used so that LU code doesn't have to check for each
119
    result, but instead can call this function.
120

121
    """
122
    if self.failed:
123
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
124
                               (self.call, self.node, self.error))
125

    
126
  def RemoteFailMsg(self):
127
    """Check if the remote procedure failed.
128

129
    This is valid only for RPC calls which return result of the form
130
    (status, data | error_msg).
131

132
    @return: empty string for succcess, otherwise an error message
133

134
    """
135
    def _EnsureErr(val):
136
      """Helper to ensure we return a 'True' value for error."""
137
      if val:
138
        return val
139
      else:
140
        return "No error information"
141

    
142
    if self.failed:
143
      return _EnsureErr(self.error)
144
    if not isinstance(self.data, (tuple, list)):
145
      return "Invalid result type (%s)" % type(self.data)
146
    if len(self.data) != 2:
147
      return "Invalid result length (%d), expected 2" % len(self.data)
148
    if not self.data[0]:
149
      return _EnsureErr(self.data[1])
150
    return ""
151

    
152

    
153
class Client:
154
  """RPC Client class.
155

156
  This class, given a (remote) method name, a list of parameters and a
157
  list of nodes, will contact (in parallel) all nodes, and return a
158
  dict of results (key: node name, value: result).
159

160
  One current bug is that generic failure is still signalled by
161
  'False' result, which is not good. This overloading of values can
162
  cause bugs.
163

164
  """
165
  def __init__(self, procedure, body, port):
166
    self.procedure = procedure
167
    self.body = body
168
    self.port = port
169
    self.nc = {}
170

    
171
    self._ssl_params = \
172
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
173
                         ssl_cert_path=constants.SSL_CERT_FILE)
174

    
175
  def ConnectList(self, node_list, address_list=None):
176
    """Add a list of nodes to the target nodes.
177

178
    @type node_list: list
179
    @param node_list: the list of node names to connect
180
    @type address_list: list or None
181
    @keyword address_list: either None or a list with node addresses,
182
        which must have the same length as the node list
183

184
    """
185
    if address_list is None:
186
      address_list = [None for _ in node_list]
187
    else:
188
      assert len(node_list) == len(address_list), \
189
             "Name and address lists should have the same length"
190
    for node, address in zip(node_list, address_list):
191
      self.ConnectNode(node, address)
192

    
193
  def ConnectNode(self, name, address=None):
194
    """Add a node to the target list.
195

196
    @type name: str
197
    @param name: the node name
198
    @type address: str
199
    @keyword address: the node address, if known
200

201
    """
202
    if address is None:
203
      address = name
204

    
205
    self.nc[name] = \
206
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
207
                                    "/%s" % self.procedure,
208
                                    post_data=self.body,
209
                                    ssl_params=self._ssl_params,
210
                                    ssl_verify_peer=True)
211

    
212
  def GetResults(self):
213
    """Call nodes and return results.
214

215
    @rtype: list
216
    @returns: List of RPC results
217

218
    """
219
    assert _http_manager, "RPC module not intialized"
220

    
221
    _http_manager.ExecRequests(self.nc.values())
222

    
223
    results = {}
224

    
225
    for name, req in self.nc.iteritems():
226
      if req.success and req.resp_status_code == http.HTTP_OK:
227
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
228
                                  node=name, call=self.procedure)
229
        continue
230

    
231
      # TODO: Better error reporting
232
      if req.error:
233
        msg = req.error
234
      else:
235
        msg = req.resp_body
236

    
237
      logging.error("RPC error from node %s: %s", name, msg)
238
      results[name] = RpcResult(data=msg, failed=True, node=name,
239
                                call=self.procedure)
240

    
241
    return results
242

    
243

    
244
class RpcRunner(object):
245
  """RPC runner class"""
246

    
247
  def __init__(self, cfg):
248
    """Initialized the rpc runner.
249

250
    @type cfg:  C{config.ConfigWriter}
251
    @param cfg: the configuration object that will be used to get data
252
                about the cluster
253

254
    """
255
    self._cfg = cfg
256
    self.port = utils.GetNodeDaemonPort()
257

    
258
  def _InstDict(self, instance):
259
    """Convert the given instance to a dict.
260

261
    This is done via the instance's ToDict() method and additionally
262
    we fill the hvparams with the cluster defaults.
263

264
    @type instance: L{objects.Instance}
265
    @param instance: an Instance object
266
    @rtype: dict
267
    @return: the instance dict, with the hvparams filled with the
268
        cluster defaults
269

270
    """
271
    idict = instance.ToDict()
272
    cluster = self._cfg.GetClusterInfo()
273
    idict["hvparams"] = cluster.FillHV(instance)
274
    idict["beparams"] = cluster.FillBE(instance)
275
    return idict
276

    
277
  def _ConnectList(self, client, node_list):
278
    """Helper for computing node addresses.
279

280
    @type client: L{Client}
281
    @param client: a C{Client} instance
282
    @type node_list: list
283
    @param node_list: the node list we should connect
284

285
    """
286
    all_nodes = self._cfg.GetAllNodesInfo()
287
    name_list = []
288
    addr_list = []
289
    skip_dict = {}
290
    for node in node_list:
291
      if node in all_nodes:
292
        if all_nodes[node].offline:
293
          skip_dict[node] = RpcResult(node=node, offline=True)
294
          continue
295
        val = all_nodes[node].primary_ip
296
      else:
297
        val = None
298
      addr_list.append(val)
299
      name_list.append(node)
300
    if name_list:
301
      client.ConnectList(name_list, address_list=addr_list)
302
    return skip_dict
303

    
304
  def _ConnectNode(self, client, node):
305
    """Helper for computing one node's address.
306

307
    @type client: L{Client}
308
    @param client: a C{Client} instance
309
    @type node: str
310
    @param node: the node we should connect
311

312
    """
313
    node_info = self._cfg.GetNodeInfo(node)
314
    if node_info is not None:
315
      if node_info.offline:
316
        return RpcResult(node=node, offline=True)
317
      addr = node_info.primary_ip
318
    else:
319
      addr = None
320
    client.ConnectNode(node, address=addr)
321

    
322
  def _MultiNodeCall(self, node_list, procedure, args):
323
    """Helper for making a multi-node call
324

325
    """
326
    body = serializer.DumpJson(args, indent=False)
327
    c = Client(procedure, body, self.port)
328
    skip_dict = self._ConnectList(c, node_list)
329
    skip_dict.update(c.GetResults())
330
    return skip_dict
331

    
332
  @classmethod
333
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
334
                           address_list=None):
335
    """Helper for making a multi-node static call
336

337
    """
338
    body = serializer.DumpJson(args, indent=False)
339
    c = Client(procedure, body, utils.GetNodeDaemonPort())
340
    c.ConnectList(node_list, address_list=address_list)
341
    return c.GetResults()
342

    
343
  def _SingleNodeCall(self, node, procedure, args):
344
    """Helper for making a single-node call
345

346
    """
347
    body = serializer.DumpJson(args, indent=False)
348
    c = Client(procedure, body, self.port)
349
    result = self._ConnectNode(c, node)
350
    if result is None:
351
      # we did connect, node is not offline
352
      result = c.GetResults()[node]
353
    return result
354

    
355
  @classmethod
356
  def _StaticSingleNodeCall(cls, node, procedure, args):
357
    """Helper for making a single-node static call
358

359
    """
360
    body = serializer.DumpJson(args, indent=False)
361
    c = Client(procedure, body, utils.GetNodeDaemonPort())
362
    c.ConnectNode(node)
363
    return c.GetResults()[node]
364

    
365
  @staticmethod
366
  def _Compress(data):
367
    """Compresses a string for transport over RPC.
368

369
    Small amounts of data are not compressed.
370

371
    @type data: str
372
    @param data: Data
373
    @rtype: tuple
374
    @return: Encoded data to send
375

376
    """
377
    # Small amounts of data are not compressed
378
    if len(data) < 512:
379
      return (constants.RPC_ENCODING_NONE, data)
380

    
381
    # Compress with zlib and encode in base64
382
    return (constants.RPC_ENCODING_ZLIB_BASE64,
383
            base64.b64encode(zlib.compress(data, 3)))
384

    
385
  #
386
  # Begin RPC calls
387
  #
388

    
389
  def call_volume_list(self, node_list, vg_name):
390
    """Gets the logical volumes present in a given volume group.
391

392
    This is a multi-node call.
393

394
    """
395
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
396

    
397
  def call_vg_list(self, node_list):
398
    """Gets the volume group list.
399

400
    This is a multi-node call.
401

402
    """
403
    return self._MultiNodeCall(node_list, "vg_list", [])
404

    
405
  def call_bridges_exist(self, node, bridges_list):
406
    """Checks if a node has all the bridges given.
407

408
    This method checks if all bridges given in the bridges_list are
409
    present on the remote node, so that an instance that uses interfaces
410
    on those bridges can be started.
411

412
    This is a single-node call.
413

414
    """
415
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
416

    
417
  def call_instance_start(self, node, instance, extra_args):
418
    """Starts an instance.
419

420
    This is a single-node call.
421

422
    """
423
    return self._SingleNodeCall(node, "instance_start",
424
                                [self._InstDict(instance), extra_args])
425

    
426
  def call_instance_shutdown(self, node, instance):
427
    """Stops an instance.
428

429
    This is a single-node call.
430

431
    """
432
    return self._SingleNodeCall(node, "instance_shutdown",
433
                                [self._InstDict(instance)])
434

    
435
  def call_migration_info(self, node, instance):
436
    """Gather the information necessary to prepare an instance migration.
437

438
    This is a single-node call.
439

440
    @type node: string
441
    @param node: the node on which the instance is currently running
442
    @type instance: C{objects.Instance}
443
    @param instance: the instance definition
444

445
    """
446
    return self._SingleNodeCall(node, "migration_info",
447
                                [self._InstDict(instance)])
448

    
449
  def call_accept_instance(self, node, instance, info, target):
450
    """Prepare a node to accept an instance.
451

452
    This is a single-node call.
453

454
    @type node: string
455
    @param node: the target node for the migration
456
    @type instance: C{objects.Instance}
457
    @param instance: the instance definition
458
    @type info: opaque/hypervisor specific (string/data)
459
    @param info: result for the call_migration_info call
460
    @type target: string
461
    @param target: target hostname (usually ip address) (on the node itself)
462

463
    """
464
    return self._SingleNodeCall(node, "accept_instance",
465
                                [self._InstDict(instance), info, target])
466

    
467
  def call_finalize_migration(self, node, instance, info, success):
468
    """Finalize any target-node migration specific operation.
469

470
    This is called both in case of a successful migration and in case of error
471
    (in which case it should abort the migration).
472

473
    This is a single-node call.
474

475
    @type node: string
476
    @param node: the target node for the migration
477
    @type instance: C{objects.Instance}
478
    @param instance: the instance definition
479
    @type info: opaque/hypervisor specific (string/data)
480
    @param info: result for the call_migration_info call
481
    @type success: boolean
482
    @param success: whether the migration was a success or a failure
483

484
    """
485
    return self._SingleNodeCall(node, "finalize_migration",
486
                                [self._InstDict(instance), info, success])
487

    
488
  def call_instance_migrate(self, node, instance, target, live):
489
    """Migrate an instance.
490

491
    This is a single-node call.
492

493
    @type node: string
494
    @param node: the node on which the instance is currently running
495
    @type instance: C{objects.Instance}
496
    @param instance: the instance definition
497
    @type target: string
498
    @param target: the target node name
499
    @type live: boolean
500
    @param live: whether the migration should be done live or not (the
501
        interpretation of this parameter is left to the hypervisor)
502

503
    """
504
    return self._SingleNodeCall(node, "instance_migrate",
505
                                [self._InstDict(instance), target, live])
506

    
507
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
508
    """Reboots an instance.
509

510
    This is a single-node call.
511

512
    """
513
    return self._SingleNodeCall(node, "instance_reboot",
514
                                [self._InstDict(instance), reboot_type,
515
                                 extra_args])
516

    
517
  def call_instance_os_add(self, node, inst):
518
    """Installs an OS on the given instance.
519

520
    This is a single-node call.
521

522
    """
523
    return self._SingleNodeCall(node, "instance_os_add",
524
                                [self._InstDict(inst)])
525

    
526
  def call_instance_run_rename(self, node, inst, old_name):
527
    """Run the OS rename script for an instance.
528

529
    This is a single-node call.
530

531
    """
532
    return self._SingleNodeCall(node, "instance_run_rename",
533
                                [self._InstDict(inst), old_name])
534

    
535
  def call_instance_info(self, node, instance, hname):
536
    """Returns information about a single instance.
537

538
    This is a single-node call.
539

540
    @type node: list
541
    @param node: the list of nodes to query
542
    @type instance: string
543
    @param instance: the instance name
544
    @type hname: string
545
    @param hname: the hypervisor type of the instance
546

547
    """
548
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
549

    
550
  def call_instance_migratable(self, node, instance):
551
    """Checks whether the given instance can be migrated.
552

553
    This is a single-node call.
554

555
    @param node: the node to query
556
    @type instance: L{objects.Instance}
557
    @param instance: the instance to check
558

559

560
    """
561
    return self._SingleNodeCall(node, "instance_migratable",
562
                                [self._InstDict(instance)])
563

    
564
  def call_all_instances_info(self, node_list, hypervisor_list):
565
    """Returns information about all instances on the given nodes.
566

567
    This is a multi-node call.
568

569
    @type node_list: list
570
    @param node_list: the list of nodes to query
571
    @type hypervisor_list: list
572
    @param hypervisor_list: the hypervisors to query for instances
573

574
    """
575
    return self._MultiNodeCall(node_list, "all_instances_info",
576
                               [hypervisor_list])
577

    
578
  def call_instance_list(self, node_list, hypervisor_list):
579
    """Returns the list of running instances on a given node.
580

581
    This is a multi-node call.
582

583
    @type node_list: list
584
    @param node_list: the list of nodes to query
585
    @type hypervisor_list: list
586
    @param hypervisor_list: the hypervisors to query for instances
587

588
    """
589
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
590

    
591
  def call_node_tcp_ping(self, node, source, target, port, timeout,
592
                         live_port_needed):
593
    """Do a TcpPing on the remote node
594

595
    This is a single-node call.
596

597
    """
598
    return self._SingleNodeCall(node, "node_tcp_ping",
599
                                [source, target, port, timeout,
600
                                 live_port_needed])
601

    
602
  def call_node_has_ip_address(self, node, address):
603
    """Checks if a node has the given IP address.
604

605
    This is a single-node call.
606

607
    """
608
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
609

    
610
  def call_node_info(self, node_list, vg_name, hypervisor_type):
611
    """Return node information.
612

613
    This will return memory information and volume group size and free
614
    space.
615

616
    This is a multi-node call.
617

618
    @type node_list: list
619
    @param node_list: the list of nodes to query
620
    @type vg_name: C{string}
621
    @param vg_name: the name of the volume group to ask for disk space
622
        information
623
    @type hypervisor_type: C{str}
624
    @param hypervisor_type: the name of the hypervisor to ask for
625
        memory information
626

627
    """
628
    retux = self._MultiNodeCall(node_list, "node_info",
629
                                [vg_name, hypervisor_type])
630

    
631
    for result in retux.itervalues():
632
      if result.failed or not isinstance(result.data, dict):
633
        result.data = {}
634
      if result.offline:
635
        log_name = None
636
      else:
637
        log_name = "call_node_info"
638

    
639
      utils.CheckDict(result.data, {
640
        'memory_total' : '-',
641
        'memory_dom0' : '-',
642
        'memory_free' : '-',
643
        'vg_size' : 'node_unreachable',
644
        'vg_free' : '-',
645
        }, log_name)
646
    return retux
647

    
648
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
649
    """Add a node to the cluster.
650

651
    This is a single-node call.
652

653
    """
654
    return self._SingleNodeCall(node, "node_add",
655
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
656

    
657
  def call_node_verify(self, node_list, checkdict, cluster_name):
658
    """Request verification of given parameters.
659

660
    This is a multi-node call.
661

662
    """
663
    return self._MultiNodeCall(node_list, "node_verify",
664
                               [checkdict, cluster_name])
665

    
666
  @classmethod
667
  def call_node_start_master(cls, node, start_daemons):
668
    """Tells a node to activate itself as a master.
669

670
    This is a single-node call.
671

672
    """
673
    return cls._StaticSingleNodeCall(node, "node_start_master",
674
                                     [start_daemons])
675

    
676
  @classmethod
677
  def call_node_stop_master(cls, node, stop_daemons):
678
    """Tells a node to demote itself from master status.
679

680
    This is a single-node call.
681

682
    """
683
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
684

    
685
  @classmethod
686
  def call_master_info(cls, node_list):
687
    """Query master info.
688

689
    This is a multi-node call.
690

691
    """
692
    # TODO: should this method query down nodes?
693
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
694

    
695
  def call_version(self, node_list):
696
    """Query node version.
697

698
    This is a multi-node call.
699

700
    """
701
    return self._MultiNodeCall(node_list, "version", [])
702

    
703
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
704
    """Request creation of a given block device.
705

706
    This is a single-node call.
707

708
    """
709
    return self._SingleNodeCall(node, "blockdev_create",
710
                                [bdev.ToDict(), size, owner, on_primary, info])
711

    
712
  def call_blockdev_remove(self, node, bdev):
713
    """Request removal of a given block device.
714

715
    This is a single-node call.
716

717
    """
718
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
719

    
720
  def call_blockdev_rename(self, node, devlist):
721
    """Request rename of the given block devices.
722

723
    This is a single-node call.
724

725
    """
726
    return self._SingleNodeCall(node, "blockdev_rename",
727
                                [(d.ToDict(), uid) for d, uid in devlist])
728

    
729
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
730
    """Request assembling of a given block device.
731

732
    This is a single-node call.
733

734
    """
735
    return self._SingleNodeCall(node, "blockdev_assemble",
736
                                [disk.ToDict(), owner, on_primary])
737

    
738
  def call_blockdev_shutdown(self, node, disk):
739
    """Request shutdown of a given block device.
740

741
    This is a single-node call.
742

743
    """
744
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
745

    
746
  def call_blockdev_addchildren(self, node, bdev, ndevs):
747
    """Request adding a list of children to a (mirroring) device.
748

749
    This is a single-node call.
750

751
    """
752
    return self._SingleNodeCall(node, "blockdev_addchildren",
753
                                [bdev.ToDict(),
754
                                 [disk.ToDict() for disk in ndevs]])
755

    
756
  def call_blockdev_removechildren(self, node, bdev, ndevs):
757
    """Request removing a list of children from a (mirroring) device.
758

759
    This is a single-node call.
760

761
    """
762
    return self._SingleNodeCall(node, "blockdev_removechildren",
763
                                [bdev.ToDict(),
764
                                 [disk.ToDict() for disk in ndevs]])
765

    
766
  def call_blockdev_getmirrorstatus(self, node, disks):
767
    """Request status of a (mirroring) device.
768

769
    This is a single-node call.
770

771
    """
772
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
773
                                [dsk.ToDict() for dsk in disks])
774

    
775
  def call_blockdev_find(self, node, disk):
776
    """Request identification of a given block device.
777

778
    This is a single-node call.
779

780
    """
781
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
782

    
783
  def call_blockdev_close(self, node, instance_name, disks):
784
    """Closes the given block devices.
785

786
    This is a single-node call.
787

788
    """
789
    params = [instance_name, [cf.ToDict() for cf in disks]]
790
    return self._SingleNodeCall(node, "blockdev_close", params)
791

    
792
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
793
    """Disconnects the network of the given drbd devices.
794

795
    This is a multi-node call.
796

797
    """
798
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
799
                               [nodes_ip, [cf.ToDict() for cf in disks]])
800

    
801
  def call_drbd_attach_net(self, node_list, nodes_ip,
802
                           disks, instance_name, multimaster):
803
    """Disconnects the given drbd devices.
804

805
    This is a multi-node call.
806

807
    """
808
    return self._MultiNodeCall(node_list, "drbd_attach_net",
809
                               [nodes_ip, [cf.ToDict() for cf in disks],
810
                                instance_name, multimaster])
811

    
812
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
813
    """Waits for the synchronization of drbd devices is complete.
814

815
    This is a multi-node call.
816

817
    """
818
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
819
                               [nodes_ip, [cf.ToDict() for cf in disks]])
820

    
821
  @classmethod
822
  def call_upload_file(cls, node_list, file_name, address_list=None):
823
    """Upload a file.
824

825
    The node will refuse the operation in case the file is not on the
826
    approved file list.
827

828
    This is a multi-node call.
829

830
    @type node_list: list
831
    @param node_list: the list of node names to upload to
832
    @type file_name: str
833
    @param file_name: the filename to upload
834
    @type address_list: list or None
835
    @keyword address_list: an optional list of node addresses, in order
836
        to optimize the RPC speed
837

838
    """
839
    file_contents = utils.ReadFile(file_name)
840
    data = cls._Compress(file_contents)
841
    st = os.stat(file_name)
842
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
843
              st.st_atime, st.st_mtime]
844
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
845
                                    address_list=address_list)
846

    
847
  @classmethod
848
  def call_write_ssconf_files(cls, node_list, values):
849
    """Write ssconf files.
850

851
    This is a multi-node call.
852

853
    """
854
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
855

    
856
  def call_os_diagnose(self, node_list):
857
    """Request a diagnose of OS definitions.
858

859
    This is a multi-node call.
860

861
    """
862
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
863

    
864
    for node_result in result.values():
865
      if not node_result.failed and node_result.data:
866
        node_result.data = [objects.OS.FromDict(oss)
867
                            for oss in node_result.data]
868
    return result
869

    
870
  def call_os_get(self, node, name):
871
    """Returns an OS definition.
872

873
    This is a single-node call.
874

875
    """
876
    result = self._SingleNodeCall(node, "os_get", [name])
877
    if not result.failed and isinstance(result.data, dict):
878
      result.data = objects.OS.FromDict(result.data)
879
    return result
880

    
881
  def call_hooks_runner(self, node_list, hpath, phase, env):
882
    """Call the hooks runner.
883

884
    Args:
885
      - op: the OpCode instance
886
      - env: a dictionary with the environment
887

888
    This is a multi-node call.
889

890
    """
891
    params = [hpath, phase, env]
892
    return self._MultiNodeCall(node_list, "hooks_runner", params)
893

    
894
  def call_iallocator_runner(self, node, name, idata):
895
    """Call an iallocator on a remote node
896

897
    Args:
898
      - name: the iallocator name
899
      - input: the json-encoded input string
900

901
    This is a single-node call.
902

903
    """
904
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
905

    
906
  def call_blockdev_grow(self, node, cf_bdev, amount):
907
    """Request a snapshot of the given block device.
908

909
    This is a single-node call.
910

911
    """
912
    return self._SingleNodeCall(node, "blockdev_grow",
913
                                [cf_bdev.ToDict(), amount])
914

    
915
  def call_blockdev_snapshot(self, node, cf_bdev):
916
    """Request a snapshot of the given block device.
917

918
    This is a single-node call.
919

920
    """
921
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
922

    
923
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
924
                           cluster_name, idx):
925
    """Request the export of a given snapshot.
926

927
    This is a single-node call.
928

929
    """
930
    return self._SingleNodeCall(node, "snapshot_export",
931
                                [snap_bdev.ToDict(), dest_node,
932
                                 self._InstDict(instance), cluster_name, idx])
933

    
934
  def call_finalize_export(self, node, instance, snap_disks):
935
    """Request the completion of an export operation.
936

937
    This writes the export config file, etc.
938

939
    This is a single-node call.
940

941
    """
942
    flat_disks = []
943
    for disk in snap_disks:
944
      flat_disks.append(disk.ToDict())
945

    
946
    return self._SingleNodeCall(node, "finalize_export",
947
                                [self._InstDict(instance), flat_disks])
948

    
949
  def call_export_info(self, node, path):
950
    """Queries the export information in a given path.
951

952
    This is a single-node call.
953

954
    """
955
    result = self._SingleNodeCall(node, "export_info", [path])
956
    if not result.failed and result.data:
957
      result.data = objects.SerializableConfigParser.Loads(str(result.data))
958
    return result
959

    
960
  def call_instance_os_import(self, node, inst, src_node, src_images,
961
                              cluster_name):
962
    """Request the import of a backup into an instance.
963

964
    This is a single-node call.
965

966
    """
967
    return self._SingleNodeCall(node, "instance_os_import",
968
                                [self._InstDict(inst), src_node, src_images,
969
                                 cluster_name])
970

    
971
  def call_export_list(self, node_list):
972
    """Gets the stored exports list.
973

974
    This is a multi-node call.
975

976
    """
977
    return self._MultiNodeCall(node_list, "export_list", [])
978

    
979
  def call_export_remove(self, node, export):
980
    """Requests removal of a given export.
981

982
    This is a single-node call.
983

984
    """
985
    return self._SingleNodeCall(node, "export_remove", [export])
986

    
987
  @classmethod
988
  def call_node_leave_cluster(cls, node):
989
    """Requests a node to clean the cluster information it has.
990

991
    This will remove the configuration information from the ganeti data
992
    dir.
993

994
    This is a single-node call.
995

996
    """
997
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
998

    
999
  def call_node_volumes(self, node_list):
1000
    """Gets all volumes on node(s).
1001

1002
    This is a multi-node call.
1003

1004
    """
1005
    return self._MultiNodeCall(node_list, "node_volumes", [])
1006

    
1007
  def call_node_demote_from_mc(self, node):
1008
    """Demote a node from the master candidate role.
1009

1010
    This is a single-node call.
1011

1012
    """
1013
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1014

    
1015
  def call_test_delay(self, node_list, duration):
1016
    """Sleep for a fixed time on given node(s).
1017

1018
    This is a multi-node call.
1019

1020
    """
1021
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1022

    
1023
  def call_file_storage_dir_create(self, node, file_storage_dir):
1024
    """Create the given file storage directory.
1025

1026
    This is a single-node call.
1027

1028
    """
1029
    return self._SingleNodeCall(node, "file_storage_dir_create",
1030
                                [file_storage_dir])
1031

    
1032
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1033
    """Remove the given file storage directory.
1034

1035
    This is a single-node call.
1036

1037
    """
1038
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1039
                                [file_storage_dir])
1040

    
1041
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1042
                                   new_file_storage_dir):
1043
    """Rename file storage directory.
1044

1045
    This is a single-node call.
1046

1047
    """
1048
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1049
                                [old_file_storage_dir, new_file_storage_dir])
1050

    
1051
  @classmethod
1052
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1053
    """Update job queue.
1054

1055
    This is a multi-node call.
1056

1057
    """
1058
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1059
                                    [file_name, cls._Compress(content)],
1060
                                    address_list=address_list)
1061

    
1062
  @classmethod
1063
  def call_jobqueue_purge(cls, node):
1064
    """Purge job queue.
1065

1066
    This is a single-node call.
1067

1068
    """
1069
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1070

    
1071
  @classmethod
1072
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1073
    """Rename a job queue file.
1074

1075
    This is a multi-node call.
1076

1077
    """
1078
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1079
                                    address_list=address_list)
1080

    
1081
  @classmethod
1082
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1083
    """Set the drain flag on the queue.
1084

1085
    This is a multi-node call.
1086

1087
    @type node_list: list
1088
    @param node_list: the list of nodes to query
1089
    @type drain_flag: bool
1090
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1091

1092
    """
1093
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1094
                                    [drain_flag])
1095

    
1096
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1097
    """Validate the hypervisor params.
1098

1099
    This is a multi-node call.
1100

1101
    @type node_list: list
1102
    @param node_list: the list of nodes to query
1103
    @type hvname: string
1104
    @param hvname: the hypervisor name
1105
    @type hvparams: dict
1106
    @param hvparams: the hypervisor parameters to be validated
1107

1108
    """
1109
    cluster = self._cfg.GetClusterInfo()
1110
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1111
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1112
                               [hvname, hv_full])