Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ ecfe9491

History | View | Annotate | Download (24.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
import simplejson
39

    
40
from ganeti import utils
41
from ganeti import objects
42
from ganeti import http
43

    
44

    
45
class Client:
46
  """RPC Client class.
47

48
  This class, given a (remote) method name, a list of parameters and a
49
  list of nodes, will contact (in parallel) all nodes, and return a
50
  dict of results (key: node name, value: result).
51

52
  One current bug is that generic failure is still signalled by
53
  'False' result, which is not good. This overloading of values can
54
  cause bugs.
55

56
  """
57
  def __init__(self, procedure, args):
58
    self.procedure = procedure
59
    self.args = args
60
    self.body = simplejson.dumps(args)
61

    
62
    self.port = utils.GetNodeDaemonPort()
63
    self.nodepw = utils.GetNodeDaemonPassword()
64
    self.nc = {}
65

    
66
  def ConnectList(self, node_list, address_list=None):
67
    """Add a list of nodes to the target nodes.
68

69
    @type node_list: list
70
    @param node_list: the list of node names to connect
71
    @type address_list: list or None
72
    @keyword address_list: either None or a list with node addresses,
73
        which must have the same length as the node list
74

75
    """
76
    if address_list is None:
77
      address_list = [None for _ in node_list]
78
    else:
79
      assert len(node_list) == len(address_list), \
80
             "Name and address lists should have the same length"
81
    for node, address in zip(node_list, address_list):
82
      self.ConnectNode(node, address)
83

    
84
  def ConnectNode(self, name, address=None):
85
    """Add a node to the target list.
86

87
    @type name: str
88
    @param name: the node name
89
    @type address: str
90
    @keyword address: the node address, if known
91

92
    """
93
    if address is None:
94
      address = name
95

    
96
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
97
                                           "/%s" % self.procedure,
98
                                           post_data=self.body)
99

    
100
  def GetResults(self):
101
    """Call nodes and return results.
102

103
    @rtype: list
104
    @returns: List of RPC results
105

106
    """
107
    # TODO: Shared and reused manager
108
    mgr = http.HttpClientManager()
109
    try:
110
      mgr.ExecRequests(self.nc.values())
111
    finally:
112
      mgr.Shutdown()
113

    
114
    results = {}
115

    
116
    for name, req in self.nc.iteritems():
117
      if req.success and req.resp_status == http.HTTP_OK:
118
        results[name] = simplejson.loads(req.resp_body)
119
        continue
120

    
121
      if req.error:
122
        msg = req.error
123
      else:
124
        msg = req.resp_body
125

    
126
      logging.error("RPC error from node %s: %s", name, msg)
127
      results[name] = False
128

    
129
    return results
130

    
131

    
132
class RpcRunner(object):
133
  """RPC runner class"""
134

    
135
  def __init__(self, cfg):
136
    """Initialized the rpc runner.
137

138
    @type cfg:  C{config.ConfigWriter}
139
    @param cfg: the configuration object that will be used to get data
140
                about the cluster
141

142
    """
143
    self._cfg = cfg
144

    
145
  def _InstDict(self, instance):
146
    """Convert the given instance to a dict.
147

148
    This is done via the instance's ToDict() method and additionally
149
    we fill the hvparams with the cluster defaults.
150

151
    @type instance: L{objects.Instance}
152
    @param instance: an Instance object
153
    @rtype: dict
154
    @return: the instance dict, with the hvparams filled with the
155
        cluster defaults
156

157
    """
158
    idict = instance.ToDict()
159
    cluster = self._cfg.GetClusterInfo()
160
    idict["hvparams"] = cluster.FillHV(instance)
161
    idict["beparams"] = cluster.FillBE(instance)
162
    return idict
163

    
164
  def _ConnectList(self, client, node_list):
165
    """Helper for computing node addresses.
166

167
    @type client: L{Client}
168
    @param client: a C{Client} instance
169
    @type node_list: list
170
    @param node_list: the node list we should connect
171

172
    """
173
    all_nodes = self._cfg.GetAllNodesInfo()
174
    addr_list = []
175
    for node in node_list:
176
      if node in all_nodes:
177
        val = all_nodes[node].primary_ip
178
      else:
179
        val = None
180
      addr_list.append(val)
181
    client.ConnectList(node_list, address_list=addr_list)
182

    
183
  def _ConnectNode(self, client, node):
