Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 15396f60

History | View | Annotate | Download (22.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36

    
37
import simplejson
38

    
39
from ganeti import logger
40
from ganeti import utils
41
from ganeti import objects
42

    
43

    
44
class NodeController:
45
  """Node-handling class.
46

47
  For each node that we speak with, we create an instance of this
48
  class, so that we have a safe place to store the details of this
49
  individual call.
50

51
  """
52
  def __init__(self, parent, node):
53
    self.parent = parent
54
    self.node = node
55
    self.failed = False
56

    
57
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
58
    try:
59
      hc.connect()
60
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
61
                    skip_accept_encoding=True)
62
      hc.putheader('Content-Length', str(len(parent.body)))
63
      hc.endheaders()
64
      hc.send(parent.body)
65
    except socket.error, err:
66
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
67
      self.failed = True
68

    
69
  def get_response(self):
70
    """Try to process the response from the node.
71

72
    """
73
    if self.failed:
74
      # we already failed in connect
75
      return False
76
    resp = self.http_conn.getresponse()
77
    if resp.status != 200:
78
      return False
79
    try:
80
      length = int(resp.getheader('Content-Length', '0'))
81
    except ValueError:
82
      return False
83
    if not length:
84
      logger.Error("Zero-length reply from %s" % self.node)
85
      return False
86
    payload = resp.read(length)
87
    unload = simplejson.loads(payload)
88
    return unload
89

    
90

    
91
class Client:
92
  """RPC Client class.
93

94
  This class, given a (remote) method name, a list of parameters and a
95
  list of nodes, will contact (in parallel) all nodes, and return a
96
  dict of results (key: node name, value: result).
97

98
  One current bug is that generic failure is still signalled by
99
  'False' result, which is not good. This overloading of values can
100
  cause bugs.
101

102
  """
103
  result_set = False
104
  result = False
105
  allresult = []
106

    
107
  def __init__(self, procedure, args):
108
    self.port = utils.GetNodeDaemonPort()
109
    self.nodepw = utils.GetNodeDaemonPassword()
110
    self.nc = {}
111
    self.results = {}
112
    self.procedure = procedure
113
    self.args = args
114
    self.body = simplejson.dumps(args)
115

    
116
  #--- generic connector -------------
117

    
118
  def connect_list(self, node_list):
119
    """Add a list of nodes to the target nodes.
120

121
    """
122
    for node in node_list:
123
      self.connect(node)
124

    
125
  def connect(self, connect_node):
126
    """Add a node to the target list.
127

128
    """
129
    self.nc[connect_node] = nc = NodeController(self, connect_node)
130

    
131
  def getresult(self):
132
    """Return the results of the call.
133

134
    """
135
    return self.results
136

    
137
  def run(self):
138
    """Wrapper over reactor.run().
139

140
    This function simply calls reactor.run() if we have any requests
141
    queued, otherwise it does nothing.
142

143
    """
144
    for node, nc in self.nc.items():
145
      self.results[node] = nc.get_response()
146

    
147

    
148
class RpcRunner(object):
149
  """RPC runner class"""
150

    
151
  def __init__(self, cfg):
152
    """Initialized the rpc runner.
153

154
    @type cfg:  C{config.ConfigWriter}
155
    @param cfg: the configuration object that will be used to get data
156
                about the cluster
157

158
    """
159
    self._cfg = cfg
160

    
161
  def call_volume_list(self, node_list, vg_name):
162
    """Gets the logical volumes present in a given volume group.
163

164
    This is a multi-node call.
165

166
    """
167
    c = Client("volume_list", [vg_name])
168
    c.connect_list(node_list)
169
    c.run()
170
    return c.getresult()
171

    
172
  def call_vg_list(self, node_list):
173
    """Gets the volume group list.
174

175
    This is a multi-node call.
176

177
    """
178
    c = Client("vg_list", [])
179
    c.connect_list(node_list)
180
    c.run()
181
    return c.getresult()
182

    
183

    
184
  def call_bridges_exist(self, node, bridges_list):
