Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 9b94905f

History | View | Annotate | Download (34.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
import ganeti.http.client
46

    
47

    
48
# Module level variable
49
_http_manager = None
50

    
51

    
52
def Init():
53
  """Initializes the module-global HTTP client manager.
54

55
  Must be called before using any RPC function.
56

57
  """
58
  global _http_manager
59

    
60
  assert not _http_manager, "RPC module initialized more than once"
61

    
62
  _http_manager = http.client.HttpClientManager()
63

    
64

    
65
def Shutdown():
66
  """Stops the module-global HTTP client manager.
67

68
  Must be called before quitting the program.
69

70
  """
71
  global _http_manager
72

    
73
  if _http_manager:
74
    _http_manager.Shutdown()
75
    _http_manager = None
76

    
77

    
78
class RpcResult(object):
79
  """RPC Result class.
80

81
  This class holds an RPC result. It is needed since in multi-node
82
  calls we can't raise an exception just because one one out of many
83
  failed, and therefore we use this class to encapsulate the result.
84

85
  @ivar data: the data payload, for successful results, or None
86
  @ivar call: the name of the RPC call
87
  @ivar node: the name of the node to which we made the call
88
  @ivar offline: whether the operation failed because the node was
89
      offline, as opposed to actual failure; offline=True will always
90
      imply failed=True, in order to allow simpler checking if
91
      the user doesn't care about the exact failure mode
92
  @ivar fail_msg: the error message if the call failed
93

94
  """
95
  def __init__(self, data=None, failed=False, offline=False,
96
               call=None, node=None):
97
    self.offline = offline
98
    self.call = call
99
    self.node = node
100

    
101
    if offline:
102
      self.fail_msg = "Node is marked offline"
103
      self.data = self.payload = None
104
    elif failed:
105
      self.fail_msg = self._EnsureErr(data)
106
      self.data = self.payload = None
107
    else:
108
      self.data = data
109
      if not isinstance(self.data, (tuple, list)):
110
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
111
                         type(self.data))
112
        self.payload = None
113
      elif len(data) != 2:
114
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
115
                         "expected 2" % len(self.data))
116
        self.payload = None
117
      elif not self.data[0]:
118
        self.fail_msg = self._EnsureErr(self.data[1])
119
        self.payload = None
120
      else:
121
        # finally success
122
        self.fail_msg = None
123
        self.payload = data[1]
124

    
125
    assert hasattr(self, "call")
126
    assert hasattr(self, "data")
127
    assert hasattr(self, "fail_msg")
128
    assert hasattr(self, "node")
129
    assert hasattr(self, "offline")
130
    assert hasattr(self, "payload")
131

    
132
  @staticmethod
133
  def _EnsureErr(val):
134
    """Helper to ensure we return a 'True' value for error."""
135
    if val:
136
      return val
137
    else:
138
      return "No error information"
139

    
140
  def Raise(self, msg, prereq=False):
141
    """If the result has failed, raise an OpExecError.
142

143
    This is used so that LU code doesn't have to check for each
144
    result, but instead can call this function.
145

146
    """
147
    if not self.fail_msg:
148
      return
149

    
150
    if not msg: # one could pass None for default message
151
      msg = ("Call '%s' to node '%s' has failed: %s" %
152
             (self.call, self.node, self.fail_msg))
153
    else:
154
      msg = "%s: %s" % (msg, self.fail_msg)
155
    if prereq:
156
      ec = errors.OpPrereqError
157
    else:
158
      ec = errors.OpExecError
159
    raise ec(msg)
160

    
161

    
162
class Client:
163
  """RPC Client class.
164

165
  This class, given a (remote) method name, a list of parameters and a
166
  list of nodes, will contact (in parallel) all nodes, and return a
167
  dict of results (key: node name, value: result).
168

169
  One current bug is that generic failure is still signaled by
170
  'False' result, which is not good. This overloading of values can
171
  cause bugs.
172

173
  """
174
  def __init__(self, procedure, body, port):
175
    self.procedure = procedure
176
    self.body = body
177
    self.port = port
178
    self.nc = {}
179

    
180
    self._ssl_params = \
181
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
182
                         ssl_cert_path=constants.SSL_CERT_FILE)
