Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 03d1dba2

History | View | Annotate | Download (25 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43

    
44

    
45
# Module level variable
46
_http_manager = None
47

    
48

    
49
def Init():
50
  """Initializes the module-global HTTP client manager.
51

52
  Must be called before using any RPC function.
53

54
  """
55
  global _http_manager
56

    
57
  assert not _http_manager, "RPC module initialized more than once"
58

    
59
  _http_manager = http.HttpClientManager()
60

    
61

    
62
def Shutdown():
63
  """Stops the module-global HTTP client manager.
64

65
  Must be called before quitting the program.
66

67
  """
68
  global _http_manager
69

    
70
  if _http_manager:
71
    _http_manager.Shutdown()
72
    _http_manager = None
73

    
74

    
75
class Client:
76
  """RPC Client class.
77

78
  This class, given a (remote) method name, a list of parameters and a
79
  list of nodes, will contact (in parallel) all nodes, and return a
80
  dict of results (key: node name, value: result).
81

82
  One current bug is that generic failure is still signalled by
83
  'False' result, which is not good. This overloading of values can
84
  cause bugs.
85

86
  """
87
  def __init__(self, procedure, args):
88
    self.procedure = procedure
89
    self.args = args
90
    self.body = serializer.DumpJson(args, indent=False)
91

    
92
    self.port = utils.GetNodeDaemonPort()
93
    self.nc = {}
94

    
95
    self._ssl_params = \
96
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
97
                         ssl_cert_path=constants.SSL_CERT_FILE)
98

    
99
  def ConnectList(self, node_list, address_list=None):
100
    """Add a list of nodes to the target nodes.
101

102
    @type node_list: list
103
    @param node_list: the list of node names to connect
104
    @type address_list: list or None
105
    @keyword address_list: either None or a list with node addresses,
106
        which must have the same length as the node list
107

108
    """
109
    if address_list is None:
110
      address_list = [None for _ in node_list]
111
    else:
112
      assert len(node_list) == len(address_list), \
113
             "Name and address lists should have the same length"
114
    for node, address in zip(node_list, address_list):
115
      self.ConnectNode(node, address)
116

    
117
  def ConnectNode(self, name, address=None):
118
    """Add a node to the target list.
119

120
    @type name: str
121
    @param name: the node name
122
    @type address: str
123
    @keyword address: the node address, if known
124

125
    """
126
    if address is None:
127
      address = name
128

    
129
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
130
                                           "/%s" % self.procedure,
131
                                           post_data=self.body,
132
                                           ssl_params=self._ssl_params,
133
                                           ssl_verify_peer=True)
134

    
135
  def GetResults(self):
136
    """Call nodes and return results.
137

138
    @rtype: list
139
    @returns: List of RPC results
140

141
    """
142
    assert _http_manager, "RPC module not intialized"
143

    
144
    _http_manager.ExecRequests(self.nc.values())
145

    
146
    results = {}
147

    
148
    for name, req in self.nc.iteritems():
149
      if req.success and req.resp_status == http.HTTP_OK:
150
        results[name] = serializer.LoadJson(req.resp_body)
151
        continue
152

    
153
      # TODO: Better error reporting
154
      if req.error:
155
        msg = req.error
156
      else:
157
        msg = req.resp_body
158

    
159
      logging.error("RPC error from node %s: %s", name, msg)
160
      results[name] = False
161

    
162
    return results
163

    
164

    
165
class RpcRunner(object):
166
  """RPC runner class"""
167

    
168
  def __init__(self, cfg):
169
    """Initialized the rpc runner.
170

171
    @type cfg:  C{config.ConfigWriter}
172
    @param cfg: the configuration object that will be used to get data
173
                about the cluster
174

175
    """
176
    self._cfg = cfg
177

    
178
  def _InstDict(self, instance):
