Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 4331f6cd

History | View | Annotate | Download (24.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42

    
43

    
44
# Module level variable
45
_http_manager = None
46

    
47

    
48
def Init():
49
  """Initializes the module-global HTTP client manager.
50

51
  Must be called before using any RPC function.
52

53
  """
54
  global _http_manager
55

    
56
  assert not _http_manager, "RPC module initialized more than once"
57

    
58
  _http_manager = http.HttpClientManager()
59

    
60

    
61
def Shutdown():
62
  """Stops the module-global HTTP client manager.
63

64
  Must be called before quitting the program.
65

66
  """
67
  global _http_manager
68

    
69
  if _http_manager:
70
    _http_manager.Shutdown()
71
    _http_manager = None
72

    
73

    
74
class Client:
75
  """RPC Client class.
76

77
  This class, given a (remote) method name, a list of parameters and a
78
  list of nodes, will contact (in parallel) all nodes, and return a
79
  dict of results (key: node name, value: result).
80

81
  One current bug is that generic failure is still signalled by
82
  'False' result, which is not good. This overloading of values can
83
  cause bugs.
84

85
  """
86
  def __init__(self, procedure, args):
87
    self.procedure = procedure
88
    self.args = args
89
    self.body = serializer.DumpJson(args, indent=False)
90

    
91
    self.port = utils.GetNodeDaemonPort()
92
    self.nodepw = utils.GetNodeDaemonPassword()
93
    self.nc = {}
94

    
95
  def ConnectList(self, node_list, address_list=None):
96
    """Add a list of nodes to the target nodes.
97

98
    @type node_list: list
99
    @param node_list: the list of node names to connect
100
    @type address_list: list or None
101
    @keyword address_list: either None or a list with node addresses,
102
        which must have the same length as the node list
103

104
    """
105
    if address_list is None:
106
      address_list = [None for _ in node_list]
107
    else:
108
      assert len(node_list) == len(address_list), \
109
             "Name and address lists should have the same length"
110
    for node, address in zip(node_list, address_list):
111
      self.ConnectNode(node, address)
112

    
113
  def ConnectNode(self, name, address=None):
114
    """Add a node to the target list.
115

116
    @type name: str
117
    @param name: the node name
118
    @type address: str
119
    @keyword address: the node address, if known
120

121
    """
122
    if address is None:
123
      address = name
124

    
125
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
126
                                           "/%s" % self.procedure,
127
                                           post_data=self.body)
128

    
129
  def GetResults(self):
130
    """Call nodes and return results.
131

132
    @rtype: list
133
    @returns: List of RPC results
134

135
    """
136
    assert _http_manager, "RPC module not intialized"
137

    
138
    _http_manager.ExecRequests(self.nc.values())
139

    
140
    results = {}
141

    
142
    for name, req in self.nc.iteritems():
143
      if req.success and req.resp_status == http.HTTP_OK:
144
        results[name] = serializer.LoadJson(req.resp_body)
145
        continue
146

    
147
      if req.error:
148
        msg = req.error
149
      else:
150
        msg = req.resp_body
151

    
152
      logging.error("RPC error from node %s: %s", name, msg)
153
      results[name] = False
154

    
155
    return results
156

    
157

    
158
class RpcRunner(object):
159
  """RPC runner class"""
160

    
161
  def __init__(self, cfg):
162
    """Initialized the rpc runner.
163

164
    @type cfg:  C{config.ConfigWriter}
165
    @param cfg: the configuration object that will be used to get data
166
                about the cluster
167

168
    """
169
    self._cfg = cfg
170

    
171
  def _InstDict(self, instance):
172
    """Convert the given instance to a dict.
173

174
    This is done via the instance's ToDict() method and additionally
175
    we fill the hvparams with the cluster defaults.
176

177
    @type instance: L{objects.Instance}
178
    @param instance: an Instance object
179
    @rtype: dict
180
    @return: the instance dict, with the hvparams filled with the
181
        cluster defaults
182

183
    """
