Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ eafd8762

History | View | Annotate | Download (25 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43

    
44

    
45
# Module level variable
46
_http_manager = None
47

    
48

    
49
def Init():
50
  """Initializes the module-global HTTP client manager.
51

52
  Must be called before using any RPC function.
53

54
  """
55
  global _http_manager
56

    
57
  assert not _http_manager, "RPC module initialized more than once"
58

    
59
  _http_manager = http.HttpClientManager()
60

    
61

    
62
def Shutdown():
63
  """Stops the module-global HTTP client manager.
64

65
  Must be called before quitting the program.
66

67
  """
68
  global _http_manager
69

    
70
  if _http_manager:
71
    _http_manager.Shutdown()
72
    _http_manager = None
73

    
74

    
75
class Client:
76
  """RPC Client class.
77

78
  This class, given a (remote) method name, a list of parameters and a
79
  list of nodes, will contact (in parallel) all nodes, and return a
80
  dict of results (key: node name, value: result).
81

82
  One current bug is that generic failure is still signalled by
83
  'False' result, which is not good. This overloading of values can
84
  cause bugs.
85

86
  """
87
  def __init__(self, procedure, args):
88
    self.procedure = procedure
89
    self.args = args
90
    self.body = serializer.DumpJson(args, indent=False)
91

    
92
    self.port = utils.GetNodeDaemonPort()
93
    self.nc = {}
94

    
95
  def ConnectList(self, node_list, address_list=None):
96
    """Add a list of nodes to the target nodes.
97

98
    @type node_list: list
99
    @param node_list: the list of node names to connect
100
    @type address_list: list or None
101
    @keyword address_list: either None or a list with node addresses,
102
        which must have the same length as the node list
103

104
    """
105
    if address_list is None:
106
      address_list = [None for _ in node_list]
107
    else:
108
      assert len(node_list) == len(address_list), \
109
             "Name and address lists should have the same length"
110
    for node, address in zip(node_list, address_list):
111
      self.ConnectNode(node, address)
112

    
113
  def ConnectNode(self, name, address=None):
114
    """Add a node to the target list.
115

116
    @type name: str
117
    @param name: the node name
118
    @type address: str
119
    @keyword address: the node address, if known
120

121
    """
122
    if address is None:
123
      address = name
124

    
125
    # TODO: Cache key and certificate for different requests
126
    ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
127
                                    ssl_cert_path=constants.SSL_CERT_FILE)
128

    
129
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
130
                                           "/%s" % self.procedure,
131
                                           post_data=self.body,
132
                                           ssl_params=ssl_params,
133
                                           ssl_verify_peer=True)
134

    
135
  def GetResults(self):
136
    """Call nodes and return results.
137

138
    @rtype: list
139
    @returns: List of RPC results
140

141
    """
142
    assert _http_manager, "RPC module not intialized"
143

    
144
    _http_manager.ExecRequests(self.nc.values())
145

    
146
    results = {}
147

    
148
    for name, req in self.nc.iteritems():
149
      if req.success and req.resp_status == http.HTTP_OK:
150
        results[name] = serializer.LoadJson(req.resp_body)
151
        continue
152

    
153
      if req.error:
154
        msg = req.error
155
      else:
156
        msg = req.resp_body
157

    
158
      logging.error("RPC error from node %s: %s", name, msg)
159
      results[name] = False
160

    
161
    return results
162

    
163

    
164
class RpcRunner(object):
165
  """RPC runner class"""
166

    
167
  def __init__(self, cfg):
168
    """Initialized the rpc runner.
169

170
    @type cfg:  C{config.ConfigWriter}
171
    @param cfg: the configuration object that will be used to get data
172
                about the cluster
173

174
    """
175
    self._cfg = cfg
176

    
177
  def _InstDict(self, instance):