183

    
184
  def ConnectList(self, node_list, address_list=None):
185
    """Add a list of nodes to the target nodes.
186

187
    @type node_list: list
188
    @param node_list: the list of node names to connect
189
    @type address_list: list or None
190
    @keyword address_list: either None or a list with node addresses,
191
        which must have the same length as the node list
192

193
    """
194
    if address_list is None:
195
      address_list = [None for _ in node_list]
196
    else:
197
      assert len(node_list) == len(address_list), \
198
             "Name and address lists should have the same length"
199
    for node, address in zip(node_list, address_list):
200
      self.ConnectNode(node, address)
201

    
202
  def ConnectNode(self, name, address=None):
203
    """Add a node to the target list.
204

205
    @type name: str
206
    @param name: the node name
207
    @type address: str
208
    @keyword address: the node address, if known
209

210
    """
211
    if address is None:
212
      address = name
213

    
214
    self.nc[name] = \
215
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
216
                                    "/%s" % self.procedure,
217
                                    post_data=self.body,
218
                                    ssl_params=self._ssl_params,
219
                                    ssl_verify_peer=True)
220

    
221
  def GetResults(self):
222
    """Call nodes and return results.
223

224
    @rtype: list
225
    @return: List of RPC results
226

227
    """
228
    assert _http_manager, "RPC module not initialized"
229

    
230
    _http_manager.ExecRequests(self.nc.values())
231

    
232
    results = {}
233

    
234
    for name, req in self.nc.iteritems():
235
      if req.success and req.resp_status_code == http.HTTP_OK:
236
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
237
                                  node=name, call=self.procedure)
238
        continue
239

    
240
      # TODO: Better error reporting
241
      if req.error:
242
        msg = req.error
243
      else:
244
        msg = req.resp_body
245

    
246
      logging.error("RPC error in %s from node %s: %s",
247
                    self.procedure, name, msg)
248
      results[name] = RpcResult(data=msg, failed=True, node=name,
249
                                call=self.procedure)
250

    
251
    return results
252

    
253

    
254
class RpcRunner(object):
255
  """RPC runner class"""
256

    
257
  def __init__(self, cfg):
258
    """Initialized the rpc runner.
259

260
    @type cfg:  C{config.ConfigWriter}
261
    @param cfg: the configuration object that will be used to get data
262
                about the cluster
263

264
    """
265
    self._cfg = cfg
266
    self.port = utils.GetDaemonPort(constants.NODED)
267

    
268
  def _InstDict(self, instance, hvp=None, bep=None):
269
    """Convert the given instance to a dict.
270

271
    This is done via the instance's ToDict() method and additionally
272
    we fill the hvparams with the cluster defaults.
273

274
    @type instance: L{objects.Instance}
275
    @param instance: an Instance object
276
    @type hvp: dict or None
277
    @param hvp: a dictionary with overridden hypervisor parameters
278
    @type bep: dict or None
279
    @param bep: a dictionary with overridden backend parameters
280
    @rtype: dict
281
    @return: the instance dict, with the hvparams filled with the
282
        cluster defaults
283

284
    """
285
    idict = instance.ToDict()
286
    cluster = self._cfg.GetClusterInfo()
287
    idict["hvparams"] = cluster.FillHV(instance)
288
    if hvp is not None:
289
      idict["hvparams"].update(hvp)
290
    idict["beparams"] = cluster.FillBE(instance)
291
    if bep is not None:
292
      idict["beparams"].update(bep)
293
    for nic in idict["nics"]:
294
      nic['nicparams'] = objects.FillDict(
295
        cluster.nicparams[constants.PP_DEFAULT],
296
        nic['nicparams'])
297
    return idict
298

    
299
  def _ConnectList(self, client, node_list, call):
300
    """Helper for computing node addresses.
301

302
    @type client: L{ganeti.rpc.Client}
303
    @param client: a C{Client} instance
304
    @type node_list: list
305
    @param node_list: the node list we should connect
306
    @type call: string
307
    @param call: the name of the remote procedure call, for filling in
308
        correctly any eventual offline nodes' results
309

310
    """
311
    all_nodes = self._cfg.GetAllNodesInfo()
312
    name_list = []
313
    addr_list = []
