Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ d3c8b360

History | View | Annotate | Download (32.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36
import zlib
37
import base64
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45

    
46
import ganeti.http.client
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  _http_manager = http.client.HttpClientManager()
64

    
65

    
66
def Shutdown():
67
  """Stops the module-global HTTP client manager.
68

69
  Must be called before quitting the program.
70

71
  """
72
  global _http_manager
73

    
74
  if _http_manager:
75
    _http_manager.Shutdown()
76
    _http_manager = None
77

    
78

    
79
class RpcResult(object):
80
  """RPC Result class.
81

82
  This class holds an RPC result. It is needed since in multi-node
83
  calls we can't raise an exception just because one one out of many
84
  failed, and therefore we use this class to encapsulate the result.
85

86
  @ivar data: the data payload, for successfull results, or None
87
  @type failed: boolean
88
  @ivar failed: whether the operation failed at transport level (not
89
      application level on the remote node)
90
  @ivar call: the name of the RPC call
91
  @ivar node: the name of the node to which we made the call
92
  @ivar offline: whether the operation failed because the node was
93
      offline, as opposed to actual failure; offline=True will always
94
      imply failed=True, in order to allow simpler checking if
95
      the user doesn't care about the exact failure mode
96
  @ivar error: the error message if the call failed
97

98
  """
99
  def __init__(self, data=None, failed=False, offline=False,
100
               call=None, node=None):
101
    self.failed = failed
102
    self.offline = offline
103
    self.call = call
104
    self.node = node
105
    if offline:
106
      self.failed = True
107
      self.error = "Node is marked offline"
108
      self.data = self.payload = None
109
    elif failed:
110
      self.error = self._EnsureErr(data)
111
      self.data = self.payload = None
112
    else:
113
      self.data = data
114
      if not isinstance(self.data, (tuple, list)):
115
        self.error = ("RPC layer error: invalid result type (%s)" %
116
                      type(self.data))
117
      elif len(data) != 2:
118
        self.error = ("RPC layer error: invalid result length (%d), "
119
                      "expected 2" % len(self.data))
120
      elif not self.data[0]:
121
        self.error = self._EnsureErr(self.data[1])
122
      else:
123
        # finally success
124
        self.error = None
125
        self.payload = data[1]
126

    
127
  @staticmethod
128
  def _EnsureErr(val):
129
    """Helper to ensure we return a 'True' value for error."""
130
    if val:
131
      return val
132
    else:
133
      return "No error information"
134

    
135
  def Raise(self):
136
    """If the result has failed, raise an OpExecError.
137

138
    This is used so that LU code doesn't have to check for each
139
    result, but instead can call this function.
140

141
    """
142
    if self.failed:
143
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
144
                               (self.call, self.node, self.error))
145

    
146
  def RemoteFailMsg(self):
147
    """Check if the remote procedure failed.
148

149
    @return: the fail_msg attribute
150

151
    """
152
    return self.error
153

    
154

    
155
class Client:
156
  """RPC Client class.
157

158
  This class, given a (remote) method name, a list of parameters and a
159
  list of nodes, will contact (in parallel) all nodes, and return a
160
  dict of results (key: node name, value: result).
161

162
  One current bug is that generic failure is still signalled by
163
  'False' result, which is not good. This overloading of values can
164
  cause bugs.
165

166
  """
167
  def __init__(self, procedure, body, port):
168
    self.procedure = procedure
169
    self.body = body
170
    self.port = port
171
    self.nc = {}
172

    
173
    self._ssl_params = \
174
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
175
                         ssl_cert_path=constants.SSL_CERT_FILE)
176

    
177
  def ConnectList(self, node_list, address_list=None):
178
    """Add a list of nodes to the target nodes.
179

180
    @type node_list: list
181
    @param node_list: the list of node names to connect
182
    @type address_list: list or None
183
    @keyword address_list: either None or a list with node addresses,
184
        which must have the same length as the node list
185

186
    """
