Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 16ad1a83

History | View | Annotate | Download (22.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36

    
37
import simplejson
38

    
39
from ganeti import logger
40
from ganeti import utils
41
from ganeti import objects
42

    
43

    
44
class NodeController:
45
  """Node-handling class.
46

47
  For each node that we speak with, we create an instance of this
48
  class, so that we have a safe place to store the details of this
49
  individual call.
50

51
  """
52
  def __init__(self, parent, node):
53
    self.parent = parent
54
    self.node = node
55
    self.failed = False
56

    
57
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
58
    try:
59
      hc.connect()
60
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
61
                    skip_accept_encoding=True)
62
      hc.putheader('Content-Length', str(len(parent.body)))
63
      hc.endheaders()
64
      hc.send(parent.body)
65
    except socket.error, err:
66
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
67
      self.failed = True
68

    
69
  def get_response(self):
70
    """Try to process the response from the node.
71

72
    """
73
    if self.failed:
74
      # we already failed in connect
75
      return False
76
    resp = self.http_conn.getresponse()
77
    if resp.status != 200:
78
      return False
79
    try:
80
      length = int(resp.getheader('Content-Length', '0'))
81
    except ValueError:
82
      return False
83
    if not length:
84
      logger.Error("Zero-length reply from %s" % self.node)
85
      return False
86
    payload = resp.read(length)
87
    unload = simplejson.loads(payload)
88
    return unload
89

    
90

    
91
class Client:
92
  """RPC Client class.
93

94
  This class, given a (remote) method name, a list of parameters and a
95
  list of nodes, will contact (in parallel) all nodes, and return a
96
  dict of results (key: node name, value: result).
97

98
  One current bug is that generic failure is still signalled by
99
  'False' result, which is not good. This overloading of values can
100
  cause bugs.
101

102
  """
103
  result_set = False
104
  result = False
105
  allresult = []
106

    
107
  def __init__(self, procedure, args):
108
    self.port = utils.GetNodeDaemonPort()
109
    self.nodepw = utils.GetNodeDaemonPassword()
110
    self.nc = {}
111
    self.results = {}
112
    self.procedure = procedure
113
    self.args = args
114
    self.body = simplejson.dumps(args)
115

    
116
  #--- generic connector -------------
117

    
118
  def connect_list(self, node_list):
119
    """Add a list of nodes to the target nodes.
120

121
    """
122
    for node in node_list:
123
      self.connect(node)
124

    
125
  def connect(self, connect_node):
126
    """Add a node to the target list.
127

128
    """
129
    self.nc[connect_node] = nc = NodeController(self, connect_node)
130

    
131
  def getresult(self):
132
    """Return the results of the call.
133

134
    """
135
    return self.results
136

    
137
  def run(self):
138
    """Wrapper over reactor.run().
139

140
    This function simply calls reactor.run() if we have any requests
141
    queued, otherwise it does nothing.
142

143
    """
144
    for node, nc in self.nc.items():
145
      self.results[node] = nc.get_response()
146

    
147

    
148
class RpcRunner(object):
149
  """RPC runner class"""
150

    
151
  def __init__(self, cfg):
152
    """Initialized the rpc runner.
153

154
    @type cfg:  C{config.ConfigWriter}
155
    @param cfg: the configuration object that will be used to get data
156
                about the cluster
157

158
    """
159
    self._cfg = cfg
160

    
161
  def call_volume_list(self, node_list, vg_name):
162
    """Gets the logical volumes present in a given volume group.
163

164
    This is a multi-node call.
165

166
    """
167
    c = Client("volume_list", [vg_name])
168
    c.connect_list(node_list)
169
    c.run()
170
    return c.getresult()
171

    
172
  def call_vg_list(self, node_list):
173
    """Gets the volume group list.
174

175
    This is a multi-node call.
176

177
    """
178
    c = Client("vg_list", [])
179
    c.connect_list(node_list)
180
    c.run()
181
    return c.getresult()
