Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ ec17d09c

History | View | Annotate | Download (24.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42

    
43

    
44
# Module level variable
45
_http_manager = None
46

    
47

    
48
def Init():
49
  """Initializes the module-global HTTP client manager.
50

51
  Must be called before using any RPC function.
52

53
  """
54
  global _http_manager
55

    
56
  assert not _http_manager, "RPC module initialized more than once"
57

    
58
  _http_manager = http.HttpClientManager()
59

    
60

    
61
def Shutdown():
62
  """Stops the module-global HTTP client manager.
63

64
  Must be called before quitting the program.
65

66
  """
67
  global _http_manager
68

    
69
  if _http_manager:
70
    _http_manager.Shutdown()
71
    _http_manager = None
72

    
73

    
74
class Client:
75
  """RPC Client class.
76

77
  This class, given a (remote) method name, a list of parameters and a
78
  list of nodes, will contact (in parallel) all nodes, and return a
79
  dict of results (key: node name, value: result).
80

81
  One current bug is that generic failure is still signalled by
82
  'False' result, which is not good. This overloading of values can
83
  cause bugs.
84

85
  """
86
  def __init__(self, procedure, args):
87
    self.procedure = procedure
88
    self.args = args
89
    self.body = serializer.DumpJson(args, indent=False)
90

    
91
    self.port = utils.GetNodeDaemonPort()
92
    self.nc = {}
93

    
94
  def ConnectList(self, node_list, address_list=None):
95
    """Add a list of nodes to the target nodes.
96

97
    @type node_list: list
98
    @param node_list: the list of node names to connect
99
    @type address_list: list or None
100
    @keyword address_list: either None or a list with node addresses,
101
        which must have the same length as the node list
102

103
    """
104
    if address_list is None:
105
      address_list = [None for _ in node_list]
106
    else:
107
      assert len(node_list) == len(address_list), \
108
             "Name and address lists should have the same length"
109
    for node, address in zip(node_list, address_list):
110
      self.ConnectNode(node, address)
111

    
112
  def ConnectNode(self, name, address=None):
113
    """Add a node to the target list.
114

115
    @type name: str
116
    @param name: the node name
117
    @type address: str
118
    @keyword address: the node address, if known
119

120
    """
121
    if address is None:
122
      address = name
123

    
124
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
125
                                           "/%s" % self.procedure,
126
                                           post_data=self.body)
127

    
128
  def GetResults(self):
129
    """Call nodes and return results.
130

131
    @rtype: list
132
    @returns: List of RPC results
133

134
    """
135
    assert _http_manager, "RPC module not intialized"
136

    
137
    _http_manager.ExecRequests(self.nc.values())
138

    
139
    results = {}
140

    
141
    for name, req in self.nc.iteritems():
142
      if req.success and req.resp_status == http.HTTP_OK:
143
        results[name] = serializer.LoadJson(req.resp_body)
144
        continue
145

    
146
      if req.error:
147
        msg = req.error
148
      else:
149
        msg = req.resp_body
150

    
151
      logging.error("RPC error from node %s: %s", name, msg)
152
      results[name] = False
153

    
154
    return results
155

    
156

    
157
class RpcRunner(object):
158
  """RPC runner class"""
159

    
160
  def __init__(self, cfg):
161
    """Initialized the rpc runner.
162

163
    @type cfg:  C{config.ConfigWriter}
164
    @param cfg: the configuration object that will be used to get data
165
                about the cluster
166

167
    """
168
    self._cfg = cfg
169

    
170
  def _InstDict(self, instance):
171
    """Convert the given instance to a dict.
172

173
    This is done via the instance's ToDict() method and additionally
174
    we fill the hvparams with the cluster defaults.
175

176
    @type instance: L{objects.Instance}
177
    @param instance: an Instance object
178
    @rtype: dict
179
    @return: the instance dict, with the hvparams filled with the
180
        cluster defaults
181

182
    """
