Add unit tests for LUBackupExport
[ganeti-local] / test / py / cmdlib / backup_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tests for LUBackup*"""
23
24 from ganeti import constants
25 from ganeti import objects
26 from ganeti import opcodes
27 from ganeti import query
28
29 from testsupport import *
30
31 import testutils
32
33
34 class TestLUBackupQuery(CmdlibTestCase):
35   def setUp(self):
36     super(TestLUBackupQuery, self).setUp()
37
38     self.fields = query._BuildExportFields().keys()
39
40   def testFailingExportList(self):
41     self.rpc.call_export_list.return_value = \
42       self.RpcResultsBuilder() \
43         .AddFailedNode(self.master) \
44         .Build()
45     op = opcodes.OpBackupQuery(nodes=[self.master.name])
46     ret = self.ExecOpCode(op)
47     self.assertEqual({self.master.name: False}, ret)
48
49   def testQueryOneNode(self):
50     self.rpc.call_export_list.return_value = \
51       self.RpcResultsBuilder() \
52         .AddSuccessfulNode(self.master,
53                            ["mock_export1", "mock_export2"]) \
54         .Build()
55     op = opcodes.OpBackupQuery(nodes=[self.master.name])
56     ret = self.ExecOpCode(op)
57     self.assertEqual({self.master.name: ["mock_export1", "mock_export2"]}, ret)
58
59   def testQueryAllNodes(self):
60     node = self.cfg.AddNewNode()
61     self.rpc.call_export_list.return_value = \
62       self.RpcResultsBuilder() \
63         .AddSuccessfulNode(self.master, ["mock_export1"]) \
64         .AddSuccessfulNode(node, ["mock_export2"]) \
65         .Build()
66     op = opcodes.OpBackupQuery()
67     ret = self.ExecOpCode(op)
68     self.assertEqual({
69                        self.master.name: ["mock_export1"],
70                        node.name: ["mock_export2"]
71                      }, ret)
72
73
74 class TestLUBackupPrepare(CmdlibTestCase):
75   @patchUtils("instance_utils")
76   def testPrepareLocalExport(self, utils):
77     utils.ReadOneLineFile.return_value = "cluster_secret"
78     inst = self.cfg.AddNewInstance()
79     op = opcodes.OpBackupPrepare(instance_name=inst.name,
80                                  mode=constants.EXPORT_MODE_LOCAL)
81     self.ExecOpCode(op)
82
83   @patchUtils("instance_utils")
84   def testPrepareRemoteExport(self, utils):
85     utils.ReadOneLineFile.return_value = "cluster_secret"
86     inst = self.cfg.AddNewInstance()
87     self.rpc.call_x509_cert_create.return_value = \
88       self.RpcResultsBuilder() \
89         .CreateSuccessfulNodeResult(inst.primary_node,
90                                     ("key_name",
91                                      testutils.ReadTestData("cert1.pem")))
92     op = opcodes.OpBackupPrepare(instance_name=inst.name,
93                                  mode=constants.EXPORT_MODE_REMOTE)
94     self.ExecOpCode(op)
95
96
97 class TestLUBackupExportBase(CmdlibTestCase):
98   def setUp(self):
99     super(TestLUBackupExportBase, self).setUp()
100
101     self.rpc.call_instance_start.return_value = \
102       self.RpcResultsBuilder() \
103         .CreateSuccessfulNodeResult(self.master, True)
104
105     self.rpc.call_blockdev_assemble.return_value = \
106       self.RpcResultsBuilder() \
107         .CreateSuccessfulNodeResult(self.master, None)
108
109     self.rpc.call_blockdev_shutdown.return_value = \
110       self.RpcResultsBuilder() \
111         .CreateSuccessfulNodeResult(self.master, None)
112
113     self.rpc.call_blockdev_snapshot.return_value = \
114       self.RpcResultsBuilder() \
115         .CreateSuccessfulNodeResult(self.master, ("mock_vg", "mock_id"))
116
117     self.rpc.call_blockdev_remove.return_value = \
118       self.RpcResultsBuilder() \
119         .CreateSuccessfulNodeResult(self.master, None)
120
121     self.rpc.call_export_start.return_value = \
122       self.RpcResultsBuilder() \
123         .CreateSuccessfulNodeResult(self.master, "export_daemon")
124
125     def ImpExpStatus(node_uuid, name):
126       return self.RpcResultsBuilder() \
127                .CreateSuccessfulNodeResult(node_uuid,
128                                            [objects.ImportExportStatus(
129                                              exit_status=0
130                                            )])
131     self.rpc.call_impexp_status.side_effect = ImpExpStatus
132
133     def ImpExpCleanup(node_uuid, name):
134       return self.RpcResultsBuilder() \
135                .CreateSuccessfulNodeResult(node_uuid)
136     self.rpc.call_impexp_cleanup.side_effect = ImpExpCleanup
137
138     self.rpc.call_finalize_export.return_value = \
139       self.RpcResultsBuilder() \
140         .CreateSuccessfulNodeResult(self.master, None)
141
142   def testRemoveRunningInstanceWithoutShutdown(self):
143     inst = self.cfg.AddNewInstance(admin_state=constants.ADMINST_UP)
144     op = opcodes.OpBackupExport(instance_name=inst.name,
145                                 target_node=self.master.name,
146                                 shutdown=False,
147                                 remove_instance=True)
148     self.ExecOpCodeExpectOpPrereqError(
149       op, "Can not remove instance without shutting it down before")
150
151   def testUnsupportedDiskTemplate(self):
152     inst = self.cfg.AddNewInstance(disk_template=constants.DT_FILE)
153     op = opcodes.OpBackupExport(instance_name=inst.name,
154                                 target_node=self.master.name)
155     self.ExecOpCodeExpectOpPrereqError(
156       op, "Export not supported for instances with file-based disks")
157
158
159 class TestLUBackupExportLocalExport(TestLUBackupExportBase):
160   def setUp(self):
161     super(TestLUBackupExportLocalExport, self).setUp()
162
163     self.inst = self.cfg.AddNewInstance()
164     self.target_node = self.cfg.AddNewNode()
165     self.op = opcodes.OpBackupExport(mode=constants.EXPORT_MODE_LOCAL,
166                                      instance_name=self.inst.name,
167                                      target_node=self.target_node.name)
168
169     self.rpc.call_import_start.return_value = \
170       self.RpcResultsBuilder() \
171         .CreateSuccessfulNodeResult(self.target_node, "import_daemon")
172
173   def testExportWithShutdown(self):
174     inst = self.cfg.AddNewInstance(admin_state=constants.ADMINST_UP)
175     op = self.CopyOpCode(self.op, instance_name=inst.name, shutdown=True)
176     self.ExecOpCode(op)
177
178   def testExportDeactivatedDisks(self):
179     self.ExecOpCode(self.op)
180
181   def testExportRemoveInstance(self):
182     op = self.CopyOpCode(self.op, remove_instance=True)
183     self.ExecOpCode(op)
184
185
186 class TestLUBackupExportRemoteExport(TestLUBackupExportBase):
187   def setUp(self):
188     super(TestLUBackupExportRemoteExport, self).setUp()
189
190     self.inst = self.cfg.AddNewInstance()
191     self.op = opcodes.OpBackupExport(mode=constants.EXPORT_MODE_REMOTE,
192                                      instance_name=self.inst.name,
193                                      target_node=[],
194                                      x509_key_name=["mock_key_name"],
195                                      destination_x509_ca="mock_dest_ca")
196
197   def testRemoteExportWithoutX509KeyName(self):
198     op = self.CopyOpCode(self.op, x509_key_name=self.REMOVE)
199     self.ExecOpCodeExpectOpPrereqError(op,
200                                        "Missing X509 key name for encryption")
201
202   def testRemoteExportWithoutX509DestCa(self):
203     op = self.CopyOpCode(self.op, destination_x509_ca=self.REMOVE)
204     self.ExecOpCodeExpectOpPrereqError(op,
205                                        "Missing destination X509 CA")
206
207
208 if __name__ == "__main__":
209   testutils.GanetiTestProgram()