Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 4c4e4e1e

History | View | Annotate | Download (32.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36
import zlib
37
import base64
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45

    
46
import ganeti.http.client
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  _http_manager = http.client.HttpClientManager()
64

    
65

    
66
def Shutdown():
67
  """Stops the module-global HTTP client manager.
68

69
  Must be called before quitting the program.
70

71
  """
72
  global _http_manager
73

    
74
  if _http_manager:
75
    _http_manager.Shutdown()
76
    _http_manager = None
77

    
78

    
79
class RpcResult(object):
80
  """RPC Result class.
81

82
  This class holds an RPC result. It is needed since in multi-node
83
  calls we can't raise an exception just because one one out of many
84
  failed, and therefore we use this class to encapsulate the result.
85

86
  @ivar data: the data payload, for successfull results, or None
87
  @type failed: boolean
88
  @ivar failed: whether the operation failed at transport level (not
89
      application level on the remote node)
90
  @ivar call: the name of the RPC call
91
  @ivar node: the name of the node to which we made the call
92
  @ivar offline: whether the operation failed because the node was
93
      offline, as opposed to actual failure; offline=True will always
94
      imply failed=True, in order to allow simpler checking if
95
      the user doesn't care about the exact failure mode
96
  @ivar fail_msg: the error message if the call failed
97

98
  """
99
  def __init__(self, data=None, failed=False, offline=False,
100
               call=None, node=None):
101
    self.failed = failed
102
    self.offline = offline
103
    self.call = call
104
    self.node = node
105
    if offline:
106
      self.failed = True
107
      self.fail_msg = "Node is marked offline"
108
      self.data = self.payload = None
109
    elif failed:
110
      self.fail_msg = self._EnsureErr(data)
111
      self.data = self.payload = None
112
    else:
113
      self.data = data
114
      if not isinstance(self.data, (tuple, list)):
115
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
116
                         type(self.data))
117
      elif len(data) != 2:
118
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
119
                         "expected 2" % len(self.data))
120
      elif not self.data[0]:
121
        self.fail_msg = self._EnsureErr(self.data[1])
122
      else:
123
        # finally success
124
        self.fail_msg = None
125
        self.payload = data[1]
126

    
127
  @staticmethod
128
  def _EnsureErr(val):
129
    """Helper to ensure we return a 'True' value for error."""
130
    if val:
131
      return val
132
    else:
133
      return "No error information"
134

    
135
  def Raise(self, msg, prereq=False):
136
    """If the result has failed, raise an OpExecError.
137

138
    This is used so that LU code doesn't have to check for each
139
    result, but instead can call this function.
140

141
    """
142
    if not self.fail_msg:
143
      return
144

    
145
    if not msg: # one could pass None for default message
146
      msg = ("Call '%s' to node '%s' has failed: %s" %
147
             (self.call, self.node, self.fail_msg))
148
    else:
149
      msg = "%s: %s" % (msg, self.fail_msg)
150
    if prereq:
151
      ec = errors.OpPrereqError
152
    else:
153
      ec = errors.OpExecError
154
    raise ec(msg)
155

    
156
  def RemoteFailMsg(self):
157
    """Check if the remote procedure failed.
158

159
    @return: the fail_msg attribute
160

161
    """
162
    return self.fail_msg
163

    
164

    
165
class Client:
166
  """RPC Client class.
167

168
  This class, given a (remote) method name, a list of parameters and a
169
  list of nodes, will contact (in parallel) all nodes, and return a
170
  dict of results (key: node name, value: result).
171

172
  One current bug is that generic failure is still signalled by
173
  'False' result, which is not good. This overloading of values can
174
  cause bugs.
175

176
  """
177
  def __init__(self, procedure, body, port):
178
    self.procedure = procedure
179
    self.body = body
180
    self.port = port
181
    self.nc = {}
182

    
183
    self._ssl_params = \
184
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
185
                         ssl_cert_path=constants.SSL_CERT_FILE)
