Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 56aa9fd5

History | View | Annotate | Download (26.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36

    
37
from ganeti import utils
38
from ganeti import objects
39
from ganeti import http
40
from ganeti import serializer
41
from ganeti import constants
42
from ganeti import errors
43

    
44
import ganeti.http.client
45

    
46

    
47
# Module level variable
48
_http_manager = None
49

    
50

    
51
def Init():
52
  """Initializes the module-global HTTP client manager.
53

54
  Must be called before using any RPC function.
55

56
  """
57
  global _http_manager
58

    
59
  assert not _http_manager, "RPC module initialized more than once"
60

    
61
  _http_manager = http.client.HttpClientManager()
62

    
63

    
64
def Shutdown():
65
  """Stops the module-global HTTP client manager.
66

67
  Must be called before quitting the program.
68

69
  """
70
  global _http_manager
71

    
72
  if _http_manager:
73
    _http_manager.Shutdown()
74
    _http_manager = None
75

    
76

    
77
class RpcResult(object):
78
  """RPC Result class.
79

80
  This class holds an RPC result. It is needed since in multi-node
81
  calls we can't raise an exception just because one one out of many
82
  failed, and therefore we use this class to encapsulate the result.
83

84
  """
85
  def __init__(self, data, failed=False, call=None, node=None):
86
    self.failed = failed
87
    self.call = None
88
    self.node = None
89
    if failed:
90
      self.error = data
91
      self.data = None
92
    else:
93
      self.data = data
94
      self.error = None
95

    
96
  def Raise(self):
97
    """If the result has failed, raise an OpExecError.
98

99
    This is used so that LU code doesn't have to check for each
100
    result, but instead can call this function.
101

102
    """
103
    if self.failed:
104
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
105
                               (self.call, self.node, self.error))
106

    
107

    
108
class Client:
109
  """RPC Client class.
110

111
  This class, given a (remote) method name, a list of parameters and a
112
  list of nodes, will contact (in parallel) all nodes, and return a
113
  dict of results (key: node name, value: result).
114

115
  One current bug is that generic failure is still signalled by
116
  'False' result, which is not good. This overloading of values can
117
  cause bugs.
118

119
  """
120
  def __init__(self, procedure, body, port):
121
    self.procedure = procedure
122
    self.body = body
123
    self.port = port
124
    self.nc = {}
125

    
126
    self._ssl_params = \
127
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
128
                         ssl_cert_path=constants.SSL_CERT_FILE)
129

    
130
  def ConnectList(self, node_list, address_list=None):
131
    """Add a list of nodes to the target nodes.
132

133
    @type node_list: list
134
    @param node_list: the list of node names to connect
135
    @type address_list: list or None
136
    @keyword address_list: either None or a list with node addresses,
137
        which must have the same length as the node list
138

139
    """
140
    if address_list is None:
141
      address_list = [None for _ in node_list]
142
    else:
143
      assert len(node_list) == len(address_list), \
144
             "Name and address lists should have the same length"
145
    for node, address in zip(node_list, address_list):
146
      self.ConnectNode(node, address)
147

    
148
  def ConnectNode(self, name, address=None):
149
    """Add a node to the target list.
150

151
    @type name: str
152
    @param name: the node name
153
    @type address: str
154
    @keyword address: the node address, if known
155

156
    """
157
    if address is None:
158
      address = name
159

    
160
    self.nc[name] = \
161
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
162
                                    "/%s" % self.procedure,
163
                                    post_data=self.body,
164
                                    ssl_params=self._ssl_params,
165
                                    ssl_verify_peer=True)
166

    
167
  def GetResults(self):
168
    """Call nodes and return results.
169

170
    @rtype: list
171
    @returns: List of RPC results
172

173
    """
174
    assert _http_manager, "RPC module not intialized"
175

    
176
    _http_manager.ExecRequests(self.nc.values())
177

    
178
    results = {}
179

    
180
    for name, req in self.nc.iteritems():
181
      if req.success and req.resp_status_code == http.HTTP_OK:
182
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
183
                                  node=name, call=self.procedure)
184
        continue
185

    
186
      # TODO: Better error reporting
187
      if req.error:
188
        msg = req.error
189
      else:
190
        msg = req.resp_body
