Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ e4335b5b

History | View | Annotate | Download (34.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
import ganeti.http.client
46

    
47

    
48
# Module level variable
49
_http_manager = None
50

    
51

    
52
def Init():
53
  """Initializes the module-global HTTP client manager.
54

55
  Must be called before using any RPC function.
56

57
  """
58
  global _http_manager
59

    
60
  assert not _http_manager, "RPC module initialized more than once"
61

    
62
  _http_manager = http.client.HttpClientManager()
63

    
64

    
65
def Shutdown():
66
  """Stops the module-global HTTP client manager.
67

68
  Must be called before quitting the program.
69

70
  """
71
  global _http_manager
72

    
73
  if _http_manager:
74
    _http_manager.Shutdown()
75
    _http_manager = None
76

    
77

    
78
class RpcResult(object):
79
  """RPC Result class.
80

81
  This class holds an RPC result. It is needed since in multi-node
82
  calls we can't raise an exception just because one one out of many
83
  failed, and therefore we use this class to encapsulate the result.
84

85
  @ivar data: the data payload, for successful results, or None
86
  @ivar call: the name of the RPC call
87
  @ivar node: the name of the node to which we made the call
88
  @ivar offline: whether the operation failed because the node was
89
      offline, as opposed to actual failure; offline=True will always
90
      imply failed=True, in order to allow simpler checking if
91
      the user doesn't care about the exact failure mode
92
  @ivar fail_msg: the error message if the call failed
93

94
  """
95
  def __init__(self, data=None, failed=False, offline=False,
96
               call=None, node=None):
97
    self.offline = offline
98
    self.call = call
99
    self.node = node
100
    if offline:
101
      self.fail_msg = "Node is marked offline"
102
      self.data = self.payload = None
103
    elif failed:
104
      self.fail_msg = self._EnsureErr(data)
105
      self.data = self.payload = None
106
    else:
107
      self.data = data
108
      if not isinstance(self.data, (tuple, list)):
109
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
110
                         type(self.data))
111
      elif len(data) != 2:
112
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
113
                         "expected 2" % len(self.data))
114
      elif not self.data[0]:
115
        self.fail_msg = self._EnsureErr(self.data[1])
116
      else:
117
        # finally success
118
        self.fail_msg = None
119
        self.payload = data[1]
120

    
121
  @staticmethod
122
  def _EnsureErr(val):
123
    """Helper to ensure we return a 'True' value for error."""
124
    if val:
125
      return val
126
    else:
127
      return "No error information"
128

    
129
  def Raise(self, msg, prereq=False):
130
    """If the result has failed, raise an OpExecError.
131

132
    This is used so that LU code doesn't have to check for each
133
    result, but instead can call this function.
134

135
    """
136
    if not self.fail_msg:
137
      return
138

    
139
    if not msg: # one could pass None for default message
140
      msg = ("Call '%s' to node '%s' has failed: %s" %
141
             (self.call, self.node, self.fail_msg))
142
    else:
143
      msg = "%s: %s" % (msg, self.fail_msg)
144
    if prereq:
145
      ec = errors.OpPrereqError
146
    else:
147
      ec = errors.OpExecError
148
    raise ec(msg)
149

    
150

    
151
class Client:
152
  """RPC Client class.
153

154
  This class, given a (remote) method name, a list of parameters and a
155
  list of nodes, will contact (in parallel) all nodes, and return a
156
  dict of results (key: node name, value: result).
157

158
  One current bug is that generic failure is still signaled by
159
  'False' result, which is not good. This overloading of values can
160
  cause bugs.
161

162
  """
163
  def __init__(self, procedure, body, port):
164
    self.procedure = procedure
165
    self.body = body
166
    self.port = port
167
    self.nc = {}
168

    
169
    self._ssl_params = \
170
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
171
                         ssl_cert_path=constants.SSL_CERT_FILE)
172

    
173
  def ConnectList(self, node_list, address_list=None):
174
    """Add a list of nodes to the target nodes.
175

176
    @type node_list: list
177
    @param node_list: the list of node names to connect
178
    @type address_list: list or None
179
    @keyword address_list: either None or a list with node addresses,
180
        which must have the same length as the node list
181

182
    """