186

    
187
  def ConnectList(self, node_list, address_list=None):
188
    """Add a list of nodes to the target nodes.
189

190
    @type node_list: list
191
    @param node_list: the list of node names to connect
192
    @type address_list: list or None
193
    @keyword address_list: either None or a list with node addresses,
194
        which must have the same length as the node list
195

196
    """
197
    if address_list is None:
198
      address_list = [None for _ in node_list]
199
    else:
200
      assert len(node_list) == len(address_list), \
201
             "Name and address lists should have the same length"
202
    for node, address in zip(node_list, address_list):
203
      self.ConnectNode(node, address)
204

    
205
  def ConnectNode(self, name, address=None):
206
    """Add a node to the target list.
207

208
    @type name: str
209
    @param name: the node name
210
    @type address: str
211
    @keyword address: the node address, if known
212

213
    """
214
    if address is None:
215
      address = name
216

    
217
    self.nc[name] = \
218
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
219
                                    "/%s" % self.procedure,
220
                                    post_data=self.body,
221
                                    ssl_params=self._ssl_params,
222
                                    ssl_verify_peer=True)
223

    
224
  def GetResults(self):
225
    """Call nodes and return results.
226

227
    @rtype: list
228
    @return: List of RPC results
229

230
    """
231
    assert _http_manager, "RPC module not intialized"
232

    
233
    _http_manager.ExecRequests(self.nc.values())
234

    
235
    results = {}
236

    
237
    for name, req in self.nc.iteritems():
238
      if req.success and req.resp_status_code == http.HTTP_OK:
239
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
240
                                  node=name, call=self.procedure)
241
        continue
242

    
243
      # TODO: Better error reporting
244
      if req.error:
245
        msg = req.error
246
      else:
247
        msg = req.resp_body
248

    
249
      logging.error("RPC error in %s from node %s: %s",
250
                    self.procedure, name, msg)
251
      results[name] = RpcResult(data=msg, failed=True, node=name,
252
                                call=self.procedure)
253

    
254
    return results
255

    
256

    
257
class RpcRunner(object):
258
  """RPC runner class"""
259

    
260
  def __init__(self, cfg):
261
    """Initialized the rpc runner.
262

263
    @type cfg:  C{config.ConfigWriter}
264
    @param cfg: the configuration object that will be used to get data
265
                about the cluster
266

267
    """
268
    self._cfg = cfg
269
    self.port = utils.GetNodeDaemonPort()
270

    
271
  def _InstDict(self, instance, hvp=None, bep=None):
272
    """Convert the given instance to a dict.
273

274
    This is done via the instance's ToDict() method and additionally
275
    we fill the hvparams with the cluster defaults.
276

277
    @type instance: L{objects.Instance}
278
    @param instance: an Instance object
279
    @type hvp: dict or None
280
    @param hvp: a dictionary with overriden hypervisor parameters
281
    @type bep: dict or None
282
    @param bep: a dictionary with overriden backend parameters
283
    @rtype: dict
284
    @return: the instance dict, with the hvparams filled with the
285
        cluster defaults
286

287
    """
288
    idict = instance.ToDict()
289
    cluster = self._cfg.GetClusterInfo()
290
    idict["hvparams"] = cluster.FillHV(instance)
291
    if hvp is not None:
292
      idict["hvparams"].update(hvp)
293
    idict["beparams"] = cluster.FillBE(instance)
294
    if bep is not None:
295
      idict["beparams"].update(bep)
296
    for nic in idict["nics"]:
297
      nic['nicparams'] = objects.FillDict(
298
        cluster.nicparams[constants.PP_DEFAULT],
299
        nic['nicparams'])
300
    return idict
301

    
302
  def _ConnectList(self, client, node_list, call):
303
    """Helper for computing node addresses.
304

305
    @type client: L{Client}
306
    @param client: a C{Client} instance
307
    @type node_list: list
308
    @param node_list: the node list we should connect
309
    @type call: string
310
    @param call: the name of the remote procedure call, for filling in
311
        correctly any eventual offline nodes' results
312

313
    """
