Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 9a525d83

History | View | Annotate | Download (24 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42

    
43

    
44
class Client:
45
  """RPC Client class.
46

47
  This class, given a (remote) method name, a list of parameters and a
48
  list of nodes, will contact (in parallel) all nodes, and return a
49
  dict of results (key: node name, value: result).
50

51
  One current bug is that generic failure is still signalled by
52
  'False' result, which is not good. This overloading of values can
53
  cause bugs.
54

55
  """
56
  def __init__(self, procedure, args):
57
    self.procedure = procedure
58
    self.args = args
59
    self.body = serializer.DumpJson(args, indent=False)
60

    
61
    self.port = utils.GetNodeDaemonPort()
62
    self.nodepw = utils.GetNodeDaemonPassword()
63
    self.nc = {}
64

    
65
  def ConnectList(self, node_list, address_list=None):
66
    """Add a list of nodes to the target nodes.
67

68
    @type node_list: list
69
    @param node_list: the list of node names to connect
70
    @type address_list: list or None
71
    @keyword address_list: either None or a list with node addresses,
72
        which must have the same length as the node list
73

74
    """
75
    if address_list is None:
76
      address_list = [None for _ in node_list]
77
    else:
78
      assert len(node_list) == len(address_list), \
79
             "Name and address lists should have the same length"
80
    for node, address in zip(node_list, address_list):
81
      self.ConnectNode(node, address)
82

    
83
  def ConnectNode(self, name, address=None):
84
    """Add a node to the target list.
85

86
    @type name: str
87
    @param name: the node name
88
    @type address: str
89
    @keyword address: the node address, if known
90

91
    """
92
    if address is None:
93
      address = name
94

    
95
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
96
                                           "/%s" % self.procedure,
97
                                           post_data=self.body)
98

    
99
  def GetResults(self):
100
    """Call nodes and return results.
101

102
    @rtype: list
103
    @returns: List of RPC results
104

105
    """
106
    # TODO: Shared and reused manager
107
    mgr = http.HttpClientManager()
108
    try:
109
      mgr.ExecRequests(self.nc.values())
110
    finally:
111
      mgr.Shutdown()
112

    
113
    results = {}
114

    
115
    for name, req in self.nc.iteritems():
116
      if req.success and req.resp_status == http.HTTP_OK:
117
        results[name] = serializer.LoadJson(req.resp_body)
118
        continue
119

    
120
      if req.error:
121
        msg = req.error
122
      else:
123
        msg = req.resp_body
124

    
125
      logging.error("RPC error from node %s: %s", name, msg)
126
      results[name] = False
127

    
128
    return results
129

    
130

    
131
class RpcRunner(object):
132
  """RPC runner class"""
133

    
134
  def __init__(self, cfg):
135
    """Initialized the rpc runner.
136

137
    @type cfg:  C{config.ConfigWriter}
138
    @param cfg: the configuration object that will be used to get data
139
                about the cluster
140

141
    """
142
    self._cfg = cfg
143

    
144
  def _InstDict(self, instance):
145
    """Convert the given instance to a dict.
146

147
    This is done via the instance's ToDict() method and additionally
148
    we fill the hvparams with the cluster defaults.
149

150
    @type instance: L{objects.Instance}
151
    @param instance: an Instance object
152
    @rtype: dict
153
    @return: the instance dict, with the hvparams filled with the
154
        cluster defaults
155

156
    """
157
    idict = instance.ToDict()
158
    cluster = self._cfg.GetClusterInfo()
159
    idict["hvparams"] = cluster.FillHV(instance)
160
    idict["beparams"] = cluster.FillBE(instance)
161
    return idict
162

    
163
  def _ConnectList(self, client, node_list):
164
    """Helper for computing node addresses.
165

166
    @type client: L{Client}
167
    @param client: a C{Client} instance
168
    @type node_list: list
169
    @param node_list: the node list we should connect
170

171
    """
172
    all_nodes = self._cfg.GetAllNodesInfo()
173
    addr_list = []
174
    for node in node_list:
175
      if node in all_nodes:
176
        val = all_nodes[node].primary_ip
177
      else:
178
        val = None
179
      addr_list.append(val)
180
    client.ConnectList(node_list, address_list=addr_list)
181

    
182
  def _ConnectNode(self, client, node):
183
    """Helper for computing one node's address.
184

185
    @type client: L{Client}
186
    @param client: a C{Client} instance
187
    @type node: str
188
    @param node: the node we should connect
189

190
    """
191
    node_info = self._cfg.GetNodeInfo(node)