179
    """Convert the given instance to a dict.
180

181
    This is done via the instance's ToDict() method and additionally
182
    we fill the hvparams with the cluster defaults.
183

184
    @type instance: L{objects.Instance}
185
    @param instance: an Instance object
186
    @rtype: dict
187
    @return: the instance dict, with the hvparams filled with the
188
        cluster defaults
189

190
    """
191
    idict = instance.ToDict()
192
    cluster = self._cfg.GetClusterInfo()
193
    idict["hvparams"] = cluster.FillHV(instance)
194
    idict["beparams"] = cluster.FillBE(instance)
195
    return idict
196

    
197
  def _ConnectList(self, client, node_list):
198
    """Helper for computing node addresses.
199

200
    @type client: L{Client}
201
    @param client: a C{Client} instance
202
    @type node_list: list
203
    @param node_list: the node list we should connect
204

205
    """
206
    all_nodes = self._cfg.GetAllNodesInfo()
207
    addr_list = []
208
    for node in node_list:
209
      if node in all_nodes:
210
        val = all_nodes[node].primary_ip
211
      else:
212
        val = None
213
      addr_list.append(val)
214
    client.ConnectList(node_list, address_list=addr_list)
215

    
216
  def _ConnectNode(self, client, node):
217
    """Helper for computing one node's address.
218

219
    @type client: L{Client}
220
    @param client: a C{Client} instance
221
    @type node: str
222
    @param node: the node we should connect
223

224
    """
225
    node_info = self._cfg.GetNodeInfo(node)
226
    if node_info is not None:
227
      addr = node_info.primary_ip
228
    else:
229
      addr = None
230
    client.ConnectNode(node, address=addr)
231

    
232
  def _MultiNodeCall(self, node_list, procedure, args,
233
                     address_list=None):
234
    c = Client(procedure, args)
235
    if address_list is None:
236
      self._ConnectList(c, node_list)
237
    else:
238
      c.ConnectList(node_list, address_list=address_list)
239
    return c.GetResults()
240

    
241
  @classmethod
242
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
243
                           address_list=None):
244
    c = Client(procedure, args)
245
    c.ConnectList(node_list, address_list=address_list)
246
    return c.GetResults()
247

    
248
  def _SingleNodeCall(self, node, procedure, args):
249
    """
250

251
    """
252
    c = Client(procedure, args)
253
    self._ConnectNode(c, node)
254
    return c.GetResults().get(node, False)
255

    
256
  @classmethod
257
  def _StaticSingleNodeCall(cls, node, procedure, args):
258
    """
259

260
    """
261
    c = Client(procedure, args)
262
    c.ConnectNode(c, node)
263
    return c.GetResults().get(node, False)
264

    
265
  def call_volume_list(self, node_list, vg_name):
266
    """Gets the logical volumes present in a given volume group.
267

268
    This is a multi-node call.
269

270
    """
271
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
272

    
273
  def call_vg_list(self, node_list):
274
    """Gets the volume group list.
275

276
    This is a multi-node call.
277

278
    """
279
    return self._MultiNodeCall(node_list, "vg_list", [])
280

    
281
  def call_bridges_exist(self, node, bridges_list):
282
    """Checks if a node has all the bridges given.
283

284
    This method checks if all bridges given in the bridges_list are
285
    present on the remote node, so that an instance that uses interfaces
286
    on those bridges can be started.
287

288
    This is a single-node call.
289

290
    """
291
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
292

    
293
  def call_instance_start(self, node, instance, extra_args):
294
    """Starts an instance.
295

296
    This is a single-node call.
297

298
    """
299
    return self._SingleNodeCall(node, "instance_start",
300
                                [self._InstDict(instance), extra_args])
301

    
302
  def call_instance_shutdown(self, node, instance):
303
    """Stops an instance.
304

305
    This is a single-node call.
306

307
    """
308
    return self._SingleNodeCall(node, "instance_shutdown",
309
                                [self._InstDict(instance)])
310

    
311
  def call_instance_migrate(self, node, instance, target, live):
