Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 160e2921

History | View | Annotate | Download (25.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
from ganeti import utils
39
from ganeti import objects
40
from ganeti import http
41
from ganeti import serializer
42
from ganeti import constants
43

    
44

    
45
# Module level variable
46
_http_manager = None
47

    
48

    
49
def Init():
50
  """Initializes the module-global HTTP client manager.
51

52
  Must be called before using any RPC function.
53

54
  """
55
  global _http_manager
56

    
57
  assert not _http_manager, "RPC module initialized more than once"
58

    
59
  _http_manager = http.HttpClientManager()
60

    
61

    
62
def Shutdown():
63
  """Stops the module-global HTTP client manager.
64

65
  Must be called before quitting the program.
66

67
  """
68
  global _http_manager
69

    
70
  if _http_manager:
71
    _http_manager.Shutdown()
72
    _http_manager = None
73

    
74

    
75
class Client:
76
  """RPC Client class.
77

78
  This class, given a (remote) method name, a list of parameters and a
79
  list of nodes, will contact (in parallel) all nodes, and return a
80
  dict of results (key: node name, value: result).
81

82
  One current bug is that generic failure is still signalled by
83
  'False' result, which is not good. This overloading of values can
84
  cause bugs.
85

86
  """
87
  def __init__(self, procedure, body, port):
88
    self.procedure = procedure
89
    self.body = body
90
    self.port = port
91
    self.nc = {}
92

    
93
    self._ssl_params = \
94
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
95
                         ssl_cert_path=constants.SSL_CERT_FILE)
96

    
97
  def ConnectList(self, node_list, address_list=None):
98
    """Add a list of nodes to the target nodes.
99

100
    @type node_list: list
101
    @param node_list: the list of node names to connect
102
    @type address_list: list or None
103
    @keyword address_list: either None or a list with node addresses,
104
        which must have the same length as the node list
105

106
    """
107
    if address_list is None:
108
      address_list = [None for _ in node_list]
109
    else:
110
      assert len(node_list) == len(address_list), \
111
             "Name and address lists should have the same length"
112
    for node, address in zip(node_list, address_list):
113
      self.ConnectNode(node, address)
114

    
115
  def ConnectNode(self, name, address=None):
116
    """Add a node to the target list.
117

118
    @type name: str
119
    @param name: the node name
120
    @type address: str
121
    @keyword address: the node address, if known
122

123
    """
124
    if address is None:
125
      address = name
126

    
127
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
128
                                           "/%s" % self.procedure,
129
                                           post_data=self.body,
130
                                           ssl_params=self._ssl_params,
131
                                           ssl_verify_peer=True)
132

    
133
  def GetResults(self):
134
    """Call nodes and return results.
135

136
    @rtype: list
137
    @returns: List of RPC results
138

139
    """
140
    assert _http_manager, "RPC module not intialized"
141

    
142
    _http_manager.ExecRequests(self.nc.values())
143

    
144
    results = {}
145

    
146
    for name, req in self.nc.iteritems():
147
      if req.success and req.resp_status == http.HTTP_OK:
148
        results[name] = serializer.LoadJson(req.resp_body)
149
        continue
150

    
151
      # TODO: Better error reporting
152
      if req.error:
153
        msg = req.error
154
      else:
155
        msg = req.resp_body
156

    
157
      logging.error("RPC error from node %s: %s", name, msg)
158
      results[name] = False
159

    
160
    return results
161

    
162

    
163
class RpcRunner(object):
164
  """RPC runner class"""
165

    
166
  def __init__(self, cfg):
167
    """Initialized the rpc runner.
168

169
    @type cfg:  C{config.ConfigWriter}
170
    @param cfg: the configuration object that will be used to get data
171
                about the cluster
172

173
    """
174
    self._cfg = cfg
175
    self.port = utils.GetNodeDaemonPort()
176

    
177
  def _InstDict(self, instance):