183
    if address_list is None:
184
      address_list = [None for _ in node_list]
185
    else:
186
      assert len(node_list) == len(address_list), \
187
             "Name and address lists should have the same length"
188
    for node, address in zip(node_list, address_list):
189
      self.ConnectNode(node, address)
190

    
191
  def ConnectNode(self, name, address=None):
192
    """Add a node to the target list.
193

194
    @type name: str
195
    @param name: the node name
196
    @type address: str
197
    @keyword address: the node address, if known
198

199
    """
200
    if address is None:
201
      address = name
202

    
203
    self.nc[name] = \
204
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
205
                                    "/%s" % self.procedure,
206
                                    post_data=self.body,
207
                                    ssl_params=self._ssl_params,
208
                                    ssl_verify_peer=True)
209

    
210
  def GetResults(self):
211
    """Call nodes and return results.
212

213
    @rtype: list
214
    @return: List of RPC results
215

216
    """
217
    assert _http_manager, "RPC module not initialized"
218

    
219
    _http_manager.ExecRequests(self.nc.values())
220

    
221
    results = {}
222

    
223
    for name, req in self.nc.iteritems():
224
      if req.success and req.resp_status_code == http.HTTP_OK:
225
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
226
                                  node=name, call=self.procedure)
227
        continue
228

    
229
      # TODO: Better error reporting
230
      if req.error:
231
        msg = req.error
232
      else:
233
        msg = req.resp_body
234

    
235
      logging.error("RPC error in %s from node %s: %s",
236
                    self.procedure, name, msg)
237
      results[name] = RpcResult(data=msg, failed=True, node=name,
238
                                call=self.procedure)
239

    
240
    return results
241

    
242

    
243
class RpcRunner(object):
244
  """RPC runner class"""
245

    
246
  def __init__(self, cfg):
247
    """Initialized the rpc runner.
248

249
    @type cfg:  C{config.ConfigWriter}
250
    @param cfg: the configuration object that will be used to get data
251
                about the cluster
252

253
    """
254
    self._cfg = cfg
255
    self.port = utils.GetDaemonPort(constants.NODED)
256

    
257
  def _InstDict(self, instance, hvp=None, bep=None):
258
    """Convert the given instance to a dict.
259

260
    This is done via the instance's ToDict() method and additionally
261
    we fill the hvparams with the cluster defaults.
262

263
    @type instance: L{objects.Instance}
264
    @param instance: an Instance object
265
    @type hvp: dict or None
266
    @param hvp: a dictionary with overridden hypervisor parameters
267
    @type bep: dict or None
268
    @param bep: a dictionary with overridden backend parameters
269
    @rtype: dict
270
    @return: the instance dict, with the hvparams filled with the
271
        cluster defaults
272

273
    """
274
    idict = instance.ToDict()
275
    cluster = self._cfg.GetClusterInfo()
276
    idict["hvparams"] = cluster.FillHV(instance)
277
    if hvp is not None:
278
      idict["hvparams"].update(hvp)
279
    idict["beparams"] = cluster.FillBE(instance)
280
    if bep is not None:
281
      idict["beparams"].update(bep)
282
    for nic in idict["nics"]:
283
      nic['nicparams'] = objects.FillDict(
284
        cluster.nicparams[constants.PP_DEFAULT],
285
        nic['nicparams'])
286
    return idict
287

    
288
  def _ConnectList(self, client, node_list, call):
289
    """Helper for computing node addresses.
290

291
    @type client: L{ganeti.rpc.Client}
292
    @param client: a C{Client} instance
293
    @type node_list: list
294
    @param node_list: the node list we should connect
295
    @type call: string
296
    @param call: the name of the remote procedure call, for filling in
297
        correctly any eventual offline nodes' results
298

299
    """
300
    all_nodes = self._cfg.GetAllNodesInfo()
301
    name_list = []
302
    addr_list = []
303
    skip_dict = {}
304
    for node in node_list:
305
      if node in all_nodes:
306
        if all_nodes[node].offline:
307
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
308
          continue
309
        val = all_nodes[node].primary_ip
310
      else:
311
        val = None
