Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 3ef3c771

History | View | Annotate | Download (24.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
import simplejson
39

    
40
from ganeti import utils
41
from ganeti import objects
42

    
43

    
44
class NodeController:
45
  """Node-handling class.
46

47
  For each node that we speak with, we create an instance of this
48
  class, so that we have a safe place to store the details of this
49
  individual call.
50

51
  """
52
  def __init__(self, parent, node):
53
    self.parent = parent
54
    self.node = node
55
    self.failed = False
56

    
57
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
58
    try:
59
      hc.connect()
60
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
61
                    skip_accept_encoding=True)
62
      hc.putheader('Content-Length', str(len(parent.body)))
63
      hc.endheaders()
64
      hc.send(parent.body)
65
    except socket.error, err:
66
      logging.exception("Error connecting to node %s", node)
67
      self.failed = True
68

    
69
  def GetResponse(self):
70
    """Try to process the response from the node.
71

72
    """
73
    if self.failed:
74
      # we already failed in connect
75
      return False
76
    resp = self.http_conn.getresponse()
77
    if resp.status != 200:
78
      return False
79
    try:
80
      length = int(resp.getheader('Content-Length', '0'))
81
    except ValueError:
82
      return False
83
    if not length:
84
      logging.error("Zero-length reply from node %s", self.node)
85
      return False
86
    payload = resp.read(length)
87
    unload = simplejson.loads(payload)
88
    return unload
89

    
90

    
91
class Client:
92
  """RPC Client class.
93

94
  This class, given a (remote) method name, a list of parameters and a
95
  list of nodes, will contact (in parallel) all nodes, and return a
96
  dict of results (key: node name, value: result).
97

98
  One current bug is that generic failure is still signalled by
99
  'False' result, which is not good. This overloading of values can
100
  cause bugs.
101

102
  """
103
  result_set = False
104
  result = False
105
  allresult = []
106

    
107
  def __init__(self, procedure, args):
108
    self.port = utils.GetNodeDaemonPort()
109
    self.nodepw = utils.GetNodeDaemonPassword()
110
    self.nc = {}
111
    self.results = {}
112
    self.procedure = procedure
113
    self.args = args
114
    self.body = simplejson.dumps(args)
115

    
116
  #--- generic connector -------------
117

    
118
  def ConnectList(self, node_list):
119
    """Add a list of nodes to the target nodes.
120

121
    @type node_list: list
122
    @param node_list: the list of node names to connect
123

124
    """
125
    for node in node_list:
126
      self.ConnectNode(node)
127

    
128
  def ConnectNode(self, connect_node):
129
    """Add a node to the target list.
130

131
    """
132
    self.nc[connect_node] = nc = NodeController(self, connect_node)
133

    
134
  def GetResults(self):
135
    """Return the results of the call.
136

137
    """
138
    return self.results
139

    
140
  def Run(self):
141
    """Gather results from the node controllers.
142

143
    This function simply calls GetResponse() for each of our node
144
    controllers.
145

146
    """
147
    for node, nc in self.nc.items():
148
      self.results[node] = nc.GetResponse()
149

    
150

    
151
class RpcRunner(object):
152
  """RPC runner class"""
153

    
154
  def __init__(self, cfg):
155
    """Initialized the rpc runner.
156

157
    @type cfg:  C{config.ConfigWriter}
158
    @param cfg: the configuration object that will be used to get data
159
                about the cluster
160

161
    """
162
    self._cfg = cfg
163

    
164
  def _InstDict(self, instance):
165
    """Convert the given instance to a dict.
166

167
    This is done via the instance's ToDict() method and additionally
168
    we fill the hvparams with the cluster defaults.
169

170
    @type instance: L{objects.Instance}
171
    @param instance: an Instance object
172
    @rtype: dict
173
    @return: the instance dict, with the hvparams filled with the
174
        cluster defaults
175

176
    """
177
    idict = instance.ToDict()
178
    cluster = self._cfg.GetClusterInfo()
179
    idict["hvparams"] = cluster.FillHV(instance)
180
    idict["beparams"] = cluster.FillBE(instance)
181
    return idict
182

    
183
  def call_volume_list(self, node_list, vg_name):
184
    """Gets the logical volumes present in a given volume group.
185

186
    This is a multi-node call.
187

188
    """
189
    c = Client("volume_list", [vg_name])
190
    c.ConnectList(node_list)
