Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 85414b69

History | View | Annotate | Download (32.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36
import zlib
37
import base64
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45

    
46
import ganeti.http.client
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  _http_manager = http.client.HttpClientManager()
64

    
65

    
66
def Shutdown():
67
  """Stops the module-global HTTP client manager.
68

69
  Must be called before quitting the program.
70

71
  """
72
  global _http_manager
73

    
74
  if _http_manager:
75
    _http_manager.Shutdown()
76
    _http_manager = None
77

    
78

    
79
class RpcResult(object):
80
  """RPC Result class.
81

82
  This class holds an RPC result. It is needed since in multi-node
83
  calls we can't raise an exception just because one one out of many
84
  failed, and therefore we use this class to encapsulate the result.
85

86
  @ivar data: the data payload, for successfull results, or None
87
  @type failed: boolean
88
  @ivar failed: whether the operation failed at RPC level (not
89
      application level on the remote node)
90
  @ivar call: the name of the RPC call
91
  @ivar node: the name of the node to which we made the call
92
  @ivar offline: whether the operation failed because the node was
93
      offline, as opposed to actual failure; offline=True will always
94
      imply failed=True, in order to allow simpler checking if
95
      the user doesn't care about the exact failure mode
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.error = "Node is marked offline"
107
      self.data = None
108
    elif failed:
109
      self.error = data
110
      self.data = None
111
    else:
112
      self.data = data
113
      self.error = None
114

    
115
  def Raise(self):
116
    """If the result has failed, raise an OpExecError.
117

118
    This is used so that LU code doesn't have to check for each
119
    result, but instead can call this function.
120

121
    """
122
    if self.failed:
123
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
124
                               (self.call, self.node, self.error))
125

    
126
  def RemoteFailMsg(self):
127
    """Check if the remote procedure failed.
128

129
    This is valid only for RPC calls which return result of the form
130
    (status, data | error_msg).
131

132
    @return: empty string for succcess, otherwise an error message
133

134
    """
135
    def _EnsureErr(val):
136
      """Helper to ensure we return a 'True' value for error."""
137
      if val:
138
        return val
139
      else:
140
        return "No error information"
141

    
142
    if self.failed:
143
      return _EnsureErr(self.error)
144
    if not isinstance(self.data, (tuple, list)):
145
      return "Invalid result type (%s)" % type(self.data)
146
    if len(self.data) != 2:
147
      return "Invalid result length (%d), expected 2" % len(self.data)
148
    if not self.data[0]:
149
      return _EnsureErr(self.data[1])
150
    return ""
151

    
152

    
153
class Client:
154
  """RPC Client class.
155

156
  This class, given a (remote) method name, a list of parameters and a
157
  list of nodes, will contact (in parallel) all nodes, and return a
158
  dict of results (key: node name, value: result).
159

160
  One current bug is that generic failure is still signalled by
161
  'False' result, which is not good. This overloading of values can
162
  cause bugs.
163

164
  """
165
  def __init__(self, procedure, body, port):
166
    self.procedure = procedure
167
    self.body = body
168
    self.port = port
169
    self.nc = {}
170

    
171
    self._ssl_params = \
172
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
173
                         ssl_cert_path=constants.SSL_CERT_FILE)
174

    
175
  def ConnectList(self, node_list, address_list=None):
176
    """Add a list of nodes to the target nodes.
177

178
    @type node_list: list
179
    @param node_list: the list of node names to connect
180
    @type address_list: list or None
181
    @keyword address_list: either None or a list with node addresses,
182
        which must have the same length as the node list
183

184
    """
185
    if address_list is None:
186
      address_list = [None for _ in node_list]
187
    else:
188
      assert len(node_list) == len(address_list), \
189
             "Name and address lists should have the same length"
190
    for node, address in zip(node_list, address_list):
191
      self.ConnectNode(node, address)
192

    
193
  def ConnectNode(self, name, address=None):
194
    """Add a node to the target list.
195

196
    @type name: str
197
    @param name: the node name
198
    @type address: str
199
    @keyword address: the node address, if known
200

201
    """
202
    if address is None:
203
      address = name
204

    
205
    self.nc[name] = \
206
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
207
                                    "/%s" % self.procedure,
208
                                    post_data=self.body,
209
                                    ssl_params=self._ssl_params,
210
                                    ssl_verify_peer=True)
211

    
212
  def GetResults(self):
213
    """Call nodes and return results.
214

215
    @rtype: list
216
    @returns: List of RPC results
217

218
    """
219
    assert _http_manager, "RPC module not intialized"
220

    
221
    _http_manager.ExecRequests(self.nc.values())
222

    
223
    results = {}
224

    
225
    for name, req in self.nc.iteritems():
226
      if req.success and req.resp_status_code == http.HTTP_OK:
227
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
228
                                  node=name, call=self.procedure)
229
        continue
230

    
231
      # TODO: Better error reporting
232
      if req.error:
233
        msg = req.error
234
      else:
235
        msg = req.resp_body
236

    
237
      logging.error("RPC error in %s from node %s: %s",
238
                    self.procedure, name, msg)
239
      results[name] = RpcResult(data=msg, failed=True, node=name,
240
                                call=self.procedure)
241

    
242
    return results
243

    
244

    
245
class RpcRunner(object):
246
  """RPC runner class"""
247

    
248
  def __init__(self, cfg):
249
    """Initialized the rpc runner.
250

251
    @type cfg:  C{config.ConfigWriter}
252
    @param cfg: the configuration object that will be used to get data
253
                about the cluster
254

255
    """
256
    self._cfg = cfg
257
    self.port = utils.GetNodeDaemonPort()
258

    
259
  def _InstDict(self, instance):
260
    """Convert the given instance to a dict.
261

262
    This is done via the instance's ToDict() method and additionally
263
    we fill the hvparams with the cluster defaults.
264

265
    @type instance: L{objects.Instance}
266
    @param instance: an Instance object
267
    @rtype: dict
268
    @return: the instance dict, with the hvparams filled with the
269
        cluster defaults
270

271
    """
272
    idict = instance.ToDict()
273
    cluster = self._cfg.GetClusterInfo()
274
    idict["hvparams"] = cluster.FillHV(instance)
275
    idict["beparams"] = cluster.FillBE(instance)
276
    return idict
277

    
278
  def _ConnectList(self, client, node_list, call):
279
    """Helper for computing node addresses.
280

281
    @type client: L{Client}
282
    @param client: a C{Client} instance
283
    @type node_list: list
284
    @param node_list: the node list we should connect
285
    @type call: string
286
    @param call: the name of the remote procedure call, for filling in
287
        correctly any eventual offline nodes' results
288

289
    """
290
    all_nodes = self._cfg.GetAllNodesInfo()
291
    name_list = []
292
    addr_list = []
293
    skip_dict = {}
294
    for node in node_list:
295
      if node in all_nodes:
296
        if all_nodes[node].offline:
297
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
298
          continue
299
        val = all_nodes[node].primary_ip
300
      else:
301
        val = None
302
      addr_list.append(val)
303
      name_list.append(node)
304
    if name_list:
305
      client.ConnectList(name_list, address_list=addr_list)
306
    return skip_dict
307

    
308
  def _ConnectNode(self, client, node, call):
309
    """Helper for computing one node's address.
310

311
    @type client: L{Client}
312
    @param client: a C{Client} instance
313
    @type node: str
314
    @param node: the node we should connect
315
    @type call: string
316
    @param call: the name of the remote procedure call, for filling in
317
        correctly any eventual offline nodes' results
318

319
    """
320
    node_info = self._cfg.GetNodeInfo(node)
321
    if node_info is not None:
322
      if node_info.offline:
323
        return RpcResult(node=node, offline=True, call=call)
324
      addr = node_info.primary_ip
325
    else:
326
      addr = None
327
    client.ConnectNode(node, address=addr)
328

    
329
  def _MultiNodeCall(self, node_list, procedure, args):
330
    """Helper for making a multi-node call
331

332
    """
333
    body = serializer.DumpJson(args, indent=False)
334
    c = Client(procedure, body, self.port)
335
    skip_dict = self._ConnectList(c, node_list, procedure)
336
    skip_dict.update(c.GetResults())
337
    return skip_dict
338

    
339
  @classmethod
340
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
341
                           address_list=None):
342
    """Helper for making a multi-node static call
343

344
    """
345
    body = serializer.DumpJson(args, indent=False)
346
    c = Client(procedure, body, utils.GetNodeDaemonPort())
347
    c.ConnectList(node_list, address_list=address_list)
348
    return c.GetResults()
349

    
350
  def _SingleNodeCall(self, node, procedure, args):
351
    """Helper for making a single-node call
352

353
    """
354
    body = serializer.DumpJson(args, indent=False)
355
    c = Client(procedure, body, self.port)
356
    result = self._ConnectNode(c, node, procedure)
357
    if result is None:
358
      # we did connect, node is not offline
359
      result = c.GetResults()[node]
360
    return result
361

    
362
  @classmethod
363
  def _StaticSingleNodeCall(cls, node, procedure, args):
364
    """Helper for making a single-node static call
365

366
    """
367
    body = serializer.DumpJson(args, indent=False)
368
    c = Client(procedure, body, utils.GetNodeDaemonPort())
369
    c.ConnectNode(node)
370
    return c.GetResults()[node]
371

    
372
  @staticmethod
373
  def _Compress(data):
374
    """Compresses a string for transport over RPC.
375

376
    Small amounts of data are not compressed.
377

378
    @type data: str
379
    @param data: Data
380
    @rtype: tuple
381
    @return: Encoded data to send
382

383
    """
384
    # Small amounts of data are not compressed
385
    if len(data) < 512:
386
      return (constants.RPC_ENCODING_NONE, data)
387

    
388
    # Compress with zlib and encode in base64
389
    return (constants.RPC_ENCODING_ZLIB_BASE64,
390
            base64.b64encode(zlib.compress(data, 3)))
391

    
392
  #
393
  # Begin RPC calls
394
  #
395

    
396
  def call_volume_list(self, node_list, vg_name):
397
    """Gets the logical volumes present in a given volume group.
398

399
    This is a multi-node call.
400

401
    """
402
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
403

    
404
  def call_vg_list(self, node_list):
405
    """Gets the volume group list.
406

407
    This is a multi-node call.
408

409
    """
410
    return self._MultiNodeCall(node_list, "vg_list", [])
411

    
412
  def call_bridges_exist(self, node, bridges_list):
413
    """Checks if a node has all the bridges given.
414

415
    This method checks if all bridges given in the bridges_list are
416
    present on the remote node, so that an instance that uses interfaces
417
    on those bridges can be started.
418

419
    This is a single-node call.
420

421
    """
422
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
423

    
424
  def call_instance_start(self, node, instance, extra_args):
425
    """Starts an instance.
426

427
    This is a single-node call.
428

429
    """
430
    return self._SingleNodeCall(node, "instance_start",
431
                                [self._InstDict(instance), extra_args])
432

    
433
  def call_instance_shutdown(self, node, instance):
434
    """Stops an instance.
435

436
    This is a single-node call.
437

438
    """
439
    return self._SingleNodeCall(node, "instance_shutdown",
440
                                [self._InstDict(instance)])
441

    
442
  def call_migration_info(self, node, instance):
443
    """Gather the information necessary to prepare an instance migration.
444

445
    This is a single-node call.
446

447
    @type node: string
448
    @param node: the node on which the instance is currently running
449
    @type instance: C{objects.Instance}
450
    @param instance: the instance definition
451

452
    """
453
    return self._SingleNodeCall(node, "migration_info",
454
                                [self._InstDict(instance)])
455

    
456
  def call_accept_instance(self, node, instance, info, target):
457
    """Prepare a node to accept an instance.
458

459
    This is a single-node call.
460

461
    @type node: string
462
    @param node: the target node for the migration
463
    @type instance: C{objects.Instance}
464
    @param instance: the instance definition
465
    @type info: opaque/hypervisor specific (string/data)
466
    @param info: result for the call_migration_info call
467
    @type target: string
468
    @param target: target hostname (usually ip address) (on the node itself)
469

470
    """
471
    return self._SingleNodeCall(node, "accept_instance",
472
                                [self._InstDict(instance), info, target])
473

    
474
  def call_finalize_migration(self, node, instance, info, success):
475
    """Finalize any target-node migration specific operation.
476

477
    This is called both in case of a successful migration and in case of error
478
    (in which case it should abort the migration).
479

480
    This is a single-node call.
481

482
    @type node: string
483
    @param node: the target node for the migration
484
    @type instance: C{objects.Instance}
485
    @param instance: the instance definition
486
    @type info: opaque/hypervisor specific (string/data)
487
    @param info: result for the call_migration_info call
488
    @type success: boolean
489
    @param success: whether the migration was a success or a failure
490

491
    """
492
    return self._SingleNodeCall(node, "finalize_migration",
493
                                [self._InstDict(instance), info, success])
494

    
495
  def call_instance_migrate(self, node, instance, target, live):
496
    """Migrate an instance.
497

498
    This is a single-node call.
499

500
    @type node: string
501
    @param node: the node on which the instance is currently running
502
    @type instance: C{objects.Instance}
503
    @param instance: the instance definition
504
    @type target: string
505
    @param target: the target node name
506
    @type live: boolean
507
    @param live: whether the migration should be done live or not (the
508
        interpretation of this parameter is left to the hypervisor)
509

510
    """
511
    return self._SingleNodeCall(node, "instance_migrate",
512
                                [self._InstDict(instance), target, live])
513

    
514
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
515
    """Reboots an instance.
516

517
    This is a single-node call.
518

519
    """
520
    return self._SingleNodeCall(node, "instance_reboot",
521
                                [self._InstDict(instance), reboot_type,
522
                                 extra_args])
523

    
524
  def call_instance_os_add(self, node, inst):
525
    """Installs an OS on the given instance.
526

527
    This is a single-node call.
528

529
    """
530
    return self._SingleNodeCall(node, "instance_os_add",
531
                                [self._InstDict(inst)])
532

    
533
  def call_instance_run_rename(self, node, inst, old_name):
534
    """Run the OS rename script for an instance.
535

536
    This is a single-node call.
537

538
    """
539
    return self._SingleNodeCall(node, "instance_run_rename",
540
                                [self._InstDict(inst), old_name])
541

    
542
  def call_instance_info(self, node, instance, hname):
543
    """Returns information about a single instance.
544

545
    This is a single-node call.
546

547
    @type node: list
548
    @param node: the list of nodes to query
549
    @type instance: string
550
    @param instance: the instance name
551
    @type hname: string
552
    @param hname: the hypervisor type of the instance
553

554
    """
555
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
556

    
557
  def call_instance_migratable(self, node, instance):
558
    """Checks whether the given instance can be migrated.
559

560
    This is a single-node call.
561

562
    @param node: the node to query
563
    @type instance: L{objects.Instance}
564
    @param instance: the instance to check
565

566

567
    """
568
    return self._SingleNodeCall(node, "instance_migratable",
569
                                [self._InstDict(instance)])
570

    
571
  def call_all_instances_info(self, node_list, hypervisor_list):
572
    """Returns information about all instances on the given nodes.
573

574
    This is a multi-node call.
575

576
    @type node_list: list
577
    @param node_list: the list of nodes to query
578
    @type hypervisor_list: list
579
    @param hypervisor_list: the hypervisors to query for instances
580

581
    """
582
    return self._MultiNodeCall(node_list, "all_instances_info",
583
                               [hypervisor_list])
584

    
585
  def call_instance_list(self, node_list, hypervisor_list):
586
    """Returns the list of running instances on a given node.
587

588
    This is a multi-node call.
589

590
    @type node_list: list
591
    @param node_list: the list of nodes to query
592
    @type hypervisor_list: list
593
    @param hypervisor_list: the hypervisors to query for instances
594

595
    """
596
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
597

    
598
  def call_node_tcp_ping(self, node, source, target, port, timeout,
599
                         live_port_needed):
600
    """Do a TcpPing on the remote node
601

602
    This is a single-node call.
603

604
    """
605
    return self._SingleNodeCall(node, "node_tcp_ping",
606
                                [source, target, port, timeout,
607
                                 live_port_needed])
608

    
609
  def call_node_has_ip_address(self, node, address):
610
    """Checks if a node has the given IP address.
611

612
    This is a single-node call.
613

614
    """
615
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
616

    
617
  def call_node_info(self, node_list, vg_name, hypervisor_type):
618
    """Return node information.
619

620
    This will return memory information and volume group size and free
621
    space.
622

623
    This is a multi-node call.
624

625
    @type node_list: list
626
    @param node_list: the list of nodes to query
627
    @type vg_name: C{string}
628
    @param vg_name: the name of the volume group to ask for disk space
629
        information
630
    @type hypervisor_type: C{str}
631
    @param hypervisor_type: the name of the hypervisor to ask for
632
        memory information
633

634
    """
635
    retux = self._MultiNodeCall(node_list, "node_info",
636
                                [vg_name, hypervisor_type])
637

    
638
    for result in retux.itervalues():
639
      if result.failed or not isinstance(result.data, dict):
640
        result.data = {}
641
      if result.offline:
642
        log_name = None
643
      else:
644
        log_name = "call_node_info"
645

    
646
      utils.CheckDict(result.data, {
647
        'memory_total' : '-',
648
        'memory_dom0' : '-',
649
        'memory_free' : '-',
650
        'vg_size' : 'node_unreachable',
651
        'vg_free' : '-',
652
        }, log_name)
653
    return retux
654

    
655
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
656
    """Add a node to the cluster.
657

658
    This is a single-node call.
659

660
    """
661
    return self._SingleNodeCall(node, "node_add",
662
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
663

    
664
  def call_node_verify(self, node_list, checkdict, cluster_name):
665
    """Request verification of given parameters.
666

667
    This is a multi-node call.
668

669
    """
670
    return self._MultiNodeCall(node_list, "node_verify",
671
                               [checkdict, cluster_name])
672

    
673
  @classmethod
674
  def call_node_start_master(cls, node, start_daemons):
675
    """Tells a node to activate itself as a master.
676

677
    This is a single-node call.
678

679
    """
680
    return cls._StaticSingleNodeCall(node, "node_start_master",
681
                                     [start_daemons])
682

    
683
  @classmethod
684
  def call_node_stop_master(cls, node, stop_daemons):
685
    """Tells a node to demote itself from master status.
686

687
    This is a single-node call.
688

689
    """
690
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
691

    
692
  @classmethod
693
  def call_master_info(cls, node_list):
694
    """Query master info.
695

696
    This is a multi-node call.
697

698
    """
699
    # TODO: should this method query down nodes?
700
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
701

    
702
  def call_version(self, node_list):
703
    """Query node version.
704

705
    This is a multi-node call.
706

707
    """
708
    return self._MultiNodeCall(node_list, "version", [])
709

    
710
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
711
    """Request creation of a given block device.
712

713
    This is a single-node call.
714

715
    """
716
    return self._SingleNodeCall(node, "blockdev_create",
717
                                [bdev.ToDict(), size, owner, on_primary, info])
718

    
719
  def call_blockdev_remove(self, node, bdev):
720
    """Request removal of a given block device.
721

722
    This is a single-node call.
723

724
    """
725
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
726

    
727
  def call_blockdev_rename(self, node, devlist):
728
    """Request rename of the given block devices.
729

730
    This is a single-node call.
731

732
    """
733
    return self._SingleNodeCall(node, "blockdev_rename",
734
                                [(d.ToDict(), uid) for d, uid in devlist])
735

    
736
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
737
    """Request assembling of a given block device.
738

739
    This is a single-node call.
740

741
    """
742
    return self._SingleNodeCall(node, "blockdev_assemble",
743
                                [disk.ToDict(), owner, on_primary])
744

    
745
  def call_blockdev_shutdown(self, node, disk):
746
    """Request shutdown of a given block device.
747

748
    This is a single-node call.
749

750
    """
751
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
752

    
753
  def call_blockdev_addchildren(self, node, bdev, ndevs):
754
    """Request adding a list of children to a (mirroring) device.
755

756
    This is a single-node call.
757

758
    """
759
    return self._SingleNodeCall(node, "blockdev_addchildren",
760
                                [bdev.ToDict(),
761
                                 [disk.ToDict() for disk in ndevs]])
762

    
763
  def call_blockdev_removechildren(self, node, bdev, ndevs):
764
    """Request removing a list of children from a (mirroring) device.
765

766
    This is a single-node call.
767

768
    """
769
    return self._SingleNodeCall(node, "blockdev_removechildren",
770
                                [bdev.ToDict(),
771
                                 [disk.ToDict() for disk in ndevs]])
772

    
773
  def call_blockdev_getmirrorstatus(self, node, disks):
774
    """Request status of a (mirroring) device.
775

776
    This is a single-node call.
777

778
    """
779
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
780
                                [dsk.ToDict() for dsk in disks])
781

    
782
  def call_blockdev_find(self, node, disk):
783
    """Request identification of a given block device.
784

785
    This is a single-node call.
786

787
    """
788
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
789

    
790
  def call_blockdev_close(self, node, instance_name, disks):
791
    """Closes the given block devices.
792

793
    This is a single-node call.
794

795
    """
796
    params = [instance_name, [cf.ToDict() for cf in disks]]
797
    return self._SingleNodeCall(node, "blockdev_close", params)
798

    
799
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
800
    """Disconnects the network of the given drbd devices.
801

802
    This is a multi-node call.
803

804
    """
805
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
806
                               [nodes_ip, [cf.ToDict() for cf in disks]])
807

    
808
  def call_drbd_attach_net(self, node_list, nodes_ip,
809
                           disks, instance_name, multimaster):
810
    """Disconnects the given drbd devices.
811

812
    This is a multi-node call.
813

814
    """
815
    return self._MultiNodeCall(node_list, "drbd_attach_net",
816
                               [nodes_ip, [cf.ToDict() for cf in disks],
817
                                instance_name, multimaster])
818

    
819
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
820
    """Waits for the synchronization of drbd devices is complete.
821

822
    This is a multi-node call.
823

824
    """
825
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
826
                               [nodes_ip, [cf.ToDict() for cf in disks]])
827

    
828
  @classmethod
829
  def call_upload_file(cls, node_list, file_name, address_list=None):
830
    """Upload a file.
831

832
    The node will refuse the operation in case the file is not on the
833
    approved file list.
834

835
    This is a multi-node call.
836

837
    @type node_list: list
838
    @param node_list: the list of node names to upload to
839
    @type file_name: str
840
    @param file_name: the filename to upload
841
    @type address_list: list or None
842
    @keyword address_list: an optional list of node addresses, in order
843
        to optimize the RPC speed
844

845
    """
846
    file_contents = utils.ReadFile(file_name)
847
    data = cls._Compress(file_contents)
848
    st = os.stat(file_name)
849
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
850
              st.st_atime, st.st_mtime]
851
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
852
                                    address_list=address_list)
853

    
854
  @classmethod
855
  def call_write_ssconf_files(cls, node_list, values):
856
    """Write ssconf files.
857

858
    This is a multi-node call.
859

860
    """
861
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
862

    
863
  def call_os_diagnose(self, node_list):
864
    """Request a diagnose of OS definitions.
865

866
    This is a multi-node call.
867

868
    """
869
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
870

    
871
    for node_result in result.values():
872
      if not node_result.failed and node_result.data:
873
        node_result.data = [objects.OS.FromDict(oss)
874
                            for oss in node_result.data]
875
    return result
876

    
877
  def call_os_get(self, node, name):
878
    """Returns an OS definition.
879

880
    This is a single-node call.
881

882
    """
883
    result = self._SingleNodeCall(node, "os_get", [name])
884
    if not result.failed and isinstance(result.data, dict):
885
      result.data = objects.OS.FromDict(result.data)
886
    return result
887

    
888
  def call_hooks_runner(self, node_list, hpath, phase, env):
889
    """Call the hooks runner.
890

891
    Args:
892
      - op: the OpCode instance
893
      - env: a dictionary with the environment
894

895
    This is a multi-node call.
896

897
    """
898
    params = [hpath, phase, env]
899
    return self._MultiNodeCall(node_list, "hooks_runner", params)
900

    
901
  def call_iallocator_runner(self, node, name, idata):
902
    """Call an iallocator on a remote node
903

904
    Args:
905
      - name: the iallocator name
906
      - input: the json-encoded input string
907

908
    This is a single-node call.
909

910
    """
911
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
912

    
913
  def call_blockdev_grow(self, node, cf_bdev, amount):
914
    """Request a snapshot of the given block device.
915

916
    This is a single-node call.
917

918
    """
919
    return self._SingleNodeCall(node, "blockdev_grow",
920
                                [cf_bdev.ToDict(), amount])
921

    
922
  def call_blockdev_snapshot(self, node, cf_bdev):
923
    """Request a snapshot of the given block device.
924

925
    This is a single-node call.
926

927
    """
928
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
929

    
930
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
931
                           cluster_name, idx):
932
    """Request the export of a given snapshot.
933

934
    This is a single-node call.
935

936
    """
937
    return self._SingleNodeCall(node, "snapshot_export",
938
                                [snap_bdev.ToDict(), dest_node,
939
                                 self._InstDict(instance), cluster_name, idx])
940

    
941
  def call_finalize_export(self, node, instance, snap_disks):
942
    """Request the completion of an export operation.
943

944
    This writes the export config file, etc.
945

946
    This is a single-node call.
947

948
    """
949
    flat_disks = []
950
    for disk in snap_disks:
951
      flat_disks.append(disk.ToDict())
952

    
953
    return self._SingleNodeCall(node, "finalize_export",
954
                                [self._InstDict(instance), flat_disks])
955

    
956
  def call_export_info(self, node, path):
957
    """Queries the export information in a given path.
958

959
    This is a single-node call.
960

961
    """
962
    result = self._SingleNodeCall(node, "export_info", [path])
963
    if not result.failed and result.data:
964
      result.data = objects.SerializableConfigParser.Loads(str(result.data))
965
    return result
966

    
967
  def call_instance_os_import(self, node, inst, src_node, src_images,
968
                              cluster_name):
969
    """Request the import of a backup into an instance.
970

971
    This is a single-node call.
972

973
    """
974
    return self._SingleNodeCall(node, "instance_os_import",
975
                                [self._InstDict(inst), src_node, src_images,
976
                                 cluster_name])
977

    
978
  def call_export_list(self, node_list):
979
    """Gets the stored exports list.
980

981
    This is a multi-node call.
982

983
    """
984
    return self._MultiNodeCall(node_list, "export_list", [])
985

    
986
  def call_export_remove(self, node, export):
987
    """Requests removal of a given export.
988

989
    This is a single-node call.
990

991
    """
992
    return self._SingleNodeCall(node, "export_remove", [export])
993

    
994
  @classmethod
995
  def call_node_leave_cluster(cls, node):
996
    """Requests a node to clean the cluster information it has.
997

998
    This will remove the configuration information from the ganeti data
999
    dir.
1000

1001
    This is a single-node call.
1002

1003
    """
1004
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1005

    
1006
  def call_node_volumes(self, node_list):
1007
    """Gets all volumes on node(s).
1008

1009
    This is a multi-node call.
1010

1011
    """
1012
    return self._MultiNodeCall(node_list, "node_volumes", [])
1013

    
1014
  def call_node_demote_from_mc(self, node):
1015
    """Demote a node from the master candidate role.
1016

1017
    This is a single-node call.
1018

1019
    """
1020
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1021

    
1022
  def call_test_delay(self, node_list, duration):
1023
    """Sleep for a fixed time on given node(s).
1024

1025
    This is a multi-node call.
1026

1027
    """
1028
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1029

    
1030
  def call_file_storage_dir_create(self, node, file_storage_dir):
1031
    """Create the given file storage directory.
1032

1033
    This is a single-node call.
1034

1035
    """
1036
    return self._SingleNodeCall(node, "file_storage_dir_create",
1037
                                [file_storage_dir])
1038

    
1039
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1040
    """Remove the given file storage directory.
1041

1042
    This is a single-node call.
1043

1044
    """
1045
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1046
                                [file_storage_dir])
1047

    
1048
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1049
                                   new_file_storage_dir):
1050
    """Rename file storage directory.
1051

1052
    This is a single-node call.
1053

1054
    """
1055
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1056
                                [old_file_storage_dir, new_file_storage_dir])
1057

    
1058
  @classmethod
1059
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1060
    """Update job queue.
1061

1062
    This is a multi-node call.
1063

1064
    """
1065
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1066
                                    [file_name, cls._Compress(content)],
1067
                                    address_list=address_list)
1068

    
1069
  @classmethod
1070
  def call_jobqueue_purge(cls, node):
1071
    """Purge job queue.
1072

1073
    This is a single-node call.
1074

1075
    """
1076
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1077

    
1078
  @classmethod
1079
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1080
    """Rename a job queue file.
1081

1082
    This is a multi-node call.
1083

1084
    """
1085
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1086
                                    address_list=address_list)
1087

    
1088
  @classmethod
1089
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1090
    """Set the drain flag on the queue.
1091

1092
    This is a multi-node call.
1093

1094
    @type node_list: list
1095
    @param node_list: the list of nodes to query
1096
    @type drain_flag: bool
1097
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1098

1099
    """
1100
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1101
                                    [drain_flag])
1102

    
1103
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1104
    """Validate the hypervisor params.
1105

1106
    This is a multi-node call.
1107

1108
    @type node_list: list
1109
    @param node_list: the list of nodes to query
1110
    @type hvname: string
1111
    @param hvname: the hypervisor name
1112
    @type hvparams: dict
1113
    @param hvparams: the hypervisor parameters to be validated
1114

1115
    """
1116
    cluster = self._cfg.GetClusterInfo()
1117
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1118
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1119
                               [hvname, hv_full])