Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 6ddc95ec

History | View | Annotate | Download (24.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42

    
43

    
44
class Client:
45
  """RPC Client class.
46

47
  This class, given a (remote) method name, a list of parameters and a
48
  list of nodes, will contact (in parallel) all nodes, and return a
49
  dict of results (key: node name, value: result).
50

51
  One current bug is that generic failure is still signalled by
52
  'False' result, which is not good. This overloading of values can
53
  cause bugs.
54

55
  """
56
  def __init__(self, procedure, args):
57
    self.procedure = procedure
58
    self.args = args
59
    self.body = serializer.DumpJson(args, indent=False)
60

    
61
    self.port = utils.GetNodeDaemonPort()
62
    self.nodepw = utils.GetNodeDaemonPassword()
63
    self.nc = {}
64

    
65
  def ConnectList(self, node_list, address_list=None):
66
    """Add a list of nodes to the target nodes.
67

68
    @type node_list: list
69
    @param node_list: the list of node names to connect
70
    @type address_list: list or None
71
    @keyword address_list: either None or a list with node addresses,
72
        which must have the same length as the node list
73

74
    """
75
    if address_list is None:
76
      address_list = [None for _ in node_list]
77
    else:
78
      assert len(node_list) == len(address_list), \
79
             "Name and address lists should have the same length"
80
    for node, address in zip(node_list, address_list):
81
      self.ConnectNode(node, address)
82

    
83
  def ConnectNode(self, name, address=None):
84
    """Add a node to the target list.
85

86
    @type name: str
87
    @param name: the node name
88
    @type address: str
89
    @keyword address: the node address, if known
90

91
    """
92
    if address is None:
93
      address = name
94

    
95
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
96
                                           "/%s" % self.procedure,
97
                                           post_data=self.body)
98

    
99
  def GetResults(self):
100
    """Call nodes and return results.
101

102
    @rtype: list
103
    @returns: List of RPC results
104

105
    """
106
    # TODO: Shared and reused manager
107
    mgr = http.HttpClientManager()
108
    try:
109
      mgr.ExecRequests(self.nc.values())
110
    finally:
111
      mgr.Shutdown()
112

    
113
    results = {}
114

    
115
    for name, req in self.nc.iteritems():
116
      if req.success and req.resp_status == http.HTTP_OK:
117
        results[name] = serializer.LoadJson(req.resp_body)
118
        continue
119

    
120
      if req.error:
121
        msg = req.error
122
      else:
123
        msg = req.resp_body
124

    
125
      logging.error("RPC error from node %s: %s", name, msg)
126
      results[name] = False
127

    
128
    return results
129

    
130

    
131
class RpcRunner(object):
132
  """RPC runner class"""
133

    
134
  def __init__(self, cfg):
135
    """Initialized the rpc runner.
136

137
    @type cfg:  C{config.ConfigWriter}
138
    @param cfg: the configuration object that will be used to get data
139
                about the cluster
140

141
    """
142
    self._cfg = cfg
143

    
144
  def _InstDict(self, instance):
145
    """Convert the given instance to a dict.
146

147
    This is done via the instance's ToDict() method and additionally
148
    we fill the hvparams with the cluster defaults.
149

150
    @type instance: L{objects.Instance}
151
    @param instance: an Instance object
152
    @rtype: dict
153
    @return: the instance dict, with the hvparams filled with the
154
        cluster defaults
155

156
    """
157
    idict = instance.ToDict()
158
    cluster = self._cfg.GetClusterInfo()
159
    idict["hvparams"] = cluster.FillHV(instance)
160
    idict["beparams"] = cluster.FillBE(instance)
161
    return idict
162

    
163
  def _ConnectList(self, client, node_list):
164
    """Helper for computing node addresses.
165

166
    @type client: L{Client}
167
    @param client: a C{Client} instance
168
    @type node_list: list
169
    @param node_list: the node list we should connect
170

171
    """
172
    all_nodes = self._cfg.GetAllNodesInfo()
173
    addr_list = []
174
    for node in node_list:
175
      if node in all_nodes:
176
        val = all_nodes[node].primary_ip
177
      else:
178
        val = None
179
      addr_list.append(val)
180
    client.ConnectList(node_list, address_list=addr_list)
181

    
182
  def _ConnectNode(self, client, node):
183
    """Helper for computing one node's address.
184

185
    @type client: L{Client}
186
    @param client: a C{Client} instance
187
    @type node: str
188
    @param node: the node we should connect
189

190
    """
191
    node_info = self._cfg.GetNodeInfo(node)
192
    if node_info is not None:
193
      addr = node_info.primary_ip
194
    else:
195
      addr = None
196
    client.ConnectNode(node, address=addr)
197

    
198
  def _MultiNodeCall(self, node_list, procedure, args,
199
                     address_list=None):
200
    c = Client(procedure, args)
201
    if address_list is None:
202
      self._ConnectList(c, node_list)
203
    else:
204
      c.ConnectList(node_list, address_list=address_list)
205
    return c.GetResults()
206

    
207
  @classmethod
208
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
209
                           address_list=None):
210
    c = Client(procedure, args)
211
    c.ConnectList(node_list, address_list=address_list)
212
    return c.GetResults()
213

    
214
  def _SingleNodeCall(self, node, procedure, args):
215
    """
216

217
    """
218
    c = Client(procedure, args)
219
    self._ConnectNode(c, node)
220
    return c.GetResults().get(node, False)
221

    
222
  @classmethod
223
  def _StaticSingleNodeCall(cls, node, procedure, args):
224
    """
225

226
    """
227
    c = Client(procedure, args)
228
    c.ConnectNode(c, node)
229
    return c.GetResults().get(node, False)
230

    
231
  def call_volume_list(self, node_list, vg_name):
232
    """Gets the logical volumes present in a given volume group.
233

234
    This is a multi-node call.
235

236
    """
237
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
238

    
239
  def call_vg_list(self, node_list):
240
    """Gets the volume group list.
241

242
    This is a multi-node call.
243

244
    """
245
    return self._MultiNodeCall(node_list, "vg_list", [])
246

    
247
  def call_bridges_exist(self, node, bridges_list):
248
    """Checks if a node has all the bridges given.
249

250
    This method checks if all bridges given in the bridges_list are
251
    present on the remote node, so that an instance that uses interfaces
252
    on those bridges can be started.
253

254
    This is a single-node call.
255

256
    """
257
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
258

    
259
  def call_instance_start(self, node, instance, extra_args):
260
    """Starts an instance.
261

262
    This is a single-node call.
263

264
    """
265
    return self._SingleNodeCall(node, "instance_start",
266
                                [self._InstDict(instance), extra_args])
267

    
268
  def call_instance_shutdown(self, node, instance):
269
    """Stops an instance.
270

271
    This is a single-node call.
272

273
    """
274
    return self._SingleNodeCall(node, "instance_shutdown",
275
                                [self._InstDict(instance)])
276

    
277
  def call_instance_migrate(self, node, instance, target, live):
278
    """Migrate an instance.
279

280
    This is a single-node call.
281

282
    @type node: string
283
    @param node: the node on which the instance is currently running
284
    @type instance: C{objects.Instance}
285
    @param instance: the instance definition
286
    @type target: string
287
    @param target: the target node name
288
    @type live: boolean
289
    @param live: whether the migration should be done live or not (the
290
        interpretation of this parameter is left to the hypervisor)
291

292
    """
293
    return self._SingleNodeCall(node, "instance_migrate",
294
                                [self._InstDict(instance), target, live])
295

    
296
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
297
    """Reboots an instance.
298

299
    This is a single-node call.
300

301
    """
302
    return self._SingleNodeCall(node, "instance_reboot",
303
                                [self._InstDict(instance), reboot_type,
304
                                 extra_args])
305

    
306
  def call_instance_os_add(self, node, inst):
307
    """Installs an OS on the given instance.
308

309
    This is a single-node call.
310

311
    """
312
    return self._SingleNodeCall(node, "instance_os_add",
313
                                [self._InstDict(inst)])
314

    
315
  def call_instance_run_rename(self, node, inst, old_name):
316
    """Run the OS rename script for an instance.
317

318
    This is a single-node call.
319

320
    """
321
    return self._SingleNodeCall(node, "instance_run_rename",
322
                                [self._InstDict(inst), old_name])
323

    
324
  def call_instance_info(self, node, instance, hname):
325
    """Returns information about a single instance.
326

327
    This is a single-node call.
328

329
    @type node: list
330
    @param node: the list of nodes to query
331
    @type instance: string
332
    @param instance: the instance name
333
    @type hname: string
334
    @param hname: the hypervisor type of the instance
335

336
    """
337
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
338

    
339
  def call_all_instances_info(self, node_list, hypervisor_list):
340
    """Returns information about all instances on the given nodes.
341

342
    This is a multi-node call.
343

344
    @type node_list: list
345
    @param node_list: the list of nodes to query
346
    @type hypervisor_list: list
347
    @param hypervisor_list: the hypervisors to query for instances
348

349
    """
