Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ bdf7d8c0

History | View | Annotate | Download (25.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
import simplejson
39

    
40
from ganeti import utils
41
from ganeti import objects
42

    
43

    
44
class NodeController:
45
  """Node-handling class.
46

47
  For each node that we speak with, we create an instance of this
48
  class, so that we have a safe place to store the details of this
49
  individual call.
50

51
  """
52
  def __init__(self, parent, node, address=None):
53
    """Constructor for the node controller.
54

55
    @type parent: L{Client}
56
    @param parent: the C{Client} instance which holds global parameters for
57
        the call
58
    @type node: str
59
    @param node: the name of the node we connect to; it is used for error
60
        messages and in cases we the address paramater is not passed
61
    @type address: str
62
    @keyword address: the node's address, in case we know it, so that we
63
        don't need to resolve it; testing shows that httplib has high
64
        overhead in resolving addresses (even when speficied in /etc/hosts)
65

66
    """
67
    self.parent = parent
68
    self.node = node
69
    if address is None:
70
      address = node
71
    self.failed = False
72

    
73
    self.http_conn = hc = httplib.HTTPConnection(address, parent.port)
74
    try:
75
      hc.connect()
76
      hc.putrequest('PUT', "/%s" % parent.procedure,
77
                    skip_accept_encoding=True)
78
      hc.putheader('Content-Length', parent.body_length)
79
      hc.endheaders()
80
      hc.send(parent.body)
81
    except socket.error:
82
      logging.exception("Error connecting to node %s", node)
83
      self.failed = True
84

    
85
  def GetResponse(self):
86
    """Try to process the response from the node.
87

88
    """
89
    if self.failed:
90
      # we already failed in connect
91
      return False
92
    resp = self.http_conn.getresponse()
93
    if resp.status != 200:
94
      return False
95
    try:
96
      length = int(resp.getheader('Content-Length', '0'))
97
    except ValueError:
98
      return False
99
    if not length:
100
      logging.error("Zero-length reply from node %s", self.node)
101
      return False
102
    payload = resp.read(length)
103
    unload = simplejson.loads(payload)
104
    return unload
105

    
106

    
107
class Client:
108
  """RPC Client class.
109

110
  This class, given a (remote) method name, a list of parameters and a
111
  list of nodes, will contact (in parallel) all nodes, and return a
112
  dict of results (key: node name, value: result).
113

114
  One current bug is that generic failure is still signalled by
115
  'False' result, which is not good. This overloading of values can
116
  cause bugs.
117

118
  @var body_length: cached string value of the length of the body (so that
119
      individual C{NodeController} instances don't have to recompute it)
120

121
  """
122
  result_set = False
123
  result = False
124
  allresult = []
125

    
126
  def __init__(self, procedure, args):
127
    self.port = utils.GetNodeDaemonPort()
128
    self.nodepw = utils.GetNodeDaemonPassword()
129
    self.nc = {}
130
    self.results = {}
131
    self.procedure = procedure
132
    self.args = args
133
    self.body = simplejson.dumps(args)
134
    self.body_length = str(len(self.body))
135

    
136
  #--- generic connector -------------
137

    
138
  def ConnectList(self, node_list, address_list=None):
139
    """Add a list of nodes to the target nodes.
140

141
    @type node_list: list
142
    @param node_list: the list of node names to connect
143
    @type address_list: list or None
144
    @keyword address_list: either None or a list with node addresses,
145
        which must have the same length as the node list
146

147
    """
148
    if address_list is None:
149
      address_list = [None for _ in node_list]
150
    else:
151
      assert len(node_list) == len(address_list), \
152
             "Name and address lists should have the same length"
153
    for node, address in zip(node_list, address_list):
154
      self.ConnectNode(node, address)
155

    
156
  def ConnectNode(self, name, address=None):
157
    """Add a node to the target list.
158

159
    @type name: str
160
    @param name: the node name
161
    @type address: str
162
    @keyword address: the node address, if known
163

164
    """
165
    self.nc[name] = NodeController(self, name, address)
166

    
167
  def GetResults(self):
168
    """Return the results of the call.
169

170
    """
171
    return self.results
172

    
173
  def Run(self):
174
    """Gather results from the node controllers.
175

176
    This function simply calls GetResponse() for each of our node
177
    controllers.
178

179
    """
180
    for node, nc in self.nc.items():
181
      self.results[node] = nc.GetResponse()
182

    
183

    
184
class RpcRunner(object):
185
  """RPC runner class"""
186

    
187
  def __init__(self, cfg):
188
    """Initialized the rpc runner.
189

190
    @type cfg:  C{config.ConfigWriter}
191
    @param cfg: the configuration object that will be used to get data
192
                about the cluster
193

194
    """
195
    self._cfg = cfg
196

    
197
  def _InstDict(self, instance):
198
    """Convert the given instance to a dict.
199

200
    This is done via the instance's ToDict() method and additionally
201
    we fill the hvparams with the cluster defaults.
202

203
    @type instance: L{objects.Instance}
204
    @param instance: an Instance object
205
    @rtype: dict
206
    @return: the instance dict, with the hvparams filled with the
207
        cluster defaults
208

209
    """
210
    idict = instance.ToDict()
211
    cluster = self._cfg.GetClusterInfo()
212
    idict["hvparams"] = cluster.FillHV(instance)
213
    idict["beparams"] = cluster.FillBE(instance)
214
    return idict
215

    
216
  def call_volume_list(self, node_list, vg_name):
217
    """Gets the logical volumes present in a given volume group.
218

219
    This is a multi-node call.
220

221
    """
222
    c = Client("volume_list", [vg_name])
223
    c.ConnectList(node_list)
224
    c.Run()
225
    return c.GetResults()
226

    
227
  def call_vg_list(self, node_list):
228
    """Gets the volume group list.
229

230
    This is a multi-node call.
231

232
    """
233
    c = Client("vg_list", [])
234
    c.ConnectList(node_list)
235
    c.Run()
236
    return c.GetResults()
237

    
238
  def call_bridges_exist(self, node, bridges_list):
239
    """Checks if a node has all the bridges given.
240

241
    This method checks if all bridges given in the bridges_list are
242
    present on the remote node, so that an instance that uses interfaces
243
    on those bridges can be started.
244

245
    This is a single-node call.
246

247
    """
248
    c = Client("bridges_exist", [bridges_list])
249
    c.ConnectNode(node)
250
    c.Run()
251
    return c.GetResults().get(node, False)
252

    
253
  def call_instance_start(self, node, instance, extra_args):
254
    """Starts an instance.
255

256
    This is a single-node call.
257

258
    """
259
    c = Client("instance_start", [self._InstDict(instance), extra_args])
260
    c.ConnectNode(node)
261
    c.Run()
262
    return c.GetResults().get(node, False)
263

    
264
  def call_instance_shutdown(self, node, instance):
265
    """Stops an instance.
266

267
    This is a single-node call.
268

269
    """
270
    c = Client("instance_shutdown", [self._InstDict(instance)])
271
    c.ConnectNode(node)
272
    c.Run()
273
    return c.GetResults().get(node, False)
274

    
275
  def call_instance_migrate(self, node, instance, target, live):
276
    """Migrate an instance.
277

278
    This is a single-node call.
279

280
    @type node: string
281
    @param node: the node on which the instance is currently running
282
    @type instance: C{objects.Instance}
283
    @param instance: the instance definition
284
    @type target: string
285
    @param target: the target node name
286
    @type live: boolean
287
    @param live: whether the migration should be done live or not (the
288
        interpretation of this parameter is left to the hypervisor)
289

290
    """
291
    c = Client("instance_migrate", [self._InstDict(instance), target, live])
292
    c.ConnectNode(node)
293
    c.Run()
294
    return c.GetResults().get(node, False)
295

    
296
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
297
    """Reboots an instance.
298

299
    This is a single-node call.
300

301
    """
302
    c = Client("instance_reboot", [self._InstDict(instance),
303
                                   reboot_type, extra_args])
304
    c.ConnectNode(node)
305
    c.Run()
306
    return c.GetResults().get(node, False)
307

    
308
  def call_instance_os_add(self, node, inst):
309
    """Installs an OS on the given instance.
310

311
    This is a single-node call.
312

313
    """
314
    params = [self._InstDict(inst)]
315
    c = Client("instance_os_add", params)
316
    c.ConnectNode(node)
317
    c.Run()
318
    return c.GetResults().get(node, False)
319

    
320
  def call_instance_run_rename(self, node, inst, old_name):
321
    """Run the OS rename script for an instance.
322

323
    This is a single-node call.
324

325
    """
326
    params = [self._InstDict(inst), old_name]
327
    c = Client("instance_run_rename", params)
328
    c.ConnectNode(node)
329
    c.Run()
330
    return c.GetResults().get(node, False)
331

    
332
  def call_instance_info(self, node, instance, hname):
333
    """Returns information about a single instance.
334

335
    This is a single-node call.
336

337
    @type node_list: list
338
    @param node_list: the list of nodes to query
339
    @type instance: string
340
    @param instance: the instance name
341
    @type hname: string
342
    @param hname: the hypervisor type of the instance
343

344
    """
345
    c = Client("instance_info", [instance, hname])
346
    c.ConnectNode(node)
347
    c.Run()
348
    return c.GetResults().get(node, False)
349

    
350
  def call_all_instances_info(self, node_list, hypervisor_list):
351
    """Returns information about all instances on the given nodes.
352

353
    This is a multi-node call.
354

355
    @type node_list: list
356
    @param node_list: the list of nodes to query
357
    @type hypervisor_list: list
358
    @param hypervisor_list: the hypervisors to query for instances
359

360
    """
361
    c = Client("all_instances_info", [hypervisor_list])
362
    c.ConnectList(node_list)
363
    c.Run()
364
    return c.GetResults()
365

    
366
  def call_instance_list(self, node_list, hypervisor_list):
367
    """Returns the list of running instances on a given node.
368

369
    This is a multi-node call.
370

371
    @type node_list: list
372
    @param node_list: the list of nodes to query
373
    @type hypervisor_list: list
374
    @param hypervisor_list: the hypervisors to query for instances
375

376
    """
377
    c = Client("instance_list", [hypervisor_list])
378
    c.ConnectList(node_list)
379
    c.Run()
380
    return c.GetResults()
381

    
382
  def call_node_tcp_ping(self, node, source, target, port, timeout,
383
                         live_port_needed):
384
    """Do a TcpPing on the remote node
385

386
    This is a single-node call.
387

388
    """
389
    c = Client("node_tcp_ping", [source, target, port, timeout,
390
                                 live_port_needed])
391
    c.ConnectNode(node)
392
    c.Run()
393
    return c.GetResults().get(node, False)
394

    
395
  def call_node_has_ip_address(self, node, address):
396
    """Checks if a node has the given IP address.
397

398
    This is a single-node call.
399

400
    """
401
    c = Client("node_has_ip_address", [address])
402
    c.ConnectNode(node)
403
    c.Run()
404
    return c.GetResults().get(node, False)
405

    
406
  def call_node_info(self, node_list, vg_name, hypervisor_type):
407
    """Return node information.
408

409
    This will return memory information and volume group size and free
410
    space.
411

412
    This is a multi-node call.
413

414
    @type node_list: list
415
    @param node_list: the list of nodes to query
416
    @type vgname: C{string}
417
    @param vgname: the name of the volume group to ask for disk space
418
        information
419
    @type hypervisor_type: C{str}
420
    @param hypervisor_type: the name of the hypervisor to ask for
421
        memory information
422

423
    """
424
    c = Client("node_info", [vg_name, hypervisor_type])
425
    c.ConnectList(node_list)
426
    c.Run()
427
    retux = c.GetResults()
428

    
429
    for node_name in retux:
430
      ret = retux.get(node_name, False)
431
      if type(ret) != dict:
432
        logging.error("could not connect to node %s", node_name)
433
        ret = {}
434

    
435
      utils.CheckDict(ret,
436
                      { 'memory_total' : '-',
437
                        'memory_dom0' : '-',
438
                        'memory_free' : '-',
439
                        'vg_size' : 'node_unreachable',
440
                        'vg_free' : '-' },
441
                      "call_node_info",
442
                      )
443
    return retux
444

    
445
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
446
    """Add a node to the cluster.
447

448
    This is a single-node call.
449

450
    """
451
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
452
    c = Client("node_add", params)
453
    c.ConnectNode(node)
454
    c.Run()
455
    return c.GetResults().get(node, False)
456

    
457
  def call_node_verify(self, node_list, checkdict, cluster_name):
458
    """Request verification of given parameters.
459

460
    This is a multi-node call.
461

462
    """
463
    c = Client("node_verify", [checkdict, cluster_name])
464
    c.ConnectList(node_list)
465
    c.Run()
466
    return c.GetResults()
467

    
468
  @staticmethod
469
  def call_node_start_master(node, start_daemons):
470
    """Tells a node to activate itself as a master.
471

472
    This is a single-node call.
473

474
    """
475
    c = Client("node_start_master", [start_daemons])
476
    c.ConnectNode(node)
477
    c.Run()
478
    return c.GetResults().get(node, False)
479

    
480
  @staticmethod
481
  def call_node_stop_master(node, stop_daemons):
482
    """Tells a node to demote itself from master status.
483

484
    This is a single-node call.
485

486
    """
487
    c = Client("node_stop_master", [stop_daemons])
488
    c.ConnectNode(node)
489
    c.Run()
490
    return c.GetResults().get(node, False)
491

    
492
  @staticmethod
493
  def call_master_info(node_list):
494
    """Query master info.
495

496
    This is a multi-node call.
497

498
    """
499
    # TODO: should this method query down nodes?
500
    c = Client("master_info", [])
501
    c.ConnectList(node_list)
502
    c.Run()
503
    return c.GetResults()
504

    
505
  def call_version(self, node_list):
506
    """Query node version.
507

508
    This is a multi-node call.
509

510
    """
511
    c = Client("version", [])
512
    c.ConnectList(node_list)
513
    c.Run()
514
    return c.GetResults()
515

    
516
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
517
    """Request creation of a given block device.
518

519
    This is a single-node call.
520

521
    """
522
    params = [bdev.ToDict(), size, owner, on_primary, info]
523
    c = Client("blockdev_create", params)
524
    c.ConnectNode(node)
525
    c.Run()
526
    return c.GetResults().get(node, False)
527

    
528
  def call_blockdev_remove(self, node, bdev):
529
    """Request removal of a given block device.
530

531
    This is a single-node call.
532

533
    """
534
    c = Client("blockdev_remove", [bdev.ToDict()])
535
    c.ConnectNode(node)
536
    c.Run()
537
    return c.GetResults().get(node, False)
538

    
539
  def call_blockdev_rename(self, node, devlist):
540
    """Request rename of the given block devices.
541

542
    This is a single-node call.
543

544
    """
545
    params = [(d.ToDict(), uid) for d, uid in devlist]
546
    c = Client("blockdev_rename", params)
547
    c.ConnectNode(node)
548
    c.Run()
549
    return c.GetResults().get(node, False)
550

    
551
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
552
    """Request assembling of a given block device.
553

554
    This is a single-node call.
555

556
    """
557
    params = [disk.ToDict(), owner, on_primary]
558
    c = Client("blockdev_assemble", params)
559
    c.ConnectNode(node)
560
    c.Run()
561
    return c.GetResults().get(node, False)
562

    
563
  def call_blockdev_shutdown(self, node, disk):
564
    """Request shutdown of a given block device.
565

566
    This is a single-node call.
567

568
    """
569
    c = Client("blockdev_shutdown", [disk.ToDict()])
570
    c.ConnectNode(node)
571
    c.Run()
572
    return c.GetResults().get(node, False)
573

    
574
  def call_blockdev_addchildren(self, node, bdev, ndevs):
575
    """Request adding a list of children to a (mirroring) device.
576

577
    This is a single-node call.
578

579
    """
580
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
581
    c = Client("blockdev_addchildren", params)
582
    c.ConnectNode(node)
583
    c.Run()
584
    return c.GetResults().get(node, False)
585

    
586
  def call_blockdev_removechildren(self, node, bdev, ndevs):
587
    """Request removing a list of children from a (mirroring) device.
588

589
    This is a single-node call.
590

591
    """
592
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
593
    c = Client("blockdev_removechildren", params)
594
    c.ConnectNode(node)
595
    c.Run()
596
    return c.GetResults().get(node, False)
597

    
598
  def call_blockdev_getmirrorstatus(self, node, disks):
599
    """Request status of a (mirroring) device.
600

601
    This is a single-node call.
602

603
    """
604
    params = [dsk.ToDict() for dsk in disks]
605
    c = Client("blockdev_getmirrorstatus", params)
606
    c.ConnectNode(node)
607
    c.Run()
608
    return c.GetResults().get(node, False)
609

    
610
  def call_blockdev_find(self, node, disk):
611
    """Request identification of a given block device.
612

613
    This is a single-node call.
614

615
    """
616
    c = Client("blockdev_find", [disk.ToDict()])
617
    c.ConnectNode(node)
618
    c.Run()
619
    return c.GetResults().get(node, False)
620

    
621
  def call_blockdev_close(self, node, disks):
622
    """Closes the given block devices.
623

624
    This is a single-node call.
625

626
    """
627
    params = [cf.ToDict() for cf in disks]
628
    c = Client("blockdev_close", params)
629
    c.ConnectNode(node)
630
    c.Run()
631
    return c.GetResults().get(node, False)
632

    
633
  @staticmethod
634
  def call_upload_file(node_list, file_name):
635
    """Upload a file.
636

637
    The node will refuse the operation in case the file is not on the
638
    approved file list.
639

640
    This is a multi-node call.
641

642
    """
643
    fh = file(file_name)
644
    try:
645
      data = fh.read()
646
    finally:
647
      fh.close()
648
    st = os.stat(file_name)
649
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
650
              st.st_atime, st.st_mtime]