312
    """Migrate an instance.
313

314
    This is a single-node call.
315

316
    @type node: string
317
    @param node: the node on which the instance is currently running
318
    @type instance: C{objects.Instance}
319
    @param instance: the instance definition
320
    @type target: string
321
    @param target: the target node name
322
    @type live: boolean
323
    @param live: whether the migration should be done live or not (the
324
        interpretation of this parameter is left to the hypervisor)
325

326
    """
327
    return self._SingleNodeCall(node, "instance_migrate",
328
                                [self._InstDict(instance), target, live])
329

    
330
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
331
    """Reboots an instance.
332

333
    This is a single-node call.
334

335
    """
336
    return self._SingleNodeCall(node, "instance_reboot",
337
                                [self._InstDict(instance), reboot_type,
338
                                 extra_args])
339

    
340
  def call_instance_os_add(self, node, inst):
341
    """Installs an OS on the given instance.
342

343
    This is a single-node call.
344

345
    """
346
    return self._SingleNodeCall(node, "instance_os_add",
347
                                [self._InstDict(inst)])
348

    
349
  def call_instance_run_rename(self, node, inst, old_name):
350
    """Run the OS rename script for an instance.
351

352
    This is a single-node call.
353

354
    """
355
    return self._SingleNodeCall(node, "instance_run_rename",
356
                                [self._InstDict(inst), old_name])
357

    
358
  def call_instance_info(self, node, instance, hname):
359
    """Returns information about a single instance.
360

361
    This is a single-node call.
362

363
    @type node: list
364
    @param node: the list of nodes to query
365
    @type instance: string
366
    @param instance: the instance name
367
    @type hname: string
368
    @param hname: the hypervisor type of the instance
369

370
    """
371
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
372

    
373
  def call_all_instances_info(self, node_list, hypervisor_list):
374
    """Returns information about all instances on the given nodes.
375

376
    This is a multi-node call.
377

378
    @type node_list: list
379
    @param node_list: the list of nodes to query
380
    @type hypervisor_list: list
381
    @param hypervisor_list: the hypervisors to query for instances
382

383
    """
384
    return self._MultiNodeCall(node_list, "all_instances_info",
385
                               [hypervisor_list])
386

    
387
  def call_instance_list(self, node_list, hypervisor_list):
388
    """Returns the list of running instances on a given node.
389

390
    This is a multi-node call.
391

392
    @type node_list: list
393
    @param node_list: the list of nodes to query
394
    @type hypervisor_list: list
395
    @param hypervisor_list: the hypervisors to query for instances
396

397
    """
398
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
399

    
400
  def call_node_tcp_ping(self, node, source, target, port, timeout,
401
                         live_port_needed):
402
    """Do a TcpPing on the remote node
403

404
    This is a single-node call.
405

406
    """
407
    return self._SingleNodeCall(node, "node_tcp_ping",
408
                                [source, target, port, timeout,
409
                                 live_port_needed])
410

    
411
  def call_node_has_ip_address(self, node, address):
412
    """Checks if a node has the given IP address.
413

414
    This is a single-node call.
415

416
    """
417
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
418

    
419
  def call_node_info(self, node_list, vg_name, hypervisor_type):
420
    """Return node information.
421

422
    This will return memory information and volume group size and free
423
    space.
424

425
    This is a multi-node call.
426

427
    @type node_list: list
428
    @param node_list: the list of nodes to query
429
    @type vgname: C{string}
430
    @param vgname: the name of the volume group to ask for disk space
431
        information
432
    @type hypervisor_type: C{str}
433
    @param hypervisor_type: the name of the hypervisor to ask for
434
        memory information
435

436
    """
437
    retux = self._MultiNodeCall(node_list, "node_info",
438
                                [vg_name, hypervisor_type])
439

    
440
    for node_name in retux:
441
      ret = retux.get(node_name, False)
442
      if type(ret) != dict:
443
        logging.error("could not connect to node %s", node_name)
444
        ret = {}
