Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 8f215968

History | View | Annotate | Download (34.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
import ganeti.http.client
46

    
47

    
48
# Module level variable
49
_http_manager = None
50

    
51

    
52
def Init():
53
  """Initializes the module-global HTTP client manager.
54

55
  Must be called before using any RPC function.
56

57
  """
58
  global _http_manager
59

    
60
  assert not _http_manager, "RPC module initialized more than once"
61

    
62
  _http_manager = http.client.HttpClientManager()
63

    
64

    
65
def Shutdown():
66
  """Stops the module-global HTTP client manager.
67

68
  Must be called before quitting the program.
69

70
  """
71
  global _http_manager
72

    
73
  if _http_manager:
74
    _http_manager.Shutdown()
75
    _http_manager = None
76

    
77

    
78
class RpcResult(object):
79
  """RPC Result class.
80

81
  This class holds an RPC result. It is needed since in multi-node
82
  calls we can't raise an exception just because one one out of many
83
  failed, and therefore we use this class to encapsulate the result.
84

85
  @ivar data: the data payload, for successful results, or None
86
  @ivar call: the name of the RPC call
87
  @ivar node: the name of the node to which we made the call
88
  @ivar offline: whether the operation failed because the node was
89
      offline, as opposed to actual failure; offline=True will always
90
      imply failed=True, in order to allow simpler checking if
91
      the user doesn't care about the exact failure mode
92
  @ivar fail_msg: the error message if the call failed
93

94
  """
95
  def __init__(self, data=None, failed=False, offline=False,
96
               call=None, node=None):
97
    self.offline = offline
98
    self.call = call
99
    self.node = node
100

    
101
    if offline:
102
      self.fail_msg = "Node is marked offline"
103
      self.data = self.payload = None
104
    elif failed:
105
      self.fail_msg = self._EnsureErr(data)
106
      self.data = self.payload = None
107
    else:
108
      self.data = data
109
      if not isinstance(self.data, (tuple, list)):
110
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
111
                         type(self.data))
112
        self.payload = None
113
      elif len(data) != 2:
114
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
115
                         "expected 2" % len(self.data))
116
        self.payload = None
117
      elif not self.data[0]:
118
        self.fail_msg = self._EnsureErr(self.data[1])
119
        self.payload = None
120
      else:
121
        # finally success
122
        self.fail_msg = None
123
        self.payload = data[1]
124

    
125
    assert hasattr(self, "call")
126
    assert hasattr(self, "data")
127
    assert hasattr(self, "fail_msg")
128
    assert hasattr(self, "node")
129
    assert hasattr(self, "offline")
130
    assert hasattr(self, "payload")
131

    
132
  @staticmethod
133
  def _EnsureErr(val):
134
    """Helper to ensure we return a 'True' value for error."""
135
    if val:
136
      return val
137
    else:
138
      return "No error information"
139

    
140
  def Raise(self, msg, prereq=False):
141
    """If the result has failed, raise an OpExecError.
142

143
    This is used so that LU code doesn't have to check for each
144
    result, but instead can call this function.
145

146
    """
147
    if not self.fail_msg:
148
      return
149

    
150
    if not msg: # one could pass None for default message
151
      msg = ("Call '%s' to node '%s' has failed: %s" %
152
             (self.call, self.node, self.fail_msg))
153
    else:
154
      msg = "%s: %s" % (msg, self.fail_msg)
155
    if prereq:
156
      ec = errors.OpPrereqError
157
    else:
158
      ec = errors.OpExecError
159
    raise ec(msg)
160

    
161

    
162
class Client:
163
  """RPC Client class.
164

165
  This class, given a (remote) method name, a list of parameters and a
166
  list of nodes, will contact (in parallel) all nodes, and return a
167
  dict of results (key: node name, value: result).
168

169
  One current bug is that generic failure is still signaled by
170
  'False' result, which is not good. This overloading of values can
171
  cause bugs.
172

173
  """
174
  def __init__(self, procedure, body, port):
175
    self.procedure = procedure
176
    self.body = body
177
    self.port = port
178
    self.nc = {}
179

    
180
    self._ssl_params = \
181
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
182
                         ssl_cert_path=constants.SSL_CERT_FILE)
