Statistics
| Branch: | Tag: | Revision:

root / test / py / cmdlib / backup_unittest.py @ 70b634e6

History | View | Annotate | Download (7.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tests for LUBackup*"""
23

    
24
from ganeti import constants
25
from ganeti import objects
26
from ganeti import opcodes
27
from ganeti import query
28

    
29
from testsupport import *
30

    
31
import testutils
32

    
33

    
34
class TestLUBackupQuery(CmdlibTestCase):
35
  def setUp(self):
36
    super(TestLUBackupQuery, self).setUp()
37

    
38
    self.fields = query._BuildExportFields().keys()
39

    
40
  def testFailingExportList(self):
41
    self.rpc.call_export_list.return_value = \
42
      self.RpcResultsBuilder() \
43
        .AddFailedNode(self.master) \
44
        .Build()
45
    op = opcodes.OpBackupQuery(nodes=[self.master.name])
46
    ret = self.ExecOpCode(op)
47
    self.assertEqual({self.master.name: False}, ret)
48

    
49
  def testQueryOneNode(self):
50
    self.rpc.call_export_list.return_value = \
51
      self.RpcResultsBuilder() \
52
        .AddSuccessfulNode(self.master,
53
                           ["mock_export1", "mock_export2"]) \
54
        .Build()
55
    op = opcodes.OpBackupQuery(nodes=[self.master.name])
56
    ret = self.ExecOpCode(op)
57
    self.assertEqual({self.master.name: ["mock_export1", "mock_export2"]}, ret)
58

    
59
  def testQueryAllNodes(self):
60
    node = self.cfg.AddNewNode()
61
    self.rpc.call_export_list.return_value = \
62
      self.RpcResultsBuilder() \
63
        .AddSuccessfulNode(self.master, ["mock_export1"]) \
64
        .AddSuccessfulNode(node, ["mock_export2"]) \
65
        .Build()
66
    op = opcodes.OpBackupQuery()
67
    ret = self.ExecOpCode(op)
68
    self.assertEqual({
69
                       self.master.name: ["mock_export1"],
70
                       node.name: ["mock_export2"]
71
                     }, ret)
72

    
73

    
74
class TestLUBackupPrepare(CmdlibTestCase):
75
  @patchUtils("instance_utils")
76
  def testPrepareLocalExport(self, utils):
77
    utils.ReadOneLineFile.return_value = "cluster_secret"
78
    inst = self.cfg.AddNewInstance()
79
    op = opcodes.OpBackupPrepare(instance_name=inst.name,
80
                                 mode=constants.EXPORT_MODE_LOCAL)
81
    self.ExecOpCode(op)
82

    
83
  @patchUtils("instance_utils")
84
  def testPrepareRemoteExport(self, utils):
85
    utils.ReadOneLineFile.return_value = "cluster_secret"
86
    inst = self.cfg.AddNewInstance()
87
    self.rpc.call_x509_cert_create.return_value = \
88
      self.RpcResultsBuilder() \
89
        .CreateSuccessfulNodeResult(inst.primary_node,
90
                                    ("key_name",
91
                                     testutils.ReadTestData("cert1.pem")))
92
    op = opcodes.OpBackupPrepare(instance_name=inst.name,
93
                                 mode=constants.EXPORT_MODE_REMOTE)
94
    self.ExecOpCode(op)
95

    
96

    
97
class TestLUBackupExportBase(CmdlibTestCase):
98
  def setUp(self):
99
    super(TestLUBackupExportBase, self).setUp()
100

    
101
    self.rpc.call_instance_start.return_value = \
102
      self.RpcResultsBuilder() \
103
        .CreateSuccessfulNodeResult(self.master, True)
104

    
105
    self.rpc.call_blockdev_assemble.return_value = \
106
      self.RpcResultsBuilder() \
107
        .CreateSuccessfulNodeResult(self.master, None)
108

    
109
    self.rpc.call_blockdev_shutdown.return_value = \
110
      self.RpcResultsBuilder() \
111
        .CreateSuccessfulNodeResult(self.master, None)
112

    
113
    self.rpc.call_blockdev_snapshot.return_value = \
114
      self.RpcResultsBuilder() \
115
        .CreateSuccessfulNodeResult(self.master, ("mock_vg", "mock_id"))
116

    
117
    self.rpc.call_blockdev_remove.return_value = \
118
      self.RpcResultsBuilder() \
119
        .CreateSuccessfulNodeResult(self.master, None)
120

    
121
    self.rpc.call_export_start.return_value = \
122
      self.RpcResultsBuilder() \
123
        .CreateSuccessfulNodeResult(self.master, "export_daemon")
124

    
125
    def ImpExpStatus(node_uuid, name):
126
      return self.RpcResultsBuilder() \
127
               .CreateSuccessfulNodeResult(node_uuid,
128
                                           [objects.ImportExportStatus(
129
                                             exit_status=0
130
                                           )])
131
    self.rpc.call_impexp_status.side_effect = ImpExpStatus
132

    
133
    def ImpExpCleanup(node_uuid, name):
134
      return self.RpcResultsBuilder() \
135
               .CreateSuccessfulNodeResult(node_uuid)
136
    self.rpc.call_impexp_cleanup.side_effect = ImpExpCleanup
137

    
138
    self.rpc.call_finalize_export.return_value = \
139
      self.RpcResultsBuilder() \
140
        .CreateSuccessfulNodeResult(self.master, None)
141

    
142
  def testRemoveRunningInstanceWithoutShutdown(self):
143
    inst = self.cfg.AddNewInstance(admin_state=constants.ADMINST_UP)
144
    op = opcodes.OpBackupExport(instance_name=inst.name,
145
                                target_node=self.master.name,
146
                                shutdown=False,
147
                                remove_instance=True)
148
    self.ExecOpCodeExpectOpPrereqError(
149
      op, "Can not remove instance without shutting it down before")
150

    
151
  def testUnsupportedDiskTemplate(self):
152
    inst = self.cfg.AddNewInstance(disk_template=constants.DT_FILE)
153
    op = opcodes.OpBackupExport(instance_name=inst.name,
154
                                target_node=self.master.name)
155
    self.ExecOpCodeExpectOpPrereqError(
156
      op, "Export not supported for instances with file-based disks")
157

    
158

    
159
class TestLUBackupExportLocalExport(TestLUBackupExportBase):
160
  def setUp(self):
161
    super(TestLUBackupExportLocalExport, self).setUp()
162

    
163
    self.inst = self.cfg.AddNewInstance()
164
    self.target_node = self.cfg.AddNewNode()
165
    self.op = opcodes.OpBackupExport(mode=constants.EXPORT_MODE_LOCAL,
166
                                     instance_name=self.inst.name,
167
                                     target_node=self.target_node.name)
168

    
169
    self.rpc.call_import_start.return_value = \
170
      self.RpcResultsBuilder() \
171
        .CreateSuccessfulNodeResult(self.target_node, "import_daemon")
172

    
173
  def testExportWithShutdown(self):
174
    inst = self.cfg.AddNewInstance(admin_state=constants.ADMINST_UP)
175
    op = self.CopyOpCode(self.op, instance_name=inst.name, shutdown=True)
176
    self.ExecOpCode(op)
177

    
178
  def testExportDeactivatedDisks(self):
179
    self.ExecOpCode(self.op)
180

    
181
  def testExportRemoveInstance(self):
182
    op = self.CopyOpCode(self.op, remove_instance=True)
183
    self.ExecOpCode(op)
184

    
185

    
186
class TestLUBackupExportRemoteExport(TestLUBackupExportBase):
187
  def setUp(self):
188
    super(TestLUBackupExportRemoteExport, self).setUp()
189

    
190
    self.inst = self.cfg.AddNewInstance()
191
    self.op = opcodes.OpBackupExport(mode=constants.EXPORT_MODE_REMOTE,
192
                                     instance_name=self.inst.name,
193
                                     target_node=[],
194
                                     x509_key_name=["mock_key_name"],
195
                                     destination_x509_ca="mock_dest_ca")
196

    
197
  def testRemoteExportWithoutX509KeyName(self):
198
    op = self.CopyOpCode(self.op, x509_key_name=self.REMOVE)
199
    self.ExecOpCodeExpectOpPrereqError(op,
200
                                       "Missing X509 key name for encryption")
201

    
202
  def testRemoteExportWithoutX509DestCa(self):
203
    op = self.CopyOpCode(self.op, destination_x509_ca=self.REMOVE)
204
    self.ExecOpCodeExpectOpPrereqError(op,
205
                                       "Missing destination X509 CA")
206

    
207

    
208
if __name__ == "__main__":
209
  testutils.GanetiTestProgram()