Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 26ba2bd8

History | View | Annotate | Download (23.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36

    
37
import simplejson
38

    
39
from ganeti import logger
40
from ganeti import utils
41
from ganeti import objects
42

    
43

    
44
class NodeController:
45
  """Node-handling class.
46

47
  For each node that we speak with, we create an instance of this
48
  class, so that we have a safe place to store the details of this
49
  individual call.
50

51
  """
52
  def __init__(self, parent, node):
53
    self.parent = parent
54
    self.node = node
55
    self.failed = False
56

    
57
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
58
    try:
59
      hc.connect()
60
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
61
                    skip_accept_encoding=True)
62
      hc.putheader('Content-Length', str(len(parent.body)))
63
      hc.endheaders()
64
      hc.send(parent.body)
65
    except socket.error, err:
66
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
67
      self.failed = True
68

    
69
  def get_response(self):
70
    """Try to process the response from the node.
71

72
    """
73
    if self.failed:
74
      # we already failed in connect
75
      return False
76
    resp = self.http_conn.getresponse()
77
    if resp.status != 200:
78
      return False
79
    try:
80
      length = int(resp.getheader('Content-Length', '0'))
81
    except ValueError:
82
      return False
83
    if not length:
84
      logger.Error("Zero-length reply from %s" % self.node)
85
      return False
86
    payload = resp.read(length)
87
    unload = simplejson.loads(payload)
88
    return unload
89

    
90

    
91
class Client:
92
  """RPC Client class.
93

94
  This class, given a (remote) method name, a list of parameters and a
95
  list of nodes, will contact (in parallel) all nodes, and return a
96
  dict of results (key: node name, value: result).
97

98
  One current bug is that generic failure is still signalled by
99
  'False' result, which is not good. This overloading of values can
100
  cause bugs.
101

102
  """
103
  result_set = False
104
  result = False
105
  allresult = []
106

    
107
  def __init__(self, procedure, args):
108
    self.port = utils.GetNodeDaemonPort()
109
    self.nodepw = utils.GetNodeDaemonPassword()
110
    self.nc = {}
111
    self.results = {}
112
    self.procedure = procedure
113
    self.args = args
114
    self.body = simplejson.dumps(args)
115

    
116
  #--- generic connector -------------
117

    
118
  def connect_list(self, node_list):
119
    """Add a list of nodes to the target nodes.
120

121
    """
122
    for node in node_list:
123
      self.connect(node)
124

    
125
  def connect(self, connect_node):
126
    """Add a node to the target list.
127

128
    """
129
    self.nc[connect_node] = nc = NodeController(self, connect_node)
130

    
131
  def getresult(self):
132
    """Return the results of the call.
133

134
    """
135
    return self.results
136

    
137
  def run(self):
138
    """Wrapper over reactor.run().
139

140
    This function simply calls reactor.run() if we have any requests
141
    queued, otherwise it does nothing.
142

143
    """
144
    for node, nc in self.nc.items():
145
      self.results[node] = nc.get_response()
146

    
147

    
148
class RpcRunner(object):
149
  """RPC runner class"""
150

    
151
  def __init__(self, cfg):
152
    """Initialized the rpc runner.
153

154
    @type cfg:  C{config.ConfigWriter}
155
    @param cfg: the configuration object that will be used to get data
156
                about the cluster
157

158
    """
159
    self._cfg = cfg
160

    
161
  def _InstDict(self, instance):
162
    """Convert the given instance to a dict.
163

164
    This is done via the instance's ToDict() method and additionally
165
    we fill the hvparams with the cluster defaults.
166

167
    @type instance: L{objects.Instance}
168
    @param instance: an Instance object
169
    @rtype: dict
170
    @return: the instance dict, with the hvparams filled with the
171
        cluster defaults
172

173
    """
174
    idict = instance.ToDict()
175
    idict["hvparams"] = self._cfg.GetClusterInfo().FillHV(instance)
176
    return idict
177

    
178
  def call_volume_list(self, node_list, vg_name):
179
    """Gets the logical volumes present in a given volume group.
180

181
    This is a multi-node call.
182

183
    """
184
    c = Client("volume_list", [vg_name])