312
      addr_list.append(val)
313
      name_list.append(node)
314
    if name_list:
315
      client.ConnectList(name_list, address_list=addr_list)
316
    return skip_dict
317

    
318
  def _ConnectNode(self, client, node, call):
319
    """Helper for computing one node's address.
320

321
    @type client: L{ganeti.rpc.Client}
322
    @param client: a C{Client} instance
323
    @type node: str
324
    @param node: the node we should connect
325
    @type call: string
326
    @param call: the name of the remote procedure call, for filling in
327
        correctly any eventual offline nodes' results
328

329
    """
330
    node_info = self._cfg.GetNodeInfo(node)
331
    if node_info is not None:
332
      if node_info.offline:
333
        return RpcResult(node=node, offline=True, call=call)
334
      addr = node_info.primary_ip
335
    else:
336
      addr = None
337
    client.ConnectNode(node, address=addr)
338

    
339
  def _MultiNodeCall(self, node_list, procedure, args):
340
    """Helper for making a multi-node call
341

342
    """
343
    body = serializer.DumpJson(args, indent=False)
344
    c = Client(procedure, body, self.port)
345
    skip_dict = self._ConnectList(c, node_list, procedure)
346
    skip_dict.update(c.GetResults())
347
    return skip_dict
348

    
349
  @classmethod
350
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
351
                           address_list=None):
352
    """Helper for making a multi-node static call
353

354
    """
355
    body = serializer.DumpJson(args, indent=False)
356
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
357
    c.ConnectList(node_list, address_list=address_list)
358
    return c.GetResults()
359

    
360
  def _SingleNodeCall(self, node, procedure, args):
361
    """Helper for making a single-node call
362

363
    """
364
    body = serializer.DumpJson(args, indent=False)
365
    c = Client(procedure, body, self.port)
366
    result = self._ConnectNode(c, node, procedure)
367
    if result is None:
368
      # we did connect, node is not offline
369
      result = c.GetResults()[node]
370
    return result
371

    
372
  @classmethod
373
  def _StaticSingleNodeCall(cls, node, procedure, args):
374
    """Helper for making a single-node static call
375

376
    """
377
    body = serializer.DumpJson(args, indent=False)
378
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
379
    c.ConnectNode(node)
380
    return c.GetResults()[node]
381

    
382
  @staticmethod
383
  def _Compress(data):
384
    """Compresses a string for transport over RPC.
385

386
    Small amounts of data are not compressed.
387

388
    @type data: str
389
    @param data: Data
390
    @rtype: tuple
391
    @return: Encoded data to send
392

393
    """
394
    # Small amounts of data are not compressed
395
    if len(data) < 512:
396
      return (constants.RPC_ENCODING_NONE, data)
397

    
398
    # Compress with zlib and encode in base64
399
    return (constants.RPC_ENCODING_ZLIB_BASE64,
400
            base64.b64encode(zlib.compress(data, 3)))
401

    
402
  #
403
  # Begin RPC calls
404
  #
405

    
406
  def call_lv_list(self, node_list, vg_name):
407
    """Gets the logical volumes present in a given volume group.
408

409
    This is a multi-node call.
410

411
    """
412
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
413

    
414
  def call_vg_list(self, node_list):
415
    """Gets the volume group list.
416

417
    This is a multi-node call.
418

419
    """
420
    return self._MultiNodeCall(node_list, "vg_list", [])
421

    
422
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
423
    """Get list of storage units.
424

425
    This is a multi-node call.
426

427
    """
428
    return self._MultiNodeCall(node_list, "storage_list",
429
                               [su_name, su_args, name, fields])
430

    
431
  def call_storage_modify(self, node, su_name, su_args, name, changes):
432
    """Modify a storage unit.
433

434
    This is a single-node call.
435

436
    """
437
    return self._SingleNodeCall(node, "storage_modify",
438
                                [su_name, su_args, name, changes])
439

    
440
  def call_storage_execute(self, node, su_name, su_args, name, op):
441
    """Executes an operation on a storage unit.
442

443
    This is a single-node call.
444

445
    """
446
    return self._SingleNodeCall(node, "storage_execute",
447
                                [su_name, su_args, name, op])
