Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 58b311ca

History | View | Annotate | Download (23.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
import simplejson
39

    
40
from ganeti import utils
41
from ganeti import objects
42

    
43

    
44
class NodeController:
45
  """Node-handling class.
46

47
  For each node that we speak with, we create an instance of this
48
  class, so that we have a safe place to store the details of this
49
  individual call.
50

51
  """
52
  def __init__(self, parent, node):
53
    self.parent = parent
54
    self.node = node
55
    self.failed = False
56

    
57
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
58
    try:
59
      hc.connect()
60
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
61
                    skip_accept_encoding=True)
62
      hc.putheader('Content-Length', str(len(parent.body)))
63
      hc.endheaders()
64
      hc.send(parent.body)
65
    except socket.error, err:
66
      logging.exception("Error connecting to node %s", node)
67
      self.failed = True
68

    
69
  def get_response(self):
70
    """Try to process the response from the node.
71

72
    """
73
    if self.failed:
74
      # we already failed in connect
75
      return False
76
    resp = self.http_conn.getresponse()
77
    if resp.status != 200:
78
      return False
79
    try:
80
      length = int(resp.getheader('Content-Length', '0'))
81
    except ValueError:
82
      return False
83
    if not length:
84
      logging.error("Zero-length reply from node %s", self.node)
85
      return False
86
    payload = resp.read(length)
87
    unload = simplejson.loads(payload)
88
    return unload
89

    
90

    
91
class Client:
92
  """RPC Client class.
93

94
  This class, given a (remote) method name, a list of parameters and a
95
  list of nodes, will contact (in parallel) all nodes, and return a
96
  dict of results (key: node name, value: result).
97

98
  One current bug is that generic failure is still signalled by
99
  'False' result, which is not good. This overloading of values can
100
  cause bugs.
101

102
  """
103
  result_set = False
104
  result = False
105
  allresult = []
106

    
107
  def __init__(self, procedure, args):
108
    self.port = utils.GetNodeDaemonPort()
109
    self.nodepw = utils.GetNodeDaemonPassword()
110
    self.nc = {}
111
    self.results = {}
112
    self.procedure = procedure
113
    self.args = args
114
    self.body = simplejson.dumps(args)
115

    
116
  #--- generic connector -------------
117

    
118
  def connect_list(self, node_list):
119
    """Add a list of nodes to the target nodes.
120

121
    """
122
    for node in node_list:
123
      self.connect(node)
124

    
125
  def connect(self, connect_node):
126
    """Add a node to the target list.
127

128
    """
129
    self.nc[connect_node] = nc = NodeController(self, connect_node)
130

    
131
  def getresult(self):
132
    """Return the results of the call.
133

134
    """
135
    return self.results
136

    
137
  def run(self):
138
    """Wrapper over reactor.run().
139

140
    This function simply calls reactor.run() if we have any requests
141
    queued, otherwise it does nothing.
142

143
    """
144
    for node, nc in self.nc.items():
145
      self.results[node] = nc.get_response()
146

    
147

    
148
class RpcRunner(object):
149
  """RPC runner class"""
150

    
151
  def __init__(self, cfg):
152
    """Initialized the rpc runner.
153

154
    @type cfg:  C{config.ConfigWriter}
155
    @param cfg: the configuration object that will be used to get data
156
                about the cluster
157

158
    """
159
    self._cfg = cfg
160

    
161
  def _InstDict(self, instance):
162
    """Convert the given instance to a dict.
163

164
    This is done via the instance's ToDict() method and additionally
165
    we fill the hvparams with the cluster defaults.
166

167
    @type instance: L{objects.Instance}
168
    @param instance: an Instance object
169
    @rtype: dict
170
    @return: the instance dict, with the hvparams filled with the
171
        cluster defaults
172

173
    """
174
    idict = instance.ToDict()
175
    cluster = self._cfg.GetClusterInfo()
176
    idict["hvparams"] = cluster.FillHV(instance)
177
    idict["beparams"] = cluster.FillBE(instance)
178
    return idict
179

    
180
  def call_volume_list(self, node_list, vg_name):
181
    """Gets the logical volumes present in a given volume group.
182

183
    This is a multi-node call.
184

185
    """
186
    c = Client("volume_list", [vg_name])