185
    c.connect_list(node_list)
186
    c.run()
187
    return c.getresult()
188

    
189
  def call_vg_list(self, node_list):
190
    """Gets the volume group list.
191

192
    This is a multi-node call.
193

194
    """
195
    c = Client("vg_list", [])
196
    c.connect_list(node_list)
197
    c.run()
198
    return c.getresult()
199

    
200
  def call_bridges_exist(self, node, bridges_list):
201
    """Checks if a node has all the bridges given.
202

203
    This method checks if all bridges given in the bridges_list are
204
    present on the remote node, so that an instance that uses interfaces
205
    on those bridges can be started.
206

207
    This is a single-node call.
208

209
    """
210
    c = Client("bridges_exist", [bridges_list])
211
    c.connect(node)
212
    c.run()
213
    return c.getresult().get(node, False)
214

    
215
  def call_instance_start(self, node, instance, extra_args):
216
    """Starts an instance.
217

218
    This is a single-node call.
219

220
    """
221
    c = Client("instance_start", [self._InstDict(instance), extra_args])
222
    c.connect(node)
223
    c.run()
224
    return c.getresult().get(node, False)
225

    
226
  def call_instance_shutdown(self, node, instance):
227
    """Stops an instance.
228

229
    This is a single-node call.
230

231
    """
232
    c = Client("instance_shutdown", [self._InstDict(instance)])
233
    c.connect(node)
234
    c.run()
235
    return c.getresult().get(node, False)
236

    
237
  def call_instance_migrate(self, node, instance, target, live):
238
    """Migrate an instance.
239

240
    This is a single-node call.
241

242
    @type node: string
243
    @param node: the node on which the instance is currently running
244
    @type instance: C{objects.Instance}
245
    @param instance: the instance definition
246
    @type target: string
247
    @param target: the target node name
248
    @type live: boolean
249
    @param live: whether the migration should be done live or not (the
250
        interpretation of this parameter is left to the hypervisor)
251

252
    """
253
    c = Client("instance_migrate", [self._InstDict(instance), target, live])
254
    c.connect(node)
255
    c.run()
256
    return c.getresult().get(node, False)
257

    
258
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
259
    """Reboots an instance.
260

261
    This is a single-node call.
262

263
    """
264
    c = Client("instance_reboot", [self._InstDict(instance),
265
                                   reboot_type, extra_args])
266
    c.connect(node)
267
    c.run()
268
    return c.getresult().get(node, False)
269

    
270
  def call_instance_os_add(self, node, inst, osdev, swapdev):
271
    """Installs an OS on the given instance.
272

273
    This is a single-node call.
274

275
    """
276
    params = [self._InstDict(inst), osdev, swapdev]
277
    c = Client("instance_os_add", params)
278
    c.connect(node)
279
    c.run()
280
    return c.getresult().get(node, False)
281

    
282
  def call_instance_run_rename(self, node, inst, old_name, osdev, swapdev):
283
    """Run the OS rename script for an instance.
284

285
    This is a single-node call.
286

287
    """
288
    params = [self._InstDict(inst), old_name, osdev, swapdev]
289
    c = Client("instance_run_rename", params)
290
    c.connect(node)
291
    c.run()
292
    return c.getresult().get(node, False)
293

    
294
  def call_instance_info(self, node, instance, hname):
295
    """Returns information about a single instance.
296

297
    This is a single-node call.
298

299
    @type node_list: list
300
    @param node_list: the list of nodes to query
301
    @type instance: string
302
    @param instance: the instance name
303
    @type hname: string
304
    @param hname: the hypervisor type of the instance
305

306
    """
307
    c = Client("instance_info", [instance, hname])
308
    c.connect(node)
309
    c.run()
310
    return c.getresult().get(node, False)
311

    
312
  def call_all_instances_info(self, node_list, hypervisor_list):
313
    """Returns information about all instances on the given nodes.
314

315
    This is a multi-node call.
316

317
    @type node_list: list
318
    @param node_list: the list of nodes to query
319
    @type hypervisor_list: list
320
    @param hypervisor_list: the hypervisors to query for instances
321

322
    """
323
    c = Client("all_instances_info", [hypervisor_list])