183
    idict = instance.ToDict()
184
    cluster = self._cfg.GetClusterInfo()
185
    idict["hvparams"] = cluster.FillHV(instance)
186
    idict["beparams"] = cluster.FillBE(instance)
187
    return idict
188

    
189
  def _ConnectList(self, client, node_list):
190
    """Helper for computing node addresses.
191

192
    @type client: L{Client}
193
    @param client: a C{Client} instance
194
    @type node_list: list
195
    @param node_list: the node list we should connect
196

197
    """
198
    all_nodes = self._cfg.GetAllNodesInfo()
199
    addr_list = []
200
    for node in node_list:
201
      if node in all_nodes:
202
        val = all_nodes[node].primary_ip
203
      else:
204
        val = None
205
      addr_list.append(val)
206
    client.ConnectList(node_list, address_list=addr_list)
207

    
208
  def _ConnectNode(self, client, node):
209
    """Helper for computing one node's address.
210

211
    @type client: L{Client}
212
    @param client: a C{Client} instance
213
    @type node: str
214
    @param node: the node we should connect
215

216
    """
217
    node_info = self._cfg.GetNodeInfo(node)
218
    if node_info is not None:
219
      addr = node_info.primary_ip
220
    else:
221
      addr = None
222
    client.ConnectNode(node, address=addr)
223

    
224
  def _MultiNodeCall(self, node_list, procedure, args,
225
                     address_list=None):
226
    c = Client(procedure, args)
227
    if address_list is None:
228
      self._ConnectList(c, node_list)
229
    else:
230
      c.ConnectList(node_list, address_list=address_list)
231
    return c.GetResults()
232

    
233
  @classmethod
234
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
235
                           address_list=None):
236
    c = Client(procedure, args)
237
    c.ConnectList(node_list, address_list=address_list)
238
    return c.GetResults()
239

    
240
  def _SingleNodeCall(self, node, procedure, args):
241
    """
242

243
    """
244
    c = Client(procedure, args)
245
    self._ConnectNode(c, node)
246
    return c.GetResults().get(node, False)
247

    
248
  @classmethod
249
  def _StaticSingleNodeCall(cls, node, procedure, args):
250
    """
251

252
    """
253
    c = Client(procedure, args)
254
    c.ConnectNode(c, node)
255
    return c.GetResults().get(node, False)
256

    
257
  def call_volume_list(self, node_list, vg_name):
258
    """Gets the logical volumes present in a given volume group.
259

260
    This is a multi-node call.
261

262
    """
263
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
264

    
265
  def call_vg_list(self, node_list):
266
    """Gets the volume group list.
267

268
    This is a multi-node call.
269

270
    """
271
    return self._MultiNodeCall(node_list, "vg_list", [])
272

    
273
  def call_bridges_exist(self, node, bridges_list):
274
    """Checks if a node has all the bridges given.
275

276
    This method checks if all bridges given in the bridges_list are
277
    present on the remote node, so that an instance that uses interfaces
278
    on those bridges can be started.
279

280
    This is a single-node call.
281

282
    """
283
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
284

    
285
  def call_instance_start(self, node, instance, extra_args):
286
    """Starts an instance.
287

288
    This is a single-node call.
289

290
    """
291
    return self._SingleNodeCall(node, "instance_start",
292
                                [self._InstDict(instance), extra_args])
293

    
294
  def call_instance_shutdown(self, node, instance):
295
    """Stops an instance.
296

297
    This is a single-node call.
298

299
    """
300
    return self._SingleNodeCall(node, "instance_shutdown",
301
                                [self._InstDict(instance)])
302

    
303
  def call_instance_migrate(self, node, instance, target, live):
304
    """Migrate an instance.
305

306
    This is a single-node call.
307

308
    @type node: string
309
    @param node: the node on which the instance is currently running
310
    @type instance: C{objects.Instance}
311
    @param instance: the instance definition
312
    @type target: string
313
    @param target: the target node name
314
    @type live: boolean
315
    @param live: whether the migration should be done live or not (the
316
        interpretation of this parameter is left to the hypervisor)
317

318
    """
