Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 7c28c575

History | View | Annotate | Download (24.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42

    
43

    
44
class Client:
45
  """RPC Client class.
46

47
  This class, given a (remote) method name, a list of parameters and a
48
  list of nodes, will contact (in parallel) all nodes, and return a
49
  dict of results (key: node name, value: result).
50

51
  One current bug is that generic failure is still signalled by
52
  'False' result, which is not good. This overloading of values can
53
  cause bugs.
54

55
  """
56
  def __init__(self, procedure, args):
57
    self.procedure = procedure
58
    self.args = args
59
    self.body = serializer.DumpJson(args, indent=False)
60

    
61
    self.port = utils.GetNodeDaemonPort()
62
    self.nodepw = utils.GetNodeDaemonPassword()
63
    self.nc = {}
64

    
65
  def ConnectList(self, node_list, address_list=None):
66
    """Add a list of nodes to the target nodes.
67

68
    @type node_list: list
69
    @param node_list: the list of node names to connect
70
    @type address_list: list or None
71
    @keyword address_list: either None or a list with node addresses,
72
        which must have the same length as the node list
73

74
    """
75
    if address_list is None:
76
      address_list = [None for _ in node_list]
77
    else:
78
      assert len(node_list) == len(address_list), \
79
             "Name and address lists should have the same length"
80
    for node, address in zip(node_list, address_list):
81
      self.ConnectNode(node, address)
82

    
83
  def ConnectNode(self, name, address=None):
84
    """Add a node to the target list.
85

86
    @type name: str
87
    @param name: the node name
88
    @type address: str
89
    @keyword address: the node address, if known
90

91
    """
92
    if address is None:
93
      address = name
94

    
95
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
96
                                           "/%s" % self.procedure,
97
                                           post_data=self.body)
98

    
99
  def GetResults(self):
100
    """Call nodes and return results.
101

102
    @rtype: list
103
    @returns: List of RPC results
104

105
    """
106
    # TODO: Shared and reused manager
107
    mgr = http.HttpClientManager()
108
    try:
109
      mgr.ExecRequests(self.nc.values())
110
    finally:
111
      mgr.Shutdown()
112

    
113
    results = {}
114

    
115
    for name, req in self.nc.iteritems():
116
      if req.success and req.resp_status == http.HTTP_OK:
117
        results[name] = serializer.LoadJson(req.resp_body)
118
        continue
119

    
120
      if req.error:
121
        msg = req.error
122
      else:
123
        msg = req.resp_body
124

    
125
      logging.error("RPC error from node %s: %s", name, msg)
126
      results[name] = False
127

    
128
    return results
129

    
130

    
131
class RpcRunner(object):
132
  """RPC runner class"""
133

    
134
  def __init__(self, cfg):
135
    """Initialized the rpc runner.
136

137
    @type cfg:  C{config.ConfigWriter}
138
    @param cfg: the configuration object that will be used to get data
139
                about the cluster
140

141
    """
142
    self._cfg = cfg
143

    
144
  def _InstDict(self, instance):
145
    """Convert the given instance to a dict.
146

147
    This is done via the instance's ToDict() method and additionally
148
    we fill the hvparams with the cluster defaults.
149

150
    @type instance: L{objects.Instance}
151
    @param instance: an Instance object
152
    @rtype: dict
153
    @return: the instance dict, with the hvparams filled with the
154
        cluster defaults
155

156
    """
157
    idict = instance.ToDict()
158
    cluster = self._cfg.GetClusterInfo()
159
    idict["hvparams"] = cluster.FillHV(instance)
160
    idict["beparams"] = cluster.FillBE(instance)
161
    return idict
162

    
163
  def _ConnectList(self, client, node_list):
164
    """Helper for computing node addresses.
165

166
    @type client: L{Client}
167
    @param client: a C{Client} instance
168
    @type node_list: list
169
    @param node_list: the node list we should connect
170

171
    """
172
    all_nodes = self._cfg.GetAllNodesInfo()
173
    addr_list = []
174
    for node in node_list:
175
      if node in all_nodes:
176
        val = all_nodes[node].primary_ip
177
      else:
178
        val = None
179
      addr_list.append(val)
180
    client.ConnectList(node_list, address_list=addr_list)
181

    
182
  def _ConnectNode(self, client, node):