185
    """Checks if a node has all the bridges given.
186

187
    This method checks if all bridges given in the bridges_list are
188
    present on the remote node, so that an instance that uses interfaces
189
    on those bridges can be started.
190

191
    This is a single-node call.
192

193
    """
194
    c = Client("bridges_exist", [bridges_list])
195
    c.connect(node)
196
    c.run()
197
    return c.getresult().get(node, False)
198

    
199

    
200
  def call_instance_start(self, node, instance, extra_args):
201
    """Starts an instance.
202

203
    This is a single-node call.
204

205
    """
206
    c = Client("instance_start", [instance.ToDict(), extra_args])
207
    c.connect(node)
208
    c.run()
209
    return c.getresult().get(node, False)
210

    
211

    
212
  def call_instance_shutdown(self, node, instance):
213
    """Stops an instance.
214

215
    This is a single-node call.
216

217
    """
218
    c = Client("instance_shutdown", [instance.ToDict()])
219
    c.connect(node)
220
    c.run()
221
    return c.getresult().get(node, False)
222

    
223

    
224
  def call_instance_migrate(self, node, instance, target, live):
225
    """Migrate an instance.
226

227
    This is a single-node call.
228

229
    @type node: string
230
    @param node: the node on which the instance is currently running
231
    @type instance: C{objects.Instance}
232
    @param instance: the instance definition
233
    @type target: string
234
    @param target: the target node name
235
    @type live: boolean
236
    @param live: whether the migration should be done live or not (the
237
        interpretation of this parameter is left to the hypervisor)
238

239
    """
240
    c = Client("instance_migrate", [instance.ToDict(), target, live])
241
    c.connect(node)
242
    c.run()
243
    return c.getresult().get(node, False)
244

    
245

    
246
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
247
    """Reboots an instance.
248

249
    This is a single-node call.
250

251
    """
252
    c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
253
    c.connect(node)
254
    c.run()
255
    return c.getresult().get(node, False)
256

    
257

    
258
  def call_instance_os_add(self, node, inst, osdev, swapdev):
259
    """Installs an OS on the given instance.
260

261
    This is a single-node call.
262

263
    """
264
    params = [inst.ToDict(), osdev, swapdev]
265
    c = Client("instance_os_add", params)
266
    c.connect(node)
267
    c.run()
268
    return c.getresult().get(node, False)
269

    
270

    
271
  def call_instance_run_rename(self, node, inst, old_name, osdev, swapdev):
272
    """Run the OS rename script for an instance.
273

274
    This is a single-node call.
275

276
    """
277
    params = [inst.ToDict(), old_name, osdev, swapdev]
278
    c = Client("instance_run_rename", params)
279
    c.connect(node)
280
    c.run()
281
    return c.getresult().get(node, False)
282

    
283

    
284
  def call_instance_info(self, node, instance, hname):
285
    """Returns information about a single instance.
286

287
    This is a single-node call.
288

289
    @type node_list: list
290
    @param node_list: the list of nodes to query
291
    @type instance: string
292
    @param instance: the instance name
293
    @type hname: string
294
    @param hname: the hypervisor type of the instance
295

296
    """
297
    c = Client("instance_info", [instance])
298
    c.connect(node)
299
    c.run()
300
    return c.getresult().get(node, False)
301

    
302

    
303
  def call_all_instances_info(self, node_list, hypervisor_list):
304
    """Returns information about all instances on the given nodes.
305

306
    This is a multi-node call.
307

308
    @type node_list: list
309
    @param node_list: the list of nodes to query
310
    @type hypervisor_list: list
311
    @param hypervisor_list: the hypervisors to query for instances
312

313
    """
314
    c = Client("all_instances_info", [hypervisor_list])
315
    c.connect_list(node_list)
316
    c.run()
317
    return c.getresult()
318

    
319

    
320
  def call_instance_list(self, node_list, hypervisor_list):
321
    """Returns the list of running instances on a given node.
322

323
    This is a multi-node call.
324

325
    @type node_list: list
326
    @param node_list: the list of nodes to query
327
    @type hypervisor_list: list
328
    @param hypervisor_list: the hypervisors to query for instances
329

330
    """
