Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 3f3dfc15

History | View | Annotate | Download (28.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103,R0201,R0904
27
# C0103: Invalid name, since call_ are not valid
28
# R0201: Method could be a function, we keep all rpcs instance methods
29
# as not to change them back and forth between static/instance methods
30
# if they need to start using instance attributes
31
# R0904: Too many public methods
32

    
33
import os
34
import socket
35
import logging
36
import zlib
37
import base64
38

    
39
from ganeti import utils
40
from ganeti import objects
41
from ganeti import http
42
from ganeti import serializer
43
from ganeti import constants
44
from ganeti import errors
45

    
46
import ganeti.http.client
47

    
48

    
49
# Module level variable
50
_http_manager = None
51

    
52

    
53
def Init():
54
  """Initializes the module-global HTTP client manager.
55

56
  Must be called before using any RPC function.
57

58
  """
59
  global _http_manager
60

    
61
  assert not _http_manager, "RPC module initialized more than once"
62

    
63
  _http_manager = http.client.HttpClientManager()
64

    
65

    
66
def Shutdown():
67
  """Stops the module-global HTTP client manager.
68

69
  Must be called before quitting the program.
70

71
  """
72
  global _http_manager
73

    
74
  if _http_manager:
75
    _http_manager.Shutdown()
76
    _http_manager = None
77

    
78

    
79
class RpcResult(object):
80
  """RPC Result class.
81

82
  This class holds an RPC result. It is needed since in multi-node
83
  calls we can't raise an exception just because one one out of many
84
  failed, and therefore we use this class to encapsulate the result.
85

86
  @ivar data: the data payload, for successfull results, or None
87
  @type failed: boolean
88
  @ivar failed: whether the operation failed at RPC level (not
89
      application level on the remote node)
90
  @ivar call: the name of the RPC call
91
  @ivar node: the name of the node to which we made the call
92
  @ivar offline: whether the operation failed because the node was
93
      offline, as opposed to actual failure; offline=True will always
94
      imply failed=True, in order to allow simpler checking if
95
      the user doesn't care about the exact failure mode
96

97
  """
98
  def __init__(self, data=None, failed=False, offline=False,
99
               call=None, node=None):
100
    self.failed = failed
101
    self.offline = offline
102
    self.call = call
103
    self.node = node
104
    if offline:
105
      self.failed = True
106
      self.error = "Node is marked offline"
107
      self.data = None
108
    elif failed:
109
      self.error = data
110
      self.data = None
111
    else:
112
      self.data = data
113
      self.error = None
114

    
115
  def Raise(self):
116
    """If the result has failed, raise an OpExecError.
117

118
    This is used so that LU code doesn't have to check for each
119
    result, but instead can call this function.
120

121
    """
122
    if self.failed:
123
      raise errors.OpExecError("Call '%s' to node '%s' has failed: %s" %
124
                               (self.call, self.node, self.error))
125

    
126

    
127
class Client:
128
  """RPC Client class.
129

130
  This class, given a (remote) method name, a list of parameters and a
131
  list of nodes, will contact (in parallel) all nodes, and return a
132
  dict of results (key: node name, value: result).
133

134
  One current bug is that generic failure is still signalled by
135
  'False' result, which is not good. This overloading of values can
136
  cause bugs.
137

138
  """
139
  def __init__(self, procedure, body, port):
140
    self.procedure = procedure
141
    self.body = body
142
    self.port = port
143
    self.nc = {}
144

    
145
    self._ssl_params = \
146
      http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
147
                         ssl_cert_path=constants.SSL_CERT_FILE)
148

    
149
  def ConnectList(self, node_list, address_list=None):
150
    """Add a list of nodes to the target nodes.
151

152
    @type node_list: list
153
    @param node_list: the list of node names to connect
154
    @type address_list: list or None
155
    @keyword address_list: either None or a list with node addresses,
156
        which must have the same length as the node list
157

158
    """
159
    if address_list is None:
160
      address_list = [None for _ in node_list]
161
    else:
162
      assert len(node_list) == len(address_list), \
163
             "Name and address lists should have the same length"
164
    for node, address in zip(node_list, address_list):
165
      self.ConnectNode(node, address)
