Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 781de953

History | View | Annotate | Download (26.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43
from ganeti import errors
44

    
45

    
46
# Module level variable
47
_http_manager = None
48

    
49

    
50
def Init():
51
  """Initializes the module-global HTTP client manager.
52

53
  Must be called before using any RPC function.
54

55
  """
56
  global _http_manager
57

    
58
  assert not _http_manager, "RPC module initialized more than once"
59

    
60
  _http_manager = http.HttpClientManager()
61

    
62

    
63
def Shutdown():
64
  """Stops the module-global HTTP client manager.
65

66
  Must be called before quitting the program.
67

68
  """
69
  global _http_manager
70

    
71
  if _http_manager:
72
    _http_manager.Shutdown()
73
    _http_manager = None
74

    
75

    
76
class RpcResult(object):
77
  """RPC Result class.
78

79
  This class holds an RPC result. It is needed since in multi-node
80
  calls we can't raise an exception just because one one out of many
81
  failed, and therefore we use this class to encapsulate the result.
82

83
  """
84
  def __init__(self, data, failed=False, call=None, node=None):
85
    self.failed = failed
86
    self.call = None
87
    self.node = None
88
    if failed:
89
      self.error = data
90
      self.data = None
91
    else:
92
      self.data = data
93
      self.error = None
94

    
95
  def Raise(self):
96
    """If the result has failed, raise an OpExecError.
97

98
    This is used so that LU code doesn't have to check for each
99
    result, but instead can call this function.
100

101
    """
102
    if self.failed:
103
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
104
                               (self.call, self.node, self.error))
105

    
106

    
107
class Client:
108
  """RPC Client class.
109

110
  This class, given a (remote) method name, a list of parameters and a
111
  list of nodes, will contact (in parallel) all nodes, and return a
112
  dict of results (key: node name, value: result).
113

114
  One current bug is that generic failure is still signalled by
115
  'False' result, which is not good. This overloading of values can
116
  cause bugs.
117

118
  """
119
  def __init__(self, procedure, body, port):
120
    self.procedure = procedure
121
    self.body = body
122
    self.port = port
123
    self.nc = {}
124

    
125
    self._ssl_params = \
126
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
127
                         ssl_cert_path=constants.SSL_CERT_FILE)
128

    
129
  def ConnectList(self, node_list, address_list=None):
130
    """Add a list of nodes to the target nodes.
131

132
    @type node_list: list
133
    @param node_list: the list of node names to connect
134
    @type address_list: list or None
135
    @keyword address_list: either None or a list with node addresses,
136
        which must have the same length as the node list
137

138
    """
139
    if address_list is None:
140
      address_list = [None for _ in node_list]
141
    else:
142
      assert len(node_list) == len(address_list), \
143
             "Name and address lists should have the same length"
144
    for node, address in zip(node_list, address_list):
145
      self.ConnectNode(node, address)
146

    
147
  def ConnectNode(self, name, address=None):
148
    """Add a node to the target list.
149

150
    @type name: str
151
    @param name: the node name
152
    @type address: str
153
    @keyword address: the node address, if known
154

155
    """
156
    if address is None:
157
      address = name
158

    
159
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
160
                                           "/%s" % self.procedure,
161
                                           post_data=self.body,
162
                                           ssl_params=self._ssl_params,
163
                                           ssl_verify_peer=True)
164

    
165
  def GetResults(self):
166
    """Call nodes and return results.
167

168
    @rtype: list
169
    @returns: List of RPC results
170

171
    """
172
    assert _http_manager, "RPC module not intialized"
173

    
174
    _http_manager.ExecRequests(self.nc.values())
175

    
176
    results = {}
177

    
178
    for name, req in self.nc.iteritems():
179
      if req.success and req.resp_status == http.HTTP_OK:
180
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
181
                                  node=name, call=self.procedure)
182
        continue
183

    
184
      # TODO: Better error reporting
185
      if req.error:
186
        msg = req.error
187
      else:
188
        msg = req.resp_body
189

    
190
      logging.error("RPC error from node %s: %s", name, msg)
191
      results[name] = RpcResult(data=msg, failed=True, node=name,
192
                                call=self.procedure)
193

    
194
    return results