651
    c = Client("upload_file", params)
652
    c.ConnectList(node_list)
653
    c.Run()
654
    return c.GetResults()
655

    
656
  def call_os_diagnose(self, node_list):
657
    """Request a diagnose of OS definitions.
658

659
    This is a multi-node call.
660

661
    """
662
    c = Client("os_diagnose", [])
663
    c.ConnectList(node_list)
664
    c.Run()
665
    result = c.GetResults()
666
    new_result = {}
667
    for node_name in result:
668
      if result[node_name]:
669
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
670
      else:
671
        nr = []
672
      new_result[node_name] = nr
673
    return new_result
674

    
675
  def call_os_get(self, node, name):
676
    """Returns an OS definition.
677

678
    This is a single-node call.
679

680
    """
681
    c = Client("os_get", [name])
682
    c.ConnectNode(node)
683
    c.Run()
684
    result = c.GetResults().get(node, False)
685
    if isinstance(result, dict):
686
      return objects.OS.FromDict(result)
687
    else:
688
      return result
689

    
690
  def call_hooks_runner(self, node_list, hpath, phase, env):
691
    """Call the hooks runner.
692

693
    Args:
694
      - op: the OpCode instance
695
      - env: a dictionary with the environment
696

697
    This is a multi-node call.
698

699
    """
