Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 3cebe102

History | View | Annotate | Download (34.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
import ganeti.http.client
46

    
47

    
48
# Module level variable
49
_http_manager = None
50

    
51

    
52
def Init():
53
  """Initializes the module-global HTTP client manager.
54

55
  Must be called before using any RPC function.
56

57
  """
58
  global _http_manager
59

    
60
  assert not _http_manager, "RPC module initialized more than once"
61

    
62
  _http_manager = http.client.HttpClientManager()
63

    
64

    
65
def Shutdown():
66
  """Stops the module-global HTTP client manager.
67

68
  Must be called before quitting the program.
69

70
  """
71
  global _http_manager
72

    
73
  if _http_manager:
74
    _http_manager.Shutdown()
75
    _http_manager = None
76

    
77

    
78
class RpcResult(object):
79
  """RPC Result class.
80

81
  This class holds an RPC result. It is needed since in multi-node
82
  calls we can't raise an exception just because one one out of many
83
  failed, and therefore we use this class to encapsulate the result.
84

85
  @ivar data: the data payload, for successful results, or None
86
  @ivar call: the name of the RPC call
87
  @ivar node: the name of the node to which we made the call
88
  @ivar offline: whether the operation failed because the node was
89
      offline, as opposed to actual failure; offline=True will always
90
      imply failed=True, in order to allow simpler checking if
91
      the user doesn't care about the exact failure mode
92
  @ivar fail_msg: the error message if the call failed
93

94
  """
95
  def __init__(self, data=None, failed=False, offline=False,
96
               call=None, node=None):
97
    self.offline = offline
98
    self.call = call
99
    self.node = node
100
    if offline:
101
      self.fail_msg = "Node is marked offline"
102
      self.data = self.payload = None
103
    elif failed:
104
      self.fail_msg = self._EnsureErr(data)
105
      self.data = self.payload = None
106
    else:
107
      self.data = data
108
      if not isinstance(self.data, (tuple, list)):
109
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
110
                         type(self.data))
111
      elif len(data) != 2:
112
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
113
                         "expected 2" % len(self.data))
114
      elif not self.data[0]:
115
        self.fail_msg = self._EnsureErr(self.data[1])
116
      else:
117
        # finally success
118
        self.fail_msg = None
119
        self.payload = data[1]
120

    
121
  @staticmethod
122
  def _EnsureErr(val):
123
    """Helper to ensure we return a 'True' value for error."""
124
    if val:
125
      return val
126
    else:
127
      return "No error information"
128

    
129
  def Raise(self, msg, prereq=False):
130
    """If the result has failed, raise an OpExecError.
131

132
    This is used so that LU code doesn't have to check for each
133
    result, but instead can call this function.
134

135
    """
136
    if not self.fail_msg:
137
      return
138

    
139
    if not msg: # one could pass None for default message
140
      msg = ("Call '%s' to node '%s' has failed: %s" %
141
             (self.call, self.node, self.fail_msg))
142
    else:
143
      msg = "%s: %s" % (msg, self.fail_msg)
144
    if prereq:
145
      ec = errors.OpPrereqError
146
    else:
147
      ec = errors.OpExecError
148
    raise ec(msg)
149

    
150

    
151
class Client:
152
  """RPC Client class.
153

154
  This class, given a (remote) method name, a list of parameters and a
155
  list of nodes, will contact (in parallel) all nodes, and return a
156
  dict of results (key: node name, value: result).
157

158
  One current bug is that generic failure is still signaled by
159
  'False' result, which is not good. This overloading of values can
160
  cause bugs.
161

162
  """
163
  def __init__(self, procedure, body, port):
164
    self.procedure = procedure
165
    self.body = body
166
    self.port = port
167
    self.nc = {}
168

    
169
    self._ssl_params = \
170
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
171
                         ssl_cert_path=constants.SSL_CERT_FILE)
172

    
173
  def ConnectList(self, node_list, address_list=None):
174
    """Add a list of nodes to the target nodes.
175

176
    @type node_list: list
177
    @param node_list: the list of node names to connect
178
    @type address_list: list or None
179
    @keyword address_list: either None or a list with node addresses,
180
        which must have the same length as the node list
181

182
    """