191
    c.Run()
192
    return c.GetResults()
193

    
194
  def call_vg_list(self, node_list):
195
    """Gets the volume group list.
196

197
    This is a multi-node call.
198

199
    """
200
    c = Client("vg_list", [])
201
    c.ConnectList(node_list)
202
    c.Run()
203
    return c.GetResults()
204

    
205
  def call_bridges_exist(self, node, bridges_list):
206
    """Checks if a node has all the bridges given.
207

208
    This method checks if all bridges given in the bridges_list are
209
    present on the remote node, so that an instance that uses interfaces
210
    on those bridges can be started.
211

212
    This is a single-node call.
213

214
    """
215
    c = Client("bridges_exist", [bridges_list])
216
    c.ConnectNode(node)
217
    c.Run()
218
    return c.GetResults().get(node, False)
219

    
220
  def call_instance_start(self, node, instance, extra_args):
221
    """Starts an instance.
222

223
    This is a single-node call.
224

225
    """
226
    c = Client("instance_start", [self._InstDict(instance), extra_args])
227
    c.ConnectNode(node)
228
    c.Run()
229
    return c.GetResults().get(node, False)
230

    
231
  def call_instance_shutdown(self, node, instance):
232
    """Stops an instance.
233

234
    This is a single-node call.
235

236
    """
237
    c = Client("instance_shutdown", [self._InstDict(instance)])
238
    c.ConnectNode(node)
239
    c.Run()
240
    return c.GetResults().get(node, False)
241

    
242
  def call_instance_migrate(self, node, instance, target, live):
243
    """Migrate an instance.
244

245
    This is a single-node call.
246

247
    @type node: string
248
    @param node: the node on which the instance is currently running
249
    @type instance: C{objects.Instance}
250
    @param instance: the instance definition
251
    @type target: string
252
    @param target: the target node name
253
    @type live: boolean
254
    @param live: whether the migration should be done live or not (the
255
        interpretation of this parameter is left to the hypervisor)
256

257
    """
258
    c = Client("instance_migrate", [self._InstDict(instance), target, live])
259
    c.ConnectNode(node)
260
    c.Run()
261
    return c.GetResults().get(node, False)
262

    
263
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
264
    """Reboots an instance.
265

266
    This is a single-node call.
267

268
    """
269
    c = Client("instance_reboot", [self._InstDict(instance),
270
                                   reboot_type, extra_args])
271
    c.ConnectNode(node)
272
    c.Run()
273
    return c.GetResults().get(node, False)
274

    
275
  def call_instance_os_add(self, node, inst):
276
    """Installs an OS on the given instance.
277

278
    This is a single-node call.
279

280
    """
281
    params = [self._InstDict(inst)]
282
    c = Client("instance_os_add", params)
283
    c.ConnectNode(node)
284
    c.Run()
285
    return c.GetResults().get(node, False)
286

    
287
  def call_instance_run_rename(self, node, inst, old_name):
288
    """Run the OS rename script for an instance.
289

290
    This is a single-node call.
291

292
    """
293
    params = [self._InstDict(inst), old_name]
294
    c = Client("instance_run_rename", params)
295
    c.ConnectNode(node)
296
    c.Run()
297
    return c.GetResults().get(node, False)
298

    
299
  def call_instance_info(self, node, instance, hname):
300
    """Returns information about a single instance.
301

302
    This is a single-node call.
303

304
    @type node_list: list
305
    @param node_list: the list of nodes to query
306
    @type instance: string
307
    @param instance: the instance name
308
    @type hname: string
309
    @param hname: the hypervisor type of the instance
310

311
    """
312
    c = Client("instance_info", [instance, hname])
313
    c.ConnectNode(node)
314
    c.Run()
315
    return c.GetResults().get(node, False)
316

    
317
  def call_all_instances_info(self, node_list, hypervisor_list):
318
    """Returns information about all instances on the given nodes.
319

320
    This is a multi-node call.
321

322
    @type node_list: list
323
    @param node_list: the list of nodes to query
324
    @type hypervisor_list: list
325
    @param hypervisor_list: the hypervisors to query for instances
326

327
    """
328
    c = Client("all_instances_info", [hypervisor_list])
329
    c.ConnectList(node_list)
330
    c.Run()
331
    return c.GetResults()
332

    
333
  def call_instance_list(self, node_list, hypervisor_list):
