Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 23f06b2b

History | View | Annotate | Download (33.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36
import zlib
37
import base64
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45

    
46
import ganeti.http.client
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  _http_manager = http.client.HttpClientManager()
64

    
65

    
66
def Shutdown():
67
  """Stops the module-global HTTP client manager.
68

69
  Must be called before quitting the program.
70

71
  """
72
  global _http_manager
73

    
74
  if _http_manager:
75
    _http_manager.Shutdown()
76
    _http_manager = None
77

    
78

    
79
class RpcResult(object):
80
  """RPC Result class.
81

82
  This class holds an RPC result. It is needed since in multi-node
83
  calls we can't raise an exception just because one one out of many
84
  failed, and therefore we use this class to encapsulate the result.
85

86
  @ivar data: the data payload, for successfull results, or None
87
  @type failed: boolean
88
  @ivar failed: whether the operation failed at RPC level (not
89
      application level on the remote node)
90
  @ivar call: the name of the RPC call
91
  @ivar node: the name of the node to which we made the call
92
  @ivar offline: whether the operation failed because the node was
93
      offline, as opposed to actual failure; offline=True will always
94
      imply failed=True, in order to allow simpler checking if
95
      the user doesn't care about the exact failure mode
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.error = "Node is marked offline"
107
      self.data = self.payload = None
108
    elif failed:
109
      self.error = data
110
      self.data = self.payload = None
111
    else:
112
      self.data = data
113
      self.error = None
114
      if isinstance(data, (tuple, list)) and len(data) == 2:
115
        self.payload = data[1]
116
      else:
117
        self.payload = None
118

    
119
  def Raise(self):
120
    """If the result has failed, raise an OpExecError.
121

122
    This is used so that LU code doesn't have to check for each
123
    result, but instead can call this function.
124

125
    """
126
    if self.failed:
127
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
128
                               (self.call, self.node, self.error))
129

    
130
  def RemoteFailMsg(self):
131
    """Check if the remote procedure failed.
132

133
    This is valid only for RPC calls which return result of the form
134
    (status, data | error_msg).
135

136
    @return: empty string for succcess, otherwise an error message
137

138
    """
139
    def _EnsureErr(val):
140
      """Helper to ensure we return a 'True' value for error."""
141
      if val:
142
        return val
143
      else:
144
        return "No error information"
145

    
146
    if self.failed:
147
      return _EnsureErr(self.error)
148
    if not isinstance(self.data, (tuple, list)):
149
      return "Invalid result type (%s)" % type(self.data)
150
    if len(self.data) != 2:
151
      return "Invalid result length (%d), expected 2" % len(self.data)
152
    if not self.data[0]:
153
      return _EnsureErr(self.data[1])
154
    return ""
155

    
156

    
157
class Client:
158
  """RPC Client class.
159

160
  This class, given a (remote) method name, a list of parameters and a
161
  list of nodes, will contact (in parallel) all nodes, and return a
162
  dict of results (key: node name, value: result).
163

164
  One current bug is that generic failure is still signalled by
165
  'False' result, which is not good. This overloading of values can
166
  cause bugs.
167

168
  """
169
  def __init__(self, procedure, body, port):
170
    self.procedure = procedure
171
    self.body = body
172
    self.port = port
173
    self.nc = {}
174

    
175
    self._ssl_params = \
176
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
177
                         ssl_cert_path=constants.SSL_CERT_FILE)
178

    
179
  def ConnectList(self, node_list, address_list=None):
180
    """Add a list of nodes to the target nodes.
181

182
    @type node_list: list
183
    @param node_list: the list of node names to connect
184
    @type address_list: list or None
185
    @keyword address_list: either None or a list with node addresses,
186
        which must have the same length as the node list
187

188
    """
189
    if address_list is None:
190
      address_list = [None for _ in node_list]
191
    else:
192
      assert len(node_list) == len(address_list), \
193
             "Name and address lists should have the same length"
194
    for node, address in zip(node_list, address_list):
195
      self.ConnectNode(node, address)
196

    
197
  def ConnectNode(self, name, address=None):
198
    """Add a node to the target list.
199

200
    @type name: str
201
    @param name: the node name
202
    @type address: str
203
    @keyword address: the node address, if known
204

205
    """
206
    if address is None:
207
      address = name
208

    
209
    self.nc[name] = \
210
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
211
                                    "/%s" % self.procedure,
212
                                    post_data=self.body,
213
                                    ssl_params=self._ssl_params,
214
                                    ssl_verify_peer=True)
215

    
216
  def GetResults(self):
217
    """Call nodes and return results.
218

219
    @rtype: list
220
    @return: List of RPC results
221

222
    """
223
    assert _http_manager, "RPC module not intialized"
224

    
225
    _http_manager.ExecRequests(self.nc.values())
226

    
227
    results = {}
228

    
229
    for name, req in self.nc.iteritems():
230
      if req.success and req.resp_status_code == http.HTTP_OK:
231
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
232
                                  node=name, call=self.procedure)
233
        continue
234

    
235
      # TODO: Better error reporting
236
      if req.error:
237
        msg = req.error
238
      else:
239
        msg = req.resp_body
240

    
241
      logging.error("RPC error in %s from node %s: %s",
242
                    self.procedure, name, msg)
243
      results[name] = RpcResult(data=msg, failed=True, node=name,
244
                                call=self.procedure)
245

    
246
    return results
247

    
248

    
249
class RpcRunner(object):
250
  """RPC runner class"""
251

    
252
  def __init__(self, cfg):
253
    """Initialized the rpc runner.
254

255
    @type cfg:  C{config.ConfigWriter}
256
    @param cfg: the configuration object that will be used to get data
257
                about the cluster
258

259
    """
260
    self._cfg = cfg
261
    self.port = utils.GetNodeDaemonPort()
262

    
263
  def _InstDict(self, instance, hvp=None, bep=None):
264
    """Convert the given instance to a dict.
265

266
    This is done via the instance's ToDict() method and additionally
267
    we fill the hvparams with the cluster defaults.
268

269
    @type instance: L{objects.Instance}
270
    @param instance: an Instance object
271
    @type hvp: dict or None
272
    @param hvp: a dictionary with overriden hypervisor parameters
273
    @type bep: dict or None
274
    @param bep: a dictionary with overriden backend parameters
275
    @rtype: dict
276
    @return: the instance dict, with the hvparams filled with the
277
        cluster defaults
278

279
    """
280
    idict = instance.ToDict()
281
    cluster = self._cfg.GetClusterInfo()
282
    idict["hvparams"] = cluster.FillHV(instance)
283
    if hvp is not None:
284
      idict["hvparams"].update(hvp)
285
    idict["beparams"] = cluster.FillBE(instance)
286
    if bep is not None:
287
      idict["beparams"].update(bep)
288
    return idict
289

    
290
  def _ConnectList(self, client, node_list, call):
291
    """Helper for computing node addresses.
292

293
    @type client: L{Client}
294
    @param client: a C{Client} instance
295
    @type node_list: list
296
    @param node_list: the node list we should connect
297
    @type call: string
298
    @param call: the name of the remote procedure call, for filling in
299
        correctly any eventual offline nodes' results
300

301
    """
302
    all_nodes = self._cfg.GetAllNodesInfo()
303
    name_list = []
304
    addr_list = []
305
    skip_dict = {}
306
    for node in node_list:
307
      if node in all_nodes:
308
        if all_nodes[node].offline:
309
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
310
          continue
311
        val = all_nodes[node].primary_ip
312
      else:
313
        val = None
314
      addr_list.append(val)
315
      name_list.append(node)
316
    if name_list:
317
      client.ConnectList(name_list, address_list=addr_list)
318
    return skip_dict
319

    
320
  def _ConnectNode(self, client, node, call):
321
    """Helper for computing one node's address.
322

323
    @type client: L{Client}
324
    @param client: a C{Client} instance
325
    @type node: str
326
    @param node: the node we should connect
327
    @type call: string
328
    @param call: the name of the remote procedure call, for filling in
329
        correctly any eventual offline nodes' results
330

331
    """
332
    node_info = self._cfg.GetNodeInfo(node)
333
    if node_info is not None:
334
      if node_info.offline:
335
        return RpcResult(node=node, offline=True, call=call)
336
      addr = node_info.primary_ip
337
    else:
338
      addr = None
339
    client.ConnectNode(node, address=addr)
340

    
341
  def _MultiNodeCall(self, node_list, procedure, args):
342
    """Helper for making a multi-node call
343

344
    """
345
    body = serializer.DumpJson(args, indent=False)
346
    c = Client(procedure, body, self.port)
347
    skip_dict = self._ConnectList(c, node_list, procedure)
348
    skip_dict.update(c.GetResults())
349
    return skip_dict
350

    
351
  @classmethod
352
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
353
                           address_list=None):
354
    """Helper for making a multi-node static call
355

356
    """
357
    body = serializer.DumpJson(args, indent=False)
358
    c = Client(procedure, body, utils.GetNodeDaemonPort())
359
    c.ConnectList(node_list, address_list=address_list)
360
    return c.GetResults()
361

    
362
  def _SingleNodeCall(self, node, procedure, args):
363
    """Helper for making a single-node call
364

365
    """
366
    body = serializer.DumpJson(args, indent=False)
367
    c = Client(procedure, body, self.port)
368
    result = self._ConnectNode(c, node, procedure)
369
    if result is None:
370
      # we did connect, node is not offline
371
      result = c.GetResults()[node]
372
    return result
373

    
374
  @classmethod
375
  def _StaticSingleNodeCall(cls, node, procedure, args):
376
    """Helper for making a single-node static call
377

378
    """
379
    body = serializer.DumpJson(args, indent=False)
380
    c = Client(procedure, body, utils.GetNodeDaemonPort())
381
    c.ConnectNode(node)
382
    return c.GetResults()[node]
383

    
384
  @staticmethod
385
  def _Compress(data):
386
    """Compresses a string for transport over RPC.
387

388
    Small amounts of data are not compressed.
389

390
    @type data: str
391
    @param data: Data
392
    @rtype: tuple
393
    @return: Encoded data to send
394

395
    """
396
    # Small amounts of data are not compressed
397
    if len(data) < 512:
398
      return (constants.RPC_ENCODING_NONE, data)
399

    
400
    # Compress with zlib and encode in base64
401
    return (constants.RPC_ENCODING_ZLIB_BASE64,
402
            base64.b64encode(zlib.compress(data, 3)))
403

    
404
  #
405
  # Begin RPC calls
406
  #
407

    
408
  def call_volume_list(self, node_list, vg_name):
409
    """Gets the logical volumes present in a given volume group.
410

411
    This is a multi-node call.
412

413
    """
414
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
415

    
416
  def call_vg_list(self, node_list):
417
    """Gets the volume group list.
418

419
    This is a multi-node call.
420

421
    """
422
    return self._MultiNodeCall(node_list, "vg_list", [])
423

    
424
  def call_bridges_exist(self, node, bridges_list):
425
    """Checks if a node has all the bridges given.
426

427
    This method checks if all bridges given in the bridges_list are
428
    present on the remote node, so that an instance that uses interfaces
429
    on those bridges can be started.
430

431
    This is a single-node call.
432

433
    """
434
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
435

    
436
  def call_instance_start(self, node, instance, hvp, bep):
437
    """Starts an instance.
438

439
    This is a single-node call.
440

441
    """
442
    idict = self._InstDict(instance, hvp=hvp, bep=bep)
443
    return self._SingleNodeCall(node, "instance_start", [idict])
444

    
445
  def call_instance_shutdown(self, node, instance):
446
    """Stops an instance.
447

448
    This is a single-node call.
449

450
    """
451
    return self._SingleNodeCall(node, "instance_shutdown",
452
                                [self._InstDict(instance)])
453

    
454
  def call_migration_info(self, node, instance):
455
    """Gather the information necessary to prepare an instance migration.
456

457
    This is a single-node call.
458

459
    @type node: string
460
    @param node: the node on which the instance is currently running
461
    @type instance: C{objects.Instance}
462
    @param instance: the instance definition
463

464
    """
465
    return self._SingleNodeCall(node, "migration_info",
466
                                [self._InstDict(instance)])
467

    
468
  def call_accept_instance(self, node, instance, info, target):
469
    """Prepare a node to accept an instance.
470

471
    This is a single-node call.
472

473
    @type node: string
474
    @param node: the target node for the migration
475
    @type instance: C{objects.Instance}
476
    @param instance: the instance definition
477
    @type info: opaque/hypervisor specific (string/data)
478
    @param info: result for the call_migration_info call
479
    @type target: string
480
    @param target: target hostname (usually ip address) (on the node itself)
481

482
    """
483
    return self._SingleNodeCall(node, "accept_instance",
484
                                [self._InstDict(instance), info, target])
485

    
486
  def call_finalize_migration(self, node, instance, info, success):
487
    """Finalize any target-node migration specific operation.
488

489
    This is called both in case of a successful migration and in case of error
490
    (in which case it should abort the migration).
491

492
    This is a single-node call.
493

494
    @type node: string
495
    @param node: the target node for the migration
496
    @type instance: C{objects.Instance}
497
    @param instance: the instance definition
498
    @type info: opaque/hypervisor specific (string/data)
499
    @param info: result for the call_migration_info call
500
    @type success: boolean
501
    @param success: whether the migration was a success or a failure
502

503
    """
504
    return self._SingleNodeCall(node, "finalize_migration",
505
                                [self._InstDict(instance), info, success])
506

    
507
  def call_instance_migrate(self, node, instance, target, live):
508
    """Migrate an instance.
509

510
    This is a single-node call.
511

512
    @type node: string
513
    @param node: the node on which the instance is currently running
514
    @type instance: C{objects.Instance}
515
    @param instance: the instance definition
516
    @type target: string
517
    @param target: the target node name
518
    @type live: boolean
519
    @param live: whether the migration should be done live or not (the
520
        interpretation of this parameter is left to the hypervisor)
521

522
    """
523
    return self._SingleNodeCall(node, "instance_migrate",
524
                                [self._InstDict(instance), target, live])
525

    
526
  def call_instance_reboot(self, node, instance, reboot_type):
527
    """Reboots an instance.
528

529
    This is a single-node call.
530

531
    """
532
    return self._SingleNodeCall(node, "instance_reboot",
533
                                [self._InstDict(instance), reboot_type])
534

    
535
  def call_instance_os_add(self, node, inst):
536
    """Installs an OS on the given instance.
537

538
    This is a single-node call.
539

540
    """
541
    return self._SingleNodeCall(node, "instance_os_add",
542
                                [self._InstDict(inst)])
543

    
544
  def call_instance_run_rename(self, node, inst, old_name):
545
    """Run the OS rename script for an instance.
546

547
    This is a single-node call.
548

549
    """
550
    return self._SingleNodeCall(node, "instance_run_rename",
551
                                [self._InstDict(inst), old_name])
552

    
553
  def call_instance_info(self, node, instance, hname):
554
    """Returns information about a single instance.
555

556
    This is a single-node call.
557

558
    @type node: list
559
    @param node: the list of nodes to query
560
    @type instance: string
561
    @param instance: the instance name
562
    @type hname: string
563
    @param hname: the hypervisor type of the instance
564

565
    """
566
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
567

    
568
  def call_instance_migratable(self, node, instance):
569
    """Checks whether the given instance can be migrated.
570

571
    This is a single-node call.
572

573
    @param node: the node to query
574
    @type instance: L{objects.Instance}
575
    @param instance: the instance to check
576

577

578
    """
579
    return self._SingleNodeCall(node, "instance_migratable",
580
                                [self._InstDict(instance)])
581

    
582
  def call_all_instances_info(self, node_list, hypervisor_list):
583
    """Returns information about all instances on the given nodes.
584

585
    This is a multi-node call.
586

587
    @type node_list: list
588
    @param node_list: the list of nodes to query
589
    @type hypervisor_list: list
590
    @param hypervisor_list: the hypervisors to query for instances
591

592
    """
593
    return self._MultiNodeCall(node_list, "all_instances_info",
594
                               [hypervisor_list])
595

    
596
  def call_instance_list(self, node_list, hypervisor_list):
597
    """Returns the list of running instances on a given node.
598

599
    This is a multi-node call.
600

601
    @type node_list: list
602
    @param node_list: the list of nodes to query
603
    @type hypervisor_list: list
604
    @param hypervisor_list: the hypervisors to query for instances
605

606
    """
607
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
608

    
609
  def call_node_tcp_ping(self, node, source, target, port, timeout,
610
                         live_port_needed):
611
    """Do a TcpPing on the remote node
612

613
    This is a single-node call.
614

615
    """
616
    return self._SingleNodeCall(node, "node_tcp_ping",
617
                                [source, target, port, timeout,
618
                                 live_port_needed])
619

    
620
  def call_node_has_ip_address(self, node, address):
621
    """Checks if a node has the given IP address.
622

623
    This is a single-node call.
624

625
    """
626
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
627

    
628
  def call_node_info(self, node_list, vg_name, hypervisor_type):
629
    """Return node information.
630

631
    This will return memory information and volume group size and free
632
    space.
633

634
    This is a multi-node call.
635

636
    @type node_list: list
637
    @param node_list: the list of nodes to query
638
    @type vg_name: C{string}
639
    @param vg_name: the name of the volume group to ask for disk space
640
        information
641
    @type hypervisor_type: C{str}
642
    @param hypervisor_type: the name of the hypervisor to ask for
643
        memory information
644

645
    """
646
    retux = self._MultiNodeCall(node_list, "node_info",
647
                                [vg_name, hypervisor_type])
648

    
649
    for result in retux.itervalues():
650
      if result.failed or not isinstance(result.data, dict):
651
        result.data = {}
652
      if result.offline:
653
        log_name = None
654
      else:
655
        log_name = "call_node_info"
656

    
657
      utils.CheckDict(result.data, {
658
        'memory_total' : '-',
659
        'memory_dom0' : '-',
660
        'memory_free' : '-',
661
        'vg_size' : 'node_unreachable',
662
        'vg_free' : '-',
663
        }, log_name)
664
    return retux
665

    
666
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
667
    """Add a node to the cluster.
668

669
    This is a single-node call.
670

671
    """
672
    return self._SingleNodeCall(node, "node_add",
673
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
674

    
675
  def call_node_verify(self, node_list, checkdict, cluster_name):
676
    """Request verification of given parameters.
677

678
    This is a multi-node call.
679

680
    """
681
    return self._MultiNodeCall(node_list, "node_verify",
682
                               [checkdict, cluster_name])
683

    
684
  @classmethod
685
  def call_node_start_master(cls, node, start_daemons):
686
    """Tells a node to activate itself as a master.
687

688
    This is a single-node call.
689

690
    """
691
    return cls._StaticSingleNodeCall(node, "node_start_master",
692
                                     [start_daemons])
693

    
694
  @classmethod
695
  def call_node_stop_master(cls, node, stop_daemons):
696
    """Tells a node to demote itself from master status.
697

698
    This is a single-node call.
699

700
    """
701
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
702

    
703
  @classmethod
704
  def call_master_info(cls, node_list):
705
    """Query master info.
706

707
    This is a multi-node call.
708

709
    """
710
    # TODO: should this method query down nodes?
711
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
712

    
713
  def call_version(self, node_list):
714
    """Query node version.
715

716
    This is a multi-node call.
717

718
    """
719
    return self._MultiNodeCall(node_list, "version", [])
720

    
721
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
722
    """Request creation of a given block device.
723

724
    This is a single-node call.
725

726
    """
727
    return self._SingleNodeCall(node, "blockdev_create",
728
                                [bdev.ToDict(), size, owner, on_primary, info])
729

    
730
  def call_blockdev_remove(self, node, bdev):
731
    """Request removal of a given block device.
732

733
    This is a single-node call.
734

735
    """
736
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
737

    
738
  def call_blockdev_rename(self, node, devlist):
739
    """Request rename of the given block devices.
740

741
    This is a single-node call.
742

743
    """
744
    return self._SingleNodeCall(node, "blockdev_rename",
745
                                [(d.ToDict(), uid) for d, uid in devlist])
746

    
747
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
748
    """Request assembling of a given block device.
749

750
    This is a single-node call.
751

752
    """
753
    return self._SingleNodeCall(node, "blockdev_assemble",
754
                                [disk.ToDict(), owner, on_primary])
755

    
756
  def call_blockdev_shutdown(self, node, disk):
757
    """Request shutdown of a given block device.
758

759
    This is a single-node call.
760

761
    """
762
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
763

    
764
  def call_blockdev_addchildren(self, node, bdev, ndevs):
765
    """Request adding a list of children to a (mirroring) device.
766

767
    This is a single-node call.
768

769
    """
770
    return self._SingleNodeCall(node, "blockdev_addchildren",
771
                                [bdev.ToDict(),
772
                                 [disk.ToDict() for disk in ndevs]])
773

    
774
  def call_blockdev_removechildren(self, node, bdev, ndevs):
775
    """Request removing a list of children from a (mirroring) device.
776

777
    This is a single-node call.
778

779
    """
780
    return self._SingleNodeCall(node, "blockdev_removechildren",
781
                                [bdev.ToDict(),
782
                                 [disk.ToDict() for disk in ndevs]])
783

    
784
  def call_blockdev_getmirrorstatus(self, node, disks):
785
    """Request status of a (mirroring) device.
786

787
    This is a single-node call.
788

789
    """
790
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
791
                                [dsk.ToDict() for dsk in disks])
792

    
793
  def call_blockdev_find(self, node, disk):
794
    """Request identification of a given block device.
795

796
    This is a single-node call.
797

798
    """
799
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
800

    
801
  def call_blockdev_close(self, node, instance_name, disks):
802
    """Closes the given block devices.
803

804
    This is a single-node call.
805

806
    """
807
    params = [instance_name, [cf.ToDict() for cf in disks]]
808
    return self._SingleNodeCall(node, "blockdev_close", params)
809

    
810
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
811
    """Disconnects the network of the given drbd devices.
812

813
    This is a multi-node call.
814

815
    """
816
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
817
                               [nodes_ip, [cf.ToDict() for cf in disks]])
818

    
819
  def call_drbd_attach_net(self, node_list, nodes_ip,
820
                           disks, instance_name, multimaster):
821
    """Disconnects the given drbd devices.
822

823
    This is a multi-node call.
824

825
    """
826
    return self._MultiNodeCall(node_list, "drbd_attach_net",
827
                               [nodes_ip, [cf.ToDict() for cf in disks],
828
                                instance_name, multimaster])
829

    
830
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
831
    """Waits for the synchronization of drbd devices is complete.
832

833
    This is a multi-node call.
834

835
    """
836
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
837
                               [nodes_ip, [cf.ToDict() for cf in disks]])
838

    
839
  @classmethod
840
  def call_upload_file(cls, node_list, file_name, address_list=None):
841
    """Upload a file.
842

843
    The node will refuse the operation in case the file is not on the
844
    approved file list.
845

846
    This is a multi-node call.
847

848
    @type node_list: list
849
    @param node_list: the list of node names to upload to
850
    @type file_name: str
851
    @param file_name: the filename to upload
852
    @type address_list: list or None
853
    @keyword address_list: an optional list of node addresses, in order
854
        to optimize the RPC speed
855

856
    """
857
    file_contents = utils.ReadFile(file_name)
858
    data = cls._Compress(file_contents)
859
    st = os.stat(file_name)
860
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
861
              st.st_atime, st.st_mtime]
862
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
863
                                    address_list=address_list)
864

    
865
  @classmethod
866
  def call_write_ssconf_files(cls, node_list, values):
867
    """Write ssconf files.
868

869
    This is a multi-node call.
870

871
    """
872
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
873

    
874
  def call_os_diagnose(self, node_list):
875
    """Request a diagnose of OS definitions.
876

877
    This is a multi-node call.
878

879
    """
880
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
881

    
882
    for node_result in result.values():
883
      if not node_result.failed and node_result.data:
884
        node_result.data = [objects.OS.FromDict(oss)
885
                            for oss in node_result.data]
886
    return result
887

    
888
  def call_os_get(self, node, name):
889
    """Returns an OS definition.
890

891
    This is a single-node call.
892

893
    """
894
    result = self._SingleNodeCall(node, "os_get", [name])
895
    if not result.failed and isinstance(result.data, dict):
896
      result.data = objects.OS.FromDict(result.data)
897
    return result
898

    
899
  def call_hooks_runner(self, node_list, hpath, phase, env):
900
    """Call the hooks runner.
901

902
    Args:
903
      - op: the OpCode instance
904
      - env: a dictionary with the environment
905

906
    This is a multi-node call.
907

908
    """
909
    params = [hpath, phase, env]
910
    return self._MultiNodeCall(node_list, "hooks_runner", params)
911

    
912
  def call_iallocator_runner(self, node, name, idata):
913
    """Call an iallocator on a remote node
914

915
    Args:
916
      - name: the iallocator name
917
      - input: the json-encoded input string
918

919
    This is a single-node call.
920

921
    """
922
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
923

    
924
  def call_blockdev_grow(self, node, cf_bdev, amount):
925
    """Request a snapshot of the given block device.
926

927
    This is a single-node call.
928

929
    """
930
    return self._SingleNodeCall(node, "blockdev_grow",
931
                                [cf_bdev.ToDict(), amount])
932

    
933
  def call_blockdev_snapshot(self, node, cf_bdev):
934
    """Request a snapshot of the given block device.
935

936
    This is a single-node call.
937

938
    """
939
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
940

    
941
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
942
                           cluster_name, idx):
943
    """Request the export of a given snapshot.
944

945
    This is a single-node call.
946

947
    """
948
    return self._SingleNodeCall(node, "snapshot_export",
949
                                [snap_bdev.ToDict(), dest_node,
950
                                 self._InstDict(instance), cluster_name, idx])
951

    
952
  def call_finalize_export(self, node, instance, snap_disks):
953
    """Request the completion of an export operation.
954

955
    This writes the export config file, etc.
956

957
    This is a single-node call.
958

959
    """
960
    flat_disks = []
961
    for disk in snap_disks:
962
      if isinstance(disk, bool):
963
        flat_disks.append(disk)
964
      else:
965
        flat_disks.append(disk.ToDict())
966

    
967
    return self._SingleNodeCall(node, "finalize_export",
968
                                [self._InstDict(instance), flat_disks])
969

    
970
  def call_export_info(self, node, path):
971
    """Queries the export information in a given path.
972

973
    This is a single-node call.
974

975
    """
976
    result = self._SingleNodeCall(node, "export_info", [path])
977
    if not result.failed and result.data:
978
      result.data = objects.SerializableConfigParser.Loads(str(result.data))
979
    return result
980

    
981
  def call_instance_os_import(self, node, inst, src_node, src_images,
982
                              cluster_name):
983
    """Request the import of a backup into an instance.
984

985
    This is a single-node call.
986

987
    """
988
    return self._SingleNodeCall(node, "instance_os_import",
989
                                [self._InstDict(inst), src_node, src_images,
990
                                 cluster_name])
991

    
992
  def call_export_list(self, node_list):
993
    """Gets the stored exports list.
994

995
    This is a multi-node call.
996

997
    """
998
    return self._MultiNodeCall(node_list, "export_list", [])
999

    
1000
  def call_export_remove(self, node, export):
1001
    """Requests removal of a given export.
1002

1003
    This is a single-node call.
1004

1005
    """
1006
    return self._SingleNodeCall(node, "export_remove", [export])
1007

    
1008
  @classmethod
1009
  def call_node_leave_cluster(cls, node):
1010
    """Requests a node to clean the cluster information it has.
1011

1012
    This will remove the configuration information from the ganeti data
1013
    dir.
1014

1015
    This is a single-node call.
1016

1017
    """
1018
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1019

    
1020
  def call_node_volumes(self, node_list):
1021
    """Gets all volumes on node(s).
1022

1023
    This is a multi-node call.
1024

1025
    """
1026
    return self._MultiNodeCall(node_list, "node_volumes", [])
1027

    
1028
  def call_node_demote_from_mc(self, node):
1029
    """Demote a node from the master candidate role.
1030

1031
    This is a single-node call.
1032

1033
    """
1034
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1035

    
1036
  def call_test_delay(self, node_list, duration):
1037
    """Sleep for a fixed time on given node(s).
1038

1039
    This is a multi-node call.
1040

1041
    """
1042
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1043

    
1044
  def call_file_storage_dir_create(self, node, file_storage_dir):
1045
    """Create the given file storage directory.
1046

1047
    This is a single-node call.
1048

1049
    """
1050
    return self._SingleNodeCall(node, "file_storage_dir_create",
1051
                                [file_storage_dir])
1052

    
1053
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1054
    """Remove the given file storage directory.
1055

1056
    This is a single-node call.
1057

1058
    """
1059
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1060
                                [file_storage_dir])
1061

    
1062
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1063
                                   new_file_storage_dir):
1064
    """Rename file storage directory.
1065

1066
    This is a single-node call.
1067

1068
    """
1069
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1070
                                [old_file_storage_dir, new_file_storage_dir])
1071

    
1072
  @classmethod
1073
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1074
    """Update job queue.
1075

1076
    This is a multi-node call.
1077

1078
    """
1079
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1080
                                    [file_name, cls._Compress(content)],
1081
                                    address_list=address_list)
1082

    
1083
  @classmethod
1084
  def call_jobqueue_purge(cls, node):
1085
    """Purge job queue.
1086

1087
    This is a single-node call.
1088

1089
    """
1090
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1091

    
1092
  @classmethod
1093
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1094
    """Rename a job queue file.
1095

1096
    This is a multi-node call.
1097

1098
    """
1099
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1100
                                    address_list=address_list)
1101

    
1102
  @classmethod
1103
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1104
    """Set the drain flag on the queue.
1105

1106
    This is a multi-node call.
1107

1108
    @type node_list: list
1109
    @param node_list: the list of nodes to query
1110
    @type drain_flag: bool
1111
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1112

1113
    """
1114
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1115
                                    [drain_flag])
1116

    
1117
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1118
    """Validate the hypervisor params.
1119

1120
    This is a multi-node call.
1121

1122
    @type node_list: list
1123
    @param node_list: the list of nodes to query
1124
    @type hvname: string
1125
    @param hvname: the hypervisor name
1126
    @type hvparams: dict
1127
    @param hvparams: the hypervisor parameters to be validated
1128

1129
    """
1130
    cluster = self._cfg.GetClusterInfo()
1131
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1132
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1133
                               [hvname, hv_full])