Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 74c47259

History | View | Annotate | Download (27.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import httplib
36
import logging
37

    
38
import simplejson
39

    
40
from ganeti import utils
41
from ganeti import objects
42

    
43

    
44
class NodeController:
45
  """Node-handling class.
46

47
  For each node that we speak with, we create an instance of this
48
  class, so that we have a safe place to store the details of this
49
  individual call.
50

51
  """
52
  def __init__(self, parent, node, address=None):
53
    """Constructor for the node controller.
54

55
    @type parent: L{Client}
56
    @param parent: the C{Client} instance which holds global parameters for
57
        the call
58
    @type node: str
59
    @param node: the name of the node we connect to; it is used for error
60
        messages and in cases we the address paramater is not passed
61
    @type address: str
62
    @keyword address: the node's address, in case we know it, so that we
63
        don't need to resolve it; testing shows that httplib has high
64
        overhead in resolving addresses (even when speficied in /etc/hosts)
65

66
    """
67
    self.parent = parent
68
    self.node = node
69
    if address is None:
70
      address = node
71
    self.failed = False
72

    
73
    self.http_conn = hc = httplib.HTTPConnection(address, parent.port)
74
    try:
75
      hc.connect()
76
      hc.putrequest('PUT', "/%s" % parent.procedure,
77
                    skip_accept_encoding=True)
78
      hc.putheader('Content-Length', parent.body_length)
79
      hc.endheaders()
80
      hc.send(parent.body)
81
    except socket.error:
82
      logging.exception("Error connecting to node %s", node)
83
      self.failed = True
84

    
85
  def GetResponse(self):
86
    """Try to process the response from the node.
87

88
    """
89
    if self.failed:
90
      # we already failed in connect
91
      return False
92
    resp = self.http_conn.getresponse()
93
    if resp.status != 200:
94
      return False
95
    try:
96
      length = int(resp.getheader('Content-Length', '0'))
97
    except ValueError:
98
      return False
99
    if not length:
100
      logging.error("Zero-length reply from node %s", self.node)
101
      return False
102
    payload = resp.read(length)
103
    unload = simplejson.loads(payload)
104
    return unload
105

    
106

    
107
class Client:
108
  """RPC Client class.
109

110
  This class, given a (remote) method name, a list of parameters and a
111
  list of nodes, will contact (in parallel) all nodes, and return a
112
  dict of results (key: node name, value: result).
113

114
  One current bug is that generic failure is still signalled by
115
  'False' result, which is not good. This overloading of values can
116
  cause bugs.
117

118
  @var body_length: cached string value of the length of the body (so that
119
      individual C{NodeController} instances don't have to recompute it)
120

121
  """
122
  result_set = False
123
  result = False
124
  allresult = []
125

    
126
  def __init__(self, procedure, args):
127
    self.port = utils.GetNodeDaemonPort()
128
    self.nodepw = utils.GetNodeDaemonPassword()
129
    self.nc = {}
130
    self.results = {}
131
    self.procedure = procedure
132
    self.args = args
133
    self.body = simplejson.dumps(args)
134
    self.body_length = str(len(self.body))
135

    
136
  #--- generic connector -------------
137

    
138
  def ConnectList(self, node_list, address_list=None):
139
    """Add a list of nodes to the target nodes.
140

141
    @type node_list: list
142
    @param node_list: the list of node names to connect
143
    @type address_list: list or None
144
    @keyword address_list: either None or a list with node addresses,
145
        which must have the same length as the node list
146

147
    """
148
    if address_list is None:
149
      address_list = [None for _ in node_list]
150
    else:
151
      assert len(node_list) == len(address_list), \
152
             "Name and address lists should have the same length"
153
    for node, address in zip(node_list, address_list):
154
      self.ConnectNode(node, address)
155

    
156
  def ConnectNode(self, name, address=None):
157
    """Add a node to the target list.
158

159
    @type name: str
160
    @param name: the node name
161
    @type address: str
162
    @keyword address: the node address, if known
163

164
    """
165
    self.nc[name] = NodeController(self, name, address)
166

    
167
  def GetResults(self):
168
    """Return the results of the call.
169

170
    """
171
    return self.results
172

    
173
  def Run(self):
174
    """Gather results from the node controllers.
175

176
    This function simply calls GetResponse() for each of our node
177
    controllers.
178

179
    """
180
    for node, nc in self.nc.items():
181
      self.results[node] = nc.GetResponse()
182

    
183

    
184
class RpcRunner(object):
185
  """RPC runner class"""
186

    
187
  def __init__(self, cfg):
188
    """Initialized the rpc runner.
189

190
    @type cfg:  C{config.ConfigWriter}
191
    @param cfg: the configuration object that will be used to get data
192
                about the cluster
193

194
    """
195
    self._cfg = cfg
196

    
197
  def _InstDict(self, instance):
198
    """Convert the given instance to a dict.
199

200
    This is done via the instance's ToDict() method and additionally
201
    we fill the hvparams with the cluster defaults.
202

203
    @type instance: L{objects.Instance}
204
    @param instance: an Instance object
205
    @rtype: dict
206
    @return: the instance dict, with the hvparams filled with the
207
        cluster defaults
208

209
    """
210
    idict = instance.ToDict()
211
    cluster = self._cfg.GetClusterInfo()
212
    idict["hvparams"] = cluster.FillHV(instance)
213
    idict["beparams"] = cluster.FillBE(instance)
214
    return idict
215

    
216
  def _ConnectList(self, client, node_list):
217
    """Helper for computing node addresses.
218

219
    @type client: L{Client}
220
    @param client: a C{Client} instance
221
    @type node_list: list
222
    @param node_list: the node list we should connect
223

224
    """
225
    all_nodes = self._cfg.GetAllNodesInfo()
226
    addr_list = []
227
    for node in node_list:
228
      if node in all_nodes:
229
        val = all_nodes[node].primary_ip
230
      else:
231
        val = None
232
      addr_list.append(val)
233
    client.ConnectList(node_list, address_list=addr_list)
234

    
235
  def _ConnectNode(self, client, node):
236
    """Helper for computing one node's address.
237

238
    @type client: L{Client}
239
    @param client: a C{Client} instance
240
    @type node: str
241
    @param node: the node we should connect
242

243
    """
244
    node_info = self._cfg.GetNodeInfo(node)
245
    if node_info is not None:
246
      addr = node_info.primary_ip
247
    else:
248
      addr = None
249
    client.ConnectNode(node, address=addr)
250

    
251
  def call_volume_list(self, node_list, vg_name):
252
    """Gets the logical volumes present in a given volume group.
253

254
    This is a multi-node call.
255

256
    """
257
    c = Client("volume_list", [vg_name])
258
    self._ConnectList(c, node_list)
259
    c.Run()
260
    return c.GetResults()
261

    
262
  def call_vg_list(self, node_list):
263
    """Gets the volume group list.
264

265
    This is a multi-node call.
266

267
    """
268
    c = Client("vg_list", [])
269
    self._ConnectList(c, node_list)
270
    c.Run()
271
    return c.GetResults()
272

    
273
  def call_bridges_exist(self, node, bridges_list):
274
    """Checks if a node has all the bridges given.
275

276
    This method checks if all bridges given in the bridges_list are
277
    present on the remote node, so that an instance that uses interfaces
278
    on those bridges can be started.
279

280
    This is a single-node call.
281

282
    """
283
    c = Client("bridges_exist", [bridges_list])
284
    self._ConnectNode(c, node)
285
    c.Run()
286
    return c.GetResults().get(node, False)
287

    
288
  def call_instance_start(self, node, instance, extra_args):
289
    """Starts an instance.
290

291
    This is a single-node call.
292

293
    """
294
    c = Client("instance_start", [self._InstDict(instance), extra_args])
295
    self._ConnectNode(c, node)
296
    c.Run()
297
    return c.GetResults().get(node, False)
298

    
299
  def call_instance_shutdown(self, node, instance):
300
    """Stops an instance.
301

302
    This is a single-node call.
303

304
    """
305
    c = Client("instance_shutdown", [self._InstDict(instance)])
306
    self._ConnectNode(c, node)
307
    c.Run()
308
    return c.GetResults().get(node, False)
309

    
310
  def call_instance_migrate(self, node, instance, target, live):
311
    """Migrate an instance.
312

313
    This is a single-node call.
314

315
    @type node: string
316
    @param node: the node on which the instance is currently running
317
    @type instance: C{objects.Instance}
318
    @param instance: the instance definition
319
    @type target: string
320
    @param target: the target node name
321
    @type live: boolean
322
    @param live: whether the migration should be done live or not (the
323
        interpretation of this parameter is left to the hypervisor)
324

325
    """
326
    c = Client("instance_migrate", [self._InstDict(instance), target, live])
327
    self._ConnectNode(c, node)
328
    c.Run()
329
    return c.GetResults().get(node, False)
330

    
331
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
332
    """Reboots an instance.
333

334
    This is a single-node call.
335

336
    """
337
    c = Client("instance_reboot", [self._InstDict(instance),
338
                                   reboot_type, extra_args])
339
    self._ConnectNode(c, node)
340
    c.Run()
341
    return c.GetResults().get(node, False)
342

    
343
  def call_instance_os_add(self, node, inst):
344
    """Installs an OS on the given instance.
345

346
    This is a single-node call.
347

348
    """
349
    params = [self._InstDict(inst)]
350
    c = Client("instance_os_add", params)
351
    self._ConnectNode(c, node)
352
    c.Run()
353
    return c.GetResults().get(node, False)
354

    
355
  def call_instance_run_rename(self, node, inst, old_name):
356
    """Run the OS rename script for an instance.
357

358
    This is a single-node call.
359

360
    """
361
    params = [self._InstDict(inst), old_name]
362
    c = Client("instance_run_rename", params)
363
    self._ConnectNode(c, node)
364
    c.Run()
365
    return c.GetResults().get(node, False)
366

    
367
  def call_instance_info(self, node, instance, hname):
368
    """Returns information about a single instance.
369

370
    This is a single-node call.
371

372
    @type node_list: list
373
    @param node_list: the list of nodes to query
374
    @type instance: string
375
    @param instance: the instance name
376
    @type hname: string
377
    @param hname: the hypervisor type of the instance
378

379
    """
380
    c = Client("instance_info", [instance, hname])
381
    self._ConnectNode(c, node)
382
    c.Run()
383
    return c.GetResults().get(node, False)
384

    
385
  def call_all_instances_info(self, node_list, hypervisor_list):
386
    """Returns information about all instances on the given nodes.
387

388
    This is a multi-node call.
389

390
    @type node_list: list
391
    @param node_list: the list of nodes to query
392
    @type hypervisor_list: list
393
    @param hypervisor_list: the hypervisors to query for instances
394

395
    """
396
    c = Client("all_instances_info", [hypervisor_list])
397
    self._ConnectList(c, node_list)
398
    c.Run()
399
    return c.GetResults()
400

    
401
  def call_instance_list(self, node_list, hypervisor_list):
402
    """Returns the list of running instances on a given node.
403

404
    This is a multi-node call.
405

406
    @type node_list: list
407
    @param node_list: the list of nodes to query
408
    @type hypervisor_list: list
409
    @param hypervisor_list: the hypervisors to query for instances
410

411
    """
412
    c = Client("instance_list", [hypervisor_list])
413
    self._ConnectList(c, node_list)
414
    c.Run()
415
    return c.GetResults()
416

    
417
  def call_node_tcp_ping(self, node, source, target, port, timeout,
418
                         live_port_needed):
419
    """Do a TcpPing on the remote node
420

421
    This is a single-node call.
422

423
    """
424
    c = Client("node_tcp_ping", [source, target, port, timeout,
425
                                 live_port_needed])
426
    self._ConnectNode(c, node)
427
    c.Run()
428
    return c.GetResults().get(node, False)
429

    
430
  def call_node_has_ip_address(self, node, address):
431
    """Checks if a node has the given IP address.
432

433
    This is a single-node call.
434

435
    """
436
    c = Client("node_has_ip_address", [address])
437
    self._ConnectNode(c, node)
438
    c.Run()
439
    return c.GetResults().get(node, False)
440

    
441
  def call_node_info(self, node_list, vg_name, hypervisor_type):
442
    """Return node information.
443

444
    This will return memory information and volume group size and free
445
    space.
446

447
    This is a multi-node call.
448

449
    @type node_list: list
450
    @param node_list: the list of nodes to query
451
    @type vgname: C{string}
452
    @param vgname: the name of the volume group to ask for disk space
453
        information
454
    @type hypervisor_type: C{str}
455
    @param hypervisor_type: the name of the hypervisor to ask for
456
        memory information
457

458
    """
459
    c = Client("node_info", [vg_name, hypervisor_type])
460
    self._ConnectList(c, node_list)
461
    c.Run()
462
    retux = c.GetResults()
463

    
464
    for node_name in retux:
465
      ret = retux.get(node_name, False)
466
      if type(ret) != dict:
467
        logging.error("could not connect to node %s", node_name)
468
        ret = {}
469

    
470
      utils.CheckDict(ret,
471
                      { 'memory_total' : '-',
472
                        'memory_dom0' : '-',
473
                        'memory_free' : '-',
474
                        'vg_size' : 'node_unreachable',
475
                        'vg_free' : '-' },
476
                      "call_node_info",
477
                      )
478
    return retux
479

    
480
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
481
    """Add a node to the cluster.
482

483
    This is a single-node call.
484

485
    """
486
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
487
    c = Client("node_add", params)
488
    self._ConnectNode(c, node)
489
    c.Run()
490
    return c.GetResults().get(node, False)
491

    
492
  def call_node_verify(self, node_list, checkdict, cluster_name):
493
    """Request verification of given parameters.
494

495
    This is a multi-node call.
496

497
    """
498
    c = Client("node_verify", [checkdict, cluster_name])
499
    self._ConnectList(c, node_list)
500
    c.Run()
501
    return c.GetResults()
502

    
503
  @staticmethod
504
  def call_node_start_master(node, start_daemons):
505
    """Tells a node to activate itself as a master.
506

507
    This is a single-node call.
508

509
    """
510
    c = Client("node_start_master", [start_daemons])
511
    c.ConnectNode(node)
512
    c.Run()
513
    return c.GetResults().get(node, False)
514

    
515
  @staticmethod
516
  def call_node_stop_master(node, stop_daemons):
517
    """Tells a node to demote itself from master status.
518

519
    This is a single-node call.
520

521
    """
522
    c = Client("node_stop_master", [stop_daemons])
523
    c.ConnectNode(node)
524
    c.Run()
525
    return c.GetResults().get(node, False)
526

    
527
  @staticmethod
528
  def call_master_info(node_list):
529
    """Query master info.
530

531
    This is a multi-node call.
532

533
    """
534
    # TODO: should this method query down nodes?
535
    c = Client("master_info", [])
536
    c.ConnectList(node_list)
537
    c.Run()
538
    return c.GetResults()
539

    
540
  def call_version(self, node_list):
541
    """Query node version.
542

543
    This is a multi-node call.
544

545
    """
546
    c = Client("version", [])
547
    self._ConnectList(c, node_list)
548
    c.Run()
549
    return c.GetResults()
550

    
551
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
552
    """Request creation of a given block device.
553

554
    This is a single-node call.
555

556
    """
557
    params = [bdev.ToDict(), size, owner, on_primary, info]
558
    c = Client("blockdev_create", params)
559
    self._ConnectNode(c, node)
560
    c.Run()
561
    return c.GetResults().get(node, False)
562

    
563
  def call_blockdev_remove(self, node, bdev):
564
    """Request removal of a given block device.
565

566
    This is a single-node call.
567

568
    """
569
    c = Client("blockdev_remove", [bdev.ToDict()])
570
    self._ConnectNode(c, node)
571
    c.Run()
572
    return c.GetResults().get(node, False)
573

    
574
  def call_blockdev_rename(self, node, devlist):
575
    """Request rename of the given block devices.
576

577
    This is a single-node call.
578

579
    """
580
    params = [(d.ToDict(), uid) for d, uid in devlist]
581
    c = Client("blockdev_rename", params)
582
    self._ConnectNode(c, node)
583
    c.Run()
584
    return c.GetResults().get(node, False)
585

    
586
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
587
    """Request assembling of a given block device.
588

589
    This is a single-node call.
590

591
    """
592
    params = [disk.ToDict(), owner, on_primary]
593
    c = Client("blockdev_assemble", params)
594
    self._ConnectNode(c, node)
595
    c.Run()
596
    return c.GetResults().get(node, False)
597

    
598
  def call_blockdev_shutdown(self, node, disk):
599
    """Request shutdown of a given block device.
600

601
    This is a single-node call.
602

603
    """
604
    c = Client("blockdev_shutdown", [disk.ToDict()])
605
    self._ConnectNode(c, node)
606
    c.Run()
607
    return c.GetResults().get(node, False)
608

    
609
  def call_blockdev_addchildren(self, node, bdev, ndevs):
610
    """Request adding a list of children to a (mirroring) device.
611

612
    This is a single-node call.
613

614
    """
615
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
616
    c = Client("blockdev_addchildren", params)
617
    self._ConnectNode(c, node)
618
    c.Run()
619
    return c.GetResults().get(node, False)
620

    
621
  def call_blockdev_removechildren(self, node, bdev, ndevs):
622
    """Request removing a list of children from a (mirroring) device.
623

624
    This is a single-node call.
625

626
    """
627
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
628
    c = Client("blockdev_removechildren", params)
629
    self._ConnectNode(c, node)
630
    c.Run()
631
    return c.GetResults().get(node, False)
632

    
633
  def call_blockdev_getmirrorstatus(self, node, disks):
634
    """Request status of a (mirroring) device.
635

636
    This is a single-node call.
637

638
    """
639
    params = [dsk.ToDict() for dsk in disks]
640
    c = Client("blockdev_getmirrorstatus", params)
641
    self._ConnectNode(c, node)
642
    c.Run()
643
    return c.GetResults().get(node, False)
644

    
645
  def call_blockdev_find(self, node, disk):
646
    """Request identification of a given block device.
647

648
    This is a single-node call.
649

650
    """
651
    c = Client("blockdev_find", [disk.ToDict()])
652
    self._ConnectNode(c, node)
653
    c.Run()
654
    return c.GetResults().get(node, False)
655

    
656
  def call_blockdev_close(self, node, disks):
657
    """Closes the given block devices.
658

659
    This is a single-node call.
660

661
    """
662
    params = [cf.ToDict() for cf in disks]
663
    c = Client("blockdev_close", params)
664
    self._ConnectNode(c, node)
665
    c.Run()
666
    return c.GetResults().get(node, False)
667

    
668
  @staticmethod
669
  def call_upload_file(node_list, file_name, address_list=None):
670
    """Upload a file.
671

672
    The node will refuse the operation in case the file is not on the
673
    approved file list.
674

675
    This is a multi-node call.
676

677
    @type node_list: list
678
    @param node_list: the list of node names to upload to
679
    @type file_name: str
680
    @param file_name: the filename to upload
681
    @type address_list: list or None
682
    @keyword address_list: an optional list of node addresses, in order
683
        to optimize the RPC speed
684

685
    """
686
    fh = file(file_name)
687
    try:
688
      data = fh.read()
689
    finally:
690
      fh.close()
691
    st = os.stat(file_name)
692
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
693
              st.st_atime, st.st_mtime]