183

    
184
  def ConnectList(self, node_list, address_list=None):
185
    """Add a list of nodes to the target nodes.
186

187
    @type node_list: list
188
    @param node_list: the list of node names to connect
189
    @type address_list: list or None
190
    @keyword address_list: either None or a list with node addresses,
191
        which must have the same length as the node list
192

193
    """
194
    if address_list is None:
195
      address_list = [None for _ in node_list]
196
    else:
197
      assert len(node_list) == len(address_list), \
198
             "Name and address lists should have the same length"
199
    for node, address in zip(node_list, address_list):
200
      self.ConnectNode(node, address)
201

    
202
  def ConnectNode(self, name, address=None):
203
    """Add a node to the target list.
204

205
    @type name: str
206
    @param name: the node name
207
    @type address: str
208
    @keyword address: the node address, if known
209

210
    """
211
    if address is None:
212
      address = name
213

    
214
    self.nc[name] = \
215
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
216
                                    "/%s" % self.procedure,
217
                                    post_data=self.body,
218
                                    ssl_params=self._ssl_params,
219
                                    ssl_verify_peer=True)
220

    
221
  def GetResults(self):
222
    """Call nodes and return results.
223

224
    @rtype: list
225
    @return: List of RPC results
226

227
    """
228
    assert _http_manager, "RPC module not initialized"
229

    
230
    _http_manager.ExecRequests(self.nc.values())
231

    
232
    results = {}
233

    
234
    for name, req in self.nc.iteritems():
235
      if req.success and req.resp_status_code == http.HTTP_OK:
236
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
237
                                  node=name, call=self.procedure)
238
        continue
239

    
240
      # TODO: Better error reporting
241
      if req.error:
242
        msg = req.error
243
      else:
244
        msg = req.resp_body
245

    
246
      logging.error("RPC error in %s from node %s: %s",
247
                    self.procedure, name, msg)
248
      results[name] = RpcResult(data=msg, failed=True, node=name,
249
                                call=self.procedure)
250

    
251
    return results
252

    
253

    
254
class RpcRunner(object):
255
  """RPC runner class"""
256

    
257
  def __init__(self, cfg):
258
    """Initialized the rpc runner.
259

260
    @type cfg:  C{config.ConfigWriter}
261
    @param cfg: the configuration object that will be used to get data
262
                about the cluster
263

264
    """
265
    self._cfg = cfg
266
    self.port = utils.GetDaemonPort(constants.NODED)
267

    
268
  def _InstDict(self, instance, hvp=None, bep=None):
269
    """Convert the given instance to a dict.
270

271
    This is done via the instance's ToDict() method and additionally
272
    we fill the hvparams with the cluster defaults.
273

274
    @type instance: L{objects.Instance}
275
    @param instance: an Instance object
276
    @type hvp: dict or None
277
    @param hvp: a dictionary with overridden hypervisor parameters
278
    @type bep: dict or None
279
    @param bep: a dictionary with overridden backend parameters
280
    @rtype: dict
281
    @return: the instance dict, with the hvparams filled with the
282
        cluster defaults
283

284
    """
285
    idict = instance.ToDict()
286
    cluster = self._cfg.GetClusterInfo()
287
    idict["hvparams"] = cluster.FillHV(instance)
288
    if hvp is not None:
289
      idict["hvparams"].update(hvp)
290
    idict["beparams"] = cluster.FillBE(instance)
291
    if bep is not None:
292
      idict["beparams"].update(bep)
293
    for nic in idict["nics"]:
294
      nic['nicparams'] = objects.FillDict(
295
        cluster.nicparams[constants.PP_DEFAULT],
296
        nic['nicparams'])
297
    return idict
298

    
299
  def _ConnectList(self, client, node_list, call):
300
    """Helper for computing node addresses.
301

302
    @type client: L{ganeti.rpc.Client}
303
    @param client: a C{Client} instance
304
    @type node_list: list
305
    @param node_list: the node list we should connect
306
    @type call: string
307
    @param call: the name of the remote procedure call, for filling in
308
        correctly any eventual offline nodes' results
309

310
    """
311
    all_nodes = self._cfg.GetAllNodesInfo()
312
    name_list = []
313
    addr_list = []
