Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ ed83f5cc

History | View | Annotate | Download (27.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36

    
37
from ganeti import utils
38
from ganeti import objects
39
from ganeti import http
40
from ganeti import serializer
41
from ganeti import constants
42
from ganeti import errors
43

    
44
import ganeti.http.client
45

    
46

    
47
# Module level variable
48
_http_manager = None
49

    
50

    
51
def Init():
52
  """Initializes the module-global HTTP client manager.
53

54
  Must be called before using any RPC function.
55

56
  """
57
  global _http_manager
58

    
59
  assert not _http_manager, "RPC module initialized more than once"
60

    
61
  _http_manager = http.client.HttpClientManager()
62

    
63

    
64
def Shutdown():
65
  """Stops the module-global HTTP client manager.
66

67
  Must be called before quitting the program.
68

69
  """
70
  global _http_manager
71

    
72
  if _http_manager:
73
    _http_manager.Shutdown()
74
    _http_manager = None
75

    
76

    
77
class RpcResult(object):
78
  """RPC Result class.
79

80
  This class holds an RPC result. It is needed since in multi-node
81
  calls we can't raise an exception just because one one out of many
82
  failed, and therefore we use this class to encapsulate the result.
83

84
  @ivar data: the data payload, for successfull results, or None
85
  @type failed: boolean
86
  @ivar failed: whether the operation failed at RPC level (not
87
      application level on the remote node)
88
  @ivar call: the name of the RPC call
89
  @ivar node: the name of the node to which we made the call
90
  @ivar offline: whether the operation failed because the node was
91
      offline, as opposed to actual failure; offline=True will always
92
      imply failed=True, in order to allow simpler checking if
93
      the user doesn't care about the exact failure mode
94

95
  """
96
  def __init__(self, data=None, failed=False, offline=False,
97
               call=None, node=None):
98
    self.failed = failed
99
    self.offline = offline
100
    self.call = call
101
    self.node = node
102
    if offline:
103
      self.failed = True
104
      self.error = "Node is marked offline"
105
      self.data = None
106
    elif failed:
107
      self.error = data
108
      self.data = None
109
    else:
110
      self.data = data
111
      self.error = None
112

    
113
  def Raise(self):
114
    """If the result has failed, raise an OpExecError.
115

116
    This is used so that LU code doesn't have to check for each
117
    result, but instead can call this function.
118

119
    """
120
    if self.failed:
121
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
122
                               (self.call, self.node, self.error))
123

    
124

    
125
class Client:
126
  """RPC Client class.
127

128
  This class, given a (remote) method name, a list of parameters and a
129
  list of nodes, will contact (in parallel) all nodes, and return a
130
  dict of results (key: node name, value: result).
131

132
  One current bug is that generic failure is still signalled by
133
  'False' result, which is not good. This overloading of values can
134
  cause bugs.
135

136
  """
137
  def __init__(self, procedure, body, port):
138
    self.procedure = procedure
139
    self.body = body
140
    self.port = port
141
    self.nc = {}
142

    
143
    self._ssl_params = \
144
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
145
                         ssl_cert_path=constants.SSL_CERT_FILE)
146

    
147
  def ConnectList(self, node_list, address_list=None):
148
    """Add a list of nodes to the target nodes.
149

150
    @type node_list: list
151
    @param node_list: the list of node names to connect
152
    @type address_list: list or None
153
    @keyword address_list: either None or a list with node addresses,
154
        which must have the same length as the node list
155

156
    """
157
    if address_list is None:
158
      address_list = [None for _ in node_list]
159
    else:
160
      assert len(node_list) == len(address_list), \
161
             "Name and address lists should have the same length"
162
    for node, address in zip(node_list, address_list):
163
      self.ConnectNode(node, address)
164

    
165
  def ConnectNode(self, name, address=None):
166
    """Add a node to the target list.
167

168
    @type name: str
169
    @param name: the node name
170
    @type address: str
171
    @keyword address: the node address, if known
172

173
    """
174
    if address is None:
175
      address = name
176

    
177
    self.nc[name] = \
178
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
179
                                    "/%s" % self.procedure,
180
                                    post_data=self.body,
181
                                    ssl_params=self._ssl_params,
182
                                    ssl_verify_peer=True)
183

    
184
  def GetResults(self):
185
    """Call nodes and return results.
186

187
    @rtype: list
188
    @returns: List of RPC results
189

190
    """
191
    assert _http_manager, "RPC module not intialized"