166

    
167
  def ConnectNode(self, name, address=None):
168
    """Add a node to the target list.
169

170
    @type name: str
171
    @param name: the node name
172
    @type address: str
173
    @keyword address: the node address, if known
174

175
    """
176
    if address is None:
177
      address = name
178

    
179
    self.nc[name] = \
180
      http.client.HttpClientRequest(address, self.port, http.HTTP_PUT,
181
                                    "/%s" % self.procedure,
182
                                    post_data=self.body,
183
                                    ssl_params=self._ssl_params,
184
                                    ssl_verify_peer=True)
185

    
186
  def GetResults(self):
187
    """Call nodes and return results.
188

189
    @rtype: list
190
    @returns: List of RPC results
191

192
    """
193
    assert _http_manager, "RPC module not intialized"
194

    
195
    _http_manager.ExecRequests(self.nc.values())
196

    
197
    results = {}
198

    
199
    for name, req in self.nc.iteritems():
200
      if req.success and req.resp_status_code == http.HTTP_OK:
201
        results[name] = RpcResult(data=serializer.LoadJson(req.resp_body),
202
                                  node=name, call=self.procedure)
203
        continue
204

    
205
      # TODO: Better error reporting
206
      if req.error:
207
        msg = req.error
208
      else:
209
        msg = req.resp_body
210

    
211
      logging.error("RPC error from node %s: %s", name, msg)
212
      results[name] = RpcResult(data=msg, failed=True, node=name,
213
                                call=self.procedure)
214

    
215
    return results
216

    
217

    
218
class RpcRunner(object):
219
  """RPC runner class"""
220

    
221
  def __init__(self, cfg):
222
    """Initialized the rpc runner.
223

224
    @type cfg:  C{config.ConfigWriter}
225
    @param cfg: the configuration object that will be used to get data
226
                about the cluster
227

228
    """
229
    self._cfg = cfg
230
    self.port = utils.GetNodeDaemonPort()
231

    
232
  def _InstDict(self, instance):
233
    """Convert the given instance to a dict.
234

235
    This is done via the instance's ToDict() method and additionally
236
    we fill the hvparams with the cluster defaults.
237

238
    @type instance: L{objects.Instance}
239
    @param instance: an Instance object
240
    @rtype: dict
241
    @return: the instance dict, with the hvparams filled with the
242
        cluster defaults
243

244
    """
245
    idict = instance.ToDict()
246
    cluster = self._cfg.GetClusterInfo()
247
    idict["hvparams"] = cluster.FillHV(instance)
248
    idict["beparams"] = cluster.FillBE(instance)
249
    return idict
250

    
251
  def _ConnectList(self, client, node_list):
252
    """Helper for computing node addresses.
253

254
    @type client: L{Client}
255
    @param client: a C{Client} instance
256
    @type node_list: list
257
    @param node_list: the node list we should connect
258

259
    """
260
    all_nodes = self._cfg.GetAllNodesInfo()
261
    name_list = []
262
    addr_list = []
263
    skip_dict = {}
264
    for node in node_list:
265
      if node in all_nodes:
266
        if all_nodes[node].offline:
267
          skip_dict[node] = RpcResult(node=node, offline=True)
268
          continue
269
        val = all_nodes[node].primary_ip
270
      else:
271
        val = None
272
      addr_list.append(val)
273
      name_list.append(node)
274
    if name_list:
275
      client.ConnectList(name_list, address_list=addr_list)
276
    return skip_dict
277

    
278
  def _ConnectNode(self, client, node):
279
    """Helper for computing one node's address.
280

281
    @type client: L{Client}
282
    @param client: a C{Client} instance
283
    @type node: str
284
    @param node: the node we should connect
285

286
    """
287
    node_info = self._cfg.GetNodeInfo(node)
288
    if node_info is not None:
289
      if node_info.offline:
290
        return RpcResult(node=node, offline=True)
291
      addr = node_info.primary_ip
292
    else:
293
      addr = None
294
    client.ConnectNode(node, address=addr)
295

    
296
  def _MultiNodeCall(self, node_list, procedure, args):
297
    """Helper for making a multi-node call
298

299
    """
300
    body = serializer.DumpJson(args, indent=False)
301
    c = Client(procedure, body, self.port)