192
    if node_info is not None:
193
      addr = node_info.primary_ip
194
    else:
195
      addr = None
196
    client.ConnectNode(node, address=addr)
197

    
198
  def _MultiNodeCall(self, node_list, procedure, args,
199
                     address_list=None):
200
    c = Client(procedure, args)
201
    if address_list is None:
202
      self._ConnectList(c, node_list)
203
    else:
204
      c.ConnectList(node_list, address_list=address_list)
205
    return c.GetResults()
206

    
207
  @classmethod
208
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
209
                           address_list=None):
210
    c = Client(procedure, args)
211
    c.ConnectList(node_list, address_list=address_list)
212
    return c.GetResults()
213

    
214
  def _SingleNodeCall(self, node, procedure, args):
215
    """
216

217
    """
218
    c = Client(procedure, args)
219
    self._ConnectNode(c, node)
220
    return c.GetResults().get(node, False)
221

    
222
  @classmethod
223
  def _StaticSingleNodeCall(cls, node, procedure, args):
224
    """
225

226
    """
227
    c = Client(procedure, args)
228
    c.ConnectNode(c, node)
229
    return c.GetResults().get(node, False)
230

    
231
  def call_volume_list(self, node_list, vg_name):
232
    """Gets the logical volumes present in a given volume group.
233

234
    This is a multi-node call.
235

236
    """
237
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
238

    
239
  def call_vg_list(self, node_list):
240
    """Gets the volume group list.
241

242
    This is a multi-node call.
243

244
    """
245
    return self._MultiNodeCall(node_list, "vg_list", [])
246

    
247
  def call_bridges_exist(self, node, bridges_list):
248
    """Checks if a node has all the bridges given.
249

250
    This method checks if all bridges given in the bridges_list are
251
    present on the remote node, so that an instance that uses interfaces
252
    on those bridges can be started.
253

254
    This is a single-node call.
255

256
    """
257
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
258

    
259
  def call_instance_start(self, node, instance, extra_args):
260
    """Starts an instance.
261

262
    This is a single-node call.
263

264
    """
265
    return self._SingleNodeCall(node, "instance_start",
266
                                [self._InstDict(instance), extra_args])
267

    
268
  def call_instance_shutdown(self, node, instance):
269
    """Stops an instance.
270

271
    This is a single-node call.
272

273
    """
274
    return self._SingleNodeCall(node, "instance_shutdown",
275
                                [self._InstDict(instance)])
276

    
277
  def call_instance_migrate(self, node, instance, target, live):
278
    """Migrate an instance.
279

280
    This is a single-node call.
281

282
    @type node: string
283
    @param node: the node on which the instance is currently running
284
    @type instance: C{objects.Instance}
285
    @param instance: the instance definition
286
    @type target: string
287
    @param target: the target node name
288
    @type live: boolean
289
    @param live: whether the migration should be done live or not (the
290
        interpretation of this parameter is left to the hypervisor)
291

292
    """
293
    return self._SingleNodeCall(node, "instance_migrate",
294
                                [self._InstDict(instance), target, live])
295

    
296
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
297
    """Reboots an instance.
298

299
    This is a single-node call.
300

301
    """
302
    return self._SingleNodeCall(node, "instance_reboot",
303
                                [self._InstDict(instance), reboot_type,
304
                                 extra_args])
305

    
306
  def call_instance_os_add(self, node, inst):
307
    """Installs an OS on the given instance.
308

309
    This is a single-node call.
310

311
    """
312
    return self._SingleNodeCall(node, "instance_os_add",
313
                                [self._InstDict(inst)])
314

    
315
  def call_instance_run_rename(self, node, inst, old_name):
316
    """Run the OS rename script for an instance.
317

318
    This is a single-node call.
319

320
    """
321
    return self._SingleNodeCall(node, "instance_run_rename",
322
                                [self._InstDict(inst), old_name])
323

    
324
  def call_instance_info(self, node, instance, hname):
325
    """Returns information about a single instance.
326

327
    This is a single-node call.
328

329
    @type node: list
330
    @param node: the list of nodes to query
331
    @type instance: string
332
    @param instance: the instance name
333
    @type hname: string
334
    @param hname: the hypervisor type of the instance
335

336
    """
337
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
338

    
339
  def call_all_instances_info(self, node_list, hypervisor_list):
340
    """Returns information about all instances on the given nodes.
341

342
    This is a multi-node call.
343

344
    @type node_list: list
345
    @param node_list: the list of nodes to query
346
    @type hypervisor_list: list
347
    @param hypervisor_list: the hypervisors to query for instances
348

349
    """
