Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 4c7c81d7

History | View | Annotate | Download (24.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42

    
43

    
44
class Client:
45
  """RPC Client class.
46

47
  This class, given a (remote) method name, a list of parameters and a
48
  list of nodes, will contact (in parallel) all nodes, and return a
49
  dict of results (key: node name, value: result).
50

51
  One current bug is that generic failure is still signalled by
52
  'False' result, which is not good. This overloading of values can
53
  cause bugs.
54

55
  """
56
  def __init__(self, procedure, args):
57
    self.procedure = procedure
58
    self.args = args
59
    self.body = serializer.DumpJson(args, indent=False)
60

    
61
    self.port = utils.GetNodeDaemonPort()
62
    self.nodepw = utils.GetNodeDaemonPassword()
63
    self.nc = {}
64

    
65
  def ConnectList(self, node_list, address_list=None):
66
    """Add a list of nodes to the target nodes.
67

68
    @type node_list: list
69
    @param node_list: the list of node names to connect
70
    @type address_list: list or None
71
    @keyword address_list: either None or a list with node addresses,
72
        which must have the same length as the node list
73

74
    """
75
    if address_list is None:
76
      address_list = [None for _ in node_list]
77
    else:
78
      assert len(node_list) == len(address_list), \
79
             "Name and address lists should have the same length"
80
    for node, address in zip(node_list, address_list):
81
      self.ConnectNode(node, address)
82

    
83
  def ConnectNode(self, name, address=None):
84
    """Add a node to the target list.
85

86
    @type name: str
87
    @param name: the node name
88
    @type address: str
89
    @keyword address: the node address, if known
90

91
    """
92
    if address is None:
93
      address = name
94

    
95
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
96
                                           "/%s" % self.procedure,
97
                                           post_data=self.body)
98

    
99
  def GetResults(self):
100
    """Call nodes and return results.
101

102
    @rtype: list
103
    @returns: List of RPC results
104

105
    """
106
    # TODO: Shared and reused manager
107
    mgr = http.HttpClientManager()
108
    try:
109
      mgr.ExecRequests(self.nc.values())
110
    finally:
111
      mgr.Shutdown()
112

    
113
    results = {}
114

    
115
    for name, req in self.nc.iteritems():
116
      if req.success and req.resp_status == http.HTTP_OK:
117
        results[name] = serializer.LoadJson(req.resp_body)
118
        continue
119

    
120
      if req.error:
121
        msg = req.error
122
      else:
123
        msg = req.resp_body
124

    
125
      logging.error("RPC error from node %s: %s", name, msg)
126
      results[name] = False
127

    
128
    return results
129

    
130

    
131
class RpcRunner(object):
132
  """RPC runner class"""
133

    
134
  def __init__(self, cfg):
135
    """Initialized the rpc runner.
136

137
    @type cfg:  C{config.ConfigWriter}
138
    @param cfg: the configuration object that will be used to get data
139
                about the cluster
140

141
    """
142
    self._cfg = cfg
143

    
144
  def _InstDict(self, instance):
145
    """Convert the given instance to a dict.
146

147
    This is done via the instance's ToDict() method and additionally
148
    we fill the hvparams with the cluster defaults.
149

150
    @type instance: L{objects.Instance}
151
    @param instance: an Instance object
152
    @rtype: dict
153
    @return: the instance dict, with the hvparams filled with the
154
        cluster defaults
155

156
    """
157
    idict = instance.ToDict()
158
    cluster = self._cfg.GetClusterInfo()
159
    idict["hvparams"] = cluster.FillHV(instance)
160
    idict["beparams"] = cluster.FillBE(instance)
161
    return idict
162

    
163
  def _ConnectList(self, client, node_list):
164
    """Helper for computing node addresses.
165

166
    @type client: L{Client}
167
    @param client: a C{Client} instance
168
    @type node_list: list
169
    @param node_list: the node list we should connect
170

171
    """
172
    all_nodes = self._cfg.GetAllNodesInfo()
173
    addr_list = []
174
    for node in node_list:
175
      if node in all_nodes:
176
        val = all_nodes[node].primary_ip
177
      else:
178
        val = None
179
      addr_list.append(val)
180
    client.ConnectList(node_list, address_list=addr_list)
181

    
182
  def _ConnectNode(self, client, node):
