Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 3eccac06

History | View | Annotate | Download (33.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36
import zlib
37
import base64
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45

    
46
import ganeti.http.client
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  _http_manager = http.client.HttpClientManager()
64

    
65

    
66
def Shutdown():
67
  """Stops the module-global HTTP client manager.
68

69
  Must be called before quitting the program.
70

71
  """
72
  global _http_manager
73

    
74
  if _http_manager:
75
    _http_manager.Shutdown()
76
    _http_manager = None
77

    
78

    
79
class RpcResult(object):
80
  """RPC Result class.
81

82
  This class holds an RPC result. It is needed since in multi-node
83
  calls we can't raise an exception just because one one out of many
84
  failed, and therefore we use this class to encapsulate the result.
85

86
  @ivar data: the data payload, for successfull results, or None
87
  @type failed: boolean
88
  @ivar failed: whether the operation failed at RPC level (not
89
      application level on the remote node)
90
  @ivar call: the name of the RPC call
91
  @ivar node: the name of the node to which we made the call
92
  @ivar offline: whether the operation failed because the node was
93
      offline, as opposed to actual failure; offline=True will always
94
      imply failed=True, in order to allow simpler checking if
95
      the user doesn't care about the exact failure mode
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.error = "Node is marked offline"
107
      self.data = self.payload = None
108
    elif failed:
109
      self.error = data
110
      self.data = self.payload = None
111
    else:
112
      self.data = data
113
      self.error = None
114
      if isinstance(data, (tuple, list)) and len(data) == 2:
115
        self.payload = data[1]
116
      else:
117
        self.payload = None
118

    
119
  def Raise(self):
120
    """If the result has failed, raise an OpExecError.
121

122
    This is used so that LU code doesn't have to check for each
123
    result, but instead can call this function.
124

125
    """
126
    if self.failed:
127
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
128
                               (self.call, self.node, self.error))
129

    
130
  def RemoteFailMsg(self):
131
    """Check if the remote procedure failed.
132

133
    This is valid only for RPC calls which return result of the form
134
    (status, data | error_msg).
135

136
    @return: empty string for succcess, otherwise an error message
137

138
    """
139
    def _EnsureErr(val):
140
      """Helper to ensure we return a 'True' value for error."""
141
      if val:
142
        return val
143
      else:
144
        return "No error information"
145

    
146
    if self.failed:
147
      return _EnsureErr(self.error)
148
    if not isinstance(self.data, (tuple, list)):
149
      return "Invalid result type (%s)" % type(self.data)
150
    if len(self.data) != 2:
151
      return "Invalid result length (%d), expected 2" % len(self.data)
152
    if not self.data[0]:
153
      return _EnsureErr(self.data[1])
154
    return ""
155

    
156

    
157
class Client:
158
  """RPC Client class.
159

160
  This class, given a (remote) method name, a list of parameters and a
161
  list of nodes, will contact (in parallel) all nodes, and return a
162
  dict of results (key: node name, value: result).
163

164
  One current bug is that generic failure is still signalled by
165
  'False' result, which is not good. This overloading of values can
166
  cause bugs.
167

168
  """
169
  def __init__(self, procedure, body, port):
170
    self.procedure = procedure
171
    self.body = body
172
    self.port = port
173
    self.nc = {}
174

    
175
    self._ssl_params = \
176
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
177
                         ssl_cert_path=constants.SSL_CERT_FILE)
178

    
179
  def ConnectList(self, node_list, address_list=None):
180
    """Add a list of nodes to the target nodes.
181

182
    @type node_list: list
183
    @param node_list: the list of node names to connect
184
    @type address_list: list or None
185
    @keyword address_list: either None or a list with node addresses,
186
        which must have the same length as the node list
187

188
    """
189
    if address_list is None:
190
      address_list = [None for _ in node_list]
191
    else:
192
      assert len(node_list) == len(address_list), \
193
             "Name and address lists should have the same length"
194
    for node, address in zip(node_list, address_list):
195
      self.ConnectNode(node, address)
196

    
197
  def ConnectNode(self, name, address=None):
198
    """Add a node to the target list.
199

200
    @type name: str
201
    @param name: the node name
202
    @type address: str
203
    @keyword address: the node address, if known
204

205
    """
206
    if address is None:
207
      address = name
208

    
209
    self.nc[name] = \
210
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
211
                                    "/%s" % self.procedure,
212
                                    post_data=self.body,
213
                                    ssl_params=self._ssl_params,
214
                                    ssl_verify_peer=True)
215

    
216
  def GetResults(self):
217
    """Call nodes and return results.
218

219
    @rtype: list
220
    @return: List of RPC results
221

222
    """
223
    assert _http_manager, "RPC module not intialized"
224

    
225
    _http_manager.ExecRequests(self.nc.values())
226

    
227
    results = {}
228

    
229
    for name, req in self.nc.iteritems():
230
      if req.success and req.resp_status_code == http.HTTP_OK:
231
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
232
                                  node=name, call=self.procedure)
233
        continue
234

    
235
      # TODO: Better error reporting
236
      if req.error:
237
        msg = req.error
238
      else:
239
        msg = req.resp_body
240

    
241
      logging.error("RPC error in %s from node %s: %s",
242
                    self.procedure, name, msg)
243
      results[name] = RpcResult(data=msg, failed=True, node=name,
244
                                call=self.procedure)
245

    
246
    return results
247

    
248

    
249
class RpcRunner(object):
250
  """RPC runner class"""
251

    
252
  def __init__(self, cfg):
253
    """Initialized the rpc runner.
254

255
    @type cfg:  C{config.ConfigWriter}
256
    @param cfg: the configuration object that will be used to get data
257
                about the cluster
258

259
    """
260
    self._cfg = cfg
261
    self.port = utils.GetNodeDaemonPort()
262

    
263
  def _InstDict(self, instance, hvp=None, bep=None):
264
    """Convert the given instance to a dict.
265

266
    This is done via the instance's ToDict() method and additionally
267
    we fill the hvparams with the cluster defaults.
268

269
    @type instance: L{objects.Instance}
270
    @param instance: an Instance object
271
    @type hvp: dict or None
272
    @param hvp: a dictionary with overriden hypervisor parameters
273
    @type bep: dict or None
274
    @param bep: a dictionary with overriden backend parameters
275
    @rtype: dict
276
    @return: the instance dict, with the hvparams filled with the
277
        cluster defaults
278

279
    """
280
    idict = instance.ToDict()
281
    cluster = self._cfg.GetClusterInfo()
282
    idict["hvparams"] = cluster.FillHV(instance)
283
    if hvp is not None:
284
      idict["hvparams"].update(hvp)
285
    idict["beparams"] = cluster.FillBE(instance)
286
    if bep is not None:
287
      idict["beparams"].update(bep)
288
    for nic in idict["nics"]:
289
      nic['nicparams'] = objects.FillDict(
290
        cluster.nicparams[constants.PP_DEFAULT],
291
        nic['nicparams'])
292
    return idict
293

    
294
  def _ConnectList(self, client, node_list, call):
295
    """Helper for computing node addresses.
296

297
    @type client: L{Client}
298
    @param client: a C{Client} instance
299
    @type node_list: list
300
    @param node_list: the node list we should connect
301
    @type call: string
302
    @param call: the name of the remote procedure call, for filling in
303
        correctly any eventual offline nodes' results
304

305
    """
306
    all_nodes = self._cfg.GetAllNodesInfo()
307
    name_list = []
308
    addr_list = []
309
    skip_dict = {}
310
    for node in node_list:
311
      if node in all_nodes:
312
        if all_nodes[node].offline:
313
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
314
          continue
315
        val = all_nodes[node].primary_ip
316
      else:
317
        val = None
318
      addr_list.append(val)
319
      name_list.append(node)
320
    if name_list:
321
      client.ConnectList(name_list, address_list=addr_list)
322
    return skip_dict
323

    
324
  def _ConnectNode(self, client, node, call):
325
    """Helper for computing one node's address.
326

327
    @type client: L{Client}
328
    @param client: a C{Client} instance
329
    @type node: str
330
    @param node: the node we should connect
331
    @type call: string
332
    @param call: the name of the remote procedure call, for filling in
333
        correctly any eventual offline nodes' results
334

335
    """
336
    node_info = self._cfg.GetNodeInfo(node)
337
    if node_info is not None:
338
      if node_info.offline:
339
        return RpcResult(node=node, offline=True, call=call)
340
      addr = node_info.primary_ip
341
    else:
342
      addr = None
343
    client.ConnectNode(node, address=addr)
344

    
345
  def _MultiNodeCall(self, node_list, procedure, args):
346
    """Helper for making a multi-node call
347

348
    """
349
    body = serializer.DumpJson(args, indent=False)
350
    c = Client(procedure, body, self.port)
351
    skip_dict = self._ConnectList(c, node_list, procedure)
352
    skip_dict.update(c.GetResults())
353
    return skip_dict
354

    
355
  @classmethod
356
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
357
                           address_list=None):
358
    """Helper for making a multi-node static call
359

360
    """
361
    body = serializer.DumpJson(args, indent=False)
362
    c = Client(procedure, body, utils.GetNodeDaemonPort())
363
    c.ConnectList(node_list, address_list=address_list)
364
    return c.GetResults()
365

    
366
  def _SingleNodeCall(self, node, procedure, args):
367
    """Helper for making a single-node call
368

369
    """
370
    body = serializer.DumpJson(args, indent=False)
371
    c = Client(procedure, body, self.port)
372
    result = self._ConnectNode(c, node, procedure)
373
    if result is None:
374
      # we did connect, node is not offline
375
      result = c.GetResults()[node]
376
    return result
377

    
378
  @classmethod
379
  def _StaticSingleNodeCall(cls, node, procedure, args):
380
    """Helper for making a single-node static call
381

382
    """
383
    body = serializer.DumpJson(args, indent=False)
384
    c = Client(procedure, body, utils.GetNodeDaemonPort())
385
    c.ConnectNode(node)
386
    return c.GetResults()[node]
387

    
388
  @staticmethod
389
  def _Compress(data):
390
    """Compresses a string for transport over RPC.
391

392
    Small amounts of data are not compressed.
393

394
    @type data: str
395
    @param data: Data
396
    @rtype: tuple
397
    @return: Encoded data to send
398

399
    """
400
    # Small amounts of data are not compressed
401
    if len(data) < 512:
402
      return (constants.RPC_ENCODING_NONE, data)
403

    
404
    # Compress with zlib and encode in base64
405
    return (constants.RPC_ENCODING_ZLIB_BASE64,
406
            base64.b64encode(zlib.compress(data, 3)))
407

    
408
  #
409
  # Begin RPC calls
410
  #
411

    
412
  def call_volume_list(self, node_list, vg_name):
413
    """Gets the logical volumes present in a given volume group.
414

415
    This is a multi-node call.
416

417
    """
418
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
419

    
420
  def call_vg_list(self, node_list):
421
    """Gets the volume group list.
422

423
    This is a multi-node call.
424

425
    """
426
    return self._MultiNodeCall(node_list, "vg_list", [])
427

    
428
  def call_bridges_exist(self, node, bridges_list):
429
    """Checks if a node has all the bridges given.
430

431
    This method checks if all bridges given in the bridges_list are
432
    present on the remote node, so that an instance that uses interfaces
433
    on those bridges can be started.
434

435
    This is a single-node call.
436

437
    """
438
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
439

    
440
  def call_instance_start(self, node, instance, hvp, bep):
441
    """Starts an instance.
442

443
    This is a single-node call.
444

445
    """
446
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
447
    return self._SingleNodeCall(node, "instance_start", [idict])
448

    
449
  def call_instance_shutdown(self, node, instance):
450
    """Stops an instance.
451

452
    This is a single-node call.
453

454
    """
455
    return self._SingleNodeCall(node, "instance_shutdown",
456
                                [self._InstDict(instance)])
457

    
458
  def call_migration_info(self, node, instance):
459
    """Gather the information necessary to prepare an instance migration.
460

461
    This is a single-node call.
462

463
    @type node: string
464
    @param node: the node on which the instance is currently running
465
    @type instance: C{objects.Instance}
466
    @param instance: the instance definition
467

468
    """
469
    return self._SingleNodeCall(node, "migration_info",
470
                                [self._InstDict(instance)])
471

    
472
  def call_accept_instance(self, node, instance, info, target):
473
    """Prepare a node to accept an instance.
474

475
    This is a single-node call.
476

477
    @type node: string
478
    @param node: the target node for the migration
479
    @type instance: C{objects.Instance}
480
    @param instance: the instance definition
481
    @type info: opaque/hypervisor specific (string/data)
482
    @param info: result for the call_migration_info call
483
    @type target: string
484
    @param target: target hostname (usually ip address) (on the node itself)
485

486
    """
487
    return self._SingleNodeCall(node, "accept_instance",
488
                                [self._InstDict(instance), info, target])
489

    
490
  def call_finalize_migration(self, node, instance, info, success):
491
    """Finalize any target-node migration specific operation.
492

493
    This is called both in case of a successful migration and in case of error
494
    (in which case it should abort the migration).
495

496
    This is a single-node call.
497

498
    @type node: string
499
    @param node: the target node for the migration
500
    @type instance: C{objects.Instance}
501
    @param instance: the instance definition
502
    @type info: opaque/hypervisor specific (string/data)
503
    @param info: result for the call_migration_info call
504
    @type success: boolean
505
    @param success: whether the migration was a success or a failure
506

507
    """
508
    return self._SingleNodeCall(node, "finalize_migration",
509
                                [self._InstDict(instance), info, success])
510

    
511
  def call_instance_migrate(self, node, instance, target, live):
512
    """Migrate an instance.
513

514
    This is a single-node call.
515

516
    @type node: string
517
    @param node: the node on which the instance is currently running
518
    @type instance: C{objects.Instance}
519
    @param instance: the instance definition
520
    @type target: string
521
    @param target: the target node name
522
    @type live: boolean
523
    @param live: whether the migration should be done live or not (the
524
        interpretation of this parameter is left to the hypervisor)
525

526
    """
527
    return self._SingleNodeCall(node, "instance_migrate",
528
                                [self._InstDict(instance), target, live])
529

    
530
  def call_instance_reboot(self, node, instance, reboot_type):
531
    """Reboots an instance.
532

533
    This is a single-node call.
534

535
    """
536
    return self._SingleNodeCall(node, "instance_reboot",
537
                                [self._InstDict(instance), reboot_type])
538

    
539
  def call_instance_os_add(self, node, inst, reinstall):
540
    """Installs an OS on the given instance.
541

542
    This is a single-node call.
543

544
    """
545
    return self._SingleNodeCall(node, "instance_os_add",
546
                                [self._InstDict(inst), reinstall])
547

    
548
  def call_instance_run_rename(self, node, inst, old_name):
549
    """Run the OS rename script for an instance.
550

551
    This is a single-node call.
552

553
    """
554
    return self._SingleNodeCall(node, "instance_run_rename",
555
                                [self._InstDict(inst), old_name])
556

    
557
  def call_instance_info(self, node, instance, hname):
558
    """Returns information about a single instance.
559

560
    This is a single-node call.
561

562
    @type node: list
563
    @param node: the list of nodes to query
564
    @type instance: string
565
    @param instance: the instance name
566
    @type hname: string
567
    @param hname: the hypervisor type of the instance
568

569
    """
570
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
571

    
572
  def call_instance_migratable(self, node, instance):
573
    """Checks whether the given instance can be migrated.
574

575
    This is a single-node call.
576

577
    @param node: the node to query
578
    @type instance: L{objects.Instance}
579
    @param instance: the instance to check
580

581

582
    """
583
    return self._SingleNodeCall(node, "instance_migratable",
584
                                [self._InstDict(instance)])
585

    
586
  def call_all_instances_info(self, node_list, hypervisor_list):
587
    """Returns information about all instances on the given nodes.
588

589
    This is a multi-node call.
590

591
    @type node_list: list
592
    @param node_list: the list of nodes to query
593
    @type hypervisor_list: list
594
    @param hypervisor_list: the hypervisors to query for instances
595

596
    """
597
    return self._MultiNodeCall(node_list, "all_instances_info",
598
                               [hypervisor_list])
599

    
600
  def call_instance_list(self, node_list, hypervisor_list):
601
    """Returns the list of running instances on a given node.
602

603
    This is a multi-node call.
604

605
    @type node_list: list
606
    @param node_list: the list of nodes to query
607
    @type hypervisor_list: list
608
    @param hypervisor_list: the hypervisors to query for instances
609

610
    """
611
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
612

    
613
  def call_node_tcp_ping(self, node, source, target, port, timeout,
614
                         live_port_needed):
615
    """Do a TcpPing on the remote node
616

617
    This is a single-node call.
618

619
    """
620
    return self._SingleNodeCall(node, "node_tcp_ping",
621
                                [source, target, port, timeout,
622
                                 live_port_needed])
623

    
624
  def call_node_has_ip_address(self, node, address):
625
    """Checks if a node has the given IP address.
626

627
    This is a single-node call.
628

629
    """
630
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
631

    
632
  def call_node_info(self, node_list, vg_name, hypervisor_type):
633
    """Return node information.
634

635
    This will return memory information and volume group size and free
636
    space.
637

638
    This is a multi-node call.
639

640
    @type node_list: list
641
    @param node_list: the list of nodes to query
642
    @type vg_name: C{string}
643
    @param vg_name: the name of the volume group to ask for disk space
644
        information
645
    @type hypervisor_type: C{str}
646
    @param hypervisor_type: the name of the hypervisor to ask for
647
        memory information
648

649
    """
650
    retux = self._MultiNodeCall(node_list, "node_info",
651
                                [vg_name, hypervisor_type])
652

    
653
    for result in retux.itervalues():
654
      if result.failed or not isinstance(result.data, dict):
655
        result.data = {}
656
      if result.offline:
657
        log_name = None
658
      else:
659
        log_name = "call_node_info"
660

    
661
      utils.CheckDict(result.data, {
662
        'memory_total' : '-',
663
        'memory_dom0' : '-',
664
        'memory_free' : '-',
665
        'vg_size' : 'node_unreachable',
666
        'vg_free' : '-',
667
        }, log_name)
668
    return retux
669

    
670
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
671
    """Add a node to the cluster.
672

673
    This is a single-node call.
674

675
    """
676
    return self._SingleNodeCall(node, "node_add",
677
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
678

    
679
  def call_node_verify(self, node_list, checkdict, cluster_name):
680
    """Request verification of given parameters.
681

682
    This is a multi-node call.
683

684
    """
685
    return self._MultiNodeCall(node_list, "node_verify",
686
                               [checkdict, cluster_name])
687

    
688
  @classmethod
689
  def call_node_start_master(cls, node, start_daemons):
690
    """Tells a node to activate itself as a master.
691

692
    This is a single-node call.
693

694
    """
695
    return cls._StaticSingleNodeCall(node, "node_start_master",
696
                                     [start_daemons])
697

    
698
  @classmethod
699
  def call_node_stop_master(cls, node, stop_daemons):
700
    """Tells a node to demote itself from master status.
701

702
    This is a single-node call.
703

704
    """
705
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
706

    
707
  @classmethod
708
  def call_master_info(cls, node_list):
709
    """Query master info.
710

711
    This is a multi-node call.
712

713
    """
714
    # TODO: should this method query down nodes?
715
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
716

    
717
  def call_version(self, node_list):
718
    """Query node version.
719

720
    This is a multi-node call.
721

722
    """
723
    return self._MultiNodeCall(node_list, "version", [])
724

    
725
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
726
    """Request creation of a given block device.
727

728
    This is a single-node call.
729

730
    """
731
    return self._SingleNodeCall(node, "blockdev_create",
732
                                [bdev.ToDict(), size, owner, on_primary, info])
733

    
734
  def call_blockdev_remove(self, node, bdev):
735
    """Request removal of a given block device.
736

737
    This is a single-node call.
738

739
    """
740
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
741

    
742
  def call_blockdev_rename(self, node, devlist):
743
    """Request rename of the given block devices.
744

745
    This is a single-node call.
746

747
    """
748
    return self._SingleNodeCall(node, "blockdev_rename",
749
                                [(d.ToDict(), uid) for d, uid in devlist])
750

    
751
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
752
    """Request assembling of a given block device.
753

754
    This is a single-node call.
755

756
    """
757
    return self._SingleNodeCall(node, "blockdev_assemble",
758
                                [disk.ToDict(), owner, on_primary])
759

    
760
  def call_blockdev_shutdown(self, node, disk):
761
    """Request shutdown of a given block device.
762

763
    This is a single-node call.
764

765
    """
766
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
767

    
768
  def call_blockdev_addchildren(self, node, bdev, ndevs):
769
    """Request adding a list of children to a (mirroring) device.
770

771
    This is a single-node call.
772

773
    """
774
    return self._SingleNodeCall(node, "blockdev_addchildren",
775
                                [bdev.ToDict(),
776
                                 [disk.ToDict() for disk in ndevs]])
777

    
778
  def call_blockdev_removechildren(self, node, bdev, ndevs):
779
    """Request removing a list of children from a (mirroring) device.
780

781
    This is a single-node call.
782

783
    """
784
    return self._SingleNodeCall(node, "blockdev_removechildren",
785
                                [bdev.ToDict(),
786
                                 [disk.ToDict() for disk in ndevs]])
787

    
788
  def call_blockdev_getmirrorstatus(self, node, disks):
789
    """Request status of a (mirroring) device.
790

791
    This is a single-node call.
792

793
    """
794
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
795
                                [dsk.ToDict() for dsk in disks])
796

    
797
  def call_blockdev_find(self, node, disk):
798
    """Request identification of a given block device.
799

800
    This is a single-node call.
801

802
    """
803
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
804

    
805
  def call_blockdev_close(self, node, instance_name, disks):
806
    """Closes the given block devices.
807

808
    This is a single-node call.
809

810
    """
811
    params = [instance_name, [cf.ToDict() for cf in disks]]
812
    return self._SingleNodeCall(node, "blockdev_close", params)
813

    
814
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
815
    """Disconnects the network of the given drbd devices.
816

817
    This is a multi-node call.
818

819
    """
820
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
821
                               [nodes_ip, [cf.ToDict() for cf in disks]])
822

    
823
  def call_drbd_attach_net(self, node_list, nodes_ip,
824
                           disks, instance_name, multimaster):
825
    """Disconnects the given drbd devices.
826

827
    This is a multi-node call.
828

829
    """
830
    return self._MultiNodeCall(node_list, "drbd_attach_net",
831
                               [nodes_ip, [cf.ToDict() for cf in disks],
832
                                instance_name, multimaster])
833

    
834
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
835
    """Waits for the synchronization of drbd devices is complete.
836

837
    This is a multi-node call.
838

839
    """
840
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
841
                               [nodes_ip, [cf.ToDict() for cf in disks]])
842

    
843
  @classmethod
844
  def call_upload_file(cls, node_list, file_name, address_list=None):
845
    """Upload a file.
846

847
    The node will refuse the operation in case the file is not on the
848
    approved file list.
849

850
    This is a multi-node call.
851

852
    @type node_list: list
853
    @param node_list: the list of node names to upload to
854
    @type file_name: str
855
    @param file_name: the filename to upload
856
    @type address_list: list or None
857
    @keyword address_list: an optional list of node addresses, in order
858
        to optimize the RPC speed
859

860
    """
861
    file_contents = utils.ReadFile(file_name)
862
    data = cls._Compress(file_contents)
863
    st = os.stat(file_name)
864
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
865
              st.st_atime, st.st_mtime]
866
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
867
                                    address_list=address_list)
868

    
869
  @classmethod
870
  def call_write_ssconf_files(cls, node_list, values):
871
    """Write ssconf files.
872

873
    This is a multi-node call.
874

875
    """
876
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
877

    
878
  def call_os_diagnose(self, node_list):
879
    """Request a diagnose of OS definitions.
880

881
    This is a multi-node call.
882

883
    """
884
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
885

    
886
    for node_result in result.values():
887
      if not node_result.failed and node_result.data:
888
        node_result.data = [objects.OS.FromDict(oss)
889
                            for oss in node_result.data]
890
    return result
891

    
892
  def call_os_get(self, node, name):
893
    """Returns an OS definition.
894

895
    This is a single-node call.
896

897
    """
898
    result = self._SingleNodeCall(node, "os_get", [name])
899
    if not result.failed and isinstance(result.data, dict):
900
      result.data = objects.OS.FromDict(result.data)
901
    return result
902

    
903
  def call_hooks_runner(self, node_list, hpath, phase, env):
904
    """Call the hooks runner.
905

906
    Args:
907
      - op: the OpCode instance
908
      - env: a dictionary with the environment
909

910
    This is a multi-node call.
911

912
    """
913
    params = [hpath, phase, env]
914
    return self._MultiNodeCall(node_list, "hooks_runner", params)
915

    
916
  def call_iallocator_runner(self, node, name, idata):
917
    """Call an iallocator on a remote node
918

919
    Args:
920
      - name: the iallocator name
921
      - input: the json-encoded input string
922

923
    This is a single-node call.
924

925
    """
926
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
927

    
928
  def call_blockdev_grow(self, node, cf_bdev, amount):
929
    """Request a snapshot of the given block device.
930

931
    This is a single-node call.
932

933
    """
934
    return self._SingleNodeCall(node, "blockdev_grow",
935
                                [cf_bdev.ToDict(), amount])
936

    
937
  def call_blockdev_snapshot(self, node, cf_bdev):
938
    """Request a snapshot of the given block device.
939

940
    This is a single-node call.
941

942
    """
943
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
944

    
945
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
946
                           cluster_name, idx):
947
    """Request the export of a given snapshot.
948

949
    This is a single-node call.
950

951
    """
952
    return self._SingleNodeCall(node, "snapshot_export",
953
                                [snap_bdev.ToDict(), dest_node,
954
                                 self._InstDict(instance), cluster_name, idx])
955

    
956
  def call_finalize_export(self, node, instance, snap_disks):
957
    """Request the completion of an export operation.
958

959
    This writes the export config file, etc.
960

961
    This is a single-node call.
962

963
    """
964
    flat_disks = []
965
    for disk in snap_disks:
966
      flat_disks.append(disk.ToDict())
967

    
968
    return self._SingleNodeCall(node, "finalize_export",
969
                                [self._InstDict(instance), flat_disks])
970

    
971
  def call_export_info(self, node, path):
972
    """Queries the export information in a given path.
973

974
    This is a single-node call.
975

976
    """
977
    return self._SingleNodeCall(node, "export_info", [path])
978

    
979
  def call_instance_os_import(self, node, inst, src_node, src_images,
980
                              cluster_name):
981
    """Request the import of a backup into an instance.
982

983
    This is a single-node call.
984

985
    """
986
    return self._SingleNodeCall(node, "instance_os_import",
987
                                [self._InstDict(inst), src_node, src_images,
988
                                 cluster_name])
989

    
990
  def call_export_list(self, node_list):
991
    """Gets the stored exports list.
992

993
    This is a multi-node call.
994

995
    """
996
    return self._MultiNodeCall(node_list, "export_list", [])
997

    
998
  def call_export_remove(self, node, export):
999
    """Requests removal of a given export.
1000

1001
    This is a single-node call.
1002

1003
    """
1004
    return self._SingleNodeCall(node, "export_remove", [export])
1005

    
1006
  @classmethod
1007
  def call_node_leave_cluster(cls, node):
1008
    """Requests a node to clean the cluster information it has.
1009

1010
    This will remove the configuration information from the ganeti data
1011
    dir.
1012

1013
    This is a single-node call.
1014

1015
    """
1016
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1017

    
1018
  def call_node_volumes(self, node_list):
1019
    """Gets all volumes on node(s).
1020

1021
    This is a multi-node call.
1022

1023
    """
1024
    return self._MultiNodeCall(node_list, "node_volumes", [])
1025

    
1026
  def call_node_demote_from_mc(self, node):
1027
    """Demote a node from the master candidate role.
1028

1029
    This is a single-node call.
1030

1031
    """
1032
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1033

    
1034

    
1035
  def call_node_powercycle(self, node, hypervisor):
1036
    """Tries to powercycle a node.
1037

1038
    This is a single-node call.
1039

1040
    """
1041
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1042

    
1043

    
1044
  def call_test_delay(self, node_list, duration):
1045
    """Sleep for a fixed time on given node(s).
1046

1047
    This is a multi-node call.
1048

1049
    """
1050
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1051

    
1052
  def call_file_storage_dir_create(self, node, file_storage_dir):
1053
    """Create the given file storage directory.
1054

1055
    This is a single-node call.
1056

1057
    """
1058
    return self._SingleNodeCall(node, "file_storage_dir_create",
1059
                                [file_storage_dir])
1060

    
1061
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1062
    """Remove the given file storage directory.
1063

1064
    This is a single-node call.
1065

1066
    """
1067
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1068
                                [file_storage_dir])
1069

    
1070
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1071
                                   new_file_storage_dir):
1072
    """Rename file storage directory.
1073

1074
    This is a single-node call.
1075

1076
    """
1077
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1078
                                [old_file_storage_dir, new_file_storage_dir])
1079

    
1080
  @classmethod
1081
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1082
    """Update job queue.
1083

1084
    This is a multi-node call.
1085

1086
    """
1087
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1088
                                    [file_name, cls._Compress(content)],
1089
                                    address_list=address_list)
1090

    
1091
  @classmethod
1092
  def call_jobqueue_purge(cls, node):
1093
    """Purge job queue.
1094

1095
    This is a single-node call.
1096

1097
    """
1098
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1099

    
1100
  @classmethod
1101
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1102
    """Rename a job queue file.
1103

1104
    This is a multi-node call.
1105

1106
    """
1107
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1108
                                    address_list=address_list)
1109

    
1110
  @classmethod
1111
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1112
    """Set the drain flag on the queue.
1113

1114
    This is a multi-node call.
1115

1116
    @type node_list: list
1117
    @param node_list: the list of nodes to query
1118
    @type drain_flag: bool
1119
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1120

1121
    """
1122
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1123
                                    [drain_flag])
1124

    
1125
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1126
    """Validate the hypervisor params.
1127

1128
    This is a multi-node call.
1129

1130
    @type node_list: list
1131
    @param node_list: the list of nodes to query
1132
    @type hvname: string
1133
    @param hvname: the hypervisor name
1134
    @type hvparams: dict
1135
    @param hvparams: the hypervisor parameters to be validated
1136

1137
    """
1138
    cluster = self._cfg.GetClusterInfo()
1139
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1140
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1141
                               [hvname, hv_full])