350
    return self._MultiNodeCall(node_list, "all_instances_info",
351
                               [hypervisor_list])
352

    
353
  def call_instance_list(self, node_list, hypervisor_list):
354
    """Returns the list of running instances on a given node.
355

356
    This is a multi-node call.
357

358
    @type node_list: list
359
    @param node_list: the list of nodes to query
360
    @type hypervisor_list: list
361
    @param hypervisor_list: the hypervisors to query for instances
362

363
    """
364
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
365

    
366
  def call_node_tcp_ping(self, node, source, target, port, timeout,
367
                         live_port_needed):
368
    """Do a TcpPing on the remote node
369

370
    This is a single-node call.
371

372
    """
373
    return self._SingleNodeCall(node, "node_tcp_ping",
374
                                [source, target, port, timeout,
375
                                 live_port_needed])
376

    
377
  def call_node_has_ip_address(self, node, address):
378
    """Checks if a node has the given IP address.
379

380
    This is a single-node call.
381

382
    """
383
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
384

    
385
  def call_node_info(self, node_list, vg_name, hypervisor_type):
386
    """Return node information.
387

388
    This will return memory information and volume group size and free
389
    space.
390

391
    This is a multi-node call.
392

393
    @type node_list: list
394
    @param node_list: the list of nodes to query
395
    @type vgname: C{string}
396
    @param vgname: the name of the volume group to ask for disk space
397
        information
398
    @type hypervisor_type: C{str}
399
    @param hypervisor_type: the name of the hypervisor to ask for
400
        memory information
401

402
    """
403
    retux = self._MultiNodeCall(node_list, "node_info",
404
                                [vg_name, hypervisor_type])
405

    
406
    for node_name in retux:
407
      ret = retux.get(node_name, False)
408
      if type(ret) != dict:
409
        logging.error("could not connect to node %s", node_name)
410
        ret = {}
411

    
412
      utils.CheckDict(ret, {
413
                        'memory_total' : '-',
414
                        'memory_dom0' : '-',
415
                        'memory_free' : '-',
416
                        'vg_size' : 'node_unreachable',
417
                        'vg_free' : '-',
418
                      }, "call_node_info")
419
    return retux
420

    
421
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
422
    """Add a node to the cluster.
423

424
    This is a single-node call.
425

426
    """
427
    return self._SingleNodeCall(node, "node_add",
428
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
429

    
430
  def call_node_verify(self, node_list, checkdict, cluster_name):
431
    """Request verification of given parameters.
432

433
    This is a multi-node call.
434

435
    """
436
    return self._MultiNodeCall(node_list, "node_verify",
437
                               [checkdict, cluster_name])
438

    
439
  @classmethod
440
  def call_node_start_master(cls, node, start_daemons):
441
    """Tells a node to activate itself as a master.
442

443
    This is a single-node call.
444

445
    """
446
    return cls._StaticSingleNodeCall(node, "node_start_master",
447
                                     [start_daemons])
448

    
449
  @classmethod
450
  def call_node_stop_master(cls, node, stop_daemons):
451
    """Tells a node to demote itself from master status.
452

453
    This is a single-node call.
454

455
    """
456
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
457

    
458
  @classmethod
459
  def call_master_info(cls, node_list):
460
    """Query master info.
461

462
    This is a multi-node call.
463

464
    """
465
    # TODO: should this method query down nodes?
466
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
467

    
468
  def call_version(self, node_list):
469
    """Query node version.
470

471
    This is a multi-node call.
472

473
    """
474
    return self._MultiNodeCall(node_list, "version", [])
475

    
476
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
477
    """Request creation of a given block device.
478

479
    This is a single-node call.
480

481
    """
482
    return self._SingleNodeCall(node, "blockdev_create",
483
                                [bdev.ToDict(), size, owner, on_primary, info])
484

    
485
  def call_blockdev_remove(self, node, bdev):
486
    """Request removal of a given block device.
487

488
    This is a single-node call.
489

490
    """
491
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
492

    
493
  def call_blockdev_rename(self, node, devlist):
494
    """Request rename of the given block devices.
495

496
    This is a single-node call.
497

498
    """
499
    return self._SingleNodeCall(node, "blockdev_rename",
500
                                [(d.ToDict(), uid) for d, uid in devlist])
501

    
502
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
503
    """Request assembling of a given block device.
504

505
    This is a single-node call.
506

507
    """
508
    return self._SingleNodeCall(node, "blockdev_assemble",
509
                                [disk.ToDict(), owner, on_primary])
510

    
511
  def call_blockdev_shutdown(self, node, disk):
512
    """Request shutdown of a given block device.
513

514
    This is a single-node call.
515

516
    """
517
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
518

    
519
  def call_blockdev_addchildren(self, node, bdev, ndevs):
520
    """Request adding a list of children to a (mirroring) device.
521

522
    This is a single-node call.
523

524
    """
525
    return self._SingleNodeCall(node, "blockdev_addchildren",
526
                                [bdev.ToDict(),
527
                                 [disk.ToDict() for disk in ndevs]])
528

    
529
  def call_blockdev_removechildren(self, node, bdev, ndevs):
530
    """Request removing a list of children from a (mirroring) device.
531

532
    This is a single-node call.
533

534
    """
535
    return self._SingleNodeCall(node, "blockdev_removechildren",
536
                                [bdev.ToDict(),
537
                                 [disk.ToDict() for disk in ndevs]])
538

    
539
  def call_blockdev_getmirrorstatus(self, node, disks):
540
    """Request status of a (mirroring) device.
541

542
    This is a single-node call.
543

544
    """
545
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
546
                                [dsk.ToDict() for dsk in disks])