178
    """Convert the given instance to a dict.
179

180
    This is done via the instance's ToDict() method and additionally
181
    we fill the hvparams with the cluster defaults.
182

183
    @type instance: L{objects.Instance}
184
    @param instance: an Instance object
185
    @rtype: dict
186
    @return: the instance dict, with the hvparams filled with the
187
        cluster defaults
188

189
    """
190
    idict = instance.ToDict()
191
    cluster = self._cfg.GetClusterInfo()
192
    idict["hvparams"] = cluster.FillHV(instance)
193
    idict["beparams"] = cluster.FillBE(instance)
194
    return idict
195

    
196
  def _ConnectList(self, client, node_list):
197
    """Helper for computing node addresses.
198

199
    @type client: L{Client}
200
    @param client: a C{Client} instance
201
    @type node_list: list
202
    @param node_list: the node list we should connect
203

204
    """
205
    all_nodes = self._cfg.GetAllNodesInfo()
206
    addr_list = []
207
    for node in node_list:
208
      if node in all_nodes:
209
        val = all_nodes[node].primary_ip
210
      else:
211
        val = None
212
      addr_list.append(val)
213
    client.ConnectList(node_list, address_list=addr_list)
214

    
215
  def _ConnectNode(self, client, node):
216
    """Helper for computing one node's address.
217

218
    @type client: L{Client}
219
    @param client: a C{Client} instance
220
    @type node: str
221
    @param node: the node we should connect
222

223
    """
224
    node_info = self._cfg.GetNodeInfo(node)
225
    if node_info is not None:
226
      addr = node_info.primary_ip
227
    else:
228
      addr = None
229
    client.ConnectNode(node, address=addr)
230

    
231
  def _MultiNodeCall(self, node_list, procedure, args,
232
                     address_list=None):
233
    c = Client(procedure, args)
234
    if address_list is None:
235
      self._ConnectList(c, node_list)
236
    else:
237
      c.ConnectList(node_list, address_list=address_list)
238
    return c.GetResults()
239

    
240
  @classmethod
241
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
242
                           address_list=None):
243
    c = Client(procedure, args)
244
    c.ConnectList(node_list, address_list=address_list)
245
    return c.GetResults()
246

    
247
  def _SingleNodeCall(self, node, procedure, args):
248
    """
249

250
    """
251
    c = Client(procedure, args)
252
    self._ConnectNode(c, node)
253
    return c.GetResults().get(node, False)
254

    
255
  @classmethod
256
  def _StaticSingleNodeCall(cls, node, procedure, args):
257
    """
258

259
    """
260
    c = Client(procedure, args)
261
    c.ConnectNode(c, node)
262
    return c.GetResults().get(node, False)
263

    
264
  def call_volume_list(self, node_list, vg_name):
265
    """Gets the logical volumes present in a given volume group.
266

267
    This is a multi-node call.
268

269
    """
270
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
271

    
272
  def call_vg_list(self, node_list):
273
    """Gets the volume group list.
274

275
    This is a multi-node call.
276

277
    """
278
    return self._MultiNodeCall(node_list, "vg_list", [])
279

    
280
  def call_bridges_exist(self, node, bridges_list):
281
    """Checks if a node has all the bridges given.
282

283
    This method checks if all bridges given in the bridges_list are
284
    present on the remote node, so that an instance that uses interfaces
285
    on those bridges can be started.
286

287
    This is a single-node call.
288

289
    """
290
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
291

    
292
  def call_instance_start(self, node, instance, extra_args):
293
    """Starts an instance.
294

295
    This is a single-node call.
296

297
    """
298
    return self._SingleNodeCall(node, "instance_start",
299
                                [self._InstDict(instance), extra_args])
300

    
301
  def call_instance_shutdown(self, node, instance):
302
    """Stops an instance.
303

304
    This is a single-node call.
305

306
    """
307
    return self._SingleNodeCall(node, "instance_shutdown",
308
                                [self._InstDict(instance)])
309

    
310
  def call_instance_migrate(self, node, instance, target, live):