302
    skip_dict = self._ConnectList(c, node_list)
303
    skip_dict.update(c.GetResults())
304
    return skip_dict
305

    
306
  @classmethod
307
  def _StaticMultiNodeCall(cls, node_list, procedure, args,
308
                           address_list=None):
309
    """Helper for making a multi-node static call
310

311
    """
312
    body = serializer.DumpJson(args, indent=False)
313
    c = Client(procedure, body, utils.GetNodeDaemonPort())
314
    c.ConnectList(node_list, address_list=address_list)
315
    return c.GetResults()
316

    
317
  def _SingleNodeCall(self, node, procedure, args):
318
    """Helper for making a single-node call
319

320
    """
321
    body = serializer.DumpJson(args, indent=False)
322
    c = Client(procedure, body, self.port)
323
    result = self._ConnectNode(c, node)
324
    if result is None:
325
      # we did connect, node is not offline
326
      result = c.GetResults()[node]
327
    return result
328

    
329
  @classmethod
330
  def _StaticSingleNodeCall(cls, node, procedure, args):
331
    """Helper for making a single-node static call
332

333
    """
334
    body = serializer.DumpJson(args, indent=False)
335
    c = Client(procedure, body, utils.GetNodeDaemonPort())
336
    c.ConnectNode(node)
337
    return c.GetResults()[node]
338

    
339
  @staticmethod
340
  def _Compress(data):
341
    """Compresses a string for transport over RPC.
342

343
    Small amounts of data are not compressed.
344

345
    @type data: str
346
    @param data: Data
347
    @rtype: tuple
348
    @return: Encoded data to send
349

350
    """
351
    # Small amounts of data are not compressed
352
    if len(data) < 512:
353
      return (constants.RPC_ENCODING_NONE, data)
354

    
355
    # Compress with zlib and encode in base64
356
    return (constants.RPC_ENCODING_ZLIB_BASE64,
357
            base64.b64encode(zlib.compress(data, 3)))
358

    
359
  #
360
  # Begin RPC calls
361
  #
362

    
363
  def call_volume_list(self, node_list, vg_name):
364
    """Gets the logical volumes present in a given volume group.
365

366
    This is a multi-node call.
367

368
    """
369
    return self._MultiNodeCall(node_list, "volume_list", [vg_name])
370

    
371
  def call_vg_list(self, node_list):
372
    """Gets the volume group list.
373

374
    This is a multi-node call.
375

376
    """
377
    return self._MultiNodeCall(node_list, "vg_list", [])
378

    
379
  def call_bridges_exist(self, node, bridges_list):
380
    """Checks if a node has all the bridges given.
381

382
    This method checks if all bridges given in the bridges_list are
383
    present on the remote node, so that an instance that uses interfaces
384
    on those bridges can be started.
385

386
    This is a single-node call.
387

388
    """
389
    return self._SingleNodeCall(node, "bridges_exist", [bridges_list])
390

    
391
  def call_instance_start(self, node, instance, extra_args):
392
    """Starts an instance.
393

394
    This is a single-node call.
395

396
    """
397
    return self._SingleNodeCall(node, "instance_start",
398
                                [self._InstDict(instance), extra_args])
399

    
400
  def call_instance_shutdown(self, node, instance):
401
    """Stops an instance.
402

403
    This is a single-node call.
404

405
    """
406
    return self._SingleNodeCall(node, "instance_shutdown",
407
                                [self._InstDict(instance)])
408

    
409
  def call_instance_migrate(self, node, instance, target, live):
410
    """Migrate an instance.
411

412
    This is a single-node call.
413

414
    @type node: string
415
    @param node: the node on which the instance is currently running
416
    @type instance: C{objects.Instance}
417
    @param instance: the instance definition
418
    @type target: string
419
    @param target: the target node name
420
    @type live: boolean
421
    @param live: whether the migration should be done live or not (the
422
        interpretation of this parameter is left to the hypervisor)
423

424
    """
425
    return self._SingleNodeCall(node, "instance_migrate",
426
                                [self._InstDict(instance), target, live])
427

    
428
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
429
    """Reboots an instance.
430

431
    This is a single-node call.
432

433
    """