700
    params = [hpath, phase, env]
701
    c = Client("hooks_runner", params)
702
    c.ConnectList(node_list)
703
    c.Run()
704
    result = c.GetResults()
705
    return result
706

    
707
  def call_iallocator_runner(self, node, name, idata):
708
    """Call an iallocator on a remote node
709

710
    Args:
711
      - name: the iallocator name
712
      - input: the json-encoded input string
713

714
    This is a single-node call.
715

716
    """
717
    params = [name, idata]
718
    c = Client("iallocator_runner", params)
719
    c.ConnectNode(node)
720
    c.Run()
721
    result = c.GetResults().get(node, False)
722
    return result
723

    
724
  def call_blockdev_grow(self, node, cf_bdev, amount):
725
    """Request a snapshot of the given block device.
726

727
    This is a single-node call.
728

729
    """
730
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
731
    c.ConnectNode(node)
732
    c.Run()
733
    return c.GetResults().get(node, False)
734

    
735
  def call_blockdev_snapshot(self, node, cf_bdev):
736
    """Request a snapshot of the given block device.
737

738
    This is a single-node call.
739

740
    """
741
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
742
    c.ConnectNode(node)
743
    c.Run()
744
    return c.GetResults().get(node, False)
745

    
746
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
747
                           cluster_name):