694
    c = Client("upload_file", params)
695
    c.ConnectList(node_list, address_list=address_list)
696
    c.Run()
697
    return c.GetResults()
698

    
699
  def call_os_diagnose(self, node_list):
700
    """Request a diagnose of OS definitions.
701

702
    This is a multi-node call.
703

704
    """
705
    c = Client("os_diagnose", [])
706
    self._ConnectList(c, node_list)
707
    c.Run()
708
    result = c.GetResults()
709
    new_result = {}
710
    for node_name in result:
711
      if result[node_name]:
712
        nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
713
      else:
714
        nr = []
715
      new_result[node_name] = nr
716
    return new_result
717

    
718
  def call_os_get(self, node, name):
719
    """Returns an OS definition.
720

721
    This is a single-node call.
722

723
    """
724
    c = Client("os_get", [name])
725
    self._ConnectNode(c, node)
726
    c.Run()
727
    result = c.GetResults().get(node, False)
728
    if isinstance(result, dict):
729
      return objects.OS.FromDict(result)
730
    else:
731
      return result
732

    
733
  def call_hooks_runner(self, node_list, hpath, phase, env):
734
    """Call the hooks runner.
735

736
    Args:
737
      - op: the OpCode instance
738
      - env: a dictionary with the environment
739

740
    This is a multi-node call.
741

742
    """