187
    c.connect_list(node_list)
188
    c.run()
189
    return c.getresult()
190

    
191
  def call_vg_list(self, node_list):
192
    """Gets the volume group list.
193

194
    This is a multi-node call.
195

196
    """
197
    c = Client("vg_list", [])
198
    c.connect_list(node_list)
199
    c.run()
200
    return c.getresult()
201

    
202
  def call_bridges_exist(self, node, bridges_list):
203
    """Checks if a node has all the bridges given.
204

205
    This method checks if all bridges given in the bridges_list are
206
    present on the remote node, so that an instance that uses interfaces
207
    on those bridges can be started.
208

209
    This is a single-node call.
210

211
    """
212
    c = Client("bridges_exist", [bridges_list])
213
    c.connect(node)
214
    c.run()
215
    return c.getresult().get(node, False)
216

    
217
  def call_instance_start(self, node, instance, extra_args):
218
    """Starts an instance.
219

220
    This is a single-node call.
221

222
    """
223
    c = Client("instance_start", [self._InstDict(instance), extra_args])
224
    c.connect(node)
225
    c.run()
226
    return c.getresult().get(node, False)
227

    
228
  def call_instance_shutdown(self, node, instance):
229
    """Stops an instance.
230

231
    This is a single-node call.
232

233
    """
234
    c = Client("instance_shutdown", [self._InstDict(instance)])
235
    c.connect(node)
236
    c.run()
237
    return c.getresult().get(node, False)
238

    
239
  def call_instance_migrate(self, node, instance, target, live):
240
    """Migrate an instance.
241

242
    This is a single-node call.
243

244
    @type node: string
245
    @param node: the node on which the instance is currently running
246
    @type instance: C{objects.Instance}
247
    @param instance: the instance definition
248
    @type target: string
249
    @param target: the target node name
250
    @type live: boolean
251
    @param live: whether the migration should be done live or not (the
252
        interpretation of this parameter is left to the hypervisor)
253

254
    """
255
    c = Client("instance_migrate", [self._InstDict(instance), target, live])
256
    c.connect(node)
257
    c.run()
258
    return c.getresult().get(node, False)
259

    
260
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
261
    """Reboots an instance.
262

263
    This is a single-node call.
264

265
    """
266
    c = Client("instance_reboot", [self._InstDict(instance),
267
                                   reboot_type, extra_args])
268
    c.connect(node)
269
    c.run()
270
    return c.getresult().get(node, False)
271

    
272
  def call_instance_os_add(self, node, inst):
273
    """Installs an OS on the given instance.
274

275
    This is a single-node call.
276

277
    """
278
    params = [self._InstDict(inst)]
279
    c = Client("instance_os_add", params)
280
    c.connect(node)
281
    c.run()
282
    return c.getresult().get(node, False)
283

    
284
  def call_instance_run_rename(self, node, inst, old_name):
285
    """Run the OS rename script for an instance.
286

287
    This is a single-node call.
288

289
    """
290
    params = [self._InstDict(inst), old_name]
291
    c = Client("instance_run_rename", params)
292
    c.connect(node)
293
    c.run()
294
    return c.getresult().get(node, False)
295

    
296
  def call_instance_info(self, node, instance, hname):
297
    """Returns information about a single instance.
298

299
    This is a single-node call.
300

301
    @type node_list: list
302
    @param node_list: the list of nodes to query
303
    @type instance: string
304
    @param instance: the instance name
305
    @type hname: string
306
    @param hname: the hypervisor type of the instance
307

308
    """
309
    c = Client("instance_info", [instance, hname])
310
    c.connect(node)
311
    c.run()
312
    return c.getresult().get(node, False)
313

    
314
  def call_all_instances_info(self, node_list, hypervisor_list):
315
    """Returns information about all instances on the given nodes.
316

317
    This is a multi-node call.
318

319
    @type node_list: list
320
    @param node_list: the list of nodes to query
321
    @type hypervisor_list: list
322
    @param hypervisor_list: the hypervisors to query for instances
323

324
    """
325
    c = Client("all_instances_info", [hypervisor_list])
326
    c.connect_list(node_list)
327
    c.run()
328
    return c.getresult()
329

    
330
  def call_instance_list(self, node_list, hypervisor_list):
