# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Script for testing ganeti.jqueue"""
31 from ganeti import constants
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import jqueue
35 from ganeti import opcodes
36 from ganeti import compat
42 def __init__(self, job_id, status):
47 def SetStatus(self, status):
def AddLogEntry(self, msg):
    """Append a message to this object's log.

    The entry is stored as a ``(serial, msg)`` tuple, where ``serial`` is
    the number of entries already in ``self._log``; entries are therefore
    numbered consecutively starting at zero.

    """
    self._log.append((len(self._log), msg))
56 def GetInfo(self, fields):
61 result.append(self._status)
63 raise Exception("Unknown field")
67 def GetLogEntries(self, newer_than):
68 assert newer_than is None or newer_than >= 0
70 if newer_than is None:
73 return self._log[newer_than:]
76 class TestJobChangesChecker(unittest.TestCase):
78 job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
79 checker = jqueue._JobChangesChecker(["status"], None, None)
80 self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
82 job.SetStatus(constants.JOB_STATUS_RUNNING)
83 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
85 job.SetStatus(constants.JOB_STATUS_SUCCESS)
86 self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
88 # job.id is used by checker
89 self.assertEqual(job.id, 9094)
def testStatusWithPrev(self):
    """Check the checker when it is given the job's current status up front.

    The checker is built with ``[JOB_STATUS_QUEUED]`` as its second
    argument (presumably the previously seen job info -- confirm against
    ``jqueue._JobChangesChecker``).  While the fake job is still queued the
    checker must return ``None``; once the status is switched to running
    it must return the new status together with an empty second element.

    """
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    # assertTrue is used instead of the deprecated assert_ alias
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
def testFinalStatus(self):
    """Check that every finalized status is reported straight away.

    For each status in ``constants.JOBS_FINALIZED`` a fake job with that
    status is checked by a checker that was handed the very same status as
    its second argument; the checker must still return the status rather
    than ``None``.

    """
    for status in constants.JOBS_FINALIZED:
        job = _FakeJob(2178711, status)
        checker = jqueue._JobChangesChecker(["status"], [status], None)
        # There won't be any changes in this status, hence it should signal
        # a change immediately
        self.assertEqual(checker(job), ([status], []))
