Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ caad16e2

History | View | Annotate | Download (22.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36

    
37
import simplejson
38

    
39
from ganeti import logger
40
from ganeti import utils
41
from ganeti import objects
42

    
43

    
44
class NodeController:
45
  """Node-handling class.
46

47
  For each node that we speak with, we create an instance of this
48
  class, so that we have a safe place to store the details of this
49
  individual call.
50

51
  """
52
  def __init__(self, parent, node):
53
    self.parent = parent
54
    self.node = node
55
    self.failed = False
56

    
57
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
58
    try:
59
      hc.connect()
60
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
61
                    skip_accept_encoding=True)
62
      hc.putheader('Content-Length', str(len(parent.body)))
63
      hc.endheaders()
64
      hc.send(parent.body)
65
    except socket.error, err:
66
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
67
      self.failed = True
68

    
69
  def get_response(self):
70
    """Try to process the response from the node.
71

72
    """
73
    if self.failed:
74
      # we already failed in connect
75
      return False
76
    resp = self.http_conn.getresponse()
77
    if resp.status != 200:
78
      return False
79
    try:
80
      length = int(resp.getheader('Content-Length', '0'))
81
    except ValueError:
82
      return False
83
    if not length:
84
      logger.Error("Zero-length reply from %s" % self.node)
85
      return False
86
    payload = resp.read(length)
87
    unload = simplejson.loads(payload)
88
    return unload
89

    
90

    
91
class Client:
92
  """RPC Client class.
93

94
  This class, given a (remote) method name, a list of parameters and a
95
  list of nodes, will contact (in parallel) all nodes, and return a
96
  dict of results (key: node name, value: result).
97

98
  One current bug is that generic failure is still signalled by
99
  'False' result, which is not good. This overloading of values can
100
  cause bugs.
101

102
  """
103
  result_set = False
104
  result = False
105
  allresult = []
106

    
107
  def __init__(self, procedure, args):
108
    self.port = utils.GetNodeDaemonPort()
109
    self.nodepw = utils.GetNodeDaemonPassword()
110
    self.nc = {}
111
    self.results = {}
112
    self.procedure = procedure
113
    self.args = args
114
    self.body = simplejson.dumps(args)
115

    
116
  #--- generic connector -------------
117

    
118
  def connect_list(self, node_list):
119
    """Add a list of nodes to the target nodes.
120

121
    """
122
    for node in node_list:
123
      self.connect(node)
124

    
125
  def connect(self, connect_node):
126
    """Add a node to the target list.
127

128
    """
129
    self.nc[connect_node] = nc = NodeController(self, connect_node)
130

    
131
  def getresult(self):
132
    """Return the results of the call.
133

134
    """
135
    return self.results
136

    
137
  def run(self):
138
    """Wrapper over reactor.run().
139

140
    This function simply calls reactor.run() if we have any requests
141
    queued, otherwise it does nothing.
142

143
    """
144
    for node, nc in self.nc.items():
145
      self.results[node] = nc.get_response()
146

    
147

    
148
class RpcRunner(object):
149
  """RPC runner class"""
150

    
151
  def __init__(self, cfg):
152
    """Initialized the rpc runner.
153

154
    @type cfg:  C{config.ConfigWriter}
155
    @param cfg: the configuration object that will be used to get data
156
                about the cluster
157

158
    """
159
    self._cfg = cfg
160

    
161
  def call_volume_list(self, node_list, vg_name):
162
    """Gets the logical volumes present in a given volume group.
163

164
    This is a multi-node call.
165

166
    """
167
    c = Client("volume_list", [vg_name])
168
    c.connect_list(node_list)
169
    c.run()
170
    return c.getresult()
171

    
172
  def call_vg_list(self, node_list):
173
    """Gets the volume group list.
174

175
    This is a multi-node call.
176

177
    """
178
    c = Client("vg_list", [])
179
    c.connect_list(node_list)
180
    c.run()
181
    return c.getresult()
182

    
183

    
184
  def call_bridges_exist(self, node, bridges_list):