182

    
183
  def call_bridges_exist(self, node, bridges_list):
184
    """Checks if a node has all the bridges given.
185

186
    This method checks if all bridges given in the bridges_list are
187
    present on the remote node, so that an instance that uses interfaces
188
    on those bridges can be started.
189

190
    This is a single-node call.
191

192
    """
193
    c = Client("bridges_exist", [bridges_list])
194
    c.connect(node)
195
    c.run()
196
    return c.getresult().get(node, False)
197

    
198
  def call_instance_start(self, node, instance, extra_args):
199
    """Starts an instance.
200

201
    This is a single-node call.
202

203
    """
204
    c = Client("instance_start", [instance.ToDict(), extra_args])
205
    c.connect(node)
206
    c.run()
207
    return c.getresult().get(node, False)
208

    
209
  def call_instance_shutdown(self, node, instance):
210
    """Stops an instance.
211

212
    This is a single-node call.
213

214
    """
215
    c = Client("instance_shutdown", [instance.ToDict()])
216
    c.connect(node)
217
    c.run()
218
    return c.getresult().get(node, False)
219

    
220
  def call_instance_migrate(self, node, instance, target, live):
221
    """Migrate an instance.
222

223
    This is a single-node call.
224

225
    @type node: string
226
    @param node: the node on which the instance is currently running
227
    @type instance: C{objects.Instance}
228
    @param instance: the instance definition
229
    @type target: string
230
    @param target: the target node name
231
    @type live: boolean
232
    @param live: whether the migration should be done live or not (the
233
        interpretation of this parameter is left to the hypervisor)
234

235
    """
236
    c = Client("instance_migrate", [instance.ToDict(), target, live])
237
    c.connect(node)
238
    c.run()
239
    return c.getresult().get(node, False)
240

    
241
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
242
    """Reboots an instance.
243

244
    This is a single-node call.
245

246
    """
247
    c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
248
    c.connect(node)
249
    c.run()
250
    return c.getresult().get(node, False)
251

    
252
  def call_instance_os_add(self, node, inst, osdev, swapdev):
253
    """Installs an OS on the given instance.
254

255
    This is a single-node call.
256

257
    """
258
    params = [inst.ToDict(), osdev, swapdev]
259
    c = Client("instance_os_add", params)
260
    c.connect(node)
261
    c.run()
262
    return c.getresult().get(node, False)
263

    
264
  def call_instance_run_rename(self, node, inst, old_name, osdev, swapdev):
265
    """Run the OS rename script for an instance.
266

267
    This is a single-node call.
268

269
    """
270
    params = [inst.ToDict(), old_name, osdev, swapdev]
271
    c = Client("instance_run_rename", params)
272
    c.connect(node)
273
    c.run()
274
    return c.getresult().get(node, False)
275

    
276
  def call_instance_info(self, node, instance, hname):
277
    """Returns information about a single instance.
278

279
    This is a single-node call.
280

281
    @type node_list: list
282
    @param node_list: the list of nodes to query
283
    @type instance: string
284
    @param instance: the instance name
285
    @type hname: string
286
    @param hname: the hypervisor type of the instance
287

288
    """
289
    c = Client("instance_info", [instance, hname])
290
    c.connect(node)
291
    c.run()
292
    return c.getresult().get(node, False)
293

    
294
  def call_all_instances_info(self, node_list, hypervisor_list):
295
    """Returns information about all instances on the given nodes.
296

297
    This is a multi-node call.
298

299
    @type node_list: list
300
    @param node_list: the list of nodes to query
301
    @type hypervisor_list: list
302
    @param hypervisor_list: the hypervisors to query for instances
303

304
    """
305
    c = Client("all_instances_info", [hypervisor_list])
306
    c.connect_list(node_list)
307
    c.run()
308
    return c.getresult()
309

    
310
  def call_instance_list(self, node_list, hypervisor_list):
