Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 637b8d7e

History | View | Annotate | Download (34.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import logging
35
import zlib
36
import base64
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45
import ganeti.http.client
46

    
47

    
48
# Module level variable
49
_http_manager = None
50

    
51

    
52
def Init():
53
  """Initializes the module-global HTTP client manager.
54

55
  Must be called before using any RPC function.
56

57
  """
58
  global _http_manager
59

    
60
  assert not _http_manager, "RPC module initialized more than once"
61

    
62
  _http_manager = http.client.HttpClientManager()
63

    
64

    
65
def Shutdown():
66
  """Stops the module-global HTTP client manager.
67

68
  Must be called before quitting the program.
69

70
  """
71
  global _http_manager
72

    
73
  if _http_manager:
74
    _http_manager.Shutdown()
75
    _http_manager = None
76

    
77

    
78
class RpcResult(object):
79
  """RPC Result class.
80

81
  This class holds an RPC result. It is needed since in multi-node
82
  calls we can't raise an exception just because one one out of many
83
  failed, and therefore we use this class to encapsulate the result.
84

85
  @ivar data: the data payload, for successful results, or None
86
  @type failed: boolean
87
  @ivar failed: whether the operation failed at transport level (not
88
      application level on the remote node)
89
  @ivar call: the name of the RPC call
90
  @ivar node: the name of the node to which we made the call
91
  @ivar offline: whether the operation failed because the node was
92
      offline, as opposed to actual failure; offline=True will always
93
      imply failed=True, in order to allow simpler checking if
94
      the user doesn't care about the exact failure mode
95
  @ivar fail_msg: the error message if the call failed
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.fail_msg = "Node is marked offline"
107
      self.data = self.payload = None
108
    elif failed:
109
      self.fail_msg = self._EnsureErr(data)
110
      self.data = self.payload = None
111
    else:
112
      self.data = data
113
      if not isinstance(self.data, (tuple, list)):
114
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
115
                         type(self.data))
116
      elif len(data) != 2:
117
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
118
                         "expected 2" % len(self.data))
119
      elif not self.data[0]:
120
        self.fail_msg = self._EnsureErr(self.data[1])
121
      else:
122
        # finally success
123
        self.fail_msg = None
124
        self.payload = data[1]
125

    
126
  @staticmethod
127
  def _EnsureErr(val):
128
    """Helper to ensure we return a 'True' value for error."""
129
    if val:
130
      return val
131
    else:
132
      return "No error information"
133

    
134
  def Raise(self, msg, prereq=False):
135
    """If the result has failed, raise an OpExecError.
136

137
    This is used so that LU code doesn't have to check for each
138
    result, but instead can call this function.
139

140
    """
141
    if not self.fail_msg:
142
      return
143

    
144
    if not msg: # one could pass None for default message
145
      msg = ("Call '%s' to node '%s' has failed: %s" %
146
             (self.call, self.node, self.fail_msg))
147
    else:
148
      msg = "%s: %s" % (msg, self.fail_msg)
149
    if prereq:
150
      ec = errors.OpPrereqError
151
    else:
152
      ec = errors.OpExecError
153
    raise ec(msg)
154

    
155
  def RemoteFailMsg(self):
156
    """Check if the remote procedure failed.
157

158
    @return: the fail_msg attribute
159

160
    """
161
    return self.fail_msg
162

    
163

    
164
class Client:
165
  """RPC Client class.
166

167
  This class, given a (remote) method name, a list of parameters and a
168
  list of nodes, will contact (in parallel) all nodes, and return a
169
  dict of results (key: node name, value: result).
170

171
  One current bug is that generic failure is still signaled by
172
  'False' result, which is not good. This overloading of values can
173
  cause bugs.
174

175
  """
176
  def __init__(self, procedure, body, port):
177
    self.procedure = procedure
178
    self.body = body
179
    self.port = port
180
    self.nc = {}
181

    
182
    self._ssl_params = \
183
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
184
                         ssl_cert_path=constants.SSL_CERT_FILE)
185

    
186
  def ConnectList(self, node_list, address_list=None):
187
    """Add a list of nodes to the target nodes.
188

189
    @type node_list: list
190
    @param node_list: the list of node names to connect
191
    @type address_list: list or None
192
    @keyword address_list: either None or a list with node addresses,
193
        which must have the same length as the node list
194

195
    """
196
    if address_list is None:
197
      address_list = [None for _ in node_list]
198
    else:
199
      assert len(node_list) == len(address_list), \
200
             "Name and address lists should have the same length"
201
    for node, address in zip(node_list, address_list):
202
      self.ConnectNode(node, address)
203

    
204
  def ConnectNode(self, name, address=None):
205
    """Add a node to the target list.
206

207
    @type name: str
208
    @param name: the node name
209
    @type address: str
210
    @keyword address: the node address, if known
211

212
    """
213
    if address is None:
214
      address = name
215

    
216
    self.nc[name] = \
217
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
218
                                    "/%s" % self.procedure,
219
                                    post_data=self.body,
220
                                    ssl_params=self._ssl_params,
221
                                    ssl_verify_peer=True)
222

    
223
  def GetResults(self):
224
    """Call nodes and return results.
225

226
    @rtype: list
227
    @return: List of RPC results
228

229
    """
230
    assert _http_manager, "RPC module not initialized"
231

    
232
    _http_manager.ExecRequests(self.nc.values())
233

    
234
    results = {}
235

    
236
    for name, req in self.nc.iteritems():
237
      if req.success and req.resp_status_code == http.HTTP_OK:
238
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
239
                                  node=name, call=self.procedure)
240
        continue
241

    
242
      # TODO: Better error reporting
243
      if req.error:
244
        msg = req.error
245
      else:
246
        msg = req.resp_body
247

    
248
      logging.error("RPC error in %s from node %s: %s",
249
                    self.procedure, name, msg)
250
      results[name] = RpcResult(data=msg, failed=True, node=name,
251
                                call=self.procedure)
252

    
253
    return results
254

    
255

    
256
class RpcRunner(object):
257
  """RPC runner class"""
258

    
259
  def __init__(self, cfg):
260
    """Initialized the rpc runner.
261

262
    @type cfg:  C{config.ConfigWriter}
263
    @param cfg: the configuration object that will be used to get data
264
                about the cluster
265

266
    """
267
    self._cfg = cfg
268
    self.port = utils.GetDaemonPort(constants.NODED)
269

    
270
  def _InstDict(self, instance, hvp=None, bep=None):
271
    """Convert the given instance to a dict.
272

273
    This is done via the instance's ToDict() method and additionally
274
    we fill the hvparams with the cluster defaults.
275

276
    @type instance: L{objects.Instance}
277
    @param instance: an Instance object
278
    @type hvp: dict or None
279
    @param hvp: a dictionary with overridden hypervisor parameters
280
    @type bep: dict or None
281
    @param bep: a dictionary with overridden backend parameters
282
    @rtype: dict
283
    @return: the instance dict, with the hvparams filled with the
284
        cluster defaults
285

286
    """
287
    idict = instance.ToDict()
288
    cluster = self._cfg.GetClusterInfo()
289
    idict["hvparams"] = cluster.FillHV(instance)
290
    if hvp is not None:
291
      idict["hvparams"].update(hvp)
292
    idict["beparams"] = cluster.FillBE(instance)
293
    if bep is not None:
294
      idict["beparams"].update(bep)
295
    for nic in idict["nics"]:
296
      nic['nicparams'] = objects.FillDict(
297
        cluster.nicparams[constants.PP_DEFAULT],
298
        nic['nicparams'])
299
    return idict
300

    
301
  def _ConnectList(self, client, node_list, call):
302
    """Helper for computing node addresses.
303

304
    @type client: L{ganeti.rpc.Client}
305
    @param client: a C{Client} instance
306
    @type node_list: list
307
    @param node_list: the node list we should connect
308
    @type call: string
309
    @param call: the name of the remote procedure call, for filling in
310
        correctly any eventual offline nodes' results
311

312
    """
313
    all_nodes = self._cfg.GetAllNodesInfo()
314
    name_list = []
315
    addr_list = []
316
    skip_dict = {}
317
    for node in node_list:
318
      if node in all_nodes:
319
        if all_nodes[node].offline:
320
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
321
          continue
322
        val = all_nodes[node].primary_ip
323
      else:
324
        val = None
325
      addr_list.append(val)
326
      name_list.append(node)
327
    if name_list:
328
      client.ConnectList(name_list, address_list=addr_list)
329
    return skip_dict
330

    
331
  def _ConnectNode(self, client, node, call):
332
    """Helper for computing one node's address.
333

334
    @type client: L{ganeti.rpc.Client}
335
    @param client: a C{Client} instance
336
    @type node: str
337
    @param node: the node we should connect
338
    @type call: string
339
    @param call: the name of the remote procedure call, for filling in
340
        correctly any eventual offline nodes' results
341

342
    """
343
    node_info = self._cfg.GetNodeInfo(node)
344
    if node_info is not None:
345
      if node_info.offline:
346
        return RpcResult(node=node, offline=True, call=call)
347
      addr = node_info.primary_ip
348
    else:
349
      addr = None
350
    client.ConnectNode(node, address=addr)
351

    
352
  def _MultiNodeCall(self, node_list, procedure, args):
353
    """Helper for making a multi-node call
354

355
    """
356
    body = serializer.DumpJson(args, indent=False)
357
    c = Client(procedure, body, self.port)
358
    skip_dict = self._ConnectList(c, node_list, procedure)
359
    skip_dict.update(c.GetResults())
360
    return skip_dict
361

    
362
  @classmethod
363
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
364
                           address_list=None):
365
    """Helper for making a multi-node static call
366

367
    """
368
    body = serializer.DumpJson(args, indent=False)
369
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
370
    c.ConnectList(node_list, address_list=address_list)
371
    return c.GetResults()
372

    
373
  def _SingleNodeCall(self, node, procedure, args):
374
    """Helper for making a single-node call
375

376
    """
377
    body = serializer.DumpJson(args, indent=False)
378
    c = Client(procedure, body, self.port)
379
    result = self._ConnectNode(c, node, procedure)
380
    if result is None:
381
      # we did connect, node is not offline
382
      result = c.GetResults()[node]
383
    return result
384

    
385
  @classmethod
386
  def _StaticSingleNodeCall(cls, node, procedure, args):
387
    """Helper for making a single-node static call
388

389
    """
390
    body = serializer.DumpJson(args, indent=False)
391
    c = Client(procedure, body, utils.GetDaemonPort(constants.NODED))
392
    c.ConnectNode(node)
393
    return c.GetResults()[node]
394

    
395
  @staticmethod
396
  def _Compress(data):
397
    """Compresses a string for transport over RPC.
398

399
    Small amounts of data are not compressed.
400

401
    @type data: str
402
    @param data: Data
403
    @rtype: tuple
404
    @return: Encoded data to send
405

406
    """
407
    # Small amounts of data are not compressed
408
    if len(data) < 512:
409
      return (constants.RPC_ENCODING_NONE, data)
410

    
411
    # Compress with zlib and encode in base64
412
    return (constants.RPC_ENCODING_ZLIB_BASE64,
413
            base64.b64encode(zlib.compress(data, 3)))
414

    
415
  #
416
  # Begin RPC calls
417
  #
418

    
419
  def call_lv_list(self, node_list, vg_name):
420
    """Gets the logical volumes present in a given volume group.
421

422
    This is a multi-node call.
423

424
    """
425
    return self._MultiNodeCall(node_list, "lv_list", [vg_name])
426

    
427
  def call_vg_list(self, node_list):
428
    """Gets the volume group list.
429

430
    This is a multi-node call.
431

432
    """
433
    return self._MultiNodeCall(node_list, "vg_list", [])
434

    
435
  def call_storage_list(self, node_list, su_name, su_args, name, fields):
436
    """Get list of storage units.
437

438
    This is a multi-node call.
439

440
    """
441
    return self._MultiNodeCall(node_list, "storage_list",
442
                               [su_name, su_args, name, fields])
443

    
444
  def call_storage_modify(self, node, su_name, su_args, name, changes):
445
    """Modify a storage unit.
446

447
    This is a single-node call.
448

449
    """
450
    return self._SingleNodeCall(node, "storage_modify",
451
                                [su_name, su_args, name, changes])
452

    
453
  def call_storage_execute(self, node, su_name, su_args, name, op):
454
    """Executes an operation on a storage unit.
455

456
    This is a single-node call.
457

458
    """
459
    return self._SingleNodeCall(node, "storage_execute",
460
                                [su_name, su_args, name, op])
461

    
462
  def call_bridges_exist(self, node, bridges_list):
463
    """Checks if a node has all the bridges given.
464

465
    This method checks if all bridges given in the bridges_list are
466
    present on the remote node, so that an instance that uses interfaces
467
    on those bridges can be started.
468

469
    This is a single-node call.
470

471
    """
472
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
473

    
474
  def call_instance_start(self, node, instance, hvp, bep):
475
    """Starts an instance.
476

477
    This is a single-node call.
478

479
    """
480
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
481
    return self._SingleNodeCall(node, "instance_start", [idict])
482

    
483
  def call_instance_shutdown(self, node, instance):
484
    """Stops an instance.
485

486
    This is a single-node call.
487

488
    """
489
    return self._SingleNodeCall(node, "instance_shutdown",
490
                                [self._InstDict(instance)])
491

    
492
  def call_migration_info(self, node, instance):
493
    """Gather the information necessary to prepare an instance migration.
494

495
    This is a single-node call.
496

497
    @type node: string
498
    @param node: the node on which the instance is currently running
499
    @type instance: C{objects.Instance}
500
    @param instance: the instance definition
501

502
    """
503
    return self._SingleNodeCall(node, "migration_info",
504
                                [self._InstDict(instance)])
505

    
506
  def call_accept_instance(self, node, instance, info, target):
507
    """Prepare a node to accept an instance.
508

509
    This is a single-node call.
510

511
    @type node: string
512
    @param node: the target node for the migration
513
    @type instance: C{objects.Instance}
514
    @param instance: the instance definition
515
    @type info: opaque/hypervisor specific (string/data)
516
    @param info: result for the call_migration_info call
517
    @type target: string
518
    @param target: target hostname (usually ip address) (on the node itself)
519

520
    """
521
    return self._SingleNodeCall(node, "accept_instance",
522
                                [self._InstDict(instance), info, target])
523

    
524
  def call_finalize_migration(self, node, instance, info, success):
525
    """Finalize any target-node migration specific operation.
526

527
    This is called both in case of a successful migration and in case of error
528
    (in which case it should abort the migration).
529

530
    This is a single-node call.
531

532
    @type node: string
533
    @param node: the target node for the migration
534
    @type instance: C{objects.Instance}
535
    @param instance: the instance definition
536
    @type info: opaque/hypervisor specific (string/data)
537
    @param info: result for the call_migration_info call
538
    @type success: boolean
539
    @param success: whether the migration was a success or a failure
540

541
    """
542
    return self._SingleNodeCall(node, "finalize_migration",
543
                                [self._InstDict(instance), info, success])
544

    
545
  def call_instance_migrate(self, node, instance, target, live):
546
    """Migrate an instance.
547

548
    This is a single-node call.
549

550
    @type node: string
551
    @param node: the node on which the instance is currently running
552
    @type instance: C{objects.Instance}
553
    @param instance: the instance definition
554
    @type target: string
555
    @param target: the target node name
556
    @type live: boolean
557
    @param live: whether the migration should be done live or not (the
558
        interpretation of this parameter is left to the hypervisor)
559

560
    """
561
    return self._SingleNodeCall(node, "instance_migrate",
562
                                [self._InstDict(instance), target, live])
563

    
564
  def call_instance_reboot(self, node, instance, reboot_type):
565
    """Reboots an instance.
566

567
    This is a single-node call.
568

569
    """
570
    return self._SingleNodeCall(node, "instance_reboot",
571
                                [self._InstDict(instance), reboot_type])
572

    
573
  def call_instance_os_add(self, node, inst, reinstall):
574
    """Installs an OS on the given instance.
575

576
    This is a single-node call.
577

578
    """
579
    return self._SingleNodeCall(node, "instance_os_add",
580
                                [self._InstDict(inst), reinstall])
581

    
582
  def call_instance_run_rename(self, node, inst, old_name):
583
    """Run the OS rename script for an instance.
584

585
    This is a single-node call.
586

587
    """
588
    return self._SingleNodeCall(node, "instance_run_rename",
589
                                [self._InstDict(inst), old_name])
590

    
591
  def call_instance_info(self, node, instance, hname):
592
    """Returns information about a single instance.
593

594
    This is a single-node call.
595

596
    @type node: list
597
    @param node: the list of nodes to query
598
    @type instance: string
599
    @param instance: the instance name
600
    @type hname: string
601
    @param hname: the hypervisor type of the instance
602

603
    """
604
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
605

    
606
  def call_instance_migratable(self, node, instance):
607
    """Checks whether the given instance can be migrated.
608

609
    This is a single-node call.
610

611
    @param node: the node to query
612
    @type instance: L{objects.Instance}
613
    @param instance: the instance to check
614

615

616
    """
617
    return self._SingleNodeCall(node, "instance_migratable",
618
                                [self._InstDict(instance)])
619

    
620
  def call_all_instances_info(self, node_list, hypervisor_list):
621
    """Returns information about all instances on the given nodes.
622

623
    This is a multi-node call.
624

625
    @type node_list: list
626
    @param node_list: the list of nodes to query
627
    @type hypervisor_list: list
628
    @param hypervisor_list: the hypervisors to query for instances
629

630
    """
631
    return self._MultiNodeCall(node_list, "all_instances_info",
632
                               [hypervisor_list])
633

    
634
  def call_instance_list(self, node_list, hypervisor_list):
635
    """Returns the list of running instances on a given node.
636

637
    This is a multi-node call.
638

639
    @type node_list: list
640
    @param node_list: the list of nodes to query
641
    @type hypervisor_list: list
642
    @param hypervisor_list: the hypervisors to query for instances
643

644
    """
645
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
646

    
647
  def call_node_tcp_ping(self, node, source, target, port, timeout,
648
                         live_port_needed):
649
    """Do a TcpPing on the remote node
650

651
    This is a single-node call.
652

653
    """
654
    return self._SingleNodeCall(node, "node_tcp_ping",
655
                                [source, target, port, timeout,
656
                                 live_port_needed])
657

    
658
  def call_node_has_ip_address(self, node, address):
659
    """Checks if a node has the given IP address.
660

661
    This is a single-node call.
662

663
    """
664
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
665

    
666
  def call_node_info(self, node_list, vg_name, hypervisor_type):
667
    """Return node information.
668

669
    This will return memory information and volume group size and free
670
    space.
671

672
    This is a multi-node call.
673

674
    @type node_list: list
675
    @param node_list: the list of nodes to query
676
    @type vg_name: C{string}
677
    @param vg_name: the name of the volume group to ask for disk space
678
        information
679
    @type hypervisor_type: C{str}
680
    @param hypervisor_type: the name of the hypervisor to ask for
681
        memory information
682

683
    """
684
    return self._MultiNodeCall(node_list, "node_info",
685
                               [vg_name, hypervisor_type])
686

    
687
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
688
    """Add a node to the cluster.
689

690
    This is a single-node call.
691

692
    """
693
    return self._SingleNodeCall(node, "node_add",
694
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
695

    
696
  def call_node_verify(self, node_list, checkdict, cluster_name):
697
    """Request verification of given parameters.
698

699
    This is a multi-node call.
700

701
    """
702
    return self._MultiNodeCall(node_list, "node_verify",
703
                               [checkdict, cluster_name])
704

    
705
  @classmethod
706
  def call_node_start_master(cls, node, start_daemons, no_voting):
707
    """Tells a node to activate itself as a master.
708

709
    This is a single-node call.
710

711
    """
712
    return cls._StaticSingleNodeCall(node, "node_start_master",
713
                                     [start_daemons, no_voting])
714

    
715
  @classmethod
716
  def call_node_stop_master(cls, node, stop_daemons):
717
    """Tells a node to demote itself from master status.
718

719
    This is a single-node call.
720

721
    """
722
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
723

    
724
  @classmethod
725
  def call_master_info(cls, node_list):
726
    """Query master info.
727

728
    This is a multi-node call.
729

730
    """
731
    # TODO: should this method query down nodes?
732
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
733

    
734
  def call_version(self, node_list):
735
    """Query node version.
736

737
    This is a multi-node call.
738

739
    """
740
    return self._MultiNodeCall(node_list, "version", [])
741

    
742
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
743
    """Request creation of a given block device.
744

745
    This is a single-node call.
746

747
    """
748
    return self._SingleNodeCall(node, "blockdev_create",
749
                                [bdev.ToDict(), size, owner, on_primary, info])
750

    
751
  def call_blockdev_remove(self, node, bdev):
752
    """Request removal of a given block device.
753

754
    This is a single-node call.
755

756
    """
757
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
758

    
759
  def call_blockdev_rename(self, node, devlist):
760
    """Request rename of the given block devices.
761

762
    This is a single-node call.
763

764
    """
765
    return self._SingleNodeCall(node, "blockdev_rename",
766
                                [(d.ToDict(), uid) for d, uid in devlist])
767

    
768
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
769
    """Request assembling of a given block device.
770

771
    This is a single-node call.
772

773
    """
774
    return self._SingleNodeCall(node, "blockdev_assemble",
775
                                [disk.ToDict(), owner, on_primary])
776

    
777
  def call_blockdev_shutdown(self, node, disk):
778
    """Request shutdown of a given block device.
779

780
    This is a single-node call.
781

782
    """
783
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
784

    
785
  def call_blockdev_addchildren(self, node, bdev, ndevs):
786
    """Request adding a list of children to a (mirroring) device.
787

788
    This is a single-node call.
789

790
    """
791
    return self._SingleNodeCall(node, "blockdev_addchildren",
792
                                [bdev.ToDict(),
793
                                 [disk.ToDict() for disk in ndevs]])
794

    
795
  def call_blockdev_removechildren(self, node, bdev, ndevs):
796
    """Request removing a list of children from a (mirroring) device.
797

798
    This is a single-node call.
799

800
    """
801
    return self._SingleNodeCall(node, "blockdev_removechildren",
802
                                [bdev.ToDict(),
803
                                 [disk.ToDict() for disk in ndevs]])
804

    
805
  def call_blockdev_getmirrorstatus(self, node, disks):
806
    """Request status of a (mirroring) device.
807

808
    This is a single-node call.
809

810
    """
811
    result = self._SingleNodeCall(node, "blockdev_getmirrorstatus",
812
                                  [dsk.ToDict() for dsk in disks])
813
    if not result.failed:
814
      result.payload = [objects.BlockDevStatus.FromDict(i)
815
                        for i in result.payload]
816
    return result
817

    
818
  def call_blockdev_find(self, node, disk):
819
    """Request identification of a given block device.
820

821
    This is a single-node call.
822

823
    """
824
    result = self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
825
    if not result.failed and result.payload is not None:
826
      result.payload = objects.BlockDevStatus.FromDict(result.payload)
827
    return result
828

    
829
  def call_blockdev_close(self, node, instance_name, disks):
830
    """Closes the given block devices.
831

832
    This is a single-node call.
833

834
    """
835
    params = [instance_name, [cf.ToDict() for cf in disks]]
836
    return self._SingleNodeCall(node, "blockdev_close", params)
837

    
838
  def call_blockdev_getsizes(self, node, disks):
839
    """Returns the size of the given disks.
840

841
    This is a single-node call.
842

843
    """
844
    params = [[cf.ToDict() for cf in disks]]
845
    return self._SingleNodeCall(node, "blockdev_getsize", params)
846

    
847
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
848
    """Disconnects the network of the given drbd devices.
849

850
    This is a multi-node call.
851

852
    """
853
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
854
                               [nodes_ip, [cf.ToDict() for cf in disks]])
855

    
856
  def call_drbd_attach_net(self, node_list, nodes_ip,
857
                           disks, instance_name, multimaster):
858
    """Disconnects the given drbd devices.
859

860
    This is a multi-node call.
861

862
    """
863
    return self._MultiNodeCall(node_list, "drbd_attach_net",
864
                               [nodes_ip, [cf.ToDict() for cf in disks],
865
                                instance_name, multimaster])
866

    
867
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
868
    """Waits for the synchronization of drbd devices is complete.
869

870
    This is a multi-node call.
871

872
    """
873
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
874
                               [nodes_ip, [cf.ToDict() for cf in disks]])
875

    
876
  @classmethod
877
  def call_upload_file(cls, node_list, file_name, address_list=None):
878
    """Upload a file.
879

880
    The node will refuse the operation in case the file is not on the
881
    approved file list.
882

883
    This is a multi-node call.
884

885
    @type node_list: list
886
    @param node_list: the list of node names to upload to
887
    @type file_name: str
888
    @param file_name: the filename to upload
889
    @type address_list: list or None
890
    @keyword address_list: an optional list of node addresses, in order
891
        to optimize the RPC speed
892

893
    """
894
    file_contents = utils.ReadFile(file_name)
895
    data = cls._Compress(file_contents)
896
    st = os.stat(file_name)
897
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
898
              st.st_atime, st.st_mtime]
899
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
900
                                    address_list=address_list)
901

    
902
  @classmethod
903
  def call_write_ssconf_files(cls, node_list, values):
904
    """Write ssconf files.
905

906
    This is a multi-node call.
907

908
    """
909
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
910

    
911
  def call_os_diagnose(self, node_list):
912
    """Request a diagnose of OS definitions.
913

914
    This is a multi-node call.
915

916
    """
917
    return self._MultiNodeCall(node_list, "os_diagnose", [])
918

    
919
  def call_os_get(self, node, name):
920
    """Returns an OS definition.
921

922
    This is a single-node call.
923

924
    """
925
    result = self._SingleNodeCall(node, "os_get", [name])
926
    if not result.failed and isinstance(result.data, dict):
927
      result.data = objects.OS.FromDict(result.data)
928
    return result
929

    
930
  def call_hooks_runner(self, node_list, hpath, phase, env):
931
    """Call the hooks runner.
932

933
    Args:
934
      - op: the OpCode instance
935
      - env: a dictionary with the environment
936

937
    This is a multi-node call.
938

939
    """
940
    params = [hpath, phase, env]
941
    return self._MultiNodeCall(node_list, "hooks_runner", params)
942

    
943
  def call_iallocator_runner(self, node, name, idata):
944
    """Call an iallocator on a remote node
945

946
    Args:
947
      - name: the iallocator name
948
      - input: the json-encoded input string
949

950
    This is a single-node call.
951

952
    """
953
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
954

    
955
  def call_blockdev_grow(self, node, cf_bdev, amount):
956
    """Request a snapshot of the given block device.
957

958
    This is a single-node call.
959

960
    """
961
    return self._SingleNodeCall(node, "blockdev_grow",
962
                                [cf_bdev.ToDict(), amount])
963

    
964
  def call_blockdev_snapshot(self, node, cf_bdev):
965
    """Request a snapshot of the given block device.
966

967
    This is a single-node call.
968

969
    """
970
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
971

    
972
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
973
                           cluster_name, idx):
974
    """Request the export of a given snapshot.
975

976
    This is a single-node call.
977

978
    """
979
    return self._SingleNodeCall(node, "snapshot_export",
980
                                [snap_bdev.ToDict(), dest_node,
981
                                 self._InstDict(instance), cluster_name, idx])
982

    
983
  def call_finalize_export(self, node, instance, snap_disks):
984
    """Request the completion of an export operation.
985

986
    This writes the export config file, etc.
987

988
    This is a single-node call.
989

990
    """
991
    flat_disks = []
992
    for disk in snap_disks:
993
      if isinstance(disk, bool):
994
        flat_disks.append(disk)
995
      else:
996
        flat_disks.append(disk.ToDict())
997

    
998
    return self._SingleNodeCall(node, "finalize_export",
999
                                [self._InstDict(instance), flat_disks])
1000

    
1001
  def call_export_info(self, node, path):
1002
    """Queries the export information in a given path.
1003

1004
    This is a single-node call.
1005

1006
    """
1007
    return self._SingleNodeCall(node, "export_info", [path])
1008

    
1009
  def call_instance_os_import(self, node, inst, src_node, src_images,
1010
                              cluster_name):
1011
    """Request the import of a backup into an instance.
1012

1013
    This is a single-node call.
1014

1015
    """
1016
    return self._SingleNodeCall(node, "instance_os_import",
1017
                                [self._InstDict(inst), src_node, src_images,
1018
                                 cluster_name])
1019

    
1020
  def call_export_list(self, node_list):
1021
    """Gets the stored exports list.
1022

1023
    This is a multi-node call.
1024

1025
    """
1026
    return self._MultiNodeCall(node_list, "export_list", [])
1027

    
1028
  def call_export_remove(self, node, export):
1029
    """Requests removal of a given export.
1030

1031
    This is a single-node call.
1032

1033
    """
1034
    return self._SingleNodeCall(node, "export_remove", [export])
1035

    
1036
  @classmethod
1037
  def call_node_leave_cluster(cls, node):
1038
    """Requests a node to clean the cluster information it has.
1039

1040
    This will remove the configuration information from the ganeti data
1041
    dir.
1042

1043
    This is a single-node call.
1044

1045
    """
1046
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1047

    
1048
  def call_node_volumes(self, node_list):
1049
    """Gets all volumes on node(s).
1050

1051
    This is a multi-node call.
1052

1053
    """
1054
    return self._MultiNodeCall(node_list, "node_volumes", [])
1055

    
1056
  def call_node_demote_from_mc(self, node):
1057
    """Demote a node from the master candidate role.
1058

1059
    This is a single-node call.
1060

1061
    """
1062
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1063

    
1064

    
1065
  def call_node_powercycle(self, node, hypervisor):
1066
    """Tries to powercycle a node.
1067

1068
    This is a single-node call.
1069

1070
    """
1071
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1072

    
1073

    
1074
  def call_test_delay(self, node_list, duration):
1075
    """Sleep for a fixed time on given node(s).
1076

1077
    This is a multi-node call.
1078

1079
    """
1080
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1081

    
1082
  def call_file_storage_dir_create(self, node, file_storage_dir):
1083
    """Create the given file storage directory.
1084

1085
    This is a single-node call.
1086

1087
    """
1088
    return self._SingleNodeCall(node, "file_storage_dir_create",
1089
                                [file_storage_dir])
1090

    
1091
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1092
    """Remove the given file storage directory.
1093

1094
    This is a single-node call.
1095

1096
    """
1097
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1098
                                [file_storage_dir])
1099

    
1100
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1101
                                   new_file_storage_dir):
1102
    """Rename file storage directory.
1103

1104
    This is a single-node call.
1105

1106
    """
1107
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1108
                                [old_file_storage_dir, new_file_storage_dir])
1109

    
1110
  @classmethod
1111
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1112
    """Update job queue.
1113

1114
    This is a multi-node call.
1115

1116
    """
1117
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1118
                                    [file_name, cls._Compress(content)],
1119
                                    address_list=address_list)
1120

    
1121
  @classmethod
1122
  def call_jobqueue_purge(cls, node):
1123
    """Purge job queue.
1124

1125
    This is a single-node call.
1126

1127
    """
1128
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1129

    
1130
  @classmethod
1131
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1132
    """Rename a job queue file.
1133

1134
    This is a multi-node call.
1135

1136
    """
1137
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1138
                                    address_list=address_list)
1139

    
1140
  @classmethod
1141
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1142
    """Set the drain flag on the queue.
1143

1144
    This is a multi-node call.
1145

1146
    @type node_list: list
1147
    @param node_list: the list of nodes to query
1148
    @type drain_flag: bool
1149
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1150

1151
    """
1152
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1153
                                    [drain_flag])
1154

    
1155
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1156
    """Validate the hypervisor params.
1157

1158
    This is a multi-node call.
1159

1160
    @type node_list: list
1161
    @param node_list: the list of nodes to query
1162
    @type hvname: string
1163
    @param hvname: the hypervisor name
1164
    @type hvparams: dict
1165
    @param hvparams: the hypervisor parameters to be validated
1166

1167
    """
1168
    cluster = self._cfg.GetClusterInfo()
1169
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1170
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1171
                               [hvname, hv_full])