547

    
548
  def call_blockdev_find(self, node, disk):
549
    """Request identification of a given block device.
550

551
    This is a single-node call.
552

553
    """
554
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
555

    
556
  def call_blockdev_close(self, node, disks):
557
    """Closes the given block devices.
558

559
    This is a single-node call.
560

561
    """
562
    return self._SingleNodeCall(node, "blockdev_close",
563
                                [cf.ToDict() for cf in disks])
564

    
565
  @classmethod
566
  def call_upload_file(cls, node_list, file_name, address_list=None):
567
    """Upload a file.
568

569
    The node will refuse the operation in case the file is not on the
570
    approved file list.
571

572
    This is a multi-node call.
573

574
    @type node_list: list
575
    @param node_list: the list of node names to upload to
576
    @type file_name: str
577
    @param file_name: the filename to upload
578
    @type address_list: list or None
579
    @keyword address_list: an optional list of node addresses, in order
580
        to optimize the RPC speed
581

582
    """
583
    data = utils.ReadFile(file_name)
584
    st = os.stat(file_name)
585
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
586
              st.st_atime, st.st_mtime]
587
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
588
                                    address_list=address_list)
589

    
590
  def call_os_diagnose(self, node_list):
591
    """Request a diagnose of OS definitions.
592

593
    This is a multi-node call.
594

595
    """
596
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
597

    
598
    new_result = {}
599
    for node_name in result:
600
      if result[node_name]:
601
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
602
      else:
603
        nr = []
604
      new_result[node_name] = nr
605
    return new_result
606

    
607
  def call_os_get(self, node, name):
608
    """Returns an OS definition.
609

610
    This is a single-node call.
611

612
    """
613
    result = self._SingleNodeCall(node, "os_get", [name])
614
    if isinstance(result, dict):
615
      return objects.OS.FromDict(result)
616
    else:
617
      return result
618

    
619
  def call_hooks_runner(self, node_list, hpath, phase, env):
620
    """Call the hooks runner.
621

622
    Args:
623
      - op: the OpCode instance
624
      - env: a dictionary with the environment
625

626
    This is a multi-node call.
627

628
    """
629
    params = [hpath, phase, env]
630
    return self._MultiNodeCall(node_list, "hooks_runner", params)
631

    
632
  def call_iallocator_runner(self, node, name, idata):
633
    """Call an iallocator on a remote node
634

635
    Args:
636
      - name: the iallocator name
637
      - input: the json-encoded input string
638

639
    This is a single-node call.
640

641
    """
642
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
643

    
644
  def call_blockdev_grow(self, node, cf_bdev, amount):
645
    """Request a snapshot of the given block device.
646

647
    This is a single-node call.
648

649
    """
650
    return self._SingleNodeCall(node, "blockdev_grow",
651
                                [cf_bdev.ToDict(), amount])
652

    
653
  def call_blockdev_snapshot(self, node, cf_bdev):
654
    """Request a snapshot of the given block device.
655

656
    This is a single-node call.
657

658
    """
659
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
660

    
661
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
662
                           cluster_name, idx):
663
    """Request the export of a given snapshot.
664

665
    This is a single-node call.
666

667
    """
668
    return self._SingleNodeCall(node, "snapshot_export",
669
                                [snap_bdev.ToDict(), dest_node,
670
                                 self._InstDict(instance), cluster_name, idx])