184
    idict = instance.ToDict()
185
    cluster = self._cfg.GetClusterInfo()
186
    idict["hvparams"] = cluster.FillHV(instance)
187
    idict["beparams"] = cluster.FillBE(instance)
188
    return idict
189

    
190
  def _ConnectList(self, client, node_list):
191
    """Helper for computing node addresses.
192

193
    @type client: L{Client}
194
    @param client: a C{Client} instance
195
    @type node_list: list
196
    @param node_list: the node list we should connect
197

198
    """
199
    all_nodes = self._cfg.GetAllNodesInfo()
200
    addr_list = []
201
    for node in node_list:
202
      if node in all_nodes:
203
        val = all_nodes[node].primary_ip
204
      else:
205
        val = None
206
      addr_list.append(val)
207
    client.ConnectList(node_list, address_list=addr_list)
208

    
209
  def _ConnectNode(self, client, node):
210
    """Helper for computing one node's address.
211

212
    @type client: L{Client}
213
    @param client: a C{Client} instance
214
    @type node: str
215
    @param node: the node we should connect
216

217
    """
218
    node_info = self._cfg.GetNodeInfo(node)
219
    if node_info is not None:
220
      addr = node_info.primary_ip
221
    else:
222
      addr = None
223
    client.ConnectNode(node, address=addr)
224

    
225
  def _MultiNodeCall(self, node_list, procedure, args,
226
                     address_list=None):
227
    c = Client(procedure, args)
228
    if address_list is None:
229
      self._ConnectList(c, node_list)
230
    else:
231
      c.ConnectList(node_list, address_list=address_list)
232
    return c.GetResults()
233

    
234
  @classmethod
235
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
236
                           address_list=None):
237
    c = Client(procedure, args)
238
    c.ConnectList(node_list, address_list=address_list)
239
    return c.GetResults()
240

    
241
  def _SingleNodeCall(self, node, procedure, args):
242
    """
243

244
    """
245
    c = Client(procedure, args)
246
    self._ConnectNode(c, node)
247
    return c.GetResults().get(node, False)
248

    
249
  @classmethod
250
  def _StaticSingleNodeCall(cls, node, procedure, args):
251
    """
252

253
    """
254
    c = Client(procedure, args)
255
    c.ConnectNode(c, node)
256
    return c.GetResults().get(node, False)
257

    
258
  def call_volume_list(self, node_list, vg_name):
259
    """Gets the logical volumes present in a given volume group.
260

261
    This is a multi-node call.
262

263
    """
264
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
265

    
266
  def call_vg_list(self, node_list):
267
    """Gets the volume group list.
268

269
    This is a multi-node call.
270

271
    """
272
    return self._MultiNodeCall(node_list, "vg_list", [])
273

    
274
  def call_bridges_exist(self, node, bridges_list):
275
    """Checks if a node has all the bridges given.
276

277
    This method checks if all bridges given in the bridges_list are
278
    present on the remote node, so that an instance that uses interfaces
279
    on those bridges can be started.
280

281
    This is a single-node call.
282

283
    """
284
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
285

    
286
  def call_instance_start(self, node, instance, extra_args):
287
    """Starts an instance.
288

289
    This is a single-node call.
290

291
    """
292
    return self._SingleNodeCall(node, "instance_start",
293
                                [self._InstDict(instance), extra_args])
294

    
295
  def call_instance_shutdown(self, node, instance):
296
    """Stops an instance.
297

298
    This is a single-node call.
299

300
    """
301
    return self._SingleNodeCall(node, "instance_shutdown",
302
                                [self._InstDict(instance)])
303

    
304
  def call_instance_migrate(self, node, instance, target, live):