184
    """Helper for computing one node's address.
185

186
    @type client: L{Client}
187
    @param client: a C{Client} instance
188
    @type node: str
189
    @param node: the node we should connect
190

191
    """
192
    node_info = self._cfg.GetNodeInfo(node)
193
    if node_info is not None:
194
      addr = node_info.primary_ip
195
    else:
196
      addr = None
197
    client.ConnectNode(node, address=addr)
198

    
199
  def call_volume_list(self, node_list, vg_name):
200
    """Gets the logical volumes present in a given volume group.
201

202
    This is a multi-node call.
203

204
    """
205
    c = Client("volume_list", [vg_name])
206
    self._ConnectList(c, node_list)
207
    return c.GetResults()
208

    
209
  def call_vg_list(self, node_list):
210
    """Gets the volume group list.
211

212
    This is a multi-node call.
213

214
    """
215
    c = Client("vg_list", [])
216
    self._ConnectList(c, node_list)
217
    return c.GetResults()
218

    
219
  def call_bridges_exist(self, node, bridges_list):
220
    """Checks if a node has all the bridges given.
221

222
    This method checks if all bridges given in the bridges_list are
223
    present on the remote node, so that an instance that uses interfaces
224
    on those bridges can be started.
225

226
    This is a single-node call.
227

228
    """
229
    c = Client("bridges_exist", [bridges_list])
230
    self._ConnectNode(c, node)
231
    return c.GetResults().get(node, False)
232

    
233
  def call_instance_start(self, node, instance, extra_args):
234
    """Starts an instance.
235

236
    This is a single-node call.
237

238
    """
239
    c = Client("instance_start", [self._InstDict(instance), extra_args])
240
    self._ConnectNode(c, node)
241
    return c.GetResults().get(node, False)
242

    
243
  def call_instance_shutdown(self, node, instance):
244
    """Stops an instance.
245

246
    This is a single-node call.
247

248
    """
249
    c = Client("instance_shutdown", [self._InstDict(instance)])
250
    self._ConnectNode(c, node)
251
    return c.GetResults().get(node, False)
252

    
253
  def call_instance_migrate(self, node, instance, target, live):
254
    """Migrate an instance.
255

256
    This is a single-node call.
257

258
    @type node: string
259
    @param node: the node on which the instance is currently running
260
    @type instance: C{objects.Instance}
261
    @param instance: the instance definition
262
    @type target: string
263
    @param target: the target node name
264
    @type live: boolean
265
    @param live: whether the migration should be done live or not (the
266
        interpretation of this parameter is left to the hypervisor)
267

268
    """
269
    c = Client("instance_migrate", [self._InstDict(instance), target, live])
270
    self._ConnectNode(c, node)
271
    return c.GetResults().get(node, False)
272

    
273
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
274
    """Reboots an instance.
275

276
    This is a single-node call.
277

278
    """
279
    c = Client("instance_reboot", [self._InstDict(instance),
280
                                   reboot_type, extra_args])
281
    self._ConnectNode(c, node)
282
    return c.GetResults().get(node, False)
283

    
284
  def call_instance_os_add(self, node, inst):
285
    """Installs an OS on the given instance.
286

287
    This is a single-node call.
288

289
    """
290
    params = [self._InstDict(inst)]
291
    c = Client("instance_os_add", params)
292
    self._ConnectNode(c, node)
293
    return c.GetResults().get(node, False)
294

    
295
  def call_instance_run_rename(self, node, inst, old_name):
296
    """Run the OS rename script for an instance.
297

298
    This is a single-node call.
299

300
    """
301
    params = [self._InstDict(inst), old_name]
302
    c = Client("instance_run_rename", params)
303
    self._ConnectNode(c, node)
304
    return c.GetResults().get(node, False)
305

    
306
  def call_instance_info(self, node, instance, hname):
307
    """Returns information about a single instance.
308

309
    This is a single-node call.
310

311
    @type node_list: list
312
    @param node_list: the list of nodes to query
313
    @type instance: string
314
    @param instance: the instance name
315
    @type hname: string
316
    @param hname: the hypervisor type of the instance
317

318
    """
319
    c = Client("instance_info", [instance, hname])
320
    self._ConnectNode(c, node)
321
    return c.GetResults().get(node, False)
322

    
323
  def call_all_instances_info(self, node_list, hypervisor_list):