191

    
192
      logging.error("RPC error from node %s: %s", name, msg)
193
      results[name] = RpcResult(data=msg, failed=True, node=name,
194
                                call=self.procedure)
195

    
196
    return results
197

    
198

    
199
class RpcRunner(object):
200
  """RPC runner class"""
201

    
202
  def __init__(self, cfg):
203
    """Initialized the rpc runner.
204

205
    @type cfg:  C{config.ConfigWriter}
206
    @param cfg: the configuration object that will be used to get data
207
                about the cluster
208

209
    """
210
    self._cfg = cfg
211
    self.port = utils.GetNodeDaemonPort()
212

    
213
  def _InstDict(self, instance):
214
    """Convert the given instance to a dict.
215

216
    This is done via the instance's ToDict() method and additionally
217
    we fill the hvparams with the cluster defaults.
218

219
    @type instance: L{objects.Instance}
220
    @param instance: an Instance object
221
    @rtype: dict
222
    @return: the instance dict, with the hvparams filled with the
223
        cluster defaults
224

225
    """
226
    idict = instance.ToDict()
227
    cluster = self._cfg.GetClusterInfo()
228
    idict["hvparams"] = cluster.FillHV(instance)
229
    idict["beparams"] = cluster.FillBE(instance)
230
    return idict
231

    
232
  def _ConnectList(self, client, node_list):
233
    """Helper for computing node addresses.
234

235
    @type client: L{Client}
236
    @param client: a C{Client} instance
237
    @type node_list: list
238
    @param node_list: the node list we should connect
239

240
    """
241
    all_nodes = self._cfg.GetAllNodesInfo()
242
    addr_list = []
243
    for node in node_list:
244
      if node in all_nodes:
245
        val = all_nodes[node].primary_ip
246
      else:
247
        val = None
248
      addr_list.append(val)
249
    client.ConnectList(node_list, address_list=addr_list)
250

    
251
  def _ConnectNode(self, client, node):
252
    """Helper for computing one node's address.
253

254
    @type client: L{Client}
255
    @param client: a C{Client} instance
256
    @type node: str
257
    @param node: the node we should connect
258

259
    """
260
    node_info = self._cfg.GetNodeInfo(node)
261
    if node_info is not None:
262
      addr = node_info.primary_ip
263
    else:
264
      addr = None
265
    client.ConnectNode(node, address=addr)
266

    
267
  def _MultiNodeCall(self, node_list, procedure, args,
268
                     address_list=None):
269
    """Helper for making a multi-node call
270

271
    """
272
    body = serializer.DumpJson(args, indent=False)
273
    c = Client(procedure, body, self.port)
274
    if address_list is None:
275
      self._ConnectList(c, node_list)
276
    else:
277
      c.ConnectList(node_list, address_list=address_list)
278
    return c.GetResults()
279

    
280
  @classmethod
281
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
282
                           address_list=None):
283
    """Helper for making a multi-node static call
284

285
    """
286
    body = serializer.DumpJson(args, indent=False)
287
    c = Client(procedure, body, utils.GetNodeDaemonPort())
288
    c.ConnectList(node_list, address_list=address_list)
289
    return c.GetResults()
290

    
291
  def _SingleNodeCall(self, node, procedure, args):
292
    """Helper for making a single-node call
293

294
    """
295
    body = serializer.DumpJson(args, indent=False)
296
    c = Client(procedure, body, self.port)
297
    self._ConnectNode(c, node)
298
    return c.GetResults().get(node, False)
299

    
300
  @classmethod
301
  def _StaticSingleNodeCall(cls, node, procedure, args):
302
    """Helper for making a single-node static call
303

304
    """
305
    body = serializer.DumpJson(args, indent=False)
306
    c = Client(procedure, body, utils.GetNodeDaemonPort())
307
    c.ConnectNode(node)
308
    return c.GetResults().get(node, False)
309

    
310
  #
311
  # Begin RPC calls
312
  #
313

    
314
  def call_volume_list(self, node_list, vg_name):
315
    """Gets the logical volumes present in a given volume group.
316

317
    This is a multi-node call.
318

319
    """
320
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
321

    
322
  def call_vg_list(self, node_list):
323
    """Gets the volume group list.
324

325
    This is a multi-node call.
326

327
    """