331
    c = Client("instance_list", [hypervisor_list])
332
    c.connect_list(node_list)
333
    c.run()
334
    return c.getresult()
335

    
336

    
337
  def call_node_tcp_ping(self, node, source, target, port, timeout,
338
                         live_port_needed):
339
    """Do a TcpPing on the remote node
340

341
    This is a single-node call.
342
    """
343
    c = Client("node_tcp_ping", [source, target, port, timeout,
344
                                 live_port_needed])
345
    c.connect(node)
346
    c.run()
347
    return c.getresult().get(node, False)
348

    
349

    
350
  def call_node_info(self, node_list, vg_name, hypervisor_type):
351
    """Return node information.
352

353
    This will return memory information and volume group size and free
354
    space.
355

356
    This is a multi-node call.
357

358
    @type node_list: list
359
    @param node_list: the list of nodes to query
360
    @type vgname: C{string}
361
    @param vgname: the name of the volume group to ask for disk space
362
        information
363
    @type hypervisor_type: C{str}
364
    @param hypervisor_type: the name of the hypervisor to ask for
365
        memory information
366

367
    """
368
    c = Client("node_info", [vg_name, hypervisor_type])
369
    c.connect_list(node_list)
370
    c.run()
371
    retux = c.getresult()
372

    
373
    for node_name in retux:
374
      ret = retux.get(node_name, False)
375
      if type(ret) != dict:
376
        logger.Error("could not connect to node %s" % (node_name))
377
        ret = {}
378

    
379
      utils.CheckDict(ret,
380
                      { 'memory_total' : '-',
381
                        'memory_dom0' : '-',
382
                        'memory_free' : '-',
383
                        'vg_size' : 'node_unreachable',
384
                        'vg_free' : '-' },
385
                      "call_node_info",
386
                      )
387
    return retux
388

    
389

    
390
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
391
    """Add a node to the cluster.
392

393
    This is a single-node call.
394

395
    """
396
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
397
    c = Client("node_add", params)
398
    c.connect(node)
399
    c.run()
400
    return c.getresult().get(node, False)
401

    
402

    
403
  def call_node_verify(self, node_list, checkdict, cluster_name):
404
    """Request verification of given parameters.
405

406
    This is a multi-node call.
407

408
    """
409
    c = Client("node_verify", [checkdict, cluster_name])
410
    c.connect_list(node_list)
411
    c.run()
412
    return c.getresult()
413

    
414

    
415
  @staticmethod
416
  def call_node_start_master(node, start_daemons):
417
    """Tells a node to activate itself as a master.
418

419
    This is a single-node call.
420

421
    """
422
    c = Client("node_start_master", [start_daemons])
423
    c.connect(node)
424
    c.run()
425
    return c.getresult().get(node, False)
426

    
427

    
428
  @staticmethod
429
  def call_node_stop_master(node, stop_daemons):
430
    """Tells a node to demote itself from master status.
431

432
    This is a single-node call.
433

434
    """
435
    c = Client("node_stop_master", [stop_daemons])
436
    c.connect(node)
437
    c.run()
438
    return c.getresult().get(node, False)
439

    
440

    
441
  @staticmethod
442
  def call_master_info(node_list):
443
    """Query master info.
444

445
    This is a multi-node call.
446

447
    """
448
    # TODO: should this method query down nodes?
449
    c = Client("master_info", [])
450
    c.connect_list(node_list)
451
    c.run()
452
    return c.getresult()
453

    
454

    
455
  def call_version(self, node_list):
456
    """Query node version.
457

458
    This is a multi-node call.
459

460
    """
461
    c = Client("version", [])
462
    c.connect_list(node_list)
463
    c.run()
464
    return c.getresult()
465

    
466

    
467
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
468
    """Request creation of a given block device.
469

470
    This is a single-node call.
471

472
    """
473
    params = [bdev.ToDict(), size, owner, on_primary, info]
474
    c = Client("blockdev_create", params)
475
    c.connect(node)
476
    c.run()
477
    return c.getresult().get(node, False)