195

    
196

    
197
class RpcRunner(object):
198
  """RPC runner class"""
199

    
200
  def __init__(self, cfg):
201
    """Initialized the rpc runner.
202

203
    @type cfg:  C{config.ConfigWriter}
204
    @param cfg: the configuration object that will be used to get data
205
                about the cluster
206

207
    """
208
    self._cfg = cfg
209
    self.port = utils.GetNodeDaemonPort()
210

    
211
  def _InstDict(self, instance):
212
    """Convert the given instance to a dict.
213

214
    This is done via the instance's ToDict() method and additionally
215
    we fill the hvparams with the cluster defaults.
216

217
    @type instance: L{objects.Instance}
218
    @param instance: an Instance object
219
    @rtype: dict
220
    @return: the instance dict, with the hvparams filled with the
221
        cluster defaults
222

223
    """
224
    idict = instance.ToDict()
225
    cluster = self._cfg.GetClusterInfo()
226
    idict["hvparams"] = cluster.FillHV(instance)
227
    idict["beparams"] = cluster.FillBE(instance)
228
    return idict
229

    
230
  def _ConnectList(self, client, node_list):
231
    """Helper for computing node addresses.
232

233
    @type client: L{Client}
234
    @param client: a C{Client} instance
235
    @type node_list: list
236
    @param node_list: the node list we should connect
237

238
    """
239
    all_nodes = self._cfg.GetAllNodesInfo()
240
    addr_list = []
241
    for node in node_list:
242
      if node in all_nodes:
243
        val = all_nodes[node].primary_ip
244
      else:
245
        val = None
246
      addr_list.append(val)
247
    client.ConnectList(node_list, address_list=addr_list)
248

    
249
  def _ConnectNode(self, client, node):
250
    """Helper for computing one node's address.
251

252
    @type client: L{Client}
253
    @param client: a C{Client} instance
254
    @type node: str
255
    @param node: the node we should connect
256

257
    """
258
    node_info = self._cfg.GetNodeInfo(node)
259
    if node_info is not None:
260
      addr = node_info.primary_ip
261
    else:
262
      addr = None
263
    client.ConnectNode(node, address=addr)
264

    
265
  def _MultiNodeCall(self, node_list, procedure, args,
266
                     address_list=None):
267
    """Helper for making a multi-node call
268

269
    """
270
    body = serializer.DumpJson(args, indent=False)
271
    c = Client(procedure, body, self.port)
272
    if address_list is None:
273
      self._ConnectList(c, node_list)
274
    else:
275
      c.ConnectList(node_list, address_list=address_list)
276
    return c.GetResults()
277

    
278
  @classmethod
279
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
280
                           address_list=None):
281
    """Helper for making a multi-node static call
282

283
    """
284
    body = serializer.DumpJson(args, indent=False)
285
    c = Client(procedure, body, utils.GetNodeDaemonPort())
286
    c.ConnectList(node_list, address_list=address_list)
287
    return c.GetResults()
288

    
289
  def _SingleNodeCall(self, node, procedure, args):
290
    """Helper for making a single-node call
291

292
    """
293
    body = serializer.DumpJson(args, indent=False)
294
    c = Client(procedure, body, self.port)
295
    self._ConnectNode(c, node)
296
    return c.GetResults().get(node, False)
297

    
298
  @classmethod
299
  def _StaticSingleNodeCall(cls, node, procedure, args):
300
    """Helper for making a single-node static call
301

302
    """
303
    body = serializer.DumpJson(args, indent=False)
304
    c = Client(procedure, body, utils.GetNodeDaemonPort())
305
    c.ConnectNode(node)
306
    return c.GetResults().get(node, False)
307

    
308
  #
309
  # Begin RPC calls
310
  #
311

    
312
  def call_volume_list(self, node_list, vg_name):
313
    """Gets the logical volumes present in a given volume group.
314

315
    This is a multi-node call.
316

317
    """
318
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
319

    
320
  def call_vg_list(self, node_list):
321
    """Gets the volume group list.
322

323
    This is a multi-node call.
324

325
    """
326
    return self._MultiNodeCall(node_list, "vg_list", [])
327

    
328
  def call_bridges_exist(self, node, bridges_list):