743
    params = [hpath, phase, env]
744
    c = Client("hooks_runner", params)
745
    self._ConnectList(c, node_list)
746
    c.Run()
747
    result = c.GetResults()
748
    return result
749

    
750
  def call_iallocator_runner(self, node, name, idata):
751
    """Call an iallocator on a remote node
752

753
    Args:
754
      - name: the iallocator name
755
      - input: the json-encoded input string
756

757
    This is a single-node call.
758

759
    """
760
    params = [name, idata]
761
    c = Client("iallocator_runner", params)
762
    self._ConnectNode(c, node)
763
    c.Run()
764
    result = c.GetResults().get(node, False)
765
    return result
766

    
767
  def call_blockdev_grow(self, node, cf_bdev, amount):
768
    """Request a snapshot of the given block device.
769

770
    This is a single-node call.
771

772
    """
773
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
774
    self._ConnectNode(c, node)
775
    c.Run()
776
    return c.GetResults().get(node, False)
777

    
778
  def call_blockdev_snapshot(self, node, cf_bdev):
779
    """Request a snapshot of the given block device.
780

781
    This is a single-node call.
782

783
    """
784
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
785
    self._ConnectNode(c, node)
786
    c.Run()
787
    return c.GetResults().get(node, False)
788

    
789
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
790
                           cluster_name, idx):