328
    return self._MultiNodeCall(node_list, "vg_list", [])
329

    
330
  def call_bridges_exist(self, node, bridges_list):
331
    """Checks if a node has all the bridges given.
332

333
    This method checks if all bridges given in the bridges_list are
334
    present on the remote node, so that an instance that uses interfaces
335
    on those bridges can be started.
336

337
    This is a single-node call.
338

339
    """
340
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
341

    
342
  def call_instance_start(self, node, instance, extra_args):
343
    """Starts an instance.
344

345
    This is a single-node call.
346

347
    """
348
    return self._SingleNodeCall(node, "instance_start",
349
                                [self._InstDict(instance), extra_args])
350

    
351
  def call_instance_shutdown(self, node, instance):
352
    """Stops an instance.
353

354
    This is a single-node call.
355

356
    """
357
    return self._SingleNodeCall(node, "instance_shutdown",
358
                                [self._InstDict(instance)])
359

    
360
  def call_instance_migrate(self, node, instance, target, live):
361
    """Migrate an instance.
362

363
    This is a single-node call.
364

365
    @type node: string
366
    @param node: the node on which the instance is currently running
367
    @type instance: C{objects.Instance}
368
    @param instance: the instance definition
369
    @type target: string
370
    @param target: the target node name
371
    @type live: boolean
372
    @param live: whether the migration should be done live or not (the
373
        interpretation of this parameter is left to the hypervisor)
374

375
    """
376
    return self._SingleNodeCall(node, "instance_migrate",
377
                                [self._InstDict(instance), target, live])
378

    
379
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
380
    """Reboots an instance.
381

382
    This is a single-node call.
383

384
    """
385
    return self._SingleNodeCall(node, "instance_reboot",
386
                                [self._InstDict(instance), reboot_type,
387
                                 extra_args])
388

    
389
  def call_instance_os_add(self, node, inst):
390
    """Installs an OS on the given instance.
391

392
    This is a single-node call.
393

394
    """
395
    return self._SingleNodeCall(node, "instance_os_add",
396
                                [self._InstDict(inst)])
397

    
398
  def call_instance_run_rename(self, node, inst, old_name):
399
    """Run the OS rename script for an instance.
400

401
    This is a single-node call.
402

403
    """
404
    return self._SingleNodeCall(node, "instance_run_rename",
405
                                [self._InstDict(inst), old_name])
406

    
407
  def call_instance_info(self, node, instance, hname):
408
    """Returns information about a single instance.
409

410
    This is a single-node call.
411

412
    @type node: list
413
    @param node: the list of nodes to query
414
    @type instance: string
415
    @param instance: the instance name
416
    @type hname: string
417
    @param hname: the hypervisor type of the instance
418

419
    """
420
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
421

    
422
  def call_all_instances_info(self, node_list, hypervisor_list):
423
    """Returns information about all instances on the given nodes.
424

425
    This is a multi-node call.
426

427
    @type node_list: list
428
    @param node_list: the list of nodes to query
429
    @type hypervisor_list: list
430
    @param hypervisor_list: the hypervisors to query for instances
431

432
    """
433
    return self._MultiNodeCall(node_list, "all_instances_info",
434
                               [hypervisor_list])
435

    
436
  def call_instance_list(self, node_list, hypervisor_list):
437
    """Returns the list of running instances on a given node.
438

439
    This is a multi-node call.
440

441
    @type node_list: list
442
    @param node_list: the list of nodes to query
443
    @type hypervisor_list: list
444
    @param hypervisor_list: the hypervisors to query for instances
445

446
    """
447
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
448

    
449
  def call_node_tcp_ping(self, node, source, target, port, timeout,
450
                         live_port_needed):
451
    """Do a TcpPing on the remote node
452

453
    This is a single-node call.
454

455
    """
456
    return self._SingleNodeCall(node, "node_tcp_ping",
457
                                [source, target, port, timeout,
458
                                 live_port_needed])
459

    
460
  def call_node_has_ip_address(self, node, address):
461
    """Checks if a node has the given IP address.
462

463
    This is a single-node call.
464

465
    """
466
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
467

    
468
  def call_node_info(self, node_list, vg_name, hypervisor_type):
