# Copyright (C) 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tests for LUBackup*"""
24 from ganeti import constants
25 from ganeti import objects
26 from ganeti import opcodes
27 from ganeti import query
29 from testsupport import *
class TestLUBackupQuery(CmdlibTestCase):
    """Tests for OpBackupQuery against a mocked ``call_export_list`` RPC.

    NOTE(review): the ``def setUp`` header, the ``.Build()`` terminators and
    the ``assertEqual`` wrapper in testQueryAllNodes were missing from the
    reviewed text and have been restored; confirm against upstream.
    """

    def setUp(self):
        """Run the base fixture setup and record the export field names."""
        super(TestLUBackupQuery, self).setUp()

        self.fields = query._BuildExportFields().keys()

    def testFailingExportList(self):
        """A node whose export-list RPC failed maps to False in the result."""
        self.rpc.call_export_list.return_value = (
            self.RpcResultsBuilder()
            .AddFailedNode(self.master)
            .Build()
        )
        op = opcodes.OpBackupQuery(nodes=[self.master.name])
        ret = self.ExecOpCode(op)
        self.assertEqual({self.master.name: False}, ret)

    def testQueryOneNode(self):
        """Querying one named node returns that node's export list."""
        exports = ["mock_export1", "mock_export2"]
        self.rpc.call_export_list.return_value = (
            self.RpcResultsBuilder()
            .AddSuccessfulNode(self.master, exports)
            .Build()
        )
        op = opcodes.OpBackupQuery(nodes=[self.master.name])
        ret = self.ExecOpCode(op)
        self.assertEqual(
            {self.master.name: ["mock_export1", "mock_export2"]}, ret)

    def testQueryAllNodes(self):
        """Without a node list, every node's exports appear in the result."""
        node = self.cfg.AddNewNode()
        self.rpc.call_export_list.return_value = (
            self.RpcResultsBuilder()
            .AddSuccessfulNode(self.master, ["mock_export1"])
            .AddSuccessfulNode(node, ["mock_export2"])
            .Build()
        )
        op = opcodes.OpBackupQuery()
        ret = self.ExecOpCode(op)
        self.assertEqual({
            self.master.name: ["mock_export1"],
            node.name: ["mock_export2"],
        }, ret)
class TestLUBackupPrepare(CmdlibTestCase):
    """Tests for OpBackupPrepare in local and remote export mode.

    Both tests patch the ``instance_utils`` utils module so that
    ``ReadOneLineFile`` returns a fixed string.

    NOTE(review): the ``self.ExecOpCode(op)`` calls were missing from the
    reviewed text; without them the opcodes were built but never run.
    """

    @patchUtils("instance_utils")
    def testPrepareLocalExport(self, utils):
        """Preparing a local-mode export of a fresh instance succeeds."""
        utils.ReadOneLineFile.return_value = "cluster_secret"
        inst = self.cfg.AddNewInstance()
        op = opcodes.OpBackupPrepare(instance_name=inst.name,
                                     mode=constants.EXPORT_MODE_LOCAL)
        self.ExecOpCode(op)

    @patchUtils("instance_utils")
    def testPrepareRemoteExport(self, utils):
        """Preparing a remote-mode export succeeds with a mocked X509 cert."""
        utils.ReadOneLineFile.return_value = "cluster_secret"
        inst = self.cfg.AddNewInstance()
        # NOTE(review): the first tuple element was on a line missing from
        # the reviewed text; "key_file" is a placeholder name -- confirm.
        cert_payload = ("key_file", testutils.ReadTestData("cert1.pem"))
        self.rpc.call_x509_cert_create.return_value = (
            self.RpcResultsBuilder()
            .CreateSuccessfulNodeResult(inst.primary_node, cert_payload)
        )
        op = opcodes.OpBackupPrepare(instance_name=inst.name,
                                     mode=constants.EXPORT_MODE_REMOTE)
        self.ExecOpCode(op)