183
    if address_list is None:
184
      address_list = [None for _ in node_list]
185
    else:
186
      assert len(node_list) == len(address_list), \
187
             "Name and address lists should have the same length"
188
    for node, address in zip(node_list, address_list):
189
      self.ConnectNode(node, address)
190

    
191
  def ConnectNode(self, name, address=None):
192
    """Add a node to the target list.
193

194
    @type name: str
195
    @param name: the node name
196
    @type address: str
197
    @keyword address: the node address, if known
198

199
    """
200
    if address is None:
201
      address = name
202

    
203
    self.nc[name] = \
204
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
205
                                    "/%s" % self.procedure,
206
                                    post_data=self.body,
207
                                    ssl_params=self._ssl_params,
208
                                    ssl_verify_peer=True)
209

    
210
  def GetResults(self):
211
    """Call nodes and return results.
212

213
    @rtype: list
214
    @return: List of RPC results
215

216
    """
217
    assert _http_manager, "RPC module not initialized"
218

    
219
    _http_manager.ExecRequests(self.nc.values())
220

    
221
    results = {}
222

    
223
    for name, req in self.nc.iteritems():
224
      if req.success and req.resp_status_code == http.HTTP_OK:
225
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
226
                                  node=name, call=self.procedure)
227
        continue
228

    
229
      # TODO: Better error reporting
230
      if req.error:
231
        msg = req.error
232
      else:
233
        msg = req.resp_body
234

    
235
      logging.error("RPC error in %s from node %s: %s",
236
                    self.procedure, name, msg)
237
      results[name] = RpcResult(data=msg, failed=True, node=name,
238
                                call=self.procedure)
239

    
240
    return results
241

    
242

    
243
class RpcRunner(object):
244
  """RPC runner class"""
245

    
246
  def __init__(self, cfg):
247
    """Initialized the rpc runner.
248

249
    @type cfg:  C{config.ConfigWriter}
250
    @param cfg: the configuration object that will be used to get data
251
                about the cluster
252

253
    """
254
    self._cfg = cfg
255
    self.port = utils.GetDaemonPort(constants.NODED)
256

    
257
  def _InstDict(self, instance, hvp=None, bep=None):
258
    """Convert the given instance to a dict.
259

260
    This is done via the instance's ToDict() method and additionally
261
    we fill the hvparams with the cluster defaults.
262

263
    @type instance: L{objects.Instance}
264
    @param instance: an Instance object
265
    @type hvp: dict or None
266
    @param hvp: a dictionary with overridden hypervisor parameters
267
    @type bep: dict or None
268
    @param bep: a dictionary with overridden backend parameters
269
    @rtype: dict
270
    @return: the instance dict, with the hvparams filled with the
271
        cluster defaults
272

273
    """
274
    idict = instance.ToDict()
275
    cluster = self._cfg.GetClusterInfo()
276
    idict["hvparams"] = cluster.FillHV(instance)
277
    if hvp is not None:
278
      idict["hvparams"].update(hvp)
279
    idict["beparams"] = cluster.FillBE(instance)
280
    if bep is not None:
281
      idict["beparams"].update(bep)
282
    for nic in idict["nics"]:
283
      nic['nicparams'] = objects.FillDict(
284
        cluster.nicparams[constants.PP_DEFAULT],
285
        nic['nicparams'])
286
    return idict
287

    
288
  def _ConnectList(self, client, node_list, call):
289
    """Helper for computing node addresses.
290

291
    @type client: L{ganeti.rpc.Client}
292
    @param client: a C{Client} instance
293
    @type node_list: list
294
    @param node_list: the node list we should connect
295
    @type call: string
296
    @param call: the name of the remote procedure call, for filling in
297
        correctly any eventual offline nodes' results
298

299
    """
300
    all_nodes = self._cfg.GetAllNodesInfo()
301
    name_list = []
302
    addr_list = []
303
    skip_dict = {}
304
    for node in node_list:
305
      if node in all_nodes:
306
        if all_nodes[node].offline:
307
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
308
          continue
309
        val = all_nodes[node].primary_ip
310
      else:
311
        val = None