748
    """Request the export of a given snapshot.
749

750
    This is a single-node call.
751

752
    """
753
    params = [snap_bdev.ToDict(), dest_node,
754
              self._InstDict(instance), cluster_name]
755
    c = Client("snapshot_export", params)
756
    c.ConnectNode(node)
757
    c.Run()
758
    return c.GetResults().get(node, False)
759

    
760
  def call_finalize_export(self, node, instance, snap_disks):
761
    """Request the completion of an export operation.
762

763
    This writes the export config file, etc.
764

765
    This is a single-node call.
766

767
    """
768
    flat_disks = []
769
    for disk in snap_disks:
770
      flat_disks.append(disk.ToDict())
771
    params = [self._InstDict(instance), flat_disks]
772
    c = Client("finalize_export", params)
773
    c.ConnectNode(node)
774
    c.Run()
775
    return c.GetResults().get(node, False)
776

    
777
  def call_export_info(self, node, path):
778
    """Queries the export information in a given path.
779

780
    This is a single-node call.
781

782
    """
783
    c = Client("export_info", [path])
784
    c.ConnectNode(node)
785
    c.Run()
786
    result = c.GetResults().get(node, False)
787
    if not result:
788
      return result
789
    return objects.SerializableConfigParser.Loads(str(result))
790

    
791
  def call_instance_os_import(self, node, inst, osdev, swapdev,
792
                              src_node, src_image, cluster_name):