350
    return self._MultiNodeCall(node_list, "all_instances_info",
351
                               [hypervisor_list])
352

    
353
  def call_instance_list(self, node_list, hypervisor_list):
354
    """Returns the list of running instances on a given node.
355

356
    This is a multi-node call.
357

358
    @type node_list: list
359
    @param node_list: the list of nodes to query
360
    @type hypervisor_list: list
361
    @param hypervisor_list: the hypervisors to query for instances
362

363
    """
364
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
365

    
366
  def call_node_tcp_ping(self, node, source, target, port, timeout,
367
                         live_port_needed):
368
    """Do a TcpPing on the remote node
369

370
    This is a single-node call.
371

372
    """
373
    return self._SingleNodeCall(node, "node_tcp_ping",
374
                                [source, target, port, timeout,
375
                                 live_port_needed])
376

    
377
  def call_node_has_ip_address(self, node, address):
378
    """Checks if a node has the given IP address.
379

380
    This is a single-node call.
381

382
    """
383
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
384

    
385
  def call_node_info(self, node_list, vg_name, hypervisor_type):
386
    """Return node information.
387

388
    This will return memory information and volume group size and free
389
    space.
390

391
    This is a multi-node call.
392

393
    @type node_list: list
394
    @param node_list: the list of nodes to query
395
    @type vgname: C{string}
396
    @param vgname: the name of the volume group to ask for disk space
397
        information
398
    @type hypervisor_type: C{str}
399
    @param hypervisor_type: the name of the hypervisor to ask for
400
        memory information
401

402
    """
403
    retux = self._MultiNodeCall(node_list, "node_info",
404
                                [vg_name, hypervisor_type])
405

    
406
    for node_name in retux:
407
      ret = retux.get(node_name, False)
408
      if type(ret) != dict:
409
        logging.error("could not connect to node %s", node_name)
410
        ret = {}
411

    
412
      utils.CheckDict(ret, {
413
                        'memory_total' : '-',
414
                        'memory_dom0' : '-',
415
                        'memory_free' : '-',
416
                        'vg_size' : 'node_unreachable',
417
                        'vg_free' : '-',
418
                      }, "call_node_info")
419
    return retux
420

    
421
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
422
    """Add a node to the cluster.
423

424
    This is a single-node call.
425

426
    """
427
    return self._SingleNodeCall(node, "node_add",
428
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
429

    
430
  def call_node_verify(self, node_list, checkdict, cluster_name):
431
    """Request verification of given parameters.
432

433
    This is a multi-node call.
434

435
    """
436
    return self._MultiNodeCall(node_list, "node_verify",
437
                               [checkdict, cluster_name])
438

    
439
  @classmethod
440
  def call_node_start_master(cls, node, start_daemons):
441
    """Tells a node to activate itself as a master.
442

443
    This is a single-node call.
444

445
    """
446
    return cls._StaticSingleNodeCall(node, "node_start_master",
447
                                     [start_daemons])
448

    
449
  @classmethod
450
  def call_node_stop_master(cls, node, stop_daemons):
451
    """Tells a node to demote itself from master status.
452

453
    This is a single-node call.
454

455
    """
456
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
457

    
458
  @classmethod
459
  def call_master_info(cls, node_list):
460
    """Query master info.
461

462
    This is a multi-node call.
463

464
    """
465
    # TODO: should this method query down nodes?
466
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
467

    
468
  def call_version(self, node_list):
469
    """Query node version.
470

471
    This is a multi-node call.
472

473
    """
474
    return self._MultiNodeCall(node_list, "version", [])
475

    
476
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
477
    """Request creation of a given block device.
478

479
    This is a single-node call.
480

481
    """
482
    return self._SingleNodeCall(node, "blockdev_create",
483
                                [bdev.ToDict(), size, owner, on_primary, info])
484

    
485
  def call_blockdev_remove(self, node, bdev):
486
    """Request removal of a given block device.
487

488
    This is a single-node call.
489

490
    """
491
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
492

    
493
  def call_blockdev_rename(self, node, devlist):
494
    """Request rename of the given block devices.
495

496
    This is a single-node call.
497

498
    """
499
    return self._SingleNodeCall(node, "blockdev_rename",
500
                                [(d.ToDict(), uid) for d, uid in devlist])
501

    
502
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
503
    """Request assembling of a given block device.
504

505
    This is a single-node call.
506

507
    """
508
    return self._SingleNodeCall(node, "blockdev_assemble",
509
                                [disk.ToDict(), owner, on_primary])
510

    
511
  def call_blockdev_shutdown(self, node, disk):
512
    """Request shutdown of a given block device.
513

514
    This is a single-node call.
515

516
    """
517
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
518

    
519
  def call_blockdev_addchildren(self, node, bdev, ndevs):
520
    """Request adding a list of children to a (mirroring) device.
521

522
    This is a single-node call.
523

524
    """
525
    return self._SingleNodeCall(node, "blockdev_addchildren",
526
                                [bdev.ToDict(),
527
                                 [disk.ToDict() for disk in ndevs]])
528

    
529
  def call_blockdev_removechildren(self, node, bdev, ndevs):
530
    """Request removing a list of children from a (mirroring) device.
531

532
    This is a single-node call.
533

534
    """
535
    return self._SingleNodeCall(node, "blockdev_removechildren",
536
                                [bdev.ToDict(),
537
                                 [disk.ToDict() for disk in ndevs]])
538

    
539
  def call_blockdev_getmirrorstatus(self, node, disks):
540
    """Request status of a (mirroring) device.
541

542
    This is a single-node call.
543

544
    """
545
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
546
                                [dsk.ToDict() for dsk in disks])