312
      addr_list.append(val)
313
      name_list.append(node)
314
    if name_list:
315
      client.ConnectList(name_list, address_list=addr_list)
316
    return skip_dict
317

    
318
  def _ConnectNode(self, client, node, call):
319
    """Helper for computing one node's address.
320

321
    @type client: L{ganeti.rpc.Client}
322
    @param client: a C{Client} instance
323
    @type node: str
324
    @param node: the node we should connect
325
    @type call: string
326
    @param call: the name of the remote procedure call, for filling in
327
        correctly any eventual offline nodes' results
328

329
    """
330
    node_info = self._cfg.GetNodeInfo(node)
331
    if node_info is not None:
332
      if node_info.offline:
333
        return RpcResult(node=node, offline=True, call=call)
334
      addr = node_info.primary_ip
335
    else:
336
      addr = None
337
    client.ConnectNode(node, address=addr)
338

    
339
  def _MultiNodeCall(self, node_list, procedure, args):
340
    """Helper for making a multi-node call
341

342
    """
343
    body = serializer.DumpJson(args, indent=False)
344
    c = Client(procedure, body, self.port)
345
    skip_dict = self._ConnectList(c, node_list, procedure)
346
    skip_dict.update(c.GetResults())
347
    return skip_dict
348

    
349
  @classmethod
350
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
351
                           address_list=None):
352
    """Helper for making a multi-node static call
353

354
    """
355
    body = serializer.DumpJson(args, indent=False)
356
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
357
    c.ConnectList(node_list, address_list=address_list)
358
    return c.GetResults()
359

    
360
  def _SingleNodeCall(self, node, procedure, args):
361
    """Helper for making a single-node call
362

363
    """
364
    body = serializer.DumpJson(args, indent=False)
365
    c = Client(procedure, body, self.port)
366
    result = self._ConnectNode(c, node, procedure)
367
    if result is None:
368
      # we did connect, node is not offline
369
      result = c.GetResults()[node]
370
    return result
371

    
372
  @classmethod
373
  def _StaticSingleNodeCall(cls, node, procedure, args):
374
    """Helper for making a single-node static call
375

376
    """
377
    body = serializer.DumpJson(args, indent=False)
378
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
379
    c.ConnectNode(node)
380
    return c.GetResults()[node]
381

    
382
  @staticmethod
383
  def _Compress(data):
384
    """Compresses a string for transport over RPC.
385

386
    Small amounts of data are not compressed.
387

388
    @type data: str
389
    @param data: Data
390
    @rtype: tuple
391
    @return: Encoded data to send
392

393
    """
394
    # Small amounts of data are not compressed
395
    if len(data) < 512:
396
      return (constants.RPC_ENCODING_NONE, data)
397

    
398
    # Compress with zlib and encode in base64
399
    return (constants.RPC_ENCODING_ZLIB_BASE64,
400
            base64.b64encode(zlib.compress(data, 3)))
401

    
402
  #
403
  # Begin RPC calls
404
  #
405

    
406
  def call_lv_list(self, node_list, vg_name):
407
    """Gets the logical volumes present in a given volume group.
408

409
    This is a multi-node call.
410

411
    """
412
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
413

    
414
  def call_vg_list(self, node_list):
415
    """Gets the volume group list.
416

417
    This is a multi-node call.
418

419
    """
420
    return self._MultiNodeCall(node_list, "vg_list", [])
421

    
422
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
423
    """Get list of storage units.
424

425
    This is a multi-node call.
426

427
    """
428
    return self._MultiNodeCall(node_list, "storage_list",
429
                               [su_name, su_args, name, fields])
430

    
431
  def call_storage_modify(self, node, su_name, su_args, name, changes):
432
    """Modify a storage unit.
433

434
    This is a single-node call.
435

436
    """
437
    return self._SingleNodeCall(node, "storage_modify",
438
                                [su_name, su_args, name, changes])
439

    
440
  def call_storage_execute(self, node, su_name, su_args, name, op):
441
    """Executes an operation on a storage unit.
442

443
    This is a single-node call.
444

445
    """
446
    return self._SingleNodeCall(node, "storage_execute",
447
                                [su_name, su_args, name, op])