187
    if address_list is None:
188
      address_list = [None for _ in node_list]
189
    else:
190
      assert len(node_list) == len(address_list), \
191
             "Name and address lists should have the same length"
192
    for node, address in zip(node_list, address_list):
193
      self.ConnectNode(node, address)
194

    
195
  def ConnectNode(self, name, address=None):
196
    """Add a node to the target list.
197

198
    @type name: str
199
    @param name: the node name
200
    @type address: str
201
    @keyword address: the node address, if known
202

203
    """
204
    if address is None:
205
      address = name
206

    
207
    self.nc[name] = \
208
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
209
                                    "/%s" % self.procedure,
210
                                    post_data=self.body,
211
                                    ssl_params=self._ssl_params,
212
                                    ssl_verify_peer=True)
213

    
214
  def GetResults(self):
215
    """Call nodes and return results.
216

217
    @rtype: list
218
    @return: List of RPC results
219

220
    """
221
    assert _http_manager, "RPC module not intialized"
222

    
223
    _http_manager.ExecRequests(self.nc.values())
224

    
225
    results = {}
226

    
227
    for name, req in self.nc.iteritems():
228
      if req.success and req.resp_status_code == http.HTTP_OK:
229
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
230
                                  node=name, call=self.procedure)
231
        continue
232

    
233
      # TODO: Better error reporting
234
      if req.error:
235
        msg = req.error
236
      else:
237
        msg = req.resp_body
238

    
239
      logging.error("RPC error in %s from node %s: %s",
240
                    self.procedure, name, msg)
241
      results[name] = RpcResult(data=msg, failed=True, node=name,
242
                                call=self.procedure)
243

    
244
    return results
245

    
246

    
247
class RpcRunner(object):
248
  """RPC runner class"""
249

    
250
  def __init__(self, cfg):
251
    """Initialized the rpc runner.
252

253
    @type cfg:  C{config.ConfigWriter}
254
    @param cfg: the configuration object that will be used to get data
255
                about the cluster
256

257
    """
258
    self._cfg = cfg
259
    self.port = utils.GetNodeDaemonPort()
260

    
261
  def _InstDict(self, instance, hvp=None, bep=None):
262
    """Convert the given instance to a dict.
263

264
    This is done via the instance's ToDict() method and additionally
265
    we fill the hvparams with the cluster defaults.
266

267
    @type instance: L{objects.Instance}
268
    @param instance: an Instance object
269
    @type hvp: dict or None
270
    @param hvp: a dictionary with overriden hypervisor parameters
271
    @type bep: dict or None
272
    @param bep: a dictionary with overriden backend parameters
273
    @rtype: dict
274
    @return: the instance dict, with the hvparams filled with the
275
        cluster defaults
276

277
    """
278
    idict = instance.ToDict()
279
    cluster = self._cfg.GetClusterInfo()
280
    idict["hvparams"] = cluster.FillHV(instance)
281
    if hvp is not None:
282
      idict["hvparams"].update(hvp)
283
    idict["beparams"] = cluster.FillBE(instance)
284
    if bep is not None:
285
      idict["beparams"].update(bep)
286
    for nic in idict["nics"]:
287
      nic['nicparams'] = objects.FillDict(
288
        cluster.nicparams[constants.PP_DEFAULT],
289
        nic['nicparams'])
290
    return idict
291

    
292
  def _ConnectList(self, client, node_list, call):
293
    """Helper for computing node addresses.
294

295
    @type client: L{Client}
296
    @param client: a C{Client} instance
297
    @type node_list: list
298
    @param node_list: the node list we should connect
299
    @type call: string
300
    @param call: the name of the remote procedure call, for filling in
301
        correctly any eventual offline nodes' results
302

303
    """
304
    all_nodes = self._cfg.GetAllNodesInfo()
305
    name_list = []
306
    addr_list = []