547

    
548
  def call_blockdev_find(self, node, disk):
549
    """Request identification of a given block device.
550

551
    This is a single-node call.
552

553
    """
554
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
555

    
556
  def call_blockdev_close(self, node, disks):
557
    """Closes the given block devices.
558

559
    This is a single-node call.
560

561
    """
562
    return self._SingleNodeCall(node, "blockdev_close",
563
                                [cf.ToDict() for cf in disks])
564

    
565
  @classmethod
566
  def call_upload_file(cls, node_list, file_name, address_list=None):
567
    """Upload a file.
568

569
    The node will refuse the operation in case the file is not on the
570
    approved file list.
571

572
    This is a multi-node call.
573

574
    @type node_list: list
575
    @param node_list: the list of node names to upload to
576
    @type file_name: str
577
    @param file_name: the filename to upload
578
    @type address_list: list or None
579
    @keyword address_list: an optional list of node addresses, in order
580
        to optimize the RPC speed
581

582
    """
583
    data = utils.ReadFile(file_name)
584
    st = os.stat(file_name)
585
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
586
              st.st_atime, st.st_mtime]
587
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
588
                                    address_list=address_list)
589

    
590
  @classmethod
591
  def call_write_ssconf_files(cls, node_list):
592
    """Write ssconf files.
593

594
    This is a multi-node call.
595

596
    """
597
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [])
598

    
599
  def call_os_diagnose(self, node_list):
600
    """Request a diagnose of OS definitions.
601

602
    This is a multi-node call.
603

604
    """
605
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
606

    
607
    new_result = {}
608
    for node_name in result:
609
      if result[node_name]:
610
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
611
      else:
612
        nr = []
613
      new_result[node_name] = nr
614
    return new_result
615

    
616
  def call_os_get(self, node, name):
617
    """Returns an OS definition.
618

619
    This is a single-node call.
620

621
    """
622
    result = self._SingleNodeCall(node, "os_get", [name])
623
    if isinstance(result, dict):
624
      return objects.OS.FromDict(result)
625
    else:
626
      return result
627

    
628
  def call_hooks_runner(self, node_list, hpath, phase, env):
629
    """Call the hooks runner.
630

631
    Args:
632
      - op: the OpCode instance
633
      - env: a dictionary with the environment
634

635
    This is a multi-node call.
636

637
    """
638
    params = [hpath, phase, env]
639
    return self._MultiNodeCall(node_list, "hooks_runner", params)
640

    
641
  def call_iallocator_runner(self, node, name, idata):
642
    """Call an iallocator on a remote node
643

644
    Args:
645
      - name: the iallocator name
646
      - input: the json-encoded input string
647

648
    This is a single-node call.
649

650
    """
651
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
652

    
653
  def call_blockdev_grow(self, node, cf_bdev, amount):
654
    """Request a snapshot of the given block device.
655

656
    This is a single-node call.
657

658
    """
659
    return self._SingleNodeCall(node, "blockdev_grow",
660
                                [cf_bdev.ToDict(), amount])
661

    
662
  def call_blockdev_snapshot(self, node, cf_bdev):
663
    """Request a snapshot of the given block device.
664

665
    This is a single-node call.
666

667
    """
668
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
669

    
670
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
671
                           cluster_name, idx):
672
    """Request the export of a given snapshot.
673

674
    This is a single-node call.
675

676
    """
677
    return self._SingleNodeCall(node, "snapshot_export",
678
                                [snap_bdev.ToDict(), dest_node,
679
                                 self._InstDict(instance), cluster_name, idx])