448

    
449
  def call_bridges_exist(self, node, bridges_list):
450
    """Checks if a node has all the bridges given.
451

452
    This method checks if all bridges given in the bridges_list are
453
    present on the remote node, so that an instance that uses interfaces
454
    on those bridges can be started.
455

456
    This is a single-node call.
457

458
    """
459
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
460

    
461
  def call_instance_start(self, node, instance, hvp, bep):
462
    """Starts an instance.
463

464
    This is a single-node call.
465

466
    """
467
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
468
    return self._SingleNodeCall(node, "instance_start", [idict])
469

    
470
  def call_instance_shutdown(self, node, instance):
471
    """Stops an instance.
472

473
    This is a single-node call.
474

475
    """
476
    return self._SingleNodeCall(node, "instance_shutdown",
477
                                [self._InstDict(instance)])
478

    
479
  def call_migration_info(self, node, instance):
480
    """Gather the information necessary to prepare an instance migration.
481

482
    This is a single-node call.
483

484
    @type node: string
485
    @param node: the node on which the instance is currently running
486
    @type instance: C{objects.Instance}
487
    @param instance: the instance definition
488

489
    """
490
    return self._SingleNodeCall(node, "migration_info",
491
                                [self._InstDict(instance)])
492

    
493
  def call_accept_instance(self, node, instance, info, target):
494
    """Prepare a node to accept an instance.
495

496
    This is a single-node call.
497

498
    @type node: string
499
    @param node: the target node for the migration
500
    @type instance: C{objects.Instance}
501
    @param instance: the instance definition
502
    @type info: opaque/hypervisor specific (string/data)
503
    @param info: result for the call_migration_info call
504
    @type target: string
505
    @param target: target hostname (usually ip address) (on the node itself)
506

507
    """
508
    return self._SingleNodeCall(node, "accept_instance",
509
                                [self._InstDict(instance), info, target])
510

    
511
  def call_finalize_migration(self, node, instance, info, success):
512
    """Finalize any target-node migration specific operation.
513

514
    This is called both in case of a successful migration and in case of error
515
    (in which case it should abort the migration).
516

517
    This is a single-node call.
518

519
    @type node: string
520
    @param node: the target node for the migration
521
    @type instance: C{objects.Instance}
522
    @param instance: the instance definition
523
    @type info: opaque/hypervisor specific (string/data)
524
    @param info: result for the call_migration_info call
525
    @type success: boolean
526
    @param success: whether the migration was a success or a failure
527

528
    """
529
    return self._SingleNodeCall(node, "finalize_migration",
530
                                [self._InstDict(instance), info, success])
531

    
532
  def call_instance_migrate(self, node, instance, target, live):
533
    """Migrate an instance.
534

535
    This is a single-node call.
536

537
    @type node: string
538
    @param node: the node on which the instance is currently running
539
    @type instance: C{objects.Instance}
540
    @param instance: the instance definition
541
    @type target: string
542
    @param target: the target node name
543
    @type live: boolean
544
    @param live: whether the migration should be done live or not (the
545
        interpretation of this parameter is left to the hypervisor)
546

547
    """
548
    return self._SingleNodeCall(node, "instance_migrate",
549
                                [self._InstDict(instance), target, live])
550

    
551
  def call_instance_reboot(self, node, instance, reboot_type):
552
    """Reboots an instance.
553

554
    This is a single-node call.
555

556
    """
557
    return self._SingleNodeCall(node, "instance_reboot",
558
                                [self._InstDict(instance), reboot_type])
559

    
560
  def call_instance_os_add(self, node, inst, reinstall):
561
    """Installs an OS on the given instance.
562

563
    This is a single-node call.
564

565
    """
566
    return self._SingleNodeCall(node, "instance_os_add",
567
                                [self._InstDict(inst), reinstall])
568

    
569
  def call_instance_run_rename(self, node, inst, old_name):
570
    """Run the OS rename script for an instance.
571

572
    This is a single-node call.
573

574
    """
575
    return self._SingleNodeCall(node, "instance_run_rename",
576
                                [self._InstDict(inst), old_name])
577

    
578
  def call_instance_info(self, node, instance, hname):