307
    skip_dict = {}
308
    for node in node_list:
309
      if node in all_nodes:
310
        if all_nodes[node].offline:
311
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
312
          continue
313
        val = all_nodes[node].primary_ip
314
      else:
315
        val = None
316
      addr_list.append(val)
317
      name_list.append(node)
318
    if name_list:
319
      client.ConnectList(name_list, address_list=addr_list)
320
    return skip_dict
321

    
322
  def _ConnectNode(self, client, node, call):
323
    """Helper for computing one node's address.
324

325
    @type client: L{Client}
326
    @param client: a C{Client} instance
327
    @type node: str
328
    @param node: the node we should connect
329
    @type call: string
330
    @param call: the name of the remote procedure call, for filling in
331
        correctly any eventual offline nodes' results
332

333
    """
334
    node_info = self._cfg.GetNodeInfo(node)
335
    if node_info is not None:
336
      if node_info.offline:
337
        return RpcResult(node=node, offline=True, call=call)
338
      addr = node_info.primary_ip
339
    else:
340
      addr = None
341
    client.ConnectNode(node, address=addr)
342

    
343
  def _MultiNodeCall(self, node_list, procedure, args):
344
    """Helper for making a multi-node call
345

346
    """
347
    body = serializer.DumpJson(args, indent=False)
348
    c = Client(procedure, body, self.port)
349
    skip_dict = self._ConnectList(c, node_list, procedure)
350
    skip_dict.update(c.GetResults())
351
    return skip_dict
352

    
353
  @classmethod
354
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
355
                           address_list=None):
356
    """Helper for making a multi-node static call
357

358
    """
359
    body = serializer.DumpJson(args, indent=False)
360
    c = Client(procedure, body, utils.GetNodeDaemonPort())
361
    c.ConnectList(node_list, address_list=address_list)
362
    return c.GetResults()
363

    
364
  def _SingleNodeCall(self, node, procedure, args):
365
    """Helper for making a single-node call
366

367
    """
368
    body = serializer.DumpJson(args, indent=False)
369
    c = Client(procedure, body, self.port)
370
    result = self._ConnectNode(c, node, procedure)
371
    if result is None:
372
      # we did connect, node is not offline
373
      result = c.GetResults()[node]
374
    return result
375

    
376
  @classmethod
377
  def _StaticSingleNodeCall(cls, node, procedure, args):
378
    """Helper for making a single-node static call
379

380
    """
381
    body = serializer.DumpJson(args, indent=False)
382
    c = Client(procedure, body, utils.GetNodeDaemonPort())
383
    c.ConnectNode(node)
384
    return c.GetResults()[node]
385

    
386
  @staticmethod
387
  def _Compress(data):
388
    """Compresses a string for transport over RPC.
389

390
    Small amounts of data are not compressed.
391

392
    @type data: str
393
    @param data: Data
394
    @rtype: tuple
395
    @return: Encoded data to send
396

397
    """
398
    # Small amounts of data are not compressed
399
    if len(data) < 512:
400
      return (constants.RPC_ENCODING_NONE, data)
401

    
402
    # Compress with zlib and encode in base64
403
    return (constants.RPC_ENCODING_ZLIB_BASE64,
404
            base64.b64encode(zlib.compress(data, 3)))
405

    
406
  #
407
  # Begin RPC calls
408
  #
409

    
410
  def call_volume_list(self, node_list, vg_name):
411
    """Gets the logical volumes present in a given volume group.
412

413
    This is a multi-node call.
414

415
    """
416
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
417

    
418
  def call_vg_list(self, node_list):
419
    """Gets the volume group list.
420

421
    This is a multi-node call.
422

423
    """
424
    return self._MultiNodeCall(node_list, "vg_list", [])
425

    
426
  def call_bridges_exist(self, node, bridges_list):