469
    """Return node information.
470

471
    This will return memory information and volume group size and free
472
    space.
473

474
    This is a multi-node call.
475

476
    @type node_list: list
477
    @param node_list: the list of nodes to query
478
    @type vgname: C{string}
479
    @param vgname: the name of the volume group to ask for disk space
480
        information
481
    @type hypervisor_type: C{str}
482
    @param hypervisor_type: the name of the hypervisor to ask for
483
        memory information
484

485
    """
486
    retux = self._MultiNodeCall(node_list, "node_info",
487
                                [vg_name, hypervisor_type])
488

    
489
    for result in retux.itervalues():
490
      if result.failed or not isinstance(result.data, dict):
491
        result.data = {}
492

    
493
      utils.CheckDict(result.data, {
494
        'memory_total' : '-',
495
        'memory_dom0' : '-',
496
        'memory_free' : '-',
497
        'vg_size' : 'node_unreachable',
498
        'vg_free' : '-',
499
        }, "call_node_info")
500
    return retux
501

    
502
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
503
    """Add a node to the cluster.
504

505
    This is a single-node call.
506

507
    """
508
    return self._SingleNodeCall(node, "node_add",
509
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
510

    
511
  def call_node_verify(self, node_list, checkdict, cluster_name):
512
    """Request verification of given parameters.
513

514
    This is a multi-node call.
515

516
    """
517
    return self._MultiNodeCall(node_list, "node_verify",
518
                               [checkdict, cluster_name])
519

    
520
  @classmethod
521
  def call_node_start_master(cls, node, start_daemons):
522
    """Tells a node to activate itself as a master.
523

524
    This is a single-node call.
525

526
    """
527
    return cls._StaticSingleNodeCall(node, "node_start_master",
528
                                     [start_daemons])
529

    
530
  @classmethod
531
  def call_node_stop_master(cls, node, stop_daemons):
532
    """Tells a node to demote itself from master status.
533

534
    This is a single-node call.
535

536
    """
537
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
538

    
539
  @classmethod
540
  def call_master_info(cls, node_list):
541
    """Query master info.
542

543
    This is a multi-node call.
544

545
    """
546
    # TODO: should this method query down nodes?
547
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
548

    
549
  def call_version(self, node_list):
550
    """Query node version.
551

552
    This is a multi-node call.
553

554
    """
555
    return self._MultiNodeCall(node_list, "version", [])
556

    
557
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
558
    """Request creation of a given block device.
559

560
    This is a single-node call.
561

562
    """
563
    return self._SingleNodeCall(node, "blockdev_create",
564
                                [bdev.ToDict(), size, owner, on_primary, info])
565

    
566
  def call_blockdev_remove(self, node, bdev):
567
    """Request removal of a given block device.
568

569
    This is a single-node call.
570

571
    """
572
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
573

    
574
  def call_blockdev_rename(self, node, devlist):
575
    """Request rename of the given block devices.
576

577
    This is a single-node call.
578

579
    """
580
    return self._SingleNodeCall(node, "blockdev_rename",
581
                                [(d.ToDict(), uid) for d, uid in devlist])
582

    
583
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
584
    """Request assembling of a given block device.
585

586
    This is a single-node call.
587

588
    """
589
    return self._SingleNodeCall(node, "blockdev_assemble",
590
                                [disk.ToDict(), owner, on_primary])
591

    
592
  def call_blockdev_shutdown(self, node, disk):
593
    """Request shutdown of a given block device.
594

595
    This is a single-node call.
596

597
    """
598
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
599

    
600
  def call_blockdev_addchildren(self, node, bdev, ndevs):
601
    """Request adding a list of children to a (mirroring) device.
602

603
    This is a single-node call.
604

605
    """
606
    return self._SingleNodeCall(node, "blockdev_addchildren",
607
                                [bdev.ToDict(),
608
                                 [disk.ToDict() for disk in ndevs]])
609

    
610
  def call_blockdev_removechildren(self, node, bdev, ndevs):
611
    """Request removing a list of children from a (mirroring) device.
612

613
    This is a single-node call.
614

615
    """
616
    return self._SingleNodeCall(node, "blockdev_removechildren",
617
                                [bdev.ToDict(),
618
                                 [disk.ToDict() for disk in ndevs]])
619

    
620
  def call_blockdev_getmirrorstatus(self, node, disks):
621
    """Request status of a (mirroring) device.
622

623
    This is a single-node call.
624

625
    """
626
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
627
                                [dsk.ToDict() for dsk in disks])