324
    c.connect_list(node_list)
325
    c.run()
326
    return c.getresult()
327

    
328
  def call_instance_list(self, node_list, hypervisor_list):
329
    """Returns the list of running instances on a given node.
330

331
    This is a multi-node call.
332

333
    @type node_list: list
334
    @param node_list: the list of nodes to query
335
    @type hypervisor_list: list
336
    @param hypervisor_list: the hypervisors to query for instances
337

338
    """
339
    c = Client("instance_list", [hypervisor_list])
340
    c.connect_list(node_list)
341
    c.run()
342
    return c.getresult()
343

    
344
  def call_node_tcp_ping(self, node, source, target, port, timeout,
345
                         live_port_needed):
346
    """Do a TcpPing on the remote node
347

348
    This is a single-node call.
349

350
    """
351
    c = Client("node_tcp_ping", [source, target, port, timeout,
352
                                 live_port_needed])
353
    c.connect(node)
354
    c.run()
355
    return c.getresult().get(node, False)
356

    
357
  def call_node_has_ip_address(self, node, address):
358
    """Checks if a node has the given IP address.
359

360
    This is a single-node call.
361

362
    """
363
    c = Client("node_has_ip_address", [address])
364
    c.connect(node)
365
    c.run()
366
    return c.getresult().get(node, False)
367

    
368
  def call_node_info(self, node_list, vg_name, hypervisor_type):
369
    """Return node information.
370

371
    This will return memory information and volume group size and free
372
    space.
373

374
    This is a multi-node call.
375

376
    @type node_list: list
377
    @param node_list: the list of nodes to query
378
    @type vgname: C{string}
379
    @param vgname: the name of the volume group to ask for disk space
380
        information
381
    @type hypervisor_type: C{str}
382
    @param hypervisor_type: the name of the hypervisor to ask for
383
        memory information
384

385
    """
386
    c = Client("node_info", [vg_name, hypervisor_type])
387
    c.connect_list(node_list)
388
    c.run()
389
    retux = c.getresult()
390

    
391
    for node_name in retux:
392
      ret = retux.get(node_name, False)
393
      if type(ret) != dict:
394
        logger.Error("could not connect to node %s" % (node_name))
395
        ret = {}
396

    
397
      utils.CheckDict(ret,
398
                      { 'memory_total' : '-',
399
                        'memory_dom0' : '-',
400
                        'memory_free' : '-',
401
                        'vg_size' : 'node_unreachable',
402
                        'vg_free' : '-' },
403
                      "call_node_info",
404
                      )
405
    return retux
406

    
407
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
408
    """Add a node to the cluster.
409

410
    This is a single-node call.
411

412
    """
413
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
414
    c = Client("node_add", params)
415
    c.connect(node)
416
    c.run()
417
    return c.getresult().get(node, False)
418

    
419
  def call_node_verify(self, node_list, checkdict, cluster_name):
420
    """Request verification of given parameters.
421

422
    This is a multi-node call.
423

424
    """
425
    c = Client("node_verify", [checkdict, cluster_name])
426
    c.connect_list(node_list)
427
    c.run()
428
    return c.getresult()
429

    
430
  @staticmethod
431
  def call_node_start_master(node, start_daemons):
432
    """Tells a node to activate itself as a master.
433

434
    This is a single-node call.
435

436
    """
437
    c = Client("node_start_master", [start_daemons])
438
    c.connect(node)
439
    c.run()
440
    return c.getresult().get(node, False)
441

    
442
  @staticmethod
443
  def call_node_stop_master(node, stop_daemons):
444
    """Tells a node to demote itself from master status.
445

446
    This is a single-node call.
447

448
    """
449
    c = Client("node_stop_master", [stop_daemons])
450
    c.connect(node)
451
    c.run()
452
    return c.getresult().get(node, False)
453

    
454
  @staticmethod
455
  def call_master_info(node_list):
456
    """Query master info.
457

458
    This is a multi-node call.
459

460
    """
461
    # TODO: should this method query down nodes?
462
    c = Client("master_info", [])
463
    c.connect_list(node_list)
464
    c.run()
465
    return c.getresult()
466

    
467
  def call_version(self, node_list):