311
    """Migrate an instance.
312

313
    This is a single-node call.
314

315
    @type node: string
316
    @param node: the node on which the instance is currently running
317
    @type instance: C{objects.Instance}
318
    @param instance: the instance definition
319
    @type target: string
320
    @param target: the target node name
321
    @type live: boolean
322
    @param live: whether the migration should be done live or not (the
323
        interpretation of this parameter is left to the hypervisor)
324

325
    """
326
    return self._SingleNodeCall(node, "instance_migrate",
327
                                [self._InstDict(instance), target, live])
328

    
329
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
330
    """Reboots an instance.
331

332
    This is a single-node call.
333

334
    """
335
    return self._SingleNodeCall(node, "instance_reboot",
336
                                [self._InstDict(instance), reboot_type,
337
                                 extra_args])
338

    
339
  def call_instance_os_add(self, node, inst):
340
    """Installs an OS on the given instance.
341

342
    This is a single-node call.
343

344
    """
345
    return self._SingleNodeCall(node, "instance_os_add",
346
                                [self._InstDict(inst)])
347

    
348
  def call_instance_run_rename(self, node, inst, old_name):
349
    """Run the OS rename script for an instance.
350

351
    This is a single-node call.
352

353
    """
354
    return self._SingleNodeCall(node, "instance_run_rename",
355
                                [self._InstDict(inst), old_name])
356

    
357
  def call_instance_info(self, node, instance, hname):
358
    """Returns information about a single instance.
359

360
    This is a single-node call.
361

362
    @type node: list
363
    @param node: the list of nodes to query
364
    @type instance: string
365
    @param instance: the instance name
366
    @type hname: string
367
    @param hname: the hypervisor type of the instance
368

369
    """
370
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
371

    
372
  def call_all_instances_info(self, node_list, hypervisor_list):
373
    """Returns information about all instances on the given nodes.
374

375
    This is a multi-node call.
376

377
    @type node_list: list
378
    @param node_list: the list of nodes to query
379
    @type hypervisor_list: list
380
    @param hypervisor_list: the hypervisors to query for instances
381

382
    """
383
    return self._MultiNodeCall(node_list, "all_instances_info",
384
                               [hypervisor_list])
385

    
386
  def call_instance_list(self, node_list, hypervisor_list):
387
    """Returns the list of running instances on a given node.
388

389
    This is a multi-node call.
390

391
    @type node_list: list
392
    @param node_list: the list of nodes to query
393
    @type hypervisor_list: list
394
    @param hypervisor_list: the hypervisors to query for instances
395

396
    """
397
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
398

    
399
  def call_node_tcp_ping(self, node, source, target, port, timeout,
400
                         live_port_needed):
401
    """Do a TcpPing on the remote node
402

403
    This is a single-node call.
404

405
    """
406
    return self._SingleNodeCall(node, "node_tcp_ping",
407
                                [source, target, port, timeout,
408
                                 live_port_needed])
409

    
410
  def call_node_has_ip_address(self, node, address):
411
    """Checks if a node has the given IP address.
412

413
    This is a single-node call.
414

415
    """
416
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
417

    
418
  def call_node_info(self, node_list, vg_name, hypervisor_type):
419
    """Return node information.
420

421
    This will return memory information and volume group size and free
422
    space.
423

424
    This is a multi-node call.
425

426
    @type node_list: list
427
    @param node_list: the list of nodes to query
428
    @type vgname: C{string}
429
    @param vgname: the name of the volume group to ask for disk space
430
        information
431
    @type hypervisor_type: C{str}
432
    @param hypervisor_type: the name of the hypervisor to ask for
433
        memory information
434

435
    """
436
    retux = self._MultiNodeCall(node_list, "node_info",
437
                                [vg_name, hypervisor_type])
438

    
439
    for node_name in retux:
440
      ret = retux.get(node_name, False)
441
      if type(ret) != dict:
442
        logging.error("could not connect to node %s", node_name)
443
        ret = {}