178
    """Convert the given instance to a dict.
179

180
    This is done via the instance's ToDict() method and additionally
181
    we fill the hvparams with the cluster defaults.
182

183
    @type instance: L{objects.Instance}
184
    @param instance: an Instance object
185
    @rtype: dict
186
    @return: the instance dict, with the hvparams filled with the
187
        cluster defaults
188

189
    """
190
    idict = instance.ToDict()
191
    cluster = self._cfg.GetClusterInfo()
192
    idict["hvparams"] = cluster.FillHV(instance)
193
    idict["beparams"] = cluster.FillBE(instance)
194
    return idict
195

    
196
  def _ConnectList(self, client, node_list):
197
    """Helper for computing node addresses.
198

199
    @type client: L{Client}
200
    @param client: a C{Client} instance
201
    @type node_list: list
202
    @param node_list: the node list we should connect
203

204
    """
205
    all_nodes = self._cfg.GetAllNodesInfo()
206
    addr_list = []
207
    for node in node_list:
208
      if node in all_nodes:
209
        val = all_nodes[node].primary_ip
210
      else:
211
        val = None
212
      addr_list.append(val)
213
    client.ConnectList(node_list, address_list=addr_list)
214

    
215
  def _ConnectNode(self, client, node):
216
    """Helper for computing one node's address.
217

218
    @type client: L{Client}
219
    @param client: a C{Client} instance
220
    @type node: str
221
    @param node: the node we should connect
222

223
    """
224
    node_info = self._cfg.GetNodeInfo(node)
225
    if node_info is not None:
226
      addr = node_info.primary_ip
227
    else:
228
      addr = None
229
    client.ConnectNode(node, address=addr)
230

    
231
  def _MultiNodeCall(self, node_list, procedure, args,
232
                     address_list=None):
233
    """Helper for making a multi-node call
234

235
    """
236
    body = serializer.DumpJson(args, indent=False)
237
    c = Client(procedure, body, self.port)
238
    if address_list is None:
239
      self._ConnectList(c, node_list)
240
    else:
241
      c.ConnectList(node_list, address_list=address_list)
242
    return c.GetResults()
243

    
244
  @classmethod
245
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
246
                           address_list=None):
247
    """Helper for making a multi-node static call
248

249
    """
250
    body = serializer.DumpJson(args, indent=False)
251
    c = Client(procedure, body, utils.GetNodeDaemonPort())
252
    c.ConnectList(node_list, address_list=address_list)
253
    return c.GetResults()
254

    
255
  def _SingleNodeCall(self, node, procedure, args):
256
    """Helper for making a single-node call
257

258
    """
259
    body = serializer.DumpJson(args, indent=False)
260
    c = Client(procedure, body, self.port)
261
    self._ConnectNode(c, node)
262
    return c.GetResults().get(node, False)
263

    
264
  @classmethod
265
  def _StaticSingleNodeCall(cls, node, procedure, args):
266
    """Helper for making a single-node static call
267

268
    """
269
    body = serializer.DumpJson(args, indent=False)
270
    c = Client(procedure, body, utils.GetNodeDaemonPort())
271
    c.ConnectNode(c, node)
272
    return c.GetResults().get(node, False)
273

    
274
  def call_volume_list(self, node_list, vg_name):
275
    """Gets the logical volumes present in a given volume group.
276

277
    This is a multi-node call.
278

279
    """
280
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
281

    
282
  def call_vg_list(self, node_list):
283
    """Gets the volume group list.
284

285
    This is a multi-node call.
286

287
    """
288
    return self._MultiNodeCall(node_list, "vg_list", [])
289

    
290
  def call_bridges_exist(self, node, bridges_list):
291
    """Checks if a node has all the bridges given.
292

293
    This method checks if all bridges given in the bridges_list are
294
    present on the remote node, so that an instance that uses interfaces
295
    on those bridges can be started.
296

297
    This is a single-node call.
298

299
    """
300
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
301

    
302
  def call_instance_start(self, node, instance, extra_args):
303
    """Starts an instance.
304

305
    This is a single-node call.
306

307
    """
308
    return self._SingleNodeCall(node, "instance_start",
309
                                [self._InstDict(instance), extra_args])