448

    
449
  def call_bridges_exist(self, node, bridges_list):
450
    """Checks if a node has all the bridges given.
451

452
    This method checks if all bridges given in the bridges_list are
453
    present on the remote node, so that an instance that uses interfaces
454
    on those bridges can be started.
455

456
    This is a single-node call.
457

458
    """
459
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
460

    
461
  def call_instance_start(self, node, instance, hvp, bep):
462
    """Starts an instance.
463

464
    This is a single-node call.
465

466
    """
467
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
468
    return self._SingleNodeCall(node, "instance_start", [idict])
469

    
470
  def call_instance_shutdown(self, node, instance, timeout):
471
    """Stops an instance.
472

473
    This is a single-node call.
474

475
    """
476
    return self._SingleNodeCall(node, "instance_shutdown",
477
                                [self._InstDict(instance), timeout])
478

    
479
  def call_migration_info(self, node, instance):
480
    """Gather the information necessary to prepare an instance migration.
481

482
    This is a single-node call.
483

484
    @type node: string
485
    @param node: the node on which the instance is currently running
486
    @type instance: C{objects.Instance}
487
    @param instance: the instance definition
488

489
    """
490
    return self._SingleNodeCall(node, "migration_info",
491
                                [self._InstDict(instance)])
492

    
493
  def call_accept_instance(self, node, instance, info, target):
494
    """Prepare a node to accept an instance.
495

496
    This is a single-node call.
497

498
    @type node: string
499
    @param node: the target node for the migration
500
    @type instance: C{objects.Instance}
501
    @param instance: the instance definition
502
    @type info: opaque/hypervisor specific (string/data)
503
    @param info: result for the call_migration_info call
504
    @type target: string
505
    @param target: target hostname (usually ip address) (on the node itself)
506

507
    """
508
    return self._SingleNodeCall(node, "accept_instance",
509
                                [self._InstDict(instance), info, target])
510

    
511
  def call_finalize_migration(self, node, instance, info, success):
512
    """Finalize any target-node migration specific operation.
513

514
    This is called both in case of a successful migration and in case of error
515
    (in which case it should abort the migration).
516

517
    This is a single-node call.
518

519
    @type node: string
520
    @param node: the target node for the migration
521
    @type instance: C{objects.Instance}
522
    @param instance: the instance definition
523
    @type info: opaque/hypervisor specific (string/data)
524
    @param info: result for the call_migration_info call
525
    @type success: boolean
526
    @param success: whether the migration was a success or a failure
527

528
    """
529
    return self._SingleNodeCall(node, "finalize_migration",
530
                                [self._InstDict(instance), info, success])
531

    
532
  def call_instance_migrate(self, node, instance, target, live):
533
    """Migrate an instance.
534

535
    This is a single-node call.
536

537
    @type node: string
538
    @param node: the node on which the instance is currently running
539
    @type instance: C{objects.Instance}
540
    @param instance: the instance definition
541
    @type target: string
542
    @param target: the target node name
543
    @type live: boolean
544
    @param live: whether the migration should be done live or not (the
545
        interpretation of this parameter is left to the hypervisor)
546

547
    """
548
    return self._SingleNodeCall(node, "instance_migrate",
549
                                [self._InstDict(instance), target, live])
550

    
551
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
552
    """Reboots an instance.
553

554
    This is a single-node call.
555

556
    """
557
    return self._SingleNodeCall(node, "instance_reboot",
558
                                [self._InstDict(inst), reboot_type,
559
                                 shutdown_timeout])
560

    
561
  def call_instance_os_add(self, node, inst, reinstall):
562
    """Installs an OS on the given instance.
563

564
    This is a single-node call.
565

566
    """
567
    return self._SingleNodeCall(node, "instance_os_add",
568
                                [self._InstDict(inst), reinstall])
569

    
570
  def call_instance_run_rename(self, node, inst, old_name):
571
    """Run the OS rename script for an instance.
572

573
    This is a single-node call.
574

575
    """
576
    return self._SingleNodeCall(node, "instance_run_rename",
577
                                [self._InstDict(inst), old_name])
578

    
579
  def call_instance_info(self, node, instance, hname):