579
    """Returns information about a single instance.
580

581
    This is a single-node call.
582

583
    @type node: list
584
    @param node: the list of nodes to query
585
    @type instance: string
586
    @param instance: the instance name
587
    @type hname: string
588
    @param hname: the hypervisor type of the instance
589

590
    """
591
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
592

    
593
  def call_instance_migratable(self, node, instance):
594
    """Checks whether the given instance can be migrated.
595

596
    This is a single-node call.
597

598
    @param node: the node to query
599
    @type instance: L{objects.Instance}
600
    @param instance: the instance to check
601

602

603
    """
604
    return self._SingleNodeCall(node, "instance_migratable",
605
                                [self._InstDict(instance)])
606

    
607
  def call_all_instances_info(self, node_list, hypervisor_list):
608
    """Returns information about all instances on the given nodes.
609

610
    This is a multi-node call.
611

612
    @type node_list: list
613
    @param node_list: the list of nodes to query
614
    @type hypervisor_list: list
615
    @param hypervisor_list: the hypervisors to query for instances
616

617
    """
618
    return self._MultiNodeCall(node_list, "all_instances_info",
619
                               [hypervisor_list])
620

    
621
  def call_instance_list(self, node_list, hypervisor_list):
622
    """Returns the list of running instances on a given node.
623

624
    This is a multi-node call.
625

626
    @type node_list: list
627
    @param node_list: the list of nodes to query
628
    @type hypervisor_list: list
629
    @param hypervisor_list: the hypervisors to query for instances
630

631
    """
632
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
633

    
634
  def call_node_tcp_ping(self, node, source, target, port, timeout,
635
                         live_port_needed):
636
    """Do a TcpPing on the remote node
637

638
    This is a single-node call.
639

640
    """
641
    return self._SingleNodeCall(node, "node_tcp_ping",
642
                                [source, target, port, timeout,
643
                                 live_port_needed])
644

    
645
  def call_node_has_ip_address(self, node, address):
646
    """Checks if a node has the given IP address.
647

648
    This is a single-node call.
649

650
    """
651
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
652

    
653
  def call_node_info(self, node_list, vg_name, hypervisor_type):
654
    """Return node information.
655

656
    This will return memory information and volume group size and free
657
    space.
658

659
    This is a multi-node call.
660

661
    @type node_list: list
662
    @param node_list: the list of nodes to query
663
    @type vg_name: C{string}
664
    @param vg_name: the name of the volume group to ask for disk space
665
        information
666
    @type hypervisor_type: C{str}
667
    @param hypervisor_type: the name of the hypervisor to ask for
668
        memory information
669

670
    """
671
    return self._MultiNodeCall(node_list, "node_info",
672
                               [vg_name, hypervisor_type])
673

    
674
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
675
    """Add a node to the cluster.
676

677
    This is a single-node call.
678

679
    """