305
    """Migrate an instance.
306

307
    This is a single-node call.
308

309
    @type node: string
310
    @param node: the node on which the instance is currently running
311
    @type instance: C{objects.Instance}
312
    @param instance: the instance definition
313
    @type target: string
314
    @param target: the target node name
315
    @type live: boolean
316
    @param live: whether the migration should be done live or not (the
317
        interpretation of this parameter is left to the hypervisor)
318

319
    """
320
    return self._SingleNodeCall(node, "instance_migrate",
321
                                [self._InstDict(instance), target, live])
322

    
323
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
324
    """Reboots an instance.
325

326
    This is a single-node call.
327

328
    """
329
    return self._SingleNodeCall(node, "instance_reboot",
330
                                [self._InstDict(instance), reboot_type,
331
                                 extra_args])
332

    
333
  def call_instance_os_add(self, node, inst):
334
    """Installs an OS on the given instance.
335

336
    This is a single-node call.
337

338
    """
339
    return self._SingleNodeCall(node, "instance_os_add",
340
                                [self._InstDict(inst)])
341

    
342
  def call_instance_run_rename(self, node, inst, old_name):
343
    """Run the OS rename script for an instance.
344

345
    This is a single-node call.
346

347
    """
348
    return self._SingleNodeCall(node, "instance_run_rename",
349
                                [self._InstDict(inst), old_name])
350

    
351
  def call_instance_info(self, node, instance, hname):
352
    """Returns information about a single instance.
353

354
    This is a single-node call.
355

356
    @type node: list
357
    @param node: the list of nodes to query
358
    @type instance: string
359
    @param instance: the instance name
360
    @type hname: string
361
    @param hname: the hypervisor type of the instance
362

363
    """
364
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
365

    
366
  def call_all_instances_info(self, node_list, hypervisor_list):
367
    """Returns information about all instances on the given nodes.
368

369
    This is a multi-node call.
370

371
    @type node_list: list
372
    @param node_list: the list of nodes to query
373
    @type hypervisor_list: list
374
    @param hypervisor_list: the hypervisors to query for instances
375

376
    """
377
    return self._MultiNodeCall(node_list, "all_instances_info",
378
                               [hypervisor_list])
379

    
380
  def call_instance_list(self, node_list, hypervisor_list):
381
    """Returns the list of running instances on a given node.
382

383
    This is a multi-node call.
384

385
    @type node_list: list
386
    @param node_list: the list of nodes to query
387
    @type hypervisor_list: list
388
    @param hypervisor_list: the hypervisors to query for instances
389

390
    """
391
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
392

    
393
  def call_node_tcp_ping(self, node, source, target, port, timeout,
394
                         live_port_needed):
395
    """Do a TcpPing on the remote node
396

397
    This is a single-node call.
398

399
    """
400
    return self._SingleNodeCall(node, "node_tcp_ping",
401
                                [source, target, port, timeout,
402
                                 live_port_needed])
403

    
404
  def call_node_has_ip_address(self, node, address):
405
    """Checks if a node has the given IP address.
406

407
    This is a single-node call.
408

409
    """
410
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
411

    
412
  def call_node_info(self, node_list, vg_name, hypervisor_type):
413
    """Return node information.
414

415
    This will return memory information and volume group size and free
416
    space.
417

418
    This is a multi-node call.
419

420
    @type node_list: list
421
    @param node_list: the list of nodes to query
422
    @type vgname: C{string}
423
    @param vgname: the name of the volume group to ask for disk space
424
        information
425
    @type hypervisor_type: C{str}
426
    @param hypervisor_type: the name of the hypervisor to ask for
427
        memory information
428

429
    """
430
    retux = self._MultiNodeCall(node_list, "node_info",
431
                                [vg_name, hypervisor_type])
432

    
433
    for node_name in retux:
434
      ret = retux.get(node_name, False)
435
      if type(ret) != dict:
436
        logging.error("could not connect to node %s", node_name)
437
        ret = {}
438

    
439
      utils.CheckDict(ret, {
440
                        'memory_total' : '-',
441
                        'memory_dom0' : '-',
442
                        'memory_free' : '-',
443
                        'vg_size' : 'node_unreachable',
444
                        'vg_free' : '-',
445
                      }, "call_node_info")