434
    return self._SingleNodeCall(node, "instance_reboot",
435
                                [self._InstDict(instance), reboot_type,
436
                                 extra_args])
437

    
438
  def call_instance_os_add(self, node, inst):
439
    """Installs an OS on the given instance.
440

441
    This is a single-node call.
442

443
    """
444
    return self._SingleNodeCall(node, "instance_os_add",
445
                                [self._InstDict(inst)])
446

    
447
  def call_instance_run_rename(self, node, inst, old_name):
448
    """Run the OS rename script for an instance.
449

450
    This is a single-node call.
451

452
    """
453
    return self._SingleNodeCall(node, "instance_run_rename",
454
                                [self._InstDict(inst), old_name])
455

    
456
  def call_instance_info(self, node, instance, hname):
457
    """Returns information about a single instance.
458

459
    This is a single-node call.
460

461
    @type node: list
462
    @param node: the list of nodes to query
463
    @type instance: string
464
    @param instance: the instance name
465
    @type hname: string
466
    @param hname: the hypervisor type of the instance
467

468
    """
469
    return self._SingleNodeCall(node, "instance_info", [instance, hname])
470

    
471
  def call_all_instances_info(self, node_list, hypervisor_list):
472
    """Returns information about all instances on the given nodes.
473

474
    This is a multi-node call.
475

476
    @type node_list: list
477
    @param node_list: the list of nodes to query
478
    @type hypervisor_list: list
479
    @param hypervisor_list: the hypervisors to query for instances
480

481
    """
482
    return self._MultiNodeCall(node_list, "all_instances_info",
483
                               [hypervisor_list])
484

    
485
  def call_instance_list(self, node_list, hypervisor_list):
486
    """Returns the list of running instances on a given node.
487

488
    This is a multi-node call.
489

490
    @type node_list: list
491
    @param node_list: the list of nodes to query
492
    @type hypervisor_list: list
493
    @param hypervisor_list: the hypervisors to query for instances
494

495
    """
496
    return self._MultiNodeCall(node_list, "instance_list", [hypervisor_list])
497

    
498
  def call_node_tcp_ping(self, node, source, target, port, timeout,
499
                         live_port_needed):
500
    """Do a TcpPing on the remote node
501

502
    This is a single-node call.
503

504
    """
505
    return self._SingleNodeCall(node, "node_tcp_ping",
506
                                [source, target, port, timeout,
507
                                 live_port_needed])
508

    
509
  def call_node_has_ip_address(self, node, address):
510
    """Checks if a node has the given IP address.
511

512
    This is a single-node call.
513

514
    """
515
    return self._SingleNodeCall(node, "node_has_ip_address", [address])
516

    
517
  def call_node_info(self, node_list, vg_name, hypervisor_type):
518
    """Return node information.
519

520
    This will return memory information and volume group size and free
521
    space.
522

523
    This is a multi-node call.
524

525
    @type node_list: list
526
    @param node_list: the list of nodes to query
527
    @type vg_name: C{string}
528
    @param vg_name: the name of the volume group to ask for disk space
529
        information
530
    @type hypervisor_type: C{str}
531
    @param hypervisor_type: the name of the hypervisor to ask for
532
        memory information
533

534
    """
535
    retux = self._MultiNodeCall(node_list, "node_info",
536
                                [vg_name, hypervisor_type])
537

    
538
    for result in retux.itervalues():
539
      if result.failed or not isinstance(result.data, dict):
540
        result.data = {}
541

    
542
      utils.CheckDict(result.data, {
543
        'memory_total' : '-',
544
        'memory_dom0' : '-',
545
        'memory_free' : '-',
546
        'vg_size' : 'node_unreachable',
547
        'vg_free' : '-',
548
        }, "call_node_info")
549
    return retux
550

    
551
  def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
552
    """Add a node to the cluster.
553

554
    This is a single-node call.
555

556
    """
557
    return self._SingleNodeCall(node, "node_add",
558
                                [dsa, dsapub, rsa, rsapub, ssh, sshpub])
559

    
560
  def call_node_verify(self, node_list, checkdict, cluster_name):
561
    """Request verification of given parameters.
562

563
    This is a multi-node call.
564

565
    """