314
    skip_dict = {}
315
    for node in node_list:
316
      if node in all_nodes:
317
        if all_nodes[node].offline:
318
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
319
          continue
320
        val = all_nodes[node].primary_ip
321
      else:
322
        val = None
323
      addr_list.append(val)
324
      name_list.append(node)
325
    if name_list:
326
      client.ConnectList(name_list, address_list=addr_list)
327
    return skip_dict
328

    
329
  def _ConnectNode(self, client, node, call):
330
    """Helper for computing one node's address.
331

332
    @type client: L{ganeti.rpc.Client}
333
    @param client: a C{Client} instance
334
    @type node: str
335
    @param node: the node we should connect
336
    @type call: string
337
    @param call: the name of the remote procedure call, for filling in
338
        correctly any eventual offline nodes' results
339

340
    """
341
    node_info = self._cfg.GetNodeInfo(node)
342
    if node_info is not None:
343
      if node_info.offline:
344
        return RpcResult(node=node, offline=True, call=call)
345
      addr = node_info.primary_ip
346
    else:
347
      addr = None
348
    client.ConnectNode(node, address=addr)
349

    
350
  def _MultiNodeCall(self, node_list, procedure, args):
351
    """Helper for making a multi-node call
352

353
    """
354
    body = serializer.DumpJson(args, indent=False)
355
    c = Client(procedure, body, self.port)
356
    skip_dict = self._ConnectList(c, node_list, procedure)
357
    skip_dict.update(c.GetResults())
358
    return skip_dict
359

    
360
  @classmethod
361
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
362
                           address_list=None):
363
    """Helper for making a multi-node static call
364

365
    """
366
    body = serializer.DumpJson(args, indent=False)
367
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
368
    c.ConnectList(node_list, address_list=address_list)
369
    return c.GetResults()
370

    
371
  def _SingleNodeCall(self, node, procedure, args):
372
    """Helper for making a single-node call
373

374
    """
375
    body = serializer.DumpJson(args, indent=False)
376
    c = Client(procedure, body, self.port)
377
    result = self._ConnectNode(c, node, procedure)
378
    if result is None:
379
      # we did connect, node is not offline
380
      result = c.GetResults()[node]
381
    return result
382

    
383
  @classmethod
384
  def _StaticSingleNodeCall(cls, node, procedure, args):
385
    """Helper for making a single-node static call
386

387
    """
388
    body = serializer.DumpJson(args, indent=False)
389
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
390
    c.ConnectNode(node)
391
    return c.GetResults()[node]
392

    
393
  @staticmethod
394
  def _Compress(data):
395
    """Compresses a string for transport over RPC.
396

397
    Small amounts of data are not compressed.
398

399
    @type data: str
400
    @param data: Data
401
    @rtype: tuple
402
    @return: Encoded data to send
403

404
    """
405
    # Small amounts of data are not compressed
406
    if len(data) < 512:
407
      return (constants.RPC_ENCODING_NONE, data)
408

    
409
    # Compress with zlib and encode in base64
410
    return (constants.RPC_ENCODING_ZLIB_BASE64,
411
            base64.b64encode(zlib.compress(data, 3)))
412

    
413
  #
414
  # Begin RPC calls
415
  #
416

    
417
  def call_lv_list(self, node_list, vg_name):
418
    """Gets the logical volumes present in a given volume group.
419

420
    This is a multi-node call.
421

422
    """
423
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
424

    
425
  def call_vg_list(self, node_list):
426
    """Gets the volume group list.
427

428
    This is a multi-node call.
429

430
    """
431
    return self._MultiNodeCall(node_list, "vg_list", [])
432

    
433
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
434
    """Get list of storage units.
435

436
    This is a multi-node call.
437

438
    """
439
    return self._MultiNodeCall(node_list, "storage_list",
440
                               [su_name, su_args, name, fields])
441

    
442
  def call_storage_modify(self, node, su_name, su_args, name, changes):
443
    """Modify a storage unit.
444

445
    This is a single-node call.
446

447
    """
448
    return self._SingleNodeCall(node, "storage_modify",
449
                                [su_name, su_args, name, changes])
450

    
451
  def call_storage_execute(self, node, su_name, su_args, name, op):