334
    """Returns the list of running instances on a given node.
335

336
    This is a multi-node call.
337

338
    @type node_list: list
339
    @param node_list: the list of nodes to query
340
    @type hypervisor_list: list
341
    @param hypervisor_list: the hypervisors to query for instances
342

343
    """
344
    c = Client("instance_list", [hypervisor_list])
345
    c.ConnectList(node_list)
346
    c.Run()
347
    return c.GetResults()
348

    
349
  def call_node_tcp_ping(self, node, source, target, port, timeout,
350
                         live_port_needed):
351
    """Do a TcpPing on the remote node
352

353
    This is a single-node call.
354

355
    """
356
    c = Client("node_tcp_ping", [source, target, port, timeout,
357
                                 live_port_needed])
358
    c.ConnectNode(node)
359
    c.Run()
360
    return c.GetResults().get(node, False)
361

    
362
  def call_node_has_ip_address(self, node, address):
363
    """Checks if a node has the given IP address.
364

365
    This is a single-node call.
366

367
    """
368
    c = Client("node_has_ip_address", [address])
369
    c.ConnectNode(node)
370
    c.Run()
371
    return c.GetResults().get(node, False)
372

    
373
  def call_node_info(self, node_list, vg_name, hypervisor_type):
374
    """Return node information.
375

376
    This will return memory information and volume group size and free
377
    space.
378

379
    This is a multi-node call.
380

381
    @type node_list: list
382
    @param node_list: the list of nodes to query
383
    @type vgname: C{string}
384
    @param vgname: the name of the volume group to ask for disk space
385
        information
386
    @type hypervisor_type: C{str}
387
    @param hypervisor_type: the name of the hypervisor to ask for
388
        memory information
389

390
    """
391
    c = Client("node_info", [vg_name, hypervisor_type])
392
    c.ConnectList(node_list)
393
    c.Run()
394
    retux = c.GetResults()
395

    
396
    for node_name in retux:
397
      ret = retux.get(node_name, False)
398
      if type(ret) != dict:
399
        logging.error("could not connect to node %s", node_name)
400
        ret = {}
401

    
402
      utils.CheckDict(ret,
403
                      { 'memory_total' : '-',
404
                        'memory_dom0' : '-',
405
                        'memory_free' : '-',
406
                        'vg_size' : 'node_unreachable',
407
                        'vg_free' : '-' },
408
                      "call_node_info",
409
                      )
410
    return retux
411

    
412
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
413
    """Add a node to the cluster.
414

415
    This is a single-node call.
416

417
    """
418
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
419
    c = Client("node_add", params)
420
    c.ConnectNode(node)
421
    c.Run()
422
    return c.GetResults().get(node, False)
423

    
424
  def call_node_verify(self, node_list, checkdict, cluster_name):
425
    """Request verification of given parameters.
426

427
    This is a multi-node call.
428

429
    """
430
    c = Client("node_verify", [checkdict, cluster_name])
431
    c.ConnectList(node_list)
432
    c.Run()
433
    return c.GetResults()
434

    
435
  @staticmethod
436
  def call_node_start_master(node, start_daemons):
437
    """Tells a node to activate itself as a master.
438

439
    This is a single-node call.
440

441
    """
442
    c = Client("node_start_master", [start_daemons])
443
    c.ConnectNode(node)
444
    c.Run()
445
    return c.GetResults().get(node, False)
446

    
447
  @staticmethod
448
  def call_node_stop_master(node, stop_daemons):
449
    """Tells a node to demote itself from master status.
450

451
    This is a single-node call.
452

453
    """
454
    c = Client("node_stop_master", [stop_daemons])
455
    c.ConnectNode(node)
456
    c.Run()
457
    return c.GetResults().get(node, False)
458

    
459
  @staticmethod
460
  def call_master_info(node_list):
461
    """Query master info.
462

463
    This is a multi-node call.
464

465
    """
466
    # TODO: should this method query down nodes?
467
    c = Client("master_info", [])
468
    c.ConnectList(node_list)
469
    c.Run()
470
    return c.GetResults()
471

    
472
  def call_version(self, node_list):
473
    """Query node version.
474

475
    This is a multi-node call.
476

477
    """
478
    c = Client("version", [])
479
    c.ConnectList(node_list)
480
    c.Run()
481
    return c.GetResults()
482

    
483
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
484
    """Request creation of a given block device.
485

486
    This is a single-node call.
487

488
    """