183
    """Helper for computing one node's address.
184

185
    @type client: L{Client}
186
    @param client: a C{Client} instance
187
    @type node: str
188
    @param node: the node we should connect
189

190
    """
191
    node_info = self._cfg.GetNodeInfo(node)
192
    if node_info is not None:
193
      addr = node_info.primary_ip
194
    else:
195
      addr = None
196
    client.ConnectNode(node, address=addr)
197

    
198
  def call_volume_list(self, node_list, vg_name):
199
    """Gets the logical volumes present in a given volume group.
200

201
    This is a multi-node call.
202

203
    """
204
    c = Client("volume_list", [vg_name])
205
    self._ConnectList(c, node_list)
206
    return c.GetResults()
207

    
208
  def call_vg_list(self, node_list):
209
    """Gets the volume group list.
210

211
    This is a multi-node call.
212

213
    """
214
    c = Client("vg_list", [])
215
    self._ConnectList(c, node_list)
216
    return c.GetResults()
217

    
218
  def call_bridges_exist(self, node, bridges_list):
219
    """Checks if a node has all the bridges given.
220

221
    This method checks if all bridges given in the bridges_list are
222
    present on the remote node, so that an instance that uses interfaces
223
    on those bridges can be started.
224

225
    This is a single-node call.
226

227
    """
228
    c = Client("bridges_exist", [bridges_list])
229
    self._ConnectNode(c, node)
230
    return c.GetResults().get(node, False)
231

    
232
  def call_instance_start(self, node, instance, extra_args):
233
    """Starts an instance.
234

235
    This is a single-node call.
236

237
    """
238
    c = Client("instance_start", [self._InstDict(instance), extra_args])
239
    self._ConnectNode(c, node)
240
    return c.GetResults().get(node, False)
241

    
242
  def call_instance_shutdown(self, node, instance):
243
    """Stops an instance.
244

245
    This is a single-node call.
246

247
    """
248
    c = Client("instance_shutdown", [self._InstDict(instance)])
249
    self._ConnectNode(c, node)
250
    return c.GetResults().get(node, False)
251

    
252
  def call_instance_migrate(self, node, instance, target, live):
253
    """Migrate an instance.
254

255
    This is a single-node call.
256

257
    @type node: string
258
    @param node: the node on which the instance is currently running
259
    @type instance: C{objects.Instance}
260
    @param instance: the instance definition
261
    @type target: string
262
    @param target: the target node name
263
    @type live: boolean
264
    @param live: whether the migration should be done live or not (the
265
        interpretation of this parameter is left to the hypervisor)
266

267
    """
268
    c = Client("instance_migrate", [self._InstDict(instance), target, live])
269
    self._ConnectNode(c, node)
270
    return c.GetResults().get(node, False)
271

    
272
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
273
    """Reboots an instance.
274

275
    This is a single-node call.
276

277
    """
278
    c = Client("instance_reboot", [self._InstDict(instance),
279
                                   reboot_type, extra_args])
280
    self._ConnectNode(c, node)
281
    return c.GetResults().get(node, False)
282

    
283
  def call_instance_os_add(self, node, inst):
284
    """Installs an OS on the given instance.
285

286
    This is a single-node call.
287

288
    """
289
    params = [self._InstDict(inst)]
290
    c = Client("instance_os_add", params)
291
    self._ConnectNode(c, node)
292
    return c.GetResults().get(node, False)
293

    
294
  def call_instance_run_rename(self, node, inst, old_name):
295
    """Run the OS rename script for an instance.
296

297
    This is a single-node call.
298

299
    """
300
    params = [self._InstDict(inst), old_name]
301
    c = Client("instance_run_rename", params)
302
    self._ConnectNode(c, node)
303
    return c.GetResults().get(node, False)
304

    
305
  def call_instance_info(self, node, instance, hname):
306
    """Returns information about a single instance.
307

308
    This is a single-node call.
309

310
    @type node_list: list
311
    @param node_list: the list of nodes to query
312
    @type instance: string
313
    @param instance: the instance name
314
    @type hname: string
315
    @param hname: the hypervisor type of the instance
316

317
    """
318
    c = Client("instance_info", [instance, hname])
319
    self._ConnectNode(c, node)
320
    return c.GetResults().get(node, False)
321

    
322
  def call_all_instances_info(self, node_list, hypervisor_list):