329
    """Checks if a node has all the bridges given.
330

331
    This method checks if all bridges given in the bridges_list are
332
    present on the remote node, so that an instance that uses interfaces
333
    on those bridges can be started.
334

335
    This is a single-node call.
336

337
    """
338
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
339

    
340
  def call_instance_start(self, node, instance, extra_args):
341
    """Starts an instance.
342

343
    This is a single-node call.
344

345
    """
346
    return self._SingleNodeCall(node, "instance_start",
347
                                [self._InstDict(instance), extra_args])
348

    
349
  def call_instance_shutdown(self, node, instance):
350
    """Stops an instance.
351

352
    This is a single-node call.
353

354
    """
355
    return self._SingleNodeCall(node, "instance_shutdown",
356
                                [self._InstDict(instance)])
357

    
358
  def call_instance_migrate(self, node, instance, target, live):
359
    """Migrate an instance.
360

361
    This is a single-node call.
362

363
    @type node: string
364
    @param node: the node on which the instance is currently running
365
    @type instance: C{objects.Instance}
366
    @param instance: the instance definition
367
    @type target: string
368
    @param target: the target node name
369
    @type live: boolean
370
    @param live: whether the migration should be done live or not (the
371
        interpretation of this parameter is left to the hypervisor)
372

373
    """
374
    return self._SingleNodeCall(node, "instance_migrate",
375
                                [self._InstDict(instance), target, live])
376

    
377
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
378
    """Reboots an instance.
379

380
    This is a single-node call.
381

382
    """
383
    return self._SingleNodeCall(node, "instance_reboot",
384
                                [self._InstDict(instance), reboot_type,
385
                                 extra_args])
386

    
387
  def call_instance_os_add(self, node, inst):
388
    """Installs an OS on the given instance.
389

390
    This is a single-node call.
391

392
    """
393
    return self._SingleNodeCall(node, "instance_os_add",
394
                                [self._InstDict(inst)])
395

    
396
  def call_instance_run_rename(self, node, inst, old_name):
397
    """Run the OS rename script for an instance.
398

399
    This is a single-node call.
400

401
    """
402
    return self._SingleNodeCall(node, "instance_run_rename",
403
                                [self._InstDict(inst), old_name])
404

    
405
  def call_instance_info(self, node, instance, hname):
406
    """Returns information about a single instance.
407

408
    This is a single-node call.
409

410
    @type node: list
411
    @param node: the list of nodes to query
412
    @type instance: string
413
    @param instance: the instance name
414
    @type hname: string
415
    @param hname: the hypervisor type of the instance
416

417
    """
418
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
419

    
420
  def call_all_instances_info(self, node_list, hypervisor_list):
421
    """Returns information about all instances on the given nodes.
422

423
    This is a multi-node call.
424

425
    @type node_list: list
426
    @param node_list: the list of nodes to query
427
    @type hypervisor_list: list
428
    @param hypervisor_list: the hypervisors to query for instances
429

430
    """
431
    return self._MultiNodeCall(node_list, "all_instances_info",
432
                               [hypervisor_list])
433

    
434
  def call_instance_list(self, node_list, hypervisor_list):
435
    """Returns the list of running instances on a given node.
436

437
    This is a multi-node call.
438

439
    @type node_list: list
440
    @param node_list: the list of nodes to query
441
    @type hypervisor_list: list
442
    @param hypervisor_list: the hypervisors to query for instances
443

444
    """
445
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
446

    
447
  def call_node_tcp_ping(self, node, source, target, port, timeout,
448
                         live_port_needed):
449
    """Do a TcpPing on the remote node
450

451
    This is a single-node call.
452

453
    """
454
    return self._SingleNodeCall(node, "node_tcp_ping",
455
                                [source, target, port, timeout,
456
                                 live_port_needed])
457

    
458
  def call_node_has_ip_address(self, node, address):
459
    """Checks if a node has the given IP address.
460

461
    This is a single-node call.
462

463
    """
464
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
465

    
466
  def call_node_info(self, node_list, vg_name, hypervisor_type):