314
    skip_dict = {}
315
    for node in node_list:
316
      if node in all_nodes:
317
        if all_nodes[node].offline:
318
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
319
          continue
320
        val = all_nodes[node].primary_ip
321
      else:
322
        val = None
323
      addr_list.append(val)
324
      name_list.append(node)
325
    if name_list:
326
      client.ConnectList(name_list, address_list=addr_list)
327
    return skip_dict
328

    
329
  def _ConnectNode(self, client, node, call):
330
    """Helper for computing one node's address.
331

332
    @type client: L{ganeti.rpc.Client}
333
    @param client: a C{Client} instance
334
    @type node: str
335
    @param node: the node we should connect
336
    @type call: string
337
    @param call: the name of the remote procedure call, for filling in
338
        correctly any eventual offline nodes' results
339

340
    """
341
    node_info = self._cfg.GetNodeInfo(node)
342
    if node_info is not None:
343
      if node_info.offline:
344
        return RpcResult(node=node, offline=True, call=call)
345
      addr = node_info.primary_ip
346
    else:
347
      addr = None
348
    client.ConnectNode(node, address=addr)
349

    
350
  def _MultiNodeCall(self, node_list, procedure, args):
351
    """Helper for making a multi-node call
352

353
    """
354
    body = serializer.DumpJson(args, indent=False)
355
    c = Client(procedure, body, self.port)
356
    skip_dict = self._ConnectList(c, node_list, procedure)
357
    skip_dict.update(c.GetResults())
358
    return skip_dict
359

    
360
  @classmethod
361
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
362
                           address_list=None):
363
    """Helper for making a multi-node static call
364

365
    """
366
    body = serializer.DumpJson(args, indent=False)
367
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
368
    c.ConnectList(node_list, address_list=address_list)
369
    return c.GetResults()
370

    
371
  def _SingleNodeCall(self, node, procedure, args):
372
    """Helper for making a single-node call
373

374
    """
375
    body = serializer.DumpJson(args, indent=False)
376
    c = Client(procedure, body, self.port)
377
    result = self._ConnectNode(c, node, procedure)
378
    if result is None:
379
      # we did connect, node is not offline
380
      result = c.GetResults()[node]
381
    return result
382

    
383
  @classmethod
384
  def _StaticSingleNodeCall(cls, node, procedure, args):
385
    """Helper for making a single-node static call
386

387
    """
388
    body = serializer.DumpJson(args, indent=False)
389
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
390
    c.ConnectNode(node)
391
    return c.GetResults()[node]
392

    
393
  @staticmethod
394
  def _Compress(data):
395
    """Compresses a string for transport over RPC.
396

397
    Small amounts of data are not compressed.
398

399
    @type data: str
400
    @param data: Data
401
    @rtype: tuple
402
    @return: Encoded data to send
403

404
    """
405
    # Small amounts of data are not compressed
406
    if len(data) < 512:
407
      return (constants.RPC_ENCODING_NONE, data)
408

    
409
    # Compress with zlib and encode in base64
410
    return (constants.RPC_ENCODING_ZLIB_BASE64,
411
            base64.b64encode(zlib.compress(data, 3)))
412

    
413
  #
414
  # Begin RPC calls
415
  #
416

    
417
  def call_lv_list(self, node_list, vg_name):
418
    """Gets the logical volumes present in a given volume group.
419

420
    This is a multi-node call.
421

422
    """
423
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
424

    
425
  def call_vg_list(self, node_list):
426
    """Gets the volume group list.
427

428
    This is a multi-node call.
429

430
    """
431
    return self._MultiNodeCall(node_list, "vg_list", [])
432

    
433
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
434
    """Get list of storage units.
435

436
    This is a multi-node call.
437

438
    """
439
    return self._MultiNodeCall(node_list, "storage_list",
440
                               [su_name, su_args, name, fields])
441

    
442
  def call_storage_modify(self, node, su_name, su_args, name, changes):
443
    """Modify a storage unit.
444

445
    This is a single-node call.
446

447
    """
448
    return self._SingleNodeCall(node, "storage_modify",
449
                                [su_name, su_args, name, changes])
450

    
451
  def call_storage_execute(self, node, su_name, su_args, name, op):