446
    return retux
447

    
448
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
449
    """Add a node to the cluster.
450

451
    This is a single-node call.
452

453
    """
454
    return self._SingleNodeCall(node, "node_add",
455
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
456

    
457
  def call_node_verify(self, node_list, checkdict, cluster_name):
458
    """Request verification of given parameters.
459

460
    This is a multi-node call.
461

462
    """
463
    return self._MultiNodeCall(node_list, "node_verify",
464
                               [checkdict, cluster_name])
465

    
466
  @classmethod
467
  def call_node_start_master(cls, node, start_daemons):
468
    """Tells a node to activate itself as a master.
469

470
    This is a single-node call.
471

472
    """
473
    return cls._StaticSingleNodeCall(node, "node_start_master",
474
                                     [start_daemons])
475

    
476
  @classmethod
477
  def call_node_stop_master(cls, node, stop_daemons):
478
    """Tells a node to demote itself from master status.
479

480
    This is a single-node call.
481

482
    """
483
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
484

    
485
  @classmethod
486
  def call_master_info(cls, node_list):
487
    """Query master info.
488

489
    This is a multi-node call.
490

491
    """
492
    # TODO: should this method query down nodes?
493
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
494

    
495
  def call_version(self, node_list):
496
    """Query node version.
497

498
    This is a multi-node call.
499

500
    """
501
    return self._MultiNodeCall(node_list, "version", [])
502

    
503
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
504
    """Request creation of a given block device.
505

506
    This is a single-node call.
507

508
    """
509
    return self._SingleNodeCall(node, "blockdev_create",
510
                                [bdev.ToDict(), size, owner, on_primary, info])
511

    
512
  def call_blockdev_remove(self, node, bdev):
513
    """Request removal of a given block device.
514

515
    This is a single-node call.
516

517
    """
518
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
519

    
520
  def call_blockdev_rename(self, node, devlist):
521
    """Request rename of the given block devices.
522

523
    This is a single-node call.
524

525
    """
526
    return self._SingleNodeCall(node, "blockdev_rename",
527
                                [(d.ToDict(), uid) for d, uid in devlist])
528

    
529
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
530
    """Request assembling of a given block device.
531

532
    This is a single-node call.
533

534
    """
535
    return self._SingleNodeCall(node, "blockdev_assemble",
536
                                [disk.ToDict(), owner, on_primary])
537

    
538
  def call_blockdev_shutdown(self, node, disk):
539
    """Request shutdown of a given block device.
540

541
    This is a single-node call.
542

543
    """
544
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
545

    
546
  def call_blockdev_addchildren(self, node, bdev, ndevs):
547
    """Request adding a list of children to a (mirroring) device.
548

549
    This is a single-node call.
550

551
    """
552
    return self._SingleNodeCall(node, "blockdev_addchildren",
553
                                [bdev.ToDict(),
554
                                 [disk.ToDict() for disk in ndevs]])
555

    
556
  def call_blockdev_removechildren(self, node, bdev, ndevs):
557
    """Request removing a list of children from a (mirroring) device.
558

559
    This is a single-node call.
560

561
    """
562
    return self._SingleNodeCall(node, "blockdev_removechildren",
563
                                [bdev.ToDict(),
564
                                 [disk.ToDict() for disk in ndevs]])
565

    
566
  def call_blockdev_getmirrorstatus(self, node, disks):
567
    """Request status of a (mirroring) device.
568

569
    This is a single-node call.
570

571
    """
572
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
573
                                [dsk.ToDict() for dsk in disks])
574

    
575
  def call_blockdev_find(self, node, disk):
576
    """Request identification of a given block device.
577

578
    This is a single-node call.
579

580
    """
581
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
582

    
583
  def call_blockdev_close(self, node, disks):
584
    """Closes the given block devices.
585

586
    This is a single-node call.
587

588
    """