452
    """Executes an operation on a storage unit.
453

454
    This is a single-node call.
455

456
    """
457
    return self._SingleNodeCall(node, "storage_execute",
458
                                [su_name, su_args, name, op])
459

    
460
  def call_bridges_exist(self, node, bridges_list):
461
    """Checks if a node has all the bridges given.
462

463
    This method checks if all bridges given in the bridges_list are
464
    present on the remote node, so that an instance that uses interfaces
465
    on those bridges can be started.
466

467
    This is a single-node call.
468

469
    """
470
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
471

    
472
  def call_instance_start(self, node, instance, hvp, bep):
473
    """Starts an instance.
474

475
    This is a single-node call.
476

477
    """
478
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
479
    return self._SingleNodeCall(node, "instance_start", [idict])
480

    
481
  def call_instance_shutdown(self, node, instance, timeout):
482
    """Stops an instance.
483

484
    This is a single-node call.
485

486
    """
487
    return self._SingleNodeCall(node, "instance_shutdown",
488
                                [self._InstDict(instance), timeout])
489

    
490
  def call_migration_info(self, node, instance):
491
    """Gather the information necessary to prepare an instance migration.
492

493
    This is a single-node call.
494

495
    @type node: string
496
    @param node: the node on which the instance is currently running
497
    @type instance: C{objects.Instance}
498
    @param instance: the instance definition
499

500
    """
501
    return self._SingleNodeCall(node, "migration_info",
502
                                [self._InstDict(instance)])
503

    
504
  def call_accept_instance(self, node, instance, info, target):
505
    """Prepare a node to accept an instance.
506

507
    This is a single-node call.
508

509
    @type node: string
510
    @param node: the target node for the migration
511
    @type instance: C{objects.Instance}
512
    @param instance: the instance definition
513
    @type info: opaque/hypervisor specific (string/data)
514
    @param info: result for the call_migration_info call
515
    @type target: string
516
    @param target: target hostname (usually ip address) (on the node itself)
517

518
    """
519
    return self._SingleNodeCall(node, "accept_instance",
520
                                [self._InstDict(instance), info, target])
521

    
522
  def call_finalize_migration(self, node, instance, info, success):
523
    """Finalize any target-node migration specific operation.
524

525
    This is called both in case of a successful migration and in case of error
526
    (in which case it should abort the migration).
527

528
    This is a single-node call.
529

530
    @type node: string
531
    @param node: the target node for the migration
532
    @type instance: C{objects.Instance}
533
    @param instance: the instance definition
534
    @type info: opaque/hypervisor specific (string/data)
535
    @param info: result for the call_migration_info call
536
    @type success: boolean
537
    @param success: whether the migration was a success or a failure
538

539
    """
540
    return self._SingleNodeCall(node, "finalize_migration",
541
                                [self._InstDict(instance), info, success])
542

    
543
  def call_instance_migrate(self, node, instance, target, live):
544
    """Migrate an instance.
545

546
    This is a single-node call.
547

548
    @type node: string
549
    @param node: the node on which the instance is currently running
550
    @type instance: C{objects.Instance}
551
    @param instance: the instance definition
552
    @type target: string
553
    @param target: the target node name
554
    @type live: boolean
555
    @param live: whether the migration should be done live or not (the
556
        interpretation of this parameter is left to the hypervisor)
557

558
    """
559
    return self._SingleNodeCall(node, "instance_migrate",
560
                                [self._InstDict(instance), target, live])
561

    
562
  def call_instance_reboot(self, node, inst, reboot_type, shutdown_timeout):
563
    """Reboots an instance.
564

565
    This is a single-node call.
566

567
    """
568
    return self._SingleNodeCall(node, "instance_reboot",
569
                                [self._InstDict(inst), reboot_type,
570
                                 shutdown_timeout])
571

    
572
  def call_instance_os_add(self, node, inst, reinstall):
573
    """Installs an OS on the given instance.
574

575
    This is a single-node call.
576

577
    """
578
    return self._SingleNodeCall(node, "instance_os_add",
579
                                [self._InstDict(inst), reinstall])
580

    
581
  def call_instance_run_rename(self, node, inst, old_name):
582
    """Run the OS rename script for an instance.
583

584
    This is a single-node call.
585

586
    """