319
    return self._SingleNodeCall(node, "instance_migrate",
320
                                [self._InstDict(instance), target, live])
321

    
322
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
323
    """Reboots an instance.
324

325
    This is a single-node call.
326

327
    """
328
    return self._SingleNodeCall(node, "instance_reboot",
329
                                [self._InstDict(instance), reboot_type,
330
                                 extra_args])
331

    
332
  def call_instance_os_add(self, node, inst):
333
    """Installs an OS on the given instance.
334

335
    This is a single-node call.
336

337
    """
338
    return self._SingleNodeCall(node, "instance_os_add",
339
                                [self._InstDict(inst)])
340

    
341
  def call_instance_run_rename(self, node, inst, old_name):
342
    """Run the OS rename script for an instance.
343

344
    This is a single-node call.
345

346
    """
347
    return self._SingleNodeCall(node, "instance_run_rename",
348
                                [self._InstDict(inst), old_name])
349

    
350
  def call_instance_info(self, node, instance, hname):
351
    """Returns information about a single instance.
352

353
    This is a single-node call.
354

355
    @type node: list
356
    @param node: the list of nodes to query
357
    @type instance: string
358
    @param instance: the instance name
359
    @type hname: string
360
    @param hname: the hypervisor type of the instance
361

362
    """
363
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
364

    
365
  def call_all_instances_info(self, node_list, hypervisor_list):
366
    """Returns information about all instances on the given nodes.
367

368
    This is a multi-node call.
369

370
    @type node_list: list
371
    @param node_list: the list of nodes to query
372
    @type hypervisor_list: list
373
    @param hypervisor_list: the hypervisors to query for instances
374

375
    """
376
    return self._MultiNodeCall(node_list, "all_instances_info",
377
                               [hypervisor_list])
378

    
379
  def call_instance_list(self, node_list, hypervisor_list):
380
    """Returns the list of running instances on a given node.
381

382
    This is a multi-node call.
383

384
    @type node_list: list
385
    @param node_list: the list of nodes to query
386
    @type hypervisor_list: list
387
    @param hypervisor_list: the hypervisors to query for instances
388

389
    """
390
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
391

    
392
  def call_node_tcp_ping(self, node, source, target, port, timeout,
393
                         live_port_needed):
394
    """Do a TcpPing on the remote node
395

396
    This is a single-node call.
397

398
    """
399
    return self._SingleNodeCall(node, "node_tcp_ping",
400
                                [source, target, port, timeout,
401
                                 live_port_needed])
402

    
403
  def call_node_has_ip_address(self, node, address):
404
    """Checks if a node has the given IP address.
405

406
    This is a single-node call.
407

408
    """
409
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
410

    
411
  def call_node_info(self, node_list, vg_name, hypervisor_type):
412
    """Return node information.
413

414
    This will return memory information and volume group size and free
415
    space.
416

417
    This is a multi-node call.
418

419
    @type node_list: list
420
    @param node_list: the list of nodes to query
421
    @type vgname: C{string}
422
    @param vgname: the name of the volume group to ask for disk space
423
        information
424
    @type hypervisor_type: C{str}
425
    @param hypervisor_type: the name of the hypervisor to ask for
426
        memory information
427

428
    """
429
    retux = self._MultiNodeCall(node_list, "node_info",
430
                                [vg_name, hypervisor_type])
431

    
432
    for node_name in retux:
433
      ret = retux.get(node_name, False)
434
      if type(ret) != dict:
435
        logging.error("could not connect to node %s", node_name)
436
        ret = {}
437

    
438
      utils.CheckDict(ret, {
439
                        'memory_total' : '-',
440
                        'memory_dom0' : '-',
441
                        'memory_free' : '-',
442
                        'vg_size' : 'node_unreachable',
443
                        'vg_free' : '-',
444
                      }, "call_node_info")