323
    """Returns information about all instances on the given nodes.
324

325
    This is a multi-node call.
326

327
    @type node_list: list
328
    @param node_list: the list of nodes to query
329
    @type hypervisor_list: list
330
    @param hypervisor_list: the hypervisors to query for instances
331

332
    """
333
    c = Client("all_instances_info", [hypervisor_list])
334
    self._ConnectList(c, node_list)
335
    return c.GetResults()
336

    
337
  def call_instance_list(self, node_list, hypervisor_list):
338
    """Returns the list of running instances on a given node.
339

340
    This is a multi-node call.
341

342
    @type node_list: list
343
    @param node_list: the list of nodes to query
344
    @type hypervisor_list: list
345
    @param hypervisor_list: the hypervisors to query for instances
346

347
    """
348
    c = Client("instance_list", [hypervisor_list])
349
    self._ConnectList(c, node_list)
350
    return c.GetResults()
351

    
352
  def call_node_tcp_ping(self, node, source, target, port, timeout,
353
                         live_port_needed):
354
    """Do a TcpPing on the remote node
355

356
    This is a single-node call.
357

358
    """
359
    c = Client("node_tcp_ping", [source, target, port, timeout,
360
                                 live_port_needed])
361
    self._ConnectNode(c, node)
362
    return c.GetResults().get(node, False)
363

    
364
  def call_node_has_ip_address(self, node, address):
365
    """Checks if a node has the given IP address.
366

367
    This is a single-node call.
368

369
    """
370
    c = Client("node_has_ip_address", [address])
371
    self._ConnectNode(c, node)
372
    return c.GetResults().get(node, False)
373

    
374
  def call_node_info(self, node_list, vg_name, hypervisor_type):
375
    """Return node information.
376

377
    This will return memory information and volume group size and free
378
    space.
379

380
    This is a multi-node call.
381

382
    @type node_list: list
383
    @param node_list: the list of nodes to query
384
    @type vgname: C{string}
385
    @param vgname: the name of the volume group to ask for disk space
386
        information
387
    @type hypervisor_type: C{str}
388
    @param hypervisor_type: the name of the hypervisor to ask for
389
        memory information
390

391
    """
392
    c = Client("node_info", [vg_name, hypervisor_type])
393
    self._ConnectList(c, node_list)
394
    retux = c.GetResults()
395

    
396
    for node_name in retux:
397
      ret = retux.get(node_name, False)
398
      if type(ret) != dict:
399
        logging.error("could not connect to node %s", node_name)
400
        ret = {}
401

    
402
      utils.CheckDict(ret,
403
                      { 'memory_total' : '-',
404
                        'memory_dom0' : '-',
405
                        'memory_free' : '-',
406
                        'vg_size' : 'node_unreachable',
407
                        'vg_free' : '-' },
408
                      "call_node_info",
409
                      )
410
    return retux
411

    
412
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
413
    """Add a node to the cluster.
414

415
    This is a single-node call.
416

417
    """
418
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
419
    c = Client("node_add", params)
420
    self._ConnectNode(c, node)
421
    return c.GetResults().get(node, False)
422

    
423
  def call_node_verify(self, node_list, checkdict, cluster_name):
424
    """Request verification of given parameters.
425

426
    This is a multi-node call.
427

428
    """
429
    c = Client("node_verify", [checkdict, cluster_name])
430
    self._ConnectList(c, node_list)
431
    return c.GetResults()
432

    
433
  @staticmethod
434
  def call_node_start_master(node, start_daemons):
435
    """Tells a node to activate itself as a master.
436

437
    This is a single-node call.
438

439
    """
440
    c = Client("node_start_master", [start_daemons])
441
    c.ConnectNode(node)
442
    return c.GetResults().get(node, False)
443

    
444
  @staticmethod
445
  def call_node_stop_master(node, stop_daemons):
446
    """Tells a node to demote itself from master status.
447

448
    This is a single-node call.
449

450
    """
451
    c = Client("node_stop_master", [stop_daemons])
452
    c.ConnectNode(node)
453
    return c.GetResults().get(node, False)
454

    
455
  @staticmethod
456
  def call_master_info(node_list):
457
    """Query master info.
458

459
    This is a multi-node call.
460

461
    """
462
    # TODO: should this method query down nodes?
463
    c = Client("master_info", [])
464
    c.ConnectList(node_list)
465
    return c.GetResults()
466

    
467
  def call_version(self, node_list):