185
    """Checks if a node has all the bridges given.
186

187
    This method checks if all bridges given in the bridges_list are
188
    present on the remote node, so that an instance that uses interfaces
189
    on those bridges can be started.
190

191
    This is a single-node call.
192

193
    """
194
    c = Client("bridges_exist", [bridges_list])
195
    c.connect(node)
196
    c.run()
197
    return c.getresult().get(node, False)
198

    
199

    
200
  def call_instance_start(self, node, instance, extra_args):
201
    """Starts an instance.
202

203
    This is a single-node call.
204

205
    """
206
    c = Client("instance_start", [instance.ToDict(), extra_args])
207
    c.connect(node)
208
    c.run()
209
    return c.getresult().get(node, False)
210

    
211

    
212
  def call_instance_shutdown(self, node, instance):
213
    """Stops an instance.
214

215
    This is a single-node call.
216

217
    """
218
    c = Client("instance_shutdown", [instance.ToDict()])
219
    c.connect(node)
220
    c.run()
221
    return c.getresult().get(node, False)
222

    
223

    
224
  def call_instance_migrate(self, node, instance, target, live):
225
    """Migrate an instance.
226

227
    This is a single-node call.
228

229
    @type node: string
230
    @param node: the node on which the instance is currently running
231
    @type instance: C{objects.Instance}
232
    @param instance: the instance definition
233
    @type target: string
234
    @param target: the target node name
235
    @type live: boolean
236
    @param live: whether the migration should be done live or not (the
237
        interpretation of this parameter is left to the hypervisor)
238

239
    """
240
    c = Client("instance_migrate", [instance.ToDict(), target, live])
241
    c.connect(node)
242
    c.run()
243
    return c.getresult().get(node, False)
244

    
245

    
246
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
247
    """Reboots an instance.
248

249
    This is a single-node call.
250

251
    """
252
    c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
253
    c.connect(node)
254
    c.run()
255
    return c.getresult().get(node, False)
256

    
257

    
258
  def call_instance_os_add(self, node, inst, osdev, swapdev):
259
    """Installs an OS on the given instance.
260

261
    This is a single-node call.
262

263
    """
264
    params = [inst.ToDict(), osdev, swapdev]
265
    c = Client("instance_os_add", params)
266
    c.connect(node)
267
    c.run()
268
    return c.getresult().get(node, False)
269

    
270

    
271
  def call_instance_run_rename(self, node, inst, old_name, osdev, swapdev):
272
    """Run the OS rename script for an instance.
273

274
    This is a single-node call.
275

276
    """
277
    params = [inst.ToDict(), old_name, osdev, swapdev]
278
    c = Client("instance_run_rename", params)
279
    c.connect(node)
280
    c.run()
281
    return c.getresult().get(node, False)
282

    
283

    
284
  def call_instance_info(self, node, instance, hname):
285
    """Returns information about a single instance.
286

287
    This is a single-node call.
288

289
    @type node_list: list
290
    @param node_list: the list of nodes to query
291
    @type instance: string
292
    @param instance: the instance name
293
    @type hname: string
294
    @param hname: the hypervisor type of the instance
295

296
    """
297
    c = Client("instance_info", [instance])
298
    c.connect(node)
299
    c.run()
300
    return c.getresult().get(node, False)
301

    
302

    
303
  def call_all_instances_info(self, node_list, hypervisor_list):
304
    """Returns information about all instances on the given nodes.
305

306
    This is a multi-node call.
307

308
    @type node_list: list
309
    @param node_list: the list of nodes to query
310
    @type hypervisor_list: list
311
    @param hypervisor_list: the hypervisors to query for instances
312

313
    """
314
    c = Client("all_instances_info", [hypervisor_list])
315
    c.connect_list(node_list)
316
    c.run()
317
    return c.getresult()
318

    
319

    
320
  def call_instance_list(self, node_list, hypervisor_list):
321
    """Returns the list of running instances on a given node.
322

323
    This is a multi-node call.
324

325
    @type node_list: list
326
    @param node_list: the list of nodes to query
327
    @type hypervisor_list: list
328
    @param hypervisor_list: the hypervisors to query for instances
329

330
    """
331
    c = Client("instance_list", [hypervisor_list])
