4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.jqueue"""
31 from ganeti import constants
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import jqueue
40 def __init__(self, job_id, status):
45 def SetStatus(self, status):
48 def AddLogEntry(self, msg):
49 self._log.append((len(self._log), msg))
54 def GetInfo(self, fields):
59 result.append(self._status)
61 raise Exception("Unknown field")
65 def GetLogEntries(self, newer_than):
66 assert newer_than is None or newer_than >= 0
68 if newer_than is None:
71 return self._log[newer_than:]
74 class TestJobChangesChecker(unittest.TestCase):
76 job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
77 checker = jqueue._JobChangesChecker(["status"], None, None)
78 self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
80 job.SetStatus(constants.JOB_STATUS_RUNNING)
81 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
83 job.SetStatus(constants.JOB_STATUS_SUCCESS)
84 self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
86 # job.id is used by checker
87 self.assertEqual(job.id, 9094)
89 def testStatusWithPrev(self):
90 job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
91 checker = jqueue._JobChangesChecker(["status"],
92 [constants.JOB_STATUS_QUEUED], None)
93 self.assert_(checker(job) is None)
95 job.SetStatus(constants.JOB_STATUS_RUNNING)
96 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
98 def testFinalStatus(self):
99 for status in constants.JOBS_FINALIZED:
100 job = _FakeJob(2178711, status)
101 checker = jqueue._JobChangesChecker(["status"], [status], None)
102 # There won't be any changes in this status, hence it should signal
103 # a change immediately
104 self.assertEqual(checker(job), ([status], []))
107 job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
108 checker = jqueue._JobChangesChecker(["status"], None, None)
109 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
111 job.AddLogEntry("Hello World")
112 (job_info, log_entries) = checker(job)
113 self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
114 self.assertEqual(log_entries, [[0, "Hello World"]])
116 checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
117 self.assert_(checker2(job) is None)
119 job.AddLogEntry("Foo Bar")
120 job.SetStatus(constants.JOB_STATUS_ERROR)
122 (job_info, log_entries) = checker2(job)
123 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
124 self.assertEqual(log_entries, [[1, "Foo Bar"]])
126 checker3 = jqueue._JobChangesChecker(["status"], None, None)
127 (job_info, log_entries) = checker3(job)
128 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
129 self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
132 class TestJobChangesWaiter(unittest.TestCase):
134 self.tmpdir = tempfile.mkdtemp()
135 self.filename = utils.PathJoin(self.tmpdir, "job-1")
136 utils.WriteFile(self.filename, data="")
139 shutil.rmtree(self.tmpdir)
141 def _EnsureNotifierClosed(self, notifier):
143 os.fstat(notifier._fd)
144 except EnvironmentError, err:
145 self.assertEqual(err.errno, errno.EBADF)
147 self.fail("File descriptor wasn't closed")
150 for wait in [False, True]:
151 waiter = jqueue._JobFileChangesWaiter(self.filename)
158 # Ensure file descriptor was closed
159 self._EnsureNotifierClosed(waiter._notifier)
161 def testChangingFile(self):
162 waiter = jqueue._JobFileChangesWaiter(self.filename)
164 self.assertFalse(waiter.Wait(0.1))
165 utils.WriteFile(self.filename, data="changed")
166 self.assert_(waiter.Wait(60))
170 self._EnsureNotifierClosed(waiter._notifier)
172 def testChangingFile2(self):
173 waiter = jqueue._JobChangesWaiter(self.filename)
175 self.assertFalse(waiter._filewaiter)
176 self.assert_(waiter.Wait(0.1))
177 self.assert_(waiter._filewaiter)
179 # File waiter is now used, but there have been no changes
180 self.assertFalse(waiter.Wait(0.1))
181 utils.WriteFile(self.filename, data="changed")
182 self.assert_(waiter.Wait(60))
186 self._EnsureNotifierClosed(waiter._filewaiter._notifier)
189 class TestWaitForJobChangesHelper(unittest.TestCase):
191 self.tmpdir = tempfile.mkdtemp()
192 self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
193 utils.WriteFile(self.filename, data="")
196 shutil.rmtree(self.tmpdir)
198 def _LoadWaitingJob(self):
199 return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
201 def _LoadLostJob(self):
204 def testNoChanges(self):
205 wfjc = jqueue._WaitForJobChangesHelper()
208 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
209 [constants.JOB_STATUS_WAITLOCK], None, 0.1),
210 constants.JOB_NOTCHANGED)
212 # No previous information
213 self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
214 ["status"], None, None, 1.0),
215 ([constants.JOB_STATUS_WAITLOCK], []))
217 def testLostJob(self):
218 wfjc = jqueue._WaitForJobChangesHelper()
219 self.assert_(wfjc(self.filename, self._LoadLostJob,
220 ["status"], None, None, 1.0) is None)
223 class TestEncodeOpError(unittest.TestCase):
225 encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
226 self.assert_(isinstance(encerr, tuple))
227 self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
229 encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
230 self.assert_(isinstance(encerr, tuple))
231 self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
233 encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
234 self.assert_(isinstance(encerr, tuple))
235 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
237 encerr = jqueue._EncodeOpError("Hello World")
238 self.assert_(isinstance(encerr, tuple))
239 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
242 if __name__ == "__main__":
243 testutils.GanetiTestProgram()