628

    
629
  def call_blockdev_find(self, node, disk):
630
    """Request identification of a given block device.
631

632
    This is a single-node call.
633

634
    """
635
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
636

    
637
  def call_blockdev_close(self, node, disks):
638
    """Closes the given block devices.
639

640
    This is a single-node call.
641

642
    """
643
    return self._SingleNodeCall(node, "blockdev_close",
644
                                [cf.ToDict() for cf in disks])
645

    
646
  @classmethod
647
  def call_upload_file(cls, node_list, file_name, address_list=None):
648
    """Upload a file.
649

650
    The node will refuse the operation in case the file is not on the
651
    approved file list.
652

653
    This is a multi-node call.
654

655
    @type node_list: list
656
    @param node_list: the list of node names to upload to
657
    @type file_name: str
658
    @param file_name: the filename to upload
659
    @type address_list: list or None
660
    @keyword address_list: an optional list of node addresses, in order
661
        to optimize the RPC speed
662

663
    """
664
    data = utils.ReadFile(file_name)
665
    st = os.stat(file_name)
666
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
667
              st.st_atime, st.st_mtime]
668
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
669
                                    address_list=address_list)
670

    
671
  @classmethod
672
  def call_write_ssconf_files(cls, node_list, values):
673
    """Write ssconf files.
674

675
    This is a multi-node call.
676

677
    """
678
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
679

    
680
  def call_os_diagnose(self, node_list):
681
    """Request a diagnose of OS definitions.
682

683
    This is a multi-node call.
684

685
    """
686
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
687

    
688
    for node_name, node_result in result.iteritems():
689
      if not node_result.failed and node_result.data:
690
        node_result.data = [objects.OS.FromDict(oss)
691
                            for oss in node_result.data]
692
    return result
693

    
694
  def call_os_get(self, node, name):
695
    """Returns an OS definition.
696

697
    This is a single-node call.
698

699
    """
700
    result = self._SingleNodeCall(node, "os_get", [name])
701
    if not result.failed and isinstance(result.data, dict):
702
      result.data = objects.OS.FromDict(result.data)
703
    return result
704

    
705
  def call_hooks_runner(self, node_list, hpath, phase, env):
706
    """Call the hooks runner.
707

708
    Args:
709
      - op: the OpCode instance
710
      - env: a dictionary with the environment
711

712
    This is a multi-node call.
713

714
    """
715
    params = [hpath, phase, env]
716
    return self._MultiNodeCall(node_list, "hooks_runner", params)
717

    
718
  def call_iallocator_runner(self, node, name, idata):
719
    """Call an iallocator on a remote node
720

721
    Args:
722
      - name: the iallocator name
723
      - input: the json-encoded input string
724

725
    This is a single-node call.
726

727
    """
728
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
729

    
730
  def call_blockdev_grow(self, node, cf_bdev, amount):
731
    """Request a snapshot of the given block device.
732

733
    This is a single-node call.
734

735
    """
736
    return self._SingleNodeCall(node, "blockdev_grow",
737
                                [cf_bdev.ToDict(), amount])
738

    
739
  def call_blockdev_snapshot(self, node, cf_bdev):
740
    """Request a snapshot of the given block device.
741

742
    This is a single-node call.
743

744
    """
745
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
746

    
747
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
748
                           cluster_name, idx):
749
    """Request the export of a given snapshot.
750

751
    This is a single-node call.
752

753
    """
754
    return self._SingleNodeCall(node, "snapshot_export",
755
                                [snap_bdev.ToDict(), dest_node,
756
                                 self._InstDict(instance), cluster_name, idx])
757

    
758
  def call_finalize_export(self, node, instance, snap_disks):
759
    """Request the completion of an export operation.
760

761
    This writes the export config file, etc.
762

763
    This is a single-node call.
764

765
    """
766
    flat_disks = []
767
    for disk in snap_disks:
768
      flat_disks.append(disk.ToDict())
769

    
770
    return self._SingleNodeCall(node, "finalize_export",
771
                                [self._InstDict(instance), flat_disks])
772

    
773
  def call_export_info(self, node, path):