580
    """Returns information about a single instance.
581

582
    This is a single-node call.
583

584
    @type node: list
585
    @param node: the list of nodes to query
586
    @type instance: string
587
    @param instance: the instance name
588
    @type hname: string
589
    @param hname: the hypervisor type of the instance
590

591
    """
592
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
593

    
594
  def call_instance_migratable(self, node, instance):
595
    """Checks whether the given instance can be migrated.
596

597
    This is a single-node call.
598

599
    @param node: the node to query
600
    @type instance: L{objects.Instance}
601
    @param instance: the instance to check
602

603

604
    """
605
    return self._SingleNodeCall(node, "instance_migratable",
606
                                [self._InstDict(instance)])
607

    
608
  def call_all_instances_info(self, node_list, hypervisor_list):
609
    """Returns information about all instances on the given nodes.
610

611
    This is a multi-node call.
612

613
    @type node_list: list
614
    @param node_list: the list of nodes to query
615
    @type hypervisor_list: list
616
    @param hypervisor_list: the hypervisors to query for instances
617

618
    """
619
    return self._MultiNodeCall(node_list, "all_instances_info",
620
                               [hypervisor_list])
621

    
622
  def call_instance_list(self, node_list, hypervisor_list):
623
    """Returns the list of running instances on a given node.
624

625
    This is a multi-node call.
626

627
    @type node_list: list
628
    @param node_list: the list of nodes to query
629
    @type hypervisor_list: list
630
    @param hypervisor_list: the hypervisors to query for instances
631

632
    """
633
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
634

    
635
  def call_node_tcp_ping(self, node, source, target, port, timeout,
636
                         live_port_needed):
637
    """Do a TcpPing on the remote node
638

639
    This is a single-node call.
640

641
    """
642
    return self._SingleNodeCall(node, "node_tcp_ping",
643
                                [source, target, port, timeout,
644
                                 live_port_needed])
645

    
646
  def call_node_has_ip_address(self, node, address):
647
    """Checks if a node has the given IP address.
648

649
    This is a single-node call.
650

651
    """
652
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
653

    
654
  def call_node_info(self, node_list, vg_name, hypervisor_type):
655
    """Return node information.
656

657
    This will return memory information and volume group size and free
658
    space.
659

660
    This is a multi-node call.
661

662
    @type node_list: list
663
    @param node_list: the list of nodes to query
664
    @type vg_name: C{string}
665
    @param vg_name: the name of the volume group to ask for disk space
666
        information
667
    @type hypervisor_type: C{str}
668
    @param hypervisor_type: the name of the hypervisor to ask for
669
        memory information
670

671
    """
672
    return self._MultiNodeCall(node_list, "node_info",
673
                               [vg_name, hypervisor_type])
674

    
675
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
676
    """Add a node to the cluster.
677

678
    This is a single-node call.
679

680
    """