310

    
311
  def call_instance_shutdown(self, node, instance):
312
    """Stops an instance.
313

314
    This is a single-node call.
315

316
    """
317
    return self._SingleNodeCall(node, "instance_shutdown",
318
                                [self._InstDict(instance)])
319

    
320
  def call_instance_migrate(self, node, instance, target, live):
321
    """Migrate an instance.
322

323
    This is a single-node call.
324

325
    @type node: string
326
    @param node: the node on which the instance is currently running
327
    @type instance: C{objects.Instance}
328
    @param instance: the instance definition
329
    @type target: string
330
    @param target: the target node name
331
    @type live: boolean
332
    @param live: whether the migration should be done live or not (the
333
        interpretation of this parameter is left to the hypervisor)
334

335
    """
336
    return self._SingleNodeCall(node, "instance_migrate",
337
                                [self._InstDict(instance), target, live])
338

    
339
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
340
    """Reboots an instance.
341

342
    This is a single-node call.
343

344
    """
345
    return self._SingleNodeCall(node, "instance_reboot",
346
                                [self._InstDict(instance), reboot_type,
347
                                 extra_args])
348

    
349
  def call_instance_os_add(self, node, inst):
350
    """Installs an OS on the given instance.
351

352
    This is a single-node call.
353

354
    """
355
    return self._SingleNodeCall(node, "instance_os_add",
356
                                [self._InstDict(inst)])
357

    
358
  def call_instance_run_rename(self, node, inst, old_name):
359
    """Run the OS rename script for an instance.
360

361
    This is a single-node call.
362

363
    """
364
    return self._SingleNodeCall(node, "instance_run_rename",
365
                                [self._InstDict(inst), old_name])
366

    
367
  def call_instance_info(self, node, instance, hname):
368
    """Returns information about a single instance.
369

370
    This is a single-node call.
371

372
    @type node: list
373
    @param node: the list of nodes to query
374
    @type instance: string
375
    @param instance: the instance name
376
    @type hname: string
377
    @param hname: the hypervisor type of the instance
378

379
    """
380
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
381

    
382
  def call_all_instances_info(self, node_list, hypervisor_list):
383
    """Returns information about all instances on the given nodes.
384

385
    This is a multi-node call.
386

387
    @type node_list: list
388
    @param node_list: the list of nodes to query
389
    @type hypervisor_list: list
390
    @param hypervisor_list: the hypervisors to query for instances
391

392
    """
393
    return self._MultiNodeCall(node_list, "all_instances_info",
394
                               [hypervisor_list])
395

    
396
  def call_instance_list(self, node_list, hypervisor_list):
397
    """Returns the list of running instances on a given node.
398

399
    This is a multi-node call.
400

401
    @type node_list: list
402
    @param node_list: the list of nodes to query
403
    @type hypervisor_list: list
404
    @param hypervisor_list: the hypervisors to query for instances
405

406
    """
407
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
408

    
409
  def call_node_tcp_ping(self, node, source, target, port, timeout,
410
                         live_port_needed):
411
    """Do a TcpPing on the remote node
412

413
    This is a single-node call.
414

415
    """
416
    return self._SingleNodeCall(node, "node_tcp_ping",
417
                                [source, target, port, timeout,
418
                                 live_port_needed])
419

    
420
  def call_node_has_ip_address(self, node, address):
421
    """Checks if a node has the given IP address.
422

423
    This is a single-node call.
424

425
    """
426
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
427

    
428
  def call_node_info(self, node_list, vg_name, hypervisor_type):
429
    """Return node information.
430

431
    This will return memory information and volume group size and free
432
    space.
433

434
    This is a multi-node call.
435

436
    @type node_list: list
437
    @param node_list: the list of nodes to query
438
    @type vgname: C{string}
439
    @param vgname: the name of the volume group to ask for disk space
440
        information
441
    @type hypervisor_type: C{str}
442
    @param hypervisor_type: the name of the hypervisor to ask for
443
        memory information
444

445
    """
446
    retux = self._MultiNodeCall(node_list, "node_info",
447
                                [vg_name, hypervisor_type])