793
    """Request the import of a backup into an instance.
794

795
    This is a single-node call.
796

797
    """
798
    params = [self._InstDict(inst), osdev, swapdev,
799
              src_node, src_image, cluster_name]
800
    c = Client("instance_os_import", params)
801
    c.ConnectNode(node)
802
    c.Run()
803
    return c.GetResults().get(node, False)
804

    
805
  def call_export_list(self, node_list):
806
    """Gets the stored exports list.
807

808
    This is a multi-node call.
809

810
    """
811
    c = Client("export_list", [])
812
    c.ConnectList(node_list)
813
    c.Run()
814
    result = c.GetResults()
815
    return result
816

    
817
  def call_export_remove(self, node, export):
818
    """Requests removal of a given export.
819

820
    This is a single-node call.
821

822
    """
823
    c = Client("export_remove", [export])
824
    c.ConnectNode(node)
825
    c.Run()
826
    return c.GetResults().get(node, False)
827

    
828
  @staticmethod
829
  def call_node_leave_cluster(node):
830
    """Requests a node to clean the cluster information it has.
831

832
    This will remove the configuration information from the ganeti data
833
    dir.
834

835
    This is a single-node call.
836

837
    """
838
    c = Client("node_leave_cluster", [])
839
    c.ConnectNode(node)