791
    """Request the export of a given snapshot.
792

793
    This is a single-node call.
794

795
    """
796
    params = [snap_bdev.ToDict(), dest_node,
797
              self._InstDict(instance), cluster_name, idx]
798
    c = Client("snapshot_export", params)
799
    self._ConnectNode(c, node)
800
    c.Run()
801
    return c.GetResults().get(node, False)
802

    
803
  def call_finalize_export(self, node, instance, snap_disks):
804
    """Request the completion of an export operation.
805

806
    This writes the export config file, etc.
807

808
    This is a single-node call.
809

810
    """
811
    flat_disks = []
812
    for disk in snap_disks:
813
      flat_disks.append(disk.ToDict())
814
    params = [self._InstDict(instance), flat_disks]
815
    c = Client("finalize_export", params)
816
    self._ConnectNode(c, node)
817
    c.Run()
818
    return c.GetResults().get(node, False)
819

    
820
  def call_export_info(self, node, path):
821
    """Queries the export information in a given path.
822

823
    This is a single-node call.
824

825
    """
826
    c = Client("export_info", [path])
827
    self._ConnectNode(c, node)
828
    c.Run()
829
    result = c.GetResults().get(node, False)
830
    if not result:
831
      return result
832
    return objects.SerializableConfigParser.Loads(str(result))