681
    return self._SingleNodeCall(node, "node_add",
682
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
683

    
684
  def call_node_verify(self, node_list, checkdict, cluster_name):
685
    """Request verification of given parameters.
686

687
    This is a multi-node call.
688

689
    """
690
    return self._MultiNodeCall(node_list, "node_verify",
691
                               [checkdict, cluster_name])
692

    
693
  @classmethod
694
  def call_node_start_master(cls, node, start_daemons, no_voting):
695
    """Tells a node to activate itself as a master.
696

697
    This is a single-node call.
698

699
    """
700
    return cls._StaticSingleNodeCall(node, "node_start_master",
701
                                     [start_daemons, no_voting])
702

    
703
  @classmethod
704
  def call_node_stop_master(cls, node, stop_daemons):
705
    """Tells a node to demote itself from master status.
706

707
    This is a single-node call.
708

709
    """
710
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
711

    
712
  @classmethod
713
  def call_master_info(cls, node_list):
714
    """Query master info.
715

716
    This is a multi-node call.
717

718
    """
719
    # TODO: should this method query down nodes?
720
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
721

    
722
  def call_version(self, node_list):
723
    """Query node version.
724

725
    This is a multi-node call.
726

727
    """
728
    return self._MultiNodeCall(node_list, "version", [])
729

    
730
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
731
    """Request creation of a given block device.
732

733
    This is a single-node call.
734

735
    """
736
    return self._SingleNodeCall(node, "blockdev_create",
737
                                [bdev.ToDict(), size, owner, on_primary, info])
738

    
739
  def call_blockdev_remove(self, node, bdev):
740
    """Request removal of a given block device.
741

742
    This is a single-node call.
743

744
    """
745
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
746

    
747
  def call_blockdev_rename(self, node, devlist):
748
    """Request rename of the given block devices.
749

750
    This is a single-node call.
751

752
    """
753
    return self._SingleNodeCall(node, "blockdev_rename",
754
                                [(d.ToDict(), uid) for d, uid in devlist])
755

    
756
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
757
    """Request assembling of a given block device.
758

759
    This is a single-node call.
760

761
    """
762
    return self._SingleNodeCall(node, "blockdev_assemble",
763
                                [disk.ToDict(), owner, on_primary])
764

    
765
  def call_blockdev_shutdown(self, node, disk):
766
    """Request shutdown of a given block device.
767

768
    This is a single-node call.
769

770
    """
771
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
772

    
773
  def call_blockdev_addchildren(self, node, bdev, ndevs):
774
    """Request adding a list of children to a (mirroring) device.
775

776
    This is a single-node call.
777

778
    """
779
    return self._SingleNodeCall(node, "blockdev_addchildren",
780
                                [bdev.ToDict(),
781
                                 [disk.ToDict() for disk in ndevs]])
782

    
783
  def call_blockdev_removechildren(self, node, bdev, ndevs):
784
    """Request removing a list of children from a (mirroring) device.
785

786
    This is a single-node call.
787

788
    """
789
    return self._SingleNodeCall(node, "blockdev_removechildren",
790
                                [bdev.ToDict(),
791
                                 [disk.ToDict() for disk in ndevs]])
792

    
793
  def call_blockdev_getmirrorstatus(self, node, disks):
794
    """Request status of a (mirroring) device.
795

796
    This is a single-node call.
797

798
    """
799
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
800
                                  [dsk.ToDict() for dsk in disks])
801
    if not result.fail_msg:
802
      result.payload = [objects.BlockDevStatus.FromDict(i)
803
                        for i in result.payload]
804
    return result
805

    
806
  def call_blockdev_find(self, node, disk):
807
    """Request identification of a given block device.
808

809
    This is a single-node call.
810

811
    """
812
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
813
    if not result.fail_msg and result.payload is not None:
814
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
815
    return result
816

    
817
  def call_blockdev_close(self, node, instance_name, disks):
818
    """Closes the given block devices.
819

820
    This is a single-node call.
821

822
    """
823
    params = [instance_name, [cf.ToDict() for cf in disks]]
824
    return self._SingleNodeCall(node, "blockdev_close", params)
825

    
826
  def call_blockdev_getsizes(self, node, disks):
827
    """Returns the size of the given disks.
828

829
    This is a single-node call.
830

831
    """
832
    params = [[cf.ToDict() for cf in disks]]
833
    return self._SingleNodeCall(node, "blockdev_getsize", params)
834

    
835
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
836
    """Disconnects the network of the given drbd devices.
837

838
    This is a multi-node call.
839

840
    """
841
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
842
                               [nodes_ip, [cf.ToDict() for cf in disks]])
843

    
844
  def call_drbd_attach_net(self, node_list, nodes_ip,
845
                           disks, instance_name, multimaster):
846
    """Disconnects the given drbd devices.
847

848
    This is a multi-node call.
849

850
    """
851
    return self._MultiNodeCall(node_list, "drbd_attach_net",
852
                               [nodes_ip, [cf.ToDict() for cf in disks],
853
                                instance_name, multimaster])
854

    
855
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
856
    """Waits for the synchronization of drbd devices is complete.
857

858
    This is a multi-node call.
859

860
    """
861
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
862
                               [nodes_ip, [cf.ToDict() for cf in disks]])
863

    
864
  @classmethod
865
  def call_upload_file(cls, node_list, file_name, address_list=None):