445

    
446
      utils.CheckDict(ret, {
447
                        'memory_total' : '-',
448
                        'memory_dom0' : '-',
449
                        'memory_free' : '-',
450
                        'vg_size' : 'node_unreachable',
451
                        'vg_free' : '-',
452
                      }, "call_node_info")
453
    return retux
454

    
455
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
456
    """Add a node to the cluster.
457

458
    This is a single-node call.
459

460
    """
461
    return self._SingleNodeCall(node, "node_add",
462
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
463

    
464
  def call_node_verify(self, node_list, checkdict, cluster_name):
465
    """Request verification of given parameters.
466

467
    This is a multi-node call.
468

469
    """
470
    return self._MultiNodeCall(node_list, "node_verify",
471
                               [checkdict, cluster_name])
472

    
473
  @classmethod
474
  def call_node_start_master(cls, node, start_daemons):
475
    """Tells a node to activate itself as a master.
476

477
    This is a single-node call.
478

479
    """
480
    return cls._StaticSingleNodeCall(node, "node_start_master",
481
                                     [start_daemons])
482

    
483
  @classmethod
484
  def call_node_stop_master(cls, node, stop_daemons):
485
    """Tells a node to demote itself from master status.
486

487
    This is a single-node call.
488

489
    """
490
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
491

    
492
  @classmethod
493
  def call_master_info(cls, node_list):
494
    """Query master info.
495

496
    This is a multi-node call.
497

498
    """
499
    # TODO: should this method query down nodes?
500
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
501

    
502
  def call_version(self, node_list):
503
    """Query node version.
504

505
    This is a multi-node call.
506

507
    """
508
    return self._MultiNodeCall(node_list, "version", [])
509

    
510
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
511
    """Request creation of a given block device.
512

513
    This is a single-node call.
514

515
    """
516
    return self._SingleNodeCall(node, "blockdev_create",
517
                                [bdev.ToDict(), size, owner, on_primary, info])
518

    
519
  def call_blockdev_remove(self, node, bdev):
520
    """Request removal of a given block device.
521

522
    This is a single-node call.
523

524
    """
525
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
526

    
527
  def call_blockdev_rename(self, node, devlist):
528
    """Request rename of the given block devices.
529

530
    This is a single-node call.
531

532
    """
533
    return self._SingleNodeCall(node, "blockdev_rename",
534
                                [(d.ToDict(), uid) for d, uid in devlist])
535

    
536
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
537
    """Request assembling of a given block device.
538

539
    This is a single-node call.
540

541
    """
542
    return self._SingleNodeCall(node, "blockdev_assemble",
543
                                [disk.ToDict(), owner, on_primary])
544

    
545
  def call_blockdev_shutdown(self, node, disk):
546
    """Request shutdown of a given block device.
547

548
    This is a single-node call.
549

550
    """
551
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
552

    
553
  def call_blockdev_addchildren(self, node, bdev, ndevs):
554
    """Request adding a list of children to a (mirroring) device.
555

556
    This is a single-node call.
557

558
    """
559
    return self._SingleNodeCall(node, "blockdev_addchildren",
560
                                [bdev.ToDict(),
561
                                 [disk.ToDict() for disk in ndevs]])
562

    
563
  def call_blockdev_removechildren(self, node, bdev, ndevs):
564
    """Request removing a list of children from a (mirroring) device.
565

566
    This is a single-node call.
567

568
    """
569
    return self._SingleNodeCall(node, "blockdev_removechildren",
570
                                [bdev.ToDict(),
571
                                 [disk.ToDict() for disk in ndevs]])
572

    
573
  def call_blockdev_getmirrorstatus(self, node, disks):
574
    """Request status of a (mirroring) device.
575

576
    This is a single-node call.
577

578
    """
579
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
580
                                [dsk.ToDict() for dsk in disks])
581

    
582
  def call_blockdev_find(self, node, disk):
583
    """Request identification of a given block device.
584

585
    This is a single-node call.
586

587
    """
588
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
589

    
590
  def call_blockdev_close(self, node, disks):