183
    """Helper for computing one node's address.
184

185
    @type client: L{Client}
186
    @param client: a C{Client} instance
187
    @type node: str
188
    @param node: the node we should connect
189

190
    """
191
    node_info = self._cfg.GetNodeInfo(node)
192
    if node_info is not None:
193
      addr = node_info.primary_ip
194
    else:
195
      addr = None
196
    client.ConnectNode(node, address=addr)
197

    
198
  def call_volume_list(self, node_list, vg_name):
199
    """Gets the logical volumes present in a given volume group.
200

201
    This is a multi-node call.
202

203
    """
204
    c = Client("volume_list", [vg_name])
205
    self._ConnectList(c, node_list)
206
    return c.GetResults()
207

    
208
  def call_vg_list(self, node_list):
209
    """Gets the volume group list.
210

211
    This is a multi-node call.
212

213
    """
214
    c = Client("vg_list", [])
215
    self._ConnectList(c, node_list)
216
    return c.GetResults()
217

    
218
  def call_bridges_exist(self, node, bridges_list):
219
    """Checks if a node has all the bridges given.
220

221
    This method checks if all bridges given in the bridges_list are
222
    present on the remote node, so that an instance that uses interfaces
223
    on those bridges can be started.
224

225
    This is a single-node call.
226

227
    """
228
    c = Client("bridges_exist", [bridges_list])
229
    self._ConnectNode(c, node)
230
    return c.GetResults().get(node, False)
231

    
232
  def call_instance_start(self, node, instance, extra_args):
233
    """Starts an instance.
234

235
    This is a single-node call.
236

237
    """
238
    c = Client("instance_start", [self._InstDict(instance), extra_args])
239
    self._ConnectNode(c, node)
240
    return c.GetResults().get(node, False)
241

    
242
  def call_instance_shutdown(self, node, instance):
243
    """Stops an instance.
244

245
    This is a single-node call.
246

247
    """
248
    c = Client("instance_shutdown", [self._InstDict(instance)])
249
    self._ConnectNode(c, node)
250
    return c.GetResults().get(node, False)
251

    
252
  def call_instance_migrate(self, node, instance, target, live):
253
    """Migrate an instance.
254

255
    This is a single-node call.
256

257
    @type node: string
258
    @param node: the node on which the instance is currently running
259
    @type instance: C{objects.Instance}
260
    @param instance: the instance definition
261
    @type target: string
262
    @param target: the target node name
263
    @type live: boolean
264
    @param live: whether the migration should be done live or not (the
265
        interpretation of this parameter is left to the hypervisor)
266

267
    """
268
    c = Client("instance_migrate", [self._InstDict(instance), target, live])
269
    self._ConnectNode(c, node)
270
    return c.GetResults().get(node, False)
271

    
272
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
273
    """Reboots an instance.
274

275
    This is a single-node call.
276

277
    """
278
    c = Client("instance_reboot", [self._InstDict(instance),
279
                                   reboot_type, extra_args])
280
    self._ConnectNode(c, node)
281
    return c.GetResults().get(node, False)
282

    
283
  def call_instance_os_add(self, node, inst):
284
    """Installs an OS on the given instance.
285

286
    This is a single-node call.
287

288
    """
289
    params = [self._InstDict(inst)]
290
    c = Client("instance_os_add", params)
291
    self._ConnectNode(c, node)
292
    return c.GetResults().get(node, False)
293

    
294
  def call_instance_run_rename(self, node, inst, old_name):
295
    """Run the OS rename script for an instance.
296

297
    This is a single-node call.
298

299
    """
300
    params = [self._InstDict(inst), old_name]
301
    c = Client("instance_run_rename", params)
302
    self._ConnectNode(c, node)
303
    return c.GetResults().get(node, False)
304

    
305
  def call_instance_info(self, node, instance, hname):
306
    """Returns information about a single instance.
307

308
    This is a single-node call.
309

310
    @type node_list: list
311
    @param node_list: the list of nodes to query
312
    @type instance: string
313
    @param instance: the instance name
314
    @type hname: string
315
    @param hname: the hypervisor type of the instance
316

317
    """
318
    c = Client("instance_info", [instance, hname])
319
    self._ConnectNode(c, node)
320
    return c.GetResults().get(node, False)
321

    
322
  def call_all_instances_info(self, node_list, hypervisor_list):