468
    """Query node version.
469

470
    This is a multi-node call.
471

472
    """
473
    c = Client("version", [])
474
    self._ConnectList(c, node_list)
475
    return c.GetResults()
476

    
477
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
478
    """Request creation of a given block device.
479

480
    This is a single-node call.
481

482
    """
483
    params = [bdev.ToDict(), size, owner, on_primary, info]
484
    c = Client("blockdev_create", params)
485
    self._ConnectNode(c, node)
486
    return c.GetResults().get(node, False)
487

    
488
  def call_blockdev_remove(self, node, bdev):
489
    """Request removal of a given block device.
490

491
    This is a single-node call.
492

493
    """
494
    c = Client("blockdev_remove", [bdev.ToDict()])
495
    self._ConnectNode(c, node)
496
    return c.GetResults().get(node, False)
497

    
498
  def call_blockdev_rename(self, node, devlist):
499
    """Request rename of the given block devices.
500

501
    This is a single-node call.
502

503
    """
504
    params = [(d.ToDict(), uid) for d, uid in devlist]
505
    c = Client("blockdev_rename", params)
506
    self._ConnectNode(c, node)
507
    return c.GetResults().get(node, False)
508

    
509
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
510
    """Request assembling of a given block device.
511

512
    This is a single-node call.
513

514
    """
515
    params = [disk.ToDict(), owner, on_primary]
516
    c = Client("blockdev_assemble", params)
517
    self._ConnectNode(c, node)
518
    return c.GetResults().get(node, False)
519

    
520
  def call_blockdev_shutdown(self, node, disk):
521
    """Request shutdown of a given block device.
522

523
    This is a single-node call.
524

525
    """
526
    c = Client("blockdev_shutdown", [disk.ToDict()])
527
    self._ConnectNode(c, node)
528
    return c.GetResults().get(node, False)
529

    
530
  def call_blockdev_addchildren(self, node, bdev, ndevs):
531
    """Request adding a list of children to a (mirroring) device.
532

533
    This is a single-node call.
534

535
    """
536
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
537
    c = Client("blockdev_addchildren", params)
538
    self._ConnectNode(c, node)
539
    return c.GetResults().get(node, False)
540

    
541
  def call_blockdev_removechildren(self, node, bdev, ndevs):
542
    """Request removing a list of children from a (mirroring) device.
543

544
    This is a single-node call.
545

546
    """
547
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
548
    c = Client("blockdev_removechildren", params)
549
    self._ConnectNode(c, node)
550
    return c.GetResults().get(node, False)
551

    
552
  def call_blockdev_getmirrorstatus(self, node, disks):
553
    """Request status of a (mirroring) device.
554

555
    This is a single-node call.
556

557
    """
558
    params = [dsk.ToDict() for dsk in disks]
559
    c = Client("blockdev_getmirrorstatus", params)
560
    self._ConnectNode(c, node)
561
    return c.GetResults().get(node, False)
562

    
563
  def call_blockdev_find(self, node, disk):
564
    """Request identification of a given block device.
565

566
    This is a single-node call.
567

568
    """
569
    c = Client("blockdev_find", [disk.ToDict()])
570
    self._ConnectNode(c, node)
571
    return c.GetResults().get(node, False)
572

    
573
  def call_blockdev_close(self, node, disks):
574
    """Closes the given block devices.
575

576
    This is a single-node call.
577

578
    """
579
    params = [cf.ToDict() for cf in disks]
580
    c = Client("blockdev_close", params)
581
    self._ConnectNode(c, node)
582
    return c.GetResults().get(node, False)
583

    
584
  @staticmethod
585
  def call_upload_file(node_list, file_name, address_list=None):
586
    """Upload a file.
587

588
    The node will refuse the operation in case the file is not on the
589
    approved file list.
590

591
    This is a multi-node call.
592

593
    @type node_list: list
594
    @param node_list: the list of node names to upload to
595
    @type file_name: str
596
    @param file_name: the filename to upload
597
    @type address_list: list or None
598
    @keyword address_list: an optional list of node addresses, in order
599
        to optimize the RPC speed
600

601
    """
602
    fh = file(file_name)
603
    try:
604
      data = fh.read()
605
    finally:
606
      fh.close()
607
    st = os.stat(file_name)
608
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
609
              st.st_atime, st.st_mtime]
610
    c = Client("upload_file", params)
611
    c.ConnectList(node_list, address_list=address_list)