444

    
445
      utils.CheckDict(ret, {
446
                        'memory_total' : '-',
447
                        'memory_dom0' : '-',
448
                        'memory_free' : '-',
449
                        'vg_size' : 'node_unreachable',
450
                        'vg_free' : '-',
451
                      }, "call_node_info")
452
    return retux
453

    
454
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
455
    """Add a node to the cluster.
456

457
    This is a single-node call.
458

459
    """
460
    return self._SingleNodeCall(node, "node_add",
461
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
462

    
463
  def call_node_verify(self, node_list, checkdict, cluster_name):
464
    """Request verification of given parameters.
465

466
    This is a multi-node call.
467

468
    """
469
    return self._MultiNodeCall(node_list, "node_verify",
470
                               [checkdict, cluster_name])
471

    
472
  @classmethod
473
  def call_node_start_master(cls, node, start_daemons):
474
    """Tells a node to activate itself as a master.
475

476
    This is a single-node call.
477

478
    """
479
    return cls._StaticSingleNodeCall(node, "node_start_master",
480
                                     [start_daemons])
481

    
482
  @classmethod
483
  def call_node_stop_master(cls, node, stop_daemons):
484
    """Tells a node to demote itself from master status.
485

486
    This is a single-node call.
487

488
    """
489
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
490

    
491
  @classmethod
492
  def call_master_info(cls, node_list):
493
    """Query master info.
494

495
    This is a multi-node call.
496

497
    """
498
    # TODO: should this method query down nodes?
499
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
500

    
501
  def call_version(self, node_list):
502
    """Query node version.
503

504
    This is a multi-node call.
505

506
    """
507
    return self._MultiNodeCall(node_list, "version", [])
508

    
509
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
510
    """Request creation of a given block device.
511

512
    This is a single-node call.
513

514
    """
515
    return self._SingleNodeCall(node, "blockdev_create",
516
                                [bdev.ToDict(), size, owner, on_primary, info])
517

    
518
  def call_blockdev_remove(self, node, bdev):
519
    """Request removal of a given block device.
520

521
    This is a single-node call.
522

523
    """
524
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
525

    
526
  def call_blockdev_rename(self, node, devlist):
527
    """Request rename of the given block devices.
528

529
    This is a single-node call.
530

531
    """
532
    return self._SingleNodeCall(node, "blockdev_rename",
533
                                [(d.ToDict(), uid) for d, uid in devlist])
534

    
535
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
536
    """Request assembling of a given block device.
537

538
    This is a single-node call.
539

540
    """
541
    return self._SingleNodeCall(node, "blockdev_assemble",
542
                                [disk.ToDict(), owner, on_primary])
543

    
544
  def call_blockdev_shutdown(self, node, disk):
545
    """Request shutdown of a given block device.
546

547
    This is a single-node call.
548

549
    """
550
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
551

    
552
  def call_blockdev_addchildren(self, node, bdev, ndevs):
553
    """Request adding a list of children to a (mirroring) device.
554

555
    This is a single-node call.
556

557
    """
558
    return self._SingleNodeCall(node, "blockdev_addchildren",
559
                                [bdev.ToDict(),
560
                                 [disk.ToDict() for disk in ndevs]])
561

    
562
  def call_blockdev_removechildren(self, node, bdev, ndevs):
563
    """Request removing a list of children from a (mirroring) device.
564

565
    This is a single-node call.
566

567
    """
568
    return self._SingleNodeCall(node, "blockdev_removechildren",
569
                                [bdev.ToDict(),
570
                                 [disk.ToDict() for disk in ndevs]])
571

    
572
  def call_blockdev_getmirrorstatus(self, node, disks):
573
    """Request status of a (mirroring) device.
574

575
    This is a single-node call.
576

577
    """
578
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
579
                                [dsk.ToDict() for dsk in disks])
580

    
581
  def call_blockdev_find(self, node, disk):
582
    """Request identification of a given block device.
583

584
    This is a single-node call.
585

586
    """
587
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
588

    
589
  def call_blockdev_close(self, node, disks):