467
    """Return node information.
468

469
    This will return memory information and volume group size and free
470
    space.
471

472
    This is a multi-node call.
473

474
    @type node_list: list
475
    @param node_list: the list of nodes to query
476
    @type vgname: C{string}
477
    @param vgname: the name of the volume group to ask for disk space
478
        information
479
    @type hypervisor_type: C{str}
480
    @param hypervisor_type: the name of the hypervisor to ask for
481
        memory information
482

483
    """
484
    retux = self._MultiNodeCall(node_list, "node_info",
485
                                [vg_name, hypervisor_type])
486

    
487
    for result in retux.itervalues():
488
      if result.failed or not isinstance(result.data, dict):
489
        result.data = {}
490

    
491
      utils.CheckDict(result.data, {
492
        'memory_total' : '-',
493
        'memory_dom0' : '-',
494
        'memory_free' : '-',
495
        'vg_size' : 'node_unreachable',
496
        'vg_free' : '-',
497
        }, "call_node_info")
498
    return retux
499

    
500
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
501
    """Add a node to the cluster.
502

503
    This is a single-node call.
504

505
    """
506
    return self._SingleNodeCall(node, "node_add",
507
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
508

    
509
  def call_node_verify(self, node_list, checkdict, cluster_name):
510
    """Request verification of given parameters.
511

512
    This is a multi-node call.
513

514
    """
515
    return self._MultiNodeCall(node_list, "node_verify",
516
                               [checkdict, cluster_name])
517

    
518
  @classmethod
519
  def call_node_start_master(cls, node, start_daemons):
520
    """Tells a node to activate itself as a master.
521

522
    This is a single-node call.
523

524
    """
525
    return cls._StaticSingleNodeCall(node, "node_start_master",
526
                                     [start_daemons])
527

    
528
  @classmethod
529
  def call_node_stop_master(cls, node, stop_daemons):
530
    """Tells a node to demote itself from master status.
531

532
    This is a single-node call.
533

534
    """
535
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
536

    
537
  @classmethod
538
  def call_master_info(cls, node_list):
539
    """Query master info.
540

541
    This is a multi-node call.
542

543
    """
544
    # TODO: should this method query down nodes?
545
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
546

    
547
  def call_version(self, node_list):
548
    """Query node version.
549

550
    This is a multi-node call.
551

552
    """
553
    return self._MultiNodeCall(node_list, "version", [])
554

    
555
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
556
    """Request creation of a given block device.
557

558
    This is a single-node call.
559

560
    """
561
    return self._SingleNodeCall(node, "blockdev_create",
562
                                [bdev.ToDict(), size, owner, on_primary, info])
563

    
564
  def call_blockdev_remove(self, node, bdev):
565
    """Request removal of a given block device.
566

567
    This is a single-node call.
568

569
    """
570
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
571

    
572
  def call_blockdev_rename(self, node, devlist):
573
    """Request rename of the given block devices.
574

575
    This is a single-node call.
576

577
    """
578
    return self._SingleNodeCall(node, "blockdev_rename",
579
                                [(d.ToDict(), uid) for d, uid in devlist])
580

    
581
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
582
    """Request assembling of a given block device.
583

584
    This is a single-node call.
585

586
    """
587
    return self._SingleNodeCall(node, "blockdev_assemble",
588
                                [disk.ToDict(), owner, on_primary])
589

    
590
  def call_blockdev_shutdown(self, node, disk):
591
    """Request shutdown of a given block device.
592

593
    This is a single-node call.
594

595
    """
596
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
597

    
598
  def call_blockdev_addchildren(self, node, bdev, ndevs):
599
    """Request adding a list of children to a (mirroring) device.
600

601
    This is a single-node call.
602

603
    """
604
    return self._SingleNodeCall(node, "blockdev_addchildren",
605
                                [bdev.ToDict(),
606
                                 [disk.ToDict() for disk in ndevs]])
607

    
608
  def call_blockdev_removechildren(self, node, bdev, ndevs):
609
    """Request removing a list of children from a (mirroring) device.
610

611
    This is a single-node call.
612

613
    """
614
    return self._SingleNodeCall(node, "blockdev_removechildren",
615
                                [bdev.ToDict(),
616
                                 [disk.ToDict() for disk in ndevs]])
617

    
618
  def call_blockdev_getmirrorstatus(self, node, disks):
619
    """Request status of a (mirroring) device.
620

621
    This is a single-node call.
622

623
    """
624
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
625
                                [dsk.ToDict() for dsk in disks])