448

    
449
    for node_name in retux:
450
      ret = retux.get(node_name, False)
451
      if type(ret) != dict:
452
        logging.error("could not connect to node %s", node_name)
453
        ret = {}
454

    
455
      utils.CheckDict(ret, {
456
                        'memory_total' : '-',
457
                        'memory_dom0' : '-',
458
                        'memory_free' : '-',
459
                        'vg_size' : 'node_unreachable',
460
                        'vg_free' : '-',
461
                      }, "call_node_info")
462
    return retux
463

    
464
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
465
    """Add a node to the cluster.
466

467
    This is a single-node call.
468

469
    """
470
    return self._SingleNodeCall(node, "node_add",
471
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
472

    
473
  def call_node_verify(self, node_list, checkdict, cluster_name):
474
    """Request verification of given parameters.
475

476
    This is a multi-node call.
477

478
    """
479
    return self._MultiNodeCall(node_list, "node_verify",
480
                               [checkdict, cluster_name])
481

    
482
  @classmethod
483
  def call_node_start_master(cls, node, start_daemons):
484
    """Tells a node to activate itself as a master.
485

486
    This is a single-node call.
487

488
    """
489
    return cls._StaticSingleNodeCall(node, "node_start_master",
490
                                     [start_daemons])
491

    
492
  @classmethod
493
  def call_node_stop_master(cls, node, stop_daemons):
494
    """Tells a node to demote itself from master status.
495

496
    This is a single-node call.
497

498
    """
499
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
500

    
501
  @classmethod
502
  def call_master_info(cls, node_list):
503
    """Query master info.
504

505
    This is a multi-node call.
506

507
    """
508
    # TODO: should this method query down nodes?
509
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
510

    
511
  def call_version(self, node_list):
512
    """Query node version.
513

514
    This is a multi-node call.
515

516
    """
517
    return self._MultiNodeCall(node_list, "version", [])
518

    
519
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
520
    """Request creation of a given block device.
521

522
    This is a single-node call.
523

524
    """
525
    return self._SingleNodeCall(node, "blockdev_create",
526
                                [bdev.ToDict(), size, owner, on_primary, info])
527

    
528
  def call_blockdev_remove(self, node, bdev):
529
    """Request removal of a given block device.
530

531
    This is a single-node call.
532

533
    """
534
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
535

    
536
  def call_blockdev_rename(self, node, devlist):
537
    """Request rename of the given block devices.
538

539
    This is a single-node call.
540

541
    """
542
    return self._SingleNodeCall(node, "blockdev_rename",
543
                                [(d.ToDict(), uid) for d, uid in devlist])
544

    
545
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
546
    """Request assembling of a given block device.
547

548
    This is a single-node call.
549

550
    """
551
    return self._SingleNodeCall(node, "blockdev_assemble",
552
                                [disk.ToDict(), owner, on_primary])
553

    
554
  def call_blockdev_shutdown(self, node, disk):
555
    """Request shutdown of a given block device.
556

557
    This is a single-node call.
558

559
    """
560
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
561

    
562
  def call_blockdev_addchildren(self, node, bdev, ndevs):
563
    """Request adding a list of children to a (mirroring) device.
564

565
    This is a single-node call.
566

567
    """
568
    return self._SingleNodeCall(node, "blockdev_addchildren",
569
                                [bdev.ToDict(),
570
                                 [disk.ToDict() for disk in ndevs]])
571

    
572
  def call_blockdev_removechildren(self, node, bdev, ndevs):
573
    """Request removing a list of children from a (mirroring) device.
574

575
    This is a single-node call.
576

577
    """
578
    return self._SingleNodeCall(node, "blockdev_removechildren",
579
                                [bdev.ToDict(),
580
                                 [disk.ToDict() for disk in ndevs]])
581

    
582
  def call_blockdev_getmirrorstatus(self, node, disks):
583
    """Request status of a (mirroring) device.
584

585
    This is a single-node call.
586

587
    """
588
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
589
                                [dsk.ToDict() for dsk in disks])