445
    return retux
446

    
447
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
448
    """Add a node to the cluster.
449

450
    This is a single-node call.
451

452
    """
453
    return self._SingleNodeCall(node, "node_add",
454
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
455

    
456
  def call_node_verify(self, node_list, checkdict, cluster_name):
457
    """Request verification of given parameters.
458

459
    This is a multi-node call.
460

461
    """
462
    return self._MultiNodeCall(node_list, "node_verify",
463
                               [checkdict, cluster_name])
464

    
465
  @classmethod
466
  def call_node_start_master(cls, node, start_daemons):
467
    """Tells a node to activate itself as a master.
468

469
    This is a single-node call.
470

471
    """
472
    return cls._StaticSingleNodeCall(node, "node_start_master",
473
                                     [start_daemons])
474

    
475
  @classmethod
476
  def call_node_stop_master(cls, node, stop_daemons):
477
    """Tells a node to demote itself from master status.
478

479
    This is a single-node call.
480

481
    """
482
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
483

    
484
  @classmethod
485
  def call_master_info(cls, node_list):
486
    """Query master info.
487

488
    This is a multi-node call.
489

490
    """
491
    # TODO: should this method query down nodes?
492
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
493

    
494
  def call_version(self, node_list):
495
    """Query node version.
496

497
    This is a multi-node call.
498

499
    """
500
    return self._MultiNodeCall(node_list, "version", [])
501

    
502
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
503
    """Request creation of a given block device.
504

505
    This is a single-node call.
506

507
    """
508
    return self._SingleNodeCall(node, "blockdev_create",
509
                                [bdev.ToDict(), size, owner, on_primary, info])
510

    
511
  def call_blockdev_remove(self, node, bdev):
512
    """Request removal of a given block device.
513

514
    This is a single-node call.
515

516
    """
517
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
518

    
519
  def call_blockdev_rename(self, node, devlist):
520
    """Request rename of the given block devices.
521

522
    This is a single-node call.
523

524
    """
525
    return self._SingleNodeCall(node, "blockdev_rename",
526
                                [(d.ToDict(), uid) for d, uid in devlist])
527

    
528
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
529
    """Request assembling of a given block device.
530

531
    This is a single-node call.
532

533
    """
534
    return self._SingleNodeCall(node, "blockdev_assemble",
535
                                [disk.ToDict(), owner, on_primary])
536

    
537
  def call_blockdev_shutdown(self, node, disk):
538
    """Request shutdown of a given block device.
539

540
    This is a single-node call.
541

542
    """
543
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
544

    
545
  def call_blockdev_addchildren(self, node, bdev, ndevs):
546
    """Request adding a list of children to a (mirroring) device.
547

548
    This is a single-node call.
549

550
    """
551
    return self._SingleNodeCall(node, "blockdev_addchildren",
552
                                [bdev.ToDict(),
553
                                 [disk.ToDict() for disk in ndevs]])
554

    
555
  def call_blockdev_removechildren(self, node, bdev, ndevs):
556
    """Request removing a list of children from a (mirroring) device.
557

558
    This is a single-node call.
559

560
    """
561
    return self._SingleNodeCall(node, "blockdev_removechildren",
562
                                [bdev.ToDict(),
563
                                 [disk.ToDict() for disk in ndevs]])
564

    
565
  def call_blockdev_getmirrorstatus(self, node, disks):
566
    """Request status of a (mirroring) device.
567

568
    This is a single-node call.
569

570
    """
571
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
572
                                [dsk.ToDict() for dsk in disks])
573

    
574
  def call_blockdev_find(self, node, disk):
575
    """Request identification of a given block device.
576

577
    This is a single-node call.
578

579
    """
580
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
581

    
582
  def call_blockdev_close(self, node, disks):
583
    """Closes the given block devices.
584

585
    This is a single-node call.
586

587
    """