489
    params = [bdev.ToDict(), size, owner, on_primary, info]
490
    c = Client("blockdev_create", params)
491
    c.ConnectNode(node)
492
    c.Run()
493
    return c.GetResults().get(node, False)
494

    
495
  def call_blockdev_remove(self, node, bdev):
496
    """Request removal of a given block device.
497

498
    This is a single-node call.
499

500
    """
501
    c = Client("blockdev_remove", [bdev.ToDict()])
502
    c.ConnectNode(node)
503
    c.Run()
504
    return c.GetResults().get(node, False)
505

    
506
  def call_blockdev_rename(self, node, devlist):
507
    """Request rename of the given block devices.
508

509
    This is a single-node call.
510

511
    """
512
    params = [(d.ToDict(), uid) for d, uid in devlist]
513
    c = Client("blockdev_rename", params)
514
    c.ConnectNode(node)
515
    c.Run()
516
    return c.GetResults().get(node, False)
517

    
518
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
519
    """Request assembling of a given block device.
520

521
    This is a single-node call.
522

523
    """
524
    params = [disk.ToDict(), owner, on_primary]
525
    c = Client("blockdev_assemble", params)
526
    c.ConnectNode(node)
527
    c.Run()
528
    return c.GetResults().get(node, False)
529

    
530
  def call_blockdev_shutdown(self, node, disk):
531
    """Request shutdown of a given block device.
532

533
    This is a single-node call.
534

535
    """
536
    c = Client("blockdev_shutdown", [disk.ToDict()])
537
    c.ConnectNode(node)
538
    c.Run()
539
    return c.GetResults().get(node, False)
540

    
541
  def call_blockdev_addchildren(self, node, bdev, ndevs):
542
    """Request adding a list of children to a (mirroring) device.
543

544
    This is a single-node call.
545

546
    """
547
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
548
    c = Client("blockdev_addchildren", params)
549
    c.ConnectNode(node)
550
    c.Run()
551
    return c.GetResults().get(node, False)
552

    
553
  def call_blockdev_removechildren(self, node, bdev, ndevs):
554
    """Request removing a list of children from a (mirroring) device.
555

556
    This is a single-node call.
557

558
    """
559
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
560
    c = Client("blockdev_removechildren", params)
561
    c.ConnectNode(node)
562
    c.Run()
563
    return c.GetResults().get(node, False)
564

    
565
  def call_blockdev_getmirrorstatus(self, node, disks):
566
    """Request status of a (mirroring) device.
567

568
    This is a single-node call.
569

570
    """
571
    params = [dsk.ToDict() for dsk in disks]
572
    c = Client("blockdev_getmirrorstatus", params)
573
    c.ConnectNode(node)
574
    c.Run()
575
    return c.GetResults().get(node, False)
576

    
577
  def call_blockdev_find(self, node, disk):
578
    """Request identification of a given block device.
579

580
    This is a single-node call.
581

582
    """
583
    c = Client("blockdev_find", [disk.ToDict()])
584
    c.ConnectNode(node)
585
    c.Run()
586
    return c.GetResults().get(node, False)
587

    
588
  def call_blockdev_close(self, node, disks):
589
    """Closes the given block devices.
590

591
    This is a single-node call.
592

593
    """
594
    params = [cf.ToDict() for cf in disks]
595
    c = Client("blockdev_close", params)
596
    c.ConnectNode(node)
597
    c.Run()
598
    return c.GetResults().get(node, False)
599

    
600
  @staticmethod
601
  def call_upload_file(node_list, file_name):
602
    """Upload a file.
603

604
    The node will refuse the operation in case the file is not on the
605
    approved file list.
606

607
    This is a multi-node call.
608

609
    """
610
    fh = file(file_name)
611
    try:
612
      data = fh.read()
613
    finally:
614
      fh.close()
615
    st = os.stat(file_name)
616
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
617
              st.st_atime, st.st_mtime]
618
    c = Client("upload_file", params)
619
    c.ConnectList(node_list)
620
    c.Run()
621
    return c.GetResults()
622

    
623
  def call_os_diagnose(self, node_list):
624
    """Request a diagnose of OS definitions.
625

626
    This is a multi-node call.
627

628
    """
629
    c = Client("os_diagnose", [])
630
    c.ConnectList(node_list)
631
    c.Run()
632
    result = c.GetResults()
633
    new_result = {}