311
    """Returns the list of running instances on a given node.
312

313
    This is a multi-node call.
314

315
    @type node_list: list
316
    @param node_list: the list of nodes to query
317
    @type hypervisor_list: list
318
    @param hypervisor_list: the hypervisors to query for instances
319

320
    """
321
    c = Client("instance_list", [hypervisor_list])
322
    c.connect_list(node_list)
323
    c.run()
324
    return c.getresult()
325

    
326
  def call_node_tcp_ping(self, node, source, target, port, timeout,
327
                         live_port_needed):
328
    """Do a TcpPing on the remote node
329

330
    This is a single-node call.
331

332
    """
333
    c = Client("node_tcp_ping", [source, target, port, timeout,
334
                                 live_port_needed])
335
    c.connect(node)
336
    c.run()
337
    return c.getresult().get(node, False)
338

    
339
  def call_node_has_ip_address(self, node, address):
340
    """Checks if a node has the given IP address.
341

342
    This is a single-node call.
343

344
    """
345
    c = Client("node_has_ip_address", [address])
346
    c.connect(node)
347
    c.run()
348
    return c.getresult().get(node, False)
349

    
350
  def call_node_info(self, node_list, vg_name, hypervisor_type):
351
    """Return node information.
352

353
    This will return memory information and volume group size and free
354
    space.
355

356
    This is a multi-node call.
357

358
    @type node_list: list
359
    @param node_list: the list of nodes to query
360
    @type vgname: C{string}
361
    @param vgname: the name of the volume group to ask for disk space
362
        information
363
    @type hypervisor_type: C{str}
364
    @param hypervisor_type: the name of the hypervisor to ask for
365
        memory information
366

367
    """
368
    c = Client("node_info", [vg_name, hypervisor_type])
369
    c.connect_list(node_list)
370
    c.run()
371
    retux = c.getresult()
372

    
373
    for node_name in retux:
374
      ret = retux.get(node_name, False)
375
      if type(ret) != dict:
376
        logger.Error("could not connect to node %s" % (node_name))
377
        ret = {}
378

    
379
      utils.CheckDict(ret,
380
                      { 'memory_total' : '-',
381
                        'memory_dom0' : '-',
382
                        'memory_free' : '-',
383
                        'vg_size' : 'node_unreachable',
384
                        'vg_free' : '-' },
385
                      "call_node_info",
386
                      )
387
    return retux
388

    
389
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
390
    """Add a node to the cluster.
391

392
    This is a single-node call.
393

394
    """
395
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
396
    c = Client("node_add", params)
397
    c.connect(node)
398
    c.run()
399
    return c.getresult().get(node, False)
400

    
401
  def call_node_verify(self, node_list, checkdict, cluster_name):
402
    """Request verification of given parameters.
403

404
    This is a multi-node call.
405

406
    """
407
    c = Client("node_verify", [checkdict, cluster_name])
408
    c.connect_list(node_list)
409
    c.run()
410
    return c.getresult()
411

    
412
  @staticmethod
413
  def call_node_start_master(node, start_daemons):
414
    """Tells a node to activate itself as a master.
415

416
    This is a single-node call.
417

418
    """
419
    c = Client("node_start_master", [start_daemons])
420
    c.connect(node)
421
    c.run()
422
    return c.getresult().get(node, False)
423

    
424
  @staticmethod
425
  def call_node_stop_master(node, stop_daemons):
426
    """Tells a node to demote itself from master status.
427

428
    This is a single-node call.
429

430
    """
431
    c = Client("node_stop_master", [stop_daemons])
432
    c.connect(node)
433
    c.run()
434
    return c.getresult().get(node, False)
435

    
436
  @staticmethod
437
  def call_master_info(node_list):
438
    """Query master info.
439

440
    This is a multi-node call.
441

442
    """
443
    # TODO: should this method query down nodes?
444
    c = Client("master_info", [])
445
    c.connect_list(node_list)
446
    c.run()
447
    return c.getresult()
448

    
449
  def call_version(self, node_list):