192

    
193
    _http_manager.ExecRequests(self.nc.values())
194

    
195
    results = {}
196

    
197
    for name, req in self.nc.iteritems():
198
      if req.success and req.resp_status_code == http.HTTP_OK:
199
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
200
                                  node=name, call=self.procedure)
201
        continue
202

    
203
      # TODO: Better error reporting
204
      if req.error:
205
        msg = req.error
206
      else:
207
        msg = req.resp_body
208

    
209
      logging.error("RPC error from node %s: %s", name, msg)
210
      results[name] = RpcResult(data=msg, failed=True, node=name,
211
                                call=self.procedure)
212

    
213
    return results
214

    
215

    
216
class RpcRunner(object):
217
  """RPC runner class"""
218

    
219
  def __init__(self, cfg):
220
    """Initialized the rpc runner.
221

222
    @type cfg:  C{config.ConfigWriter}
223
    @param cfg: the configuration object that will be used to get data
224
                about the cluster
225

226
    """
227
    self._cfg = cfg
228
    self.port = utils.GetNodeDaemonPort()
229

    
230
  def _InstDict(self, instance):
231
    """Convert the given instance to a dict.
232

233
    This is done via the instance's ToDict() method and additionally
234
    we fill the hvparams with the cluster defaults.
235

236
    @type instance: L{objects.Instance}
237
    @param instance: an Instance object
238
    @rtype: dict
239
    @return: the instance dict, with the hvparams filled with the
240
        cluster defaults
241

242
    """
243
    idict = instance.ToDict()
244
    cluster = self._cfg.GetClusterInfo()
245
    idict["hvparams"] = cluster.FillHV(instance)
246
    idict["beparams"] = cluster.FillBE(instance)
247
    return idict
248

    
249
  def _ConnectList(self, client, node_list):
250
    """Helper for computing node addresses.
251

252
    @type client: L{Client}
253
    @param client: a C{Client} instance
254
    @type node_list: list
255
    @param node_list: the node list we should connect
256

257
    """
258
    all_nodes = self._cfg.GetAllNodesInfo()
259
    name_list = []
260
    addr_list = []
261
    skip_dict = {}
262
    for node in node_list:
263
      if node in all_nodes:
264
        if all_nodes[node].offline:
265
          skip_dict[node] = RpcResult(node=node, offline=True)
266
          continue
267
        val = all_nodes[node].primary_ip
268
      else:
269
        val = None
270
      addr_list.append(val)
271
      name_list.append(node)
272
    if name_list:
273
      client.ConnectList(name_list, address_list=addr_list)
274
    return skip_dict
275

    
276
  def _ConnectNode(self, client, node):
277
    """Helper for computing one node's address.
278

279
    @type client: L{Client}
280
    @param client: a C{Client} instance
281
    @type node: str
282
    @param node: the node we should connect
283

284
    """
285
    node_info = self._cfg.GetNodeInfo(node)
286
    if node_info is not None:
287
      if node_info.offline:
288
        return RpcResult(node=node, offline=True)
289
      addr = node_info.primary_ip
290
    else:
291
      addr = None
292
    client.ConnectNode(node, address=addr)
293

    
294
  def _MultiNodeCall(self, node_list, procedure, args):
295
    """Helper for making a multi-node call
296

297
    """
298
    body = serializer.DumpJson(args, indent=False)
299
    c = Client(procedure, body, self.port)
300
    skip_dict = self._ConnectList(c, node_list)
301
    skip_dict.update(c.GetResults())
302
    return skip_dict
303

    
304
  @classmethod
305
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
306
                           address_list=None):
307
    """Helper for making a multi-node static call
308

309
    """
310
    body = serializer.DumpJson(args, indent=False)
311
    c = Client(procedure, body, utils.GetNodeDaemonPort())
312
    c.ConnectList(node_list, address_list=address_list)
313
    return c.GetResults()
314

    
315
  def _SingleNodeCall(self, node, procedure, args):
316
    """Helper for making a single-node call
317

318
    """
319
    body = serializer.DumpJson(args, indent=False)
320
    c = Client(procedure, body, self.port)
321
    result = self._ConnectNode(c, node)
322
    if result is None:
323
      # we did connect, node is not offline
324
      result = c.GetResults()[node]
325
    return result
326

    
327
  @classmethod
328
  def _StaticSingleNodeCall(cls, node, procedure, args):
329
    """Helper for making a single-node static call
330

331
    """