324
    """Returns information about all instances on the given nodes.
325

326
    This is a multi-node call.
327

328
    @type node_list: list
329
    @param node_list: the list of nodes to query
330
    @type hypervisor_list: list
331
    @param hypervisor_list: the hypervisors to query for instances
332

333
    """
334
    c = Client("all_instances_info", [hypervisor_list])
335
    self._ConnectList(c, node_list)
336
    return c.GetResults()
337

    
338
  def call_instance_list(self, node_list, hypervisor_list):
339
    """Returns the list of running instances on a given node.
340

341
    This is a multi-node call.
342

343
    @type node_list: list
344
    @param node_list: the list of nodes to query
345
    @type hypervisor_list: list
346
    @param hypervisor_list: the hypervisors to query for instances
347

348
    """
349
    c = Client("instance_list", [hypervisor_list])
350
    self._ConnectList(c, node_list)
351
    return c.GetResults()
352

    
353
  def call_node_tcp_ping(self, node, source, target, port, timeout,
354
                         live_port_needed):
355
    """Do a TcpPing on the remote node
356

357
    This is a single-node call.
358

359
    """
360
    c = Client("node_tcp_ping", [source, target, port, timeout,
361
                                 live_port_needed])
362
    self._ConnectNode(c, node)
363
    return c.GetResults().get(node, False)
364

    
365
  def call_node_has_ip_address(self, node, address):
366
    """Checks if a node has the given IP address.
367

368
    This is a single-node call.
369

370
    """
371
    c = Client("node_has_ip_address", [address])
372
    self._ConnectNode(c, node)
373
    return c.GetResults().get(node, False)
374

    
375
  def call_node_info(self, node_list, vg_name, hypervisor_type):
376
    """Return node information.
377

378
    This will return memory information and volume group size and free
379
    space.
380

381
    This is a multi-node call.
382

383
    @type node_list: list
384
    @param node_list: the list of nodes to query
385
    @type vgname: C{string}
386
    @param vgname: the name of the volume group to ask for disk space
387
        information
388
    @type hypervisor_type: C{str}
389
    @param hypervisor_type: the name of the hypervisor to ask for
390
        memory information
391

392
    """
393
    c = Client("node_info", [vg_name, hypervisor_type])
394
    self._ConnectList(c, node_list)
395
    retux = c.GetResults()
396

    
397
    for node_name in retux:
398
      ret = retux.get(node_name, False)
399
      if type(ret) != dict:
400
        logging.error("could not connect to node %s", node_name)
401
        ret = {}
402

    
403
      utils.CheckDict(ret,
404
                      { 'memory_total' : '-',
405
                        'memory_dom0' : '-',
406
                        'memory_free' : '-',
407
                        'vg_size' : 'node_unreachable',
408
                        'vg_free' : '-' },
409
                      "call_node_info",
410
                      )
411
    return retux
412

    
413
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
414
    """Add a node to the cluster.
415

416
    This is a single-node call.
417

418
    """
419
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
420
    c = Client("node_add", params)
421
    self._ConnectNode(c, node)
422
    return c.GetResults().get(node, False)
423

    
424
  def call_node_verify(self, node_list, checkdict, cluster_name):
425
    """Request verification of given parameters.
426

427
    This is a multi-node call.
428

429
    """
430
    c = Client("node_verify", [checkdict, cluster_name])
431
    self._ConnectList(c, node_list)
432
    return c.GetResults()
433

    
434
  @staticmethod
435
  def call_node_start_master(node, start_daemons):
436
    """Tells a node to activate itself as a master.
437

438
    This is a single-node call.
439

440
    """
441
    c = Client("node_start_master", [start_daemons])
442
    c.ConnectNode(node)
443
    return c.GetResults().get(node, False)
444

    
445
  @staticmethod
446
  def call_node_stop_master(node, stop_daemons):
447
    """Tells a node to demote itself from master status.
448

449
    This is a single-node call.
450

451
    """
452
    c = Client("node_stop_master", [stop_daemons])
453
    c.ConnectNode(node)
454
    return c.GetResults().get(node, False)
455

    
456
  @staticmethod
457
  def call_master_info(node_list):
458
    """Query master info.
459

460
    This is a multi-node call.
461

462
    """
463
    # TODO: should this method query down nodes?
464
    c = Client("master_info", [])
465
    c.ConnectList(node_list)
466
    return c.GetResults()
467

    
468
  def call_version(self, node_list):
