Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ fe89794e

History | View | Annotate | Download (34.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
import ganeti.http.client
46

    
47

    
48
# Module level variable
49
_http_manager = None
50

    
51

    
52
def Init():
53
  """Initializes the module-global HTTP client manager.
54

55
  Must be called before using any RPC function.
56

57
  """
58
  global _http_manager
59

    
60
  assert not _http_manager, "RPC module initialized more than once"
61

    
62
  _http_manager = http.client.HttpClientManager()
63

    
64

    
65
def Shutdown():
66
  """Stops the module-global HTTP client manager.
67

68
  Must be called before quitting the program.
69

70
  """
71
  global _http_manager
72

    
73
  if _http_manager:
74
    _http_manager.Shutdown()
75
    _http_manager = None
76

    
77

    
78
class RpcResult(object):
79
  """RPC Result class.
80

81
  This class holds an RPC result. It is needed since in multi-node
82
  calls we can't raise an exception just because one one out of many
83
  failed, and therefore we use this class to encapsulate the result.
84

85
  @ivar data: the data payload, for successful results, or None
86
  @type failed: boolean
87
  @ivar failed: whether the operation failed at transport level (not
88
      application level on the remote node)
89
  @ivar call: the name of the RPC call
90
  @ivar node: the name of the node to which we made the call
91
  @ivar offline: whether the operation failed because the node was
92
      offline, as opposed to actual failure; offline=True will always
93
      imply failed=True, in order to allow simpler checking if
94
      the user doesn't care about the exact failure mode
95
  @ivar fail_msg: the error message if the call failed
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.fail_msg = "Node is marked offline"
107
      self.data = self.payload = None
108
    elif failed:
109
      self.fail_msg = self._EnsureErr(data)
110
      self.data = self.payload = None
111
    else:
112
      self.data = data
113
      if not isinstance(self.data, (tuple, list)):
114
        self.failed = True
115
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
116
                         type(self.data))
117
      elif len(data) != 2:
118
        self.failed = True
119
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
120
                         "expected 2" % len(self.data))
121
      elif not self.data[0]:
122
        self.failed = True
123
        self.fail_msg = self._EnsureErr(self.data[1])
124
      else:
125
        # finally success
126
        self.fail_msg = None
127
        self.payload = data[1]
128

    
129
  @staticmethod
130
  def _EnsureErr(val):
131
    """Helper to ensure we return a 'True' value for error."""
132
    if val:
133
      return val
134
    else:
135
      return "No error information"
136

    
137
  def Raise(self, msg, prereq=False):
138
    """If the result has failed, raise an OpExecError.
139

140
    This is used so that LU code doesn't have to check for each
141
    result, but instead can call this function.
142

143
    """
144
    if not self.fail_msg:
145
      return
146

    
147
    if not msg: # one could pass None for default message
148
      msg = ("Call '%s' to node '%s' has failed: %s" %
149
             (self.call, self.node, self.fail_msg))
150
    else:
151
      msg = "%s: %s" % (msg, self.fail_msg)
152
    if prereq:
153
      ec = errors.OpPrereqError
154
    else:
155
      ec = errors.OpExecError
156
    raise ec(msg)
157

    
158
  def RemoteFailMsg(self):
159
    """Check if the remote procedure failed.
160

161
    @return: the fail_msg attribute
162

163
    """
164
    return self.fail_msg
165

    
166

    
167
class Client:
168
  """RPC Client class.
169

170
  This class, given a (remote) method name, a list of parameters and a
171
  list of nodes, will contact (in parallel) all nodes, and return a
172
  dict of results (key: node name, value: result).
173

174
  One current bug is that generic failure is still signaled by
175
  'False' result, which is not good. This overloading of values can
176
  cause bugs.
177

178
  """
179
  def __init__(self, procedure, body, port):
180
    self.procedure = procedure
181
    self.body = body
182
    self.port = port
183
    self.nc = {}
184

    
185
    self._ssl_params = \
186
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
187
                         ssl_cert_path=constants.SSL_CERT_FILE)
188

    
189
  def ConnectList(self, node_list, address_list=None):
190
    """Add a list of nodes to the target nodes.
191

192
    @type node_list: list
193
    @param node_list: the list of node names to connect
194
    @type address_list: list or None
195
    @keyword address_list: either None or a list with node addresses,
196
        which must have the same length as the node list
197

198
    """
199
    if address_list is None:
200
      address_list = [None for _ in node_list]
201
    else:
202
      assert len(node_list) == len(address_list), \
203
             "Name and address lists should have the same length"
204
    for node, address in zip(node_list, address_list):
205
      self.ConnectNode(node, address)
206

    
207
  def ConnectNode(self, name, address=None):
208
    """Add a node to the target list.
209

210
    @type name: str
211
    @param name: the node name
212
    @type address: str
213
    @keyword address: the node address, if known
214

215
    """
216
    if address is None:
217
      address = name
218

    
219
    self.nc[name] = \
220
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
221
                                    "/%s" % self.procedure,
222
                                    post_data=self.body,
223
                                    ssl_params=self._ssl_params,
224
                                    ssl_verify_peer=True)
225

    
226
  def GetResults(self):
227
    """Call nodes and return results.
228

229
    @rtype: list
230
    @return: List of RPC results
231

232
    """
233
    assert _http_manager, "RPC module not initialized"
234

    
235
    _http_manager.ExecRequests(self.nc.values())
236

    
237
    results = {}
238

    
239
    for name, req in self.nc.iteritems():
240
      if req.success and req.resp_status_code == http.HTTP_OK:
241
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
242
                                  node=name, call=self.procedure)
243
        continue
244

    
245
      # TODO: Better error reporting
246
      if req.error:
247
        msg = req.error
248
      else:
249
        msg = req.resp_body
250

    
251
      logging.error("RPC error in %s from node %s: %s",
252
                    self.procedure, name, msg)
253
      results[name] = RpcResult(data=msg, failed=True, node=name,
254
                                call=self.procedure)
255

    
256
    return results
257

    
258

    
259
class RpcRunner(object):
260
  """RPC runner class"""
261

    
262
  def __init__(self, cfg):
263
    """Initialized the rpc runner.
264

265
    @type cfg:  C{config.ConfigWriter}
266
    @param cfg: the configuration object that will be used to get data
267
                about the cluster
268

269
    """
270
    self._cfg = cfg
271
    self.port = utils.GetDaemonPort(constants.NODED)
272

    
273
  def _InstDict(self, instance, hvp=None, bep=None):
274
    """Convert the given instance to a dict.
275

276
    This is done via the instance's ToDict() method and additionally
277
    we fill the hvparams with the cluster defaults.
278

279
    @type instance: L{objects.Instance}
280
    @param instance: an Instance object
281
    @type hvp: dict or None
282
    @param hvp: a dictionary with overridden hypervisor parameters
283
    @type bep: dict or None
284
    @param bep: a dictionary with overridden backend parameters
285
    @rtype: dict
286
    @return: the instance dict, with the hvparams filled with the
287
        cluster defaults
288

289
    """
290
    idict = instance.ToDict()
291
    cluster = self._cfg.GetClusterInfo()
292
    idict["hvparams"] = cluster.FillHV(instance)
293
    if hvp is not None:
294
      idict["hvparams"].update(hvp)
295
    idict["beparams"] = cluster.FillBE(instance)
296
    if bep is not None:
297
      idict["beparams"].update(bep)
298
    for nic in idict["nics"]:
299
      nic['nicparams'] = objects.FillDict(
300
        cluster.nicparams[constants.PP_DEFAULT],
301
        nic['nicparams'])
302
    return idict
303

    
304
  def _ConnectList(self, client, node_list, call):
305
    """Helper for computing node addresses.
306

307
    @type client: L{ganeti.rpc.Client}
308
    @param client: a C{Client} instance
309
    @type node_list: list
310
    @param node_list: the node list we should connect
311
    @type call: string
312
    @param call: the name of the remote procedure call, for filling in
313
        correctly any eventual offline nodes' results
314

315
    """
316
    all_nodes = self._cfg.GetAllNodesInfo()
317
    name_list = []
318
    addr_list = []
319
    skip_dict = {}
320
    for node in node_list:
321
      if node in all_nodes:
322
        if all_nodes[node].offline:
323
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
324
          continue
325
        val = all_nodes[node].primary_ip
326
      else:
327
        val = None
328
      addr_list.append(val)
329
      name_list.append(node)
330
    if name_list:
331
      client.ConnectList(name_list, address_list=addr_list)
332
    return skip_dict
333

    
334
  def _ConnectNode(self, client, node, call):
335
    """Helper for computing one node's address.
336

337
    @type client: L{ganeti.rpc.Client}
338
    @param client: a C{Client} instance
339
    @type node: str
340
    @param node: the node we should connect
341
    @type call: string
342
    @param call: the name of the remote procedure call, for filling in
343
        correctly any eventual offline nodes' results
344

345
    """
346
    node_info = self._cfg.GetNodeInfo(node)
347
    if node_info is not None:
348
      if node_info.offline:
349
        return RpcResult(node=node, offline=True, call=call)
350
      addr = node_info.primary_ip
351
    else:
352
      addr = None
353
    client.ConnectNode(node, address=addr)
354

    
355
  def _MultiNodeCall(self, node_list, procedure, args):
356
    """Helper for making a multi-node call
357

358
    """
359
    body = serializer.DumpJson(args, indent=False)
360
    c = Client(procedure, body, self.port)
361
    skip_dict = self._ConnectList(c, node_list, procedure)
362
    skip_dict.update(c.GetResults())
363
    return skip_dict
364

    
365
  @classmethod
366
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
367
                           address_list=None):
368
    """Helper for making a multi-node static call
369

370
    """
371
    body = serializer.DumpJson(args, indent=False)
372
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
373
    c.ConnectList(node_list, address_list=address_list)
374
    return c.GetResults()
375

    
376
  def _SingleNodeCall(self, node, procedure, args):
377
    """Helper for making a single-node call
378

379
    """
380
    body = serializer.DumpJson(args, indent=False)
381
    c = Client(procedure, body, self.port)
382
    result = self._ConnectNode(c, node, procedure)
383
    if result is None:
384
      # we did connect, node is not offline
385
      result = c.GetResults()[node]
386
    return result
387

    
388
  @classmethod
389
  def _StaticSingleNodeCall(cls, node, procedure, args):
390
    """Helper for making a single-node static call
391

392
    """
393
    body = serializer.DumpJson(args, indent=False)
394
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
395
    c.ConnectNode(node)
396
    return c.GetResults()[node]
397

    
398
  @staticmethod
399
  def _Compress(data):
400
    """Compresses a string for transport over RPC.
401

402
    Small amounts of data are not compressed.
403

404
    @type data: str
405
    @param data: Data
406
    @rtype: tuple
407
    @return: Encoded data to send
408

409
    """
410
    # Small amounts of data are not compressed
411
    if len(data) < 512:
412
      return (constants.RPC_ENCODING_NONE, data)
413

    
414
    # Compress with zlib and encode in base64
415
    return (constants.RPC_ENCODING_ZLIB_BASE64,
416
            base64.b64encode(zlib.compress(data, 3)))
417

    
418
  #
419
  # Begin RPC calls
420
  #
421

    
422
  def call_lv_list(self, node_list, vg_name):
423
    """Gets the logical volumes present in a given volume group.
424

425
    This is a multi-node call.
426

427
    """
428
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
429

    
430
  def call_vg_list(self, node_list):
431
    """Gets the volume group list.
432

433
    This is a multi-node call.
434

435
    """
436
    return self._MultiNodeCall(node_list, "vg_list", [])
437

    
438
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
439
    """Get list of storage units.
440

441
    This is a multi-node call.
442

443
    """
444
    return self._MultiNodeCall(node_list, "storage_list",
445
                               [su_name, su_args, name, fields])
446

    
447
  def call_storage_modify(self, node, su_name, su_args, name, changes):
448
    """Modify a storage unit.
449

450
    This is a single-node call.
451

452
    """
453
    return self._SingleNodeCall(node, "storage_modify",
454
                                [su_name, su_args, name, changes])
455

    
456
  def call_storage_execute(self, node, su_name, su_args, name, op):
457
    """Executes an operation on a storage unit.
458

459
    This is a single-node call.
460

461
    """
462
    return self._SingleNodeCall(node, "storage_execute",
463
                                [su_name, su_args, name, op])
464

    
465
  def call_bridges_exist(self, node, bridges_list):
466
    """Checks if a node has all the bridges given.
467

468
    This method checks if all bridges given in the bridges_list are
469
    present on the remote node, so that an instance that uses interfaces
470
    on those bridges can be started.
471

472
    This is a single-node call.
473

474
    """
475
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
476

    
477
  def call_instance_start(self, node, instance, hvp, bep):
478
    """Starts an instance.
479

480
    This is a single-node call.
481

482
    """
483
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
484
    return self._SingleNodeCall(node, "instance_start", [idict])
485

    
486
  def call_instance_shutdown(self, node, instance):
487
    """Stops an instance.
488

489
    This is a single-node call.
490

491
    """
492
    return self._SingleNodeCall(node, "instance_shutdown",
493
                                [self._InstDict(instance)])
494

    
495
  def call_migration_info(self, node, instance):
496
    """Gather the information necessary to prepare an instance migration.
497

498
    This is a single-node call.
499

500
    @type node: string
501
    @param node: the node on which the instance is currently running
502
    @type instance: C{objects.Instance}
503
    @param instance: the instance definition
504

505
    """
506
    return self._SingleNodeCall(node, "migration_info",
507
                                [self._InstDict(instance)])
508

    
509
  def call_accept_instance(self, node, instance, info, target):
510
    """Prepare a node to accept an instance.
511

512
    This is a single-node call.
513

514
    @type node: string
515
    @param node: the target node for the migration
516
    @type instance: C{objects.Instance}
517
    @param instance: the instance definition
518
    @type info: opaque/hypervisor specific (string/data)
519
    @param info: result for the call_migration_info call
520
    @type target: string
521
    @param target: target hostname (usually ip address) (on the node itself)
522

523
    """
524
    return self._SingleNodeCall(node, "accept_instance",
525
                                [self._InstDict(instance), info, target])
526

    
527
  def call_finalize_migration(self, node, instance, info, success):
528
    """Finalize any target-node migration specific operation.
529

530
    This is called both in case of a successful migration and in case of error
531
    (in which case it should abort the migration).
532

533
    This is a single-node call.
534

535
    @type node: string
536
    @param node: the target node for the migration
537
    @type instance: C{objects.Instance}
538
    @param instance: the instance definition
539
    @type info: opaque/hypervisor specific (string/data)
540
    @param info: result for the call_migration_info call
541
    @type success: boolean
542
    @param success: whether the migration was a success or a failure
543

544
    """
545
    return self._SingleNodeCall(node, "finalize_migration",
546
                                [self._InstDict(instance), info, success])
547

    
548
  def call_instance_migrate(self, node, instance, target, live):
549
    """Migrate an instance.
550

551
    This is a single-node call.
552

553
    @type node: string
554
    @param node: the node on which the instance is currently running
555
    @type instance: C{objects.Instance}
556
    @param instance: the instance definition
557
    @type target: string
558
    @param target: the target node name
559
    @type live: boolean
560
    @param live: whether the migration should be done live or not (the
561
        interpretation of this parameter is left to the hypervisor)
562

563
    """
564
    return self._SingleNodeCall(node, "instance_migrate",
565
                                [self._InstDict(instance), target, live])
566

    
567
  def call_instance_reboot(self, node, instance, reboot_type):
568
    """Reboots an instance.
569

570
    This is a single-node call.
571

572
    """
573
    return self._SingleNodeCall(node, "instance_reboot",
574
                                [self._InstDict(instance), reboot_type])
575

    
576
  def call_instance_os_add(self, node, inst, reinstall):
577
    """Installs an OS on the given instance.
578

579
    This is a single-node call.
580

581
    """
582
    return self._SingleNodeCall(node, "instance_os_add",
583
                                [self._InstDict(inst), reinstall])
584

    
585
  def call_instance_run_rename(self, node, inst, old_name):
586
    """Run the OS rename script for an instance.
587

588
    This is a single-node call.
589

590
    """
591
    return self._SingleNodeCall(node, "instance_run_rename",
592
                                [self._InstDict(inst), old_name])
593

    
594
  def call_instance_info(self, node, instance, hname):
595
    """Returns information about a single instance.
596

597
    This is a single-node call.
598

599
    @type node: list
600
    @param node: the list of nodes to query
601
    @type instance: string
602
    @param instance: the instance name
603
    @type hname: string
604
    @param hname: the hypervisor type of the instance
605

606
    """
607
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
608

    
609
  def call_instance_migratable(self, node, instance):
610
    """Checks whether the given instance can be migrated.
611

612
    This is a single-node call.
613

614
    @param node: the node to query
615
    @type instance: L{objects.Instance}
616
    @param instance: the instance to check
617

618

619
    """
620
    return self._SingleNodeCall(node, "instance_migratable",
621
                                [self._InstDict(instance)])
622

    
623
  def call_all_instances_info(self, node_list, hypervisor_list):
624
    """Returns information about all instances on the given nodes.
625

626
    This is a multi-node call.
627

628
    @type node_list: list
629
    @param node_list: the list of nodes to query
630
    @type hypervisor_list: list
631
    @param hypervisor_list: the hypervisors to query for instances
632

633
    """
634
    return self._MultiNodeCall(node_list, "all_instances_info",
635
                               [hypervisor_list])
636

    
637
  def call_instance_list(self, node_list, hypervisor_list):
638
    """Returns the list of running instances on a given node.
639

640
    This is a multi-node call.
641

642
    @type node_list: list
643
    @param node_list: the list of nodes to query
644
    @type hypervisor_list: list
645
    @param hypervisor_list: the hypervisors to query for instances
646

647
    """
648
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
649

    
650
  def call_node_tcp_ping(self, node, source, target, port, timeout,
651
                         live_port_needed):
652
    """Do a TcpPing on the remote node
653

654
    This is a single-node call.
655

656
    """
657
    return self._SingleNodeCall(node, "node_tcp_ping",
658
                                [source, target, port, timeout,
659
                                 live_port_needed])
660

    
661
  def call_node_has_ip_address(self, node, address):
662
    """Checks if a node has the given IP address.
663

664
    This is a single-node call.
665

666
    """
667
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
668

    
669
  def call_node_info(self, node_list, vg_name, hypervisor_type):
670
    """Return node information.
671

672
    This will return memory information and volume group size and free
673
    space.
674

675
    This is a multi-node call.
676

677
    @type node_list: list
678
    @param node_list: the list of nodes to query
679
    @type vg_name: C{string}
680
    @param vg_name: the name of the volume group to ask for disk space
681
        information
682
    @type hypervisor_type: C{str}
683
    @param hypervisor_type: the name of the hypervisor to ask for
684
        memory information
685

686
    """
687
    return self._MultiNodeCall(node_list, "node_info",
688
                               [vg_name, hypervisor_type])
689

    
690
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
691
    """Add a node to the cluster.
692

693
    This is a single-node call.
694

695
    """
696
    return self._SingleNodeCall(node, "node_add",
697
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
698

    
699
  def call_node_verify(self, node_list, checkdict, cluster_name):
700
    """Request verification of given parameters.
701

702
    This is a multi-node call.
703

704
    """
705
    return self._MultiNodeCall(node_list, "node_verify",
706
                               [checkdict, cluster_name])
707

    
708
  @classmethod
709
  def call_node_start_master(cls, node, start_daemons, no_voting):
710
    """Tells a node to activate itself as a master.
711

712
    This is a single-node call.
713

714
    """
715
    return cls._StaticSingleNodeCall(node, "node_start_master",
716
                                     [start_daemons, no_voting])
717

    
718
  @classmethod
719
  def call_node_stop_master(cls, node, stop_daemons):
720
    """Tells a node to demote itself from master status.
721

722
    This is a single-node call.
723

724
    """
725
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
726

    
727
  @classmethod
728
  def call_master_info(cls, node_list):
729
    """Query master info.
730

731
    This is a multi-node call.
732

733
    """
734
    # TODO: should this method query down nodes?
735
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
736

    
737
  def call_version(self, node_list):
738
    """Query node version.
739

740
    This is a multi-node call.
741

742
    """
743
    return self._MultiNodeCall(node_list, "version", [])
744

    
745
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
746
    """Request creation of a given block device.
747

748
    This is a single-node call.
749

750
    """
751
    return self._SingleNodeCall(node, "blockdev_create",
752
                                [bdev.ToDict(), size, owner, on_primary, info])
753

    
754
  def call_blockdev_remove(self, node, bdev):
755
    """Request removal of a given block device.
756

757
    This is a single-node call.
758

759
    """
760
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
761

    
762
  def call_blockdev_rename(self, node, devlist):
763
    """Request rename of the given block devices.
764

765
    This is a single-node call.
766

767
    """
768
    return self._SingleNodeCall(node, "blockdev_rename",
769
                                [(d.ToDict(), uid) for d, uid in devlist])
770

    
771
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
772
    """Request assembling of a given block device.
773

774
    This is a single-node call.
775

776
    """
777
    return self._SingleNodeCall(node, "blockdev_assemble",
778
                                [disk.ToDict(), owner, on_primary])
779

    
780
  def call_blockdev_shutdown(self, node, disk):
781
    """Request shutdown of a given block device.
782

783
    This is a single-node call.
784

785
    """
786
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
787

    
788
  def call_blockdev_addchildren(self, node, bdev, ndevs):
789
    """Request adding a list of children to a (mirroring) device.
790

791
    This is a single-node call.
792

793
    """
794
    return self._SingleNodeCall(node, "blockdev_addchildren",
795
                                [bdev.ToDict(),
796
                                 [disk.ToDict() for disk in ndevs]])
797

    
798
  def call_blockdev_removechildren(self, node, bdev, ndevs):
799
    """Request removing a list of children from a (mirroring) device.
800

801
    This is a single-node call.
802

803
    """
804
    return self._SingleNodeCall(node, "blockdev_removechildren",
805
                                [bdev.ToDict(),
806
                                 [disk.ToDict() for disk in ndevs]])
807

    
808
  def call_blockdev_getmirrorstatus(self, node, disks):
809
    """Request status of a (mirroring) device.
810

811
    This is a single-node call.
812

813
    """
814
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
815
                                  [dsk.ToDict() for dsk in disks])
816
    if not (result.failed or result.fail_msg):
817
      result.payload = [objects.BlockDevStatus.FromDict(i)
818
                        for i in result.payload]
819
    return result
820

    
821
  def call_blockdev_find(self, node, disk):
822
    """Request identification of a given block device.
823

824
    This is a single-node call.
825

826
    """
827
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
828
    if not result.failed and result.payload is not None:
829
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
830
    return result
831

    
832
  def call_blockdev_close(self, node, instance_name, disks):
833
    """Closes the given block devices.
834

835
    This is a single-node call.
836

837
    """
838
    params = [instance_name, [cf.ToDict() for cf in disks]]
839
    return self._SingleNodeCall(node, "blockdev_close", params)
840

    
841
  def call_blockdev_getsizes(self, node, disks):
842
    """Returns the size of the given disks.
843

844
    This is a single-node call.
845

846
    """
847
    params = [[cf.ToDict() for cf in disks]]
848
    return self._SingleNodeCall(node, "blockdev_getsize", params)
849

    
850
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
851
    """Disconnects the network of the given drbd devices.
852

853
    This is a multi-node call.
854

855
    """
856
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
857
                               [nodes_ip, [cf.ToDict() for cf in disks]])
858

    
859
  def call_drbd_attach_net(self, node_list, nodes_ip,
860
                           disks, instance_name, multimaster):
861
    """Disconnects the given drbd devices.
862

863
    This is a multi-node call.
864

865
    """
866
    return self._MultiNodeCall(node_list, "drbd_attach_net",
867
                               [nodes_ip, [cf.ToDict() for cf in disks],
868
                                instance_name, multimaster])
869

    
870
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
871
    """Waits for the synchronization of drbd devices is complete.
872

873
    This is a multi-node call.
874

875
    """
876
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
877
                               [nodes_ip, [cf.ToDict() for cf in disks]])
878

    
879
  @classmethod
880
  def call_upload_file(cls, node_list, file_name, address_list=None):
881
    """Upload a file.
882

883
    The node will refuse the operation in case the file is not on the
884
    approved file list.
885

886
    This is a multi-node call.
887

888
    @type node_list: list
889
    @param node_list: the list of node names to upload to
890
    @type file_name: str
891
    @param file_name: the filename to upload
892
    @type address_list: list or None
893
    @keyword address_list: an optional list of node addresses, in order
894
        to optimize the RPC speed
895

896
    """
897
    file_contents = utils.ReadFile(file_name)
898
    data = cls._Compress(file_contents)
899
    st = os.stat(file_name)
900
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
901
              st.st_atime, st.st_mtime]
902
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
903
                                    address_list=address_list)
904

    
905
  @classmethod
906
  def call_write_ssconf_files(cls, node_list, values):
907
    """Write ssconf files.
908

909
    This is a multi-node call.
910

911
    """
912
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
913

    
914
  def call_os_diagnose(self, node_list):
915
    """Request a diagnose of OS definitions.
916

917
    This is a multi-node call.
918

919
    """
920
    return self._MultiNodeCall(node_list, "os_diagnose", [])
921

    
922
  def call_os_get(self, node, name):
923
    """Returns an OS definition.
924

925
    This is a single-node call.
926

927
    """
928
    result = self._SingleNodeCall(node, "os_get", [name])
929
    if not result.failed and isinstance(result.data, dict):
930
      result.data = objects.OS.FromDict(result.data)
931
    return result
932

    
933
  def call_hooks_runner(self, node_list, hpath, phase, env):
934
    """Call the hooks runner.
935

936
    Args:
937
      - op: the OpCode instance
938
      - env: a dictionary with the environment
939

940
    This is a multi-node call.
941

942
    """
943
    params = [hpath, phase, env]
944
    return self._MultiNodeCall(node_list, "hooks_runner", params)
945

    
946
  def call_iallocator_runner(self, node, name, idata):
947
    """Call an iallocator on a remote node
948

949
    Args:
950
      - name: the iallocator name
951
      - input: the json-encoded input string
952

953
    This is a single-node call.
954

955
    """
956
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
957

    
958
  def call_blockdev_grow(self, node, cf_bdev, amount):
959
    """Request a snapshot of the given block device.
960

961
    This is a single-node call.
962

963
    """
964
    return self._SingleNodeCall(node, "blockdev_grow",
965
                                [cf_bdev.ToDict(), amount])
966

    
967
  def call_blockdev_export(self, node, cf_bdev,
968
                           dest_node, dest_path, cluster_name):
969
    """Export a given disk to another node.
970

971
    This is a single-node call.
972

973
    """
974
    return self._SingleNodeCall(node, "blockdev_export",
975
                                [cf_bdev.ToDict(), dest_node, dest_path,
976
                                 cluster_name])
977

    
978
  def call_blockdev_snapshot(self, node, cf_bdev):
979
    """Request a snapshot of the given block device.
980

981
    This is a single-node call.
982

983
    """
984
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
985

    
986
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
987
                           cluster_name, idx):
988
    """Request the export of a given snapshot.
989

990
    This is a single-node call.
991

992
    """
993
    return self._SingleNodeCall(node, "snapshot_export",
994
                                [snap_bdev.ToDict(), dest_node,
995
                                 self._InstDict(instance), cluster_name, idx])
996

    
997
  def call_finalize_export(self, node, instance, snap_disks):
998
    """Request the completion of an export operation.
999

1000
    This writes the export config file, etc.
1001

1002
    This is a single-node call.
1003

1004
    """
1005
    flat_disks = []
1006
    for disk in snap_disks:
1007
      if isinstance(disk, bool):
1008
        flat_disks.append(disk)
1009
      else:
1010
        flat_disks.append(disk.ToDict())
1011

    
1012
    return self._SingleNodeCall(node, "finalize_export",
1013
                                [self._InstDict(instance), flat_disks])
1014

    
1015
  def call_export_info(self, node, path):
1016
    """Queries the export information in a given path.
1017

1018
    This is a single-node call.
1019

1020
    """
1021
    return self._SingleNodeCall(node, "export_info", [path])
1022

    
1023
  def call_instance_os_import(self, node, inst, src_node, src_images,
1024
                              cluster_name):
1025
    """Request the import of a backup into an instance.
1026

1027
    This is a single-node call.
1028

1029
    """
1030
    return self._SingleNodeCall(node, "instance_os_import",
1031
                                [self._InstDict(inst), src_node, src_images,
1032
                                 cluster_name])
1033

    
1034
  def call_export_list(self, node_list):
1035
    """Gets the stored exports list.
1036

1037
    This is a multi-node call.
1038

1039
    """
1040
    return self._MultiNodeCall(node_list, "export_list", [])
1041

    
1042
  def call_export_remove(self, node, export):
1043
    """Requests removal of a given export.
1044

1045
    This is a single-node call.
1046

1047
    """
1048
    return self._SingleNodeCall(node, "export_remove", [export])
1049

    
1050
  @classmethod
1051
  def call_node_leave_cluster(cls, node):
1052
    """Requests a node to clean the cluster information it has.
1053

1054
    This will remove the configuration information from the ganeti data
1055
    dir.
1056

1057
    This is a single-node call.
1058

1059
    """
1060
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1061

    
1062
  def call_node_volumes(self, node_list):
1063
    """Gets all volumes on node(s).
1064

1065
    This is a multi-node call.
1066

1067
    """
1068
    return self._MultiNodeCall(node_list, "node_volumes", [])
1069

    
1070
  def call_node_demote_from_mc(self, node):
1071
    """Demote a node from the master candidate role.
1072

1073
    This is a single-node call.
1074

1075
    """
1076
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1077

    
1078

    
1079
  def call_node_powercycle(self, node, hypervisor):
1080
    """Tries to powercycle a node.
1081

1082
    This is a single-node call.
1083

1084
    """
1085
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1086

    
1087

    
1088
  def call_test_delay(self, node_list, duration):
1089
    """Sleep for a fixed time on given node(s).
1090

1091
    This is a multi-node call.
1092

1093
    """
1094
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1095

    
1096
  def call_file_storage_dir_create(self, node, file_storage_dir):
1097
    """Create the given file storage directory.
1098

1099
    This is a single-node call.
1100

1101
    """
1102
    return self._SingleNodeCall(node, "file_storage_dir_create",
1103
                                [file_storage_dir])
1104

    
1105
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1106
    """Remove the given file storage directory.
1107

1108
    This is a single-node call.
1109

1110
    """
1111
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1112
                                [file_storage_dir])
1113

    
1114
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1115
                                   new_file_storage_dir):
1116
    """Rename file storage directory.
1117

1118
    This is a single-node call.
1119

1120
    """
1121
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1122
                                [old_file_storage_dir, new_file_storage_dir])
1123

    
1124
  @classmethod
1125
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1126
    """Update job queue.
1127

1128
    This is a multi-node call.
1129

1130
    """
1131
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1132
                                    [file_name, cls._Compress(content)],
1133
                                    address_list=address_list)
1134

    
1135
  @classmethod
1136
  def call_jobqueue_purge(cls, node):
1137
    """Purge job queue.
1138

1139
    This is a single-node call.
1140

1141
    """
1142
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1143

    
1144
  @classmethod
1145
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1146
    """Rename a job queue file.
1147

1148
    This is a multi-node call.
1149

1150
    """
1151
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1152
                                    address_list=address_list)
1153

    
1154
  @classmethod
1155
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1156
    """Set the drain flag on the queue.
1157

1158
    This is a multi-node call.
1159

1160
    @type node_list: list
1161
    @param node_list: the list of nodes to query
1162
    @type drain_flag: bool
1163
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1164

1165
    """
1166
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1167
                                    [drain_flag])
1168

    
1169
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1170
    """Validate the hypervisor params.
1171

1172
    This is a multi-node call.
1173

1174
    @type node_list: list
1175
    @param node_list: the list of nodes to query
1176
    @type hvname: string
1177
    @param hvname: the hypervisor name
1178
    @type hvparams: dict
1179
    @param hvparams: the hypervisor parameters to be validated
1180

1181
    """
1182
    cluster = self._cfg.GetClusterInfo()
1183
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1184
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1185
                               [hvname, hv_full])