452
    """Executes an operation on a storage unit.
453

454
    This is a single-node call.
455

456
    """
457
    return self._SingleNodeCall(node, "storage_execute",
458
                                [su_name, su_args, name, op])
459

    
460
  def call_bridges_exist(self, node, bridges_list):
461
    """Checks if a node has all the bridges given.
462

463
    This method checks if all bridges given in the bridges_list are
464
    present on the remote node, so that an instance that uses interfaces
465
    on those bridges can be started.
466

467
    This is a single-node call.
468

469
    """
470
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
471

    
472
  def call_instance_start(self, node, instance, hvp, bep):
473
    """Starts an instance.
474

475
    This is a single-node call.
476

477
    """
478
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
479
    return self._SingleNodeCall(node, "instance_start", [idict])
480

    
481
  def call_instance_shutdown(self, node, instance, timeout):
482
    """Stops an instance.
483

484
    This is a single-node call.
485

486
    """
487
    return self._SingleNodeCall(node, "instance_shutdown",
488
                                [self._InstDict(instance), timeout])
489

    
490
  def call_migration_info(self, node, instance):
491
    """Gather the information necessary to prepare an instance migration.
492

493
    This is a single-node call.
494

495
    @type node: string
496
    @param node: the node on which the instance is currently running
497
    @type instance: C{objects.Instance}
498
    @param instance: the instance definition
499

500
    """
501
    return self._SingleNodeCall(node, "migration_info",
502
                                [self._InstDict(instance)])
503

    
504
  def call_accept_instance(self, node, instance, info, target):
505
    """Prepare a node to accept an instance.
506

507
    This is a single-node call.
508

509
    @type node: string
510
    @param node: the target node for the migration
511
    @type instance: C{objects.Instance}
512
    @param instance: the instance definition
513
    @type info: opaque/hypervisor specific (string/data)
514
    @param info: result for the call_migration_info call
515
    @type target: string
516
    @param target: target hostname (usually ip address) (on the node itself)
517

518
    """
519
    return self._SingleNodeCall(node, "accept_instance",
520
                                [self._InstDict(instance), info, target])
521

    
522
  def call_finalize_migration(self, node, instance, info, success):
523
    """Finalize any target-node migration specific operation.
524

525
    This is called both in case of a successful migration and in case of error
526
    (in which case it should abort the migration).
527

528
    This is a single-node call.
529

530
    @type node: string
531
    @param node: the target node for the migration
532
    @type instance: C{objects.Instance}
533
    @param instance: the instance definition
534
    @type info: opaque/hypervisor specific (string/data)
535
    @param info: result for the call_migration_info call
536
    @type success: boolean
537
    @param success: whether the migration was a success or a failure
538

539
    """
540
    return self._SingleNodeCall(node, "finalize_migration",
541
                                [self._InstDict(instance), info, success])
542

    
543
  def call_instance_migrate(self, node, instance, target, live):
544
    """Migrate an instance.
545

546
    This is a single-node call.
547

548
    @type node: string
549
    @param node: the node on which the instance is currently running
550
    @type instance: C{objects.Instance}
551
    @param instance: the instance definition
552
    @type target: string
553
    @param target: the target node name
554
    @type live: boolean
555
    @param live: whether the migration should be done live or not (the
556
        interpretation of this parameter is left to the hypervisor)
557

558
    """
559
    return self._SingleNodeCall(node, "instance_migrate",
560
                                [self._InstDict(instance), target, live])
561

    
562
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
563
    """Reboots an instance.
564

565
    This is a single-node call.
566

567
    """
568
    return self._SingleNodeCall(node, "instance_reboot",
569
                                [self._InstDict(inst), reboot_type,
570
                                 shutdown_timeout])
571

    
572
  def call_instance_os_add(self, node, inst, reinstall):
573
    """Installs an OS on the given instance.
574

575
    This is a single-node call.
576

577
    """
578
    return self._SingleNodeCall(node, "instance_os_add",
579
                                [self._InstDict(inst), reinstall])
580

    
581
  def call_instance_run_rename(self, node, inst, old_name):
582
    """Run the OS rename script for an instance.
583

584
    This is a single-node call.
585

586
    """