671

    
672
  def call_finalize_export(self, node, instance, snap_disks):
673
    """Request the completion of an export operation.
674

675
    This writes the export config file, etc.
676

677
    This is a single-node call.
678

679
    """
680
    flat_disks = []
681
    for disk in snap_disks:
682
      flat_disks.append(disk.ToDict())
683

    
684
    return self._SingleNodeCall(node, "finalize_export",
685
                                [self._InstDict(instance), flat_disks])
686

    
687
  def call_export_info(self, node, path):
688
    """Queries the export information in a given path.
689

690
    This is a single-node call.
691

692
    """
693
    result = self._SingleNodeCall(node, "export_info", [path])
694
    if not result:
695
      return result
696
    return objects.SerializableConfigParser.Loads(str(result))
697

    
698
  def call_instance_os_import(self, node, inst, src_node, src_images,
699
                              cluster_name):
700
    """Request the import of a backup into an instance.
701

702
    This is a single-node call.
703

704
    """
705
    return self._SingleNodeCall(node, "instance_os_import",
706
                                [self._InstDict(inst), src_node, src_images,
707
                                 cluster_name])
708

    
709
  def call_export_list(self, node_list):
710
    """Gets the stored exports list.
711

712
    This is a multi-node call.
713

714
    """
715
    return self._MultiNodeCall(node_list, "export_list", [])
716

    
717
  def call_export_remove(self, node, export):
718
    """Requests removal of a given export.
719

720
    This is a single-node call.
721

722
    """
723
    return self._SingleNodeCall(node, "export_remove", [export])
724

    
725
  @classmethod
726
  def call_node_leave_cluster(cls, node):
727
    """Requests a node to clean the cluster information it has.
728

729
    This will remove the configuration information from the ganeti data
730
    dir.
731

732
    This is a single-node call.
733

734
    """
735
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
736

    
737
  def call_node_volumes(self, node_list):
738
    """Gets all volumes on node(s).
739

740
    This is a multi-node call.
741

742
    """
743
    return self._MultiNodeCall(node_list, "node_volumes", [])
744

    
745
  def call_test_delay(self, node_list, duration):
746
    """Sleep for a fixed time on given node(s).
747

748
    This is a multi-node call.
749

750
    """
751
    return self._MultiNodeCall(node_list, "test_delay", [duration])
752

    
753
  def call_file_storage_dir_create(self, node, file_storage_dir):
754
    """Create the given file storage directory.
755

756
    This is a single-node call.
757

758
    """
759
    return self._SingleNodeCall(node, "file_storage_dir_create",
760
                                [file_storage_dir])
761

    
762
  def call_file_storage_dir_remove(self, node, file_storage_dir):
763
    """Remove the given file storage directory.
764

765
    This is a single-node call.
766

767
    """
768
    return self._SingleNodeCall(node, "file_storage_dir_remove",
769
                                [file_storage_dir])
770

    
771
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
772
                                   new_file_storage_dir):
773
    """Rename file storage directory.
774

775
    This is a single-node call.
776

777
    """
778
    return self._SingleNodeCall(node, "file_storage_dir_rename",
779
                                [old_file_storage_dir, new_file_storage_dir])
780

    
781
  @classmethod
782
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
783
    """Update job queue.
784

785
    This is a multi-node call.
786

787
    """
788
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
789
                                    [file_name, content],
790
                                    address_list=address_list)
791

    
792
  @classmethod
793
  def call_jobqueue_purge(cls, node):
794
    """Purge job queue.
795

796
    This is a single-node call.
797

798
    """
799
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
800

    
801
  @classmethod
802
  def call_jobqueue_rename(cls, node_list, address_list, old, new):
803
    """Rename a job queue file.
804

805
    This is a multi-node call.
806

807
    """
808
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
809
                                    address_list=address_list)
810

    
811
  @classmethod
812
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
813
    """Set the drain flag on the queue.
814

815
    This is a multi-node call.
816

817
    @type node_list: list
818
    @param node_list: the list of nodes to query
819
    @type drain_flag: bool
820
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
821

822
    """
823
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
824
                                    [drain_flag])
825

    
826
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
827
    """Validate the hypervisor params.
828

829
    This is a multi-node call.
830

831
    @type node_list: list
832
    @param node_list: the list of nodes to query
833
    @type hvname: string
834
    @param hvname: the hypervisor name
835
    @type hvparams: dict
836
    @param hvparams: the hypervisor parameters to be validated
837

838
    """
839
    cluster = self._cfg.GetClusterInfo()
840
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
841
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
842
                               [hvname, hv_full])