612
    return c.GetResults()
613

    
614
  def call_os_diagnose(self, node_list):
615
    """Request a diagnose of OS definitions.
616

617
    This is a multi-node call.
618

619
    """
620
    c = Client("os_diagnose", [])
621
    self._ConnectList(c, node_list)
622
    result = c.GetResults()
623
    new_result = {}
624
    for node_name in result:
625
      if result[node_name]:
626
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
627
      else:
628
        nr = []
629
      new_result[node_name] = nr
630
    return new_result
631

    
632
  def call_os_get(self, node, name):
633
    """Returns an OS definition.
634

635
    This is a single-node call.
636

637
    """
638
    c = Client("os_get", [name])
639
    self._ConnectNode(c, node)
640
    result = c.GetResults().get(node, False)
641
    if isinstance(result, dict):
642
      return objects.OS.FromDict(result)
643
    else:
644
      return result
645

    
646
  def call_hooks_runner(self, node_list, hpath, phase, env):
647
    """Call the hooks runner.
648

649
    Args:
650
      - op: the OpCode instance
651
      - env: a dictionary with the environment
652

653
    This is a multi-node call.
654

655
    """
656
    params = [hpath, phase, env]
657
    c = Client("hooks_runner", params)
658
    self._ConnectList(c, node_list)
659
    result = c.GetResults()
660
    return result
661

    
662
  def call_iallocator_runner(self, node, name, idata):
663
    """Call an iallocator on a remote node
664

665
    Args:
666
      - name: the iallocator name
667
      - input: the json-encoded input string
668

669
    This is a single-node call.
670

671
    """
672
    params = [name, idata]
673
    c = Client("iallocator_runner", params)
674
    self._ConnectNode(c, node)
675
    result = c.GetResults().get(node, False)
676
    return result
677

    
678
  def call_blockdev_grow(self, node, cf_bdev, amount):
679
    """Request a snapshot of the given block device.
680

681
    This is a single-node call.
682

683
    """
684
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
685
    self._ConnectNode(c, node)
686
    return c.GetResults().get(node, False)
687

    
688
  def call_blockdev_snapshot(self, node, cf_bdev):
689
    """Request a snapshot of the given block device.
690

691
    This is a single-node call.
692

693
    """
694
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
695
    self._ConnectNode(c, node)
696
    return c.GetResults().get(node, False)
697

    
698
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
699
                           cluster_name, idx):
700
    """Request the export of a given snapshot.
701

702
    This is a single-node call.
703

704
    """
705
    params = [snap_bdev.ToDict(), dest_node,
706
              self._InstDict(instance), cluster_name, idx]
707
    c = Client("snapshot_export", params)
708
    self._ConnectNode(c, node)
709
    return c.GetResults().get(node, False)
710

    
711
  def call_finalize_export(self, node, instance, snap_disks):
712
    """Request the completion of an export operation.
713

714
    This writes the export config file, etc.
715

716
    This is a single-node call.
717

718
    """
719
    flat_disks = []
720
    for disk in snap_disks:
721
      flat_disks.append(disk.ToDict())
722
    params = [self._InstDict(instance), flat_disks]
723
    c = Client("finalize_export", params)
724
    self._ConnectNode(c, node)
725
    return c.GetResults().get(node, False)
726

    
727
  def call_export_info(self, node, path):
728
    """Queries the export information in a given path.
729

730
    This is a single-node call.
731

732
    """
733
    c = Client("export_info", [path])
734
    self._ConnectNode(c, node)
735
    result = c.GetResults().get(node, False)
736
    if not result:
737
      return result
738
    return objects.SerializableConfigParser.Loads(str(result))
739

    
740
  def call_instance_os_import(self, node, inst, src_node, src_images,
741
                              cluster_name):
742
    """Request the import of a backup into an instance.
743

744
    This is a single-node call.
745

746
    """
747
    params = [self._InstDict(inst), src_node, src_images, cluster_name]
748
    c = Client("instance_os_import", params)
749
    self._ConnectNode(c, node)
750
    return c.GetResults().get(node, False)
751

    
752
  def call_export_list(self, node_list):
753
    """Gets the stored exports list.
754

755
    This is a multi-node call.
756

757
    """
758
    c = Client("export_list", [])
759
    self._ConnectList(c, node_list)
760
    result = c.GetResults()
761
    return result