680
    return self._SingleNodeCall(node, "node_add",
681
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
682

    
683
  def call_node_verify(self, node_list, checkdict, cluster_name):
684
    """Request verification of given parameters.
685

686
    This is a multi-node call.
687

688
    """
689
    return self._MultiNodeCall(node_list, "node_verify",
690
                               [checkdict, cluster_name])
691

    
692
  @classmethod
693
  def call_node_start_master(cls, node, start_daemons, no_voting):
694
    """Tells a node to activate itself as a master.
695

696
    This is a single-node call.
697

698
    """
699
    return cls._StaticSingleNodeCall(node, "node_start_master",
700
                                     [start_daemons, no_voting])
701

    
702
  @classmethod
703
  def call_node_stop_master(cls, node, stop_daemons):
704
    """Tells a node to demote itself from master status.
705

706
    This is a single-node call.
707

708
    """
709
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
710

    
711
  @classmethod
712
  def call_master_info(cls, node_list):
713
    """Query master info.
714

715
    This is a multi-node call.
716

717
    """
718
    # TODO: should this method query down nodes?
719
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
720

    
721
  def call_version(self, node_list):
722
    """Query node version.
723

724
    This is a multi-node call.
725

726
    """
727
    return self._MultiNodeCall(node_list, "version", [])
728

    
729
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
730
    """Request creation of a given block device.
731

732
    This is a single-node call.
733

734
    """
735
    return self._SingleNodeCall(node, "blockdev_create",
736
                                [bdev.ToDict(), size, owner, on_primary, info])
737

    
738
  def call_blockdev_remove(self, node, bdev):
739
    """Request removal of a given block device.
740

741
    This is a single-node call.
742

743
    """
744
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
745

    
746
  def call_blockdev_rename(self, node, devlist):
747
    """Request rename of the given block devices.
748

749
    This is a single-node call.
750

751
    """
752
    return self._SingleNodeCall(node, "blockdev_rename",
753
                                [(d.ToDict(), uid) for d, uid in devlist])
754

    
755
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
756
    """Request assembling of a given block device.
757

758
    This is a single-node call.
759

760
    """
761
    return self._SingleNodeCall(node, "blockdev_assemble",
762
                                [disk.ToDict(), owner, on_primary])
763

    
764
  def call_blockdev_shutdown(self, node, disk):
765
    """Request shutdown of a given block device.
766

767
    This is a single-node call.
768

769
    """
770
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
771

    
772
  def call_blockdev_addchildren(self, node, bdev, ndevs):
773
    """Request adding a list of children to a (mirroring) device.
774

775
    This is a single-node call.
776

777
    """
778
    return self._SingleNodeCall(node, "blockdev_addchildren",
779
                                [bdev.ToDict(),
780
                                 [disk.ToDict() for disk in ndevs]])
781

    
782
  def call_blockdev_removechildren(self, node, bdev, ndevs):
783
    """Request removing a list of children from a (mirroring) device.
784

785
    This is a single-node call.
786

787
    """
788
    return self._SingleNodeCall(node, "blockdev_removechildren",
789
                                [bdev.ToDict(),
790
                                 [disk.ToDict() for disk in ndevs]])
791

    
792
  def call_blockdev_getmirrorstatus(self, node, disks):
793
    """Request status of a (mirroring) device.
794

795
    This is a single-node call.
796

797
    """
798
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
799
                                  [dsk.ToDict() for dsk in disks])
800
    if not result.fail_msg:
801
      result.payload = [objects.BlockDevStatus.FromDict(i)
802
                        for i in result.payload]
803
    return result
804

    
805
  def call_blockdev_find(self, node, disk):
806
    """Request identification of a given block device.
807

808
    This is a single-node call.
809

810
    """
811
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
812
    if not result.fail_msg and result.payload is not None:
813
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
814
    return result
815

    
816
  def call_blockdev_close(self, node, instance_name, disks):
817
    """Closes the given block devices.
818

819
    This is a single-node call.
820

821
    """
822
    params = [instance_name, [cf.ToDict() for cf in disks]]
823
    return self._SingleNodeCall(node, "blockdev_close", params)
824

    
825
  def call_blockdev_getsizes(self, node, disks):
826
    """Returns the size of the given disks.
827

828
    This is a single-node call.
829

830
    """
831
    params = [[cf.ToDict() for cf in disks]]
832
    return self._SingleNodeCall(node, "blockdev_getsize", params)
833

    
834
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
835
    """Disconnects the network of the given drbd devices.
836

837
    This is a multi-node call.
838

839
    """
840
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
841
                               [nodes_ip, [cf.ToDict() for cf in disks]])
842

    
843
  def call_drbd_attach_net(self, node_list, nodes_ip,
844
                           disks, instance_name, multimaster):
845
    """Disconnects the given drbd devices.
846

847
    This is a multi-node call.
848

849
    """
850
    return self._MultiNodeCall(node_list, "drbd_attach_net",
851
                               [nodes_ip, [cf.ToDict() for cf in disks],
852
                                instance_name, multimaster])
853

    
854
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
855
    """Waits for the synchronization of drbd devices is complete.
856

857
    This is a multi-node call.
858

859
    """
860
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
861
                               [nodes_ip, [cf.ToDict() for cf in disks]])
862

    
863
  @classmethod
864
  def call_upload_file(cls, node_list, file_name, address_list=None):