591
    """Closes the given block devices.
592

593
    This is a single-node call.
594

595
    """
596
    return self._SingleNodeCall(node, "blockdev_close",
597
                                [cf.ToDict() for cf in disks])
598

    
599
  @classmethod
600
  def call_upload_file(cls, node_list, file_name, address_list=None):
601
    """Upload a file.
602

603
    The node will refuse the operation in case the file is not on the
604
    approved file list.
605

606
    This is a multi-node call.
607

608
    @type node_list: list
609
    @param node_list: the list of node names to upload to
610
    @type file_name: str
611
    @param file_name: the filename to upload
612
    @type address_list: list or None
613
    @keyword address_list: an optional list of node addresses, in order
614
        to optimize the RPC speed
615

616
    """
617
    data = utils.ReadFile(file_name)
618
    st = os.stat(file_name)
619
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
620
              st.st_atime, st.st_mtime]
621
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
622
                                    address_list=address_list)
623

    
624
  @classmethod
625
  def call_write_ssconf_files(cls, node_list, values):
626
    """Write ssconf files.
627

628
    This is a multi-node call.
629

630
    """
631
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
632

    
633
  def call_os_diagnose(self, node_list):
634
    """Request a diagnose of OS definitions.
635

636
    This is a multi-node call.
637

638
    """
639
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
640

    
641
    new_result = {}
642
    for node_name in result:
643
      if result[node_name]:
644
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
645
      else:
646
        nr = []
647
      new_result[node_name] = nr
648
    return new_result
649

    
650
  def call_os_get(self, node, name):
651
    """Returns an OS definition.
652

653
    This is a single-node call.
654

655
    """
656
    result = self._SingleNodeCall(node, "os_get", [name])
657
    if isinstance(result, dict):
658
      return objects.OS.FromDict(result)
659
    else:
660
      return result
661

    
662
  def call_hooks_runner(self, node_list, hpath, phase, env):
663
    """Call the hooks runner.
664

665
    Args:
666
      - op: the OpCode instance
667
      - env: a dictionary with the environment
668

669
    This is a multi-node call.
670

671
    """
672
    params = [hpath, phase, env]
673
    return self._MultiNodeCall(node_list, "hooks_runner", params)
674

    
675
  def call_iallocator_runner(self, node, name, idata):
676
    """Call an iallocator on a remote node
677

678
    Args:
679
      - name: the iallocator name
680
      - input: the json-encoded input string
681

682
    This is a single-node call.
683

684
    """
685
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
686

    
687
  def call_blockdev_grow(self, node, cf_bdev, amount):
688
    """Request a snapshot of the given block device.
689

690
    This is a single-node call.
691

692
    """
693
    return self._SingleNodeCall(node, "blockdev_grow",
694
                                [cf_bdev.ToDict(), amount])
695

    
696
  def call_blockdev_snapshot(self, node, cf_bdev):
697
    """Request a snapshot of the given block device.
698

699
    This is a single-node call.
700

701
    """
702
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
703

    
704
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
705
                           cluster_name, idx):
706
    """Request the export of a given snapshot.
707

708
    This is a single-node call.
709

710
    """
711
    return self._SingleNodeCall(node, "snapshot_export",
712
                                [snap_bdev.ToDict(), dest_node,
713
                                 self._InstDict(instance), cluster_name, idx])
714

    
715
  def call_finalize_export(self, node, instance, snap_disks):
716
    """Request the completion of an export operation.
717

718
    This writes the export config file, etc.
719

720
    This is a single-node call.
721

722
    """
723
    flat_disks = []
724
    for disk in snap_disks:
725
      flat_disks.append(disk.ToDict())
726

    
727
    return self._SingleNodeCall(node, "finalize_export",
728
                                [self._InstDict(instance), flat_disks])
729

    
730
  def call_export_info(self, node, path):
731
    """Queries the export information in a given path.
732

733
    This is a single-node call.
734

735
    """