323
    """Returns information about all instances on the given nodes.
324

325
    This is a multi-node call.
326

327
    @type node_list: list
328
    @param node_list: the list of nodes to query
329
    @type hypervisor_list: list
330
    @param hypervisor_list: the hypervisors to query for instances
331

332
    """
333
    c = Client("all_instances_info", [hypervisor_list])
334
    self._ConnectList(c, node_list)
335
    return c.GetResults()
336

    
337
  def call_instance_list(self, node_list, hypervisor_list):
338
    """Returns the list of running instances on a given node.
339

340
    This is a multi-node call.
341

342
    @type node_list: list
343
    @param node_list: the list of nodes to query
344
    @type hypervisor_list: list
345
    @param hypervisor_list: the hypervisors to query for instances
346

347
    """
348
    c = Client("instance_list", [hypervisor_list])
349
    self._ConnectList(c, node_list)
350
    return c.GetResults()
351

    
352
  def call_node_tcp_ping(self, node, source, target, port, timeout,
353
                         live_port_needed):
354
    """Do a TcpPing on the remote node
355

356
    This is a single-node call.
357

358
    """
359
    c = Client("node_tcp_ping", [source, target, port, timeout,
360
                                 live_port_needed])
361
    self._ConnectNode(c, node)
362
    return c.GetResults().get(node, False)
363

    
364
  def call_node_has_ip_address(self, node, address):
365
    """Checks if a node has the given IP address.
366

367
    This is a single-node call.
368

369
    """
370
    c = Client("node_has_ip_address", [address])
371
    self._ConnectNode(c, node)
372
    return c.GetResults().get(node, False)
373

    
374
  def call_node_info(self, node_list, vg_name, hypervisor_type):
375
    """Return node information.
376

377
    This will return memory information and volume group size and free
378
    space.
379

380
    This is a multi-node call.
381

382
    @type node_list: list
383
    @param node_list: the list of nodes to query
384
    @type vgname: C{string}
385
    @param vgname: the name of the volume group to ask for disk space
386
        information
387
    @type hypervisor_type: C{str}
388
    @param hypervisor_type: the name of the hypervisor to ask for
389
        memory information
390

391
    """
392
    c = Client("node_info", [vg_name, hypervisor_type])
393
    self._ConnectList(c, node_list)
394
    retux = c.GetResults()
395

    
396
    for node_name in retux:
397
      ret = retux.get(node_name, False)
398
      if type(ret) != dict:
399
        logging.error("could not connect to node %s", node_name)
400
        ret = {}
401

    
402
      utils.CheckDict(ret,
403
                      { 'memory_total' : '-',
404
                        'memory_dom0' : '-',
405
                        'memory_free' : '-',
406
                        'vg_size' : 'node_unreachable',
407
                        'vg_free' : '-' },
408
                      "call_node_info",
409
                      )
410
    return retux
411

    
412
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
413
    """Add a node to the cluster.
414

415
    This is a single-node call.
416

417
    """
418
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
419
    c = Client("node_add", params)
420
    self._ConnectNode(c, node)
421
    return c.GetResults().get(node, False)
422

    
423
  def call_node_verify(self, node_list, checkdict, cluster_name):
424
    """Request verification of given parameters.
425

426
    This is a multi-node call.
427

428
    """
429
    c = Client("node_verify", [checkdict, cluster_name])
430
    self._ConnectList(c, node_list)
431
    return c.GetResults()
432

    
433
  @staticmethod
434
  def call_node_start_master(node, start_daemons):
435
    """Tells a node to activate itself as a master.
436

437
    This is a single-node call.
438

439
    """
440
    c = Client("node_start_master", [start_daemons])
441
    c.ConnectNode(node)
442
    return c.GetResults().get(node, False)
443

    
444
  @staticmethod
445
  def call_node_stop_master(node, stop_daemons):
446
    """Tells a node to demote itself from master status.
447

448
    This is a single-node call.
449

450
    """
451
    c = Client("node_stop_master", [stop_daemons])
452
    c.ConnectNode(node)
453
    return c.GetResults().get(node, False)
454

    
455
  @staticmethod
456
  def call_master_info(node_list):
457
    """Query master info.
458

459
    This is a multi-node call.
460

461
    """
462
    # TODO: should this method query down nodes?
463
    c = Client("master_info", [])
464
    c.ConnectList(node_list)
465
    return c.GetResults()
466

    
467
  def call_version(self, node_list):