866
    """Upload a file.
867

868
    The node will refuse the operation in case the file is not on the
869
    approved file list.
870

871
    This is a multi-node call.
872

873
    @type node_list: list
874
    @param node_list: the list of node names to upload to
875
    @type file_name: str
876
    @param file_name: the filename to upload
877
    @type address_list: list or None
878
    @keyword address_list: an optional list of node addresses, in order
879
        to optimize the RPC speed
880

881
    """
882
    file_contents = utils.ReadFile(file_name)
883
    data = cls._Compress(file_contents)
884
    st = os.stat(file_name)
885
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
886
              st.st_atime, st.st_mtime]
887
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
888
                                    address_list=address_list)
889

    
890
  @classmethod
891
  def call_write_ssconf_files(cls, node_list, values):
892
    """Write ssconf files.
893

894
    This is a multi-node call.
895

896
    """
897
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
898

    
899
  def call_os_diagnose(self, node_list):
900
    """Request a diagnose of OS definitions.
901

902
    This is a multi-node call.
903

904
    """
905
    return self._MultiNodeCall(node_list, "os_diagnose", [])
906

    
907
  def call_os_get(self, node, name):
908
    """Returns an OS definition.
909

910
    This is a single-node call.
911

912
    """
913
    result = self._SingleNodeCall(node, "os_get", [name])
914
    if not result.fail_msg and isinstance(result.payload, dict):
915
      result.payload = objects.OS.FromDict(result.payload)
916
    return result
917

    
918
  def call_hooks_runner(self, node_list, hpath, phase, env):
919
    """Call the hooks runner.
920

921
    Args:
922
      - op: the OpCode instance
923
      - env: a dictionary with the environment
924

925
    This is a multi-node call.
926

927
    """
928
    params = [hpath, phase, env]
929
    return self._MultiNodeCall(node_list, "hooks_runner", params)
930

    
931
  def call_iallocator_runner(self, node, name, idata):
932
    """Call an iallocator on a remote node
933

934
    Args:
935
      - name: the iallocator name
936
      - input: the json-encoded input string
937

938
    This is a single-node call.
939

940
    """
941
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
942

    
943
  def call_blockdev_grow(self, node, cf_bdev, amount):
944
    """Request a snapshot of the given block device.
945

946
    This is a single-node call.
947

948
    """
949
    return self._SingleNodeCall(node, "blockdev_grow",
950
                                [cf_bdev.ToDict(), amount])
951

    
952
  def call_blockdev_export(self, node, cf_bdev,
953
                           dest_node, dest_path, cluster_name):
954
    """Export a given disk to another node.
955

956
    This is a single-node call.
957

958
    """
959
    return self._SingleNodeCall(node, "blockdev_export",
960
                                [cf_bdev.ToDict(), dest_node, dest_path,
961
                                 cluster_name])
962

    
963
  def call_blockdev_snapshot(self, node, cf_bdev):
964
    """Request a snapshot of the given block device.
965

966
    This is a single-node call.
967

968
    """
969
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
970

    
971
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
972
                           cluster_name, idx):
973
    """Request the export of a given snapshot.
974

975
    This is a single-node call.
976

977
    """
978
    return self._SingleNodeCall(node, "snapshot_export",
979
                                [snap_bdev.ToDict(), dest_node,
980
                                 self._InstDict(instance), cluster_name, idx])
981

    
982
  def call_finalize_export(self, node, instance, snap_disks):
983
    """Request the completion of an export operation.
984

985
    This writes the export config file, etc.
986

987
    This is a single-node call.
988

989
    """
990
    flat_disks = []
991
    for disk in snap_disks:
992
      if isinstance(disk, bool):
993
        flat_disks.append(disk)
994
      else:
995
        flat_disks.append(disk.ToDict())
996

    
997
    return self._SingleNodeCall(node, "finalize_export",
998
                                [self._InstDict(instance), flat_disks])
999

    
1000
  def call_export_info(self, node, path):
1001
    """Queries the export information in a given path.
1002

1003
    This is a single-node call.
1004

1005
    """
1006
    return self._SingleNodeCall(node, "export_info", [path])
1007

    
1008
  def call_instance_os_import(self, node, inst, src_node, src_images,
1009
                              cluster_name):