class TestLUBackupExportBase(CmdlibTestCase):
    """Shared fixture and prerequisite-failure tests for OpBackupExport.

    ``setUp`` installs successful mock results for the RPC calls listed
    there. Subclasses extend the fixture for one export mode each.
    """

    def setUp(self):
        """Install successful mock results for the RPCs configured below."""
        super(TestLUBackupExportBase, self).setUp()

        def master_ok(payload):
            # A fresh builder per call, exactly one successful result for
            # the master node carrying the given payload.
            return (self.RpcResultsBuilder()
                    .CreateSuccessfulNodeResult(self.master, payload))

        self.rpc.call_instance_start.return_value = master_ok(True)
        self.rpc.call_blockdev_assemble.return_value = master_ok(None)
        self.rpc.call_blockdev_shutdown.return_value = master_ok(None)
        self.rpc.call_blockdev_snapshot.return_value = \
            master_ok(("mock_vg", "mock_id"))
        self.rpc.call_blockdev_remove.return_value = master_ok(None)
        self.rpc.call_export_start.return_value = master_ok("export_daemon")

        def ImpExpStatus(node_uuid, name):
            # NOTE(review): the ImportExportStatus arguments were on lines
            # missing from the reviewed text; exit_status=0 is assumed
            # here -- confirm against upstream.
            return (self.RpcResultsBuilder()
                    .CreateSuccessfulNodeResult(
                        node_uuid,
                        [objects.ImportExportStatus(exit_status=0)]))
        self.rpc.call_impexp_status.side_effect = ImpExpStatus

        def ImpExpCleanup(node_uuid, name):
            return (self.RpcResultsBuilder()
                    .CreateSuccessfulNodeResult(node_uuid))
        self.rpc.call_impexp_cleanup.side_effect = ImpExpCleanup

        self.rpc.call_finalize_export.return_value = master_ok(None)

    def testRemoveRunningInstanceWithoutShutdown(self):
        """Removing a running instance without shutdown is a prereq error."""
        inst = self.cfg.AddNewInstance(admin_state=constants.ADMINST_UP)
        # NOTE(review): one keyword line was missing from the reviewed
        # text; shutdown=False is what the test name and the expected
        # error message imply.
        op = opcodes.OpBackupExport(instance_name=inst.name,
                                    target_node=self.master.name,
                                    shutdown=False,
                                    remove_instance=True)
        self.ExecOpCodeExpectOpPrereqError(
            op, "Can not remove instance without shutting it down before")

    def testUnsupportedDiskTemplate(self):
        """Exporting an instance with the file disk template is rejected."""
        inst = self.cfg.AddNewInstance(disk_template=constants.DT_FILE)
        op = opcodes.OpBackupExport(instance_name=inst.name,
                                    target_node=self.master.name)
        self.ExecOpCodeExpectOpPrereqError(
            op, "Export not supported for instances with file-based disks")
class TestLUBackupExportLocalExport(TestLUBackupExportBase):
    """OpBackupExport tests for local mode, exporting to a second node.

    NOTE(review): the ``def setUp`` header and the ``self.ExecOpCode(op)``
    calls in two tests were missing from the reviewed text and have been
    restored; confirm against upstream.
    """

    def setUp(self):
        """Create an instance, a target node and a local-mode export opcode."""
        super(TestLUBackupExportLocalExport, self).setUp()

        self.inst = self.cfg.AddNewInstance()
        self.target_node = self.cfg.AddNewNode()
        self.op = opcodes.OpBackupExport(
            mode=constants.EXPORT_MODE_LOCAL,
            instance_name=self.inst.name,
            target_node=self.target_node.name)

        self.rpc.call_import_start.return_value = (
            self.RpcResultsBuilder()
            .CreateSuccessfulNodeResult(self.target_node, "import_daemon")
        )

    def testExportWithShutdown(self):
        """Export an instance marked as up with shutdown requested."""
        inst = self.cfg.AddNewInstance(admin_state=constants.ADMINST_UP)
        op = self.CopyOpCode(self.op, instance_name=inst.name, shutdown=True)
        self.ExecOpCode(op)

    def testExportDeactivatedDisks(self):
        """Export the default fixture instance with the unmodified opcode."""
        self.ExecOpCode(self.op)

    def testExportRemoveInstance(self):
        """Export the fixture instance with remove_instance set."""
        op = self.CopyOpCode(self.op, remove_instance=True)
        self.ExecOpCode(op)
class TestLUBackupExportRemoteExport(TestLUBackupExportBase):
    """OpBackupExport tests for remote mode: missing X509 parameters."""

    def setUp(self):
        """Create an instance and a remote-mode export opcode."""
        super(TestLUBackupExportRemoteExport, self).setUp()

        self.inst = self.cfg.AddNewInstance()
        # NOTE(review): one keyword line between instance_name and
        # x509_key_name was missing from the reviewed text; an empty
        # target_node list is assumed here -- confirm against upstream.
        self.op = opcodes.OpBackupExport(
            mode=constants.EXPORT_MODE_REMOTE,
            instance_name=self.inst.name,
            target_node=[],
            x509_key_name=["mock_key_name"],
            destination_x509_ca="mock_dest_ca")

    def testRemoteExportWithoutX509KeyName(self):
        """Dropping x509_key_name from the opcode is a prereq error."""
        op = self.CopyOpCode(self.op, x509_key_name=self.REMOVE)
        self.ExecOpCodeExpectOpPrereqError(
            op, "Missing X509 key name for encryption")

    def testRemoteExportWithoutX509DestCa(self):
        """Dropping destination_x509_ca from the opcode is a prereq error."""
        op = self.CopyOpCode(self.op, destination_x509_ca=self.REMOVE)
        self.ExecOpCodeExpectOpPrereqError(
            op, "Missing destination X509 CA")
# Run the test program from testutils when this file is executed as a script.
if __name__ == "__main__":
    testutils.GanetiTestProgram()