332
    body = serializer.DumpJson(args, indent=False)
333
    c = Client(procedure, body, utils.GetNodeDaemonPort())
334
    c.ConnectNode(node)
335
    return c.GetResults()[node]
336

    
337
  #
338
  # Begin RPC calls
339
  #
340

    
341
  def call_volume_list(self, node_list, vg_name):
342
    """Gets the logical volumes present in a given volume group.
343

344
    This is a multi-node call.
345

346
    """
347
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
348

    
349
  def call_vg_list(self, node_list):
350
    """Gets the volume group list.
351

352
    This is a multi-node call.
353

354
    """
355
    return self._MultiNodeCall(node_list, "vg_list", [])
356

    
357
  def call_bridges_exist(self, node, bridges_list):
358
    """Checks if a node has all the bridges given.
359

360
    This method checks if all bridges given in the bridges_list are
361
    present on the remote node, so that an instance that uses interfaces
362
    on those bridges can be started.
363

364
    This is a single-node call.
365

366
    """
367
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
368

    
369
  def call_instance_start(self, node, instance, extra_args):
370
    """Starts an instance.
371

372
    This is a single-node call.
373

374
    """
375
    return self._SingleNodeCall(node, "instance_start",
376
                                [self._InstDict(instance), extra_args])
377

    
378
  def call_instance_shutdown(self, node, instance):
379
    """Stops an instance.
380

381
    This is a single-node call.
382

383
    """
384
    return self._SingleNodeCall(node, "instance_shutdown",
385
                                [self._InstDict(instance)])
386

    
387
  def call_instance_migrate(self, node, instance, target, live):
388
    """Migrate an instance.
389

390
    This is a single-node call.
391

392
    @type node: string
393
    @param node: the node on which the instance is currently running
394
    @type instance: C{objects.Instance}
395
    @param instance: the instance definition
396
    @type target: string
397
    @param target: the target node name
398
    @type live: boolean
399
    @param live: whether the migration should be done live or not (the
400
        interpretation of this parameter is left to the hypervisor)
401

402
    """
403
    return self._SingleNodeCall(node, "instance_migrate",
404
                                [self._InstDict(instance), target, live])
405

    
406
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
407
    """Reboots an instance.
408

409
    This is a single-node call.
410

411
    """
412
    return self._SingleNodeCall(node, "instance_reboot",
413
                                [self._InstDict(instance), reboot_type,
414
                                 extra_args])
415

    
416
  def call_instance_os_add(self, node, inst):
417
    """Installs an OS on the given instance.
418

419
    This is a single-node call.
420

421
    """
422
    return self._SingleNodeCall(node, "instance_os_add",
423
                                [self._InstDict(inst)])
424

    
425
  def call_instance_run_rename(self, node, inst, old_name):
426
    """Run the OS rename script for an instance.
427

428
    This is a single-node call.
429

430
    """
431
    return self._SingleNodeCall(node, "instance_run_rename",
432
                                [self._InstDict(inst), old_name])
433

    
434
  def call_instance_info(self, node, instance, hname):
435
    """Returns information about a single instance.
436

437
    This is a single-node call.
438

439
    @type node: list
440
    @param node: the list of nodes to query
441
    @type instance: string
442
    @param instance: the instance name
443
    @type hname: string
444
    @param hname: the hypervisor type of the instance
445

446
    """
447
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
448

    
449
  def call_all_instances_info(self, node_list, hypervisor_list):
450
    """Returns information about all instances on the given nodes.
451

452
    This is a multi-node call.
453

454
    @type node_list: list
455
    @param node_list: the list of nodes to query
456
    @type hypervisor_list: list
457
    @param hypervisor_list: the hypervisors to query for instances
458

459
    """
460
    return self._MultiNodeCall(node_list, "all_instances_info",
461
                               [hypervisor_list])
462

    
463
  def call_instance_list(self, node_list, hypervisor_list):
464
    """Returns the list of running instances on a given node.
465

466
    This is a multi-node call.
467

468
    @type node_list: list
469
    @param node_list: the list of nodes to query
470
    @type hypervisor_list: list
471
    @param hypervisor_list: the hypervisors to query for instances
472

473
    """
474
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
475

    
476
  def call_node_tcp_ping(self, node, source, target, port, timeout,
477
                         live_port_needed):
478
    """Do a TcpPing on the remote node
479

480
    This is a single-node call.
481

482
    """