450
    """Query node version.
451

452
    This is a multi-node call.
453

454
    """
455
    c = Client("version", [])
456
    c.connect_list(node_list)
457
    c.run()
458
    return c.getresult()
459

    
460
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
461
    """Request creation of a given block device.
462

463
    This is a single-node call.
464

465
    """
466
    params = [bdev.ToDict(), size, owner, on_primary, info]
467
    c = Client("blockdev_create", params)
468
    c.connect(node)
469
    c.run()
470
    return c.getresult().get(node, False)
471

    
472
  def call_blockdev_remove(self, node, bdev):
473
    """Request removal of a given block device.
474

475
    This is a single-node call.
476

477
    """
478
    c = Client("blockdev_remove", [bdev.ToDict()])
479
    c.connect(node)
480
    c.run()
481
    return c.getresult().get(node, False)
482

    
483
  def call_blockdev_rename(self, node, devlist):
484
    """Request rename of the given block devices.
485

486
    This is a single-node call.
487

488
    """
489
    params = [(d.ToDict(), uid) for d, uid in devlist]
490
    c = Client("blockdev_rename", params)
491
    c.connect(node)
492
    c.run()
493
    return c.getresult().get(node, False)
494

    
495
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
496
    """Request assembling of a given block device.
497

498
    This is a single-node call.
499

500
    """
501
    params = [disk.ToDict(), owner, on_primary]
502
    c = Client("blockdev_assemble", params)
503
    c.connect(node)
504
    c.run()
505
    return c.getresult().get(node, False)
506

    
507
  def call_blockdev_shutdown(self, node, disk):
508
    """Request shutdown of a given block device.
509

510
    This is a single-node call.
511

512
    """
513
    c = Client("blockdev_shutdown", [disk.ToDict()])
514
    c.connect(node)
515
    c.run()
516
    return c.getresult().get(node, False)
517

    
518
  def call_blockdev_addchildren(self, node, bdev, ndevs):
519
    """Request adding a list of children to a (mirroring) device.
520

521
    This is a single-node call.
522

523
    """
524
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
525
    c = Client("blockdev_addchildren", params)
526
    c.connect(node)
527
    c.run()
528
    return c.getresult().get(node, False)
529

    
530
  def call_blockdev_removechildren(self, node, bdev, ndevs):
531
    """Request removing a list of children from a (mirroring) device.
532

533
    This is a single-node call.
534

535
    """
536
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
537
    c = Client("blockdev_removechildren", params)
538
    c.connect(node)
539
    c.run()
540
    return c.getresult().get(node, False)
541

    
542
  def call_blockdev_getmirrorstatus(self, node, disks):
543
    """Request status of a (mirroring) device.
544

545
    This is a single-node call.
546

547
    """
548
    params = [dsk.ToDict() for dsk in disks]
549
    c = Client("blockdev_getmirrorstatus", params)
550
    c.connect(node)
551
    c.run()
552
    return c.getresult().get(node, False)
553

    
554
  def call_blockdev_find(self, node, disk):
555
    """Request identification of a given block device.
556

557
    This is a single-node call.
558

559
    """
560
    c = Client("blockdev_find", [disk.ToDict()])
561
    c.connect(node)
562
    c.run()
563
    return c.getresult().get(node, False)
564

    
565
  def call_blockdev_close(self, node, disks):
566
    """Closes the given block devices.
567

568
    This is a single-node call.
569

570
    """
571
    params = [cf.ToDict() for cf in disks]
572
    c = Client("blockdev_close", params)
573
    c.connect(node)
574
    c.run()
575
    return c.getresult().get(node, False)
576

    
577
  @staticmethod
578
  def call_upload_file(node_list, file_name):
579
    """Upload a file.
580

581
    The node will refuse the operation in case the file is not on the
582
    approved file list.
583

584
    This is a multi-node call.
585

586
    """
587
    fh = file(file_name)
588
    try:
589
      data = fh.read()
590
    finally:
591
      fh.close()