468
    """Query node version.
469

470
    This is a multi-node call.
471

472
    """
473
    c = Client("version", [])
474
    self._ConnectList(c, node_list)
475
    return c.GetResults()
476

    
477
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
478
    """Request creation of a given block device.
479

480
    This is a single-node call.
481

482
    """
483
    params = [bdev.ToDict(), size, owner, on_primary, info]
484
    c = Client("blockdev_create", params)
485
    self._ConnectNode(c, node)
486
    return c.GetResults().get(node, False)
487

    
488
  def call_blockdev_remove(self, node, bdev):
489
    """Request removal of a given block device.
490

491
    This is a single-node call.
492

493
    """
494
    c = Client("blockdev_remove", [bdev.ToDict()])
495
    self._ConnectNode(c, node)
496
    return c.GetResults().get(node, False)
497

    
498
  def call_blockdev_rename(self, node, devlist):
499
    """Request rename of the given block devices.
500

501
    This is a single-node call.
502

503
    """
504
    params = [(d.ToDict(), uid) for d, uid in devlist]
505
    c = Client("blockdev_rename", params)
506
    self._ConnectNode(c, node)
507
    return c.GetResults().get(node, False)
508

    
509
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
510
    """Request assembling of a given block device.
511

512
    This is a single-node call.
513

514
    """
515
    params = [disk.ToDict(), owner, on_primary]
516
    c = Client("blockdev_assemble", params)
517
    self._ConnectNode(c, node)
518
    return c.GetResults().get(node, False)
519

    
520
  def call_blockdev_shutdown(self, node, disk):
521
    """Request shutdown of a given block device.
522

523
    This is a single-node call.
524

525
    """
526
    c = Client("blockdev_shutdown", [disk.ToDict()])
527
    self._ConnectNode(c, node)
528
    return c.GetResults().get(node, False)
529

    
530
  def call_blockdev_addchildren(self, node, bdev, ndevs):
531
    """Request adding a list of children to a (mirroring) device.
532

533
    This is a single-node call.
534

535
    """
536
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
537
    c = Client("blockdev_addchildren", params)
538
    self._ConnectNode(c, node)
539
    return c.GetResults().get(node, False)
540

    
541
  def call_blockdev_removechildren(self, node, bdev, ndevs):
542
    """Request removing a list of children from a (mirroring) device.
543

544
    This is a single-node call.
545

546
    """
547
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
548
    c = Client("blockdev_removechildren", params)
549
    self._ConnectNode(c, node)
550
    return c.GetResults().get(node, False)
551

    
552
  def call_blockdev_getmirrorstatus(self, node, disks):
553
    """Request status of a (mirroring) device.
554

555
    This is a single-node call.
556

557
    """
558
    params = [dsk.ToDict() for dsk in disks]
559
    c = Client("blockdev_getmirrorstatus", params)
560
    self._ConnectNode(c, node)
561
    return c.GetResults().get(node, False)
562

    
563
  def call_blockdev_find(self, node, disk):
564
    """Request identification of a given block device.
565

566
    This is a single-node call.
567

568
    """
569
    c = Client("blockdev_find", [disk.ToDict()])
570
    self._ConnectNode(c, node)
571
    return c.GetResults().get(node, False)
572

    
573
  def call_blockdev_close(self, node, disks):
574
    """Closes the given block devices.
575

576
    This is a single-node call.
577

578
    """
579
    params = [cf.ToDict() for cf in disks]
580
    c = Client("blockdev_close", params)
581
    self._ConnectNode(c, node)
582
    return c.GetResults().get(node, False)
583

    
584
  @staticmethod
585
  def call_upload_file(node_list, file_name, address_list=None):
586
    """Upload a file.
587

588
    The node will refuse the operation in case the file is not on the
589
    approved file list.
590

591
    This is a multi-node call.
592

593
    @type node_list: list
594
    @param node_list: the list of node names to upload to
595
    @type file_name: str
596
    @param file_name: the filename to upload
597
    @type address_list: list or None
598
    @keyword address_list: an optional list of node addresses, in order
599
        to optimize the RPC speed
600

601
    """
602
    data = utils.ReadFile(file_name)
603
    st = os.stat(file_name)
604
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
605
              st.st_atime, st.st_mtime]
606
    c = Client("upload_file", params)
607
    c.ConnectList(node_list, address_list=address_list)