314
    all_nodes = self._cfg.GetAllNodesInfo()
315
    name_list = []
316
    addr_list = []
317
    skip_dict = {}
318
    for node in node_list:
319
      if node in all_nodes:
320
        if all_nodes[node].offline:
321
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
322
          continue
323
        val = all_nodes[node].primary_ip
324
      else:
325
        val = None
326
      addr_list.append(val)
327
      name_list.append(node)
328
    if name_list:
329
      client.ConnectList(name_list, address_list=addr_list)
330
    return skip_dict
331

    
332
  def _ConnectNode(self, client, node, call):
333
    """Helper for computing one node's address.
334

335
    @type client: L{Client}
336
    @param client: a C{Client} instance
337
    @type node: str
338
    @param node: the node we should connect
339
    @type call: string
340
    @param call: the name of the remote procedure call, for filling in
341
        correctly any eventual offline nodes' results
342

343
    """
344
    node_info = self._cfg.GetNodeInfo(node)
345
    if node_info is not None:
346
      if node_info.offline:
347
        return RpcResult(node=node, offline=True, call=call)
348
      addr = node_info.primary_ip
349
    else:
350
      addr = None
351
    client.ConnectNode(node, address=addr)
352

    
353
  def _MultiNodeCall(self, node_list, procedure, args):
354
    """Helper for making a multi-node call
355

356
    """
357
    body = serializer.DumpJson(args, indent=False)
358
    c = Client(procedure, body, self.port)
359
    skip_dict = self._ConnectList(c, node_list, procedure)
360
    skip_dict.update(c.GetResults())
361
    return skip_dict
362

    
363
  @classmethod
364
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
365
                           address_list=None):
366
    """Helper for making a multi-node static call
367

368
    """
369
    body = serializer.DumpJson(args, indent=False)
370
    c = Client(procedure, body, utils.GetNodeDaemonPort())
371
    c.ConnectList(node_list, address_list=address_list)
372
    return c.GetResults()
373

    
374
  def _SingleNodeCall(self, node, procedure, args):
375
    """Helper for making a single-node call
376

377
    """
378
    body = serializer.DumpJson(args, indent=False)
379
    c = Client(procedure, body, self.port)
380
    result = self._ConnectNode(c, node, procedure)
381
    if result is None:
382
      # we did connect, node is not offline
383
      result = c.GetResults()[node]
384
    return result
385

    
386
  @classmethod
387
  def _StaticSingleNodeCall(cls, node, procedure, args):
388
    """Helper for making a single-node static call
389

390
    """
391
    body = serializer.DumpJson(args, indent=False)
392
    c = Client(procedure, body, utils.GetNodeDaemonPort())
393
    c.ConnectNode(node)
394
    return c.GetResults()[node]
395

    
396
  @staticmethod
397
  def _Compress(data):
398
    """Compresses a string for transport over RPC.
399

400
    Small amounts of data are not compressed.
401

402
    @type data: str
403
    @param data: Data
404
    @rtype: tuple
405
    @return: Encoded data to send
406

407
    """
408
    # Small amounts of data are not compressed
409
    if len(data) < 512:
410
      return (constants.RPC_ENCODING_NONE, data)
411

    
412
    # Compress with zlib and encode in base64
413
    return (constants.RPC_ENCODING_ZLIB_BASE64,
414
            base64.b64encode(zlib.compress(data, 3)))
415

    
416
  #
417
  # Begin RPC calls
418
  #
419

    
420
  def call_volume_list(self, node_list, vg_name):
421
    """Gets the logical volumes present in a given volume group.
422

423
    This is a multi-node call.
424

425
    """
426
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
427

    
428
  def call_vg_list(self, node_list):
429
    """Gets the volume group list.
430

431
    This is a multi-node call.
432

433
    """