566
    return self._MultiNodeCall(node_list, "node_verify",
567
                               [checkdict, cluster_name])
568

    
569
  @classmethod
570
  def call_node_start_master(cls, node, start_daemons):
571
    """Tells a node to activate itself as a master.
572

573
    This is a single-node call.
574

575
    """
576
    return cls._StaticSingleNodeCall(node, "node_start_master",
577
                                     [start_daemons])
578

    
579
  @classmethod
580
  def call_node_stop_master(cls, node, stop_daemons):
581
    """Tells a node to demote itself from master status.
582

583
    This is a single-node call.
584

585
    """
586
    return cls._StaticSingleNodeCall(node, "node_stop_master", [stop_daemons])
587

    
588
  @classmethod
589
  def call_master_info(cls, node_list):
590
    """Query master info.
591

592
    This is a multi-node call.
593

594
    """
595
    # TODO: should this method query down nodes?
596
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
597

    
598
  def call_version(self, node_list):
599
    """Query node version.
600

601
    This is a multi-node call.
602

603
    """
604
    return self._MultiNodeCall(node_list, "version", [])
605

    
606
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
607
    """Request creation of a given block device.
608

609
    This is a single-node call.
610

611
    """
612
    return self._SingleNodeCall(node, "blockdev_create",
613
                                [bdev.ToDict(), size, owner, on_primary, info])
614

    
615
  def call_blockdev_remove(self, node, bdev):
616
    """Request removal of a given block device.
617

618
    This is a single-node call.
619

620
    """
621
    return self._SingleNodeCall(node, "blockdev_remove", [bdev.ToDict()])
622

    
623
  def call_blockdev_rename(self, node, devlist):
624
    """Request rename of the given block devices.
625

626
    This is a single-node call.
627

628
    """
629
    return self._SingleNodeCall(node, "blockdev_rename",
630
                                [(d.ToDict(), uid) for d, uid in devlist])
631

    
632
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
633
    """Request assembling of a given block device.
634

635
    This is a single-node call.
636

637
    """
638
    return self._SingleNodeCall(node, "blockdev_assemble",
639
                                [disk.ToDict(), owner, on_primary])
640

    
641
  def call_blockdev_shutdown(self, node, disk):
642
    """Request shutdown of a given block device.
643

644
    This is a single-node call.
645

646
    """
647
    return self._SingleNodeCall(node, "blockdev_shutdown", [disk.ToDict()])
648

    
649
  def call_blockdev_addchildren(self, node, bdev, ndevs):
650
    """Request adding a list of children to a (mirroring) device.
651

652
    This is a single-node call.
653

654
    """
655
    return self._SingleNodeCall(node, "blockdev_addchildren",
656
                                [bdev.ToDict(),
657
                                 [disk.ToDict() for disk in ndevs]])
658

    
659
  def call_blockdev_removechildren(self, node, bdev, ndevs):
660
    """Request removing a list of children from a (mirroring) device.
661

662
    This is a single-node call.
663

664
    """
665
    return self._SingleNodeCall(node, "blockdev_removechildren",
666
                                [bdev.ToDict(),
667
                                 [disk.ToDict() for disk in ndevs]])
668

    
669
  def call_blockdev_getmirrorstatus(self, node, disks):
670
    """Request status of a (mirroring) device.
671

672
    This is a single-node call.
673

674
    """
675
    return self._SingleNodeCall(node, "blockdev_getmirrorstatus",
676
                                [dsk.ToDict() for dsk in disks])
677

    
678
  def call_blockdev_find(self, node, disk):
679
    """Request identification of a given block device.
680

681
    This is a single-node call.
682

683
    """
684
    return self._SingleNodeCall(node, "blockdev_find", [disk.ToDict()])
685

    
686
  def call_blockdev_close(self, node, disks):
687
    """Closes the given block devices.
688

689
    This is a single-node call.
690

691
    """
692
    return self._SingleNodeCall(node, "blockdev_close",
693
                                [cf.ToDict() for cf in disks])
694

    
695
  @classmethod
696
  def call_upload_file(cls, node_list, file_name, address_list=None):