469
    """Query node version.
470

471
    This is a multi-node call.
472

473
    """
474
    c = Client("version", [])
475
    self._ConnectList(c, node_list)
476
    return c.GetResults()
477

    
478
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
479
    """Request creation of a given block device.
480

481
    This is a single-node call.
482

483
    """
484
    params = [bdev.ToDict(), size, owner, on_primary, info]
485
    c = Client("blockdev_create", params)
486
    self._ConnectNode(c, node)
487
    return c.GetResults().get(node, False)
488

    
489
  def call_blockdev_remove(self, node, bdev):
490
    """Request removal of a given block device.
491

492
    This is a single-node call.
493

494
    """
495
    c = Client("blockdev_remove", [bdev.ToDict()])
496
    self._ConnectNode(c, node)
497
    return c.GetResults().get(node, False)
498

    
499
  def call_blockdev_rename(self, node, devlist):
500
    """Request rename of the given block devices.
501

502
    This is a single-node call.
503

504
    """
505
    params = [(d.ToDict(), uid) for d, uid in devlist]
506
    c = Client("blockdev_rename", params)
507
    self._ConnectNode(c, node)
508
    return c.GetResults().get(node, False)
509

    
510
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
511
    """Request assembling of a given block device.
512

513
    This is a single-node call.
514

515
    """
516
    params = [disk.ToDict(), owner, on_primary]
517
    c = Client("blockdev_assemble", params)
518
    self._ConnectNode(c, node)
519
    return c.GetResults().get(node, False)
520

    
521
  def call_blockdev_shutdown(self, node, disk):
522
    """Request shutdown of a given block device.
523

524
    This is a single-node call.
525

526
    """
527
    c = Client("blockdev_shutdown", [disk.ToDict()])
528
    self._ConnectNode(c, node)
529
    return c.GetResults().get(node, False)
530

    
531
  def call_blockdev_addchildren(self, node, bdev, ndevs):
532
    """Request adding a list of children to a (mirroring) device.
533

534
    This is a single-node call.
535

536
    """
537
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
538
    c = Client("blockdev_addchildren", params)
539
    self._ConnectNode(c, node)
540
    return c.GetResults().get(node, False)
541

    
542
  def call_blockdev_removechildren(self, node, bdev, ndevs):
543
    """Request removing a list of children from a (mirroring) device.
544

545
    This is a single-node call.
546

547
    """
548
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
549
    c = Client("blockdev_removechildren", params)
550
    self._ConnectNode(c, node)
551
    return c.GetResults().get(node, False)
552

    
553
  def call_blockdev_getmirrorstatus(self, node, disks):
554
    """Request status of a (mirroring) device.
555

556
    This is a single-node call.
557

558
    """
559
    params = [dsk.ToDict() for dsk in disks]
560
    c = Client("blockdev_getmirrorstatus", params)
561
    self._ConnectNode(c, node)
562
    return c.GetResults().get(node, False)
563

    
564
  def call_blockdev_find(self, node, disk):
565
    """Request identification of a given block device.
566

567
    This is a single-node call.
568

569
    """
570
    c = Client("blockdev_find", [disk.ToDict()])
571
    self._ConnectNode(c, node)
572
    return c.GetResults().get(node, False)
573

    
574
  def call_blockdev_close(self, node, disks):
575
    """Closes the given block devices.
576

577
    This is a single-node call.
578

579
    """
580
    params = [cf.ToDict() for cf in disks]
581
    c = Client("blockdev_close", params)
582
    self._ConnectNode(c, node)
583
    return c.GetResults().get(node, False)
584

    
585
  @staticmethod
586
  def call_upload_file(node_list, file_name, address_list=None):
587
    """Upload a file.
588

589
    The node will refuse the operation in case the file is not on the
590
    approved file list.
591

592
    This is a multi-node call.
593

594
    @type node_list: list
595
    @param node_list: the list of node names to upload to
596
    @type file_name: str
597
    @param file_name: the filename to upload
598
    @type address_list: list or None
599
    @keyword address_list: an optional list of node addresses, in order
600
        to optimize the RPC speed
601

602
    """
603
    fh = file(file_name)
604
    try:
605
      data = fh.read()
606
    finally:
607
      fh.close()
608
    st = os.stat(file_name)
609
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
610
              st.st_atime, st.st_mtime]
611
    c = Client("upload_file", params)