483
    return self._SingleNodeCall(node, "node_tcp_ping",
484
                                [source, target, port, timeout,
485
                                 live_port_needed])
486

    
487
  def call_node_has_ip_address(self, node, address):
488
    """Checks if a node has the given IP address.
489

490
    This is a single-node call.
491

492
    """
493
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
494

    
495
  def call_node_info(self, node_list, vg_name, hypervisor_type):
496
    """Return node information.
497

498
    This will return memory information and volume group size and free
499
    space.
500

501
    This is a multi-node call.
502

503
    @type node_list: list
504
    @param node_list: the list of nodes to query
505
    @type vgname: C{string}
506
    @param vgname: the name of the volume group to ask for disk space
507
        information
508
    @type hypervisor_type: C{str}
509
    @param hypervisor_type: the name of the hypervisor to ask for
510
        memory information
511

512
    """
513
    retux = self._MultiNodeCall(node_list, "node_info",
514
                                [vg_name, hypervisor_type])
515

    
516
    for result in retux.itervalues():
517
      if result.failed or not isinstance(result.data, dict):
518
        result.data = {}
519

    
520
      utils.CheckDict(result.data, {
521
        'memory_total' : '-',
522
        'memory_dom0' : '-',
523
        'memory_free' : '-',
524
        'vg_size' : 'node_unreachable',
525
        'vg_free' : '-',
526
        }, "call_node_info")
527
    return retux
528

    
529
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
530
    """Add a node to the cluster.
531

532
    This is a single-node call.
533

534
    """
535
    return self._SingleNodeCall(node, "node_add",
536
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
537

    
538
  def call_node_verify(self, node_list, checkdict, cluster_name):
539
    """Request verification of given parameters.
540

541
    This is a multi-node call.
542

543
    """
544
    return self._MultiNodeCall(node_list, "node_verify",
545
                               [checkdict, cluster_name])
546

    
547
  @classmethod
548
  def call_node_start_master(cls, node, start_daemons):
549
    """Tells a node to activate itself as a master.
550

551
    This is a single-node call.
552

553
    """
554
    return cls._StaticSingleNodeCall(node, "node_start_master",
555
                                     [start_daemons])
556

    
557
  @classmethod
558
  def call_node_stop_master(cls, node, stop_daemons):
559
    """Tells a node to demote itself from master status.
560

561
    This is a single-node call.
562

563
    """
564
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
565

    
566
  @classmethod
567
  def call_master_info(cls, node_list):
568
    """Query master info.
569

570
    This is a multi-node call.
571

572
    """
573
    # TODO: should this method query down nodes?
574
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
575

    
576
  def call_version(self, node_list):
577
    """Query node version.
578

579
    This is a multi-node call.
580

581
    """
582
    return self._MultiNodeCall(node_list, "version", [])
583

    
584
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
585
    """Request creation of a given block device.
586

587
    This is a single-node call.
588

589
    """
590
    return self._SingleNodeCall(node, "blockdev_create",
591
                                [bdev.ToDict(), size, owner, on_primary, info])
592

    
593
  def call_blockdev_remove(self, node, bdev):
594
    """Request removal of a given block device.
595

596
    This is a single-node call.
597

598
    """
599
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
600

    
601
  def call_blockdev_rename(self, node, devlist):
602
    """Request rename of the given block devices.
603

604
    This is a single-node call.
605

606
    """
607
    return self._SingleNodeCall(node, "blockdev_rename",
608
                                [(d.ToDict(), uid) for d, uid in devlist])
609

    
610
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
611
    """Request assembling of a given block device.
612

613
    This is a single-node call.
614

615
    """
616
    return self._SingleNodeCall(node, "blockdev_assemble",
617
                                [disk.ToDict(), owner, on_primary])
618

    
619
  def call_blockdev_shutdown(self, node, disk):
620
    """Request shutdown of a given block device.
621

622
    This is a single-node call.
623

624
    """
625
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
626

    
627
  def call_blockdev_addchildren(self, node, bdev, ndevs):
628
    """Request adding a list of children to a (mirroring) device.
629

630
    This is a single-node call.
631

632
    """
633
    return self._SingleNodeCall(node, "blockdev_addchildren",
634
                                [bdev.ToDict(),
635
                                 [disk.ToDict() for disk in ndevs]])
636

    
637
  def call_blockdev_removechildren(self, node, bdev, ndevs):
638
    """Request removing a list of children from a (mirroring) device.
639

640
    This is a single-node call.
641

642
    """
643
    return self._SingleNodeCall(node, "blockdev_removechildren",
644
                                [bdev.ToDict(),
645
                                 [disk.ToDict() for disk in ndevs]])
646

    
647
  def call_blockdev_getmirrorstatus(self, node, disks):
648
    """Request status of a (mirroring) device.
649

650
    This is a single-node call.
651

652
    """
653
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
654
                                [dsk.ToDict() for dsk in disks])