634
    for node_name in result:
635
      if result[node_name]:
636
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
637
      else:
638
        nr = []
639
      new_result[node_name] = nr
640
    return new_result
641

    
642
  def call_os_get(self, node, name):
643
    """Returns an OS definition.
644

645
    This is a single-node call.
646

647
    """
648
    c = Client("os_get", [name])
649
    c.ConnectNode(node)
650
    c.Run()
651
    result = c.GetResults().get(node, False)
652
    if isinstance(result, dict):
653
      return objects.OS.FromDict(result)
654
    else:
655
      return result
656

    
657
  def call_hooks_runner(self, node_list, hpath, phase, env):
658
    """Call the hooks runner.
659

660
    Args:
661
      - op: the OpCode instance
662
      - env: a dictionary with the environment
663

664
    This is a multi-node call.
665

666
    """
667
    params = [hpath, phase, env]
668
    c = Client("hooks_runner", params)
669
    c.ConnectList(node_list)
670
    c.Run()
671
    result = c.GetResults()
672
    return result
673

    
674
  def call_iallocator_runner(self, node, name, idata):
675
    """Call an iallocator on a remote node
676

677
    Args:
678
      - name: the iallocator name
679
      - input: the json-encoded input string
680

681
    This is a single-node call.
682

683
    """
684
    params = [name, idata]
685
    c = Client("iallocator_runner", params)
686
    c.ConnectNode(node)
687
    c.Run()
688
    result = c.GetResults().get(node, False)
689
    return result
690

    
691
  def call_blockdev_grow(self, node, cf_bdev, amount):
692
    """Request a snapshot of the given block device.
693

694
    This is a single-node call.
695

696
    """
697
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
698
    c.ConnectNode(node)
699
    c.Run()
700
    return c.GetResults().get(node, False)
701

    
702
  def call_blockdev_snapshot(self, node, cf_bdev):
703
    """Request a snapshot of the given block device.
704

705
    This is a single-node call.
706

707
    """
708
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
709
    c.ConnectNode(node)
710
    c.Run()
711
    return c.GetResults().get(node, False)
712

    
713
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
714
                           cluster_name):
715
    """Request the export of a given snapshot.
716

717
    This is a single-node call.
718

719
    """
720
    params = [snap_bdev.ToDict(), dest_node,
721
              self._InstDict(instance), cluster_name]
722
    c = Client("snapshot_export", params)
723
    c.ConnectNode(node)
724
    c.Run()
725
    return c.GetResults().get(node, False)
726

    
727
  def call_finalize_export(self, node, instance, snap_disks):
728
    """Request the completion of an export operation.
729

730
    This writes the export config file, etc.
731

732
    This is a single-node call.
733

734
    """
735
    flat_disks = []
736
    for disk in snap_disks:
737
      flat_disks.append(disk.ToDict())
738
    params = [self._InstDict(instance), flat_disks]
739
    c = Client("finalize_export", params)
740
    c.ConnectNode(node)
741
    c.Run()
742
    return c.GetResults().get(node, False)
743

    
744
  def call_export_info(self, node, path):
745
    """Queries the export information in a given path.
746

747
    This is a single-node call.
748

749
    """
750
    c = Client("export_info", [path])
751
    c.ConnectNode(node)
752
    c.Run()
753
    result = c.GetResults().get(node, False)
754
    if not result:
755
      return result
756
    return objects.SerializableConfigParser.Loads(str(result))
757

    
758
  def call_instance_os_import(self, node, inst, osdev, swapdev,
759
                              src_node, src_image, cluster_name):
760
    """Request the import of a backup into an instance.
761

762
    This is a single-node call.
763

764
    """
765
    params = [self._InstDict(inst), osdev, swapdev,
766
              src_node, src_image, cluster_name]
767
    c = Client("instance_os_import", params)
768
    c.ConnectNode(node)
769
    c.Run()
770
    return c.GetResults().get(node, False)
771

    
772
  def call_export_list(self, node_list):
773
    """Gets the stored exports list.
774

775
    This is a multi-node call.
776

777
    """
778
    c = Client("export_list", [])
779
    c.ConnectList(node_list)
780
    c.Run()
781
    result = c.GetResults()
782
    return result
783

    
784
  def call_export_remove(self, node, export):
785
    """Requests removal of a given export.
786

787
    This is a single-node call.
788

789
    """
790
    c = Client("export_remove", [export])
