Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 9afb67fe

History | View | Annotate | Download (32.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36
import zlib
37
import base64
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45

    
46
import ganeti.http.client
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  _http_manager = http.client.HttpClientManager()
64

    
65

    
66
def Shutdown():
67
  """Stops the module-global HTTP client manager.
68

69
  Must be called before quitting the program.
70

71
  """
72
  global _http_manager
73

    
74
  if _http_manager:
75
    _http_manager.Shutdown()
76
    _http_manager = None
77

    
78

    
79
class RpcResult(object):
80
  """RPC Result class.
81

82
  This class holds an RPC result. It is needed since in multi-node
83
  calls we can't raise an exception just because one one out of many
84
  failed, and therefore we use this class to encapsulate the result.
85

86
  @ivar data: the data payload, for successfull results, or None
87
  @type failed: boolean
88
  @ivar failed: whether the operation failed at RPC level (not
89
      application level on the remote node)
90
  @ivar call: the name of the RPC call
91
  @ivar node: the name of the node to which we made the call
92
  @ivar offline: whether the operation failed because the node was
93
      offline, as opposed to actual failure; offline=True will always
94
      imply failed=True, in order to allow simpler checking if
95
      the user doesn't care about the exact failure mode
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.error = "Node is marked offline"
107
      self.data = self.payload = None
108
    elif failed:
109
      self.error = data
110
      self.data = self.payload = None
111
    else:
112
      self.data = data
113
      self.error = None
114
      if isinstance(data, (tuple, list)) and len(data) == 2:
115
        self.payload = data[1]
116
      else:
117
        self.payload = None
118

    
119
  def Raise(self):
120
    """If the result has failed, raise an OpExecError.
121

122
    This is used so that LU code doesn't have to check for each
123
    result, but instead can call this function.
124

125
    """
126
    if self.failed:
127
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
128
                               (self.call, self.node, self.error))
129

    
130
  def RemoteFailMsg(self):
131
    """Check if the remote procedure failed.
132

133
    This is valid only for RPC calls which return result of the form
134
    (status, data | error_msg).
135

136
    @return: empty string for succcess, otherwise an error message
137

138
    """
139
    def _EnsureErr(val):
140
      """Helper to ensure we return a 'True' value for error."""
141
      if val:
142
        return val
143
      else:
144
        return "No error information"
145

    
146
    if self.failed:
147
      return _EnsureErr(self.error)
148
    if not isinstance(self.data, (tuple, list)):
149
      return "Invalid result type (%s)" % type(self.data)
150
    if len(self.data) != 2:
151
      return "Invalid result length (%d), expected 2" % len(self.data)
152
    if not self.data[0]:
153
      return _EnsureErr(self.data[1])
154
    return ""
155

    
156

    
157
class Client:
158
  """RPC Client class.
159

160
  This class, given a (remote) method name, a list of parameters and a
161
  list of nodes, will contact (in parallel) all nodes, and return a
162
  dict of results (key: node name, value: result).
163

164
  One current bug is that generic failure is still signalled by
165
  'False' result, which is not good. This overloading of values can
166
  cause bugs.
167

168
  """
169
  def __init__(self, procedure, body, port):
170
    self.procedure = procedure
171
    self.body = body
172
    self.port = port
173
    self.nc = {}
174

    
175
    self._ssl_params = \
176
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
177
                         ssl_cert_path=constants.SSL_CERT_FILE)
178

    
179
  def ConnectList(self, node_list, address_list=None):
180
    """Add a list of nodes to the target nodes.
181

182
    @type node_list: list
183
    @param node_list: the list of node names to connect
184
    @type address_list: list or None
185
    @keyword address_list: either None or a list with node addresses,
186
        which must have the same length as the node list
187

188
    """
189
    if address_list is None:
190
      address_list = [None for _ in node_list]
191
    else:
192
      assert len(node_list) == len(address_list), \
193
             "Name and address lists should have the same length"
194
    for node, address in zip(node_list, address_list):
195
      self.ConnectNode(node, address)
196

    
197
  def ConnectNode(self, name, address=None):
198
    """Add a node to the target list.
199

200
    @type name: str
201
    @param name: the node name
202
    @type address: str
203
    @keyword address: the node address, if known
204

205
    """
206
    if address is None:
207
      address = name
208

    
209
    self.nc[name] = \
210
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
211
                                    "/%s" % self.procedure,
212
                                    post_data=self.body,
213
                                    ssl_params=self._ssl_params,
214
                                    ssl_verify_peer=True)
215

    
216
  def GetResults(self):
217
    """Call nodes and return results.
218

219
    @rtype: list
220
    @return: List of RPC results
221

222
    """
223
    assert _http_manager, "RPC module not intialized"
224

    
225
    _http_manager.ExecRequests(self.nc.values())
226

    
227
    results = {}
228

    
229
    for name, req in self.nc.iteritems():
230
      if req.success and req.resp_status_code == http.HTTP_OK:
231
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
232
                                  node=name, call=self.procedure)
233
        continue
234

    
235
      # TODO: Better error reporting
236
      if req.error:
237
        msg = req.error
238
      else:
239
        msg = req.resp_body
240

    
241
      logging.error("RPC error in %s from node %s: %s",
242
                    self.procedure, name, msg)
243
      results[name] = RpcResult(data=msg, failed=True, node=name,
244
                                call=self.procedure)
245

    
246
    return results
247

    
248

    
249
class RpcRunner(object):
250
  """RPC runner class"""
251

    
252
  def __init__(self, cfg):
253
    """Initialized the rpc runner.
254

255
    @type cfg:  C{config.ConfigWriter}
256
    @param cfg: the configuration object that will be used to get data
257
                about the cluster
258

259
    """
260
    self._cfg = cfg
261
    self.port = utils.GetNodeDaemonPort()
262

    
263
  def _InstDict(self, instance):
264
    """Convert the given instance to a dict.
265

266
    This is done via the instance's ToDict() method and additionally
267
    we fill the hvparams with the cluster defaults.
268

269
    @type instance: L{objects.Instance}
270
    @param instance: an Instance object
271
    @rtype: dict
272
    @return: the instance dict, with the hvparams filled with the
273
        cluster defaults
274

275
    """
276
    idict = instance.ToDict()
277
    cluster = self._cfg.GetClusterInfo()
278
    idict["hvparams"] = cluster.FillHV(instance)
279
    idict["beparams"] = cluster.FillBE(instance)
280
    return idict
281

    
282
  def _ConnectList(self, client, node_list, call):
283
    """Helper for computing node addresses.
284

285
    @type client: L{Client}
286
    @param client: a C{Client} instance
287
    @type node_list: list
288
    @param node_list: the node list we should connect
289
    @type call: string
290
    @param call: the name of the remote procedure call, for filling in
291
        correctly any eventual offline nodes' results
292

293
    """
294
    all_nodes = self._cfg.GetAllNodesInfo()
295
    name_list = []
296
    addr_list = []
297
    skip_dict = {}
298
    for node in node_list:
299
      if node in all_nodes:
300
        if all_nodes[node].offline:
301
          skip_dict[node] = RpcResult(node=node, offline=True, call=call)
302
          continue
303
        val = all_nodes[node].primary_ip
304
      else:
305
        val = None
306
      addr_list.append(val)
307
      name_list.append(node)
308
    if name_list:
309
      client.ConnectList(name_list, address_list=addr_list)
310
    return skip_dict
311

    
312
  def _ConnectNode(self, client, node, call):
313
    """Helper for computing one node's address.
314

315
    @type client: L{Client}
316
    @param client: a C{Client} instance
317
    @type node: str
318
    @param node: the node we should connect
319
    @type call: string
320
    @param call: the name of the remote procedure call, for filling in
321
        correctly any eventual offline nodes' results
322

323
    """
324
    node_info = self._cfg.GetNodeInfo(node)
325
    if node_info is not None:
326
      if node_info.offline:
327
        return RpcResult(node=node, offline=True, call=call)
328
      addr = node_info.primary_ip
329
    else:
330
      addr = None
331
    client.ConnectNode(node, address=addr)
332

    
333
  def _MultiNodeCall(self, node_list, procedure, args):
334
    """Helper for making a multi-node call
335

336
    """
337
    body = serializer.DumpJson(args, indent=False)
338
    c = Client(procedure, body, self.port)
339
    skip_dict = self._ConnectList(c, node_list, procedure)
340
    skip_dict.update(c.GetResults())
341
    return skip_dict
342

    
343
  @classmethod
344
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
345
                           address_list=None):
346
    """Helper for making a multi-node static call
347

348
    """
349
    body = serializer.DumpJson(args, indent=False)
350
    c = Client(procedure, body, utils.GetNodeDaemonPort())
351
    c.ConnectList(node_list, address_list=address_list)
352
    return c.GetResults()
353

    
354
  def _SingleNodeCall(self, node, procedure, args):
355
    """Helper for making a single-node call
356

357
    """
358
    body = serializer.DumpJson(args, indent=False)
359
    c = Client(procedure, body, self.port)
360
    result = self._ConnectNode(c, node, procedure)
361
    if result is None:
362
      # we did connect, node is not offline
363
      result = c.GetResults()[node]
364
    return result
365

    
366
  @classmethod
367
  def _StaticSingleNodeCall(cls, node, procedure, args):
368
    """Helper for making a single-node static call
369

370
    """
371
    body = serializer.DumpJson(args, indent=False)
372
    c = Client(procedure, body, utils.GetNodeDaemonPort())
373
    c.ConnectNode(node)
374
    return c.GetResults()[node]
375

    
376
  @staticmethod
377
  def _Compress(data):
378
    """Compresses a string for transport over RPC.
379

380
    Small amounts of data are not compressed.
381

382
    @type data: str
383
    @param data: Data
384
    @rtype: tuple
385
    @return: Encoded data to send
386

387
    """
388
    # Small amounts of data are not compressed
389
    if len(data) < 512:
390
      return (constants.RPC_ENCODING_NONE, data)
391

    
392
    # Compress with zlib and encode in base64
393
    return (constants.RPC_ENCODING_ZLIB_BASE64,
394
            base64.b64encode(zlib.compress(data, 3)))
395

    
396
  #
397
  # Begin RPC calls
398
  #
399

    
400
  def call_volume_list(self, node_list, vg_name):
401
    """Gets the logical volumes present in a given volume group.
402

403
    This is a multi-node call.
404

405
    """
406
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
407

    
408
  def call_vg_list(self, node_list):
409
    """Gets the volume group list.
410

411
    This is a multi-node call.
412

413
    """
414
    return self._MultiNodeCall(node_list, "vg_list", [])
415

    
416
  def call_bridges_exist(self, node, bridges_list):
417
    """Checks if a node has all the bridges given.
418

419
    This method checks if all bridges given in the bridges_list are
420
    present on the remote node, so that an instance that uses interfaces
421
    on those bridges can be started.
422

423
    This is a single-node call.
424

425
    """
426
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
427

    
428
  def call_instance_start(self, node, instance):
429
    """Starts an instance.
430

431
    This is a single-node call.
432

433
    """
434
    return self._SingleNodeCall(node, "instance_start",
435
                                [self._InstDict(instance)])
436

    
437
  def call_instance_shutdown(self, node, instance):
438
    """Stops an instance.
439

440
    This is a single-node call.
441

442
    """
443
    return self._SingleNodeCall(node, "instance_shutdown",
444
                                [self._InstDict(instance)])
445

    
446
  def call_migration_info(self, node, instance):
447
    """Gather the information necessary to prepare an instance migration.
448

449
    This is a single-node call.
450

451
    @type node: string
452
    @param node: the node on which the instance is currently running
453
    @type instance: C{objects.Instance}
454
    @param instance: the instance definition
455

456
    """
457
    return self._SingleNodeCall(node, "migration_info",
458
                                [self._InstDict(instance)])
459

    
460
  def call_accept_instance(self, node, instance, info, target):
461
    """Prepare a node to accept an instance.
462

463
    This is a single-node call.
464

465
    @type node: string
466
    @param node: the target node for the migration
467
    @type instance: C{objects.Instance}
468
    @param instance: the instance definition
469
    @type info: opaque/hypervisor specific (string/data)
470
    @param info: result for the call_migration_info call
471
    @type target: string
472
    @param target: target hostname (usually ip address) (on the node itself)
473

474
    """
475
    return self._SingleNodeCall(node, "accept_instance",
476
                                [self._InstDict(instance), info, target])
477

    
478
  def call_finalize_migration(self, node, instance, info, success):
479
    """Finalize any target-node migration specific operation.
480

481
    This is called both in case of a successful migration and in case of error
482
    (in which case it should abort the migration).
483

484
    This is a single-node call.
485

486
    @type node: string
487
    @param node: the target node for the migration
488
    @type instance: C{objects.Instance}
489
    @param instance: the instance definition
490
    @type info: opaque/hypervisor specific (string/data)
491
    @param info: result for the call_migration_info call
492
    @type success: boolean
493
    @param success: whether the migration was a success or a failure
494

495
    """
496
    return self._SingleNodeCall(node, "finalize_migration",
497
                                [self._InstDict(instance), info, success])
498

    
499
  def call_instance_migrate(self, node, instance, target, live):
500
    """Migrate an instance.
501

502
    This is a single-node call.
503

504
    @type node: string
505
    @param node: the node on which the instance is currently running
506
    @type instance: C{objects.Instance}
507
    @param instance: the instance definition
508
    @type target: string
509
    @param target: the target node name
510
    @type live: boolean
511
    @param live: whether the migration should be done live or not (the
512
        interpretation of this parameter is left to the hypervisor)
513

514
    """
515
    return self._SingleNodeCall(node, "instance_migrate",
516
                                [self._InstDict(instance), target, live])
517

    
518
  def call_instance_reboot(self, node, instance, reboot_type):
519
    """Reboots an instance.
520

521
    This is a single-node call.
522

523
    """
524
    return self._SingleNodeCall(node, "instance_reboot",
525
                                [self._InstDict(instance), reboot_type])
526

    
527
  def call_instance_os_add(self, node, inst):
528
    """Installs an OS on the given instance.
529

530
    This is a single-node call.
531

532
    """
533
    return self._SingleNodeCall(node, "instance_os_add",
534
                                [self._InstDict(inst)])
535

    
536
  def call_instance_run_rename(self, node, inst, old_name):
537
    """Run the OS rename script for an instance.
538

539
    This is a single-node call.
540

541
    """
542
    return self._SingleNodeCall(node, "instance_run_rename",
543
                                [self._InstDict(inst), old_name])
544

    
545
  def call_instance_info(self, node, instance, hname):
546
    """Returns information about a single instance.
547

548
    This is a single-node call.
549

550
    @type node: list
551
    @param node: the list of nodes to query
552
    @type instance: string
553
    @param instance: the instance name
554
    @type hname: string
555
    @param hname: the hypervisor type of the instance
556

557
    """
558
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
559

    
560
  def call_instance_migratable(self, node, instance):
561
    """Checks whether the given instance can be migrated.
562

563
    This is a single-node call.
564

565
    @param node: the node to query
566
    @type instance: L{objects.Instance}
567
    @param instance: the instance to check
568

569

570
    """
571
    return self._SingleNodeCall(node, "instance_migratable",
572
                                [self._InstDict(instance)])
573

    
574
  def call_all_instances_info(self, node_list, hypervisor_list):
575
    """Returns information about all instances on the given nodes.
576

577
    This is a multi-node call.
578

579
    @type node_list: list
580
    @param node_list: the list of nodes to query
581
    @type hypervisor_list: list
582
    @param hypervisor_list: the hypervisors to query for instances
583

584
    """
585
    return self._MultiNodeCall(node_list, "all_instances_info",
586
                               [hypervisor_list])
587

    
588
  def call_instance_list(self, node_list, hypervisor_list):
589
    """Returns the list of running instances on a given node.
590

591
    This is a multi-node call.
592

593
    @type node_list: list
594
    @param node_list: the list of nodes to query
595
    @type hypervisor_list: list
596
    @param hypervisor_list: the hypervisors to query for instances
597

598
    """
599
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
600

    
601
  def call_node_tcp_ping(self, node, source, target, port, timeout,
602
                         live_port_needed):
603
    """Do a TcpPing on the remote node
604

605
    This is a single-node call.
606

607
    """
608
    return self._SingleNodeCall(node, "node_tcp_ping",
609
                                [source, target, port, timeout,
610
                                 live_port_needed])
611

    
612
  def call_node_has_ip_address(self, node, address):
613
    """Checks if a node has the given IP address.
614

615
    This is a single-node call.
616

617
    """
618
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
619

    
620
  def call_node_info(self, node_list, vg_name, hypervisor_type):
621
    """Return node information.
622

623
    This will return memory information and volume group size and free
624
    space.
625

626
    This is a multi-node call.
627

628
    @type node_list: list
629
    @param node_list: the list of nodes to query
630
    @type vg_name: C{string}
631
    @param vg_name: the name of the volume group to ask for disk space
632
        information
633
    @type hypervisor_type: C{str}
634
    @param hypervisor_type: the name of the hypervisor to ask for
635
        memory information
636

637
    """
638
    retux = self._MultiNodeCall(node_list, "node_info",
639
                                [vg_name, hypervisor_type])
640

    
641
    for result in retux.itervalues():
642
      if result.failed or not isinstance(result.data, dict):
643
        result.data = {}
644
      if result.offline:
645
        log_name = None
646
      else:
647
        log_name = "call_node_info"
648

    
649
      utils.CheckDict(result.data, {
650
        'memory_total' : '-',
651
        'memory_dom0' : '-',
652
        'memory_free' : '-',
653
        'vg_size' : 'node_unreachable',
654
        'vg_free' : '-',
655
        }, log_name)
656
    return retux
657

    
658
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
659
    """Add a node to the cluster.
660

661
    This is a single-node call.
662

663
    """
664
    return self._SingleNodeCall(node, "node_add",
665
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
666

    
667
  def call_node_verify(self, node_list, checkdict, cluster_name):
668
    """Request verification of given parameters.
669

670
    This is a multi-node call.
671

672
    """
673
    return self._MultiNodeCall(node_list, "node_verify",
674
                               [checkdict, cluster_name])
675

    
676
  @classmethod
677
  def call_node_start_master(cls, node, start_daemons):
678
    """Tells a node to activate itself as a master.
679

680
    This is a single-node call.
681

682
    """
683
    return cls._StaticSingleNodeCall(node, "node_start_master",
684
                                     [start_daemons])
685

    
686
  @classmethod
687
  def call_node_stop_master(cls, node, stop_daemons):
688
    """Tells a node to demote itself from master status.
689

690
    This is a single-node call.
691

692
    """
693
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
694

    
695
  @classmethod
696
  def call_master_info(cls, node_list):
697
    """Query master info.
698

699
    This is a multi-node call.
700

701
    """
702
    # TODO: should this method query down nodes?
703
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
704

    
705
  def call_version(self, node_list):
706
    """Query node version.
707

708
    This is a multi-node call.
709

710
    """
711
    return self._MultiNodeCall(node_list, "version", [])
712

    
713
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
714
    """Request creation of a given block device.
715

716
    This is a single-node call.
717

718
    """
719
    return self._SingleNodeCall(node, "blockdev_create",
720
                                [bdev.ToDict(), size, owner, on_primary, info])
721

    
722
  def call_blockdev_remove(self, node, bdev):
723
    """Request removal of a given block device.
724

725
    This is a single-node call.
726

727
    """
728
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
729

    
730
  def call_blockdev_rename(self, node, devlist):
731
    """Request rename of the given block devices.
732

733
    This is a single-node call.
734

735
    """
736
    return self._SingleNodeCall(node, "blockdev_rename",
737
                                [(d.ToDict(), uid) for d, uid in devlist])
738

    
739
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
740
    """Request assembling of a given block device.
741

742
    This is a single-node call.
743

744
    """
745
    return self._SingleNodeCall(node, "blockdev_assemble",
746
                                [disk.ToDict(), owner, on_primary])
747

    
748
  def call_blockdev_shutdown(self, node, disk):
749
    """Request shutdown of a given block device.
750

751
    This is a single-node call.
752

753
    """
754
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
755

    
756
  def call_blockdev_addchildren(self, node, bdev, ndevs):
757
    """Request adding a list of children to a (mirroring) device.
758

759
    This is a single-node call.
760

761
    """
762
    return self._SingleNodeCall(node, "blockdev_addchildren",
763
                                [bdev.ToDict(),
764
                                 [disk.ToDict() for disk in ndevs]])
765

    
766
  def call_blockdev_removechildren(self, node, bdev, ndevs):
767
    """Request removing a list of children from a (mirroring) device.
768

769
    This is a single-node call.
770

771
    """
772
    return self._SingleNodeCall(node, "blockdev_removechildren",
773
                                [bdev.ToDict(),
774
                                 [disk.ToDict() for disk in ndevs]])
775

    
776
  def call_blockdev_getmirrorstatus(self, node, disks):
777
    """Request status of a (mirroring) device.
778

779
    This is a single-node call.
780

781
    """
782
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
783
                                [dsk.ToDict() for dsk in disks])
784

    
785
  def call_blockdev_find(self, node, disk):
786
    """Request identification of a given block device.
787

788
    This is a single-node call.
789

790
    """
791
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
792

    
793
  def call_blockdev_close(self, node, instance_name, disks):
794
    """Closes the given block devices.
795

796
    This is a single-node call.
797

798
    """
799
    params = [instance_name, [cf.ToDict() for cf in disks]]
800
    return self._SingleNodeCall(node, "blockdev_close", params)
801

    
802
  def call_drbd_disconnect_net(self, node_list, nodes_ip, disks):
803
    """Disconnects the network of the given drbd devices.
804

805
    This is a multi-node call.
806

807
    """
808
    return self._MultiNodeCall(node_list, "drbd_disconnect_net",
809
                               [nodes_ip, [cf.ToDict() for cf in disks]])
810

    
811
  def call_drbd_attach_net(self, node_list, nodes_ip,
812
                           disks, instance_name, multimaster):
813
    """Disconnects the given drbd devices.
814

815
    This is a multi-node call.
816

817
    """
818
    return self._MultiNodeCall(node_list, "drbd_attach_net",
819
                               [nodes_ip, [cf.ToDict() for cf in disks],
820
                                instance_name, multimaster])
821

    
822
  def call_drbd_wait_sync(self, node_list, nodes_ip, disks):
823
    """Waits for the synchronization of drbd devices is complete.
824

825
    This is a multi-node call.
826

827
    """
828
    return self._MultiNodeCall(node_list, "drbd_wait_sync",
829
                               [nodes_ip, [cf.ToDict() for cf in disks]])
830

    
831
  @classmethod
832
  def call_upload_file(cls, node_list, file_name, address_list=None):
833
    """Upload a file.
834

835
    The node will refuse the operation in case the file is not on the
836
    approved file list.
837

838
    This is a multi-node call.
839

840
    @type node_list: list
841
    @param node_list: the list of node names to upload to
842
    @type file_name: str
843
    @param file_name: the filename to upload
844
    @type address_list: list or None
845
    @keyword address_list: an optional list of node addresses, in order
846
        to optimize the RPC speed
847

848
    """
849
    file_contents = utils.ReadFile(file_name)
850
    data = cls._Compress(file_contents)
851
    st = os.stat(file_name)
852
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
853
              st.st_atime, st.st_mtime]
854
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
855
                                    address_list=address_list)
856

    
857
  @classmethod
858
  def call_write_ssconf_files(cls, node_list, values):
859
    """Write ssconf files.
860

861
    This is a multi-node call.
862

863
    """
864
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
865

    
866
  def call_os_diagnose(self, node_list):
867
    """Request a diagnose of OS definitions.
868

869
    This is a multi-node call.
870

871
    """
872
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
873

    
874
    for node_result in result.values():
875
      if not node_result.failed and node_result.data:
876
        node_result.data = [objects.OS.FromDict(oss)
877
                            for oss in node_result.data]
878
    return result
879

    
880
  def call_os_get(self, node, name):
881
    """Returns an OS definition.
882

883
    This is a single-node call.
884

885
    """
886
    result = self._SingleNodeCall(node, "os_get", [name])
887
    if not result.failed and isinstance(result.data, dict):
888
      result.data = objects.OS.FromDict(result.data)
889
    return result
890

    
891
  def call_hooks_runner(self, node_list, hpath, phase, env):
892
    """Call the hooks runner.
893

894
    Args:
895
      - op: the OpCode instance
896
      - env: a dictionary with the environment
897

898
    This is a multi-node call.
899

900
    """
901
    params = [hpath, phase, env]
902
    return self._MultiNodeCall(node_list, "hooks_runner", params)
903

    
904
  def call_iallocator_runner(self, node, name, idata):
905
    """Call an iallocator on a remote node
906

907
    Args:
908
      - name: the iallocator name
909
      - input: the json-encoded input string
910

911
    This is a single-node call.
912

913
    """
914
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
915

    
916
  def call_blockdev_grow(self, node, cf_bdev, amount):
917
    """Request a snapshot of the given block device.
918

919
    This is a single-node call.
920

921
    """
922
    return self._SingleNodeCall(node, "blockdev_grow",
923
                                [cf_bdev.ToDict(), amount])
924

    
925
  def call_blockdev_snapshot(self, node, cf_bdev):
926
    """Request a snapshot of the given block device.
927

928
    This is a single-node call.
929

930
    """
931
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
932

    
933
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
934
                           cluster_name, idx):
935
    """Request the export of a given snapshot.
936

937
    This is a single-node call.
938

939
    """
940
    return self._SingleNodeCall(node, "snapshot_export",
941
                                [snap_bdev.ToDict(), dest_node,
942
                                 self._InstDict(instance), cluster_name, idx])
943

    
944
  def call_finalize_export(self, node, instance, snap_disks):
945
    """Request the completion of an export operation.
946

947
    This writes the export config file, etc.
948

949
    This is a single-node call.
950

951
    """
952
    flat_disks = []
953
    for disk in snap_disks:
954
      flat_disks.append(disk.ToDict())
955

    
956
    return self._SingleNodeCall(node, "finalize_export",
957
                                [self._InstDict(instance), flat_disks])
958

    
959
  def call_export_info(self, node, path):
960
    """Queries the export information in a given path.
961

962
    This is a single-node call.
963

964
    """
965
    result = self._SingleNodeCall(node, "export_info", [path])
966
    if not result.failed and result.data:
967
      result.data = objects.SerializableConfigParser.Loads(str(result.data))
968
    return result
969

    
970
  def call_instance_os_import(self, node, inst, src_node, src_images,
971
                              cluster_name):
972
    """Request the import of a backup into an instance.
973

974
    This is a single-node call.
975

976
    """
977
    return self._SingleNodeCall(node, "instance_os_import",
978
                                [self._InstDict(inst), src_node, src_images,
979
                                 cluster_name])
980

    
981
  def call_export_list(self, node_list):
982
    """Gets the stored exports list.
983

984
    This is a multi-node call.
985

986
    """
987
    return self._MultiNodeCall(node_list, "export_list", [])
988

    
989
  def call_export_remove(self, node, export):
990
    """Requests removal of a given export.
991

992
    This is a single-node call.
993

994
    """
995
    return self._SingleNodeCall(node, "export_remove", [export])
996

    
997
  @classmethod
998
  def call_node_leave_cluster(cls, node):
999
    """Requests a node to clean the cluster information it has.
1000

1001
    This will remove the configuration information from the ganeti data
1002
    dir.
1003

1004
    This is a single-node call.
1005

1006
    """
1007
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
1008

    
1009
  def call_node_volumes(self, node_list):
1010
    """Gets all volumes on node(s).
1011

1012
    This is a multi-node call.
1013

1014
    """
1015
    return self._MultiNodeCall(node_list, "node_volumes", [])
1016

    
1017
  def call_node_demote_from_mc(self, node):
1018
    """Demote a node from the master candidate role.
1019

1020
    This is a single-node call.
1021

1022
    """
1023
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
1024

    
1025
  def call_test_delay(self, node_list, duration):
1026
    """Sleep for a fixed time on given node(s).
1027

1028
    This is a multi-node call.
1029

1030
    """
1031
    return self._MultiNodeCall(node_list, "test_delay", [duration])
1032

    
1033
  def call_file_storage_dir_create(self, node, file_storage_dir):
1034
    """Create the given file storage directory.
1035

1036
    This is a single-node call.
1037

1038
    """
1039
    return self._SingleNodeCall(node, "file_storage_dir_create",
1040
                                [file_storage_dir])
1041

    
1042
  def call_file_storage_dir_remove(self, node, file_storage_dir):
1043
    """Remove the given file storage directory.
1044

1045
    This is a single-node call.
1046

1047
    """
1048
    return self._SingleNodeCall(node, "file_storage_dir_remove",
1049
                                [file_storage_dir])
1050

    
1051
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
1052
                                   new_file_storage_dir):
1053
    """Rename file storage directory.
1054

1055
    This is a single-node call.
1056

1057
    """
1058
    return self._SingleNodeCall(node, "file_storage_dir_rename",
1059
                                [old_file_storage_dir, new_file_storage_dir])
1060

    
1061
  @classmethod
1062
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
1063
    """Update job queue.
1064

1065
    This is a multi-node call.
1066

1067
    """
1068
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
1069
                                    [file_name, cls._Compress(content)],
1070
                                    address_list=address_list)
1071

    
1072
  @classmethod
1073
  def call_jobqueue_purge(cls, node):
1074
    """Purge job queue.
1075

1076
    This is a single-node call.
1077

1078
    """
1079
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
1080

    
1081
  @classmethod
1082
  def call_jobqueue_rename(cls, node_list, address_list, rename):
1083
    """Rename a job queue file.
1084

1085
    This is a multi-node call.
1086

1087
    """
1088
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
1089
                                    address_list=address_list)
1090

    
1091
  @classmethod
1092
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
1093
    """Set the drain flag on the queue.
1094

1095
    This is a multi-node call.
1096

1097
    @type node_list: list
1098
    @param node_list: the list of nodes to query
1099
    @type drain_flag: bool
1100
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
1101

1102
    """
1103
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
1104
                                    [drain_flag])
1105

    
1106
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1107
    """Validate the hypervisor params.
1108

1109
    This is a multi-node call.
1110

1111
    @type node_list: list
1112
    @param node_list: the list of nodes to query
1113
    @type hvname: string
1114
    @param hvname: the hypervisor name
1115
    @type hvparams: dict
1116
    @param hvparams: the hypervisor parameters to be validated
1117

1118
    """
1119
    cluster = self._cfg.GetClusterInfo()
1120
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1121
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
1122
                               [hvname, hv_full])