587
    return self._SingleNodeCall(node, "instance_run_rename",
588
                                [self._InstDict(inst), old_name])
589

    
590
  def call_instance_info(self, node, instance, hname):
591
    """Returns information about a single instance.
592

593
    This is a single-node call.
594

595
    @type node: list
596
    @param node: the list of nodes to query
597
    @type instance: string
598
    @param instance: the instance name
599
    @type hname: string
600
    @param hname: the hypervisor type of the instance
601

602
    """
603
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
604

    
605
  def call_instance_migratable(self, node, instance):
606
    """Checks whether the given instance can be migrated.
607

608
    This is a single-node call.
609

610
    @param node: the node to query
611
    @type instance: L{objects.Instance}
612
    @param instance: the instance to check
613

614

615
    """
616
    return self._SingleNodeCall(node, "instance_migratable",
617
                                [self._InstDict(instance)])
618

    
619
  def call_all_instances_info(self, node_list, hypervisor_list):
620
    """Returns information about all instances on the given nodes.
621

622
    This is a multi-node call.
623

624
    @type node_list: list
625
    @param node_list: the list of nodes to query
626
    @type hypervisor_list: list
627
    @param hypervisor_list: the hypervisors to query for instances
628

629
    """
630
    return self._MultiNodeCall(node_list, "all_instances_info",
631
                               [hypervisor_list])
632

    
633
  def call_instance_list(self, node_list, hypervisor_list):
634
    """Returns the list of running instances on a given node.
635

636
    This is a multi-node call.
637

638
    @type node_list: list
639
    @param node_list: the list of nodes to query
640
    @type hypervisor_list: list
641
    @param hypervisor_list: the hypervisors to query for instances
642

643
    """
644
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
645

    
646
  def call_node_tcp_ping(self, node, source, target, port, timeout,
647
                         live_port_needed):
648
    """Do a TcpPing on the remote node
649

650
    This is a single-node call.
651

652
    """
653
    return self._SingleNodeCall(node, "node_tcp_ping",
654
                                [source, target, port, timeout,
655
                                 live_port_needed])
656

    
657
  def call_node_has_ip_address(self, node, address):
658
    """Checks if a node has the given IP address.
659

660
    This is a single-node call.
661

662
    """
663
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
664

    
665
  def call_node_info(self, node_list, vg_name, hypervisor_type):
666
    """Return node information.
667

668
    This will return memory information and volume group size and free
669
    space.
670

671
    This is a multi-node call.
672

673
    @type node_list: list
674
    @param node_list: the list of nodes to query
675
    @type vg_name: C{string}
676
    @param vg_name: the name of the volume group to ask for disk space
677
        information
678
    @type hypervisor_type: C{str}
679
    @param hypervisor_type: the name of the hypervisor to ask for
680
        memory information
681

682
    """
683
    return self._MultiNodeCall(node_list, "node_info",
684
                               [vg_name, hypervisor_type])
685

    
686
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
687
    """Add a node to the cluster.
688

689
    This is a single-node call.
690

691
    """