655

    
656
  def call_blockdev_find(self, node, disk):
657
    """Request identification of a given block device.
658

659
    This is a single-node call.
660

661
    """
662
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
663

    
664
  def call_blockdev_close(self, node, disks):
665
    """Closes the given block devices.
666

667
    This is a single-node call.
668

669
    """
670
    return self._SingleNodeCall(node, "blockdev_close",
671
                                [cf.ToDict() for cf in disks])
672

    
673
  @classmethod
674
  def call_upload_file(cls, node_list, file_name, address_list=None):
675
    """Upload a file.
676

677
    The node will refuse the operation in case the file is not on the
678
    approved file list.
679

680
    This is a multi-node call.
681

682
    @type node_list: list
683
    @param node_list: the list of node names to upload to
684
    @type file_name: str
685
    @param file_name: the filename to upload
686
    @type address_list: list or None
687
    @keyword address_list: an optional list of node addresses, in order
688
        to optimize the RPC speed
689

690
    """
691
    data = utils.ReadFile(file_name)
692
    st = os.stat(file_name)
693
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
694
              st.st_atime, st.st_mtime]
695
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
696
                                    address_list=address_list)
697

    
698
  @classmethod
699
  def call_write_ssconf_files(cls, node_list, values):
700
    """Write ssconf files.
701

702
    This is a multi-node call.
703

704
    """
705
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
706

    
707
  def call_os_diagnose(self, node_list):
708
    """Request a diagnose of OS definitions.
709

710
    This is a multi-node call.
711

712
    """
713
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
714

    
715
    for node_name, node_result in result.iteritems():
716
      if not node_result.failed and node_result.data:
717
        node_result.data = [objects.OS.FromDict(oss)
718
                            for oss in node_result.data]
719
    return result
720

    
721
  def call_os_get(self, node, name):
722
    """Returns an OS definition.
723

724
    This is a single-node call.
725

726
    """
727
    result = self._SingleNodeCall(node, "os_get", [name])
728
    if not result.failed and isinstance(result.data, dict):
729
      result.data = objects.OS.FromDict(result.data)
730
    return result
731

    
732
  def call_hooks_runner(self, node_list, hpath, phase, env):
733
    """Call the hooks runner.
734

735
    Args:
736
      - op: the OpCode instance
737
      - env: a dictionary with the environment
738

739
    This is a multi-node call.
740

741
    """
742
    params = [hpath, phase, env]
743
    return self._MultiNodeCall(node_list, "hooks_runner", params)
744

    
745
  def call_iallocator_runner(self, node, name, idata):
746
    """Call an iallocator on a remote node
747

748
    Args:
749
      - name: the iallocator name
750
      - input: the json-encoded input string
751

752
    This is a single-node call.
753

754
    """
755
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
756

    
757
  def call_blockdev_grow(self, node, cf_bdev, amount):
758
    """Request a snapshot of the given block device.
759

760
    This is a single-node call.
761

762
    """
763
    return self._SingleNodeCall(node, "blockdev_grow",
764
                                [cf_bdev.ToDict(), amount])
765

    
766
  def call_blockdev_snapshot(self, node, cf_bdev):
767
    """Request a snapshot of the given block device.
768

769
    This is a single-node call.
770

771
    """
772
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
773

    
774
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
775
                           cluster_name, idx):
776
    """Request the export of a given snapshot.
777

778
    This is a single-node call.
779

780
    """
781
    return self._SingleNodeCall(node, "snapshot_export",
782
                                [snap_bdev.ToDict(), dest_node,
783
                                 self._InstDict(instance), cluster_name, idx])
784

    
785
  def call_finalize_export(self, node, instance, snap_disks):
786
    """Request the completion of an export operation.
787

788
    This writes the export config file, etc.
789

790
    This is a single-node call.
791

792
    """
793
    flat_disks = []
794
    for disk in snap_disks:
795
      flat_disks.append(disk.ToDict())