592
    st = os.stat(file_name)
593
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
594
              st.st_atime, st.st_mtime]
595
    c = Client("upload_file", params)
596
    c.connect_list(node_list)
597
    c.run()
598
    return c.getresult()
599

    
600
  def call_os_diagnose(self, node_list):
601
    """Request a diagnose of OS definitions.
602

603
    This is a multi-node call.
604

605
    """
606
    c = Client("os_diagnose", [])
607
    c.connect_list(node_list)
608
    c.run()
609
    result = c.getresult()
610
    new_result = {}
611
    for node_name in result:
612
      if result[node_name]:
613
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
614
      else:
615
        nr = []
616
      new_result[node_name] = nr
617
    return new_result
618

    
619
  def call_os_get(self, node, name):
620
    """Returns an OS definition.
621

622
    This is a single-node call.
623

624
    """
625
    c = Client("os_get", [name])
626
    c.connect(node)
627
    c.run()
628
    result = c.getresult().get(node, False)
629
    if isinstance(result, dict):
630
      return objects.OS.FromDict(result)
631
    else:
632
      return result
633

    
634
  def call_hooks_runner(self, node_list, hpath, phase, env):
635
    """Call the hooks runner.
636

637
    Args:
638
      - op: the OpCode instance
639
      - env: a dictionary with the environment
640

641
    This is a multi-node call.
642

643
    """
644
    params = [hpath, phase, env]
645
    c = Client("hooks_runner", params)
646
    c.connect_list(node_list)
647
    c.run()
648
    result = c.getresult()
649
    return result
650

    
651
  def call_iallocator_runner(self, node, name, idata):
652
    """Call an iallocator on a remote node
653

654
    Args:
655
      - name: the iallocator name
656
      - input: the json-encoded input string
657

658
    This is a single-node call.
659

660
    """
661
    params = [name, idata]
662
    c = Client("iallocator_runner", params)
663
    c.connect(node)
664
    c.run()
665
    result = c.getresult().get(node, False)
666
    return result
667

    
668
  def call_blockdev_grow(self, node, cf_bdev, amount):
669
    """Request a snapshot of the given block device.
670

671
    This is a single-node call.
672

673
    """
674
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
675
    c.connect(node)
676
    c.run()
677
    return c.getresult().get(node, False)
678

    
679
  def call_blockdev_snapshot(self, node, cf_bdev):
680
    """Request a snapshot of the given block device.
681

682
    This is a single-node call.
683

684
    """
685
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
686
    c.connect(node)
687
    c.run()
688
    return c.getresult().get(node, False)
689

    
690
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
691
                           cluster_name):
692
    """Request the export of a given snapshot.
693

694
    This is a single-node call.
695

696
    """
697
    params = [snap_bdev.ToDict(), dest_node, instance.ToDict(), cluster_name]
698
    c = Client("snapshot_export", params)
699
    c.connect(node)
700
    c.run()
701
    return c.getresult().get(node, False)
702

    
703
  def call_finalize_export(self, node, instance, snap_disks):
704
    """Request the completion of an export operation.
705

706
    This writes the export config file, etc.
707

708
    This is a single-node call.
709

710
    """
711
    flat_disks = []
712
    for disk in snap_disks:
713
      flat_disks.append(disk.ToDict())
714
    params = [instance.ToDict(), flat_disks]
715
    c = Client("finalize_export", params)
716
    c.connect(node)
717
    c.run()
718
    return c.getresult().get(node, False)
719

    
720
  def call_export_info(self, node, path):
721
    """Queries the export information in a given path.
722

723
    This is a single-node call.
724

725
    """
726
    c = Client("export_info", [path])
727
    c.connect(node)
728
    c.run()
729
    result = c.getresult().get(node, False)
730
    if not result:
731
      return result
732
    return objects.SerializableConfigParser.Loads(str(result))
733

    
734
  def call_instance_os_import(self, node, inst, osdev, swapdev,
735
                              src_node, src_image, cluster_name):