833

    
834
  def call_instance_os_import(self, node, inst, src_node, src_images,
835
                              cluster_name):
836
    """Request the import of a backup into an instance.
837

838
    This is a single-node call.
839

840
    """
841
    params = [self._InstDict(inst), src_node, src_images, cluster_name]
842
    c = Client("instance_os_import", params)
843
    self._ConnectNode(c, node)
844
    c.Run()
845
    return c.GetResults().get(node, False)
846

    
847
  def call_export_list(self, node_list):
848
    """Gets the stored exports list.
849

850
    This is a multi-node call.
851

852
    """
853
    c = Client("export_list", [])
854
    self._ConnectList(c, node_list)
855
    c.Run()
856
    result = c.GetResults()
857
    return result
858

    
859
  def call_export_remove(self, node, export):
860
    """Requests removal of a given export.
861

862
    This is a single-node call.
863

864
    """
865
    c = Client("export_remove", [export])
866
    self._ConnectNode(c, node)
867
    c.Run()
868
    return c.GetResults().get(node, False)
869

    
870
  @staticmethod
871
  def call_node_leave_cluster(node):
872
    """Requests a node to clean the cluster information it has.
873

874
    This will remove the configuration information from the ganeti data
875
    dir.
876

877
    This is a single-node call.
878

879
    """