587
    return self._SingleNodeCall(node, "instance_run_rename",
588
                                [self._InstDict(inst), old_name])
589

    
590
  def call_instance_info(self, node, instance, hname):
591
    """Returns information about a single instance.
592

593
    This is a single-node call.
594

595
    @type node: list
596
    @param node: the list of nodes to query
597
    @type instance: string
598
    @param instance: the instance name
599
    @type hname: string
600
    @param hname: the hypervisor type of the instance
601

602
    """
603
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
604

    
605
  def call_instance_migratable(self, node, instance):
606
    """Checks whether the given instance can be migrated.
607

608
    This is a single-node call.
609

610
    @param node: the node to query
611
    @type instance: L{objects.Instance}
612
    @param instance: the instance to check
613

614

615
    """
616
    return self._SingleNodeCall(node, "instance_migratable",
617
                                [self._InstDict(instance)])
618

    
619
  def call_all_instances_info(self, node_list, hypervisor_list):
620
    """Returns information about all instances on the given nodes.
621

622
    This is a multi-node call.
623

624
    @type node_list: list
625
    @param node_list: the list of nodes to query
626
    @type hypervisor_list: list
627
    @param hypervisor_list: the hypervisors to query for instances
628

629
    """
630
    return self._MultiNodeCall(node_list, "all_instances_info",
631
                               [hypervisor_list])
632

    
633
  def call_instance_list(self, node_list, hypervisor_list):
634
    """Returns the list of running instances on a given node.
635

636
    This is a multi-node call.
637

638
    @type node_list: list
639
    @param node_list: the list of nodes to query
640
    @type hypervisor_list: list
641
    @param hypervisor_list: the hypervisors to query for instances
642

643
    """
644
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
645

    
646
  def call_node_tcp_ping(self, node, source, target, port, timeout,
647
                         live_port_needed):
648
    """Do a TcpPing on the remote node
649

650
    This is a single-node call.
651

652
    """
653
    return self._SingleNodeCall(node, "node_tcp_ping",
654
                                [source, target, port, timeout,
655
                                 live_port_needed])
656

    
657
  def call_node_has_ip_address(self, node, address):
658
    """Checks if a node has the given IP address.
659

660
    This is a single-node call.
661

662
    """
663
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
664

    
665
  def call_node_info(self, node_list, vg_name, hypervisor_type):
666
    """Return node information.
667

668
    This will return memory information and volume group size and free
669
    space.
670

671
    This is a multi-node call.
672

673
    @type node_list: list
674
    @param node_list: the list of nodes to query
675
    @type vg_name: C{string}
676
    @param vg_name: the name of the volume group to ask for disk space
677
        information
678
    @type hypervisor_type: C{str}
679
    @param hypervisor_type: the name of the hypervisor to ask for
680
        memory information
681

682
    """
683
    return self._MultiNodeCall(node_list, "node_info",
684
                               [vg_name, hypervisor_type])
685

    
686
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
687
    """Add a node to the cluster.
688

689
    This is a single-node call.
690

691
    """