680

    
681
  def call_finalize_export(self, node, instance, snap_disks):
682
    """Request the completion of an export operation.
683

684
    This writes the export config file, etc.
685

686
    This is a single-node call.
687

688
    """
689
    flat_disks = []
690
    for disk in snap_disks:
691
      flat_disks.append(disk.ToDict())
692

    
693
    return self._SingleNodeCall(node, "finalize_export",
694
                                [self._InstDict(instance), flat_disks])
695

    
696
  def call_export_info(self, node, path):
697
    """Queries the export information in a given path.
698

699
    This is a single-node call.
700

701
    """
702
    result = self._SingleNodeCall(node, "export_info", [path])
703
    if not result:
704
      return result
705
    return objects.SerializableConfigParser.Loads(str(result))
706

    
707
  def call_instance_os_import(self, node, inst, src_node, src_images,
708
                              cluster_name):
709
    """Request the import of a backup into an instance.
710

711
    This is a single-node call.
712

713
    """
714
    return self._SingleNodeCall(node, "instance_os_import",
715
                                [self._InstDict(inst), src_node, src_images,
716
                                 cluster_name])
717

    
718
  def call_export_list(self, node_list):
719
    """Gets the stored exports list.
720

721
    This is a multi-node call.
722

723
    """
724
    return self._MultiNodeCall(node_list, "export_list", [])
725

    
726
  def call_export_remove(self, node, export):
727
    """Requests removal of a given export.
728

729
    This is a single-node call.
730

731
    """
732
    return self._SingleNodeCall(node, "export_remove", [export])
733

    
734
  @classmethod
735
  def call_node_leave_cluster(cls, node):
736
    """Requests a node to clean the cluster information it has.
737

738
    This will remove the configuration information from the ganeti data
739
    dir.
740

741
    This is a single-node call.
742

743
    """
744
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
745

    
746
  def call_node_volumes(self, node_list):
747
    """Gets all volumes on node(s).
748

749
    This is a multi-node call.
750

751
    """
752
    return self._MultiNodeCall(node_list, "node_volumes", [])
753

    
754
  def call_test_delay(self, node_list, duration):
755
    """Sleep for a fixed time on given node(s).
756

757
    This is a multi-node call.
758

759
    """
760
    return self._MultiNodeCall(node_list, "test_delay", [duration])
761

    
762
  def call_file_storage_dir_create(self, node, file_storage_dir):
763
    """Create the given file storage directory.
764

765
    This is a single-node call.
766

767
    """
768
    return self._SingleNodeCall(node, "file_storage_dir_create",
769
                                [file_storage_dir])
770

    
771
  def call_file_storage_dir_remove(self, node, file_storage_dir):
772
    """Remove the given file storage directory.
773

774
    This is a single-node call.
775

776
    """
777
    return self._SingleNodeCall(node, "file_storage_dir_remove",
778
                                [file_storage_dir])
779

    
780
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
781
                                   new_file_storage_dir):
782
    """Rename file storage directory.
783

784
    This is a single-node call.
785

786
    """
787
    return self._SingleNodeCall(node, "file_storage_dir_rename",
788
                                [old_file_storage_dir, new_file_storage_dir])
789

    
790
  @classmethod
791
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
792
    """Update job queue.
793

794
    This is a multi-node call.
795

796
    """
797
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
798
                                    [file_name, content],
799
                                    address_list=address_list)
800

    
801
  @classmethod
802
  def call_jobqueue_purge(cls, node):
803
    """Purge job queue.
804

805
    This is a single-node call.
806

807
    """
808
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
809

    
810
  @classmethod
811
  def call_jobqueue_rename(cls, node_list, address_list, old, new):
812
    """Rename a job queue file.
813

814
    This is a multi-node call.
815

816
    """
817
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
818
                                    address_list=address_list)
819

    
820
  @classmethod
821
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
822
    """Set the drain flag on the queue.
823

824
    This is a multi-node call.
825

826
    @type node_list: list
827
    @param node_list: the list of nodes to query
828
    @type drain_flag: bool
829
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
830

831
    """
832
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
833
                                    [drain_flag])
834

    
835
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
836
    """Validate the hypervisor params.
837

838
    This is a multi-node call.
839

840
    @type node_list: list
841
    @param node_list: the list of nodes to query
842
    @type hvname: string
843
    @param hvname: the hypervisor name
844
    @type hvparams: dict
845
    @param hvparams: the hypervisor parameters to be validated
846

847
    """
848
    cluster = self._cfg.GetClusterInfo()
849
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
850
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
851
                               [hvname, hv_full])