331
    """Returns the list of running instances on a given node.
332

333
    This is a multi-node call.
334

335
    @type node_list: list
336
    @param node_list: the list of nodes to query
337
    @type hypervisor_list: list
338
    @param hypervisor_list: the hypervisors to query for instances
339

340
    """
341
    c = Client("instance_list", [hypervisor_list])
342
    c.connect_list(node_list)
343
    c.run()
344
    return c.getresult()
345

    
346
  def call_node_tcp_ping(self, node, source, target, port, timeout,
347
                         live_port_needed):
348
    """Do a TcpPing on the remote node
349

350
    This is a single-node call.
351

352
    """
353
    c = Client("node_tcp_ping", [source, target, port, timeout,
354
                                 live_port_needed])
355
    c.connect(node)
356
    c.run()
357
    return c.getresult().get(node, False)
358

    
359
  def call_node_has_ip_address(self, node, address):
360
    """Checks if a node has the given IP address.
361

362
    This is a single-node call.
363

364
    """
365
    c = Client("node_has_ip_address", [address])
366
    c.connect(node)
367
    c.run()
368
    return c.getresult().get(node, False)
369

    
370
  def call_node_info(self, node_list, vg_name, hypervisor_type):
371
    """Return node information.
372

373
    This will return memory information and volume group size and free
374
    space.
375

376
    This is a multi-node call.
377

378
    @type node_list: list
379
    @param node_list: the list of nodes to query
380
    @type vgname: C{string}
381
    @param vgname: the name of the volume group to ask for disk space
382
        information
383
    @type hypervisor_type: C{str}
384
    @param hypervisor_type: the name of the hypervisor to ask for
385
        memory information
386

387
    """
388
    c = Client("node_info", [vg_name, hypervisor_type])
389
    c.connect_list(node_list)
390
    c.run()
391
    retux = c.getresult()
392

    
393
    for node_name in retux:
394
      ret = retux.get(node_name, False)
395
      if type(ret) != dict:
396
        logging.error("could not connect to node %s", node_name)
397
        ret = {}
398

    
399
      utils.CheckDict(ret,
400
                      { 'memory_total' : '-',
401
                        'memory_dom0' : '-',
402
                        'memory_free' : '-',
403
                        'vg_size' : 'node_unreachable',
404
                        'vg_free' : '-' },
405
                      "call_node_info",
406
                      )
407
    return retux
408

    
409
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
410
    """Add a node to the cluster.
411

412
    This is a single-node call.
413

414
    """
415
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
416
    c = Client("node_add", params)
417
    c.connect(node)
418
    c.run()
419
    return c.getresult().get(node, False)
420

    
421
  def call_node_verify(self, node_list, checkdict, cluster_name):
422
    """Request verification of given parameters.
423

424
    This is a multi-node call.
425

426
    """
427
    c = Client("node_verify", [checkdict, cluster_name])
428
    c.connect_list(node_list)
429
    c.run()
430
    return c.getresult()
431

    
432
  @staticmethod
433
  def call_node_start_master(node, start_daemons):
434
    """Tells a node to activate itself as a master.
435

436
    This is a single-node call.
437

438
    """
439
    c = Client("node_start_master", [start_daemons])
440
    c.connect(node)
441
    c.run()
442
    return c.getresult().get(node, False)
443

    
444
  @staticmethod
445
  def call_node_stop_master(node, stop_daemons):
446
    """Tells a node to demote itself from master status.
447

448
    This is a single-node call.
449

450
    """
451
    c = Client("node_stop_master", [stop_daemons])
452
    c.connect(node)
453
    c.run()
454
    return c.getresult().get(node, False)
455

    
456
  @staticmethod
457
  def call_master_info(node_list):
458
    """Query master info.
459

460
    This is a multi-node call.
461

462
    """
463
    # TODO: should this method query down nodes?
464
    c = Client("master_info", [])
465
    c.connect_list(node_list)
466
    c.run()
467
    return c.getresult()
468

    
469
  def call_version(self, node_list):
470
    """Query node version.
471

472
    This is a multi-node call.
473

474
    """
475
    c = Client("version", [])
476
    c.connect_list(node_list)
477
    c.run()
478
    return c.getresult()