692
    return self._SingleNodeCall(node, "node_add",
693
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
694

    
695
  def call_node_verify(self, node_list, checkdict, cluster_name):
696
    """Request verification of given parameters.
697

698
    This is a multi-node call.
699

700
    """
701
    return self._MultiNodeCall(node_list, "node_verify",
702
                               [checkdict, cluster_name])
703

    
704
  @classmethod
705
  def call_node_start_master(cls, node, start_daemons, no_voting):
706
    """Tells a node to activate itself as a master.
707

708
    This is a single-node call.
709

710
    """
711
    return cls._StaticSingleNodeCall(node, "node_start_master",
712
                                     [start_daemons, no_voting])
713

    
714
  @classmethod
715
  def call_node_stop_master(cls, node, stop_daemons):
716
    """Tells a node to demote itself from master status.
717

718
    This is a single-node call.
719

720
    """
721
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
722

    
723
  @classmethod
724
  def call_master_info(cls, node_list):
725
    """Query master info.
726

727
    This is a multi-node call.
728

729
    """
730
    # TODO: should this method query down nodes?
731
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
732

    
733
  def call_version(self, node_list):
734
    """Query node version.
735

736
    This is a multi-node call.
737

738
    """
739
    return self._MultiNodeCall(node_list, "version", [])
740

    
741
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
742
    """Request creation of a given block device.
743

744
    This is a single-node call.
745

746
    """
747
    return self._SingleNodeCall(node, "blockdev_create",
748
                                [bdev.ToDict(), size, owner, on_primary, info])
749

    
750
  def call_blockdev_remove(self, node, bdev):
751
    """Request removal of a given block device.
752

753
    This is a single-node call.
754

755
    """
756
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
757

    
758
  def call_blockdev_rename(self, node, devlist):
759
    """Request rename of the given block devices.
760

761
    This is a single-node call.
762

763
    """
764
    return self._SingleNodeCall(node, "blockdev_rename",
765
                                [(d.ToDict(), uid) for d, uid in devlist])
766

    
767
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
768
    """Request assembling of a given block device.
769

770
    This is a single-node call.
771

772
    """
773
    return self._SingleNodeCall(node, "blockdev_assemble",
774
                                [disk.ToDict(), owner, on_primary])
775

    
776
  def call_blockdev_shutdown(self, node, disk):
777
    """Request shutdown of a given block device.
778

779
    This is a single-node call.
780

781
    """
782
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
783

    
784
  def call_blockdev_addchildren(self, node, bdev, ndevs):
785
    """Request adding a list of children to a (mirroring) device.
786

787
    This is a single-node call.
788

789
    """
790
    return self._SingleNodeCall(node, "blockdev_addchildren",
791
                                [bdev.ToDict(),
792
                                 [disk.ToDict() for disk in ndevs]])
793

    
794
  def call_blockdev_removechildren(self, node, bdev, ndevs):
795
    """Request removing a list of children from a (mirroring) device.
796

797
    This is a single-node call.
798

799
    """
800
    return self._SingleNodeCall(node, "blockdev_removechildren",
801
                                [bdev.ToDict(),
802
                                 [disk.ToDict() for disk in ndevs]])
803

    
804
  def call_blockdev_getmirrorstatus(self, node, disks):
805
    """Request status of a (mirroring) device.
806

807
    This is a single-node call.
808

809
    """
810
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
811
                                  [dsk.ToDict() for dsk in disks])
812
    if not result.fail_msg:
813
      result.payload = [objects.BlockDevStatus.FromDict(i)
814
                        for i in result.payload]
815
    return result
816

    
817
  def call_blockdev_find(self, node, disk):
818
    """Request identification of a given block device.
819

820
    This is a single-node call.
821

822
    """
823
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
824
    if not result.fail_msg and result.payload is not None:
825
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
826
    return result
827

    
828
  def call_blockdev_close(self, node, instance_name, disks):
829
    """Closes the given block devices.
830

831
    This is a single-node call.
832

833
    """
834
    params = [instance_name, [cf.ToDict() for cf in disks]]
835
    return self._SingleNodeCall(node, "blockdev_close", params)
836

    
837
  def call_blockdev_getsizes(self, node, disks):
838
    """Returns the size of the given disks.
839

840
    This is a single-node call.
841

842
    """
843
    params = [[cf.ToDict() for cf in disks]]
844
    return self._SingleNodeCall(node, "blockdev_getsize", params)
845

    
846
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
847
    """Disconnects the network of the given drbd devices.
848

849
    This is a multi-node call.
850

851
    """
852
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
853
                               [nodes_ip, [cf.ToDict() for cf in disks]])
854

    
855
  def call_drbd_attach_net(self, node_list, nodes_ip,
856
                           disks, instance_name, multimaster):
857
    """Disconnects the given drbd devices.
858

859
    This is a multi-node call.
860

861
    """
862
    return self._MultiNodeCall(node_list, "drbd_attach_net",
863
                               [nodes_ip, [cf.ToDict() for cf in disks],
864
                                instance_name, multimaster])
865

    
866
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
867
    """Waits for the synchronization of drbd devices is complete.
868

869
    This is a multi-node call.
870

871
    """
872
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
873
                               [nodes_ip, [cf.ToDict() for cf in disks]])
874

    
875
  @classmethod
876
  def call_upload_file(cls, node_list, file_name, address_list=None):