880
    c = Client("node_leave_cluster", [])
881
    c.ConnectNode(node)
882
    c.Run()
883
    return c.GetResults().get(node, False)
884

    
885
  def call_node_volumes(self, node_list):
886
    """Gets all volumes on node(s).
887

888
    This is a multi-node call.
889

890
    """
891
    c = Client("node_volumes", [])
892
    self._ConnectList(c, node_list)
893
    c.Run()
894
    return c.GetResults()
895

    
896
  def call_test_delay(self, node_list, duration):
897
    """Sleep for a fixed time on given node(s).
898

899
    This is a multi-node call.
900

901
    """
902
    c = Client("test_delay", [duration])
903
    self._ConnectList(c, node_list)
904
    c.Run()
905
    return c.GetResults()
906

    
907
  def call_file_storage_dir_create(self, node, file_storage_dir):
908
    """Create the given file storage directory.
909

910
    This is a single-node call.
911

912
    """
913
    c = Client("file_storage_dir_create", [file_storage_dir])
914
    self._ConnectNode(c, node)
915
    c.Run()
916
    return c.GetResults().get(node, False)
917

    
918
  def call_file_storage_dir_remove(self, node, file_storage_dir):
919
    """Remove the given file storage directory.
920

921
    This is a single-node call.
922

923
    """