697
    """Upload a file.
698

699
    The node will refuse the operation in case the file is not on the
700
    approved file list.
701

702
    This is a multi-node call.
703

704
    @type node_list: list
705
    @param node_list: the list of node names to upload to
706
    @type file_name: str
707
    @param file_name: the filename to upload
708
    @type address_list: list or None
709
    @keyword address_list: an optional list of node addresses, in order
710
        to optimize the RPC speed
711

712
    """
713
    file_contents = utils.ReadFile(file_name)
714
    data = cls._Compress(file_contents)
715
    st = os.stat(file_name)
716
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
717
              st.st_atime, st.st_mtime]
718
    return cls._StaticMultiNodeCall(node_list, "upload_file", params,
719
                                    address_list=address_list)
720

    
721
  @classmethod
722
  def call_write_ssconf_files(cls, node_list, values):
723
    """Write ssconf files.
724

725
    This is a multi-node call.
726

727
    """
728
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
729

    
730
  def call_os_diagnose(self, node_list):
731
    """Request a diagnose of OS definitions.
732

733
    This is a multi-node call.
734

735
    """
736
    result = self._MultiNodeCall(node_list, "os_diagnose", [])
737

    
738
    for node_result in result.values():
739
      if not node_result.failed and node_result.data:
740
        node_result.data = [objects.OS.FromDict(oss)
741
                            for oss in node_result.data]
742
    return result
743

    
744
  def call_os_get(self, node, name):
745
    """Returns an OS definition.
746

747
    This is a single-node call.
748

749
    """
750
    result = self._SingleNodeCall(node, "os_get", [name])
751
    if not result.failed and isinstance(result.data, dict):
752
      result.data = objects.OS.FromDict(result.data)
753
    return result
754

    
755
  def call_hooks_runner(self, node_list, hpath, phase, env):
756
    """Call the hooks runner.
757

758
    Args:
759
      - op: the OpCode instance
760
      - env: a dictionary with the environment
761

762
    This is a multi-node call.
763

764
    """
765
    params = [hpath, phase, env]
766
    return self._MultiNodeCall(node_list, "hooks_runner", params)
767

    
768
  def call_iallocator_runner(self, node, name, idata):
769
    """Call an iallocator on a remote node
770

771
    Args:
772
      - name: the iallocator name
773
      - input: the json-encoded input string
774

775
    This is a single-node call.
776

777
    """
778
    return self._SingleNodeCall(node, "iallocator_runner", [name, idata])
779

    
780
  def call_blockdev_grow(self, node, cf_bdev, amount):
781
    """Request a snapshot of the given block device.
782

783
    This is a single-node call.
784

785
    """
786
    return self._SingleNodeCall(node, "blockdev_grow",
787
                                [cf_bdev.ToDict(), amount])
788

    
789
  def call_blockdev_snapshot(self, node, cf_bdev):
790
    """Request a snapshot of the given block device.
791

792
    This is a single-node call.
793

794
    """
795
    return self._SingleNodeCall(node, "blockdev_snapshot", [cf_bdev.ToDict()])
796

    
797
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
798
                           cluster_name, idx):
799
    """Request the export of a given snapshot.
800

801
    This is a single-node call.
802

803
    """
804
    return self._SingleNodeCall(node, "snapshot_export",
805
                                [snap_bdev.ToDict(), dest_node,
806
                                 self._InstDict(instance), cluster_name, idx])
807

    
808
  def call_finalize_export(self, node, instance, snap_disks):
809
    """Request the completion of an export operation.
810

811
    This writes the export config file, etc.
812

813
    This is a single-node call.
814

815
    """
816
    flat_disks = []
817
    for disk in snap_disks:
818
      flat_disks.append(disk.ToDict())
819

    
820
    return self._SingleNodeCall(node, "finalize_export",
821
                                [self._InstDict(instance), flat_disks])
822

    
823
  def call_export_info(self, node, path):
824
    """Queries the export information in a given path.
825

826
    This is a single-node call.
827

828
    """
829
    result = self._SingleNodeCall(node, "export_info", [path])
830
    if not result.failed and result.data:
831
      result.data = objects.SerializableConfigParser.Loads(str(result.data))
832
    return result
833

    
834
  def call_instance_os_import(self, node, inst, src_node, src_images,
835
                              cluster_name):
836
    """Request the import of a backup into an instance.
837

838
    This is a single-node call.
839

840
    """