626

    
627
  def call_blockdev_find(self, node, disk):
628
    """Request identification of a given block device.
629

630
    This is a single-node call.
631

632
    """
633
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
634

    
635
  def call_blockdev_close(self, node, disks):
636
    """Closes the given block devices.
637

638
    This is a single-node call.
639

640
    """
641
    return self._SingleNodeCall(node, "blockdev_close",
642
                                [cf.ToDict() for cf in disks])
643

    
644
  @classmethod
645
  def call_upload_file(cls, node_list, file_name, address_list=None):
646
    """Upload a file.
647

648
    The node will refuse the operation in case the file is not on the
649
    approved file list.
650

651
    This is a multi-node call.
652

653
    @type node_list: list
654
    @param node_list: the list of node names to upload to
655
    @type file_name: str
656
    @param file_name: the filename to upload
657
    @type address_list: list or None
658
    @keyword address_list: an optional list of node addresses, in order
659
        to optimize the RPC speed
660

661
    """
662
    data = utils.ReadFile(file_name)
663
    st = os.stat(file_name)
664
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
665
              st.st_atime, st.st_mtime]
666
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
667
                                    address_list=address_list)
668

    
669
  @classmethod
670
  def call_write_ssconf_files(cls, node_list, values):
671
    """Write ssconf files.
672

673
    This is a multi-node call.
674

675
    """
676
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
677

    
678
  def call_os_diagnose(self, node_list):
679
    """Request a diagnose of OS definitions.
680

681
    This is a multi-node call.
682

683
    """
684
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
685

    
686
    for node_name, node_result in result.iteritems():
687
      if not node_result.failed and node_result.data:
688
        node_result.data = [objects.OS.FromDict(oss)
689
                            for oss in node_result.data]
690
    return result
691

    
692
  def call_os_get(self, node, name):
693
    """Returns an OS definition.
694

695
    This is a single-node call.
696

697
    """
698
    result = self._SingleNodeCall(node, "os_get", [name])
699
    if not result.failed and isinstance(result.data, dict):
700
      result.data = objects.OS.FromDict(result.data)
701
    return result
702

    
703
  def call_hooks_runner(self, node_list, hpath, phase, env):
704
    """Call the hooks runner.
705

706
    Args:
707
      - op: the OpCode instance
708
      - env: a dictionary with the environment
709

710
    This is a multi-node call.
711

712
    """
713
    params = [hpath, phase, env]
714
    return self._MultiNodeCall(node_list, "hooks_runner", params)
715

    
716
  def call_iallocator_runner(self, node, name, idata):
717
    """Call an iallocator on a remote node
718

719
    Args:
720
      - name: the iallocator name
721
      - input: the json-encoded input string
722

723
    This is a single-node call.
724

725
    """
726
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
727

    
728
  def call_blockdev_grow(self, node, cf_bdev, amount):
729
    """Request a snapshot of the given block device.
730

731
    This is a single-node call.
732

733
    """
734
    return self._SingleNodeCall(node, "blockdev_grow",
735
                                [cf_bdev.ToDict(), amount])
736

    
737
  def call_blockdev_snapshot(self, node, cf_bdev):
738
    """Request a snapshot of the given block device.
739

740
    This is a single-node call.
741

742
    """
743
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
744

    
745
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
746
                           cluster_name, idx):
747
    """Request the export of a given snapshot.
748

749
    This is a single-node call.
750

751
    """
752
    return self._SingleNodeCall(node, "snapshot_export",
753
                                [snap_bdev.ToDict(), dest_node,
754
                                 self._InstDict(instance), cluster_name, idx])
755

    
756
  def call_finalize_export(self, node, instance, snap_disks):
757
    """Request the completion of an export operation.
758

759
    This writes the export config file, etc.
760

761
    This is a single-node call.
762

763
    """
764
    flat_disks = []
765
    for disk in snap_disks:
766
      flat_disks.append(disk.ToDict())
767

    
768
    return self._SingleNodeCall(node, "finalize_export",
769
                                [self._InstDict(instance), flat_disks])