608
    return c.GetResults()
609

    
610
  def call_os_diagnose(self, node_list):
611
    """Request a diagnose of OS definitions.
612

613
    This is a multi-node call.
614

615
    """
616
    c = Client("os_diagnose", [])
617
    self._ConnectList(c, node_list)
618
    result = c.GetResults()
619
    new_result = {}
620
    for node_name in result:
621
      if result[node_name]:
622
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
623
      else:
624
        nr = []
625
      new_result[node_name] = nr
626
    return new_result
627

    
628
  def call_os_get(self, node, name):
629
    """Returns an OS definition.
630

631
    This is a single-node call.
632

633
    """
634
    c = Client("os_get", [name])
635
    self._ConnectNode(c, node)
636
    result = c.GetResults().get(node, False)
637
    if isinstance(result, dict):
638
      return objects.OS.FromDict(result)
639
    else:
640
      return result
641

    
642
  def call_hooks_runner(self, node_list, hpath, phase, env):
643
    """Call the hooks runner.
644

645
    Args:
646
      - op: the OpCode instance
647
      - env: a dictionary with the environment
648

649
    This is a multi-node call.
650

651
    """
652
    params = [hpath, phase, env]
653
    c = Client("hooks_runner", params)
654
    self._ConnectList(c, node_list)
655
    result = c.GetResults()
656
    return result
657

    
658
  def call_iallocator_runner(self, node, name, idata):
659
    """Call an iallocator on a remote node
660

661
    Args:
662
      - name: the iallocator name
663
      - input: the json-encoded input string
664

665
    This is a single-node call.
666

667
    """
668
    params = [name, idata]
669
    c = Client("iallocator_runner", params)
670
    self._ConnectNode(c, node)
671
    result = c.GetResults().get(node, False)
672
    return result
673

    
674
  def call_blockdev_grow(self, node, cf_bdev, amount):
675
    """Request a snapshot of the given block device.
676

677
    This is a single-node call.
678

679
    """
680
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
681
    self._ConnectNode(c, node)
682
    return c.GetResults().get(node, False)
683

    
684
  def call_blockdev_snapshot(self, node, cf_bdev):
685
    """Request a snapshot of the given block device.
686

687
    This is a single-node call.
688

689
    """
690
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
691
    self._ConnectNode(c, node)
692
    return c.GetResults().get(node, False)
693

    
694
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
695
                           cluster_name, idx):
696
    """Request the export of a given snapshot.
697

698
    This is a single-node call.
699

700
    """
701
    params = [snap_bdev.ToDict(), dest_node,
702
              self._InstDict(instance), cluster_name, idx]
703
    c = Client("snapshot_export", params)
704
    self._ConnectNode(c, node)
705
    return c.GetResults().get(node, False)
706

    
707
  def call_finalize_export(self, node, instance, snap_disks):
708
    """Request the completion of an export operation.
709

710
    This writes the export config file, etc.
711

712
    This is a single-node call.
713

714
    """
715
    flat_disks = []
716
    for disk in snap_disks:
717
      flat_disks.append(disk.ToDict())
718
    params = [self._InstDict(instance), flat_disks]
719
    c = Client("finalize_export", params)
720
    self._ConnectNode(c, node)
721
    return c.GetResults().get(node, False)
722

    
723
  def call_export_info(self, node, path):
724
    """Queries the export information in a given path.
725

726
    This is a single-node call.
727

728
    """
729
    c = Client("export_info", [path])
730
    self._ConnectNode(c, node)
731
    result = c.GetResults().get(node, False)
732
    if not result:
733
      return result
734
    return objects.SerializableConfigParser.Loads(str(result))
735

    
736
  def call_instance_os_import(self, node, inst, src_node, src_images,
737
                              cluster_name):
738
    """Request the import of a backup into an instance.
739

740
    This is a single-node call.
741

742
    """
743
    params = [self._InstDict(inst), src_node, src_images, cluster_name]
744
    c = Client("instance_os_import", params)
745
    self._ConnectNode(c, node)
746
    return c.GetResults().get(node, False)
747

    
748
  def call_export_list(self, node_list):
749
    """Gets the stored exports list.
750

751
    This is a multi-node call.
752

753
    """
754
    c = Client("export_list", [])
755
    self._ConnectList(c, node_list)
756
    result = c.GetResults()
757
    return result