840
    c.Run()
841
    return c.GetResults().get(node, False)
842

    
843
  def call_node_volumes(self, node_list):
844
    """Gets all volumes on node(s).
845

846
    This is a multi-node call.
847

848
    """
849
    c = Client("node_volumes", [])
850
    c.ConnectList(node_list)
851
    c.Run()
852
    return c.GetResults()
853

    
854
  def call_test_delay(self, node_list, duration):
855
    """Sleep for a fixed time on given node(s).
856

857
    This is a multi-node call.
858

859
    """
860
    c = Client("test_delay", [duration])
861
    c.ConnectList(node_list)
862
    c.Run()
863
    return c.GetResults()
864

    
865
  def call_file_storage_dir_create(self, node, file_storage_dir):
866
    """Create the given file storage directory.
867

868
    This is a single-node call.
869

870
    """
871
    c = Client("file_storage_dir_create", [file_storage_dir])
872
    c.ConnectNode(node)
873
    c.Run()
874
    return c.GetResults().get(node, False)
875

    
876
  def call_file_storage_dir_remove(self, node, file_storage_dir):
877
    """Remove the given file storage directory.
878

879
    This is a single-node call.
880

881
    """
882
    c = Client("file_storage_dir_remove", [file_storage_dir])
883
    c.ConnectNode(node)