427
    """Checks if a node has all the bridges given.
428

429
    This method checks if all bridges given in the bridges_list are
430
    present on the remote node, so that an instance that uses interfaces
431
    on those bridges can be started.
432

433
    This is a single-node call.
434

435
    """
436
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
437

    
438
  def call_instance_start(self, node, instance, hvp, bep):
439
    """Starts an instance.
440

441
    This is a single-node call.
442

443
    """
444
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
445
    return self._SingleNodeCall(node, "instance_start", [idict])
446

    
447
  def call_instance_shutdown(self, node, instance):
448
    """Stops an instance.
449

450
    This is a single-node call.
451

452
    """
453
    return self._SingleNodeCall(node, "instance_shutdown",
454
                                [self._InstDict(instance)])
455

    
456
  def call_migration_info(self, node, instance):
457
    """Gather the information necessary to prepare an instance migration.
458

459
    This is a single-node call.
460

461
    @type node: string
462
    @param node: the node on which the instance is currently running
463
    @type instance: C{objects.Instance}
464
    @param instance: the instance definition
465

466
    """
467
    return self._SingleNodeCall(node, "migration_info",
468
                                [self._InstDict(instance)])
469

    
470
  def call_accept_instance(self, node, instance, info, target):
471
    """Prepare a node to accept an instance.
472

473
    This is a single-node call.
474

475
    @type node: string
476
    @param node: the target node for the migration
477
    @type instance: C{objects.Instance}
478
    @param instance: the instance definition
479
    @type info: opaque/hypervisor specific (string/data)
480
    @param info: result for the call_migration_info call
481
    @type target: string
482
    @param target: target hostname (usually ip address) (on the node itself)
483

484
    """
485
    return self._SingleNodeCall(node, "accept_instance",
486
                                [self._InstDict(instance), info, target])
487

    
488
  def call_finalize_migration(self, node, instance, info, success):
489
    """Finalize any target-node migration specific operation.
490

491
    This is called both in case of a successful migration and in case of error
492
    (in which case it should abort the migration).
493

494
    This is a single-node call.
495

496
    @type node: string
497
    @param node: the target node for the migration
498
    @type instance: C{objects.Instance}
499
    @param instance: the instance definition
500
    @type info: opaque/hypervisor specific (string/data)
501
    @param info: result for the call_migration_info call
502
    @type success: boolean
503
    @param success: whether the migration was a success or a failure
504

505
    """
506
    return self._SingleNodeCall(node, "finalize_migration",
507
                                [self._InstDict(instance), info, success])
508

    
509
  def call_instance_migrate(self, node, instance, target, live):
510
    """Migrate an instance.
511

512
    This is a single-node call.
513

514
    @type node: string
515
    @param node: the node on which the instance is currently running
516
    @type instance: C{objects.Instance}
517
    @param instance: the instance definition
518
    @type target: string
519
    @param target: the target node name
520
    @type live: boolean
521
    @param live: whether the migration should be done live or not (the
522
        interpretation of this parameter is left to the hypervisor)
523

524
    """
525
    return self._SingleNodeCall(node, "instance_migrate",
526
                                [self._InstDict(instance), target, live])
527

    
528
  def call_instance_reboot(self, node, instance, reboot_type):
529
    """Reboots an instance.
530

531
    This is a single-node call.
532

533
    """
534
    return self._SingleNodeCall(node, "instance_reboot",
535
                                [self._InstDict(instance), reboot_type])
536

    
537
  def call_instance_os_add(self, node, inst, reinstall):
538
    """Installs an OS on the given instance.
539

540
    This is a single-node call.
541

542
    """
543
    return self._SingleNodeCall(node, "instance_os_add",
544
                                [self._InstDict(inst), reinstall])
545

    
546
  def call_instance_run_rename(self, node, inst, old_name):
547
    """Run the OS rename script for an instance.
548

549
    This is a single-node call.
550

551
    """