468
    """Query node version.
469

470
    This is a multi-node call.
471

472
    """
473
    c = Client("version", [])
474
    c.connect_list(node_list)
475
    c.run()
476
    return c.getresult()
477

    
478
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
479
    """Request creation of a given block device.
480

481
    This is a single-node call.
482

483
    """
484
    params = [bdev.ToDict(), size, owner, on_primary, info]
485
    c = Client("blockdev_create", params)
486
    c.connect(node)
487
    c.run()
488
    return c.getresult().get(node, False)
489

    
490
  def call_blockdev_remove(self, node, bdev):
491
    """Request removal of a given block device.
492

493
    This is a single-node call.
494

495
    """
496
    c = Client("blockdev_remove", [bdev.ToDict()])
497
    c.connect(node)
498
    c.run()
499
    return c.getresult().get(node, False)
500

    
501
  def call_blockdev_rename(self, node, devlist):
502
    """Request rename of the given block devices.
503

504
    This is a single-node call.
505

506
    """
507
    params = [(d.ToDict(), uid) for d, uid in devlist]
508
    c = Client("blockdev_rename", params)
509
    c.connect(node)
510
    c.run()
511
    return c.getresult().get(node, False)
512

    
513
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
514
    """Request assembling of a given block device.
515

516
    This is a single-node call.
517

518
    """
519
    params = [disk.ToDict(), owner, on_primary]
520
    c = Client("blockdev_assemble", params)
521
    c.connect(node)
522
    c.run()
523
    return c.getresult().get(node, False)
524

    
525
  def call_blockdev_shutdown(self, node, disk):
526
    """Request shutdown of a given block device.
527

528
    This is a single-node call.
529

530
    """
531
    c = Client("blockdev_shutdown", [disk.ToDict()])
532
    c.connect(node)
533
    c.run()
534
    return c.getresult().get(node, False)
535

    
536
  def call_blockdev_addchildren(self, node, bdev, ndevs):
537
    """Request adding a list of children to a (mirroring) device.
538

539
    This is a single-node call.
540

541
    """
542
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
543
    c = Client("blockdev_addchildren", params)
544
    c.connect(node)
545
    c.run()
546
    return c.getresult().get(node, False)
547

    
548
  def call_blockdev_removechildren(self, node, bdev, ndevs):
549
    """Request removing a list of children from a (mirroring) device.
550

551
    This is a single-node call.
552

553
    """
554
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
555
    c = Client("blockdev_removechildren", params)
556
    c.connect(node)
557
    c.run()
558
    return c.getresult().get(node, False)
559

    
560
  def call_blockdev_getmirrorstatus(self, node, disks):
561
    """Request status of a (mirroring) device.
562

563
    This is a single-node call.
564

565
    """
566
    params = [dsk.ToDict() for dsk in disks]
567
    c = Client("blockdev_getmirrorstatus", params)
568
    c.connect(node)
569
    c.run()
570
    return c.getresult().get(node, False)
571

    
572
  def call_blockdev_find(self, node, disk):
573
    """Request identification of a given block device.
574

575
    This is a single-node call.
576

577
    """
578
    c = Client("blockdev_find", [disk.ToDict()])
579
    c.connect(node)
580
    c.run()
581
    return c.getresult().get(node, False)
582

    
583
  def call_blockdev_close(self, node, disks):
584
    """Closes the given block devices.
585

586
    This is a single-node call.
587

588
    """
589
    params = [cf.ToDict() for cf in disks]
590
    c = Client("blockdev_close", params)
591
    c.connect(node)
592
    c.run()
593
    return c.getresult().get(node, False)
594

    
595
  @staticmethod
596
  def call_upload_file(node_list, file_name):
597
    """Upload a file.
598

599
    The node will refuse the operation in case the file is not on the
600
    approved file list.
601

602
    This is a multi-node call.
603

604
    """
605
    fh = file(file_name)
606
    try:
607
      data = fh.read()
608
    finally:
609
      fh.close()
610
    st = os.stat(file_name)
611
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
612
              st.st_atime, st.st_mtime]
613
    c = Client("upload_file", params)
614
    c.connect_list(node_list)
615
    c.run()