434
    return self._MultiNodeCall(node_list, "vg_list", [])
435

    
436
  def call_bridges_exist(self, node, bridges_list):
437
    """Checks if a node has all the bridges given.
438

439
    This method checks if all bridges given in the bridges_list are
440
    present on the remote node, so that an instance that uses interfaces
441
    on those bridges can be started.
442

443
    This is a single-node call.
444

445
    """
446
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
447

    
448
  def call_instance_start(self, node, instance, hvp, bep):
449
    """Starts an instance.
450

451
    This is a single-node call.
452

453
    """
454
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
455
    return self._SingleNodeCall(node, "instance_start", [idict])
456

    
457
  def call_instance_shutdown(self, node, instance):
458
    """Stops an instance.
459

460
    This is a single-node call.
461

462
    """
463
    return self._SingleNodeCall(node, "instance_shutdown",
464
                                [self._InstDict(instance)])
465

    
466
  def call_migration_info(self, node, instance):
467
    """Gather the information necessary to prepare an instance migration.
468

469
    This is a single-node call.
470

471
    @type node: string
472
    @param node: the node on which the instance is currently running
473
    @type instance: C{objects.Instance}
474
    @param instance: the instance definition
475

476
    """
477
    return self._SingleNodeCall(node, "migration_info",
478
                                [self._InstDict(instance)])
479

    
480
  def call_accept_instance(self, node, instance, info, target):
481
    """Prepare a node to accept an instance.
482

483
    This is a single-node call.
484

485
    @type node: string
486
    @param node: the target node for the migration
487
    @type instance: C{objects.Instance}
488
    @param instance: the instance definition
489
    @type info: opaque/hypervisor specific (string/data)
490
    @param info: result for the call_migration_info call
491
    @type target: string
492
    @param target: target hostname (usually ip address) (on the node itself)
493

494
    """
495
    return self._SingleNodeCall(node, "accept_instance",
496
                                [self._InstDict(instance), info, target])
497

    
498
  def call_finalize_migration(self, node, instance, info, success):
499
    """Finalize any target-node migration specific operation.
500

501
    This is called both in case of a successful migration and in case of error
502
    (in which case it should abort the migration).
503

504
    This is a single-node call.
505

506
    @type node: string
507
    @param node: the target node for the migration
508
    @type instance: C{objects.Instance}
509
    @param instance: the instance definition
510
    @type info: opaque/hypervisor specific (string/data)
511
    @param info: result for the call_migration_info call
512
    @type success: boolean
513
    @param success: whether the migration was a success or a failure
514

515
    """
516
    return self._SingleNodeCall(node, "finalize_migration",
517
                                [self._InstDict(instance), info, success])
518

    
519
  def call_instance_migrate(self, node, instance, target, live):
520
    """Migrate an instance.
521

522
    This is a single-node call.
523

524
    @type node: string
525
    @param node: the node on which the instance is currently running
526
    @type instance: C{objects.Instance}
527
    @param instance: the instance definition
528
    @type target: string
529
    @param target: the target node name
530
    @type live: boolean
531
    @param live: whether the migration should be done live or not (the
532
        interpretation of this parameter is left to the hypervisor)
533

534
    """
535
    return self._SingleNodeCall(node, "instance_migrate",
536
                                [self._InstDict(instance), target, live])
537

    
538
  def call_instance_reboot(self, node, instance, reboot_type):
539
    """Reboots an instance.
540

541
    This is a single-node call.
542

543
    """
544
    return self._SingleNodeCall(node, "instance_reboot",
545
                                [self._InstDict(instance), reboot_type])
546

    
547
  def call_instance_os_add(self, node, inst, reinstall):
548
    """Installs an OS on the given instance.
549

550
    This is a single-node call.
551

552
    """
553
    return self._SingleNodeCall(node, "instance_os_add",
554
                                [self._InstDict(inst), reinstall])
555

    
556
  def call_instance_run_rename(self, node, inst, old_name):