478

    
479

    
480
  def call_blockdev_remove(self, node, bdev):
481
    """Request removal of a given block device.
482

483
    This is a single-node call.
484

485
    """
486
    c = Client("blockdev_remove", [bdev.ToDict()])
487
    c.connect(node)
488
    c.run()
489
    return c.getresult().get(node, False)
490

    
491

    
492
  def call_blockdev_rename(self, node, devlist):
493
    """Request rename of the given block devices.
494

495
    This is a single-node call.
496

497
    """
498
    params = [(d.ToDict(), uid) for d, uid in devlist]
499
    c = Client("blockdev_rename", params)
500
    c.connect(node)
501
    c.run()
502
    return c.getresult().get(node, False)
503

    
504

    
505
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
506
    """Request assembling of a given block device.
507

508
    This is a single-node call.
509

510
    """
511
    params = [disk.ToDict(), owner, on_primary]
512
    c = Client("blockdev_assemble", params)
513
    c.connect(node)
514
    c.run()
515
    return c.getresult().get(node, False)
516

    
517

    
518
  def call_blockdev_shutdown(self, node, disk):
519
    """Request shutdown of a given block device.
520

521
    This is a single-node call.
522

523
    """
524
    c = Client("blockdev_shutdown", [disk.ToDict()])
525
    c.connect(node)
526
    c.run()
527
    return c.getresult().get(node, False)
528

    
529

    
530
  def call_blockdev_addchildren(self, node, bdev, ndevs):
531
    """Request adding a list of children to a (mirroring) device.
532

533
    This is a single-node call.
534

535
    """
536
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
537
    c = Client("blockdev_addchildren", params)
538
    c.connect(node)
539
    c.run()
540
    return c.getresult().get(node, False)
541

    
542

    
543
  def call_blockdev_removechildren(self, node, bdev, ndevs):
544
    """Request removing a list of children from a (mirroring) device.
545

546
    This is a single-node call.
547

548
    """
549
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
550
    c = Client("blockdev_removechildren", params)
551
    c.connect(node)
552
    c.run()
553
    return c.getresult().get(node, False)
554

    
555

    
556
  def call_blockdev_getmirrorstatus(self, node, disks):
557
    """Request status of a (mirroring) device.
558

559
    This is a single-node call.
560

561
    """
562
    params = [dsk.ToDict() for dsk in disks]
563
    c = Client("blockdev_getmirrorstatus", params)
564
    c.connect(node)
565
    c.run()
566
    return c.getresult().get(node, False)
567

    
568

    
569
  def call_blockdev_find(self, node, disk):
570
    """Request identification of a given block device.
571

572
    This is a single-node call.
573

574
    """
575
    c = Client("blockdev_find", [disk.ToDict()])
576
    c.connect(node)
577
    c.run()
578
    return c.getresult().get(node, False)
579

    
580

    
581
  def call_blockdev_close(self, node, disks):
582
    """Closes the given block devices.
583

584
    This is a single-node call.
585

586
    """
587
    params = [cf.ToDict() for cf in disks]
588
    c = Client("blockdev_close", params)
589
    c.connect(node)
590
    c.run()
591
    return c.getresult().get(node, False)
592

    
593

    
594
  @staticmethod
595
  def call_upload_file(node_list, file_name):
596
    """Upload a file.
597

598
    The node will refuse the operation in case the file is not on the
599
    approved file list.
600

601
    This is a multi-node call.
602

603
    """
604
    fh = file(file_name)
605
    try:
606
      data = fh.read()
607
    finally:
608
      fh.close()
609
    st = os.stat(file_name)
610
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
611
              st.st_atime, st.st_mtime]
612
    c = Client("upload_file", params)
613
    c.connect_list(node_list)
614
    c.run()
615
    return c.getresult()
616

    
617
  @staticmethod
618
  def call_upload_file(node_list, file_name):
619
    """Upload a file.
620

621
    The node will refuse the operation in case the file is not on the
622
    approved file list.
623

624
    This is a multi-node call.
625

626
    """
627
    fh = file(file_name)
628
    try:
629
      data = fh.read()
630
    finally:
631
      fh.close()
632
    st = os.stat(file_name)
633
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
634
              st.st_atime, st.st_mtime]
635
    c = Client("upload_file", params)
636
    c.connect_list(node_list)
637
    c.run()
638
    return c.getresult()
639

    
640
  def call_os_diagnose(self, node_list):
641
    """Request a diagnose of OS definitions.
642

643
    This is a multi-node call.
644

645
    """
646
    c = Client("os_diagnose", [])
647
    c.connect_list(node_list)
648
    c.run()
649
    result = c.getresult()
650
    new_result = {}
651
    for node_name in result:
652
      if result[node_name]:
653
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
654
      else:
655
        nr = []
656
      new_result[node_name] = nr
657
    return new_result
658

    
659

    
660
  def call_os_get(self, node, name):
661
    """Returns an OS definition.
662

663
    This is a single-node call.
664

665
    """
666
    c = Client("os_get", [name])
667
    c.connect(node)
668
    c.run()
669
    result = c.getresult().get(node, False)
670
    if isinstance(result, dict):
671
      return objects.OS.FromDict(result)
672
    else:
673
      return result
674

    
675

    
676
  def call_hooks_runner(self, node_list, hpath, phase, env):
677
    """Call the hooks runner.
678

679
    Args:
680
      - op: the OpCode instance
681
      - env: a dictionary with the environment
682

683
    This is a multi-node call.
684

685
    """
686
    params = [hpath, phase, env]
687
    c = Client("hooks_runner", params)
688
    c.connect_list(node_list)
689
    c.run()
690
    result = c.getresult()
691
    return result
692

    
693

    
694
  def call_iallocator_runner(self, node, name, idata):
695
    """Call an iallocator on a remote node
696

697
    Args:
698
      - name: the iallocator name
699
      - input: the json-encoded input string
700

701
    This is a single-node call.
702

703
    """
704
    params = [name, idata]
705
    c = Client("iallocator_runner", params)
706
    c.connect(node)
707
    c.run()
708
    result = c.getresult().get(node, False)
709
    return result
710

    
711

    
712
  def call_blockdev_grow(self, node, cf_bdev, amount):
713
    """Request a snapshot of the given block device.
714

715
    This is a single-node call.
716

717
    """
718
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
719
    c.connect(node)
720
    c.run()
721
    return c.getresult().get(node, False)
722

    
723

    
724
  def call_blockdev_snapshot(self, node, cf_bdev):
725
    """Request a snapshot of the given block device.
726

727
    This is a single-node call.
728

729
    """
730
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
731
    c.connect(node)
732
    c.run()
733
    return c.getresult().get(node, False)
734

    
735

    
736
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
737
                           cluster_name):
738
    """Request the export of a given snapshot.
739

740
    This is a single-node call.
741

742
    """
743
    params = [snap_bdev.ToDict(), dest_node, instance.ToDict(), cluster_name]
744
    c = Client("snapshot_export", params)
745
    c.connect(node)
746
    c.run()
747
    return c.getresult().get(node, False)
748

    
749

    
750
  def call_finalize_export(self, node, instance, snap_disks):
751
    """Request the completion of an export operation.
752

753
    This writes the export config file, etc.
754

755
    This is a single-node call.
756

757
    """
758
    flat_disks = []
759
    for disk in snap_disks:
760
      flat_disks.append(disk.ToDict())
761
    params = [instance.ToDict(), flat_disks]
762
    c = Client("finalize_export", params)
763
    c.connect(node)
764
    c.run()
765
    return c.getresult().get(node, False)
766

    
767

    
768
  def call_export_info(self, node, path):
769
    """Queries the export information in a given path.
770

771
    This is a single-node call.
772

773
    """
774
    c = Client("export_info", [path])
775
    c.connect(node)
776
    c.run()
777
    result = c.getresult().get(node, False)
778
    if not result:
779
      return result
780
    return objects.SerializableConfigParser.Loads(str(result))
781

    
782

    
783
  def call_instance_os_import(self, node, inst, osdev, swapdev,
784
                              src_node, src_image, cluster_name):