332
    c.connect_list(node_list)
333
    c.run()
334
    return c.getresult()
335

    
336

    
337
  def call_node_tcp_ping(self, node, source, target, port, timeout,
338
                         live_port_needed):
339
    """Do a TcpPing on the remote node
340

341
    This is a single-node call.
342

343
    """
344
    c = Client("node_tcp_ping", [source, target, port, timeout,
345
                                 live_port_needed])
346
    c.connect(node)
347
    c.run()
348
    return c.getresult().get(node, False)
349

    
350
  def call_node_has_ip_address(self, node, address):
351
    """Checks if a node has the given IP address.
352

353
    This is a single-node call.
354

355
    """
356
    c = Client("node_has_ip_address", [address])
357
    c.connect(node)
358
    c.run()
359
    return c.getresult().get(node, False)
360

    
361
  def call_node_info(self, node_list, vg_name, hypervisor_type):
362
    """Return node information.
363

364
    This will return memory information and volume group size and free
365
    space.
366

367
    This is a multi-node call.
368

369
    @type node_list: list
370
    @param node_list: the list of nodes to query
371
    @type vgname: C{string}
372
    @param vgname: the name of the volume group to ask for disk space
373
        information
374
    @type hypervisor_type: C{str}
375
    @param hypervisor_type: the name of the hypervisor to ask for
376
        memory information
377

378
    """
379
    c = Client("node_info", [vg_name, hypervisor_type])
380
    c.connect_list(node_list)
381
    c.run()
382
    retux = c.getresult()
383

    
384
    for node_name in retux:
385
      ret = retux.get(node_name, False)
386
      if type(ret) != dict:
387
        logger.Error("could not connect to node %s" % (node_name))
388
        ret = {}
389

    
390
      utils.CheckDict(ret,
391
                      { 'memory_total' : '-',
392
                        'memory_dom0' : '-',
393
                        'memory_free' : '-',
394
                        'vg_size' : 'node_unreachable',
395
                        'vg_free' : '-' },
396
                      "call_node_info",
397
                      )
398
    return retux
399

    
400

    
401
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
402
    """Add a node to the cluster.
403

404
    This is a single-node call.
405

406
    """
407
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
408
    c = Client("node_add", params)
409
    c.connect(node)
410
    c.run()
411
    return c.getresult().get(node, False)
412

    
413

    
414
  def call_node_verify(self, node_list, checkdict, cluster_name):
415
    """Request verification of given parameters.
416

417
    This is a multi-node call.
418

419
    """
420
    c = Client("node_verify", [checkdict, cluster_name])
421
    c.connect_list(node_list)
422
    c.run()
423
    return c.getresult()
424

    
425

    
426
  @staticmethod
427
  def call_node_start_master(node, start_daemons):
428
    """Tells a node to activate itself as a master.
429

430
    This is a single-node call.
431

432
    """
433
    c = Client("node_start_master", [start_daemons])
434
    c.connect(node)
435
    c.run()
436
    return c.getresult().get(node, False)
437

    
438

    
439
  @staticmethod
440
  def call_node_stop_master(node, stop_daemons):
441
    """Tells a node to demote itself from master status.
442

443
    This is a single-node call.
444

445
    """
446
    c = Client("node_stop_master", [stop_daemons])
447
    c.connect(node)
448
    c.run()
449
    return c.getresult().get(node, False)
450

    
451

    
452
  @staticmethod
453
  def call_master_info(node_list):
454
    """Query master info.
455

456
    This is a multi-node call.
457

458
    """
459
    # TODO: should this method query down nodes?
460
    c = Client("master_info", [])
461
    c.connect_list(node_list)
462
    c.run()
463
    return c.getresult()
464

    
465

    
466
  def call_version(self, node_list):
467
    """Query node version.
468

469
    This is a multi-node call.
470

471
    """
472
    c = Client("version", [])
473
    c.connect_list(node_list)
474
    c.run()
475
    return c.getresult()
476

    
477

    
478
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
479
    """Request creation of a given block device.
480

481
    This is a single-node call.
482

483
    """
484
    params = [bdev.ToDict(), size, owner, on_primary, info]