736
    """Request the import of a backup into an instance.
737

738
    This is a single-node call.
739

740
    """
741
    params = [inst.ToDict(), osdev, swapdev, src_node, src_image, cluster_name]
742
    c = Client("instance_os_import", params)
743
    c.connect(node)
744
    c.run()
745
    return c.getresult().get(node, False)
746

    
747
  def call_export_list(self, node_list):
748
    """Gets the stored exports list.
749

750
    This is a multi-node call.
751

752
    """
753
    c = Client("export_list", [])
754
    c.connect_list(node_list)
755
    c.run()
756
    result = c.getresult()
757
    return result
758

    
759
  def call_export_remove(self, node, export):
760
    """Requests removal of a given export.
761

762
    This is a single-node call.
763

764
    """
765
    c = Client("export_remove", [export])
766
    c.connect(node)
767
    c.run()
768
    return c.getresult().get(node, False)
769

    
770
  @staticmethod
771
  def call_node_leave_cluster(node):
772
    """Requests a node to clean the cluster information it has.
773

774
    This will remove the configuration information from the ganeti data
775
    dir.
776

777
    This is a single-node call.
778

779
    """
780
    c = Client("node_leave_cluster", [])
781
    c.connect(node)
782
    c.run()
783
    return c.getresult().get(node, False)
784

    
785
  def call_node_volumes(self, node_list):
786
    """Gets all volumes on node(s).
787

788
    This is a multi-node call.
789

790
    """
791
    c = Client("node_volumes", [])
792
    c.connect_list(node_list)
793
    c.run()
794
    return c.getresult()
795

    
796
  def call_test_delay(self, node_list, duration):
797
    """Sleep for a fixed time on given node(s).
798

799
    This is a multi-node call.
800

801
    """
802
    c = Client("test_delay", [duration])
803
    c.connect_list(node_list)
804
    c.run()
805
    return c.getresult()
806

    
807
  def call_file_storage_dir_create(self, node, file_storage_dir):
808
    """Create the given file storage directory.
809

810
    This is a single-node call.
811

812
    """
813
    c = Client("file_storage_dir_create", [file_storage_dir])
814
    c.connect(node)
815
    c.run()
816
    return c.getresult().get(node, False)
817

    
818
  def call_file_storage_dir_remove(self, node, file_storage_dir):
819
    """Remove the given file storage directory.
820

821
    This is a single-node call.
822

823
    """
824
    c = Client("file_storage_dir_remove", [file_storage_dir])
825
    c.connect(node)
826
    c.run()
827
    return c.getresult().get(node, False)
828

    
829
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
830
                                   new_file_storage_dir):
831
    """Rename file storage directory.
832

833
    This is a single-node call.
834

835
    """
836
    c = Client("file_storage_dir_rename",
837
               [old_file_storage_dir, new_file_storage_dir])
838
    c.connect(node)
839
    c.run()
840
    return c.getresult().get(node, False)
841

    
842
  @staticmethod
843
  def call_jobqueue_update(node_list, file_name, content):
844
    """Update job queue.
845

846
    This is a multi-node call.
847

848
    """
849
    c = Client("jobqueue_update", [file_name, content])
850
    c.connect_list(node_list)
851
    c.run()
852
    result = c.getresult()
853
    return result
854

    
855
  @staticmethod
856
  def call_jobqueue_purge(node):
857
    """Purge job queue.
858

859
    This is a single-node call.
860

861
    """
862
    c = Client("jobqueue_purge", [])
863
    c.connect(node)
864
    c.run()
865
    return c.getresult().get(node, False)
866

    
867
  @staticmethod
868
  def call_jobqueue_rename(node_list, old, new):
869
    """Rename a job queue file.
870

871
    This is a multi-node call.
872

873
    """
874
    c = Client("jobqueue_rename", [old, new])
875
    c.connect_list(node_list)
876
    c.run()
877
    result = c.getresult()
878
    return result