774
    """Queries the export information in a given path.
775

776
    This is a single-node call.
777

778
    """
779
    result = self._SingleNodeCall(node, "export_info", [path])
780
    if not result.failed and result.data:
781
      result.data = objects.SerializableConfigParser.Loads(str(result.data))
782
    return result
783

    
784
  def call_instance_os_import(self, node, inst, src_node, src_images,
785
                              cluster_name):
786
    """Request the import of a backup into an instance.
787

788
    This is a single-node call.
789

790
    """
791
    return self._SingleNodeCall(node, "instance_os_import",
792
                                [self._InstDict(inst), src_node, src_images,
793
                                 cluster_name])
794

    
795
  def call_export_list(self, node_list):
796
    """Gets the stored exports list.
797

798
    This is a multi-node call.
799

800
    """
801
    return self._MultiNodeCall(node_list, "export_list", [])
802

    
803
  def call_export_remove(self, node, export):
804
    """Requests removal of a given export.
805

806
    This is a single-node call.
807

808
    """
809
    return self._SingleNodeCall(node, "export_remove", [export])
810

    
811
  @classmethod
812
  def call_node_leave_cluster(cls, node):
813
    """Requests a node to clean the cluster information it has.
814

815
    This will remove the configuration information from the ganeti data
816
    dir.
817

818
    This is a single-node call.
819

820
    """
821
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
822

    
823
  def call_node_volumes(self, node_list):
824
    """Gets all volumes on node(s).
825

826
    This is a multi-node call.
827

828
    """
829
    return self._MultiNodeCall(node_list, "node_volumes", [])
830

    
831
  def call_node_demote_from_mc(self, node):
832
    """Demote a node from the master candidate role.
833

834
    This is a single-node call.
835

836
    """
837
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
838

    
839
  def call_test_delay(self, node_list, duration):
840
    """Sleep for a fixed time on given node(s).
841

842
    This is a multi-node call.
843

844
    """
845
    return self._MultiNodeCall(node_list, "test_delay", [duration])
846

    
847
  def call_file_storage_dir_create(self, node, file_storage_dir):
848
    """Create the given file storage directory.
849

850
    This is a single-node call.
851

852
    """
853
    return self._SingleNodeCall(node, "file_storage_dir_create",
854
                                [file_storage_dir])
855

    
856
  def call_file_storage_dir_remove(self, node, file_storage_dir):
857
    """Remove the given file storage directory.
858

859
    This is a single-node call.
860

861
    """
862
    return self._SingleNodeCall(node, "file_storage_dir_remove",
863
                                [file_storage_dir])
864

    
865
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
866
                                   new_file_storage_dir):
867
    """Rename file storage directory.
868

869
    This is a single-node call.
870

871
    """
872
    return self._SingleNodeCall(node, "file_storage_dir_rename",
873
                                [old_file_storage_dir, new_file_storage_dir])
874

    
875
  @classmethod
876
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
877
    """Update job queue.
878

879
    This is a multi-node call.
880

881
    """
882
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
883
                                    [file_name, content],
884
                                    address_list=address_list)
885

    
886
  @classmethod
887
  def call_jobqueue_purge(cls, node):
888
    """Purge job queue.
889

890
    This is a single-node call.
891

892
    """
893
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
894

    
895
  @classmethod
896
  def call_jobqueue_rename(cls, node_list, address_list, old, new):
897
    """Rename a job queue file.
898

899
    This is a multi-node call.
900

901
    """
902
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
903
                                    address_list=address_list)
904

    
905
  @classmethod
906
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
907
    """Set the drain flag on the queue.
908

909
    This is a multi-node call.
910

911
    @type node_list: list
912
    @param node_list: the list of nodes to query
913
    @type drain_flag: bool
914
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
915

916
    """
917
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
918
                                    [drain_flag])
919

    
920
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
921
    """Validate the hypervisor params.
922

923
    This is a multi-node call.
924

925
    @type node_list: list
926
    @param node_list: the list of nodes to query
927
    @type hvname: string
928
    @param hvname: the hypervisor name
929
    @type hvparams: dict
930
    @param hvparams: the hypervisor parameters to be validated
931

932
    """
933
    cluster = self._cfg.GetClusterInfo()
934
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
935
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
936
                               [hvname, hv_full])