479

    
480
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
481
    """Request creation of a given block device.
482

483
    This is a single-node call.
484

485
    """
486
    params = [bdev.ToDict(), size, owner, on_primary, info]
487
    c = Client("blockdev_create", params)
488
    c.connect(node)
489
    c.run()
490
    return c.getresult().get(node, False)
491

    
492
  def call_blockdev_remove(self, node, bdev):
493
    """Request removal of a given block device.
494

495
    This is a single-node call.
496

497
    """
498
    c = Client("blockdev_remove", [bdev.ToDict()])
499
    c.connect(node)
500
    c.run()
501
    return c.getresult().get(node, False)
502

    
503
  def call_blockdev_rename(self, node, devlist):
504
    """Request rename of the given block devices.
505

506
    This is a single-node call.
507

508
    """
509
    params = [(d.ToDict(), uid) for d, uid in devlist]
510
    c = Client("blockdev_rename", params)
511
    c.connect(node)
512
    c.run()
513
    return c.getresult().get(node, False)
514

    
515
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
516
    """Request assembling of a given block device.
517

518
    This is a single-node call.
519

520
    """
521
    params = [disk.ToDict(), owner, on_primary]
522
    c = Client("blockdev_assemble", params)
523
    c.connect(node)
524
    c.run()
525
    return c.getresult().get(node, False)
526

    
527
  def call_blockdev_shutdown(self, node, disk):
528
    """Request shutdown of a given block device.
529

530
    This is a single-node call.
531

532
    """
533
    c = Client("blockdev_shutdown", [disk.ToDict()])
534
    c.connect(node)
535
    c.run()
536
    return c.getresult().get(node, False)
537

    
538
  def call_blockdev_addchildren(self, node, bdev, ndevs):
539
    """Request adding a list of children to a (mirroring) device.
540

541
    This is a single-node call.
542

543
    """
544
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
545
    c = Client("blockdev_addchildren", params)
546
    c.connect(node)
547
    c.run()
548
    return c.getresult().get(node, False)
549

    
550
  def call_blockdev_removechildren(self, node, bdev, ndevs):
551
    """Request removing a list of children from a (mirroring) device.
552

553
    This is a single-node call.
554

555
    """
556
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
557
    c = Client("blockdev_removechildren", params)
558
    c.connect(node)
559
    c.run()
560
    return c.getresult().get(node, False)
561

    
562
  def call_blockdev_getmirrorstatus(self, node, disks):
563
    """Request status of a (mirroring) device.
564

565
    This is a single-node call.
566

567
    """
568
    params = [dsk.ToDict() for dsk in disks]
569
    c = Client("blockdev_getmirrorstatus", params)
570
    c.connect(node)
571
    c.run()
572
    return c.getresult().get(node, False)
573

    
574
  def call_blockdev_find(self, node, disk):
575
    """Request identification of a given block device.
576

577
    This is a single-node call.
578

579
    """
580
    c = Client("blockdev_find", [disk.ToDict()])
581
    c.connect(node)
582
    c.run()
583
    return c.getresult().get(node, False)
584

    
585
  def call_blockdev_close(self, node, disks):
586
    """Closes the given block devices.
587

588
    This is a single-node call.
589

590
    """
591
    params = [cf.ToDict() for cf in disks]
592
    c = Client("blockdev_close", params)
593
    c.connect(node)
594
    c.run()
595
    return c.getresult().get(node, False)
596

    
597
  @staticmethod
598
  def call_upload_file(node_list, file_name):
599
    """Upload a file.
600

601
    The node will refuse the operation in case the file is not on the
602
    approved file list.
603

604
    This is a multi-node call.
605

606
    """
607
    fh = file(file_name)
608
    try:
609
      data = fh.read()
610
    finally:
611
      fh.close()
612
    st = os.stat(file_name)
613
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
614
              st.st_atime, st.st_mtime]
615
    c = Client("upload_file", params)
616
    c.connect_list(node_list)
617
    c.run()
618
    return c.getresult()
619

    
620
  def call_os_diagnose(self, node_list):
621
    """Request a diagnose of OS definitions.
622

623
    This is a multi-node call.
624

625
    """
626
    c = Client("os_diagnose", [])
627
    c.connect_list(node_list)
628
    c.run()
629
    result = c.getresult()
630
    new_result = {}