865
    """Upload a file.
866

867
    The node will refuse the operation in case the file is not on the
868
    approved file list.
869

870
    This is a multi-node call.
871

872
    @type node_list: list
873
    @param node_list: the list of node names to upload to
874
    @type file_name: str
875
    @param file_name: the filename to upload
876
    @type address_list: list or None
877
    @keyword address_list: an optional list of node addresses, in order
878
        to optimize the RPC speed
879

880
    """
881
    file_contents = utils.ReadFile(file_name)
882
    data = cls._Compress(file_contents)
883
    st = os.stat(file_name)
884
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
885
              st.st_atime, st.st_mtime]
886
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
887
                                    address_list=address_list)
888

    
889
  @classmethod
890
  def call_write_ssconf_files(cls, node_list, values):
891
    """Write ssconf files.
892

893
    This is a multi-node call.
894

895
    """
896
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
897

    
898
  def call_os_diagnose(self, node_list):
899
    """Request a diagnose of OS definitions.
900

901
    This is a multi-node call.
902

903
    """
904
    return self._MultiNodeCall(node_list, "os_diagnose", [])
905

    
906
  def call_os_get(self, node, name):
907
    """Returns an OS definition.
908

909
    This is a single-node call.
910

911
    """
912
    result = self._SingleNodeCall(node, "os_get", [name])
913
    if not result.fail_msg and isinstance(result.data, dict):
914
      result.data = objects.OS.FromDict(result.data)
915
    return result
916

    
917
  def call_hooks_runner(self, node_list, hpath, phase, env):
918
    """Call the hooks runner.
919

920
    Args:
921
      - op: the OpCode instance
922
      - env: a dictionary with the environment
923

924
    This is a multi-node call.
925

926
    """
927
    params = [hpath, phase, env]
928
    return self._MultiNodeCall(node_list, "hooks_runner", params)
929

    
930
  def call_iallocator_runner(self, node, name, idata):
931
    """Call an iallocator on a remote node
932

933
    Args:
934
      - name: the iallocator name
935
      - input: the json-encoded input string
936

937
    This is a single-node call.
938

939
    """
940
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
941

    
942
  def call_blockdev_grow(self, node, cf_bdev, amount):
943
    """Request a snapshot of the given block device.
944

945
    This is a single-node call.
946

947
    """
948
    return self._SingleNodeCall(node, "blockdev_grow",
949
                                [cf_bdev.ToDict(), amount])
950

    
951
  def call_blockdev_export(self, node, cf_bdev,
952
                           dest_node, dest_path, cluster_name):
953
    """Export a given disk to another node.
954

955
    This is a single-node call.
956

957
    """
958
    return self._SingleNodeCall(node, "blockdev_export",
959
                                [cf_bdev.ToDict(), dest_node, dest_path,
960
                                 cluster_name])
961

    
962
  def call_blockdev_snapshot(self, node, cf_bdev):
963
    """Request a snapshot of the given block device.
964

965
    This is a single-node call.
966

967
    """
968
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
969

    
970
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
971
                           cluster_name, idx):
972
    """Request the export of a given snapshot.
973

974
    This is a single-node call.
975

976
    """
977
    return self._SingleNodeCall(node, "snapshot_export",
978
                                [snap_bdev.ToDict(), dest_node,
979
                                 self._InstDict(instance), cluster_name, idx])
980

    
981
  def call_finalize_export(self, node, instance, snap_disks):
982
    """Request the completion of an export operation.
983

984
    This writes the export config file, etc.
985

986
    This is a single-node call.
987

988
    """
989
    flat_disks = []
990
    for disk in snap_disks:
991
      if isinstance(disk, bool):
992
        flat_disks.append(disk)
993
      else:
994
        flat_disks.append(disk.ToDict())
995

    
996
    return self._SingleNodeCall(node, "finalize_export",
997
                                [self._InstDict(instance), flat_disks])
998

    
999
  def call_export_info(self, node, path):
1000
    """Queries the export information in a given path.
1001

1002
    This is a single-node call.
1003

1004
    """
1005
    return self._SingleNodeCall(node, "export_info", [path])
1006

    
1007
  def call_instance_os_import(self, node, inst, src_node, src_images,
1008
                              cluster_name):