692
    return self._SingleNodeCall(node, "node_add",
693
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
694

    
695
  def call_node_verify(self, node_list, checkdict, cluster_name):
696
    """Request verification of given parameters.
697

698
    This is a multi-node call.
699

700
    """
701
    return self._MultiNodeCall(node_list, "node_verify",
702
                               [checkdict, cluster_name])
703

    
704
  @classmethod
705
  def call_node_start_master(cls, node, start_daemons, no_voting):
706
    """Tells a node to activate itself as a master.
707

708
    This is a single-node call.
709

710
    """
711
    return cls._StaticSingleNodeCall(node, "node_start_master",
712
                                     [start_daemons, no_voting])
713

    
714
  @classmethod
715
  def call_node_stop_master(cls, node, stop_daemons):
716
    """Tells a node to demote itself from master status.
717

718
    This is a single-node call.
719

720
    """
721
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
722

    
723
  @classmethod
724
  def call_master_info(cls, node_list):
725
    """Query master info.
726

727
    This is a multi-node call.
728

729
    """
730
    # TODO: should this method query down nodes?
731
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
732

    
733
  @classmethod
734
  def call_version(cls, node_list):
735
    """Query node version.
736

737
    This is a multi-node call.
738

739
    """
740
    return cls._StaticMultiNodeCall(node_list, "version", [])
741

    
742
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
743
    """Request creation of a given block device.
744

745
    This is a single-node call.
746

747
    """
748
    return self._SingleNodeCall(node, "blockdev_create",
749
                                [bdev.ToDict(), size, owner, on_primary, info])
750

    
751
  def call_blockdev_remove(self, node, bdev):
752
    """Request removal of a given block device.
753

754
    This is a single-node call.
755

756
    """
757
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
758

    
759
  def call_blockdev_rename(self, node, devlist):
760
    """Request rename of the given block devices.
761

762
    This is a single-node call.
763

764
    """
765
    return self._SingleNodeCall(node, "blockdev_rename",
766
                                [(d.ToDict(), uid) for d, uid in devlist])
767

    
768
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
769
    """Request assembling of a given block device.
770

771
    This is a single-node call.
772

773
    """
774
    return self._SingleNodeCall(node, "blockdev_assemble",
775
                                [disk.ToDict(), owner, on_primary])
776

    
777
  def call_blockdev_shutdown(self, node, disk):
778
    """Request shutdown of a given block device.
779

780
    This is a single-node call.
781

782
    """
783
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
784

    
785
  def call_blockdev_addchildren(self, node, bdev, ndevs):
786
    """Request adding a list of children to a (mirroring) device.
787

788
    This is a single-node call.
789

790
    """
791
    return self._SingleNodeCall(node, "blockdev_addchildren",
792
                                [bdev.ToDict(),
793
                                 [disk.ToDict() for disk in ndevs]])
794

    
795
  def call_blockdev_removechildren(self, node, bdev, ndevs):
796
    """Request removing a list of children from a (mirroring) device.
797

798
    This is a single-node call.
799

800
    """
801
    return self._SingleNodeCall(node, "blockdev_removechildren",
802
                                [bdev.ToDict(),
803
                                 [disk.ToDict() for disk in ndevs]])
804

    
805
  def call_blockdev_getmirrorstatus(self, node, disks):
806
    """Request status of a (mirroring) device.
807

808
    This is a single-node call.
809

810
    """
811
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
812
                                  [dsk.ToDict() for dsk in disks])
813
    if not result.fail_msg:
814
      result.payload = [objects.BlockDevStatus.FromDict(i)
815
                        for i in result.payload]
816
    return result
817

    
818
  def call_blockdev_find(self, node, disk):
819
    """Request identification of a given block device.
820

821
    This is a single-node call.
822

823
    """
824
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
825
    if not result.fail_msg and result.payload is not None:
826
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
827
    return result
828

    
829
  def call_blockdev_close(self, node, instance_name, disks):
830
    """Closes the given block devices.
831

832
    This is a single-node call.
833

834
    """
835
    params = [instance_name, [cf.ToDict() for cf in disks]]
836
    return self._SingleNodeCall(node, "blockdev_close", params)
837

    
838
  def call_blockdev_getsizes(self, node, disks):
839
    """Returns the size of the given disks.
840

841
    This is a single-node call.
842

843
    """
844
    params = [[cf.ToDict() for cf in disks]]
845
    return self._SingleNodeCall(node, "blockdev_getsize", params)
846

    
847
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
848
    """Disconnects the network of the given drbd devices.
849

850
    This is a multi-node call.
851

852
    """
853
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
854
                               [nodes_ip, [cf.ToDict() for cf in disks]])
855

    
856
  def call_drbd_attach_net(self, node_list, nodes_ip,
857
                           disks, instance_name, multimaster):
858
    """Disconnects the given drbd devices.
859

860
    This is a multi-node call.
861

862
    """
863
    return self._MultiNodeCall(node_list, "drbd_attach_net",
864
                               [nodes_ip, [cf.ToDict() for cf in disks],
865
                                instance_name, multimaster])
866

    
867
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
868
    """Waits for the synchronization of drbd devices is complete.
869

870
    This is a multi-node call.
871

872
    """
873
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
874
                               [nodes_ip, [cf.ToDict() for cf in disks]])
875

    
876
  @classmethod
877
  def call_upload_file(cls, node_list, file_name, address_list=None):