590

    
591
  def call_blockdev_find(self, node, disk):
592
    """Request identification of a given block device.
593

594
    This is a single-node call.
595

596
    """
597
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
598

    
599
  def call_blockdev_close(self, node, disks):
600
    """Closes the given block devices.
601

602
    This is a single-node call.
603

604
    """
605
    return self._SingleNodeCall(node, "blockdev_close",
606
                                [cf.ToDict() for cf in disks])
607

    
608
  @classmethod
609
  def call_upload_file(cls, node_list, file_name, address_list=None):
610
    """Upload a file.
611

612
    The node will refuse the operation in case the file is not on the
613
    approved file list.
614

615
    This is a multi-node call.
616

617
    @type node_list: list
618
    @param node_list: the list of node names to upload to
619
    @type file_name: str
620
    @param file_name: the filename to upload
621
    @type address_list: list or None
622
    @keyword address_list: an optional list of node addresses, in order
623
        to optimize the RPC speed
624

625
    """
626
    data = utils.ReadFile(file_name)
627
    st = os.stat(file_name)
628
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
629
              st.st_atime, st.st_mtime]
630
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
631
                                    address_list=address_list)
632

    
633
  @classmethod
634
  def call_write_ssconf_files(cls, node_list, values):
635
    """Write ssconf files.
636

637
    This is a multi-node call.
638

639
    """
640
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
641

    
642
  def call_os_diagnose(self, node_list):
643
    """Request a diagnose of OS definitions.
644

645
    This is a multi-node call.
646

647
    """
648
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
649

    
650
    new_result = {}
651
    for node_name in result:
652
      if result[node_name]:
653
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
654
      else:
655
        nr = []
656
      new_result[node_name] = nr
657
    return new_result
658

    
659
  def call_os_get(self, node, name):
660
    """Returns an OS definition.
661

662
    This is a single-node call.
663

664
    """
665
    result = self._SingleNodeCall(node, "os_get", [name])
666
    if isinstance(result, dict):
667
      return objects.OS.FromDict(result)
668
    else:
669
      return result
670

    
671
  def call_hooks_runner(self, node_list, hpath, phase, env):
672
    """Call the hooks runner.
673

674
    Args:
675
      - op: the OpCode instance
676
      - env: a dictionary with the environment
677

678
    This is a multi-node call.
679

680
    """
681
    params = [hpath, phase, env]
682
    return self._MultiNodeCall(node_list, "hooks_runner", params)
683

    
684
  def call_iallocator_runner(self, node, name, idata):
685
    """Call an iallocator on a remote node
686

687
    Args:
688
      - name: the iallocator name
689
      - input: the json-encoded input string
690

691
    This is a single-node call.
692

693
    """
694
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
695

    
696
  def call_blockdev_grow(self, node, cf_bdev, amount):
697
    """Request a snapshot of the given block device.
698

699
    This is a single-node call.
700

701
    """
702
    return self._SingleNodeCall(node, "blockdev_grow",
703
                                [cf_bdev.ToDict(), amount])
704

    
705
  def call_blockdev_snapshot(self, node, cf_bdev):
706
    """Request a snapshot of the given block device.
707

708
    This is a single-node call.
709

710
    """
711
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
712

    
713
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
714
                           cluster_name, idx):
715
    """Request the export of a given snapshot.
716

717
    This is a single-node call.
718

719
    """
720
    return self._SingleNodeCall(node, "snapshot_export",
721
                                [snap_bdev.ToDict(), dest_node,
722
                                 self._InstDict(instance), cluster_name, idx])
723

    
724
  def call_finalize_export(self, node, instance, snap_disks):
725
    """Request the completion of an export operation.
726

727
    This writes the export config file, etc.
728

729
    This is a single-node call.
730

731
    """
732
    flat_disks = []
733
    for disk in snap_disks:
734
      flat_disks.append(disk.ToDict())
735

    
736
    return self._SingleNodeCall(node, "finalize_export",
737
                                [self._InstDict(instance), flat_disks])
738

    
739
  def call_export_info(self, node, path):