589
    return self._SingleNodeCall(node, "blockdev_close",
590
                                [cf.ToDict() for cf in disks])
591

    
592
  @classmethod
593
  def call_upload_file(cls, node_list, file_name, address_list=None):
594
    """Upload a file.
595

596
    The node will refuse the operation in case the file is not on the
597
    approved file list.
598

599
    This is a multi-node call.
600

601
    @type node_list: list
602
    @param node_list: the list of node names to upload to
603
    @type file_name: str
604
    @param file_name: the filename to upload
605
    @type address_list: list or None
606
    @keyword address_list: an optional list of node addresses, in order
607
        to optimize the RPC speed
608

609
    """
610
    data = utils.ReadFile(file_name)
611
    st = os.stat(file_name)
612
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
613
              st.st_atime, st.st_mtime]
614
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
615
                                    address_list=address_list)
616

    
617
  @classmethod
618
  def call_write_ssconf_files(cls, node_list):
619
    """Write ssconf files.
620

621
    This is a multi-node call.
622

623
    """
624
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [])
625

    
626
  def call_os_diagnose(self, node_list):
627
    """Request a diagnose of OS definitions.
628

629
    This is a multi-node call.
630

631
    """
632
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
633

    
634
    new_result = {}
635
    for node_name in result:
636
      if result[node_name]:
637
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
638
      else:
639
        nr = []
640
      new_result[node_name] = nr
641
    return new_result
642

    
643
  def call_os_get(self, node, name):
644
    """Returns an OS definition.
645

646
    This is a single-node call.
647

648
    """
649
    result = self._SingleNodeCall(node, "os_get", [name])
650
    if isinstance(result, dict):
651
      return objects.OS.FromDict(result)
652
    else:
653
      return result
654

    
655
  def call_hooks_runner(self, node_list, hpath, phase, env):
656
    """Call the hooks runner.
657

658
    Args:
659
      - op: the OpCode instance
660
      - env: a dictionary with the environment
661

662
    This is a multi-node call.
663

664
    """
665
    params = [hpath, phase, env]
666
    return self._MultiNodeCall(node_list, "hooks_runner", params)
667

    
668
  def call_iallocator_runner(self, node, name, idata):
669
    """Call an iallocator on a remote node
670

671
    Args:
672
      - name: the iallocator name
673
      - input: the json-encoded input string
674

675
    This is a single-node call.
676

677
    """
678
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
679

    
680
  def call_blockdev_grow(self, node, cf_bdev, amount):
681
    """Request a snapshot of the given block device.
682

683
    This is a single-node call.
684

685
    """
686
    return self._SingleNodeCall(node, "blockdev_grow",
687
                                [cf_bdev.ToDict(), amount])
688

    
689
  def call_blockdev_snapshot(self, node, cf_bdev):
690
    """Request a snapshot of the given block device.
691

692
    This is a single-node call.
693

694
    """
695
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
696

    
697
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
698
                           cluster_name, idx):
699
    """Request the export of a given snapshot.
700

701
    This is a single-node call.
702

703
    """
704
    return self._SingleNodeCall(node, "snapshot_export",
705
                                [snap_bdev.ToDict(), dest_node,
706
                                 self._InstDict(instance), cluster_name, idx])
707

    
708
  def call_finalize_export(self, node, instance, snap_disks):
709
    """Request the completion of an export operation.
710

711
    This writes the export config file, etc.
712

713
    This is a single-node call.
714

715
    """
716
    flat_disks = []
717
    for disk in snap_disks:
718
      flat_disks.append(disk.ToDict())
719

    
720
    return self._SingleNodeCall(node, "finalize_export",
721
                                [self._InstDict(instance), flat_disks])
722

    
723
  def call_export_info(self, node, path):
724
    """Queries the export information in a given path.
725

726
    This is a single-node call.
727

728
    """
729
    result = self._SingleNodeCall(node, "export_info", [path])