878
    """Upload a file.
879

880
    The node will refuse the operation in case the file is not on the
881
    approved file list.
882

883
    This is a multi-node call.
884

885
    @type node_list: list
886
    @param node_list: the list of node names to upload to
887
    @type file_name: str
888
    @param file_name: the filename to upload
889
    @type address_list: list or None
890
    @keyword address_list: an optional list of node addresses, in order
891
        to optimize the RPC speed
892

893
    """
894
    file_contents = utils.ReadFile(file_name)
895
    data = cls._Compress(file_contents)
896
    st = os.stat(file_name)
897
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
898
              st.st_atime, st.st_mtime]
899
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
900
                                    address_list=address_list)
901

    
902
  @classmethod
903
  def call_write_ssconf_files(cls, node_list, values):
904
    """Write ssconf files.
905

906
    This is a multi-node call.
907

908
    """
909
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
910

    
911
  def call_os_diagnose(self, node_list):
912
    """Request a diagnose of OS definitions.
913

914
    This is a multi-node call.
915

916
    """
917
    return self._MultiNodeCall(node_list, "os_diagnose", [])
918

    
919
  def call_os_get(self, node, name):
920
    """Returns an OS definition.
921

922
    This is a single-node call.
923

924
    """
925
    result = self._SingleNodeCall(node, "os_get", [name])
926
    if not result.fail_msg and isinstance(result.payload, dict):
927
      result.payload = objects.OS.FromDict(result.payload)
928
    return result
929

    
930
  def call_hooks_runner(self, node_list, hpath, phase, env):
931
    """Call the hooks runner.
932

933
    Args:
934
      - op: the OpCode instance
935
      - env: a dictionary with the environment
936

937
    This is a multi-node call.
938

939
    """
940
    params = [hpath, phase, env]
941
    return self._MultiNodeCall(node_list, "hooks_runner", params)
942

    
943
  def call_iallocator_runner(self, node, name, idata):
944
    """Call an iallocator on a remote node
945

946
    Args:
947
      - name: the iallocator name
948
      - input: the json-encoded input string
949

950
    This is a single-node call.
951

952
    """
953
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
954

    
955
  def call_blockdev_grow(self, node, cf_bdev, amount):
956
    """Request a snapshot of the given block device.
957

958
    This is a single-node call.
959

960
    """
961
    return self._SingleNodeCall(node, "blockdev_grow",
962
                                [cf_bdev.ToDict(), amount])
963

    
964
  def call_blockdev_export(self, node, cf_bdev,
965
                           dest_node, dest_path, cluster_name):
966
    """Export a given disk to another node.
967

968
    This is a single-node call.
969

970
    """
971
    return self._SingleNodeCall(node, "blockdev_export",
972
                                [cf_bdev.ToDict(), dest_node, dest_path,
973
                                 cluster_name])
974

    
975
  def call_blockdev_snapshot(self, node, cf_bdev):
976
    """Request a snapshot of the given block device.
977

978
    This is a single-node call.
979

980
    """
981
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
982

    
983
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
984
                           cluster_name, idx):
985
    """Request the export of a given snapshot.
986

987
    This is a single-node call.
988

989
    """
990
    return self._SingleNodeCall(node, "snapshot_export",
991
                                [snap_bdev.ToDict(), dest_node,
992
                                 self._InstDict(instance), cluster_name, idx])
993

    
994
  def call_finalize_export(self, node, instance, snap_disks):
995
    """Request the completion of an export operation.
996

997
    This writes the export config file, etc.
998

999
    This is a single-node call.
1000

1001
    """
1002
    flat_disks = []
1003
    for disk in snap_disks:
1004
      if isinstance(disk, bool):
1005
        flat_disks.append(disk)
1006
      else:
1007
        flat_disks.append(disk.ToDict())
1008

    
1009
    return self._SingleNodeCall(node, "finalize_export",
1010
                                [self._InstDict(instance), flat_disks])
1011

    
1012
  def call_export_info(self, node, path):
1013
    """Queries the export information in a given path.
1014

1015
    This is a single-node call.
1016

1017
    """
1018
    return self._SingleNodeCall(node, "export_info", [path])