590
    """Closes the given block devices.
591

592
    This is a single-node call.
593

594
    """
595
    return self._SingleNodeCall(node, "blockdev_close",
596
                                [cf.ToDict() for cf in disks])
597

    
598
  @classmethod
599
  def call_upload_file(cls, node_list, file_name, address_list=None):
600
    """Upload a file.
601

602
    The node will refuse the operation in case the file is not on the
603
    approved file list.
604

605
    This is a multi-node call.
606

607
    @type node_list: list
608
    @param node_list: the list of node names to upload to
609
    @type file_name: str
610
    @param file_name: the filename to upload
611
    @type address_list: list or None
612
    @keyword address_list: an optional list of node addresses, in order
613
        to optimize the RPC speed
614

615
    """
616
    data = utils.ReadFile(file_name)
617
    st = os.stat(file_name)
618
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
619
              st.st_atime, st.st_mtime]
620
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
621
                                    address_list=address_list)
622

    
623
  @classmethod
624
  def call_write_ssconf_files(cls, node_list):
625
    """Write ssconf files.
626

627
    This is a multi-node call.
628

629
    """
630
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [])
631

    
632
  def call_os_diagnose(self, node_list):
633
    """Request a diagnose of OS definitions.
634

635
    This is a multi-node call.
636

637
    """
638
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
639

    
640
    new_result = {}
641
    for node_name in result:
642
      if result[node_name]:
643
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
644
      else:
645
        nr = []
646
      new_result[node_name] = nr
647
    return new_result
648

    
649
  def call_os_get(self, node, name):
650
    """Returns an OS definition.
651

652
    This is a single-node call.
653

654
    """
655
    result = self._SingleNodeCall(node, "os_get", [name])
656
    if isinstance(result, dict):
657
      return objects.OS.FromDict(result)
658
    else:
659
      return result
660

    
661
  def call_hooks_runner(self, node_list, hpath, phase, env):
662
    """Call the hooks runner.
663

664
    Args:
665
      - op: the OpCode instance
666
      - env: a dictionary with the environment
667

668
    This is a multi-node call.
669

670
    """
671
    params = [hpath, phase, env]
672
    return self._MultiNodeCall(node_list, "hooks_runner", params)
673

    
674
  def call_iallocator_runner(self, node, name, idata):
675
    """Call an iallocator on a remote node
676

677
    Args:
678
      - name: the iallocator name
679
      - input: the json-encoded input string
680

681
    This is a single-node call.
682

683
    """
684
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
685

    
686
  def call_blockdev_grow(self, node, cf_bdev, amount):
687
    """Request a snapshot of the given block device.
688

689
    This is a single-node call.
690

691
    """
692
    return self._SingleNodeCall(node, "blockdev_grow",
693
                                [cf_bdev.ToDict(), amount])
694

    
695
  def call_blockdev_snapshot(self, node, cf_bdev):
696
    """Request a snapshot of the given block device.
697

698
    This is a single-node call.
699

700
    """
701
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
702

    
703
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
704
                           cluster_name, idx):
705
    """Request the export of a given snapshot.
706

707
    This is a single-node call.
708

709
    """
710
    return self._SingleNodeCall(node, "snapshot_export",
711
                                [snap_bdev.ToDict(), dest_node,
712
                                 self._InstDict(instance), cluster_name, idx])
713

    
714
  def call_finalize_export(self, node, instance, snap_disks):
715
    """Request the completion of an export operation.
716

717
    This writes the export config file, etc.
718

719
    This is a single-node call.
720

721
    """
722
    flat_disks = []
723
    for disk in snap_disks:
724
      flat_disks.append(disk.ToDict())
725

    
726
    return self._SingleNodeCall(node, "finalize_export",
727
                                [self._InstDict(instance), flat_disks])
728

    
729
  def call_export_info(self, node, path):
730
    """Queries the export information in a given path.
731

732
    This is a single-node call.
733

734
    """