552
    return self._SingleNodeCall(node, "instance_run_rename",
553
                                [self._InstDict(inst), old_name])
554

    
555
  def call_instance_info(self, node, instance, hname):
556
    """Returns information about a single instance.
557

558
    This is a single-node call.
559

560
    @type node: list
561
    @param node: the list of nodes to query
562
    @type instance: string
563
    @param instance: the instance name
564
    @type hname: string
565
    @param hname: the hypervisor type of the instance
566

567
    """
568
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
569

    
570
  def call_instance_migratable(self, node, instance):
571
    """Checks whether the given instance can be migrated.
572

573
    This is a single-node call.
574

575
    @param node: the node to query
576
    @type instance: L{objects.Instance}
577
    @param instance: the instance to check
578

579

580
    """
581
    return self._SingleNodeCall(node, "instance_migratable",
582
                                [self._InstDict(instance)])
583

    
584
  def call_all_instances_info(self, node_list, hypervisor_list):
585
    """Returns information about all instances on the given nodes.
586

587
    This is a multi-node call.
588

589
    @type node_list: list
590
    @param node_list: the list of nodes to query
591
    @type hypervisor_list: list
592
    @param hypervisor_list: the hypervisors to query for instances
593

594
    """
595
    return self._MultiNodeCall(node_list, "all_instances_info",
596
                               [hypervisor_list])
597

    
598
  def call_instance_list(self, node_list, hypervisor_list):
599
    """Returns the list of running instances on a given node.
600

601
    This is a multi-node call.
602

603
    @type node_list: list
604
    @param node_list: the list of nodes to query
605
    @type hypervisor_list: list
606
    @param hypervisor_list: the hypervisors to query for instances
607

608
    """
609
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
610

    
611
  def call_node_tcp_ping(self, node, source, target, port, timeout,
612
                         live_port_needed):
613
    """Do a TcpPing on the remote node
614

615
    This is a single-node call.
616

617
    """
618
    return self._SingleNodeCall(node, "node_tcp_ping",
619
                                [source, target, port, timeout,
620
                                 live_port_needed])
621

    
622
  def call_node_has_ip_address(self, node, address):
623
    """Checks if a node has the given IP address.
624

625
    This is a single-node call.
626

627
    """
628
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
629

    
630
  def call_node_info(self, node_list, vg_name, hypervisor_type):
631
    """Return node information.
632

633
    This will return memory information and volume group size and free
634
    space.
635

636
    This is a multi-node call.
637

638
    @type node_list: list
639
    @param node_list: the list of nodes to query
640
    @type vg_name: C{string}
641
    @param vg_name: the name of the volume group to ask for disk space
642
        information
643
    @type hypervisor_type: C{str}
644
    @param hypervisor_type: the name of the hypervisor to ask for
645
        memory information
646

647
    """
648
    return self._MultiNodeCall(node_list, "node_info",
649
                               [vg_name, hypervisor_type])
650

    
651
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
652
    """Add a node to the cluster.
653

654
    This is a single-node call.
655

656
    """