588
    return self._SingleNodeCall(node, "blockdev_close",
589
                                [cf.ToDict() for cf in disks])
590

    
591
  @classmethod
592
  def call_upload_file(cls, node_list, file_name, address_list=None):
593
    """Upload a file.
594

595
    The node will refuse the operation in case the file is not on the
596
    approved file list.
597

598
    This is a multi-node call.
599

600
    @type node_list: list
601
    @param node_list: the list of node names to upload to
602
    @type file_name: str
603
    @param file_name: the filename to upload
604
    @type address_list: list or None
605
    @keyword address_list: an optional list of node addresses, in order
606
        to optimize the RPC speed
607

608
    """
609
    data = utils.ReadFile(file_name)
610
    st = os.stat(file_name)
611
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
612
              st.st_atime, st.st_mtime]
613
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
614
                                    address_list=address_list)
615

    
616
  @classmethod
617
  def call_write_ssconf_files(cls, node_list):
618
    """Write ssconf files.
619

620
    This is a multi-node call.
621

622
    """
623
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [])
624

    
625
  def call_os_diagnose(self, node_list):
626
    """Request a diagnose of OS definitions.
627

628
    This is a multi-node call.
629

630
    """
631
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
632

    
633
    new_result = {}
634
    for node_name in result:
635
      if result[node_name]:
636
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
637
      else:
638
        nr = []
639
      new_result[node_name] = nr
640
    return new_result
641

    
642
  def call_os_get(self, node, name):
643
    """Returns an OS definition.
644

645
    This is a single-node call.
646

647
    """
648
    result = self._SingleNodeCall(node, "os_get", [name])
649
    if isinstance(result, dict):
650
      return objects.OS.FromDict(result)
651
    else:
652
      return result
653

    
654
  def call_hooks_runner(self, node_list, hpath, phase, env):
655
    """Call the hooks runner.
656

657
    Args:
658
      - op: the OpCode instance
659
      - env: a dictionary with the environment
660

661
    This is a multi-node call.
662

663
    """
664
    params = [hpath, phase, env]
665
    return self._MultiNodeCall(node_list, "hooks_runner", params)
666

    
667
  def call_iallocator_runner(self, node, name, idata):
668
    """Call an iallocator on a remote node
669

670
    Args:
671
      - name: the iallocator name
672
      - input: the json-encoded input string
673

674
    This is a single-node call.
675

676
    """
677
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
678

    
679
  def call_blockdev_grow(self, node, cf_bdev, amount):
680
    """Request a snapshot of the given block device.
681

682
    This is a single-node call.
683

684
    """
685
    return self._SingleNodeCall(node, "blockdev_grow",
686
                                [cf_bdev.ToDict(), amount])
687

    
688
  def call_blockdev_snapshot(self, node, cf_bdev):
689
    """Request a snapshot of the given block device.
690

691
    This is a single-node call.
692

693
    """
694
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
695

    
696
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
697
                           cluster_name, idx):
698
    """Request the export of a given snapshot.
699

700
    This is a single-node call.
701

702
    """
703
    return self._SingleNodeCall(node, "snapshot_export",
704
                                [snap_bdev.ToDict(), dest_node,
705
                                 self._InstDict(instance), cluster_name, idx])
706

    
707
  def call_finalize_export(self, node, instance, snap_disks):
708
    """Request the completion of an export operation.
709

710
    This writes the export config file, etc.
711

712
    This is a single-node call.
713

714
    """
715
    flat_disks = []
716
    for disk in snap_disks:
717
      flat_disks.append(disk.ToDict())
718

    
719
    return self._SingleNodeCall(node, "finalize_export",
720
                                [self._InstDict(instance), flat_disks])
721

    
722
  def call_export_info(self, node, path):
723
    """Queries the export information in a given path.
724

725
    This is a single-node call.
726

727
    """
728
    result = self._SingleNodeCall(node, "export_info", [path])
