Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 11344a50

History | View | Annotate | Download (32.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36
import zlib
37
import base64
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45

    
46
import ganeti.http.client
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  _http_manager = http.client.HttpClientManager()
64

    
65

    
66
def Shutdown():
67
  """Stops the module-global HTTP client manager.
68

69
  Must be called before quitting the program.
70

71
  """
72
  global _http_manager
73

    
74
  if _http_manager:
75
    _http_manager.Shutdown()
76
    _http_manager = None
77

    
78

    
79
class RpcResult(object):
80
  """RPC Result class.
81

82
  This class holds an RPC result. It is needed since in multi-node
83
  calls we can't raise an exception just because one one out of many
84
  failed, and therefore we use this class to encapsulate the result.
85

86
  @ivar data: the data payload, for successfull results, or None
87
  @type failed: boolean
88
  @ivar failed: whether the operation failed at RPC level (not
89
      application level on the remote node)
90
  @ivar call: the name of the RPC call
91
  @ivar node: the name of the node to which we made the call
92
  @ivar offline: whether the operation failed because the node was
93
      offline, as opposed to actual failure; offline=True will always
94
      imply failed=True, in order to allow simpler checking if
95
      the user doesn't care about the exact failure mode
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.error = "Node is marked offline"
107
      self.data = self.payload = None
108
    elif failed:
109
      self.error = data
110
      self.data = self.payload = None
111
    else:
112
      self.data = data
113
      self.error = None
114
      if isinstance(data, (tuple, list)) and len(data) == 2:
115
        self.payload = data[1]
116
      else:
117
        self.payload = None
118

    
119
  def Raise(self):
120
    """If the result has failed, raise an OpExecError.
121

122
    This is used so that LU code doesn't have to check for each
123
    result, but instead can call this function.
124

125
    """
126
    if self.failed:
127
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
128
                               (self.call, self.node, self.error))
129

    
130
  def RemoteFailMsg(self):
131
    """Check if the remote procedure failed.
132

133
    This is valid only for RPC calls which return result of the form
134
    (status, data | error_msg).
135

136
    @return: empty string for succcess, otherwise an error message
137

138
    """
139
    def _EnsureErr(val):
140
      """Helper to ensure we return a 'True' value for error."""
141
      if val:
142
        return val
143
      else:
144
        return "No error information"
145

    
146
    if self.failed:
147
      return _EnsureErr(self.error)
148
    if not isinstance(self.data, (tuple, list)):
149
      return "Invalid result type (%s)" % type(self.data)
150
    if len(self.data) != 2:
151
      return "Invalid result length (%d), expected 2" % len(self.data)
152
    if not self.data[0]:
153
      return _EnsureErr(self.data[1])
154
    return ""
155

    
156

    
157
class Client:
158
  """RPC Client class.
159

160
  This class, given a (remote) method name, a list of parameters and a
161
  list of nodes, will contact (in parallel) all nodes, and return a
162
  dict of results (key: node name, value: result).
163

164
  One current bug is that generic failure is still signalled by
165
  'False' result, which is not good. This overloading of values can
166
  cause bugs.
167

168
  """
169
  def __init__(self, procedure, body, port):
170
    self.procedure = procedure
171
    self.body = body
172
    self.port = port
173
    self.nc = {}
174

    
175
    self._ssl_params = \
176
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
177
                         ssl_cert_path=constants.SSL_CERT_FILE)
178

    
179
  def ConnectList(self, node_list, address_list=None):
180
    """Add a list of nodes to the target nodes.
181

182
    @type node_list: list
183
    @param node_list: the list of node names to connect
184
    @type address_list: list or None
185
    @keyword address_list: either None or a list with node addresses,
186
        which must have the same length as the node list
187

188
    """
189
    if address_list is None:
190
      address_list = [None for _ in node_list]
191
    else:
192
      assert len(node_list) == len(address_list), \
193
             "Name and address lists should have the same length"
194
    for node, address in zip(node_list, address_list):
195
      self.ConnectNode(node, address)
196

    
197
  def ConnectNode(self, name, address=None):
198
    """Add a node to the target list.
199

200
    @type name: str
201
    @param name: the node name
202
    @type address: str
203
    @keyword address: the node address, if known
204

205
    """
206
    if address is None:
207
      address = name
208

    
209
    self.nc[name] = \
210
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
211
                                    "/%s" % self.procedure,
212
                                    post_data=self.body,
213
                                    ssl_params=self._ssl_params,
214
                                    ssl_verify_peer=True)
215

    
216
  def GetResults(self):
217
    """Call nodes and return results.
218

219
    @rtype: list
220
    @returns: List of RPC results
221

222
    """
223
    assert _http_manager, "RPC module not intialized"
224

    
225
    _http_manager.ExecRequests(self.nc.values())
226

    
227
    results = {}
228

    
229
    for name, req in self.nc.iteritems():
230
      if req.success and req.resp_status_code == http.HTTP_OK:
231
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
232
                                  node=name, call=self.procedure)
233
        continue
234

    
235
      # TODO: Better error reporting
236
      if req.error:
237
        msg = req.error
238
      else:
239
        msg = req.resp_body
240

    
241
      logging.error("RPC error in %s from node %s: %s",
242
                    self.procedure, name, msg)
243
      results[name] = RpcResult(data=msg, failed=True, node=name,
244
                                call=self.procedure)
245

    
246
    return results
247

    
248

    
249
class RpcRunner(object):
250
  """RPC runner class"""
251

    
252
  def __init__(self, cfg):
253
    """Initialized the rpc runner.
254

255
    @type cfg:  C{config.ConfigWriter}
256
    @param cfg: the configuration object that will be used to get data
257
                about the cluster
258

259
    """
260
    self._cfg = cfg
261
    self.port = utils.GetNodeDaemonPort()
262

    
263
  def _InstDict(self, instance):
264
    """Convert the given instance to a dict.
265

266
    This is done via the instance's ToDict() method and additionally
267
    we fill the hvparams with the cluster defaults.
268

269
    @type instance: L{objects.Instance}
270
    @param instance: an Instance object
271
    @rtype: dict
272
    @return: the instance dict, with the hvparams filled with the
273
        cluster defaults
274

275
    """
276
    idict = instance.ToDict()
277
    cluster = self._cfg.GetClusterInfo()
278
    idict["hvparams"] = cluster.FillHV(instance)
279
    idict["beparams"] = cluster.FillBE(instance)
280
    return idict
281

    
282
  def _ConnectList(self, client, node_list, call):
283
    """Helper for computing node addresses.
284

285
    @type client: L{Client}
286
    @param client: a C{Client} instance
287
    @type node_list: list
288
    @param node_list: the node list we should connect
289
    @type call: string
290
    @param call: the name of the remote procedure call, for filling in
291
        correctly any eventual offline nodes' results
292

293
    """
294
    all_nodes = self._cfg.GetAllNodesInfo()
295
    name_list = []
296
    addr_list = []
297
    skip_dict = {}
298
    for node in node_list:
299
      if node in all_nodes:
300
        if all_nodes[node].offline:
301
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
302
          continue
303
        val = all_nodes[node].primary_ip
304
      else:
305
        val = None
306
      addr_list.append(val)
307
      name_list.append(node)
308
    if name_list:
309
      client.ConnectList(name_list, address_list=addr_list)
310
    return skip_dict
311

    
312
  def _ConnectNode(self, client, node, call):
313
    """Helper for computing one node's address.
314

315
    @type client: L{Client}
316
    @param client: a C{Client} instance
317
    @type node: str
318
    @param node: the node we should connect
319
    @type call: string
320
    @param call: the name of the remote procedure call, for filling in
321
        correctly any eventual offline nodes' results
322

323
    """
324
    node_info = self._cfg.GetNodeInfo(node)
325
    if node_info is not None:
326
      if node_info.offline:
327
        return RpcResult(node=node, offline=True, call=call)
328
      addr = node_info.primary_ip
329
    else:
330
      addr = None
331
    client.ConnectNode(node, address=addr)
332

    
333
  def _MultiNodeCall(self, node_list, procedure, args):
334
    """Helper for making a multi-node call
335

336
    """
337
    body = serializer.DumpJson(args, indent=False)
338
    c = Client(procedure, body, self.port)
339
    skip_dict = self._ConnectList(c, node_list, procedure)
340
    skip_dict.update(c.GetResults())
341
    return skip_dict
342

    
343
  @classmethod
344
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
345
                           address_list=None):
346
    """Helper for making a multi-node static call
347

348
    """
349
    body = serializer.DumpJson(args, indent=False)
350
    c = Client(procedure, body, utils.GetNodeDaemonPort())
351
    c.ConnectList(node_list, address_list=address_list)
352
    return c.GetResults()
353

    
354
  def _SingleNodeCall(self, node, procedure, args):
355
    """Helper for making a single-node call
356

357
    """
358
    body = serializer.DumpJson(args, indent=False)
359
    c = Client(procedure, body, self.port)
360
    result = self._ConnectNode(c, node, procedure)
361
    if result is None:
362
      # we did connect, node is not offline
363
      result = c.GetResults()[node]
364
    return result
365

    
366
  @classmethod
367
  def _StaticSingleNodeCall(cls, node, procedure, args):
368
    """Helper for making a single-node static call
369

370
    """
371
    body = serializer.DumpJson(args, indent=False)
372
    c = Client(procedure, body, utils.GetNodeDaemonPort())
373
    c.ConnectNode(node)
374
    return c.GetResults()[node]
375

    
376
  @staticmethod
377
  def _Compress(data):
378
    """Compresses a string for transport over RPC.
379

380
    Small amounts of data are not compressed.
381

382
    @type data: str
383
    @param data: Data
384
    @rtype: tuple
385
    @return: Encoded data to send
386

387
    """
388
    # Small amounts of data are not compressed
389
    if len(data) < 512:
390
      return (constants.RPC_ENCODING_NONE, data)
391

    
392
    # Compress with zlib and encode in base64
393
    return (constants.RPC_ENCODING_ZLIB_BASE64,
394
            base64.b64encode(zlib.compress(data, 3)))
395

    
396
  #
397
  # Begin RPC calls
398
  #
399

    
400
  def call_volume_list(self, node_list, vg_name):
401
    """Gets the logical volumes present in a given volume group.
402

403
    This is a multi-node call.
404

405
    """
406
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
407

    
408
  def call_vg_list(self, node_list):
409
    """Gets the volume group list.
410

411
    This is a multi-node call.
412

413
    """
414
    return self._MultiNodeCall(node_list, "vg_list", [])
415

    
416
  def call_bridges_exist(self, node, bridges_list):
417
    """Checks if a node has all the bridges given.
418

419
    This method checks if all bridges given in the bridges_list are
420
    present on the remote node, so that an instance that uses interfaces
421
    on those bridges can be started.
422

423
    This is a single-node call.
424

425
    """
426
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
427

    
428
  def call_instance_start(self, node, instance, extra_args):
429
    """Starts an instance.
430

431
    This is a single-node call.
432

433
    """
434
    return self._SingleNodeCall(node, "instance_start",
435
                                [self._InstDict(instance), extra_args])
436

    
437
  def call_instance_shutdown(self, node, instance):
438
    """Stops an instance.
439

440
    This is a single-node call.
441

442
    """
443
    return self._SingleNodeCall(node, "instance_shutdown",
444
                                [self._InstDict(instance)])
445

    
446
  def call_migration_info(self, node, instance):
447
    """Gather the information necessary to prepare an instance migration.
448

449
    This is a single-node call.
450

451
    @type node: string
452
    @param node: the node on which the instance is currently running
453
    @type instance: C{objects.Instance}
454
    @param instance: the instance definition
455

456
    """
457
    return self._SingleNodeCall(node, "migration_info",
458
                                [self._InstDict(instance)])
459

    
460
  def call_accept_instance(self, node, instance, info, target):
461
    """Prepare a node to accept an instance.
462

463
    This is a single-node call.
464

465
    @type node: string
466
    @param node: the target node for the migration
467
    @type instance: C{objects.Instance}
468
    @param instance: the instance definition
469
    @type info: opaque/hypervisor specific (string/data)
470
    @param info: result for the call_migration_info call
471
    @type target: string
472
    @param target: target hostname (usually ip address) (on the node itself)
473

474
    """
475
    return self._SingleNodeCall(node, "accept_instance",
476
                                [self._InstDict(instance), info, target])
477

    
478
  def call_finalize_migration(self, node, instance, info, success):
479
    """Finalize any target-node migration specific operation.
480

481
    This is called both in case of a successful migration and in case of error
482
    (in which case it should abort the migration).
483

484
    This is a single-node call.
485

486
    @type node: string
487
    @param node: the target node for the migration
488
    @type instance: C{objects.Instance}
489
    @param instance: the instance definition
490
    @type info: opaque/hypervisor specific (string/data)
491
    @param info: result for the call_migration_info call
492
    @type success: boolean
493
    @param success: whether the migration was a success or a failure
494

495
    """
496
    return self._SingleNodeCall(node, "finalize_migration",
497
                                [self._InstDict(instance), info, success])
498

    
499
  def call_instance_migrate(self, node, instance, target, live):
500
    """Migrate an instance.
501

502
    This is a single-node call.
503

504
    @type node: string
505
    @param node: the node on which the instance is currently running
506
    @type instance: C{objects.Instance}
507
    @param instance: the instance definition
508
    @type target: string
509
    @param target: the target node name
510
    @type live: boolean
511
    @param live: whether the migration should be done live or not (the
512
        interpretation of this parameter is left to the hypervisor)
513

514
    """
515
    return self._SingleNodeCall(node, "instance_migrate",
516
                                [self._InstDict(instance), target, live])
517

    
518
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
519
    """Reboots an instance.
520

521
    This is a single-node call.
522

523
    """
524
    return self._SingleNodeCall(node, "instance_reboot",
525
                                [self._InstDict(instance), reboot_type,
526
                                 extra_args])
527

    
528
  def call_instance_os_add(self, node, inst):
529
    """Installs an OS on the given instance.
530

531
    This is a single-node call.
532

533
    """
534
    return self._SingleNodeCall(node, "instance_os_add",
535
                                [self._InstDict(inst)])
536

    
537
  def call_instance_run_rename(self, node, inst, old_name):
538
    """Run the OS rename script for an instance.
539

540
    This is a single-node call.
541

542
    """
543
    return self._SingleNodeCall(node, "instance_run_rename",
544
                                [self._InstDict(inst), old_name])
545

    
546
  def call_instance_info(self, node, instance, hname):
547
    """Returns information about a single instance.
548

549
    This is a single-node call.
550

551
    @type node: list
552
    @param node: the list of nodes to query
553
    @type instance: string
554
    @param instance: the instance name
555
    @type hname: string
556
    @param hname: the hypervisor type of the instance
557

558
    """
559
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
560

    
561
  def call_instance_migratable(self, node, instance):
562
    """Checks whether the given instance can be migrated.
563

564
    This is a single-node call.
565

566
    @param node: the node to query
567
    @type instance: L{objects.Instance}
568
    @param instance: the instance to check
569

570

571
    """
572
    return self._SingleNodeCall(node, "instance_migratable",
573
                                [self._InstDict(instance)])
574

    
575
  def call_all_instances_info(self, node_list, hypervisor_list):
576
    """Returns information about all instances on the given nodes.
577

578
    This is a multi-node call.
579

580
    @type node_list: list
581
    @param node_list: the list of nodes to query
582
    @type hypervisor_list: list
583
    @param hypervisor_list: the hypervisors to query for instances
584

585
    """
586
    return self._MultiNodeCall(node_list, "all_instances_info",
587
                               [hypervisor_list])
588

    
589
  def call_instance_list(self, node_list, hypervisor_list):
590
    """Returns the list of running instances on a given node.
591

592
    This is a multi-node call.
593

594
    @type node_list: list
595
    @param node_list: the list of nodes to query
596
    @type hypervisor_list: list
597
    @param hypervisor_list: the hypervisors to query for instances
598

599
    """
600
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
601

    
602
  def call_node_tcp_ping(self, node, source, target, port, timeout,
603
                         live_port_needed):
604
    """Do a TcpPing on the remote node
605

606
    This is a single-node call.
607

608
    """
609
    return self._SingleNodeCall(node, "node_tcp_ping",
610
                                [source, target, port, timeout,
611
                                 live_port_needed])
612

    
613
  def call_node_has_ip_address(self, node, address):
614
    """Checks if a node has the given IP address.
615

616
    This is a single-node call.
617

618
    """
619
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
620

    
621
  def call_node_info(self, node_list, vg_name, hypervisor_type):
622
    """Return node information.
623

624
    This will return memory information and volume group size and free
625
    space.
626

627
    This is a multi-node call.
628

629
    @type node_list: list
630
    @param node_list: the list of nodes to query
631
    @type vg_name: C{string}
632
    @param vg_name: the name of the volume group to ask for disk space
633
        information
634
    @type hypervisor_type: C{str}
635
    @param hypervisor_type: the name of the hypervisor to ask for
636
        memory information
637

638
    """
639
    retux = self._MultiNodeCall(node_list, "node_info",
640
                                [vg_name, hypervisor_type])
641

    
642
    for result in retux.itervalues():
643
      if result.failed or not isinstance(result.data, dict):
644
        result.data = {}
645
      if result.offline:
646
        log_name = None
647
      else:
648
        log_name = "call_node_info"
649

    
650
      utils.CheckDict(result.data, {
651
        'memory_total' : '-',
652
        'memory_dom0' : '-',
653
        'memory_free' : '-',
654
        'vg_size' : 'node_unreachable',
655
        'vg_free' : '-',
656
        }, log_name)
657
    return retux
658

    
659
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
660
    """Add a node to the cluster.
661

662
    This is a single-node call.
663

664
    """
665
    return self._SingleNodeCall(node, "node_add",
666
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
667

    
668
  def call_node_verify(self, node_list, checkdict, cluster_name):
669
    """Request verification of given parameters.
670

671
    This is a multi-node call.
672

673
    """
674
    return self._MultiNodeCall(node_list, "node_verify",
675
                               [checkdict, cluster_name])
676

    
677
  @classmethod
678
  def call_node_start_master(cls, node, start_daemons):
679
    """Tells a node to activate itself as a master.
680

681
    This is a single-node call.
682

683
    """
684
    return cls._StaticSingleNodeCall(node, "node_start_master",
685
                                     [start_daemons])
686

    
687
  @classmethod
688
  def call_node_stop_master(cls, node, stop_daemons):
689
    """Tells a node to demote itself from master status.
690

691
    This is a single-node call.
692

693
    """
694
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
695

    
696
  @classmethod
697
  def call_master_info(cls, node_list):
698
    """Query master info.
699

700
    This is a multi-node call.
701

702
    """
703
    # TODO: should this method query down nodes?
704
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
705

    
706
  def call_version(self, node_list):
707
    """Query node version.
708

709
    This is a multi-node call.
710

711
    """
712
    return self._MultiNodeCall(node_list, "version", [])
713

    
714
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
715
    """Request creation of a given block device.
716

717
    This is a single-node call.
718

719
    """
720
    return self._SingleNodeCall(node, "blockdev_create",
721
                                [bdev.ToDict(), size, owner, on_primary, info])
722

    
723
  def call_blockdev_remove(self, node, bdev):
724
    """Request removal of a given block device.
725

726
    This is a single-node call.
727

728
    """
729
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
730

    
731
  def call_blockdev_rename(self, node, devlist):
732
    """Request rename of the given block devices.
733

734
    This is a single-node call.
735

736
    """
737
    return self._SingleNodeCall(node, "blockdev_rename",
738
                                [(d.ToDict(), uid) for d, uid in devlist])
739

    
740
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
741
    """Request assembling of a given block device.
742

743
    This is a single-node call.
744

745
    """
746
    return self._SingleNodeCall(node, "blockdev_assemble",
747
                                [disk.ToDict(), owner, on_primary])
748

    
749
  def call_blockdev_shutdown(self, node, disk):
750
    """Request shutdown of a given block device.
751

752
    This is a single-node call.
753

754
    """
755
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
756

    
757
  def call_blockdev_addchildren(self, node, bdev, ndevs):
758
    """Request adding a list of children to a (mirroring) device.
759

760
    This is a single-node call.
761

762
    """
763
    return self._SingleNodeCall(node, "blockdev_addchildren",
764
                                [bdev.ToDict(),
765
                                 [disk.ToDict() for disk in ndevs]])
766

    
767
  def call_blockdev_removechildren(self, node, bdev, ndevs):
768
    """Request removing a list of children from a (mirroring) device.
769

770
    This is a single-node call.
771

772
    """
773
    return self._SingleNodeCall(node, "blockdev_removechildren",
774
                                [bdev.ToDict(),
775
                                 [disk.ToDict() for disk in ndevs]])
776

    
777
  def call_blockdev_getmirrorstatus(self, node, disks):
778
    """Request status of a (mirroring) device.
779

780
    This is a single-node call.
781

782
    """
783
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
784
                                [dsk.ToDict() for dsk in disks])
785

    
786
  def call_blockdev_find(self, node, disk):
787
    """Request identification of a given block device.
788

789
    This is a single-node call.
790

791
    """
792
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
793

    
794
  def call_blockdev_close(self, node, instance_name, disks):
795
    """Closes the given block devices.
796

797
    This is a single-node call.
798

799
    """
800
    params = [instance_name, [cf.ToDict() for cf in disks]]
801
    return self._SingleNodeCall(node, "blockdev_close", params)
802

    
803
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
804
    """Disconnects the network of the given drbd devices.
805

806
    This is a multi-node call.
807

808
    """
809
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
810
                               [nodes_ip, [cf.ToDict() for cf in disks]])
811

    
812
  def call_drbd_attach_net(self, node_list, nodes_ip,
813
                           disks, instance_name, multimaster):
814
    """Disconnects the given drbd devices.
815

816
    This is a multi-node call.
817

818
    """
819
    return self._MultiNodeCall(node_list, "drbd_attach_net",
820
                               [nodes_ip, [cf.ToDict() for cf in disks],
821
                                instance_name, multimaster])
822

    
823
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
824
    """Waits for the synchronization of drbd devices is complete.
825

826
    This is a multi-node call.
827

828
    """
829
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
830
                               [nodes_ip, [cf.ToDict() for cf in disks]])
831

    
832
  @classmethod
833
  def call_upload_file(cls, node_list, file_name, address_list=None):
834
    """Upload a file.
835

836
    The node will refuse the operation in case the file is not on the
837
    approved file list.
838

839
    This is a multi-node call.
840

841
    @type node_list: list
842
    @param node_list: the list of node names to upload to
843
    @type file_name: str
844
    @param file_name: the filename to upload
845
    @type address_list: list or None
846
    @keyword address_list: an optional list of node addresses, in order
847
        to optimize the RPC speed
848

849
    """
850
    file_contents = utils.ReadFile(file_name)
851
    data = cls._Compress(file_contents)
852
    st = os.stat(file_name)
853
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
854
              st.st_atime, st.st_mtime]
855
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
856
                                    address_list=address_list)
857

    
858
  @classmethod
859
  def call_write_ssconf_files(cls, node_list, values):
860
    """Write ssconf files.
861

862
    This is a multi-node call.
863

864
    """
865
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
866

    
867
  def call_os_diagnose(self, node_list):
868
    """Request a diagnose of OS definitions.
869

870
    This is a multi-node call.
871

872
    """
873
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
874

    
875
    for node_result in result.values():
876
      if not node_result.failed and node_result.data:
877
        node_result.data = [objects.OS.FromDict(oss)
878
                            for oss in node_result.data]
879
    return result
880

    
881
  def call_os_get(self, node, name):
882
    """Returns an OS definition.
883

884
    This is a single-node call.
885

886
    """
887
    result = self._SingleNodeCall(node, "os_get", [name])
888
    if not result.failed and isinstance(result.data, dict):
889
      result.data = objects.OS.FromDict(result.data)
890
    return result
891

    
892
  def call_hooks_runner(self, node_list, hpath, phase, env):
893
    """Call the hooks runner.
894

895
    Args:
896
      - op: the OpCode instance
897
      - env: a dictionary with the environment
898

899
    This is a multi-node call.
900

901
    """
902
    params = [hpath, phase, env]
903
    return self._MultiNodeCall(node_list, "hooks_runner", params)
904

    
905
  def call_iallocator_runner(self, node, name, idata):
906
    """Call an iallocator on a remote node
907

908
    Args:
909
      - name: the iallocator name
910
      - input: the json-encoded input string
911

912
    This is a single-node call.
913

914
    """
915
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
916

    
917
  def call_blockdev_grow(self, node, cf_bdev, amount):
918
    """Request a snapshot of the given block device.
919

920
    This is a single-node call.
921

922
    """
923
    return self._SingleNodeCall(node, "blockdev_grow",
924
                                [cf_bdev.ToDict(), amount])
925

    
926
  def call_blockdev_snapshot(self, node, cf_bdev):
927
    """Request a snapshot of the given block device.
928

929
    This is a single-node call.
930

931
    """
932
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
933

    
934
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
935
                           cluster_name, idx):
936
    """Request the export of a given snapshot.
937

938
    This is a single-node call.
939

940
    """
941
    return self._SingleNodeCall(node, "snapshot_export",
942
                                [snap_bdev.ToDict(), dest_node,
943
                                 self._InstDict(instance), cluster_name, idx])
944

    
945
  def call_finalize_export(self, node, instance, snap_disks):
946
    """Request the completion of an export operation.
947

948
    This writes the export config file, etc.
949

950
    This is a single-node call.
951

952
    """
953
    flat_disks = []
954
    for disk in snap_disks:
955
      flat_disks.append(disk.ToDict())
956

    
957
    return self._SingleNodeCall(node, "finalize_export",
958
                                [self._InstDict(instance), flat_disks])
959

    
960
  def call_export_info(self, node, path):
961
    """Queries the export information in a given path.
962

963
    This is a single-node call.
964

965
    """
966
    result = self._SingleNodeCall(node, "export_info", [path])
967
    if not result.failed and result.data:
968
      result.data = objects.SerializableConfigParser.Loads(str(result.data))
969
    return result
970

    
971
  def call_instance_os_import(self, node, inst, src_node, src_images,
972
                              cluster_name):
973
    """Request the import of a backup into an instance.
974

975
    This is a single-node call.
976

977
    """
978
    return self._SingleNodeCall(node, "instance_os_import",
979
                                [self._InstDict(inst), src_node, src_images,
980
                                 cluster_name])
981

    
982
  def call_export_list(self, node_list):
983
    """Gets the stored exports list.
984

985
    This is a multi-node call.
986

987
    """
988
    return self._MultiNodeCall(node_list, "export_list", [])
989

    
990
  def call_export_remove(self, node, export):
991
    """Requests removal of a given export.
992

993
    This is a single-node call.
994

995
    """
996
    return self._SingleNodeCall(node, "export_remove", [export])
997

    
998
  @classmethod
999
  def call_node_leave_cluster(cls, node):
1000
    """Requests a node to clean the cluster information it has.
1001

1002
    This will remove the configuration information from the ganeti data
1003
    dir.
1004

1005
    This is a single-node call.
1006

1007
    """
1008
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1009

    
1010
  def call_node_volumes(self, node_list):
1011
    """Gets all volumes on node(s).
1012

1013
    This is a multi-node call.
1014

1015
    """
1016
    return self._MultiNodeCall(node_list, "node_volumes", [])
1017

    
1018
  def call_node_demote_from_mc(self, node):
1019
    """Demote a node from the master candidate role.
1020

1021
    This is a single-node call.
1022

1023
    """
1024
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1025

    
1026
  def call_test_delay(self, node_list, duration):
1027
    """Sleep for a fixed time on given node(s).
1028

1029
    This is a multi-node call.
1030

1031
    """
1032
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1033

    
1034
  def call_file_storage_dir_create(self, node, file_storage_dir):
1035
    """Create the given file storage directory.
1036

1037
    This is a single-node call.
1038

1039
    """
1040
    return self._SingleNodeCall(node, "file_storage_dir_create",
1041
                                [file_storage_dir])
1042

    
1043
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1044
    """Remove the given file storage directory.
1045

1046
    This is a single-node call.
1047

1048
    """
1049
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1050
                                [file_storage_dir])
1051

    
1052
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1053
                                   new_file_storage_dir):
1054
    """Rename file storage directory.
1055

1056
    This is a single-node call.
1057

1058
    """
1059
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1060
                                [old_file_storage_dir, new_file_storage_dir])
1061

    
1062
  @classmethod
1063
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1064
    """Update job queue.
1065

1066
    This is a multi-node call.
1067

1068
    """
1069
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1070
                                    [file_name, cls._Compress(content)],
1071
                                    address_list=address_list)
1072

    
1073
  @classmethod
1074
  def call_jobqueue_purge(cls, node):
1075
    """Purge job queue.
1076

1077
    This is a single-node call.
1078

1079
    """
1080
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1081

    
1082
  @classmethod
1083
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1084
    """Rename a job queue file.
1085

1086
    This is a multi-node call.
1087

1088
    """
1089
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1090
                                    address_list=address_list)
1091

    
1092
  @classmethod
1093
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1094
    """Set the drain flag on the queue.
1095

1096
    This is a multi-node call.
1097

1098
    @type node_list: list
1099
    @param node_list: the list of nodes to query
1100
    @type drain_flag: bool
1101
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1102

1103
    """
1104
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1105
                                    [drain_flag])
1106

    
1107
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1108
    """Validate the hypervisor params.
1109

1110
    This is a multi-node call.
1111

1112
    @type node_list: list
1113
    @param node_list: the list of nodes to query
1114
    @type hvname: string
1115
    @param hvname: the hypervisor name
1116
    @type hvparams: dict
1117
    @param hvparams: the hypervisor parameters to be validated
1118

1119
    """
1120
    cluster = self._cfg.GetClusterInfo()
1121
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1122
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1123
                               [hvname, hv_full])