657
    return self._SingleNodeCall(node, "node_add",
658
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
659

    
660
  def call_node_verify(self, node_list, checkdict, cluster_name):
661
    """Request verification of given parameters.
662

663
    This is a multi-node call.
664

665
    """
666
    return self._MultiNodeCall(node_list, "node_verify",
667
                               [checkdict, cluster_name])
668

    
669
  @classmethod
670
  def call_node_start_master(cls, node, start_daemons):
671
    """Tells a node to activate itself as a master.
672

673
    This is a single-node call.
674

675
    """
676
    return cls._StaticSingleNodeCall(node, "node_start_master",
677
                                     [start_daemons])
678

    
679
  @classmethod
680
  def call_node_stop_master(cls, node, stop_daemons):
681
    """Tells a node to demote itself from master status.
682

683
    This is a single-node call.
684

685
    """
686
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
687

    
688
  @classmethod
689
  def call_master_info(cls, node_list):
690
    """Query master info.
691

692
    This is a multi-node call.
693

694
    """
695
    # TODO: should this method query down nodes?
696
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
697

    
698
  def call_version(self, node_list):
699
    """Query node version.
700

701
    This is a multi-node call.
702

703
    """
704
    return self._MultiNodeCall(node_list, "version", [])
705

    
706
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
707
    """Request creation of a given block device.
708

709
    This is a single-node call.
710

711
    """
712
    return self._SingleNodeCall(node, "blockdev_create",
713
                                [bdev.ToDict(), size, owner, on_primary, info])
714

    
715
  def call_blockdev_remove(self, node, bdev):
716
    """Request removal of a given block device.
717

718
    This is a single-node call.
719

720
    """
721
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
722

    
723
  def call_blockdev_rename(self, node, devlist):
724
    """Request rename of the given block devices.
725

726
    This is a single-node call.
727

728
    """
729
    return self._SingleNodeCall(node, "blockdev_rename",
730
                                [(d.ToDict(), uid) for d, uid in devlist])
731

    
732
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
733
    """Request assembling of a given block device.
734

735
    This is a single-node call.
736

737
    """
738
    return self._SingleNodeCall(node, "blockdev_assemble",
739
                                [disk.ToDict(), owner, on_primary])
740

    
741
  def call_blockdev_shutdown(self, node, disk):
742
    """Request shutdown of a given block device.
743

744
    This is a single-node call.
745

746
    """
747
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
748

    
749
  def call_blockdev_addchildren(self, node, bdev, ndevs):
750
    """Request adding a list of children to a (mirroring) device.
751

752
    This is a single-node call.
753

754
    """
755
    return self._SingleNodeCall(node, "blockdev_addchildren",
756
                                [bdev.ToDict(),
757
                                 [disk.ToDict() for disk in ndevs]])
758

    
759
  def call_blockdev_removechildren(self, node, bdev, ndevs):
760
    """Request removing a list of children from a (mirroring) device.
761

762
    This is a single-node call.
763

764
    """
765
    return self._SingleNodeCall(node, "blockdev_removechildren",
766
                                [bdev.ToDict(),
767
                                 [disk.ToDict() for disk in ndevs]])
768

    
769
  def call_blockdev_getmirrorstatus(self, node, disks):
770
    """Request status of a (mirroring) device.
771

772
    This is a single-node call.
773

774
    """
775
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
776
                                [dsk.ToDict() for dsk in disks])
777

    
778
  def call_blockdev_find(self, node, disk):
779
    """Request identification of a given block device.
780

781
    This is a single-node call.
782

783
    """
784
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
785

    
786
  def call_blockdev_close(self, node, instance_name, disks):
787
    """Closes the given block devices.
788

789
    This is a single-node call.
790

791
    """
792
    params = [instance_name, [cf.ToDict() for cf in disks]]
793
    return self._SingleNodeCall(node, "blockdev_close", params)
794

    
795
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
796
    """Disconnects the network of the given drbd devices.
797

798
    This is a multi-node call.
799

800
    """
801
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
802
                               [nodes_ip, [cf.ToDict() for cf in disks]])
803

    
804
  def call_drbd_attach_net(self, node_list, nodes_ip,
805
                           disks, instance_name, multimaster):
806
    """Disconnects the given drbd devices.
807

808
    This is a multi-node call.
809

810
    """
811
    return self._MultiNodeCall(node_list, "drbd_attach_net",
812
                               [nodes_ip, [cf.ToDict() for cf in disks],
813
                                instance_name, multimaster])
814

    
815
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
816
    """Waits for the synchronization of drbd devices is complete.
817

818
    This is a multi-node call.
819

820
    """
821
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
822
                               [nodes_ip, [cf.ToDict() for cf in disks]])
823

    
824
  @classmethod
825
  def call_upload_file(cls, node_list, file_name, address_list=None):