770

    
771
  def call_export_info(self, node, path):
772
    """Queries the export information in a given path.
773

774
    This is a single-node call.
775

776
    """
777
    result = self._SingleNodeCall(node, "export_info", [path])
778
    if not result.failed and result.data:
779
      result.data = objects.SerializableConfigParser.Loads(str(result.data))
780
    return result
781

    
782
  def call_instance_os_import(self, node, inst, src_node, src_images,
783
                              cluster_name):
784
    """Request the import of a backup into an instance.
785

786
    This is a single-node call.
787

788
    """
789
    return self._SingleNodeCall(node, "instance_os_import",
790
                                [self._InstDict(inst), src_node, src_images,
791
                                 cluster_name])
792

    
793
  def call_export_list(self, node_list):
794
    """Gets the stored exports list.
795

796
    This is a multi-node call.
797

798
    """
799
    return self._MultiNodeCall(node_list, "export_list", [])
800

    
801
  def call_export_remove(self, node, export):
802
    """Requests removal of a given export.
803

804
    This is a single-node call.
805

806
    """
807
    return self._SingleNodeCall(node, "export_remove", [export])
808

    
809
  @classmethod
810
  def call_node_leave_cluster(cls, node):
811
    """Requests a node to clean the cluster information it has.
812

813
    This will remove the configuration information from the ganeti data
814
    dir.
815

816
    This is a single-node call.
817

818
    """
819
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
820

    
821
  def call_node_volumes(self, node_list):
822
    """Gets all volumes on node(s).
823

824
    This is a multi-node call.
825

826
    """
827
    return self._MultiNodeCall(node_list, "node_volumes", [])
828

    
829
  def call_test_delay(self, node_list, duration):
830
    """Sleep for a fixed time on given node(s).
831

832
    This is a multi-node call.
833

834
    """
835
    return self._MultiNodeCall(node_list, "test_delay", [duration])
836

    
837
  def call_file_storage_dir_create(self, node, file_storage_dir):
838
    """Create the given file storage directory.
839

840
    This is a single-node call.
841

842
    """
843
    return self._SingleNodeCall(node, "file_storage_dir_create",
844
                                [file_storage_dir])
845

    
846
  def call_file_storage_dir_remove(self, node, file_storage_dir):
847
    """Remove the given file storage directory.
848

849
    This is a single-node call.
850

851
    """
852
    return self._SingleNodeCall(node, "file_storage_dir_remove",
853
                                [file_storage_dir])
854

    
855
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
856
                                   new_file_storage_dir):
857
    """Rename file storage directory.
858

859
    This is a single-node call.
860

861
    """
862
    return self._SingleNodeCall(node, "file_storage_dir_rename",
863
                                [old_file_storage_dir, new_file_storage_dir])
864

    
865
  @classmethod
866
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
867
    """Update job queue.
868

869
    This is a multi-node call.
870

871
    """
872
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
873
                                    [file_name, content],
874
                                    address_list=address_list)
875

    
876
  @classmethod
877
  def call_jobqueue_purge(cls, node):
878
    """Purge job queue.
879

880
    This is a single-node call.
881

882
    """
883
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
884

    
885
  @classmethod
886
  def call_jobqueue_rename(cls, node_list, address_list, old, new):
887
    """Rename a job queue file.
888

889
    This is a multi-node call.
890

891
    """
892
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
893
                                    address_list=address_list)
894

    
895
  @classmethod
896
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
897
    """Set the drain flag on the queue.
898

899
    This is a multi-node call.
900

901
    @type node_list: list
902
    @param node_list: the list of nodes to query
903
    @type drain_flag: bool
904
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
905

906
    """
907
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
908
                                    [drain_flag])
909

    
910
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
911
    """Validate the hypervisor params.
912

913
    This is a multi-node call.
914

915
    @type node_list: list
916
    @param node_list: the list of nodes to query
917
    @type hvname: string
918
    @param hvname: the hypervisor name
919
    @type hvparams: dict
920
    @param hvparams: the hypervisor parameters to be validated
921

922
    """
923
    cluster = self._cfg.GetClusterInfo()
924
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
925
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
926
                               [hvname, hv_full])