485
    c = Client("blockdev_create", params)
486
    c.connect(node)
487
    c.run()
488
    return c.getresult().get(node, False)
489

    
490

    
491
  def call_blockdev_remove(self, node, bdev):
492
    """Request removal of a given block device.
493

494
    This is a single-node call.
495

496
    """
497
    c = Client("blockdev_remove", [bdev.ToDict()])
498
    c.connect(node)
499
    c.run()
500
    return c.getresult().get(node, False)
501

    
502

    
503
  def call_blockdev_rename(self, node, devlist):
504
    """Request rename of the given block devices.
505

506
    This is a single-node call.
507

508
    """
509
    params = [(d.ToDict(), uid) for d, uid in devlist]
510
    c = Client("blockdev_rename", params)
511
    c.connect(node)
512
    c.run()
513
    return c.getresult().get(node, False)
514

    
515

    
516
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
517
    """Request assembling of a given block device.
518

519
    This is a single-node call.
520

521
    """
522
    params = [disk.ToDict(), owner, on_primary]
523
    c = Client("blockdev_assemble", params)
524
    c.connect(node)
525
    c.run()
526
    return c.getresult().get(node, False)
527

    
528

    
529
  def call_blockdev_shutdown(self, node, disk):
530
    """Request shutdown of a given block device.
531

532
    This is a single-node call.
533

534
    """
535
    c = Client("blockdev_shutdown", [disk.ToDict()])
536
    c.connect(node)
537
    c.run()
538
    return c.getresult().get(node, False)
539

    
540

    
541
  def call_blockdev_addchildren(self, node, bdev, ndevs):
542
    """Request adding a list of children to a (mirroring) device.
543

544
    This is a single-node call.
545

546
    """
547
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
548
    c = Client("blockdev_addchildren", params)
549
    c.connect(node)
550
    c.run()
551
    return c.getresult().get(node, False)
552

    
553

    
554
  def call_blockdev_removechildren(self, node, bdev, ndevs):
555
    """Request removing a list of children from a (mirroring) device.
556

557
    This is a single-node call.
558

559
    """
560
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
561
    c = Client("blockdev_removechildren", params)
562
    c.connect(node)
563
    c.run()
564
    return c.getresult().get(node, False)
565

    
566

    
567
  def call_blockdev_getmirrorstatus(self, node, disks):
568
    """Request status of a (mirroring) device.
569

570
    This is a single-node call.
571

572
    """
573
    params = [dsk.ToDict() for dsk in disks]
574
    c = Client("blockdev_getmirrorstatus", params)
575
    c.connect(node)
576
    c.run()
577
    return c.getresult().get(node, False)
578

    
579

    
580
  def call_blockdev_find(self, node, disk):
581
    """Request identification of a given block device.
582

583
    This is a single-node call.
584

585
    """
586
    c = Client("blockdev_find", [disk.ToDict()])
587
    c.connect(node)
588
    c.run()
589
    return c.getresult().get(node, False)
590

    
591

    
592
  def call_blockdev_close(self, node, disks):
593
    """Closes the given block devices.
594

595
    This is a single-node call.
596

597
    """
598
    params = [cf.ToDict() for cf in disks]
599
    c = Client("blockdev_close", params)
600
    c.connect(node)
601
    c.run()
602
    return c.getresult().get(node, False)
603

    
604

    
605
  @staticmethod
606
  def call_upload_file(node_list, file_name):
607
    """Upload a file.
608

609
    The node will refuse the operation in case the file is not on the
610
    approved file list.
611

612
    This is a multi-node call.
613

614
    """
615
    fh = file(file_name)
616
    try:
617
      data = fh.read()
618
    finally:
619
      fh.close()
620
    st = os.stat(file_name)
621
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
622
              st.st_atime, st.st_mtime]
623
    c = Client("upload_file", params)
624
    c.connect_list(node_list)
625
    c.run()
626
    return c.getresult()
627

    
628
  @staticmethod
629
  def call_upload_file(node_list, file_name):
630
    """Upload a file.
631

632
    The node will refuse the operation in case the file is not on the
633
    approved file list.
634

635
    This is a multi-node call.
636

637
    """