740
    """Queries the export information in a given path.
741

742
    This is a single-node call.
743

744
    """
745
    result = self._SingleNodeCall(node, "export_info", [path])
746
    if not result:
747
      return result
748
    return objects.SerializableConfigParser.Loads(str(result))
749

    
750
  def call_instance_os_import(self, node, inst, src_node, src_images,
751
                              cluster_name):
752
    """Request the import of a backup into an instance.
753

754
    This is a single-node call.
755

756
    """
757
    return self._SingleNodeCall(node, "instance_os_import",
758
                                [self._InstDict(inst), src_node, src_images,
759
                                 cluster_name])
760

    
761
  def call_export_list(self, node_list):
762
    """Gets the stored exports list.
763

764
    This is a multi-node call.
765

766
    """
767
    return self._MultiNodeCall(node_list, "export_list", [])
768

    
769
  def call_export_remove(self, node, export):
770
    """Requests removal of a given export.
771

772
    This is a single-node call.
773

774
    """
775
    return self._SingleNodeCall(node, "export_remove", [export])
776

    
777
  @classmethod
778
  def call_node_leave_cluster(cls, node):
779
    """Requests a node to clean the cluster information it has.
780

781
    This will remove the configuration information from the ganeti data
782
    dir.
783

784
    This is a single-node call.
785

786
    """
787
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
788

    
789
  def call_node_volumes(self, node_list):
790
    """Gets all volumes on node(s).
791

792
    This is a multi-node call.
793

794
    """
795
    return self._MultiNodeCall(node_list, "node_volumes", [])
796

    
797
  def call_test_delay(self, node_list, duration):
798
    """Sleep for a fixed time on given node(s).
799

800
    This is a multi-node call.
801

802
    """
803
    return self._MultiNodeCall(node_list, "test_delay", [duration])
804

    
805
  def call_file_storage_dir_create(self, node, file_storage_dir):
806
    """Create the given file storage directory.
807

808
    This is a single-node call.
809

810
    """
811
    return self._SingleNodeCall(node, "file_storage_dir_create",
812
                                [file_storage_dir])
813

    
814
  def call_file_storage_dir_remove(self, node, file_storage_dir):
815
    """Remove the given file storage directory.
816

817
    This is a single-node call.
818

819
    """
820
    return self._SingleNodeCall(node, "file_storage_dir_remove",
821
                                [file_storage_dir])
822

    
823
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
824
                                   new_file_storage_dir):
825
    """Rename file storage directory.
826

827
    This is a single-node call.
828

829
    """
830
    return self._SingleNodeCall(node, "file_storage_dir_rename",
831
                                [old_file_storage_dir, new_file_storage_dir])
832

    
833
  @classmethod
834
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
835
    """Update job queue.
836

837
    This is a multi-node call.
838

839
    """
840
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
841
                                    [file_name, content],
842
                                    address_list=address_list)
843

    
844
  @classmethod
845
  def call_jobqueue_purge(cls, node):
846
    """Purge job queue.
847

848
    This is a single-node call.
849

850
    """
851
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
852

    
853
  @classmethod
854
  def call_jobqueue_rename(cls, node_list, address_list, old, new):
855
    """Rename a job queue file.
856

857
    This is a multi-node call.
858

859
    """
860
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
861
                                    address_list=address_list)
862

    
863
  @classmethod
864
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
865
    """Set the drain flag on the queue.
866

867
    This is a multi-node call.
868

869
    @type node_list: list
870
    @param node_list: the list of nodes to query
871
    @type drain_flag: bool
872
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
873

874
    """
875
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
876
                                    [drain_flag])
877

    
878
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
879
    """Validate the hypervisor params.
880

881
    This is a multi-node call.
882

883
    @type node_list: list
884
    @param node_list: the list of nodes to query
885
    @type hvname: string
886
    @param hvname: the hypervisor name
887
    @type hvparams: dict
888
    @param hvparams: the hypervisor parameters to be validated
889

890
    """
891
    cluster = self._cfg.GetClusterInfo()
892
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
893
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
894
                               [hvname, hv_full])