758

    
759
  def call_export_remove(self, node, export):
760
    """Requests removal of a given export.
761

762
    This is a single-node call.
763

764
    """
765
    c = Client("export_remove", [export])
766
    self._ConnectNode(c, node)
767
    return c.GetResults().get(node, False)
768

    
769
  @staticmethod
770
  def call_node_leave_cluster(node):
771
    """Requests a node to clean the cluster information it has.
772

773
    This will remove the configuration information from the ganeti data
774
    dir.
775

776
    This is a single-node call.
777

778
    """
779
    c = Client("node_leave_cluster", [])
780
    c.ConnectNode(node)
781
    return c.GetResults().get(node, False)
782

    
783
  def call_node_volumes(self, node_list):
784
    """Gets all volumes on node(s).
785

786
    This is a multi-node call.
787

788
    """
789
    c = Client("node_volumes", [])
790
    self._ConnectList(c, node_list)
791
    return c.GetResults()
792

    
793
  def call_test_delay(self, node_list, duration):
794
    """Sleep for a fixed time on given node(s).
795

796
    This is a multi-node call.
797

798
    """
799
    c = Client("test_delay", [duration])
800
    self._ConnectList(c, node_list)
801
    return c.GetResults()
802

    
803
  def call_file_storage_dir_create(self, node, file_storage_dir):
804
    """Create the given file storage directory.
805

806
    This is a single-node call.
807

808
    """
809
    c = Client("file_storage_dir_create", [file_storage_dir])
810
    self._ConnectNode(c, node)
811
    return c.GetResults().get(node, False)
812

    
813
  def call_file_storage_dir_remove(self, node, file_storage_dir):
814
    """Remove the given file storage directory.
815

816
    This is a single-node call.
817

818
    """
819
    c = Client("file_storage_dir_remove", [file_storage_dir])
820
    self._ConnectNode(c, node)
821
    return c.GetResults().get(node, False)
822

    
823
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
824
                                   new_file_storage_dir):
825
    """Rename file storage directory.
826

827
    This is a single-node call.
828

829
    """
830
    c = Client("file_storage_dir_rename",
831
               [old_file_storage_dir, new_file_storage_dir])
832
    self._ConnectNode(c, node)
833
    return c.GetResults().get(node, False)
834

    
835
  @staticmethod
836
  def call_jobqueue_update(node_list, address_list, file_name, content):
837
    """Update job queue.
838

839
    This is a multi-node call.
840

841
    """
842
    c = Client("jobqueue_update", [file_name, content])
843
    c.ConnectList(node_list, address_list=address_list)
844
    result = c.GetResults()
845
    return result
846

    
847
  @staticmethod
848
  def call_jobqueue_purge(node):
849
    """Purge job queue.
850

851
    This is a single-node call.
852

853
    """
854
    c = Client("jobqueue_purge", [])
855
    c.ConnectNode(node)
856
    return c.GetResults().get(node, False)
857

    
858
  @staticmethod
859
  def call_jobqueue_rename(node_list, address_list, old, new):
860
    """Rename a job queue file.
861

862
    This is a multi-node call.
863

864
    """
865
    c = Client("jobqueue_rename", [old, new])
866
    c.ConnectList(node_list, address_list=address_list)
867
    result = c.GetResults()
868
    return result
869

    
870

    
871
  @staticmethod
872
  def call_jobqueue_set_drain(node_list, drain_flag):
873
    """Set the drain flag on the queue.
874

875
    This is a multi-node call.
876

877
    @type node_list: list
878
    @param node_list: the list of nodes to query
879
    @type drain_flag: bool
880
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
881

882
    """
883
    c = Client("jobqueue_set_drain", [drain_flag])
884
    c.ConnectList(node_list)
885
    result = c.GetResults()
886
    return result
887

    
888

    
889
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
890
    """Validate the hypervisor params.
891

892
    This is a multi-node call.
893

894
    @type node_list: list
895
    @param node_list: the list of nodes to query
896
    @type hvname: string
897
    @param hvname: the hypervisor name
898
    @type hvparams: dict
899
    @param hvparams: the hypervisor parameters to be validated
900

901
    """
902
    cluster = self._cfg.GetClusterInfo()
903
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
904
    c = Client("hypervisor_validate_params", [hvname, hv_full])
905
    self._ConnectList(c, node_list)
906
    result = c.GetResults()
907
    return result