1009
    """Request the import of a backup into an instance.
1010

1011
    This is a single-node call.
1012

1013
    """
1014
    return self._SingleNodeCall(node, "instance_os_import",
1015
                                [self._InstDict(inst), src_node, src_images,
1016
                                 cluster_name])
1017

    
1018
  def call_export_list(self, node_list):
1019
    """Gets the stored exports list.
1020

1021
    This is a multi-node call.
1022

1023
    """
1024
    return self._MultiNodeCall(node_list, "export_list", [])
1025

    
1026
  def call_export_remove(self, node, export):
1027
    """Requests removal of a given export.
1028

1029
    This is a single-node call.
1030

1031
    """
1032
    return self._SingleNodeCall(node, "export_remove", [export])
1033

    
1034
  @classmethod
1035
  def call_node_leave_cluster(cls, node):
1036
    """Requests a node to clean the cluster information it has.
1037

1038
    This will remove the configuration information from the ganeti data
1039
    dir.
1040

1041
    This is a single-node call.
1042

1043
    """
1044
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1045

    
1046
  def call_node_volumes(self, node_list):
1047
    """Gets all volumes on node(s).
1048

1049
    This is a multi-node call.
1050

1051
    """
1052
    return self._MultiNodeCall(node_list, "node_volumes", [])
1053

    
1054
  def call_node_demote_from_mc(self, node):
1055
    """Demote a node from the master candidate role.
1056

1057
    This is a single-node call.
1058

1059
    """
1060
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1061

    
1062

    
1063
  def call_node_powercycle(self, node, hypervisor):
1064
    """Tries to powercycle a node.
1065

1066
    This is a single-node call.
1067

1068
    """
1069
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1070

    
1071

    
1072
  def call_test_delay(self, node_list, duration):
1073
    """Sleep for a fixed time on given node(s).
1074

1075
    This is a multi-node call.
1076

1077
    """
1078
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1079

    
1080
  def call_file_storage_dir_create(self, node, file_storage_dir):
1081
    """Create the given file storage directory.
1082

1083
    This is a single-node call.
1084

1085
    """
1086
    return self._SingleNodeCall(node, "file_storage_dir_create",
1087
                                [file_storage_dir])
1088

    
1089
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1090
    """Remove the given file storage directory.
1091

1092
    This is a single-node call.
1093

1094
    """
1095
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1096
                                [file_storage_dir])
1097

    
1098
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1099
                                   new_file_storage_dir):
1100
    """Rename file storage directory.
1101

1102
    This is a single-node call.
1103

1104
    """
1105
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1106
                                [old_file_storage_dir, new_file_storage_dir])
1107

    
1108
  @classmethod
1109
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1110
    """Update job queue.
1111

1112
    This is a multi-node call.
1113

1114
    """
1115
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1116
                                    [file_name, cls._Compress(content)],
1117
                                    address_list=address_list)
1118

    
1119
  @classmethod
1120
  def call_jobqueue_purge(cls, node):
1121
    """Purge job queue.
1122

1123
    This is a single-node call.
1124

1125
    """
1126
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1127

    
1128
  @classmethod
1129
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1130
    """Rename a job queue file.
1131

1132
    This is a multi-node call.
1133

1134
    """
1135
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1136
                                    address_list=address_list)
1137

    
1138
  @classmethod
1139
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1140
    """Set the drain flag on the queue.
1141

1142
    This is a multi-node call.
1143

1144
    @type node_list: list
1145
    @param node_list: the list of nodes to query
1146
    @type drain_flag: bool
1147
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1148

1149
    """
1150
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1151
                                    [drain_flag])
1152

    
1153
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1154
    """Validate the hypervisor params.
1155

1156
    This is a multi-node call.
1157

1158
    @type node_list: list
1159
    @param node_list: the list of nodes to query
1160
    @type hvname: string
1161
    @param hvname: the hypervisor name
1162
    @type hvparams: dict
1163
    @param hvparams: the hypervisor parameters to be validated
1164

1165
    """
1166
    cluster = self._cfg.GetClusterInfo()
1167
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1168
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1169
                               [hvname, hv_full])