841
    return self._SingleNodeCall(node, "instance_os_import",
842
                                [self._InstDict(inst), src_node, src_images,
843
                                 cluster_name])
844

    
845
  def call_export_list(self, node_list):
846
    """Gets the stored exports list.
847

848
    This is a multi-node call.
849

850
    """
851
    return self._MultiNodeCall(node_list, "export_list", [])
852

    
853
  def call_export_remove(self, node, export):
854
    """Requests removal of a given export.
855

856
    This is a single-node call.
857

858
    """
859
    return self._SingleNodeCall(node, "export_remove", [export])
860

    
861
  @classmethod
862
  def call_node_leave_cluster(cls, node):
863
    """Requests a node to clean the cluster information it has.
864

865
    This will remove the configuration information from the ganeti data
866
    dir.
867

868
    This is a single-node call.
869

870
    """
871
    return cls._StaticSingleNodeCall(node, "node_leave_cluster", [])
872

    
873
  def call_node_volumes(self, node_list):
874
    """Gets all volumes on node(s).
875

876
    This is a multi-node call.
877

878
    """
879
    return self._MultiNodeCall(node_list, "node_volumes", [])
880

    
881
  def call_node_demote_from_mc(self, node):
882
    """Demote a node from the master candidate role.
883

884
    This is a single-node call.
885

886
    """
887
    return self._SingleNodeCall(node, "node_demote_from_mc", [])
888

    
889
  def call_test_delay(self, node_list, duration):
890
    """Sleep for a fixed time on given node(s).
891

892
    This is a multi-node call.
893

894
    """
895
    return self._MultiNodeCall(node_list, "test_delay", [duration])
896

    
897
  def call_file_storage_dir_create(self, node, file_storage_dir):
898
    """Create the given file storage directory.
899

900
    This is a single-node call.
901

902
    """
903
    return self._SingleNodeCall(node, "file_storage_dir_create",
904
                                [file_storage_dir])
905

    
906
  def call_file_storage_dir_remove(self, node, file_storage_dir):
907
    """Remove the given file storage directory.
908

909
    This is a single-node call.
910

911
    """
912
    return self._SingleNodeCall(node, "file_storage_dir_remove",
913
                                [file_storage_dir])
914

    
915
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
916
                                   new_file_storage_dir):
917
    """Rename file storage directory.
918

919
    This is a single-node call.
920

921
    """
922
    return self._SingleNodeCall(node, "file_storage_dir_rename",
923
                                [old_file_storage_dir, new_file_storage_dir])
924

    
925
  @classmethod
926
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
927
    """Update job queue.
928

929
    This is a multi-node call.
930

931
    """
932
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
933
                                    [file_name, cls._Compress(content)],
934
                                    address_list=address_list)
935

    
936
  @classmethod
937
  def call_jobqueue_purge(cls, node):
938
    """Purge job queue.
939

940
    This is a single-node call.
941

942
    """
943
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
944

    
945
  @classmethod
946
  def call_jobqueue_rename(cls, node_list, address_list, old, new):
947
    """Rename a job queue file.
948

949
    This is a multi-node call.
950

951
    """
952
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", [old, new],
953
                                    address_list=address_list)
954

    
955
  @classmethod
956
  def call_jobqueue_set_drain(cls, node_list, drain_flag):
957
    """Set the drain flag on the queue.
958

959
    This is a multi-node call.
960

961
    @type node_list: list
962
    @param node_list: the list of nodes to query
963
    @type drain_flag: bool
964
    @param drain_flag: if True, will set the drain flag, otherwise reset it.
965

966
    """
967
    return cls._StaticMultiNodeCall(node_list, "jobqueue_set_drain",
968
                                    [drain_flag])
969

    
970
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
971
    """Validate the hypervisor params.
972

973
    This is a multi-node call.
974

975
    @type node_list: list
976
    @param node_list: the list of nodes to query
977
    @type hvname: string
978
    @param hvname: the hypervisor name
979
    @type hvparams: dict
980
    @param hvparams: the hypervisor parameters to be validated
981

982
    """
983
    cluster = self._cfg.GetClusterInfo()
984
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
985
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
986
                               [hvname, hv_full])