616
    return c.getresult()
617

    
618
  def call_os_diagnose(self, node_list):
619
    """Request a diagnose of OS definitions.
620

621
    This is a multi-node call.
622

623
    """
624
    c = Client("os_diagnose", [])
625
    c.connect_list(node_list)
626
    c.run()
627
    result = c.getresult()
628
    new_result = {}
629
    for node_name in result:
630
      if result[node_name]:
631
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
632
      else:
633
        nr = []
634
      new_result[node_name] = nr
635
    return new_result
636

    
637
  def call_os_get(self, node, name):
638
    """Returns an OS definition.
639

640
    This is a single-node call.
641

642
    """
643
    c = Client("os_get", [name])
644
    c.connect(node)
645
    c.run()
646
    result = c.getresult().get(node, False)
647
    if isinstance(result, dict):
648
      return objects.OS.FromDict(result)
649
    else:
650
      return result
651

    
652
  def call_hooks_runner(self, node_list, hpath, phase, env):
653
    """Call the hooks runner.
654

655
    Args:
656
      - op: the OpCode instance
657
      - env: a dictionary with the environment
658

659
    This is a multi-node call.
660

661
    """
662
    params = [hpath, phase, env]
663
    c = Client("hooks_runner", params)
664
    c.connect_list(node_list)
665
    c.run()
666
    result = c.getresult()
667
    return result
668

    
669
  def call_iallocator_runner(self, node, name, idata):
670
    """Call an iallocator on a remote node
671

672
    Args:
673
      - name: the iallocator name
674
      - input: the json-encoded input string
675

676
    This is a single-node call.
677

678
    """
679
    params = [name, idata]
680
    c = Client("iallocator_runner", params)
681
    c.connect(node)
682
    c.run()
683
    result = c.getresult().get(node, False)
684
    return result
685

    
686
  def call_blockdev_grow(self, node, cf_bdev, amount):
687
    """Request a snapshot of the given block device.
688

689
    This is a single-node call.
690

691
    """
692
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
693
    c.connect(node)
694
    c.run()
695
    return c.getresult().get(node, False)
696

    
697
  def call_blockdev_snapshot(self, node, cf_bdev):
698
    """Request a snapshot of the given block device.
699

700
    This is a single-node call.
701

702
    """
703
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
704
    c.connect(node)
705
    c.run()
706
    return c.getresult().get(node, False)
707

    
708
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
709
                           cluster_name):
710
    """Request the export of a given snapshot.
711

712
    This is a single-node call.
713

714
    """
715
    params = [snap_bdev.ToDict(), dest_node,
716
              self._InstDict(instance), cluster_name]
717
    c = Client("snapshot_export", params)
718
    c.connect(node)
719
    c.run()
720
    return c.getresult().get(node, False)
721

    
722
  def call_finalize_export(self, node, instance, snap_disks):
723
    """Request the completion of an export operation.
724

725
    This writes the export config file, etc.
726

727
    This is a single-node call.
728

729
    """
730
    flat_disks = []
731
    for disk in snap_disks:
732
      flat_disks.append(disk.ToDict())
733
    params = [self._InstDict(instance), flat_disks]
734
    c = Client("finalize_export", params)
735
    c.connect(node)
736
    c.run()
737
    return c.getresult().get(node, False)
738

    
739
  def call_export_info(self, node, path):
740
    """Queries the export information in a given path.
741

742
    This is a single-node call.
743

744
    """
745
    c = Client("export_info", [path])
746
    c.connect(node)
747
    c.run()
748
    result = c.getresult().get(node, False)
749
    if not result:
750
      return result
751
    return objects.SerializableConfigParser.Loads(str(result))
752

    
753
  def call_instance_os_import(self, node, inst, osdev, swapdev,
754
                              src_node, src_image, cluster_name):
755
    """Request the import of a backup into an instance.
756

757
    This is a single-node call.
758

759
    """
760
    params = [self._InstDict(inst), osdev, swapdev,
761
              src_node, src_image, cluster_name]
762
    c = Client("instance_os_import", params)
763
    c.connect(node)
764
    c.run()
765
    return c.getresult().get(node, False)
766

    
767
  def call_export_list(self, node_list):