612
    c.ConnectList(node_list, address_list=address_list)
613
    return c.GetResults()
614

    
615
  def call_os_diagnose(self, node_list):
616
    """Request a diagnose of OS definitions.
617

618
    This is a multi-node call.
619

620
    """
621
    c = Client("os_diagnose", [])
622
    self._ConnectList(c, node_list)
623
    result = c.GetResults()
624
    new_result = {}
625
    for node_name in result:
626
      if result[node_name]:
627
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
628
      else:
629
        nr = []
630
      new_result[node_name] = nr
631
    return new_result
632

    
633
  def call_os_get(self, node, name):
634
    """Returns an OS definition.
635

636
    This is a single-node call.
637

638
    """
639
    c = Client("os_get", [name])
640
    self._ConnectNode(c, node)
641
    result = c.GetResults().get(node, False)
642
    if isinstance(result, dict):
643
      return objects.OS.FromDict(result)
644
    else:
645
      return result
646

    
647
  def call_hooks_runner(self, node_list, hpath, phase, env):
648
    """Call the hooks runner.
649

650
    Args:
651
      - op: the OpCode instance
652
      - env: a dictionary with the environment
653

654
    This is a multi-node call.
655

656
    """
657
    params = [hpath, phase, env]
658
    c = Client("hooks_runner", params)
659
    self._ConnectList(c, node_list)
660
    result = c.GetResults()
661
    return result
662

    
663
  def call_iallocator_runner(self, node, name, idata):
664
    """Call an iallocator on a remote node
665

666
    Args:
667
      - name: the iallocator name
668
      - input: the json-encoded input string
669

670
    This is a single-node call.
671

672
    """
673
    params = [name, idata]
674
    c = Client("iallocator_runner", params)
675
    self._ConnectNode(c, node)
676
    result = c.GetResults().get(node, False)
677
    return result
678

    
679
  def call_blockdev_grow(self, node, cf_bdev, amount):
680
    """Request a snapshot of the given block device.
681

682
    This is a single-node call.
683

684
    """
685
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
686
    self._ConnectNode(c, node)
687
    return c.GetResults().get(node, False)
688

    
689
  def call_blockdev_snapshot(self, node, cf_bdev):
690
    """Request a snapshot of the given block device.
691

692
    This is a single-node call.
693

694
    """
695
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
696
    self._ConnectNode(c, node)
697
    return c.GetResults().get(node, False)
698

    
699
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
700
                           cluster_name, idx):
701
    """Request the export of a given snapshot.
702

703
    This is a single-node call.
704

705
    """
706
    params = [snap_bdev.ToDict(), dest_node,
707
              self._InstDict(instance), cluster_name, idx]
708
    c = Client("snapshot_export", params)
709
    self._ConnectNode(c, node)
710
    return c.GetResults().get(node, False)
711

    
712
  def call_finalize_export(self, node, instance, snap_disks):
713
    """Request the completion of an export operation.
714

715
    This writes the export config file, etc.
716

717
    This is a single-node call.
718

719
    """
720
    flat_disks = []
721
    for disk in snap_disks:
722
      flat_disks.append(disk.ToDict())
723
    params = [self._InstDict(instance), flat_disks]
724
    c = Client("finalize_export", params)
725
    self._ConnectNode(c, node)
726
    return c.GetResults().get(node, False)
727

    
728
  def call_export_info(self, node, path):
729
    """Queries the export information in a given path.
730

731
    This is a single-node call.
732

733
    """
734
    c = Client("export_info", [path])
735
    self._ConnectNode(c, node)
736
    result = c.GetResults().get(node, False)
737
    if not result:
738
      return result
739
    return objects.SerializableConfigParser.Loads(str(result))
740

    
741
  def call_instance_os_import(self, node, inst, src_node, src_images,
742
                              cluster_name):
743
    """Request the import of a backup into an instance.
744

745
    This is a single-node call.
746

747
    """
748
    params = [self._InstDict(inst), src_node, src_images, cluster_name]
749
    c = Client("instance_os_import", params)
750
    self._ConnectNode(c, node)
751
    return c.GetResults().get(node, False)
752

    
753
  def call_export_list(self, node_list):
754
    """Gets the stored exports list.
755

756
    This is a multi-node call.
757

758
    """
759
    c = Client("export_list", [])
760
    self._ConnectList(c, node_list)
761
    result = c.GetResults()
762
    return result
