Add support for job priority to opcodes and job queue objects
[ganeti-local] / test / ganeti.jqueue_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.jqueue"""
23
24 import os
25 import sys
26 import unittest
27 import tempfile
28 import shutil
29 import errno
30
31 from ganeti import constants
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import jqueue
35 from ganeti import opcodes
36 from ganeti import compat
37
38 import testutils
39
40
41 class _FakeJob:
42   def __init__(self, job_id, status):
43     self.id = job_id
44     self._status = status
45     self._log = []
46
47   def SetStatus(self, status):
48     self._status = status
49
50   def AddLogEntry(self, msg):
51     self._log.append((len(self._log), msg))
52
53   def CalcStatus(self):
54     return self._status
55
56   def GetInfo(self, fields):
57     result = []
58
59     for name in fields:
60       if name == "status":
61         result.append(self._status)
62       else:
63         raise Exception("Unknown field")
64
65     return result
66
67   def GetLogEntries(self, newer_than):
68     assert newer_than is None or newer_than >= 0
69
70     if newer_than is None:
71       return self._log
72
73     return self._log[newer_than:]
74
75
class TestJobChangesChecker(unittest.TestCase):
  """Tests for C{jqueue._JobChangesChecker}, driven by L{_FakeJob} objects.

  """
  def testStatus(self):
    """Without previous info every call reports the current status.

    """
    fake = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    check_fn = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(check_fn(fake), ([constants.JOB_STATUS_QUEUED], []))

    for new_status in (constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_SUCCESS):
      fake.SetStatus(new_status)
      self.assertEqual(check_fn(fake), ([new_status], []))

    # job.id is used by checker
    self.assertEqual(fake.id, 9094)

  def testStatusWithPrev(self):
    """A status equal to the previous info is not reported as a change.

    """
    prev_info = [constants.JOB_STATUS_QUEUED]
    fake = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    check_fn = jqueue._JobChangesChecker(["status"], prev_info, None)
    self.assertTrue(check_fn(fake) is None)

    fake.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(check_fn(fake), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    """Finalized jobs are reported even if the status matches the previous.

    """
    for final_status in constants.JOBS_FINALIZED:
      fake = _FakeJob(2178711, final_status)
      check_fn = jqueue._JobChangesChecker(["status"], [final_status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(check_fn(fake), ([final_status], []))

  def testLog(self):
    """New log entries are reported relative to the previous log serial.

    """
    fake = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    first = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(first(fake), ([constants.JOB_STATUS_RUNNING], []))

    fake.AddLogEntry("Hello World")
    (info, entries) = first(fake)
    self.assertEqual(info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(entries, [[0, "Hello World"]])

    # A checker primed with the state just seen has nothing to report
    second = jqueue._JobChangesChecker(["status"], info, len(entries))
    self.assertTrue(second(fake) is None)

    fake.AddLogEntry("Foo Bar")
    fake.SetStatus(constants.JOB_STATUS_ERROR)

    (info, entries) = second(fake)
    self.assertEqual(info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(entries, [[1, "Foo Bar"]])

    # A checker without previous information gets the complete log
    third = jqueue._JobChangesChecker(["status"], None, None)
    (info, entries) = third(fake)
    self.assertEqual(info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(entries, [[0, "Hello World"], [1, "Foo Bar"]])
132
133
class TestJobChangesWaiter(unittest.TestCase):
  """Tests for C{jqueue._JobFileChangesWaiter} and C{jqueue._JobChangesWaiter}.

  Every test works on an empty file named "job-1" inside a fresh temporary
  directory, which is removed again afterwards.

  """
  def setUp(self):
    """Creates the temporary directory and the empty job file.

    """
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    """Removes the temporary directory.

    """
    shutil.rmtree(self.tmpdir)

  def _EnsureNotifierClosed(self, notifier):
    """Fails the test unless the notifier's file descriptor is closed.

    A closed descriptor is recognized by C{os.fstat} failing with C{EBADF}.

    """
    try:
      os.fstat(notifier._fd)
    except EnvironmentError:
      # Fetch the active exception explicitly instead of binding it in the
      # "except" clause
      err = sys.exc_info()[1]
      self.assertEqual(err.errno, errno.EBADF)
    else:
      self.fail("File descriptor wasn't closed")

  def testClose(self):
    """Closing works both for unused waiters and after waiting once.

    """
    for do_wait in (False, True):
      file_waiter = jqueue._JobFileChangesWaiter(self.filename)
      try:
        if do_wait:
          file_waiter.Wait(0.001)
      finally:
        file_waiter.Close()

      # Ensure file descriptor was closed
      self._EnsureNotifierClosed(file_waiter._notifier)

  def testChangingFile(self):
    """The file waiter reports a change only after the file was rewritten.

    """
    file_waiter = jqueue._JobFileChangesWaiter(self.filename)
    try:
      self.assertFalse(file_waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(file_waiter.Wait(60))
    finally:
      file_waiter.Close()

    self._EnsureNotifierClosed(file_waiter._notifier)

  def testChangingFile2(self):
    """The generic waiter sets up its file waiter on the first wait.

    """
    changes_waiter = jqueue._JobChangesWaiter(self.filename)
    try:
      # No file waiter before the first call; that call returns a true value
      self.assertFalse(changes_waiter._filewaiter)
      self.assertTrue(changes_waiter.Wait(0.1))
      self.assertTrue(changes_waiter._filewaiter)

      # File waiter is now used, but there have been no changes
      self.assertFalse(changes_waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(changes_waiter.Wait(60))
    finally:
      changes_waiter.Close()

    self._EnsureNotifierClosed(changes_waiter._filewaiter._notifier)
189
190
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for C{jqueue._WaitForJobChangesHelper}.

  An empty job file is created in a temporary directory for every test;
  the job objects themselves come from the loader methods of this class.

  """
  def setUp(self):
    """Creates the temporary directory and the empty job file.

    """
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    """Removes the temporary directory.

    """
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader returning a fake job in "waitlock" status.

    """
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)

  def _LoadLostJob(self):
    """Job loader simulating a job that can't be loaded.

    """
    return None

  def testNoChanges(self):
    """Checks results with matching and with missing previous information.

    """
    helper = jqueue._WaitForJobChangesHelper()

    # No change
    unchanged = helper(self.filename, self._LoadWaitingJob, ["status"],
                       [constants.JOB_STATUS_WAITLOCK], None, 0.1)
    self.assertEqual(unchanged, constants.JOB_NOTCHANGED)

    # No previous information
    current = helper(self.filename, self._LoadWaitingJob,
                     ["status"], None, None, 1.0)
    self.assertEqual(current, ([constants.JOB_STATUS_WAITLOCK], []))

  def testLostJob(self):
    """A loader returning C{None} makes the helper return C{None}.

    """
    helper = jqueue._WaitForJobChangesHelper()
    outcome = helper(self.filename, self._LoadLostJob,
                     ["status"], None, None, 1.0)
    self.assertTrue(outcome is None)
223
224
class TestEncodeOpError(unittest.TestCase):
  """Tests for C{jqueue._EncodeOpError}.

  """
  def test(self):
    """Encoded errors are tuples which C{errors.MaybeRaise} raises again.

    """
    # Pairs of (value to encode, exception class expected from MaybeRaise)
    cases = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (value, expected_cls) in cases:
      encoded = jqueue._EncodeOpError(value)
      self.assertTrue(isinstance(encoded, tuple))
      self.assertRaises(expected_cls, errors.MaybeRaise, encoded)
242
243
class TestQueuedOpCode(unittest.TestCase):
  """Tests for C{jqueue._QueuedOpCode}, including serialization round trips.

  """
  def testDefaults(self):
    """Checks the initial state of a queued opcode built with defaults.

    The same checks are run on a copy restored from the serialized form.

    """
    def _VerifyDefaults(qop):
      self.assertFalse(hasattr(qop.input, "dry_run"))
      self.assertEqual(qop.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(qop.log)
      self.assertTrue(qop.start_timestamp is None)
      self.assertTrue(qop.exec_timestamp is None)
      self.assertTrue(qop.end_timestamp is None)
      self.assertTrue(qop.result is None)
      self.assertEqual(qop.status, constants.OP_STATUS_QUEUED)

    original = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _VerifyDefaults(original)
    restored = jqueue._QueuedOpCode.Restore(original.Serialize())
    _VerifyDefaults(restored)
    self.assertEqual(original.Serialize(), restored.Serialize())

  def testPriority(self):
    """An explicitly set opcode priority is kept, also after restoring.

    """
    def _VerifyHighPriority(qop):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(qop.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(qop.status, constants.OP_STATUS_QUEUED)

    source_op = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
    original = jqueue._QueuedOpCode(source_op)
    _VerifyHighPriority(original)
    restored = jqueue._QueuedOpCode.Restore(original.Serialize())
    _VerifyHighPriority(restored)
    self.assertEqual(original.Serialize(), restored.Serialize())
275
276
class TestQueuedJob(unittest.TestCase):
  """Tests for C{jqueue._QueuedJob}.

  """
  def testDefaults(self):
    """Checks the initial state of a new job and of its restored copy.

    """
    job_id = 4260
    ops = [
      opcodes.OpGetTags(),
      opcodes.OpTestDelay(),
      ]

    def _VerifyDefaults(qjob):
      self.assertEqual(qjob.id, job_id)
      self.assertEqual(qjob.log_serial, 0)
      self.assertTrue(qjob.received_timestamp)
      self.assertTrue(qjob.start_timestamp is None)
      self.assertTrue(qjob.end_timestamp is None)
      self.assertEqual(qjob.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(qjob.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(qjob).startswith("<"))
      self.assertEqual(len(qjob.ops), len(ops))
      # The queued opcodes must wrap the submitted opcodes in order
      self.assertTrue(compat.all(submitted.__getstate__() ==
                                 queued.input.__getstate__()
                                 for (submitted, queued)
                                 in zip(ops, qjob.ops)))

    original = jqueue._QueuedJob(None, job_id, ops)
    _VerifyDefaults(original)
    restored = jqueue._QueuedJob.Restore(None, original.Serialize())
    _VerifyDefaults(restored)
    self.assertEqual(original.Serialize(), restored.Serialize())

  def testPriority(self):
    """Checks how the job priority follows changes to its opcodes.

    As the "increase" steps below show, a numerically lower value is a
    higher priority.

    """
    job_id = 4283
    ops = [
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _VerifyBasics(qjob):
      self.assertEqual(qjob.id, job_id)
      self.assertEqual(qjob.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(qjob).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops)
    _VerifyBasics(job)
    self.assertTrue(compat.all(qop.priority == constants.OP_PRIO_DEFAULT
                               for qop in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    first_op = job.ops[0]
    second_op = job.ops[1]

    # Increase first
    first_op.priority -= 1
    _VerifyBasics(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    first_op.status = constants.OP_STATUS_SUCCESS
    _VerifyBasics(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    second_op.priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    first_op.status = constants.OP_STATUS_RUNNING
    first_op.priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
340
341
# When executed as a script, hand control to the test program entry point
# provided by the testutils module.
if __name__ == "__main__":
  testutils.GanetiTestProgram()