631
    for node_name in result:
632
      if result[node_name]:
633
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
634
      else:
635
        nr = []
636
      new_result[node_name] = nr
637
    return new_result
638

    
639
  def call_os_get(self, node, name):
640
    """Returns an OS definition.
641

642
    This is a single-node call.
643

644
    """
645
    c = Client("os_get", [name])
646
    c.connect(node)
647
    c.run()
648
    result = c.getresult().get(node, False)
649
    if isinstance(result, dict):
650
      return objects.OS.FromDict(result)
651
    else:
652
      return result
653

    
654
  def call_hooks_runner(self, node_list, hpath, phase, env):
655
    """Call the hooks runner.
656

657
    Args:
658
      - op: the OpCode instance
659
      - env: a dictionary with the environment
660

661
    This is a multi-node call.
662

663
    """
664
    params = [hpath, phase, env]
665
    c = Client("hooks_runner", params)
666
    c.connect_list(node_list)
667
    c.run()
668
    result = c.getresult()
669
    return result
670

    
671
  def call_iallocator_runner(self, node, name, idata):
672
    """Call an iallocator on a remote node
673

674
    Args:
675
      - name: the iallocator name
676
      - input: the json-encoded input string
677

678
    This is a single-node call.
679

680
    """
681
    params = [name, idata]
682
    c = Client("iallocator_runner", params)
683
    c.connect(node)
684
    c.run()
685
    result = c.getresult().get(node, False)
686
    return result
687

    
688
  def call_blockdev_grow(self, node, cf_bdev, amount):
689
    """Request a snapshot of the given block device.
690

691
    This is a single-node call.
692

693
    """
694
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
695
    c.connect(node)
696
    c.run()
697
    return c.getresult().get(node, False)
698

    
699
  def call_blockdev_snapshot(self, node, cf_bdev):
700
    """Request a snapshot of the given block device.
701

702
    This is a single-node call.
703

704
    """
705
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
706
    c.connect(node)
707
    c.run()
708
    return c.getresult().get(node, False)
709

    
710
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
711
                           cluster_name):
712
    """Request the export of a given snapshot.
713

714
    This is a single-node call.
715

716
    """
717
    params = [snap_bdev.ToDict(), dest_node,
718
              self._InstDict(instance), cluster_name]
719
    c = Client("snapshot_export", params)
720
    c.connect(node)
721
    c.run()
722
    return c.getresult().get(node, False)
723

    
724
  def call_finalize_export(self, node, instance, snap_disks):
725
    """Request the completion of an export operation.
726

727
    This writes the export config file, etc.
728

729
    This is a single-node call.
730

731
    """
732
    flat_disks = []
733
    for disk in snap_disks:
734
      flat_disks.append(disk.ToDict())
735
    params = [self._InstDict(instance), flat_disks]
736
    c = Client("finalize_export", params)
737
    c.connect(node)
738
    c.run()
739
    return c.getresult().get(node, False)
740

    
741
  def call_export_info(self, node, path):
742
    """Queries the export information in a given path.
743

744
    This is a single-node call.
745

746
    """
747
    c = Client("export_info", [path])
748
    c.connect(node)
749
    c.run()
750
    result = c.getresult().get(node, False)
751
    if not result:
752
      return result
753
    return objects.SerializableConfigParser.Loads(str(result))
754

    
755
  def call_instance_os_import(self, node, inst, osdev, swapdev,
756
                              src_node, src_image, cluster_name):
757
    """Request the import of a backup into an instance.
758

759
    This is a single-node call.
760

761
    """
762
    params = [self._InstDict(inst), osdev, swapdev,
763
              src_node, src_image, cluster_name]
764
    c = Client("instance_os_import", params)
765
    c.connect(node)
766
    c.run()
767
    return c.getresult().get(node, False)
768

    
769
  def call_export_list(self, node_list):
770
    """Gets the stored exports list.
771

772
    This is a multi-node call.
773

774
    """
775
    c = Client("export_list", [])
776
    c.connect_list(node_list)
777
    c.run()
778
    result = c.getresult()
779
    return result
780

    
781
  def call_export_remove(self, node, export):
782
    """Requests removal of a given export.
783

784
    This is a single-node call.
785

786
    """