924
    c = Client("file_storage_dir_remove", [file_storage_dir])
925
    self._ConnectNode(c, node)
926
    c.Run()
927
    return c.GetResults().get(node, False)
928

    
929
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
930
                                   new_file_storage_dir):
931
    """Rename file storage directory.
932

933
    This is a single-node call.
934

935
    """
936
    c = Client("file_storage_dir_rename",
937
               [old_file_storage_dir, new_file_storage_dir])
938
    self._ConnectNode(c, node)
939
    c.Run()
940
    return c.GetResults().get(node, False)
941

    
942
  @staticmethod
943
  def call_jobqueue_update(node_list, address_list, file_name, content):
944
    """Update job queue.
945

946
    This is a multi-node call.
947

948
    """
949
    c = Client("jobqueue_update", [file_name, content])
950
    c.ConnectList(node_list, address_list=address_list)
951
    c.Run()
952
    result = c.GetResults()
953
    return result
954

    
955
  @staticmethod
956
  def call_jobqueue_purge(node):
957
    """Purge job queue.
958

959
    This is a single-node call.
960

961
    """
962
    c = Client("jobqueue_purge", [])
963
    c.ConnectNode(node)
964
    c.Run()
965
    return c.GetResults().get(node, False)
966

    
967
  @staticmethod
968
  def call_jobqueue_rename(node_list, address_list, old, new):
969
    """Rename a job queue file.
970

971
    This is a multi-node call.
972

973
    """
974
    c = Client("jobqueue_rename", [old, new])
975
    c.ConnectList(node_list, address_list=address_list)
976
    c.Run()
977
    result = c.GetResults()
978
    return result
979

    
980

    
981
  @staticmethod
982
  def call_jobqueue_set_drain(node_list, drain_flag):
983
    """Set the drain flag on the queue.
984

985
    This is a multi-node call.
986

987
    @type node_list: list
988
    @param node_list: the list of nodes to query
989
    @type drain_flag: bool
990
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
991

992
    """
993
    c = Client("jobqueue_set_drain", [drain_flag])
994
    c.ConnectList(node_list)
995
    c.Run()
996
    result = c.GetResults()
997
    return result
998

    
999

    
1000
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
1001
    """Validate the hypervisor params.
1002

1003
    This is a multi-node call.
1004

1005
    @type node_list: list
1006
    @param node_list: the list of nodes to query
1007
    @type hvname: string
1008
    @param hvname: the hypervisor name
1009
    @type hvparams: dict
1010
    @param hvparams: the hypervisor parameters to be validated
1011

1012
    """
1013
    cluster = self._cfg.GetClusterInfo()
1014
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1015
    c = Client("hypervisor_validate_params", [hvname, hv_full])
1016
    self._ConnectList(c, node_list)
1017
    c.Run()
1018
    result = c.GetResults()
1019
    return result