735
    result = self._SingleNodeCall(node, "export_info", [path])
736
    if not result:
737
      return result
738
    return objects.SerializableConfigParser.Loads(str(result))
739

    
740
  def call_instance_os_import(self, node, inst, src_node, src_images,
741
                              cluster_name):
742
    """Request the import of a backup into an instance.
743

744
    This is a single-node call.
745

746
    """
747
    return self._SingleNodeCall(node, "instance_os_import",
748
                                [self._InstDict(inst), src_node, src_images,
749
                                 cluster_name])
750

    
751
  def call_export_list(self, node_list):
752
    """Gets the stored exports list.
753

754
    This is a multi-node call.
755

756
    """
757
    return self._MultiNodeCall(node_list, "export_list", [])
758

    
759
  def call_export_remove(self, node, export):
760
    """Requests removal of a given export.
761

762
    This is a single-node call.
763

764
    """
765
    return self._SingleNodeCall(node, "export_remove", [export])
766

    
767
  @classmethod
768
  def call_node_leave_cluster(cls, node):
769
    """Requests a node to clean the cluster information it has.
770

771
    This will remove the configuration information from the ganeti data
772
    dir.
773

774
    This is a single-node call.
775

776
    """
777
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
778

    
779
  def call_node_volumes(self, node_list):
780
    """Gets all volumes on node(s).
781

782
    This is a multi-node call.
783

784
    """
785
    return self._MultiNodeCall(node_list, "node_volumes", [])
786

    
787
  def call_test_delay(self, node_list, duration):
788
    """Sleep for a fixed time on given node(s).
789

790
    This is a multi-node call.
791

792
    """
793
    return self._MultiNodeCall(node_list, "test_delay", [duration])
794

    
795
  def call_file_storage_dir_create(self, node, file_storage_dir):
796
    """Create the given file storage directory.
797

798
    This is a single-node call.
799

800
    """
801
    return self._SingleNodeCall(node, "file_storage_dir_create",
802
                                [file_storage_dir])
803

    
804
  def call_file_storage_dir_remove(self, node, file_storage_dir):
805
    """Remove the given file storage directory.
806

807
    This is a single-node call.
808

809
    """
810
    return self._SingleNodeCall(node, "file_storage_dir_remove",
811
                                [file_storage_dir])
812

    
813
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
814
                                   new_file_storage_dir):
815
    """Rename file storage directory.
816

817
    This is a single-node call.
818

819
    """
820
    return self._SingleNodeCall(node, "file_storage_dir_rename",
821
                                [old_file_storage_dir, new_file_storage_dir])
822

    
823
  @classmethod
824
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
825
    """Update job queue.
826

827
    This is a multi-node call.
828

829
    """
830
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
831
                                    [file_name, content],
832
                                    address_list=address_list)
833

    
834
  @classmethod
835
  def call_jobqueue_purge(cls, node):
836
    """Purge job queue.
837

838
    This is a single-node call.
839

840
    """
841
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
842

    
843
  @classmethod
844
  def call_jobqueue_rename(cls, node_list, address_list, old, new):
845
    """Rename a job queue file.
846

847
    This is a multi-node call.
848

849
    """
850
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
851
                                    address_list=address_list)
852

    
853
  @classmethod
854
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
855
    """Set the drain flag on the queue.
856

857
    This is a multi-node call.
858

859
    @type node_list: list
860
    @param node_list: the list of nodes to query
861
    @type drain_flag: bool
862
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
863

864
    """
865
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
866
                                    [drain_flag])
867

    
868
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
869
    """Validate the hypervisor params.
870

871
    This is a multi-node call.
872

873
    @type node_list: list
874
    @param node_list: the list of nodes to query
875
    @type hvname: string
876
    @param hvname: the hypervisor name
877
    @type hvparams: dict
878
    @param hvparams: the hypervisor parameters to be validated
879

880
    """
881
    cluster = self._cfg.GetClusterInfo()
882
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
883
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
884
                               [hvname, hv_full])