557
    """Run the OS rename script for an instance.
558

559
    This is a single-node call.
560

561
    """
562
    return self._SingleNodeCall(node, "instance_run_rename",
563
                                [self._InstDict(inst), old_name])
564

    
565
  def call_instance_info(self, node, instance, hname):
566
    """Returns information about a single instance.
567

568
    This is a single-node call.
569

570
    @type node: list
571
    @param node: the list of nodes to query
572
    @type instance: string
573
    @param instance: the instance name
574
    @type hname: string
575
    @param hname: the hypervisor type of the instance
576

577
    """
578
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
579

    
580
  def call_instance_migratable(self, node, instance):
581
    """Checks whether the given instance can be migrated.
582

583
    This is a single-node call.
584

585
    @param node: the node to query
586
    @type instance: L{objects.Instance}
587
    @param instance: the instance to check
588

589

590
    """
591
    return self._SingleNodeCall(node, "instance_migratable",
592
                                [self._InstDict(instance)])
593

    
594
  def call_all_instances_info(self, node_list, hypervisor_list):
595
    """Returns information about all instances on the given nodes.
596

597
    This is a multi-node call.
598

599
    @type node_list: list
600
    @param node_list: the list of nodes to query
601
    @type hypervisor_list: list
602
    @param hypervisor_list: the hypervisors to query for instances
603

604
    """
605
    return self._MultiNodeCall(node_list, "all_instances_info",
606
                               [hypervisor_list])
607

    
608
  def call_instance_list(self, node_list, hypervisor_list):
609
    """Returns the list of running instances on a given node.
610

611
    This is a multi-node call.
612

613
    @type node_list: list
614
    @param node_list: the list of nodes to query
615
    @type hypervisor_list: list
616
    @param hypervisor_list: the hypervisors to query for instances
617

618
    """
619
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
620

    
621
  def call_node_tcp_ping(self, node, source, target, port, timeout,
622
                         live_port_needed):
623
    """Do a TcpPing on the remote node
624

625
    This is a single-node call.
626

627
    """
628
    return self._SingleNodeCall(node, "node_tcp_ping",
629
                                [source, target, port, timeout,
630
                                 live_port_needed])
631

    
632
  def call_node_has_ip_address(self, node, address):
633
    """Checks if a node has the given IP address.
634

635
    This is a single-node call.
636

637
    """
638
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
639

    
640
  def call_node_info(self, node_list, vg_name, hypervisor_type):
641
    """Return node information.
642

643
    This will return memory information and volume group size and free
644
    space.
645

646
    This is a multi-node call.
647

648
    @type node_list: list
649
    @param node_list: the list of nodes to query
650
    @type vg_name: C{string}
651
    @param vg_name: the name of the volume group to ask for disk space
652
        information
653
    @type hypervisor_type: C{str}
654
    @param hypervisor_type: the name of the hypervisor to ask for
655
        memory information
656

657
    """
658
    return self._MultiNodeCall(node_list, "node_info",
659
                               [vg_name, hypervisor_type])
660

    
661
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
662
    """Add a node to the cluster.
663

664
    This is a single-node call.
665

666
    """
