Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 1b8acf70

History | View | Annotate | Download (32.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36
import zlib
37
import base64
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45

    
46
import ganeti.http.client
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  _http_manager = http.client.HttpClientManager()
64

    
65

    
66
def Shutdown():
67
  """Stops the module-global HTTP client manager.
68

69
  Must be called before quitting the program.
70

71
  """
72
  global _http_manager
73

    
74
  if _http_manager:
75
    _http_manager.Shutdown()
76
    _http_manager = None
77

    
78

    
79
class RpcResult(object):
80
  """RPC Result class.
81

82
  This class holds an RPC result. It is needed since in multi-node
83
  calls we can't raise an exception just because one one out of many
84
  failed, and therefore we use this class to encapsulate the result.
85

86
  @ivar data: the data payload, for successfull results, or None
87
  @type failed: boolean
88
  @ivar failed: whether the operation failed at RPC level (not
89
      application level on the remote node)
90
  @ivar call: the name of the RPC call
91
  @ivar node: the name of the node to which we made the call
92
  @ivar offline: whether the operation failed because the node was
93
      offline, as opposed to actual failure; offline=True will always
94
      imply failed=True, in order to allow simpler checking if
95
      the user doesn't care about the exact failure mode
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.error = "Node is marked offline"
107
      self.data = None
108
    elif failed:
109
      self.error = data
110
      self.data = None
111
    else:
112
      self.data = data
113
      self.error = None
114

    
115
  def Raise(self):
116
    """If the result has failed, raise an OpExecError.
117

118
    This is used so that LU code doesn't have to check for each
119
    result, but instead can call this function.
120

121
    """
122
    if self.failed:
123
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
124
                               (self.call, self.node, self.error))
125

    
126
  def RemoteFailMsg(self):
127
    """Check if the remote procedure failed.
128

129
    This is valid only for RPC calls which return result of the form
130
    (status, data | error_msg).
131

132
    @return: empty string for succcess, otherwise an error message
133

134
    """
135
    def _EnsureErr(val):
136
      """Helper to ensure we return a 'True' value for error."""
137
      if val:
138
        return val
139
      else:
140
        return "No error information"
141

    
142
    if self.failed:
143
      return _EnsureErr(self.error)
144
    if not isinstance(self.data, (tuple, list)):
145
      return "Invalid result type (%s)" % type(self.data)
146
    if len(self.data) != 2:
147
      return "Invalid result length (%d), expected 2" % len(self.data)
148
    if not self.data[0]:
149
      return _EnsureErr(self.data[1])
150
    return ""
151

    
152

    
153
class Client:
154
  """RPC Client class.
155

156
  This class, given a (remote) method name, a list of parameters and a
157
  list of nodes, will contact (in parallel) all nodes, and return a
158
  dict of results (key: node name, value: result).
159

160
  One current bug is that generic failure is still signalled by
161
  'False' result, which is not good. This overloading of values can
162
  cause bugs.
163

164
  """
165
  def __init__(self, procedure, body, port):
166
    self.procedure = procedure
167
    self.body = body
168
    self.port = port
169
    self.nc = {}
170

    
171
    self._ssl_params = \
172
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
173
                         ssl_cert_path=constants.SSL_CERT_FILE)
174

    
175
  def ConnectList(self, node_list, address_list=None):
176
    """Add a list of nodes to the target nodes.
177

178
    @type node_list: list
179
    @param node_list: the list of node names to connect
180
    @type address_list: list or None
181
    @keyword address_list: either None or a list with node addresses,
182
        which must have the same length as the node list
183

184
    """
185
    if address_list is None:
186
      address_list = [None for _ in node_list]
187
    else:
188
      assert len(node_list) == len(address_list), \
189
             "Name and address lists should have the same length"
190
    for node, address in zip(node_list, address_list):
191
      self.ConnectNode(node, address)
192

    
193
  def ConnectNode(self, name, address=None):
194
    """Add a node to the target list.
195

196
    @type name: str
197
    @param name: the node name
198
    @type address: str
199
    @keyword address: the node address, if known
200

201
    """
202
    if address is None:
203
      address = name
204

    
205
    self.nc[name] = \
206
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
207
                                    "/%s" % self.procedure,
208
                                    post_data=self.body,
209
                                    ssl_params=self._ssl_params,
210
                                    ssl_verify_peer=True)
211

    
212
  def GetResults(self):
213
    """Call nodes and return results.
214

215
    @rtype: list
216
    @returns: List of RPC results
217

218
    """
219
    assert _http_manager, "RPC module not intialized"
220

    
221
    _http_manager.ExecRequests(self.nc.values())
222

    
223
    results = {}
224

    
225
    for name, req in self.nc.iteritems():
226
      if req.success and req.resp_status_code == http.HTTP_OK:
227
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
228
                                  node=name, call=self.procedure)
229
        continue
230

    
231
      # TODO: Better error reporting
232
      if req.error:
233
        msg = req.error
234
      else:
235
        msg = req.resp_body
236

    
237
      logging.error("RPC error in %s from node %s: %s",
238
                    self.procedure, name, msg)
239
      results[name] = RpcResult(data=msg, failed=True, node=name,
240
                                call=self.procedure)
241

    
242
    return results
243

    
244

    
245
class RpcRunner(object):
246
  """RPC runner class"""
247

    
248
  def __init__(self, cfg):
249
    """Initialized the rpc runner.
250

251
    @type cfg:  C{config.ConfigWriter}
252
    @param cfg: the configuration object that will be used to get data
253
                about the cluster
254

255
    """
256
    self._cfg = cfg
257
    self.port = utils.GetNodeDaemonPort()
258

    
259
  def _InstDict(self, instance):
260
    """Convert the given instance to a dict.
261

262
    This is done via the instance's ToDict() method and additionally
263
    we fill the hvparams with the cluster defaults.
264

265
    @type instance: L{objects.Instance}
266
    @param instance: an Instance object
267
    @rtype: dict
268
    @return: the instance dict, with the hvparams filled with the
269
        cluster defaults
270

271
    """
272
    idict = instance.ToDict()
273
    cluster = self._cfg.GetClusterInfo()
274
    idict["hvparams"] = cluster.FillHV(instance)
275
    idict["beparams"] = cluster.FillBE(instance)
276
    return idict
277

    
278
  def _ConnectList(self, client, node_list):
279
    """Helper for computing node addresses.
280

281
    @type client: L{Client}
282
    @param client: a C{Client} instance
283
    @type node_list: list
284
    @param node_list: the node list we should connect
285

286
    """
287
    all_nodes = self._cfg.GetAllNodesInfo()
288
    name_list = []
289
    addr_list = []
290
    skip_dict = {}
291
    for node in node_list:
292
      if node in all_nodes:
293
        if all_nodes[node].offline:
294
          skip_dict[node] = RpcResult(node=node, offline=True)
295
          continue
296
        val = all_nodes[node].primary_ip
297
      else:
298
        val = None
299
      addr_list.append(val)
300
      name_list.append(node)
301
    if name_list:
302
      client.ConnectList(name_list, address_list=addr_list)
303
    return skip_dict
304

    
305
  def _ConnectNode(self, client, node):
306
    """Helper for computing one node's address.
307

308
    @type client: L{Client}
309
    @param client: a C{Client} instance
310
    @type node: str
311
    @param node: the node we should connect
312

313
    """
314
    node_info = self._cfg.GetNodeInfo(node)
315
    if node_info is not None:
316
      if node_info.offline:
317
        return RpcResult(node=node, offline=True)
318
      addr = node_info.primary_ip
319
    else:
320
      addr = None
321
    client.ConnectNode(node, address=addr)
322

    
323
  def _MultiNodeCall(self, node_list, procedure, args):
324
    """Helper for making a multi-node call
325

326
    """
327
    body = serializer.DumpJson(args, indent=False)
328
    c = Client(procedure, body, self.port)
329
    skip_dict = self._ConnectList(c, node_list)
330
    skip_dict.update(c.GetResults())
331
    return skip_dict
332

    
333
  @classmethod
334
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
335
                           address_list=None):
336
    """Helper for making a multi-node static call
337

338
    """
339
    body = serializer.DumpJson(args, indent=False)
340
    c = Client(procedure, body, utils.GetNodeDaemonPort())
341
    c.ConnectList(node_list, address_list=address_list)
342
    return c.GetResults()
343

    
344
  def _SingleNodeCall(self, node, procedure, args):
345
    """Helper for making a single-node call
346

347
    """
348
    body = serializer.DumpJson(args, indent=False)
349
    c = Client(procedure, body, self.port)
350
    result = self._ConnectNode(c, node)
351
    if result is None:
352
      # we did connect, node is not offline
353
      result = c.GetResults()[node]
354
    return result
355

    
356
  @classmethod
357
  def _StaticSingleNodeCall(cls, node, procedure, args):
358
    """Helper for making a single-node static call
359

360
    """
361
    body = serializer.DumpJson(args, indent=False)
362
    c = Client(procedure, body, utils.GetNodeDaemonPort())
363
    c.ConnectNode(node)
364
    return c.GetResults()[node]
365

    
366
  @staticmethod
367
  def _Compress(data):
368
    """Compresses a string for transport over RPC.
369

370
    Small amounts of data are not compressed.
371

372
    @type data: str
373
    @param data: Data
374
    @rtype: tuple
375
    @return: Encoded data to send
376

377
    """
378
    # Small amounts of data are not compressed
379
    if len(data) < 512:
380
      return (constants.RPC_ENCODING_NONE, data)
381

    
382
    # Compress with zlib and encode in base64
383
    return (constants.RPC_ENCODING_ZLIB_BASE64,
384
            base64.b64encode(zlib.compress(data, 3)))
385

    
386
  #
387
  # Begin RPC calls
388
  #
389

    
390
  def call_volume_list(self, node_list, vg_name):
391
    """Gets the logical volumes present in a given volume group.
392

393
    This is a multi-node call.
394

395
    """
396
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
397

    
398
  def call_vg_list(self, node_list):
399
    """Gets the volume group list.
400

401
    This is a multi-node call.
402

403
    """
404
    return self._MultiNodeCall(node_list, "vg_list", [])
405

    
406
  def call_bridges_exist(self, node, bridges_list):
407
    """Checks if a node has all the bridges given.
408

409
    This method checks if all bridges given in the bridges_list are
410
    present on the remote node, so that an instance that uses interfaces
411
    on those bridges can be started.
412

413
    This is a single-node call.
414

415
    """
416
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
417

    
418
  def call_instance_start(self, node, instance, extra_args):
419
    """Starts an instance.
420

421
    This is a single-node call.
422

423
    """
424
    return self._SingleNodeCall(node, "instance_start",
425
                                [self._InstDict(instance), extra_args])
426

    
427
  def call_instance_shutdown(self, node, instance):
428
    """Stops an instance.
429

430
    This is a single-node call.
431

432
    """
433
    return self._SingleNodeCall(node, "instance_shutdown",
434
                                [self._InstDict(instance)])
435

    
436
  def call_migration_info(self, node, instance):
437
    """Gather the information necessary to prepare an instance migration.
438

439
    This is a single-node call.
440

441
    @type node: string
442
    @param node: the node on which the instance is currently running
443
    @type instance: C{objects.Instance}
444
    @param instance: the instance definition
445

446
    """
447
    return self._SingleNodeCall(node, "migration_info",
448
                                [self._InstDict(instance)])
449

    
450
  def call_accept_instance(self, node, instance, info, target):
451
    """Prepare a node to accept an instance.
452

453
    This is a single-node call.
454

455
    @type node: string
456
    @param node: the target node for the migration
457
    @type instance: C{objects.Instance}
458
    @param instance: the instance definition
459
    @type info: opaque/hypervisor specific (string/data)
460
    @param info: result for the call_migration_info call
461
    @type target: string
462
    @param target: target hostname (usually ip address) (on the node itself)
463

464
    """
465
    return self._SingleNodeCall(node, "accept_instance",
466
                                [self._InstDict(instance), info, target])
467

    
468
  def call_finalize_migration(self, node, instance, info, success):
469
    """Finalize any target-node migration specific operation.
470

471
    This is called both in case of a successful migration and in case of error
472
    (in which case it should abort the migration).
473

474
    This is a single-node call.
475

476
    @type node: string
477
    @param node: the target node for the migration
478
    @type instance: C{objects.Instance}
479
    @param instance: the instance definition
480
    @type info: opaque/hypervisor specific (string/data)
481
    @param info: result for the call_migration_info call
482
    @type success: boolean
483
    @param success: whether the migration was a success or a failure
484

485
    """
486
    return self._SingleNodeCall(node, "finalize_migration",
487
                                [self._InstDict(instance), info, success])
488

    
489
  def call_instance_migrate(self, node, instance, target, live):
490
    """Migrate an instance.
491

492
    This is a single-node call.
493

494
    @type node: string
495
    @param node: the node on which the instance is currently running
496
    @type instance: C{objects.Instance}
497
    @param instance: the instance definition
498
    @type target: string
499
    @param target: the target node name
500
    @type live: boolean
501
    @param live: whether the migration should be done live or not (the
502
        interpretation of this parameter is left to the hypervisor)
503

504
    """
505
    return self._SingleNodeCall(node, "instance_migrate",
506
                                [self._InstDict(instance), target, live])
507

    
508
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
509
    """Reboots an instance.
510

511
    This is a single-node call.
512

513
    """
514
    return self._SingleNodeCall(node, "instance_reboot",
515
                                [self._InstDict(instance), reboot_type,
516
                                 extra_args])
517

    
518
  def call_instance_os_add(self, node, inst):
519
    """Installs an OS on the given instance.
520

521
    This is a single-node call.
522

523
    """
524
    return self._SingleNodeCall(node, "instance_os_add",
525
                                [self._InstDict(inst)])
526

    
527
  def call_instance_run_rename(self, node, inst, old_name):
528
    """Run the OS rename script for an instance.
529

530
    This is a single-node call.
531

532
    """
533
    return self._SingleNodeCall(node, "instance_run_rename",
534
                                [self._InstDict(inst), old_name])
535

    
536
  def call_instance_info(self, node, instance, hname):
537
    """Returns information about a single instance.
538

539
    This is a single-node call.
540

541
    @type node: list
542
    @param node: the list of nodes to query
543
    @type instance: string
544
    @param instance: the instance name
545
    @type hname: string
546
    @param hname: the hypervisor type of the instance
547

548
    """
549
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
550

    
551
  def call_instance_migratable(self, node, instance):
552
    """Checks whether the given instance can be migrated.
553

554
    This is a single-node call.
555

556
    @param node: the node to query
557
    @type instance: L{objects.Instance}
558
    @param instance: the instance to check
559

560

561
    """
562
    return self._SingleNodeCall(node, "instance_migratable",
563
                                [self._InstDict(instance)])
564

    
565
  def call_all_instances_info(self, node_list, hypervisor_list):
566
    """Returns information about all instances on the given nodes.
567

568
    This is a multi-node call.
569

570
    @type node_list: list
571
    @param node_list: the list of nodes to query
572
    @type hypervisor_list: list
573
    @param hypervisor_list: the hypervisors to query for instances
574

575
    """
576
    return self._MultiNodeCall(node_list, "all_instances_info",
577
                               [hypervisor_list])
578

    
579
  def call_instance_list(self, node_list, hypervisor_list):
580
    """Returns the list of running instances on a given node.
581

582
    This is a multi-node call.
583

584
    @type node_list: list
585
    @param node_list: the list of nodes to query
586
    @type hypervisor_list: list
587
    @param hypervisor_list: the hypervisors to query for instances
588

589
    """
590
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
591

    
592
  def call_node_tcp_ping(self, node, source, target, port, timeout,
593
                         live_port_needed):
594
    """Do a TcpPing on the remote node
595

596
    This is a single-node call.
597

598
    """
599
    return self._SingleNodeCall(node, "node_tcp_ping",
600
                                [source, target, port, timeout,
601
                                 live_port_needed])
602

    
603
  def call_node_has_ip_address(self, node, address):
604
    """Checks if a node has the given IP address.
605

606
    This is a single-node call.
607

608
    """
609
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
610

    
611
  def call_node_info(self, node_list, vg_name, hypervisor_type):
612
    """Return node information.
613

614
    This will return memory information and volume group size and free
615
    space.
616

617
    This is a multi-node call.
618

619
    @type node_list: list
620
    @param node_list: the list of nodes to query
621
    @type vg_name: C{string}
622
    @param vg_name: the name of the volume group to ask for disk space
623
        information
624
    @type hypervisor_type: C{str}
625
    @param hypervisor_type: the name of the hypervisor to ask for
626
        memory information
627

628
    """
629
    retux = self._MultiNodeCall(node_list, "node_info",
630
                                [vg_name, hypervisor_type])
631

    
632
    for result in retux.itervalues():
633
      if result.failed or not isinstance(result.data, dict):
634
        result.data = {}
635
      if result.offline:
636
        log_name = None
637
      else:
638
        log_name = "call_node_info"
639

    
640
      utils.CheckDict(result.data, {
641
        'memory_total' : '-',
642
        'memory_dom0' : '-',
643
        'memory_free' : '-',
644
        'vg_size' : 'node_unreachable',
645
        'vg_free' : '-',
646
        }, log_name)
647
    return retux
648

    
649
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
650
    """Add a node to the cluster.
651

652
    This is a single-node call.
653

654
    """
655
    return self._SingleNodeCall(node, "node_add",
656
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
657

    
658
  def call_node_verify(self, node_list, checkdict, cluster_name):
659
    """Request verification of given parameters.
660

661
    This is a multi-node call.
662

663
    """
664
    return self._MultiNodeCall(node_list, "node_verify",
665
                               [checkdict, cluster_name])
666

    
667
  @classmethod
668
  def call_node_start_master(cls, node, start_daemons):
669
    """Tells a node to activate itself as a master.
670

671
    This is a single-node call.
672

673
    """
674
    return cls._StaticSingleNodeCall(node, "node_start_master",
675
                                     [start_daemons])
676

    
677
  @classmethod
678
  def call_node_stop_master(cls, node, stop_daemons):
679
    """Tells a node to demote itself from master status.
680

681
    This is a single-node call.
682

683
    """
684
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
685

    
686
  @classmethod
687
  def call_master_info(cls, node_list):
688
    """Query master info.
689

690
    This is a multi-node call.
691

692
    """
693
    # TODO: should this method query down nodes?
694
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
695

    
696
  def call_version(self, node_list):
697
    """Query node version.
698

699
    This is a multi-node call.
700

701
    """
702
    return self._MultiNodeCall(node_list, "version", [])
703

    
704
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
705
    """Request creation of a given block device.
706

707
    This is a single-node call.
708

709
    """
710
    return self._SingleNodeCall(node, "blockdev_create",
711
                                [bdev.ToDict(), size, owner, on_primary, info])
712

    
713
  def call_blockdev_remove(self, node, bdev):
714
    """Request removal of a given block device.
715

716
    This is a single-node call.
717

718
    """
719
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
720

    
721
  def call_blockdev_rename(self, node, devlist):
722
    """Request rename of the given block devices.
723

724
    This is a single-node call.
725

726
    """
727
    return self._SingleNodeCall(node, "blockdev_rename",
728
                                [(d.ToDict(), uid) for d, uid in devlist])
729

    
730
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
731
    """Request assembling of a given block device.
732

733
    This is a single-node call.
734

735
    """
736
    return self._SingleNodeCall(node, "blockdev_assemble",
737
                                [disk.ToDict(), owner, on_primary])
738

    
739
  def call_blockdev_shutdown(self, node, disk):
740
    """Request shutdown of a given block device.
741

742
    This is a single-node call.
743

744
    """
745
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
746

    
747
  def call_blockdev_addchildren(self, node, bdev, ndevs):
748
    """Request adding a list of children to a (mirroring) device.
749

750
    This is a single-node call.
751

752
    """
753
    return self._SingleNodeCall(node, "blockdev_addchildren",
754
                                [bdev.ToDict(),
755
                                 [disk.ToDict() for disk in ndevs]])
756

    
757
  def call_blockdev_removechildren(self, node, bdev, ndevs):
758
    """Request removing a list of children from a (mirroring) device.
759

760
    This is a single-node call.
761

762
    """
763
    return self._SingleNodeCall(node, "blockdev_removechildren",
764
                                [bdev.ToDict(),
765
                                 [disk.ToDict() for disk in ndevs]])
766

    
767
  def call_blockdev_getmirrorstatus(self, node, disks):
768
    """Request status of a (mirroring) device.
769

770
    This is a single-node call.
771

772
    """
773
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
774
                                [dsk.ToDict() for dsk in disks])
775

    
776
  def call_blockdev_find(self, node, disk):
777
    """Request identification of a given block device.
778

779
    This is a single-node call.
780

781
    """
782
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
783

    
784
  def call_blockdev_close(self, node, instance_name, disks):
785
    """Closes the given block devices.
786

787
    This is a single-node call.
788

789
    """
790
    params = [instance_name, [cf.ToDict() for cf in disks]]
791
    return self._SingleNodeCall(node, "blockdev_close", params)
792

    
793
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
794
    """Disconnects the network of the given drbd devices.
795

796
    This is a multi-node call.
797

798
    """
799
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
800
                               [nodes_ip, [cf.ToDict() for cf in disks]])
801

    
802
  def call_drbd_attach_net(self, node_list, nodes_ip,
803
                           disks, instance_name, multimaster):
804
    """Disconnects the given drbd devices.
805

806
    This is a multi-node call.
807

808
    """
809
    return self._MultiNodeCall(node_list, "drbd_attach_net",
810
                               [nodes_ip, [cf.ToDict() for cf in disks],
811
                                instance_name, multimaster])
812

    
813
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
814
    """Waits for the synchronization of drbd devices is complete.
815

816
    This is a multi-node call.
817

818
    """
819
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
820
                               [nodes_ip, [cf.ToDict() for cf in disks]])
821

    
822
  @classmethod
823
  def call_upload_file(cls, node_list, file_name, address_list=None):
824
    """Upload a file.
825

826
    The node will refuse the operation in case the file is not on the
827
    approved file list.
828

829
    This is a multi-node call.
830

831
    @type node_list: list
832
    @param node_list: the list of node names to upload to
833
    @type file_name: str
834
    @param file_name: the filename to upload
835
    @type address_list: list or None
836
    @keyword address_list: an optional list of node addresses, in order
837
        to optimize the RPC speed
838

839
    """
840
    file_contents = utils.ReadFile(file_name)
841
    data = cls._Compress(file_contents)
842
    st = os.stat(file_name)
843
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
844
              st.st_atime, st.st_mtime]
845
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
846
                                    address_list=address_list)
847

    
848
  @classmethod
849
  def call_write_ssconf_files(cls, node_list, values):
850
    """Write ssconf files.
851

852
    This is a multi-node call.
853

854
    """
855
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
856

    
857
  def call_os_diagnose(self, node_list):
858
    """Request a diagnose of OS definitions.
859

860
    This is a multi-node call.
861

862
    """
863
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
864

    
865
    for node_result in result.values():
866
      if not node_result.failed and node_result.data:
867
        node_result.data = [objects.OS.FromDict(oss)
868
                            for oss in node_result.data]
869
    return result
870

    
871
  def call_os_get(self, node, name):
872
    """Returns an OS definition.
873

874
    This is a single-node call.
875

876
    """
877
    result = self._SingleNodeCall(node, "os_get", [name])
878
    if not result.failed and isinstance(result.data, dict):
879
      result.data = objects.OS.FromDict(result.data)
880
    return result
881

    
882
  def call_hooks_runner(self, node_list, hpath, phase, env):
883
    """Call the hooks runner.
884

885
    Args:
886
      - op: the OpCode instance
887
      - env: a dictionary with the environment
888

889
    This is a multi-node call.
890

891
    """
892
    params = [hpath, phase, env]
893
    return self._MultiNodeCall(node_list, "hooks_runner", params)
894

    
895
  def call_iallocator_runner(self, node, name, idata):
896
    """Call an iallocator on a remote node
897

898
    Args:
899
      - name: the iallocator name
900
      - input: the json-encoded input string
901

902
    This is a single-node call.
903

904
    """
905
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
906

    
907
  def call_blockdev_grow(self, node, cf_bdev, amount):
908
    """Request a snapshot of the given block device.
909

910
    This is a single-node call.
911

912
    """
913
    return self._SingleNodeCall(node, "blockdev_grow",
914
                                [cf_bdev.ToDict(), amount])
915

    
916
  def call_blockdev_snapshot(self, node, cf_bdev):
917
    """Request a snapshot of the given block device.
918

919
    This is a single-node call.
920

921
    """
922
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
923

    
924
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
925
                           cluster_name, idx):
926
    """Request the export of a given snapshot.
927

928
    This is a single-node call.
929

930
    """
931
    return self._SingleNodeCall(node, "snapshot_export",
932
                                [snap_bdev.ToDict(), dest_node,
933
                                 self._InstDict(instance), cluster_name, idx])
934

    
935
  def call_finalize_export(self, node, instance, snap_disks):
936
    """Request the completion of an export operation.
937

938
    This writes the export config file, etc.
939

940
    This is a single-node call.
941

942
    """
943
    flat_disks = []
944
    for disk in snap_disks:
945
      flat_disks.append(disk.ToDict())
946

    
947
    return self._SingleNodeCall(node, "finalize_export",
948
                                [self._InstDict(instance), flat_disks])
949

    
950
  def call_export_info(self, node, path):
951
    """Queries the export information in a given path.
952

953
    This is a single-node call.
954

955
    """
956
    result = self._SingleNodeCall(node, "export_info", [path])
957
    if not result.failed and result.data:
958
      result.data = objects.SerializableConfigParser.Loads(str(result.data))
959
    return result
960

    
961
  def call_instance_os_import(self, node, inst, src_node, src_images,
962
                              cluster_name):
963
    """Request the import of a backup into an instance.
964

965
    This is a single-node call.
966

967
    """
968
    return self._SingleNodeCall(node, "instance_os_import",
969
                                [self._InstDict(inst), src_node, src_images,
970
                                 cluster_name])
971

    
972
  def call_export_list(self, node_list):
973
    """Gets the stored exports list.
974

975
    This is a multi-node call.
976

977
    """
978
    return self._MultiNodeCall(node_list, "export_list", [])
979

    
980
  def call_export_remove(self, node, export):
981
    """Requests removal of a given export.
982

983
    This is a single-node call.
984

985
    """
986
    return self._SingleNodeCall(node, "export_remove", [export])
987

    
988
  @classmethod
989
  def call_node_leave_cluster(cls, node):
990
    """Requests a node to clean the cluster information it has.
991

992
    This will remove the configuration information from the ganeti data
993
    dir.
994

995
    This is a single-node call.
996

997
    """
998
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
999

    
1000
  def call_node_volumes(self, node_list):
1001
    """Gets all volumes on node(s).
1002

1003
    This is a multi-node call.
1004

1005
    """
1006
    return self._MultiNodeCall(node_list, "node_volumes", [])
1007

    
1008
  def call_node_demote_from_mc(self, node):
1009
    """Demote a node from the master candidate role.
1010

1011
    This is a single-node call.
1012

1013
    """
1014
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1015

    
1016
  def call_test_delay(self, node_list, duration):
1017
    """Sleep for a fixed time on given node(s).
1018

1019
    This is a multi-node call.
1020

1021
    """
1022
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1023

    
1024
  def call_file_storage_dir_create(self, node, file_storage_dir):
1025
    """Create the given file storage directory.
1026

1027
    This is a single-node call.
1028

1029
    """
1030
    return self._SingleNodeCall(node, "file_storage_dir_create",
1031
                                [file_storage_dir])
1032

    
1033
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1034
    """Remove the given file storage directory.
1035

1036
    This is a single-node call.
1037

1038
    """
1039
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1040
                                [file_storage_dir])
1041

    
1042
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1043
                                   new_file_storage_dir):
1044
    """Rename file storage directory.
1045

1046
    This is a single-node call.
1047

1048
    """
1049
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1050
                                [old_file_storage_dir, new_file_storage_dir])
1051

    
1052
  @classmethod
1053
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1054
    """Update job queue.
1055

1056
    This is a multi-node call.
1057

1058
    """
1059
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1060
                                    [file_name, cls._Compress(content)],
1061
                                    address_list=address_list)
1062

    
1063
  @classmethod
1064
  def call_jobqueue_purge(cls, node):
1065
    """Purge job queue.
1066

1067
    This is a single-node call.
1068

1069
    """
1070
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1071

    
1072
  @classmethod
1073
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1074
    """Rename a job queue file.
1075

1076
    This is a multi-node call.
1077

1078
    """
1079
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1080
                                    address_list=address_list)
1081

    
1082
  @classmethod
1083
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1084
    """Set the drain flag on the queue.
1085

1086
    This is a multi-node call.
1087

1088
    @type node_list: list
1089
    @param node_list: the list of nodes to query
1090
    @type drain_flag: bool
1091
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1092

1093
    """
1094
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1095
                                    [drain_flag])
1096

    
1097
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1098
    """Validate the hypervisor params.
1099

1100
    This is a multi-node call.
1101

1102
    @type node_list: list
1103
    @param node_list: the list of nodes to query
1104
    @type hvname: string
1105
    @param hvname: the hypervisor name
1106
    @type hvparams: dict
1107
    @param hvparams: the hypervisor parameters to be validated
1108

1109
    """
1110
    cluster = self._cfg.GetClusterInfo()
1111
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1112
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1113
                               [hvname, hv_full])