768
    """Gets the stored exports list.
769

770
    This is a multi-node call.
771

772
    """
773
    c = Client("export_list", [])
774
    c.connect_list(node_list)
775
    c.run()
776
    result = c.getresult()
777
    return result
778

    
779
  def call_export_remove(self, node, export):
780
    """Requests removal of a given export.
781

782
    This is a single-node call.
783

784
    """
785
    c = Client("export_remove", [export])
786
    c.connect(node)
787
    c.run()
788
    return c.getresult().get(node, False)
789

    
790
  @staticmethod
791
  def call_node_leave_cluster(node):
792
    """Requests a node to clean the cluster information it has.
793

794
    This will remove the configuration information from the ganeti data
795
    dir.
796

797
    This is a single-node call.
798

799
    """
800
    c = Client("node_leave_cluster", [])
801
    c.connect(node)
802
    c.run()
803
    return c.getresult().get(node, False)
804

    
805
  def call_node_volumes(self, node_list):
806
    """Gets all volumes on node(s).
807

808
    This is a multi-node call.
809

810
    """
811
    c = Client("node_volumes", [])
812
    c.connect_list(node_list)
813
    c.run()
814
    return c.getresult()
815

    
816
  def call_test_delay(self, node_list, duration):
817
    """Sleep for a fixed time on given node(s).
818

819
    This is a multi-node call.
820

821
    """
822
    c = Client("test_delay", [duration])
823
    c.connect_list(node_list)
824
    c.run()
825
    return c.getresult()
826

    
827
  def call_file_storage_dir_create(self, node, file_storage_dir):
828
    """Create the given file storage directory.
829

830
    This is a single-node call.
831

832
    """
833
    c = Client("file_storage_dir_create", [file_storage_dir])
834
    c.connect(node)
835
    c.run()
836
    return c.getresult().get(node, False)
837

    
838
  def call_file_storage_dir_remove(self, node, file_storage_dir):
839
    """Remove the given file storage directory.
840

841
    This is a single-node call.
842

843
    """
844
    c = Client("file_storage_dir_remove", [file_storage_dir])
845
    c.connect(node)
846
    c.run()
847
    return c.getresult().get(node, False)
848

    
849
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
850
                                   new_file_storage_dir):
851
    """Rename file storage directory.
852

853
    This is a single-node call.
854

855
    """
856
    c = Client("file_storage_dir_rename",
857
               [old_file_storage_dir, new_file_storage_dir])
858
    c.connect(node)
859
    c.run()
860
    return c.getresult().get(node, False)
861

    
862
  @staticmethod
863
  def call_jobqueue_update(node_list, file_name, content):
864
    """Update job queue.
865

866
    This is a multi-node call.
867

868
    """
869
    c = Client("jobqueue_update", [file_name, content])
870
    c.connect_list(node_list)
871
    c.run()
872
    result = c.getresult()
873
    return result
874

    
875
  @staticmethod
876
  def call_jobqueue_purge(node):
877
    """Purge job queue.
878

879
    This is a single-node call.
880

881
    """
882
    c = Client("jobqueue_purge", [])
883
    c.connect(node)
884
    c.run()
885
    return c.getresult().get(node, False)
886

    
887
  @staticmethod
888
  def call_jobqueue_rename(node_list, old, new):
889
    """Rename a job queue file.
890

891
    This is a multi-node call.
892

893
    """
894
    c = Client("jobqueue_rename", [old, new])
895
    c.connect_list(node_list)
896
    c.run()
897
    result = c.getresult()
898
    return result
899

    
900

    
901
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
902
    """Validate the hypervisor params.
903

904
    This is a multi-node call.
905

906
    @type node_list: list
907
    @param node_list: the list of nodes to query
908
    @type hvname: string
909
    @param hvname: the hypervisor name
910
    @type hvparams: dict
911
    @param hvparams: the hypervisor parameters to be validated
912

913
    """
914
    cluster = self._cfg.GetClusterInfo()
915
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
916
    c = Client("hypervisor_validate_params", [hvname, hv_full])
917
    c.connect_list(node_list)
918
    c.run()
919
    result = c.getresult()
920
    return result