667
    return self._SingleNodeCall(node, "node_add",
668
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
669

    
670
  def call_node_verify(self, node_list, checkdict, cluster_name):
671
    """Request verification of given parameters.
672

673
    This is a multi-node call.
674

675
    """
676
    return self._MultiNodeCall(node_list, "node_verify",
677
                               [checkdict, cluster_name])
678

    
679
  @classmethod
680
  def call_node_start_master(cls, node, start_daemons):
681
    """Tells a node to activate itself as a master.
682

683
    This is a single-node call.
684

685
    """
686
    return cls._StaticSingleNodeCall(node, "node_start_master",
687
                                     [start_daemons])
688

    
689
  @classmethod
690
  def call_node_stop_master(cls, node, stop_daemons):
691
    """Tells a node to demote itself from master status.
692

693
    This is a single-node call.
694

695
    """
696
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
697

    
698
  @classmethod
699
  def call_master_info(cls, node_list):
700
    """Query master info.
701

702
    This is a multi-node call.
703

704
    """
705
    # TODO: should this method query down nodes?
706
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
707

    
708
  def call_version(self, node_list):
709
    """Query node version.
710

711
    This is a multi-node call.
712

713
    """
714
    return self._MultiNodeCall(node_list, "version", [])
715

    
716
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
717
    """Request creation of a given block device.
718

719
    This is a single-node call.
720

721
    """
722
    return self._SingleNodeCall(node, "blockdev_create",
723
                                [bdev.ToDict(), size, owner, on_primary, info])
724

    
725
  def call_blockdev_remove(self, node, bdev):
726
    """Request removal of a given block device.
727

728
    This is a single-node call.
729

730
    """
731
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
732

    
733
  def call_blockdev_rename(self, node, devlist):
734
    """Request rename of the given block devices.
735

736
    This is a single-node call.
737

738
    """
739
    return self._SingleNodeCall(node, "blockdev_rename",
740
                                [(d.ToDict(), uid) for d, uid in devlist])
741

    
742
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
743
    """Request assembling of a given block device.
744

745
    This is a single-node call.
746

747
    """
748
    return self._SingleNodeCall(node, "blockdev_assemble",
749
                                [disk.ToDict(), owner, on_primary])
750

    
751
  def call_blockdev_shutdown(self, node, disk):
752
    """Request shutdown of a given block device.
753

754
    This is a single-node call.
755

756
    """
757
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
758

    
759
  def call_blockdev_addchildren(self, node, bdev, ndevs):
760
    """Request adding a list of children to a (mirroring) device.
761

762
    This is a single-node call.
763

764
    """
765
    return self._SingleNodeCall(node, "blockdev_addchildren",
766
                                [bdev.ToDict(),
767
                                 [disk.ToDict() for disk in ndevs]])
768

    
769
  def call_blockdev_removechildren(self, node, bdev, ndevs):
770
    """Request removing a list of children from a (mirroring) device.
771

772
    This is a single-node call.
773

774
    """
775
    return self._SingleNodeCall(node, "blockdev_removechildren",
776
                                [bdev.ToDict(),
777
                                 [disk.ToDict() for disk in ndevs]])
778

    
779
  def call_blockdev_getmirrorstatus(self, node, disks):
780
    """Request status of a (mirroring) device.
781

782
    This is a single-node call.
783

784
    """
785
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
786
                                [dsk.ToDict() for dsk in disks])
787

    
788
  def call_blockdev_find(self, node, disk):
789
    """Request identification of a given block device.
790

791
    This is a single-node call.
792

793
    """
794
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
795

    
796
  def call_blockdev_close(self, node, instance_name, disks):
797
    """Closes the given block devices.
798

799
    This is a single-node call.
800

801
    """
802
    params = [instance_name, [cf.ToDict() for cf in disks]]
803
    return self._SingleNodeCall(node, "blockdev_close", params)
804

    
805
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
806
    """Disconnects the network of the given drbd devices.
807

808
    This is a multi-node call.
809

810
    """
811
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
812
                               [nodes_ip, [cf.ToDict() for cf in disks]])
813

    
814
  def call_drbd_attach_net(self, node_list, nodes_ip,
815
                           disks, instance_name, multimaster):
816
    """Disconnects the given drbd devices.
817

818
    This is a multi-node call.
819

820
    """
821
    return self._MultiNodeCall(node_list, "drbd_attach_net",
822
                               [nodes_ip, [cf.ToDict() for cf in disks],
823
                                instance_name, multimaster])
824

    
825
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
826
    """Waits for the synchronization of drbd devices is complete.
827

828
    This is a multi-node call.
829

830
    """
831
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
832
                               [nodes_ip, [cf.ToDict() for cf in disks]])
833

    
834
  @classmethod