884
    c.Run()
885
    return c.GetResults().get(node, False)
886

    
887
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
888
                                   new_file_storage_dir):
889
    """Rename file storage directory.
890

891
    This is a single-node call.
892

893
    """
894
    c = Client("file_storage_dir_rename",
895
               [old_file_storage_dir, new_file_storage_dir])
896
    c.ConnectNode(node)
897
    c.Run()
898
    return c.GetResults().get(node, False)
899

    
900
  @staticmethod
901
  def call_jobqueue_update(node_list, file_name, content):
902
    """Update job queue.
903

904
    This is a multi-node call.
905

906
    """
907
    c = Client("jobqueue_update", [file_name, content])
908
    c.ConnectList(node_list)
909
    c.Run()
910
    result = c.GetResults()
911
    return result
912

    
913
  @staticmethod
914
  def call_jobqueue_purge(node):
915
    """Purge job queue.
916

917
    This is a single-node call.
918

919
    """
920
    c = Client("jobqueue_purge", [])
921
    c.ConnectNode(node)
922
    c.Run()
923
    return c.GetResults().get(node, False)
924

    
925
  @staticmethod
926
  def call_jobqueue_rename(node_list, old, new):
927
    """Rename a job queue file.
928

929
    This is a multi-node call.
930

931
    """
932
    c = Client("jobqueue_rename", [old, new])
933
    c.ConnectList(node_list)
934
    c.Run()
935
    result = c.GetResults()
936
    return result
937

    
938

    
939
  @staticmethod
940
  def call_jobqueue_set_drain(node_list, drain_flag):
941
    """Set the drain flag on the queue.
942

943
    This is a multi-node call.
944

945
    @type node_list: list
946
    @param node_list: the list of nodes to query
947
    @type drain_flag: bool
948
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
949

950
    """
951
    c = Client("jobqueue_set_drain", [drain_flag])
952
    c.ConnectList(node_list)
953
    c.Run()
954
    result = c.GetResults()
955
    return result
956

    
957

    
958
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
959
    """Validate the hypervisor params.
960

961
    This is a multi-node call.
962

963
    @type node_list: list
964
    @param node_list: the list of nodes to query
965
    @type hvname: string
966
    @param hvname: the hypervisor name
967
    @type hvparams: dict
968
    @param hvparams: the hypervisor parameters to be validated
969

970
    """
971
    cluster = self._cfg.GetClusterInfo()
972
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
973
    c = Client("hypervisor_validate_params", [hvname, hv_full])
974
    c.ConnectList(node_list)
975
    c.Run()
976
    result = c.GetResults()
977
    return result