762

    
763
  def call_export_remove(self, node, export):
764
    """Requests removal of a given export.
765

766
    This is a single-node call.
767

768
    """
769
    c = Client("export_remove", [export])
770
    self._ConnectNode(c, node)
771
    return c.GetResults().get(node, False)
772

    
773
  @staticmethod
774
  def call_node_leave_cluster(node):
775
    """Requests a node to clean the cluster information it has.
776

777
    This will remove the configuration information from the ganeti data
778
    dir.
779

780
    This is a single-node call.
781

782
    """
783
    c = Client("node_leave_cluster", [])
784
    c.ConnectNode(node)
785
    return c.GetResults().get(node, False)
786

    
787
  def call_node_volumes(self, node_list):
788
    """Gets all volumes on node(s).
789

790
    This is a multi-node call.
791

792
    """
793
    c = Client("node_volumes", [])
794
    self._ConnectList(c, node_list)
795
    return c.GetResults()
796

    
797
  def call_test_delay(self, node_list, duration):
798
    """Sleep for a fixed time on given node(s).
799

800
    This is a multi-node call.
801

802
    """
803
    c = Client("test_delay", [duration])
804
    self._ConnectList(c, node_list)
805
    return c.GetResults()
806

    
807
  def call_file_storage_dir_create(self, node, file_storage_dir):
808
    """Create the given file storage directory.
809

810
    This is a single-node call.
811

812
    """
813
    c = Client("file_storage_dir_create", [file_storage_dir])
814
    self._ConnectNode(c, node)
815
    return c.GetResults().get(node, False)
816

    
817
  def call_file_storage_dir_remove(self, node, file_storage_dir):
818
    """Remove the given file storage directory.
819

820
    This is a single-node call.
821

822
    """
823
    c = Client("file_storage_dir_remove", [file_storage_dir])
824
    self._ConnectNode(c, node)
825
    return c.GetResults().get(node, False)
826

    
827
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
828
                                   new_file_storage_dir):
829
    """Rename file storage directory.
830

831
    This is a single-node call.
832

833
    """
834
    c = Client("file_storage_dir_rename",
835
               [old_file_storage_dir, new_file_storage_dir])
836
    self._ConnectNode(c, node)
837
    return c.GetResults().get(node, False)
838

    
839
  @staticmethod
840
  def call_jobqueue_update(node_list, address_list, file_name, content):
841
    """Update job queue.
842

843
    This is a multi-node call.
844

845
    """
846
    c = Client("jobqueue_update", [file_name, content])
847
    c.ConnectList(node_list, address_list=address_list)
848
    result = c.GetResults()
849
    return result
850

    
851
  @staticmethod
852
  def call_jobqueue_purge(node):
853
    """Purge job queue.
854

855
    This is a single-node call.
856

857
    """
858
    c = Client("jobqueue_purge", [])
859
    c.ConnectNode(node)
860
    return c.GetResults().get(node, False)
861

    
862
  @staticmethod
863
  def call_jobqueue_rename(node_list, address_list, old, new):
864
    """Rename a job queue file.
865

866
    This is a multi-node call.
867

868
    """
869
    c = Client("jobqueue_rename", [old, new])
870
    c.ConnectList(node_list, address_list=address_list)
871
    result = c.GetResults()
872
    return result
873

    
874

    
875
  @staticmethod
876
  def call_jobqueue_set_drain(node_list, drain_flag):
877
    """Set the drain flag on the queue.
878

879
    This is a multi-node call.
880

881
    @type node_list: list
882
    @param node_list: the list of nodes to query
883
    @type drain_flag: bool
884
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
885

886
    """
887
    c = Client("jobqueue_set_drain", [drain_flag])
888
    c.ConnectList(node_list)
889
    result = c.GetResults()
890
    return result
891

    
892

    
893
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
894
    """Validate the hypervisor params.
895

896
    This is a multi-node call.
897

898
    @type node_list: list
899
    @param node_list: the list of nodes to query
900
    @type hvname: string
901
    @param hvname: the hypervisor name
902
    @type hvparams: dict
903
    @param hvparams: the hypervisor parameters to be validated
904

905
    """
906
    cluster = self._cfg.GetClusterInfo()
907
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
908
    c = Client("hypervisor_validate_params", [hvname, hv_full])
909
    self._ConnectList(c, node_list)
910
    result = c.GetResults()
911
    return result