835
  def call_upload_file(cls, node_list, file_name, address_list=None):
836
    """Upload a file.
837

838
    The node will refuse the operation in case the file is not on the
839
    approved file list.
840

841
    This is a multi-node call.
842

843
    @type node_list: list
844
    @param node_list: the list of node names to upload to
845
    @type file_name: str
846
    @param file_name: the filename to upload
847
    @type address_list: list or None
848
    @keyword address_list: an optional list of node addresses, in order
849
        to optimize the RPC speed
850

851
    """
852
    file_contents = utils.ReadFile(file_name)
853
    data = cls._Compress(file_contents)
854
    st = os.stat(file_name)
855
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
856
              st.st_atime, st.st_mtime]
857
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
858
                                    address_list=address_list)
859

    
860
  @classmethod
861
  def call_write_ssconf_files(cls, node_list, values):
862
    """Write ssconf files.
863

864
    This is a multi-node call.
865

866
    """
867
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
868

    
869
  def call_os_diagnose(self, node_list):
870
    """Request a diagnose of OS definitions.
871

872
    This is a multi-node call.
873

874
    """
875
    return self._MultiNodeCall(node_list, "os_diagnose", [])
876

    
877
  def call_os_get(self, node, name):
878
    """Returns an OS definition.
879

880
    This is a single-node call.
881

882
    """
883
    result = self._SingleNodeCall(node, "os_get", [name])
884
    if not result.failed and isinstance(result.data, dict):
885
      result.data = objects.OS.FromDict(result.data)
886
    return result
887

    
888
  def call_hooks_runner(self, node_list, hpath, phase, env):
889
    """Call the hooks runner.
890

891
    Args:
892
      - op: the OpCode instance
893
      - env: a dictionary with the environment
894

895
    This is a multi-node call.
896

897
    """
898
    params = [hpath, phase, env]
899
    return self._MultiNodeCall(node_list, "hooks_runner", params)
900

    
901
  def call_iallocator_runner(self, node, name, idata):
902
    """Call an iallocator on a remote node
903

904
    Args:
905
      - name: the iallocator name
906
      - input: the json-encoded input string
907

908
    This is a single-node call.
909

910
    """
911
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
912

    
913
  def call_blockdev_grow(self, node, cf_bdev, amount):
914
    """Request a snapshot of the given block device.
915

916
    This is a single-node call.
917

918
    """
919
    return self._SingleNodeCall(node, "blockdev_grow",
920
                                [cf_bdev.ToDict(), amount])
921

    
922
  def call_blockdev_snapshot(self, node, cf_bdev):
923
    """Request a snapshot of the given block device.
924

925
    This is a single-node call.
926

927
    """
928
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
929

    
930
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
931
                           cluster_name, idx):
932
    """Request the export of a given snapshot.
933

934
    This is a single-node call.
935

936
    """
937
    return self._SingleNodeCall(node, "snapshot_export",
938
                                [snap_bdev.ToDict(), dest_node,
939
                                 self._InstDict(instance), cluster_name, idx])
940

    
941
  def call_finalize_export(self, node, instance, snap_disks):
942
    """Request the completion of an export operation.
943

944
    This writes the export config file, etc.
945

946
    This is a single-node call.
947

948
    """
949
    flat_disks = []
950
    for disk in snap_disks:
951
      flat_disks.append(disk.ToDict())
952

    
953
    return self._SingleNodeCall(node, "finalize_export",
954
                                [self._InstDict(instance), flat_disks])
955

    
956
  def call_export_info(self, node, path):
957
    """Queries the export information in a given path.
958

959
    This is a single-node call.
960

961
    """
962
    return self._SingleNodeCall(node, "export_info", [path])
963

    
964
  def call_instance_os_import(self, node, inst, src_node, src_images,
965
                              cluster_name):
966
    """Request the import of a backup into an instance.
967

968
    This is a single-node call.
969

970
    """