791
    c.ConnectNode(node)
792
    c.Run()
793
    return c.GetResults().get(node, False)
794

    
795
  @staticmethod
796
  def call_node_leave_cluster(node):
797
    """Requests a node to clean the cluster information it has.
798

799
    This will remove the configuration information from the ganeti data
800
    dir.
801

802
    This is a single-node call.
803

804
    """
805
    c = Client("node_leave_cluster", [])
806
    c.ConnectNode(node)
807
    c.Run()
808
    return c.GetResults().get(node, False)
809

    
810
  def call_node_volumes(self, node_list):
811
    """Gets all volumes on node(s).
812

813
    This is a multi-node call.
814

815
    """
816
    c = Client("node_volumes", [])
817
    c.ConnectList(node_list)
818
    c.Run()
819
    return c.GetResults()
820

    
821
  def call_test_delay(self, node_list, duration):
822
    """Sleep for a fixed time on given node(s).
823

824
    This is a multi-node call.
825

826
    """
827
    c = Client("test_delay", [duration])
828
    c.ConnectList(node_list)
829
    c.Run()
830
    return c.GetResults()
831

    
832
  def call_file_storage_dir_create(self, node, file_storage_dir):
833
    """Create the given file storage directory.
834

835
    This is a single-node call.
836

837
    """
838
    c = Client("file_storage_dir_create", [file_storage_dir])
839
    c.ConnectNode(node)
840
    c.Run()
841
    return c.GetResults().get(node, False)
842

    
843
  def call_file_storage_dir_remove(self, node, file_storage_dir):
844
    """Remove the given file storage directory.
845

846
    This is a single-node call.
847

848
    """
849
    c = Client("file_storage_dir_remove", [file_storage_dir])
850
    c.ConnectNode(node)
851
    c.Run()
852
    return c.GetResults().get(node, False)
853

    
854
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
855
                                   new_file_storage_dir):
856
    """Rename file storage directory.
857

858
    This is a single-node call.
859

860
    """
861
    c = Client("file_storage_dir_rename",
862
               [old_file_storage_dir, new_file_storage_dir])
863
    c.ConnectNode(node)
864
    c.Run()
865
    return c.GetResults().get(node, False)
866

    
867
  @staticmethod
868
  def call_jobqueue_update(node_list, file_name, content):
869
    """Update job queue.
870

871
    This is a multi-node call.
872

873
    """
874
    c = Client("jobqueue_update", [file_name, content])
875
    c.ConnectList(node_list)
876
    c.Run()
877
    result = c.GetResults()
878
    return result
879

    
880
  @staticmethod
881
  def call_jobqueue_purge(node):
882
    """Purge job queue.
883

884
    This is a single-node call.
885

886
    """
887
    c = Client("jobqueue_purge", [])
888
    c.ConnectNode(node)
889
    c.Run()
890
    return c.GetResults().get(node, False)
891

    
892
  @staticmethod
893
  def call_jobqueue_rename(node_list, old, new):
894
    """Rename a job queue file.
895

896
    This is a multi-node call.
897

898
    """
899
    c = Client("jobqueue_rename", [old, new])
900
    c.ConnectList(node_list)
901
    c.Run()
902
    result = c.GetResults()
903
    return result
904

    
905

    
906
  @staticmethod
907
  def call_jobqueue_set_drain(node_list, drain_flag):
908
    """Set the drain flag on the queue.
909

910
    This is a multi-node call.
911

912
    @type node_list: list
913
    @param node_list: the list of nodes to query
914
    @type drain_flag: bool
915
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
916

917
    """
918
    c = Client("jobqueue_set_drain", [drain_flag])
919
    c.ConnectList(node_list)
920
    c.Run()
921
    result = c.GetResults()
922
    return result
923

    
924

    
925
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
926
    """Validate the hypervisor params.
927

928
    This is a multi-node call.
929

930
    @type node_list: list
931
    @param node_list: the list of nodes to query
932
    @type hvname: string
933
    @param hvname: the hypervisor name
934
    @type hvparams: dict
935
    @param hvparams: the hypervisor parameters to be validated
936

937
    """
938
    cluster = self._cfg.GetClusterInfo()
939
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
940
    c = Client("hypervisor_validate_params", [hvname, hv_full])
941
    c.ConnectList(node_list)
942
    c.Run()
943
    result = c.GetResults()
944
    return result