877
    """Upload a file.
878

879
    The node will refuse the operation in case the file is not on the
880
    approved file list.
881

882
    This is a multi-node call.
883

884
    @type node_list: list
885
    @param node_list: the list of node names to upload to
886
    @type file_name: str
887
    @param file_name: the filename to upload
888
    @type address_list: list or None
889
    @keyword address_list: an optional list of node addresses, in order
890
        to optimize the RPC speed
891

892
    """
893
    file_contents = utils.ReadFile(file_name)
894
    data = cls._Compress(file_contents)
895
    st = os.stat(file_name)
896
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
897
              st.st_atime, st.st_mtime]
898
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
899
                                    address_list=address_list)
900

    
901
  @classmethod
902
  def call_write_ssconf_files(cls, node_list, values):
903
    """Write ssconf files.
904

905
    This is a multi-node call.
906

907
    """
908
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
909

    
910
  def call_os_diagnose(self, node_list):
911
    """Request a diagnose of OS definitions.
912

913
    This is a multi-node call.
914

915
    """
916
    return self._MultiNodeCall(node_list, "os_diagnose", [])
917

    
918
  def call_os_get(self, node, name):
919
    """Returns an OS definition.
920

921
    This is a single-node call.
922

923
    """
924
    result = self._SingleNodeCall(node, "os_get", [name])
925
    if not result.fail_msg and isinstance(result.payload, dict):
926
      result.payload = objects.OS.FromDict(result.payload)
927
    return result
928

    
929
  def call_hooks_runner(self, node_list, hpath, phase, env):
930
    """Call the hooks runner.
931

932
    Args:
933
      - op: the OpCode instance
934
      - env: a dictionary with the environment
935

936
    This is a multi-node call.
937

938
    """
939
    params = [hpath, phase, env]
940
    return self._MultiNodeCall(node_list, "hooks_runner", params)
941

    
942
  def call_iallocator_runner(self, node, name, idata):
943
    """Call an iallocator on a remote node
944

945
    Args:
946
      - name: the iallocator name
947
      - input: the json-encoded input string
948

949
    This is a single-node call.
950

951
    """
952
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
953

    
954
  def call_blockdev_grow(self, node, cf_bdev, amount):
955
    """Request a snapshot of the given block device.
956

957
    This is a single-node call.
958

959
    """
960
    return self._SingleNodeCall(node, "blockdev_grow",
961
                                [cf_bdev.ToDict(), amount])
962

    
963
  def call_blockdev_export(self, node, cf_bdev,
964
                           dest_node, dest_path, cluster_name):
965
    """Export a given disk to another node.
966

967
    This is a single-node call.
968

969
    """
970
    return self._SingleNodeCall(node, "blockdev_export",
971
                                [cf_bdev.ToDict(), dest_node, dest_path,
972
                                 cluster_name])
973

    
974
  def call_blockdev_snapshot(self, node, cf_bdev):
975
    """Request a snapshot of the given block device.
976

977
    This is a single-node call.
978

979
    """
980
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
981

    
982
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
983
                           cluster_name, idx):
984
    """Request the export of a given snapshot.
985

986
    This is a single-node call.
987

988
    """
989
    return self._SingleNodeCall(node, "snapshot_export",
990
                                [snap_bdev.ToDict(), dest_node,
991
                                 self._InstDict(instance), cluster_name, idx])
992

    
993
  def call_finalize_export(self, node, instance, snap_disks):
994
    """Request the completion of an export operation.
995

996
    This writes the export config file, etc.
997

998
    This is a single-node call.
999

1000
    """
1001
    flat_disks = []
1002
    for disk in snap_disks:
1003
      if isinstance(disk, bool):
1004
        flat_disks.append(disk)
1005
      else:
1006
        flat_disks.append(disk.ToDict())
1007

    
1008
    return self._SingleNodeCall(node, "finalize_export",
1009
                                [self._InstDict(instance), flat_disks])
1010

    
1011
  def call_export_info(self, node, path):
1012
    """Queries the export information in a given path.
1013

1014
    This is a single-node call.
1015

1016
    """
1017
    return self._SingleNodeCall(node, "export_info", [path])