971
    return self._SingleNodeCall(node, "instance_os_import",
972
                                [self._InstDict(inst), src_node, src_images,
973
                                 cluster_name])
974

    
975
  def call_export_list(self, node_list):
976
    """Gets the stored exports list.
977

978
    This is a multi-node call.
979

980
    """
981
    return self._MultiNodeCall(node_list, "export_list", [])
982

    
983
  def call_export_remove(self, node, export):
984
    """Requests removal of a given export.
985

986
    This is a single-node call.
987

988
    """
989
    return self._SingleNodeCall(node, "export_remove", [export])
990

    
991
  @classmethod
992
  def call_node_leave_cluster(cls, node):
993
    """Requests a node to clean the cluster information it has.
994

995
    This will remove the configuration information from the ganeti data
996
    dir.
997

998
    This is a single-node call.
999

1000
    """
1001
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1002

    
1003
  def call_node_volumes(self, node_list):
1004
    """Gets all volumes on node(s).
1005

1006
    This is a multi-node call.
1007

1008
    """
1009
    return self._MultiNodeCall(node_list, "node_volumes", [])
1010

    
1011
  def call_node_demote_from_mc(self, node):
1012
    """Demote a node from the master candidate role.
1013

1014
    This is a single-node call.
1015

1016
    """
1017
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1018

    
1019

    
1020
  def call_node_powercycle(self, node, hypervisor):
1021
    """Tries to powercycle a node.
1022

1023
    This is a single-node call.
1024

1025
    """
1026
    return self._SingleNodeCall(node, "node_powercycle", [hypervisor])
1027

    
1028

    
1029
  def call_test_delay(self, node_list, duration):
1030
    """Sleep for a fixed time on given node(s).
1031

1032
    This is a multi-node call.
1033

1034
    """
1035
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1036

    
1037
  def call_file_storage_dir_create(self, node, file_storage_dir):
1038
    """Create the given file storage directory.
1039

1040
    This is a single-node call.
1041

1042
    """
1043
    return self._SingleNodeCall(node, "file_storage_dir_create",
1044
                                [file_storage_dir])
1045

    
1046
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1047
    """Remove the given file storage directory.
1048

1049
    This is a single-node call.
1050

1051
    """
1052
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1053
                                [file_storage_dir])
1054

    
1055
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1056
                                   new_file_storage_dir):
1057
    """Rename file storage directory.
1058

1059
    This is a single-node call.
1060

1061
    """
1062
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1063
                                [old_file_storage_dir, new_file_storage_dir])
1064

    
1065
  @classmethod
1066
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1067
    """Update job queue.
1068

1069
    This is a multi-node call.
1070

1071
    """
1072
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1073
                                    [file_name, cls._Compress(content)],
1074
                                    address_list=address_list)
1075

    
1076
  @classmethod
1077
  def call_jobqueue_purge(cls, node):
1078
    """Purge job queue.
1079

1080
    This is a single-node call.
1081

1082
    """
1083
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1084

    
1085
  @classmethod
1086
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1087
    """Rename a job queue file.
1088

1089
    This is a multi-node call.
1090

1091
    """
1092
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1093
                                    address_list=address_list)
1094

    
1095
  @classmethod
1096
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1097
    """Set the drain flag on the queue.
1098

1099
    This is a multi-node call.
1100

1101
    @type node_list: list
1102
    @param node_list: the list of nodes to query
1103
    @type drain_flag: bool
1104
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1105

1106
    """
1107
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1108
                                    [drain_flag])
1109

    
1110
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1111
    """Validate the hypervisor params.
1112

1113
    This is a multi-node call.
1114

1115
    @type node_list: list
1116
    @param node_list: the list of nodes to query
1117
    @type hvname: string
1118
    @param hvname: the hypervisor name
1119
    @type hvparams: dict
1120
    @param hvparams: the hypervisor parameters to be validated
1121

1122
    """
1123
    cluster = self._cfg.GetClusterInfo()
1124
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1125
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1126
                               [hvname, hv_full])