763

    
764
  def call_export_remove(self, node, export):
765
    """Requests removal of a given export.
766

767
    This is a single-node call.
768

769
    """
770
    c = Client("export_remove", [export])
771
    self._ConnectNode(c, node)
772
    return c.GetResults().get(node, False)
773

    
774
  @staticmethod
775
  def call_node_leave_cluster(node):
776
    """Requests a node to clean the cluster information it has.
777

778
    This will remove the configuration information from the ganeti data
779
    dir.
780

781
    This is a single-node call.
782

783
    """
784
    c = Client("node_leave_cluster", [])
785
    c.ConnectNode(node)
786
    return c.GetResults().get(node, False)
787

    
788
  def call_node_volumes(self, node_list):
789
    """Gets all volumes on node(s).
790

791
    This is a multi-node call.
792

793
    """
794
    c = Client("node_volumes", [])
795
    self._ConnectList(c, node_list)
796
    return c.GetResults()
797

    
798
  def call_test_delay(self, node_list, duration):
799
    """Sleep for a fixed time on given node(s).
800

801
    This is a multi-node call.
802

803
    """
804
    c = Client("test_delay", [duration])
805
    self._ConnectList(c, node_list)
806
    return c.GetResults()
807

    
808
  def call_file_storage_dir_create(self, node, file_storage_dir):
809
    """Create the given file storage directory.
810

811
    This is a single-node call.
812

813
    """
814
    c = Client("file_storage_dir_create", [file_storage_dir])
815
    self._ConnectNode(c, node)
816
    return c.GetResults().get(node, False)
817

    
818
  def call_file_storage_dir_remove(self, node, file_storage_dir):
819
    """Remove the given file storage directory.
820

821
    This is a single-node call.
822

823
    """
824
    c = Client("file_storage_dir_remove", [file_storage_dir])
825
    self._ConnectNode(c, node)
826
    return c.GetResults().get(node, False)
827

    
828
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
829
                                   new_file_storage_dir):
830
    """Rename file storage directory.
831

832
    This is a single-node call.
833

834
    """
835
    c = Client("file_storage_dir_rename",
836
               [old_file_storage_dir, new_file_storage_dir])
837
    self._ConnectNode(c, node)
838
    return c.GetResults().get(node, False)
839

    
840
  @staticmethod
841
  def call_jobqueue_update(node_list, address_list, file_name, content):
842
    """Update job queue.
843

844
    This is a multi-node call.
845

846
    """
847
    c = Client("jobqueue_update", [file_name, content])
848
    c.ConnectList(node_list, address_list=address_list)
849
    result = c.GetResults()
850
    return result
851

    
852
  @staticmethod
853
  def call_jobqueue_purge(node):
854
    """Purge job queue.
855

856
    This is a single-node call.
857

858
    """
859
    c = Client("jobqueue_purge", [])
860
    c.ConnectNode(node)
861
    return c.GetResults().get(node, False)
862

    
863
  @staticmethod
864
  def call_jobqueue_rename(node_list, address_list, old, new):
865
    """Rename a job queue file.
866

867
    This is a multi-node call.
868

869
    """
870
    c = Client("jobqueue_rename", [old, new])
871
    c.ConnectList(node_list, address_list=address_list)
872
    result = c.GetResults()
873
    return result
874

    
875

    
876
  @staticmethod
877
  def call_jobqueue_set_drain(node_list, drain_flag):
878
    """Set the drain flag on the queue.
879

880
    This is a multi-node call.
881

882
    @type node_list: list
883
    @param node_list: the list of nodes to query
884
    @type drain_flag: bool
885
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
886

887
    """
888
    c = Client("jobqueue_set_drain", [drain_flag])
889
    c.ConnectList(node_list)
890
    result = c.GetResults()
891
    return result
892

    
893

    
894
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
895
    """Validate the hypervisor params.
896

897
    This is a multi-node call.
898

899
    @type node_list: list
900
    @param node_list: the list of nodes to query
901
    @type hvname: string
902
    @param hvname: the hypervisor name
903
    @type hvparams: dict
904
    @param hvparams: the hypervisor parameters to be validated
905

906
    """
907
    cluster = self._cfg.GetClusterInfo()
908
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
909
    c = Client("hypervisor_validate_params", [hvname, hv_full])
910
    self._ConnectList(c, node_list)
911
    result = c.GetResults()
912
    return result