1019

    
1020
  def call_instance_os_import(self, node, inst, src_node, src_images,
1021
                              cluster_name):
1022
    """Request the import of a backup into an instance.
1023

1024
    This is a single-node call.
1025

1026
    """
1027
    return self._SingleNodeCall(node, "instance_os_import",
1028
                                [self._InstDict(inst), src_node, src_images,
1029
                                 cluster_name])
1030

    
1031
  def call_export_list(self, node_list):
1032
    """Gets the stored exports list.
1033

1034
    This is a multi-node call.
1035

1036
    """
1037
    return self._MultiNodeCall(node_list, "export_list", [])
1038

    
1039
  def call_export_remove(self, node, export):
1040
    """Requests removal of a given export.
1041

1042
    This is a single-node call.
1043

1044
    """
1045
    return self._SingleNodeCall(node, "export_remove", [export])
1046

    
1047
  @classmethod
1048
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1049
    """Requests a node to clean the cluster information it has.
1050

1051
    This will remove the configuration information from the ganeti data
1052
    dir.
1053

1054
    This is a single-node call.
1055

1056
    """
1057
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1058
                                     [modify_ssh_setup])
1059

    
1060
  def call_node_volumes(self, node_list):
1061
    """Gets all volumes on node(s).
1062

1063
    This is a multi-node call.
1064

1065
    """
1066
    return self._MultiNodeCall(node_list, "node_volumes", [])
1067

    
1068
  def call_node_demote_from_mc(self, node):
1069
    """Demote a node from the master candidate role.
1070

1071
    This is a single-node call.
1072

1073
    """
1074
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1075

    
1076

    
1077
  def call_node_powercycle(self, node, hypervisor):
1078
    """Tries to powercycle a node.
1079

1080
    This is a single-node call.
1081

1082
    """
1083
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1084

    
1085

    
1086
  def call_test_delay(self, node_list, duration):
1087
    """Sleep for a fixed time on given node(s).
1088

1089
    This is a multi-node call.
1090

1091
    """
1092
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1093

    
1094
  def call_file_storage_dir_create(self, node, file_storage_dir):
1095
    """Create the given file storage directory.
1096

1097
    This is a single-node call.
1098

1099
    """
1100
    return self._SingleNodeCall(node, "file_storage_dir_create",
1101
                                [file_storage_dir])
1102

    
1103
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1104
    """Remove the given file storage directory.
1105

1106
    This is a single-node call.
1107

1108
    """
1109
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1110
                                [file_storage_dir])
1111

    
1112
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1113
                                   new_file_storage_dir):
1114
    """Rename file storage directory.
1115

1116
    This is a single-node call.
1117

1118
    """
1119
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1120
                                [old_file_storage_dir, new_file_storage_dir])
1121

    
1122
  @classmethod
1123
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1124
    """Update job queue.
1125

1126
    This is a multi-node call.
1127

1128
    """
1129
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1130
                                    [file_name, cls._Compress(content)],
1131
                                    address_list=address_list)
1132

    
1133
  @classmethod
1134
  def call_jobqueue_purge(cls, node):
1135
    """Purge job queue.
1136

1137
    This is a single-node call.
1138

1139
    """
1140
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1141

    
1142
  @classmethod
1143
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1144
    """Rename a job queue file.
1145

1146
    This is a multi-node call.
1147

1148
    """
1149
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1150
                                    address_list=address_list)
1151

    
1152
  @classmethod
1153
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1154
    """Set the drain flag on the queue.
1155

1156
    This is a multi-node call.
1157

1158
    @type node_list: list
1159
    @param node_list: the list of nodes to query
1160
    @type drain_flag: bool
1161
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1162

1163
    """
1164
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1165
                                    [drain_flag])
1166

    
1167
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1168
    """Validate the hypervisor params.
1169

1170
    This is a multi-node call.
1171

1172
    @type node_list: list
1173
    @param node_list: the list of nodes to query
1174
    @type hvname: string
1175
    @param hvname: the hypervisor name
1176
    @type hvparams: dict
1177
    @param hvparams: the hypervisor parameters to be validated
1178

1179
    """
1180
    cluster = self._cfg.GetClusterInfo()
1181
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1182
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1183
                               [hvname, hv_full])