736
    result = self._SingleNodeCall(node, "export_info", [path])
737
    if not result:
738
      return result
739
    return objects.SerializableConfigParser.Loads(str(result))
740

    
741
  def call_instance_os_import(self, node, inst, src_node, src_images,
742
                              cluster_name):
743
    """Request the import of a backup into an instance.
744

745
    This is a single-node call.
746

747
    """
748
    return self._SingleNodeCall(node, "instance_os_import",
749
                                [self._InstDict(inst), src_node, src_images,
750
                                 cluster_name])
751

    
752
  def call_export_list(self, node_list):
753
    """Gets the stored exports list.
754

755
    This is a multi-node call.
756

757
    """
758
    return self._MultiNodeCall(node_list, "export_list", [])
759

    
760
  def call_export_remove(self, node, export):
761
    """Requests removal of a given export.
762

763
    This is a single-node call.
764

765
    """
766
    return self._SingleNodeCall(node, "export_remove", [export])
767

    
768
  @classmethod
769
  def call_node_leave_cluster(cls, node):
770
    """Requests a node to clean the cluster information it has.
771

772
    This will remove the configuration information from the ganeti data
773
    dir.
774

775
    This is a single-node call.
776

777
    """
778
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
779

    
780
  def call_node_volumes(self, node_list):
781
    """Gets all volumes on node(s).
782

783
    This is a multi-node call.
784

785
    """
786
    return self._MultiNodeCall(node_list, "node_volumes", [])
787

    
788
  def call_test_delay(self, node_list, duration):
789
    """Sleep for a fixed time on given node(s).
790

791
    This is a multi-node call.
792

793
    """
794
    return self._MultiNodeCall(node_list, "test_delay", [duration])
795

    
796
  def call_file_storage_dir_create(self, node, file_storage_dir):
797
    """Create the given file storage directory.
798

799
    This is a single-node call.
800

801
    """
802
    return self._SingleNodeCall(node, "file_storage_dir_create",
803
                                [file_storage_dir])
804

    
805
  def call_file_storage_dir_remove(self, node, file_storage_dir):
806
    """Remove the given file storage directory.
807

808
    This is a single-node call.
809

810
    """
811
    return self._SingleNodeCall(node, "file_storage_dir_remove",
812
                                [file_storage_dir])
813

    
814
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
815
                                   new_file_storage_dir):
816
    """Rename file storage directory.
817

818
    This is a single-node call.
819

820
    """
821
    return self._SingleNodeCall(node, "file_storage_dir_rename",
822
                                [old_file_storage_dir, new_file_storage_dir])
823

    
824
  @classmethod
825
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
826
    """Update job queue.
827

828
    This is a multi-node call.
829

830
    """
831
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
832
                                    [file_name, content],
833
                                    address_list=address_list)
834

    
835
  @classmethod
836
  def call_jobqueue_purge(cls, node):
837
    """Purge job queue.
838

839
    This is a single-node call.
840

841
    """
842
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
843

    
844
  @classmethod
845
  def call_jobqueue_rename(cls, node_list, address_list, old, new):
846
    """Rename a job queue file.
847

848
    This is a multi-node call.
849

850
    """
851
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
852
                                    address_list=address_list)
853

    
854
  @classmethod
855
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
856
    """Set the drain flag on the queue.
857

858
    This is a multi-node call.
859

860
    @type node_list: list
861
    @param node_list: the list of nodes to query
862
    @type drain_flag: bool
863
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
864

865
    """
866
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
867
                                    [drain_flag])
868

    
869
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
870
    """Validate the hypervisor params.
871

872
    This is a multi-node call.
873

874
    @type node_list: list
875
    @param node_list: the list of nodes to query
876
    @type hvname: string
877
    @param hvname: the hypervisor name
878
    @type hvparams: dict
879
    @param hvparams: the hypervisor parameters to be validated
880

881
    """
882
    cluster = self._cfg.GetClusterInfo()
883
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
884
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
885
                               [hvname, hv_full])