826
    """Upload a file.
827

828
    The node will refuse the operation in case the file is not on the
829
    approved file list.
830

831
    This is a multi-node call.
832

833
    @type node_list: list
834
    @param node_list: the list of node names to upload to
835
    @type file_name: str
836
    @param file_name: the filename to upload
837
    @type address_list: list or None
838
    @keyword address_list: an optional list of node addresses, in order
839
        to optimize the RPC speed
840

841
    """
842
    file_contents = utils.ReadFile(file_name)
843
    data = cls._Compress(file_contents)
844
    st = os.stat(file_name)
845
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
846
              st.st_atime, st.st_mtime]
847
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
848
                                    address_list=address_list)
849

    
850
  @classmethod
851
  def call_write_ssconf_files(cls, node_list, values):
852
    """Write ssconf files.
853

854
    This is a multi-node call.
855

856
    """
857
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
858

    
859
  def call_os_diagnose(self, node_list):
860
    """Request a diagnose of OS definitions.
861

862
    This is a multi-node call.
863

864
    """
865
    return self._MultiNodeCall(node_list, "os_diagnose", [])
866

    
867
  def call_os_get(self, node, name):
868
    """Returns an OS definition.
869

870
    This is a single-node call.
871

872
    """
873
    result = self._SingleNodeCall(node, "os_get", [name])
874
    if not result.failed and isinstance(result.data, dict):
875
      result.data = objects.OS.FromDict(result.data)
876
    return result
877

    
878
  def call_hooks_runner(self, node_list, hpath, phase, env):
879
    """Call the hooks runner.
880

881
    Args:
882
      - op: the OpCode instance
883
      - env: a dictionary with the environment
884

885
    This is a multi-node call.
886

887
    """
888
    params = [hpath, phase, env]
889
    return self._MultiNodeCall(node_list, "hooks_runner", params)
890

    
891
  def call_iallocator_runner(self, node, name, idata):
892
    """Call an iallocator on a remote node
893

894
    Args:
895
      - name: the iallocator name
896
      - input: the json-encoded input string
897

898
    This is a single-node call.
899

900
    """
901
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
902

    
903
  def call_blockdev_grow(self, node, cf_bdev, amount):
904
    """Request a snapshot of the given block device.
905

906
    This is a single-node call.
907

908
    """
909
    return self._SingleNodeCall(node, "blockdev_grow",
910
                                [cf_bdev.ToDict(), amount])
911

    
912
  def call_blockdev_snapshot(self, node, cf_bdev):
913
    """Request a snapshot of the given block device.
914

915
    This is a single-node call.
916

917
    """
918
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
919

    
920
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
921
                           cluster_name, idx):
922
    """Request the export of a given snapshot.
923

924
    This is a single-node call.
925

926
    """
927
    return self._SingleNodeCall(node, "snapshot_export",
928
                                [snap_bdev.ToDict(), dest_node,
929
                                 self._InstDict(instance), cluster_name, idx])
930

    
931
  def call_finalize_export(self, node, instance, snap_disks):
932
    """Request the completion of an export operation.
933

934
    This writes the export config file, etc.
935

936
    This is a single-node call.
937

938
    """
939
    flat_disks = []
940
    for disk in snap_disks:
941
      flat_disks.append(disk.ToDict())
942

    
943
    return self._SingleNodeCall(node, "finalize_export",
944
                                [self._InstDict(instance), flat_disks])
945

    
946
  def call_export_info(self, node, path):
947
    """Queries the export information in a given path.
948

949
    This is a single-node call.
950

951
    """
952
    return self._SingleNodeCall(node, "export_info", [path])
953

    
954
  def call_instance_os_import(self, node, inst, src_node, src_images,
955
                              cluster_name):
956
    """Request the import of a backup into an instance.
957

958
    This is a single-node call.
959

960
    """