787
    c = Client("export_remove", [export])
788
    c.connect(node)
789
    c.run()
790
    return c.getresult().get(node, False)
791

    
792
  @staticmethod
793
  def call_node_leave_cluster(node):
794
    """Requests a node to clean the cluster information it has.
795

796
    This will remove the configuration information from the ganeti data
797
    dir.
798

799
    This is a single-node call.
800

801
    """
802
    c = Client("node_leave_cluster", [])
803
    c.connect(node)
804
    c.run()
805
    return c.getresult().get(node, False)
806

    
807
  def call_node_volumes(self, node_list):
808
    """Gets all volumes on node(s).
809

810
    This is a multi-node call.
811

812
    """
813
    c = Client("node_volumes", [])
814
    c.connect_list(node_list)
815
    c.run()
816
    return c.getresult()
817

    
818
  def call_test_delay(self, node_list, duration):
819
    """Sleep for a fixed time on given node(s).
820

821
    This is a multi-node call.
822

823
    """
824
    c = Client("test_delay", [duration])
825
    c.connect_list(node_list)
826
    c.run()
827
    return c.getresult()
828

    
829
  def call_file_storage_dir_create(self, node, file_storage_dir):
830
    """Create the given file storage directory.
831

832
    This is a single-node call.
833

834
    """
835
    c = Client("file_storage_dir_create", [file_storage_dir])
836
    c.connect(node)
837
    c.run()
838
    return c.getresult().get(node, False)
839

    
840
  def call_file_storage_dir_remove(self, node, file_storage_dir):
841
    """Remove the given file storage directory.
842

843
    This is a single-node call.
844

845
    """
846
    c = Client("file_storage_dir_remove", [file_storage_dir])
847
    c.connect(node)
848
    c.run()
849
    return c.getresult().get(node, False)
850

    
851
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
852
                                   new_file_storage_dir):
853
    """Rename file storage directory.
854

855
    This is a single-node call.
856

857
    """
858
    c = Client("file_storage_dir_rename",
859
               [old_file_storage_dir, new_file_storage_dir])
860
    c.connect(node)
861
    c.run()
862
    return c.getresult().get(node, False)
863

    
864
  @staticmethod
865
  def call_jobqueue_update(node_list, file_name, content):
866
    """Update job queue.
867

868
    This is a multi-node call.
869

870
    """
871
    c = Client("jobqueue_update", [file_name, content])
872
    c.connect_list(node_list)
873
    c.run()
874
    result = c.getresult()
875
    return result
876

    
877
  @staticmethod
878
  def call_jobqueue_purge(node):
879
    """Purge job queue.
880

881
    This is a single-node call.
882

883
    """
884
    c = Client("jobqueue_purge", [])
885
    c.connect(node)
886
    c.run()
887
    return c.getresult().get(node, False)
888

    
889
  @staticmethod
890
  def call_jobqueue_rename(node_list, old, new):
891
    """Rename a job queue file.
892

893
    This is a multi-node call.
894

895
    """
896
    c = Client("jobqueue_rename", [old, new])
897
    c.connect_list(node_list)
898
    c.run()
899
    result = c.getresult()
900
    return result
901

    
902

    
903
  @staticmethod
904
  def call_jobqueue_set_drain(node_list, drain_flag):
905
    """Set the drain flag on the queue.
906

907
    This is a multi-node call.
908

909
    @type node_list: list
910
    @param node_list: the list of nodes to query
911
    @type drain_flag: bool
912
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
913

914
    """
915
    c = Client("jobqueue_set_drain", [drain_flag])
916
    c.connect_list(node_list)
917
    c.run()
918
    result = c.getresult()
919
    return result
920

    
921

    
922
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
923
    """Validate the hypervisor params.
924

925
    This is a multi-node call.
926

927
    @type node_list: list
928
    @param node_list: the list of nodes to query
929
    @type hvname: string
930
    @param hvname: the hypervisor name
931
    @type hvparams: dict
932
    @param hvparams: the hypervisor parameters to be validated
933

934
    """
935
    cluster = self._cfg.GetClusterInfo()
936
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
937
    c = Client("hypervisor_validate_params", [hvname, hv_full])
938
    c.connect_list(node_list)
939
    c.run()
940
    result = c.getresult()
941
    return result