729
    if not result:
730
      return result
731
    return objects.SerializableConfigParser.Loads(str(result))
732

    
733
  def call_instance_os_import(self, node, inst, src_node, src_images,
734
                              cluster_name):
735
    """Request the import of a backup into an instance.
736

737
    This is a single-node call.
738

739
    """
740
    return self._SingleNodeCall(node, "instance_os_import",
741
                                [self._InstDict(inst), src_node, src_images,
742
                                 cluster_name])
743

    
744
  def call_export_list(self, node_list):
745
    """Gets the stored exports list.
746

747
    This is a multi-node call.
748

749
    """
750
    return self._MultiNodeCall(node_list, "export_list", [])
751

    
752
  def call_export_remove(self, node, export):
753
    """Requests removal of a given export.
754

755
    This is a single-node call.
756

757
    """
758
    return self._SingleNodeCall(node, "export_remove", [export])
759

    
760
  @classmethod
761
  def call_node_leave_cluster(cls, node):
762
    """Requests a node to clean the cluster information it has.
763

764
    This will remove the configuration information from the ganeti data
765
    dir.
766

767
    This is a single-node call.
768

769
    """
770
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
771

    
772
  def call_node_volumes(self, node_list):
773
    """Gets all volumes on node(s).
774

775
    This is a multi-node call.
776

777
    """
778
    return self._MultiNodeCall(node_list, "node_volumes", [])
779

    
780
  def call_test_delay(self, node_list, duration):
781
    """Sleep for a fixed time on given node(s).
782

783
    This is a multi-node call.
784

785
    """
786
    return self._MultiNodeCall(node_list, "test_delay", [duration])
787

    
788
  def call_file_storage_dir_create(self, node, file_storage_dir):
789
    """Create the given file storage directory.
790

791
    This is a single-node call.
792

793
    """
794
    return self._SingleNodeCall(node, "file_storage_dir_create",
795
                                [file_storage_dir])
796

    
797
  def call_file_storage_dir_remove(self, node, file_storage_dir):
798
    """Remove the given file storage directory.
799

800
    This is a single-node call.
801

802
    """
803
    return self._SingleNodeCall(node, "file_storage_dir_remove",
804
                                [file_storage_dir])
805

    
806
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
807
                                   new_file_storage_dir):
808
    """Rename file storage directory.
809

810
    This is a single-node call.
811

812
    """
813
    return self._SingleNodeCall(node, "file_storage_dir_rename",
814
                                [old_file_storage_dir, new_file_storage_dir])
815

    
816
  @classmethod
817
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
818
    """Update job queue.
819

820
    This is a multi-node call.
821

822
    """
823
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
824
                                    [file_name, content],
825
                                    address_list=address_list)
826

    
827
  @classmethod
828
  def call_jobqueue_purge(cls, node):
829
    """Purge job queue.
830

831
    This is a single-node call.
832

833
    """
834
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
835

    
836
  @classmethod
837
  def call_jobqueue_rename(cls, node_list, address_list, old, new):
838
    """Rename a job queue file.
839

840
    This is a multi-node call.
841

842
    """
843
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
844
                                    address_list=address_list)
845

    
846
  @classmethod
847
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
848
    """Set the drain flag on the queue.
849

850
    This is a multi-node call.
851

852
    @type node_list: list
853
    @param node_list: the list of nodes to query
854
    @type drain_flag: bool
855
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
856

857
    """
858
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
859
                                    [drain_flag])
860

    
861
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
862
    """Validate the hypervisor params.
863

864
    This is a multi-node call.
865

866
    @type node_list: list
867
    @param node_list: the list of nodes to query
868
    @type hvname: string
869
    @param hvname: the hypervisor name
870
    @type hvparams: dict
871
    @param hvparams: the hypervisor parameters to be validated
872

873
    """
874
    cluster = self._cfg.GetClusterInfo()
875
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
876
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
877
                               [hvname, hv_full])