961
    return self._SingleNodeCall(node, "instance_os_import",
962
                                [self._InstDict(inst), src_node, src_images,
963
                                 cluster_name])
964

    
965
  def call_export_list(self, node_list):
966
    """Gets the stored exports list.
967

968
    This is a multi-node call.
969

970
    """
971
    return self._MultiNodeCall(node_list, "export_list", [])
972

    
973
  def call_export_remove(self, node, export):
974
    """Requests removal of a given export.
975

976
    This is a single-node call.
977

978
    """
979
    return self._SingleNodeCall(node, "export_remove", [export])
980

    
981
  @classmethod
982
  def call_node_leave_cluster(cls, node):
983
    """Requests a node to clean the cluster information it has.
984

985
    This will remove the configuration information from the ganeti data
986
    dir.
987

988
    This is a single-node call.
989

990
    """
991
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
992

    
993
  def call_node_volumes(self, node_list):
994
    """Gets all volumes on node(s).
995

996
    This is a multi-node call.
997

998
    """
999
    return self._MultiNodeCall(node_list, "node_volumes", [])
1000

    
1001
  def call_node_demote_from_mc(self, node):
1002
    """Demote a node from the master candidate role.
1003

1004
    This is a single-node call.
1005

1006
    """
1007
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1008

    
1009

    
1010
  def call_node_powercycle(self, node, hypervisor):
1011
    """Tries to powercycle a node.
1012

1013
    This is a single-node call.
1014

1015
    """
1016
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1017

    
1018

    
1019
  def call_test_delay(self, node_list, duration):
1020
    """Sleep for a fixed time on given node(s).
1021

1022
    This is a multi-node call.
1023

1024
    """
1025
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1026

    
1027
  def call_file_storage_dir_create(self, node, file_storage_dir):
1028
    """Create the given file storage directory.
1029

1030
    This is a single-node call.
1031

1032
    """
1033
    return self._SingleNodeCall(node, "file_storage_dir_create",
1034
                                [file_storage_dir])
1035

    
1036
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1037
    """Remove the given file storage directory.
1038

1039
    This is a single-node call.
1040

1041
    """
1042
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1043
                                [file_storage_dir])
1044

    
1045
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1046
                                   new_file_storage_dir):
1047
    """Rename file storage directory.
1048

1049
    This is a single-node call.
1050

1051
    """
1052
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1053
                                [old_file_storage_dir, new_file_storage_dir])
1054

    
1055
  @classmethod
1056
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1057
    """Update job queue.
1058

1059
    This is a multi-node call.
1060

1061
    """
1062
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1063
                                    [file_name, cls._Compress(content)],
1064
                                    address_list=address_list)
1065

    
1066
  @classmethod
1067
  def call_jobqueue_purge(cls, node):
1068
    """Purge job queue.
1069

1070
    This is a single-node call.
1071

1072
    """
1073
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1074

    
1075
  @classmethod
1076
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1077
    """Rename a job queue file.
1078

1079
    This is a multi-node call.
1080

1081
    """
1082
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1083
                                    address_list=address_list)
1084

    
1085
  @classmethod
1086
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1087
    """Set the drain flag on the queue.
1088

1089
    This is a multi-node call.
1090

1091
    @type node_list: list
1092
    @param node_list: the list of nodes to query
1093
    @type drain_flag: bool
1094
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1095

1096
    """
1097
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1098
                                    [drain_flag])
1099

    
1100
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1101
    """Validate the hypervisor params.
1102

1103
    This is a multi-node call.
1104

1105
    @type node_list: list
1106
    @param node_list: the list of nodes to query
1107
    @type hvname: string
1108
    @param hvname: the hypervisor name
1109
    @type hvparams: dict
1110
    @param hvparams: the hypervisor parameters to be validated
1111

1112
    """
1113
    cluster = self._cfg.GetClusterInfo()
1114
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1115
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1116
                               [hvname, hv_full])