785
    """Request the import of a backup into an instance.
786

787
    This is a single-node call.
788

789
    """
790
    params = [inst.ToDict(), osdev, swapdev, src_node, src_image, cluster_name]
791
    c = Client("instance_os_import", params)
792
    c.connect(node)
793
    c.run()
794
    return c.getresult().get(node, False)
795

    
796

    
797
  def call_export_list(self, node_list):
798
    """Gets the stored exports list.
799

800
    This is a multi-node call.
801

802
    """
803
    c = Client("export_list", [])
804
    c.connect_list(node_list)
805
    c.run()
806
    result = c.getresult()
807
    return result
808

    
809

    
810
  def call_export_remove(self, node, export):
811
    """Requests removal of a given export.
812

813
    This is a single-node call.
814

815
    """
816
    c = Client("export_remove", [export])
817
    c.connect(node)
818
    c.run()
819
    return c.getresult().get(node, False)
820

    
821

    
822
  @staticmethod
823
  def call_node_leave_cluster(node):
824
    """Requests a node to clean the cluster information it has.
825

826
    This will remove the configuration information from the ganeti data
827
    dir.
828

829
    This is a single-node call.
830

831
    """
832
    c = Client("node_leave_cluster", [])
833
    c.connect(node)
834
    c.run()
835
    return c.getresult().get(node, False)
836

    
837

    
838
  def call_node_volumes(self, node_list):
839
    """Gets all volumes on node(s).
840

841
    This is a multi-node call.
842

843
    """
844
    c = Client("node_volumes", [])
845
    c.connect_list(node_list)
846
    c.run()
847
    return c.getresult()
848

    
849

    
850
  def call_test_delay(self, node_list, duration):
851
    """Sleep for a fixed time on given node(s).
852

853
    This is a multi-node call.
854

855
    """
856
    c = Client("test_delay", [duration])
857
    c.connect_list(node_list)
858
    c.run()
859
    return c.getresult()
860

    
861

    
862
  def call_file_storage_dir_create(self, node, file_storage_dir):
863
    """Create the given file storage directory.
864

865
    This is a single-node call.
866

867
    """
868
    c = Client("file_storage_dir_create", [file_storage_dir])
869
    c.connect(node)
870
    c.run()
871
    return c.getresult().get(node, False)
872

    
873

    
874
  def call_file_storage_dir_remove(self, node, file_storage_dir):
875
    """Remove the given file storage directory.
876

877
    This is a single-node call.
878

879
    """
880
    c = Client("file_storage_dir_remove", [file_storage_dir])
881
    c.connect(node)
882
    c.run()
883
    return c.getresult().get(node, False)
884

    
885

    
886
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
887
                                   new_file_storage_dir):
888
    """Rename file storage directory.
889

890
    This is a single-node call.
891

892
    """
893
    c = Client("file_storage_dir_rename",
894
               [old_file_storage_dir, new_file_storage_dir])
895
    c.connect(node)
896
    c.run()
897
    return c.getresult().get(node, False)
898

    
899

    
900
  @staticmethod
901
  def call_jobqueue_update(node_list, file_name, content):
902
    """Update job queue.
903

904
    This is a multi-node call.
905

906
    """
907
    c = Client("jobqueue_update", [file_name, content])
908
    c.connect_list(node_list)
909
    c.run()
910
    result = c.getresult()
911
    return result
912

    
913

    
914
  @staticmethod
915
  def call_jobqueue_purge(node):
916
    """Purge job queue.
917

918
    This is a single-node call.
919

920
    """
921
    c = Client("jobqueue_purge", [])
922
    c.connect(node)
923
    c.run()
924
    return c.getresult().get(node, False)
925

    
926

    
927
  @staticmethod
928
  def call_jobqueue_rename(node_list, old, new):
929
    """Rename a job queue file.
930

931
    This is a multi-node call.
932

933
    """
934
    c = Client("jobqueue_rename", [old, new])
935
    c.connect_list(node_list)
936
    c.run()
937
    result = c.getresult()
938
    return result