730
    if not result:
731
      return result
732
    return objects.SerializableConfigParser.Loads(str(result))
733

    
734
  def call_instance_os_import(self, node, inst, src_node, src_images,
735
                              cluster_name):
736
    """Request the import of a backup into an instance.
737

738
    This is a single-node call.
739

740
    """
741
    return self._SingleNodeCall(node, "instance_os_import",
742
                                [self._InstDict(inst), src_node, src_images,
743
                                 cluster_name])
744

    
745
  def call_export_list(self, node_list):
746
    """Gets the stored exports list.
747

748
    This is a multi-node call.
749

750
    """
751
    return self._MultiNodeCall(node_list, "export_list", [])
752

    
753
  def call_export_remove(self, node, export):
754
    """Requests removal of a given export.
755

756
    This is a single-node call.
757

758
    """
759
    return self._SingleNodeCall(node, "export_remove", [export])
760

    
761
  @classmethod
762
  def call_node_leave_cluster(cls, node):
763
    """Requests a node to clean the cluster information it has.
764

765
    This will remove the configuration information from the ganeti data
766
    dir.
767

768
    This is a single-node call.
769

770
    """
771
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
772

    
773
  def call_node_volumes(self, node_list):
774
    """Gets all volumes on node(s).
775

776
    This is a multi-node call.
777

778
    """
779
    return self._MultiNodeCall(node_list, "node_volumes", [])
780

    
781
  def call_test_delay(self, node_list, duration):
782
    """Sleep for a fixed time on given node(s).
783

784
    This is a multi-node call.
785

786
    """
787
    return self._MultiNodeCall(node_list, "test_delay", [duration])
788

    
789
  def call_file_storage_dir_create(self, node, file_storage_dir):
790
    """Create the given file storage directory.
791

792
    This is a single-node call.
793

794
    """
795
    return self._SingleNodeCall(node, "file_storage_dir_create",
796
                                [file_storage_dir])
797

    
798
  def call_file_storage_dir_remove(self, node, file_storage_dir):
799
    """Remove the given file storage directory.
800

801
    This is a single-node call.
802

803
    """
804
    return self._SingleNodeCall(node, "file_storage_dir_remove",
805
                                [file_storage_dir])
806

    
807
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
808
                                   new_file_storage_dir):
809
    """Rename file storage directory.
810

811
    This is a single-node call.
812

813
    """
814
    return self._SingleNodeCall(node, "file_storage_dir_rename",
815
                                [old_file_storage_dir, new_file_storage_dir])
816

    
817
  @classmethod
818
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
819
    """Update job queue.
820

821
    This is a multi-node call.
822

823
    """
824
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
825
                                    [file_name, content],
826
                                    address_list=address_list)
827

    
828
  @classmethod
829
  def call_jobqueue_purge(cls, node):
830
    """Purge job queue.
831

832
    This is a single-node call.
833

834
    """
835
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
836

    
837
  @classmethod
838
  def call_jobqueue_rename(cls, node_list, address_list, old, new):
839
    """Rename a job queue file.
840

841
    This is a multi-node call.
842

843
    """
844
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
845
                                    address_list=address_list)
846

    
847
  @classmethod
848
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
849
    """Set the drain flag on the queue.
850

851
    This is a multi-node call.
852

853
    @type node_list: list
854
    @param node_list: the list of nodes to query
855
    @type drain_flag: bool
856
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
857

858
    """
859
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
860
                                    [drain_flag])
861

    
862
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
863
    """Validate the hypervisor params.
864

865
    This is a multi-node call.
866

867
    @type node_list: list
868
    @param node_list: the list of nodes to query
869
    @type hvname: string
870
    @param hvname: the hypervisor name
871
    @type hvparams: dict
872
    @param hvparams: the hypervisor parameters to be validated
873

874
    """
875
    cluster = self._cfg.GetClusterInfo()
876
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
877
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
878
                               [hvname, hv_full])