796

    
797
    return self._SingleNodeCall(node, "finalize_export",
798
                                [self._InstDict(instance), flat_disks])
799

    
800
  def call_export_info(self, node, path):
801
    """Queries the export information in a given path.
802

803
    This is a single-node call.
804

805
    """
806
    result = self._SingleNodeCall(node, "export_info", [path])
807
    if not result.failed and result.data:
808
      result.data = objects.SerializableConfigParser.Loads(str(result.data))
809
    return result
810

    
811
  def call_instance_os_import(self, node, inst, src_node, src_images,
812
                              cluster_name):
813
    """Request the import of a backup into an instance.
814

815
    This is a single-node call.
816

817
    """
818
    return self._SingleNodeCall(node, "instance_os_import",
819
                                [self._InstDict(inst), src_node, src_images,
820
                                 cluster_name])
821

    
822
  def call_export_list(self, node_list):
823
    """Gets the stored exports list.
824

825
    This is a multi-node call.
826

827
    """
828
    return self._MultiNodeCall(node_list, "export_list", [])
829

    
830
  def call_export_remove(self, node, export):
831
    """Requests removal of a given export.
832

833
    This is a single-node call.
834

835
    """
836
    return self._SingleNodeCall(node, "export_remove", [export])
837

    
838
  @classmethod
839
  def call_node_leave_cluster(cls, node):
840
    """Requests a node to clean the cluster information it has.
841

842
    This will remove the configuration information from the ganeti data
843
    dir.
844

845
    This is a single-node call.
846

847
    """
848
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
849

    
850
  def call_node_volumes(self, node_list):
851
    """Gets all volumes on node(s).
852

853
    This is a multi-node call.
854

855
    """
856
    return self._MultiNodeCall(node_list, "node_volumes", [])
857

    
858
  def call_node_demote_from_mc(self, node):
859
    """Demote a node from the master candidate role.
860

861
    This is a single-node call.
862

863
    """
864
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
865

    
866
  def call_test_delay(self, node_list, duration):
867
    """Sleep for a fixed time on given node(s).
868

869
    This is a multi-node call.
870

871
    """
872
    return self._MultiNodeCall(node_list, "test_delay", [duration])
873

    
874
  def call_file_storage_dir_create(self, node, file_storage_dir):
875
    """Create the given file storage directory.
876

877
    This is a single-node call.
878

879
    """
880
    return self._SingleNodeCall(node, "file_storage_dir_create",
881
                                [file_storage_dir])
882

    
883
  def call_file_storage_dir_remove(self, node, file_storage_dir):
884
    """Remove the given file storage directory.
885

886
    This is a single-node call.
887

888
    """
889
    return self._SingleNodeCall(node, "file_storage_dir_remove",
890
                                [file_storage_dir])
891

    
892
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
893
                                   new_file_storage_dir):
894
    """Rename file storage directory.
895

896
    This is a single-node call.
897

898
    """
899
    return self._SingleNodeCall(node, "file_storage_dir_rename",
900
                                [old_file_storage_dir, new_file_storage_dir])
901

    
902
  @classmethod
903
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
904
    """Update job queue.
905

906
    This is a multi-node call.
907

908
    """
909
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
910
                                    [file_name, content],
911
                                    address_list=address_list)
912

    
913
  @classmethod
914
  def call_jobqueue_purge(cls, node):
915
    """Purge job queue.
916

917
    This is a single-node call.
918

919
    """
920
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
921

    
922
  @classmethod
923
  def call_jobqueue_rename(cls, node_list, address_list, old, new):
924
    """Rename a job queue file.
925

926
    This is a multi-node call.
927

928
    """
929
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
930
                                    address_list=address_list)
931

    
932
  @classmethod
933
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
934
    """Set the drain flag on the queue.
935

936
    This is a multi-node call.
937

938
    @type node_list: list
939
    @param node_list: the list of nodes to query
940
    @type drain_flag: bool
941
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
942

943
    """
944
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
945
                                    [drain_flag])
946

    
947
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
948
    """Validate the hypervisor params.
949

950
    This is a multi-node call.
951

952
    @type node_list: list
953
    @param node_list: the list of nodes to query
954
    @type hvname: string
955
    @param hvname: the hypervisor name
956
    @type hvparams: dict
957
    @param hvparams: the hypervisor parameters to be validated
958

959
    """
960
    cluster = self._cfg.GetClusterInfo()
961
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
962
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
963
                               [hvname, hv_full])