1010
    """Request the import of a backup into an instance.
1011

1012
    This is a single-node call.
1013

1014
    """
1015
    return self._SingleNodeCall(node, "instance_os_import",
1016
                                [self._InstDict(inst), src_node, src_images,
1017
                                 cluster_name])
1018

    
1019
  def call_export_list(self, node_list):
1020
    """Gets the stored exports list.
1021

1022
    This is a multi-node call.
1023

1024
    """
1025
    return self._MultiNodeCall(node_list, "export_list", [])
1026

    
1027
  def call_export_remove(self, node, export):
1028
    """Requests removal of a given export.
1029

1030
    This is a single-node call.
1031

1032
    """
1033
    return self._SingleNodeCall(node, "export_remove", [export])
1034

    
1035
  @classmethod
1036
  def call_node_leave_cluster(cls, node):
1037
    """Requests a node to clean the cluster information it has.
1038

1039
    This will remove the configuration information from the ganeti data
1040
    dir.
1041

1042
    This is a single-node call.
1043

1044
    """
1045
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1046

    
1047
  def call_node_volumes(self, node_list):
1048
    """Gets all volumes on node(s).
1049

1050
    This is a multi-node call.
1051

1052
    """
1053
    return self._MultiNodeCall(node_list, "node_volumes", [])
1054

    
1055
  def call_node_demote_from_mc(self, node):
1056
    """Demote a node from the master candidate role.
1057

1058
    This is a single-node call.
1059

1060
    """
1061
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1062

    
1063

    
1064
  def call_node_powercycle(self, node, hypervisor):
1065
    """Tries to powercycle a node.
1066

1067
    This is a single-node call.
1068

1069
    """
1070
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1071

    
1072

    
1073
  def call_test_delay(self, node_list, duration):
1074
    """Sleep for a fixed time on given node(s).
1075

1076
    This is a multi-node call.
1077

1078
    """
1079
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1080

    
1081
  def call_file_storage_dir_create(self, node, file_storage_dir):
1082
    """Create the given file storage directory.
1083

1084
    This is a single-node call.
1085

1086
    """
1087
    return self._SingleNodeCall(node, "file_storage_dir_create",
1088
                                [file_storage_dir])
1089

    
1090
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1091
    """Remove the given file storage directory.
1092

1093
    This is a single-node call.
1094

1095
    """
1096
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1097
                                [file_storage_dir])
1098

    
1099
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1100
                                   new_file_storage_dir):
1101
    """Rename file storage directory.
1102

1103
    This is a single-node call.
1104

1105
    """
1106
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1107
                                [old_file_storage_dir, new_file_storage_dir])
1108

    
1109
  @classmethod
1110
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1111
    """Update job queue.
1112

1113
    This is a multi-node call.
1114

1115
    """
1116
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1117
                                    [file_name, cls._Compress(content)],
1118
                                    address_list=address_list)
1119

    
1120
  @classmethod
1121
  def call_jobqueue_purge(cls, node):
1122
    """Purge job queue.
1123

1124
    This is a single-node call.
1125

1126
    """
1127
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1128

    
1129
  @classmethod
1130
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1131
    """Rename a job queue file.
1132

1133
    This is a multi-node call.
1134

1135
    """
1136
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1137
                                    address_list=address_list)
1138

    
1139
  @classmethod
1140
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1141
    """Set the drain flag on the queue.
1142

1143
    This is a multi-node call.
1144

1145
    @type node_list: list
1146
    @param node_list: the list of nodes to query
1147
    @type drain_flag: bool
1148
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1149

1150
    """
1151
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1152
                                    [drain_flag])
1153

    
1154
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1155
    """Validate the hypervisor params.
1156

1157
    This is a multi-node call.
1158

1159
    @type node_list: list
1160
    @param node_list: the list of nodes to query
1161
    @type hvname: string
1162
    @param hvname: the hypervisor name
1163
    @type hvparams: dict
1164
    @param hvparams: the hypervisor parameters to be validated
1165

1166
    """
1167
    cluster = self._cfg.GetClusterInfo()
1168
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1169
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1170
                               [hvname, hv_full])