109 job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
110 checker = jqueue._JobChangesChecker(["status"], None, None)
111 self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
113 job.AddLogEntry("Hello World")
114 (job_info, log_entries) = checker(job)
115 self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
116 self.assertEqual(log_entries, [[0, "Hello World"]])
118 checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
119 self.assert_(checker2(job) is None)
121 job.AddLogEntry("Foo Bar")
122 job.SetStatus(constants.JOB_STATUS_ERROR)
124 (job_info, log_entries) = checker2(job)
125 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
126 self.assertEqual(log_entries, [[1, "Foo Bar"]])
128 checker3 = jqueue._JobChangesChecker(["status"], None, None)
129 (job_info, log_entries) = checker3(job)
130 self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
131 self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134 class TestJobChangesWaiter(unittest.TestCase):
136 self.tmpdir = tempfile.mkdtemp()
137 self.filename = utils.PathJoin(self.tmpdir, "job-1")
138 utils.WriteFile(self.filename, data="")
141 shutil.rmtree(self.tmpdir)
143 def _EnsureNotifierClosed(self, notifier):
145 os.fstat(notifier._fd)
146 except EnvironmentError, err:
147 self.assertEqual(err.errno, errno.EBADF)
149 self.fail("File descriptor wasn't closed")
152 for wait in [False, True]:
153 waiter = jqueue._JobFileChangesWaiter(self.filename)
160 # Ensure file descriptor was closed
161 self._EnsureNotifierClosed(waiter._notifier)
163 def testChangingFile(self):
164 waiter = jqueue._JobFileChangesWaiter(self.filename)
166 self.assertFalse(waiter.Wait(0.1))
167 utils.WriteFile(self.filename, data="changed")
168 self.assert_(waiter.Wait(60))
172 self._EnsureNotifierClosed(waiter._notifier)
174 def testChangingFile2(self):
175 waiter = jqueue._JobChangesWaiter(self.filename)
177 self.assertFalse(waiter._filewaiter)
178 self.assert_(waiter.Wait(0.1))
179 self.assert_(waiter._filewaiter)
181 # File waiter is now used, but there have been no changes
182 self.assertFalse(waiter.Wait(0.1))
183 utils.WriteFile(self.filename, data="changed")
184 self.assert_(waiter.Wait(60))
188 self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191 class TestWaitForJobChangesHelper(unittest.TestCase):
193 self.tmpdir = tempfile.mkdtemp()
194 self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
195 utils.WriteFile(self.filename, data="")
198 shutil.rmtree(self.tmpdir)
def _LoadWaitingJob(self):
    """Return a new fake job with ID 2614226563 in the WAITLOCK status.

    Passed as the job-loading callback to the helper under test.

    """
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
203 def _LoadLostJob(self):
def testNoChanges(self):
    """Check the helper's result for a job that stays in WAITLOCK.

    The trailing numeric argument (0.1 / 1.0) is presumably a timeout in
    seconds -- confirm against ``jqueue._WaitForJobChangesHelper``.

    """
    wfjc = jqueue._WaitForJobChangesHelper()

    # The status handed in equals the job's status: JOB_NOTCHANGED expected
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITLOCK], []))
def testLostJob(self):
    """Check that the helper returns None when loading via _LoadLostJob.

    NOTE(review): the body of ``_LoadLostJob`` is not visible here;
    presumably it yields no job -- confirm.

    """
    wfjc = jqueue._WaitForJobChangesHelper()
    # assertTrue is used instead of the deprecated assert_ alias
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
225 class TestEncodeOpError(unittest.TestCase):
227 encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
228 self.assert_(isinstance(encerr, tuple))
229 self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
231 encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
232 self.assert_(isinstance(encerr, tuple))
233 self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
235 encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
236 self.assert_(isinstance(encerr, tuple))
237 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
239 encerr = jqueue._EncodeOpError("Hello World")
240 self.assert_(isinstance(encerr, tuple))
241 self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244 class TestQueuedOpCode(unittest.TestCase):
245 def testDefaults(self):
247 self.assertFalse(hasattr(op.input, "dry_run"))
248 self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
249 self.assertFalse(op.log)
250 self.assert_(op.start_timestamp is None)
251 self.assert_(op.exec_timestamp is None)
252 self.assert_(op.end_timestamp is None)
253 self.assert_(op.result is None)
254 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
256 op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
258 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
260 self.assertEqual(op1.Serialize(), op2.Serialize())
262 def testPriority(self):
264 assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
265 "Default priority equals high priority; test can't work"
266 self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
267 self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
269 inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
270 op1 = jqueue._QueuedOpCode(inpop)
272 op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
274 self.assertEqual(op1.Serialize(), op2.Serialize())
277 class TestQueuedJob(unittest.TestCase):
278 def testDefaults(self):
282 opcodes.OpTestDelay(),
286 self.assertEqual(job.id, job_id)
287 self.assertEqual(job.log_serial, 0)
288 self.assert_(job.received_timestamp)
289 self.assert_(job.start_timestamp is None)
290 self.assert_(job.end_timestamp is None)
291 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
292 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
293 self.assert_(repr(job).startswith("<"))
294 self.assertEqual(len(job.ops), len(ops))
295 self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
296 for (inp, op) in zip(ops, job.ops)))
298 job1 = jqueue._QueuedJob(None, job_id, ops)
300 job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
302 self.assertEqual(job1.Serialize(), job2.Serialize())
304 def testPriority(self):
307 opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
308 opcodes.OpTestDelay(),
312 self.assertEqual(job.id, job_id)
313 self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
314 self.assert_(repr(job).startswith("<"))
316 job = jqueue._QueuedJob(None, job_id, ops)
318 self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
320 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
323 job.ops[0].priority -= 1
325 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
327 # Mark opcode as finished
328 job.ops[0].status = constants.OP_STATUS_SUCCESS
330 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
333 job.ops[1].priority -= 10
334 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
336 # Test increasing first
337 job.ops[0].status = constants.OP_STATUS_RUNNING
338 job.ops[0].priority -= 19
339 self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
# Start the test program only when this file is executed as a script
if __name__ == "__main__":
    testutils.GanetiTestProgram()