638
    fh = file(file_name)
639
    try:
640
      data = fh.read()
641
    finally:
642
      fh.close()
643
    st = os.stat(file_name)
644
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
645
              st.st_atime, st.st_mtime]
646
    c = Client("upload_file", params)
647
    c.connect_list(node_list)
648
    c.run()
649
    return c.getresult()
650

    
651
  def call_os_diagnose(self, node_list):
652
    """Request a diagnose of OS definitions.
653

654
    This is a multi-node call.
655

656
    """
657
    c = Client("os_diagnose", [])
658
    c.connect_list(node_list)
659
    c.run()
660
    result = c.getresult()
661
    new_result = {}
662
    for node_name in result:
663
      if result[node_name]:
664
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
665
      else:
666
        nr = []
667
      new_result[node_name] = nr
668
    return new_result
669

    
670

    
671
  def call_os_get(self, node, name):
672
    """Returns an OS definition.
673

674
    This is a single-node call.
675

676
    """
677
    c = Client("os_get", [name])
678
    c.connect(node)
679
    c.run()
680
    result = c.getresult().get(node, False)
681
    if isinstance(result, dict):
682
      return objects.OS.FromDict(result)
683
    else:
684
      return result
685

    
686

    
687
  def call_hooks_runner(self, node_list, hpath, phase, env):
688
    """Call the hooks runner.
689

690
    Args:
691
      - op: the OpCode instance
692
      - env: a dictionary with the environment
693

694
    This is a multi-node call.
695

696
    """
697
    params = [hpath, phase, env]
698
    c = Client("hooks_runner", params)
699
    c.connect_list(node_list)
700
    c.run()
701
    result = c.getresult()
702
    return result
703

    
704

    
705
  def call_iallocator_runner(self, node, name, idata):
706
    """Call an iallocator on a remote node
707

708
    Args:
709
      - name: the iallocator name
710
      - input: the json-encoded input string
711

712
    This is a single-node call.
713

714
    """
715
    params = [name, idata]
716
    c = Client("iallocator_runner", params)
717
    c.connect(node)
718
    c.run()
719
    result = c.getresult().get(node, False)
720
    return result
721

    
722

    
723
  def call_blockdev_grow(self, node, cf_bdev, amount):
724
    """Request a snapshot of the given block device.
725

726
    This is a single-node call.
727

728
    """
729
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
730
    c.connect(node)
731
    c.run()
732
    return c.getresult().get(node, False)
733

    
734

    
735
  def call_blockdev_snapshot(self, node, cf_bdev):
736
    """Request a snapshot of the given block device.
737

738
    This is a single-node call.
739

740
    """
741
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
742
    c.connect(node)
743
    c.run()
744
    return c.getresult().get(node, False)
745

    
746

    
747
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
748
                           cluster_name):
749
    """Request the export of a given snapshot.
750

751
    This is a single-node call.
752

753
    """
754
    params = [snap_bdev.ToDict(), dest_node, instance.ToDict(), cluster_name]
755
    c = Client("snapshot_export", params)
756
    c.connect(node)
757
    c.run()
758
    return c.getresult().get(node, False)
759

    
760

    
761
  def call_finalize_export(self, node, instance, snap_disks):
762
    """Request the completion of an export operation.
763

764
    This writes the export config file, etc.
765

766
    This is a single-node call.
767

768
    """
769
    flat_disks = []
770
    for disk in snap_disks:
771
      flat_disks.append(disk.ToDict())
772
    params = [instance.ToDict(), flat_disks]
773
    c = Client("finalize_export", params)
774
    c.connect(node)
775
    c.run()
776
    return c.getresult().get(node, False)
777

    
778

    
779
  def call_export_info(self, node, path):
780
    """Queries the export information in a given path.
781

782
    This is a single-node call.
783

784
    """
785
    c = Client("export_info", [path])
786
    c.connect(node)
787
    c.run()
788
    result = c.getresult().get(node, False)
789
    if not result:
790
      return result
791
    return objects.SerializableConfigParser.Loads(str(result))