1018

    
1019
  def call_instance_os_import(self, node, inst, src_node, src_images,
1020
                              cluster_name):
1021
    """Request the import of a backup into an instance.
1022

1023
    This is a single-node call.
1024

1025
    """
1026
    return self._SingleNodeCall(node, "instance_os_import",
1027
                                [self._InstDict(inst), src_node, src_images,
1028
                                 cluster_name])
1029

    
1030
  def call_export_list(self, node_list):
1031
    """Gets the stored exports list.
1032

1033
    This is a multi-node call.
1034

1035
    """
1036
    return self._MultiNodeCall(node_list, "export_list", [])
1037

    
1038
  def call_export_remove(self, node, export):
1039
    """Requests removal of a given export.
1040

1041
    This is a single-node call.
1042

1043
    """
1044
    return self._SingleNodeCall(node, "export_remove", [export])
1045

    
1046
  @classmethod
1047
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
1048
    """Requests a node to clean the cluster information it has.
1049

1050
    This will remove the configuration information from the ganeti data
1051
    dir.
1052

1053
    This is a single-node call.
1054

1055
    """
1056
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
1057
                                     [modify_ssh_setup])
1058

    
1059
  def call_node_volumes(self, node_list):
1060
    """Gets all volumes on node(s).
1061

1062
    This is a multi-node call.
1063

1064
    """
1065
    return self._MultiNodeCall(node_list, "node_volumes", [])
1066

    
1067
  def call_node_demote_from_mc(self, node):
1068
    """Demote a node from the master candidate role.
1069

1070
    This is a single-node call.
1071

1072
    """
1073
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1074

    
1075

    
1076
  def call_node_powercycle(self, node, hypervisor):
1077
    """Tries to powercycle a node.
1078

1079
    This is a single-node call.
1080

1081
    """
1082
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1083

    
1084

    
1085
  def call_test_delay(self, node_list, duration):
1086
    """Sleep for a fixed time on given node(s).
1087

1088
    This is a multi-node call.
1089

1090
    """
1091
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1092

    
1093
  def call_file_storage_dir_create(self, node, file_storage_dir):
1094
    """Create the given file storage directory.
1095

1096
    This is a single-node call.
1097

1098
    """
1099
    return self._SingleNodeCall(node, "file_storage_dir_create",
1100
                                [file_storage_dir])
1101

    
1102
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1103
    """Remove the given file storage directory.
1104

1105
    This is a single-node call.
1106

1107
    """
1108
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1109
                                [file_storage_dir])
1110

    
1111
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1112
                                   new_file_storage_dir):
1113
    """Rename file storage directory.
1114

1115
    This is a single-node call.
1116

1117
    """
1118
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1119
                                [old_file_storage_dir, new_file_storage_dir])
1120

    
1121
  @classmethod
1122
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1123
    """Update job queue.
1124

1125
    This is a multi-node call.
1126

1127
    """
1128
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1129
                                    [file_name, cls._Compress(content)],
1130
                                    address_list=address_list)
1131

    
1132
  @classmethod
1133
  def call_jobqueue_purge(cls, node):
1134
    """Purge job queue.
1135

1136
    This is a single-node call.
1137

1138
    """
1139
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1140

    
1141
  @classmethod
1142
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1143
    """Rename a job queue file.
1144

1145
    This is a multi-node call.
1146

1147
    """
1148
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1149
                                    address_list=address_list)
1150

    
1151
  @classmethod
1152
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1153
    """Set the drain flag on the queue.
1154

1155
    This is a multi-node call.
1156

1157
    @type node_list: list
1158
    @param node_list: the list of nodes to query
1159
    @type drain_flag: bool
1160
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1161

1162
    """
1163
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1164
                                    [drain_flag])
1165

    
1166
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1167
    """Validate the hypervisor params.
1168

1169
    This is a multi-node call.
1170

1171
    @type node_list: list
1172
    @param node_list: the list of nodes to query
1173
    @type hvname: string
1174
    @param hvname: the hypervisor name
1175
    @type hvparams: dict
1176
    @param hvparams: the hypervisor parameters to be validated
1177

1178
    """
1179
    cluster = self._cfg.GetClusterInfo()
1180
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1181
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1182
                               [hvname, hv_full])