792

    
793

    
794
  def call_instance_os_import(self, node, inst, osdev, swapdev,
795
                              src_node, src_image, cluster_name):
796
    """Request the import of a backup into an instance.
797

798
    This is a single-node call.
799

800
    """
801
    params = [inst.ToDict(), osdev, swapdev, src_node, src_image, cluster_name]
802
    c = Client("instance_os_import", params)
803
    c.connect(node)
804
    c.run()
805
    return c.getresult().get(node, False)
806

    
807

    
808
  def call_export_list(self, node_list):
809
    """Gets the stored exports list.
810

811
    This is a multi-node call.
812

813
    """
814
    c = Client("export_list", [])
815
    c.connect_list(node_list)
816
    c.run()
817
    result = c.getresult()
818
    return result
819

    
820

    
821
  def call_export_remove(self, node, export):
822
    """Requests removal of a given export.
823

824
    This is a single-node call.
825

826
    """
827
    c = Client("export_remove", [export])
828
    c.connect(node)
829
    c.run()
830
    return c.getresult().get(node, False)
831

    
832

    
833
  @staticmethod
834
  def call_node_leave_cluster(node):
835
    """Requests a node to clean the cluster information it has.
836

837
    This will remove the configuration information from the ganeti data
838
    dir.
839

840
    This is a single-node call.
841

842
    """
843
    c = Client("node_leave_cluster", [])
844
    c.connect(node)
845
    c.run()
846
    return c.getresult().get(node, False)
847

    
848

    
849
  def call_node_volumes(self, node_list):
850
    """Gets all volumes on node(s).
851

852
    This is a multi-node call.
853

854
    """
855
    c = Client("node_volumes", [])
856
    c.connect_list(node_list)
857
    c.run()
858
    return c.getresult()
859

    
860

    
861
  def call_test_delay(self, node_list, duration):
862
    """Sleep for a fixed time on given node(s).
863

864
    This is a multi-node call.
865

866
    """
867
    c = Client("test_delay", [duration])
868
    c.connect_list(node_list)
869
    c.run()
870
    return c.getresult()
871

    
872

    
873
  def call_file_storage_dir_create(self, node, file_storage_dir):
874
    """Create the given file storage directory.
875

876
    This is a single-node call.
877

878
    """
879
    c = Client("file_storage_dir_create", [file_storage_dir])
880
    c.connect(node)
881
    c.run()
882
    return c.getresult().get(node, False)
883

    
884

    
885
  def call_file_storage_dir_remove(self, node, file_storage_dir):
886
    """Remove the given file storage directory.
887

888
    This is a single-node call.
889

890
    """
891
    c = Client("file_storage_dir_remove", [file_storage_dir])
892
    c.connect(node)
893
    c.run()
894
    return c.getresult().get(node, False)
895

    
896

    
897
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
898
                                   new_file_storage_dir):
899
    """Rename file storage directory.
900

901
    This is a single-node call.
902

903
    """
904
    c = Client("file_storage_dir_rename",
905
               [old_file_storage_dir, new_file_storage_dir])
906
    c.connect(node)
907
    c.run()
908
    return c.getresult().get(node, False)
909

    
910

    
911
  @staticmethod
912
  def call_jobqueue_update(node_list, file_name, content):
913
    """Update job queue.
914

915
    This is a multi-node call.
916

917
    """
918
    c = Client("jobqueue_update", [file_name, content])
919
    c.connect_list(node_list)
920
    c.run()
921
    result = c.getresult()
922
    return result
923

    
924

    
925
  @staticmethod
926
  def call_jobqueue_purge(node):
927
    """Purge job queue.
928

929
    This is a single-node call.
930

931
    """
932
    c = Client("jobqueue_purge", [])
933
    c.connect(node)
934
    c.run()
935
    return c.getresult().get(node, False)
936

    
937

    
938
  @staticmethod
939
  def call_jobqueue_rename(node_list, old, new):
940
    """Rename a job queue file.
941

942
    This is a multi-node call.
943

944
    """
945
    c = Client("jobqueue_rename", [old, new])
946
    c.connect_list(node_list)
947
    c.run()
948
    result = c.getresult()
949
    return result