Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 26d3fd2f

History | View | Annotate | Download (35.4 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 989a8bee Michael Hanselmann
# Copyright (C) 2010 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 989a8bee Michael Hanselmann
32 989a8bee Michael Hanselmann
from ganeti import constants
33 989a8bee Michael Hanselmann
from ganeti import utils
34 989a8bee Michael Hanselmann
from ganeti import errors
35 989a8bee Michael Hanselmann
from ganeti import jqueue
36 8f5c488d Michael Hanselmann
from ganeti import opcodes
37 8f5c488d Michael Hanselmann
from ganeti import compat
38 26d3fd2f Michael Hanselmann
from ganeti import mcpu
39 989a8bee Michael Hanselmann
40 989a8bee Michael Hanselmann
import testutils
41 989a8bee Michael Hanselmann
42 989a8bee Michael Hanselmann
43 989a8bee Michael Hanselmann
class _FakeJob:
44 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
45 989a8bee Michael Hanselmann
    self.id = job_id
46 989a8bee Michael Hanselmann
    self._status = status
47 989a8bee Michael Hanselmann
    self._log = []
48 989a8bee Michael Hanselmann
49 989a8bee Michael Hanselmann
  def SetStatus(self, status):
50 989a8bee Michael Hanselmann
    self._status = status
51 989a8bee Michael Hanselmann
52 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
53 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
54 989a8bee Michael Hanselmann
55 989a8bee Michael Hanselmann
  def CalcStatus(self):
56 989a8bee Michael Hanselmann
    return self._status
57 989a8bee Michael Hanselmann
58 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
59 989a8bee Michael Hanselmann
    result = []
60 989a8bee Michael Hanselmann
61 989a8bee Michael Hanselmann
    for name in fields:
62 989a8bee Michael Hanselmann
      if name == "status":
63 989a8bee Michael Hanselmann
        result.append(self._status)
64 989a8bee Michael Hanselmann
      else:
65 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
66 989a8bee Michael Hanselmann
67 989a8bee Michael Hanselmann
    return result
68 989a8bee Michael Hanselmann
69 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
70 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
71 989a8bee Michael Hanselmann
72 989a8bee Michael Hanselmann
    if newer_than is None:
73 989a8bee Michael Hanselmann
      return self._log
74 989a8bee Michael Hanselmann
75 989a8bee Michael Hanselmann
    return self._log[newer_than:]
76 989a8bee Michael Hanselmann
77 989a8bee Michael Hanselmann
78 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
79 989a8bee Michael Hanselmann
  def testStatus(self):
80 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
82 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
83 989a8bee Michael Hanselmann
84 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
85 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
86 989a8bee Michael Hanselmann
87 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
88 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
89 989a8bee Michael Hanselmann
90 989a8bee Michael Hanselmann
    # job.id is used by checker
91 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
92 989a8bee Michael Hanselmann
93 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
94 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
95 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
96 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
97 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
98 989a8bee Michael Hanselmann
99 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
100 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
101 989a8bee Michael Hanselmann
102 989a8bee Michael Hanselmann
  def testFinalStatus(self):
103 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
104 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
105 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
106 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
107 989a8bee Michael Hanselmann
      # a change immediately
108 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
109 989a8bee Michael Hanselmann
110 989a8bee Michael Hanselmann
  def testLog(self):
111 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
113 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
114 989a8bee Michael Hanselmann
115 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
116 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
117 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
119 989a8bee Michael Hanselmann
120 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
122 989a8bee Michael Hanselmann
123 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
124 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
125 989a8bee Michael Hanselmann
126 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
127 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
129 989a8bee Michael Hanselmann
130 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
131 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
132 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134 989a8bee Michael Hanselmann
135 989a8bee Michael Hanselmann
136 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
137 989a8bee Michael Hanselmann
  def setUp(self):
138 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
139 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
140 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
141 989a8bee Michael Hanselmann
142 989a8bee Michael Hanselmann
  def tearDown(self):
143 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
144 989a8bee Michael Hanselmann
145 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
146 989a8bee Michael Hanselmann
    try:
147 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
148 989a8bee Michael Hanselmann
    except EnvironmentError, err:
149 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
150 989a8bee Michael Hanselmann
    else:
151 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
152 989a8bee Michael Hanselmann
153 989a8bee Michael Hanselmann
  def testClose(self):
154 989a8bee Michael Hanselmann
    for wait in [False, True]:
155 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
156 989a8bee Michael Hanselmann
      try:
157 989a8bee Michael Hanselmann
        if wait:
158 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
159 989a8bee Michael Hanselmann
      finally:
160 989a8bee Michael Hanselmann
        waiter.Close()
161 989a8bee Michael Hanselmann
162 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
163 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
164 989a8bee Michael Hanselmann
165 989a8bee Michael Hanselmann
  def testChangingFile(self):
166 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
167 989a8bee Michael Hanselmann
    try:
168 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
169 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
170 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
171 989a8bee Michael Hanselmann
    finally:
172 989a8bee Michael Hanselmann
      waiter.Close()
173 989a8bee Michael Hanselmann
174 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
175 989a8bee Michael Hanselmann
176 989a8bee Michael Hanselmann
  def testChangingFile2(self):
177 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
178 989a8bee Michael Hanselmann
    try:
179 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
180 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
181 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
182 989a8bee Michael Hanselmann
183 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
184 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
185 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
186 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
187 989a8bee Michael Hanselmann
    finally:
188 989a8bee Michael Hanselmann
      waiter.Close()
189 989a8bee Michael Hanselmann
190 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191 989a8bee Michael Hanselmann
192 989a8bee Michael Hanselmann
193 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
194 989a8bee Michael Hanselmann
  def setUp(self):
195 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
196 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
198 989a8bee Michael Hanselmann
199 989a8bee Michael Hanselmann
  def tearDown(self):
200 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
201 989a8bee Michael Hanselmann
202 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
203 989a8bee Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
204 989a8bee Michael Hanselmann
205 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
206 989a8bee Michael Hanselmann
    return None
207 989a8bee Michael Hanselmann
208 989a8bee Michael Hanselmann
  def testNoChanges(self):
209 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
210 989a8bee Michael Hanselmann
211 989a8bee Michael Hanselmann
    # No change
212 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
213 989a8bee Michael Hanselmann
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
214 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
215 989a8bee Michael Hanselmann
216 989a8bee Michael Hanselmann
    # No previous information
217 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
218 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
219 989a8bee Michael Hanselmann
                     ([constants.JOB_STATUS_WAITLOCK], []))
220 989a8bee Michael Hanselmann
221 989a8bee Michael Hanselmann
  def testLostJob(self):
222 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
223 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
224 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
225 989a8bee Michael Hanselmann
226 989a8bee Michael Hanselmann
227 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
228 6760e4ed Michael Hanselmann
  def test(self):
229 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
231 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
232 6760e4ed Michael Hanselmann
233 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
235 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
236 6760e4ed Michael Hanselmann
237 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
239 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
240 6760e4ed Michael Hanselmann
241 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
242 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
243 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244 6760e4ed Michael Hanselmann
245 6760e4ed Michael Hanselmann
246 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
247 8f5c488d Michael Hanselmann
  def testDefaults(self):
248 8f5c488d Michael Hanselmann
    def _Check(op):
249 8f5c488d Michael Hanselmann
      self.assertFalse(hasattr(op.input, "dry_run"))
250 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
252 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
253 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
254 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
255 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
256 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
257 8f5c488d Michael Hanselmann
258 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
259 8f5c488d Michael Hanselmann
    _Check(op1)
260 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
261 8f5c488d Michael Hanselmann
    _Check(op2)
262 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
263 8f5c488d Michael Hanselmann
264 8f5c488d Michael Hanselmann
  def testPriority(self):
265 8f5c488d Michael Hanselmann
    def _Check(op):
266 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
268 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
270 8f5c488d Michael Hanselmann
271 8f5c488d Michael Hanselmann
    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
272 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
273 8f5c488d Michael Hanselmann
    _Check(op1)
274 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
275 8f5c488d Michael Hanselmann
    _Check(op2)
276 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
277 8f5c488d Michael Hanselmann
278 8f5c488d Michael Hanselmann
279 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
280 5f6b0b71 Michael Hanselmann
  def test(self):
281 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
282 5f6b0b71 Michael Hanselmann
                      None, 1, [])
283 5f6b0b71 Michael Hanselmann
284 8f5c488d Michael Hanselmann
  def testDefaults(self):
285 8f5c488d Michael Hanselmann
    job_id = 4260
286 8f5c488d Michael Hanselmann
    ops = [
287 8f5c488d Michael Hanselmann
      opcodes.OpGetTags(),
288 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
289 8f5c488d Michael Hanselmann
      ]
290 8f5c488d Michael Hanselmann
291 8f5c488d Michael Hanselmann
    def _Check(job):
292 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
293 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
294 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
295 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
296 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
297 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
300 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
301 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
303 5f6b0b71 Michael Hanselmann
      self.assertRaises(errors.OpExecError, job.GetInfo,
304 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
305 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
306 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
307 8f5c488d Michael Hanselmann
308 8f5c488d Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops)
309 8f5c488d Michael Hanselmann
    _Check(job1)
310 8f5c488d Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
311 8f5c488d Michael Hanselmann
    _Check(job2)
312 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
313 8f5c488d Michael Hanselmann
314 8f5c488d Michael Hanselmann
  def testPriority(self):
315 8f5c488d Michael Hanselmann
    job_id = 4283
316 8f5c488d Michael Hanselmann
    ops = [
317 8f5c488d Michael Hanselmann
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
318 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
319 8f5c488d Michael Hanselmann
      ]
320 8f5c488d Michael Hanselmann
321 8f5c488d Michael Hanselmann
    def _Check(job):
322 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
323 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
325 8f5c488d Michael Hanselmann
326 8f5c488d Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops)
327 8f5c488d Michael Hanselmann
    _Check(job)
328 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
329 8f5c488d Michael Hanselmann
                            for op in job.ops))
330 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
331 8f5c488d Michael Hanselmann
332 8f5c488d Michael Hanselmann
    # Increase first
333 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
334 8f5c488d Michael Hanselmann
    _Check(job)
335 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
336 8f5c488d Michael Hanselmann
337 8f5c488d Michael Hanselmann
    # Mark opcode as finished
338 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
339 8f5c488d Michael Hanselmann
    _Check(job)
340 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
341 8f5c488d Michael Hanselmann
342 8f5c488d Michael Hanselmann
    # Increase second
343 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
344 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
345 8f5c488d Michael Hanselmann
346 8f5c488d Michael Hanselmann
    # Test increasing first
347 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
348 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
349 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
350 8f5c488d Michael Hanselmann
351 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
352 db5bce34 Michael Hanselmann
    def _Queued(ops):
353 db5bce34 Michael Hanselmann
      # The default status is "queued"
354 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
355 db5bce34 Michael Hanselmann
                              for op in ops))
356 db5bce34 Michael Hanselmann
357 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
358 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITLOCK
359 db5bce34 Michael Hanselmann
360 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
361 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
362 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
363 db5bce34 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITLOCK
364 db5bce34 Michael Hanselmann
365 db5bce34 Michael Hanselmann
    def _Running(ops):
366 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
367 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
368 db5bce34 Michael Hanselmann
      for op in ops[2:]:
369 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
370 db5bce34 Michael Hanselmann
371 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
372 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
373 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
374 db5bce34 Michael Hanselmann
      for op in ops[2:]:
375 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
376 db5bce34 Michael Hanselmann
377 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
378 db5bce34 Michael Hanselmann
      for op in ops:
379 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
380 db5bce34 Michael Hanselmann
381 db5bce34 Michael Hanselmann
    def _Canceled(ops):
382 db5bce34 Michael Hanselmann
      for op in ops:
383 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
384 db5bce34 Michael Hanselmann
385 db5bce34 Michael Hanselmann
    def _Error1(ops):
386 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
387 db5bce34 Michael Hanselmann
        if idx > 3:
388 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
389 db5bce34 Michael Hanselmann
        else:
390 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
391 db5bce34 Michael Hanselmann
392 db5bce34 Michael Hanselmann
    def _Error2(ops):
393 db5bce34 Michael Hanselmann
      for op in ops:
394 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
395 db5bce34 Michael Hanselmann
396 db5bce34 Michael Hanselmann
    def _Success(ops):
397 db5bce34 Michael Hanselmann
      for op in ops:
398 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
399 db5bce34 Michael Hanselmann
400 db5bce34 Michael Hanselmann
    tests = {
401 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
402 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
404 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
406 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
408 db5bce34 Michael Hanselmann
      }
409 db5bce34 Michael Hanselmann
410 db5bce34 Michael Hanselmann
    def _NewJob():
411 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
412 db5bce34 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)])
413 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
415 db5bce34 Michael Hanselmann
                              for op in job.ops))
416 db5bce34 Michael Hanselmann
      return job
417 db5bce34 Michael Hanselmann
418 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
419 db5bce34 Michael Hanselmann
      sttests = tests[status]
420 db5bce34 Michael Hanselmann
      assert sttests
421 db5bce34 Michael Hanselmann
      for fn in sttests:
422 db5bce34 Michael Hanselmann
        job = _NewJob()
423 db5bce34 Michael Hanselmann
        fn(job.ops)
424 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
425 db5bce34 Michael Hanselmann
426 8f5c488d Michael Hanselmann
427 be760ba8 Michael Hanselmann
class _FakeQueueForProc:
428 be760ba8 Michael Hanselmann
  def __init__(self):
429 be760ba8 Michael Hanselmann
    self._acquired = False
430 be760ba8 Michael Hanselmann
431 be760ba8 Michael Hanselmann
  def IsAcquired(self):
432 be760ba8 Michael Hanselmann
    return self._acquired
433 be760ba8 Michael Hanselmann
434 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
435 be760ba8 Michael Hanselmann
    assert shared == 1
436 be760ba8 Michael Hanselmann
    self._acquired = True
437 be760ba8 Michael Hanselmann
438 be760ba8 Michael Hanselmann
  def release(self):
439 be760ba8 Michael Hanselmann
    assert self._acquired
440 be760ba8 Michael Hanselmann
    self._acquired = False
441 be760ba8 Michael Hanselmann
442 be760ba8 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=None):
443 be760ba8 Michael Hanselmann
    # TODO: Ensure job is updated at the correct places
444 be760ba8 Michael Hanselmann
    pass
445 be760ba8 Michael Hanselmann
446 be760ba8 Michael Hanselmann
447 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
448 be760ba8 Michael Hanselmann
  def __init__(self, before_start, after_start):
449 be760ba8 Michael Hanselmann
    self._before_start = before_start
450 be760ba8 Michael Hanselmann
    self._after_start = after_start
451 be760ba8 Michael Hanselmann
452 26d3fd2f Michael Hanselmann
  def __call__(self, op, cbs, timeout=None):
453 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
454 be760ba8 Michael Hanselmann
455 be760ba8 Michael Hanselmann
    if self._before_start:
456 26d3fd2f Michael Hanselmann
      self._before_start(timeout)
457 be760ba8 Michael Hanselmann
458 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
459 be760ba8 Michael Hanselmann
460 be760ba8 Michael Hanselmann
    if self._after_start:
461 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
462 be760ba8 Michael Hanselmann
463 be760ba8 Michael Hanselmann
    if op.fail:
464 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
465 be760ba8 Michael Hanselmann
466 be760ba8 Michael Hanselmann
    return op.result
467 be760ba8 Michael Hanselmann
468 be760ba8 Michael Hanselmann
469 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
470 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
471 be760ba8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops)
472 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
473 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
474 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
475 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
476 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
477 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
478 be760ba8 Michael Hanselmann
    return job
479 be760ba8 Michael Hanselmann
480 26d3fd2f Michael Hanselmann
481 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
482 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
483 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
484 be760ba8 Michael Hanselmann
                      for op in job.ops)
485 be760ba8 Michael Hanselmann
486 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
487 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
488 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
489 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
490 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
491 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
492 be760ba8 Michael Hanselmann
                      job.start_timestamp,
493 be760ba8 Michael Hanselmann
                      job.end_timestamp])
494 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
495 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
496 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
497 be760ba8 Michael Hanselmann
498 be760ba8 Michael Hanselmann
  def testSuccess(self):
499 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
500 be760ba8 Michael Hanselmann
501 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
502 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
503 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
504 be760ba8 Michael Hanselmann
             for i in range(opcount)]
505 be760ba8 Michael Hanselmann
506 be760ba8 Michael Hanselmann
      # Create job
507 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
508 be760ba8 Michael Hanselmann
509 26d3fd2f Michael Hanselmann
      def _BeforeStart(_):
510 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
511 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
512 be760ba8 Michael Hanselmann
513 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
514 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
515 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
516 be760ba8 Michael Hanselmann
517 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
518 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
519 be760ba8 Michael Hanselmann
        self.assertFalse(success)
520 be760ba8 Michael Hanselmann
521 be760ba8 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
522 be760ba8 Michael Hanselmann
523 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
524 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
525 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
526 be760ba8 Michael Hanselmann
          # Last opcode
527 be760ba8 Michael Hanselmann
          self.assert_(result)
528 be760ba8 Michael Hanselmann
        else:
529 be760ba8 Michael Hanselmann
          self.assertFalse(result)
530 be760ba8 Michael Hanselmann
531 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
532 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
533 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
534 be760ba8 Michael Hanselmann
535 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
536 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
537 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
538 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
539 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
540 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
541 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
542 be760ba8 Michael Hanselmann
                              for op in job.ops))
543 be760ba8 Michael Hanselmann
544 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
545 be760ba8 Michael Hanselmann
546 be760ba8 Michael Hanselmann
      # Finished jobs can't be processed any further
547 be760ba8 Michael Hanselmann
      self.assertRaises(errors.ProgrammerError,
548 be760ba8 Michael Hanselmann
                        jqueue._JobProcessor(queue, opexec, job))
549 be760ba8 Michael Hanselmann
550 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
551 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
552 be760ba8 Michael Hanselmann
553 be760ba8 Michael Hanselmann
    testdata = [
554 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
555 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
556 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
557 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
558 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
559 be760ba8 Michael Hanselmann
      ]
560 be760ba8 Michael Hanselmann
561 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
562 be760ba8 Michael Hanselmann
      # Prepare opcodes
563 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
564 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
565 be760ba8 Michael Hanselmann
                                       i <= failto))
566 be760ba8 Michael Hanselmann
             for i in range(opcount)]
567 be760ba8 Michael Hanselmann
568 be760ba8 Michael Hanselmann
      # Create job
569 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
570 be760ba8 Michael Hanselmann
571 be760ba8 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(None, None)
572 be760ba8 Michael Hanselmann
573 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
574 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
575 be760ba8 Michael Hanselmann
576 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
577 be760ba8 Michael Hanselmann
          # Last opcode
578 be760ba8 Michael Hanselmann
          self.assert_(result)
579 be760ba8 Michael Hanselmann
          break
580 be760ba8 Michael Hanselmann
581 be760ba8 Michael Hanselmann
        self.assertFalse(result)
582 be760ba8 Michael Hanselmann
583 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
584 be760ba8 Michael Hanselmann
585 be760ba8 Michael Hanselmann
      # Check job status
586 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
587 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["id"]), [job_id])
588 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
589 be760ba8 Michael Hanselmann
590 be760ba8 Michael Hanselmann
      # Check opcode status
591 be760ba8 Michael Hanselmann
      data = zip(job.ops,
592 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
593 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
594 be760ba8 Michael Hanselmann
595 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
596 be760ba8 Michael Hanselmann
        if idx < failfrom:
597 be760ba8 Michael Hanselmann
          assert not op.input.fail
598 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
599 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
600 be760ba8 Michael Hanselmann
        elif idx <= failto:
601 be760ba8 Michael Hanselmann
          assert op.input.fail
602 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
603 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
604 be760ba8 Michael Hanselmann
        else:
605 be760ba8 Michael Hanselmann
          assert not op.input.fail
606 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
607 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
608 be760ba8 Michael Hanselmann
609 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
610 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
611 be760ba8 Michael Hanselmann
612 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
613 be760ba8 Michael Hanselmann
614 be760ba8 Michael Hanselmann
      # Finished jobs can't be processed any further
615 be760ba8 Michael Hanselmann
      self.assertRaises(errors.ProgrammerError,
616 be760ba8 Michael Hanselmann
                        jqueue._JobProcessor(queue, opexec, job))
617 be760ba8 Michael Hanselmann
618 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
619 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
620 be760ba8 Michael Hanselmann
621 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
622 be760ba8 Michael Hanselmann
           for i in range(5)]
623 be760ba8 Michael Hanselmann
624 be760ba8 Michael Hanselmann
    # Create job
625 be760ba8 Michael Hanselmann
    job_id = 17045
626 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
627 be760ba8 Michael Hanselmann
628 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
629 be760ba8 Michael Hanselmann
630 be760ba8 Michael Hanselmann
    # Mark as cancelled
631 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
632 be760ba8 Michael Hanselmann
    self.assert_(success)
633 be760ba8 Michael Hanselmann
634 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
635 be760ba8 Michael Hanselmann
                            for op in job.ops))
636 be760ba8 Michael Hanselmann
637 be760ba8 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(None, None)
638 be760ba8 Michael Hanselmann
    jqueue._JobProcessor(queue, opexec, job)()
639 be760ba8 Michael Hanselmann
640 be760ba8 Michael Hanselmann
    # Check result
641 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
642 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
643 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
644 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
645 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
646 be760ba8 Michael Hanselmann
                                for op in job.ops))
647 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
648 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
649 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
650 be760ba8 Michael Hanselmann
651 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
652 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
653 be760ba8 Michael Hanselmann
654 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
655 be760ba8 Michael Hanselmann
           for i in range(5)]
656 be760ba8 Michael Hanselmann
657 be760ba8 Michael Hanselmann
    # Create job
658 be760ba8 Michael Hanselmann
    job_id = 11009
659 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
660 be760ba8 Michael Hanselmann
661 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
662 be760ba8 Michael Hanselmann
663 26d3fd2f Michael Hanselmann
    def _BeforeStart(_):
664 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
665 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
666 be760ba8 Michael Hanselmann
667 be760ba8 Michael Hanselmann
      # Mark as cancelled
668 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
669 be760ba8 Michael Hanselmann
      self.assert_(success)
670 be760ba8 Michael Hanselmann
671 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
672 be760ba8 Michael Hanselmann
                              for op in job.ops))
673 be760ba8 Michael Hanselmann
674 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
675 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
676 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
677 be760ba8 Michael Hanselmann
678 be760ba8 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
679 be760ba8 Michael Hanselmann
680 be760ba8 Michael Hanselmann
    jqueue._JobProcessor(queue, opexec, job)()
681 be760ba8 Michael Hanselmann
682 be760ba8 Michael Hanselmann
    # Check result
683 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
684 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
685 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
686 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
687 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
688 be760ba8 Michael Hanselmann
                                for op in job.ops))
689 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
690 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
691 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
692 be760ba8 Michael Hanselmann
693 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
694 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
695 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
696 be760ba8 Michael Hanselmann
697 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
698 be760ba8 Michael Hanselmann
           for i in range(3)]
699 be760ba8 Michael Hanselmann
700 be760ba8 Michael Hanselmann
    # Create job
701 be760ba8 Michael Hanselmann
    job_id = 28492
702 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
703 be760ba8 Michael Hanselmann
704 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
705 be760ba8 Michael Hanselmann
706 be760ba8 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(None, None)
707 be760ba8 Michael Hanselmann
708 be760ba8 Michael Hanselmann
    # Run one opcode
709 be760ba8 Michael Hanselmann
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
710 be760ba8 Michael Hanselmann
711 be760ba8 Michael Hanselmann
    # Job goes back to queued
712 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
713 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
714 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
715 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
716 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
717 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
718 be760ba8 Michael Hanselmann
719 be760ba8 Michael Hanselmann
    # Mark as cancelled
720 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
721 be760ba8 Michael Hanselmann
    self.assert_(success)
722 be760ba8 Michael Hanselmann
723 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
724 be760ba8 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
725 be760ba8 Michael Hanselmann
726 be760ba8 Michael Hanselmann
    # Check result
727 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
728 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
729 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
730 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
731 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
732 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
733 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
734 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
735 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
736 be760ba8 Michael Hanselmann
737 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
738 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
739 be760ba8 Michael Hanselmann
    # program was restarted
740 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
741 be760ba8 Michael Hanselmann
742 be760ba8 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(None, None)
743 be760ba8 Michael Hanselmann
744 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
745 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
746 be760ba8 Michael Hanselmann
             for i in range(10)]
747 be760ba8 Michael Hanselmann
748 be760ba8 Michael Hanselmann
      # Create job
749 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
750 be760ba8 Michael Hanselmann
751 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
752 be760ba8 Michael Hanselmann
753 be760ba8 Michael Hanselmann
      for _ in range(successcount):
754 be760ba8 Michael Hanselmann
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
755 be760ba8 Michael Hanselmann
756 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
757 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
758 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
759 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
760 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
761 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
762 be760ba8 Michael Hanselmann
763 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
764 be760ba8 Michael Hanselmann
765 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
766 be760ba8 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
767 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
768 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
769 be760ba8 Michael Hanselmann
770 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
771 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
772 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
773 be760ba8 Michael Hanselmann
774 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
775 be760ba8 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(None, None)
776 be760ba8 Michael Hanselmann
777 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
778 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
779 be760ba8 Michael Hanselmann
780 be760ba8 Michael Hanselmann
      if remaining == 0:
781 be760ba8 Michael Hanselmann
        # Last opcode
782 be760ba8 Michael Hanselmann
        self.assert_(result)
783 be760ba8 Michael Hanselmann
        break
784 be760ba8 Michael Hanselmann
785 be760ba8 Michael Hanselmann
      self.assertFalse(result)
786 be760ba8 Michael Hanselmann
787 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
788 be760ba8 Michael Hanselmann
789 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
790 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
791 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
792 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
793 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
794 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
795 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
796 be760ba8 Michael Hanselmann
                            for op in job.ops))
797 be760ba8 Michael Hanselmann
798 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
799 be760ba8 Michael Hanselmann
800 be760ba8 Michael Hanselmann
    # Finished jobs can't be processed any further
801 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
802 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
803 be760ba8 Michael Hanselmann
804 be760ba8 Michael Hanselmann
    # ... also after being restored
805 be760ba8 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
806 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
807 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job2))
808 be760ba8 Michael Hanselmann
809 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
810 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
811 be760ba8 Michael Hanselmann
812 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
813 be760ba8 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(None, None)
814 be760ba8 Michael Hanselmann
815 be760ba8 Michael Hanselmann
    # Create job
816 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
817 be760ba8 Michael Hanselmann
818 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
819 be760ba8 Michael Hanselmann
820 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
821 be760ba8 Michael Hanselmann
822 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
823 be760ba8 Michael Hanselmann
824 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
825 be760ba8 Michael Hanselmann
826 be760ba8 Michael Hanselmann
    # Calling on running job must fail
827 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
828 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
829 be760ba8 Michael Hanselmann
830 be760ba8 Michael Hanselmann
  def testLogMessages(self):
831 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
832 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
833 be760ba8 Michael Hanselmann
834 be760ba8 Michael Hanselmann
    messages = {
835 be760ba8 Michael Hanselmann
      1: [
836 be760ba8 Michael Hanselmann
        (None, "Hello"),
837 be760ba8 Michael Hanselmann
        (None, "World"),
838 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
839 be760ba8 Michael Hanselmann
        ],
840 be760ba8 Michael Hanselmann
      4: [
841 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
842 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
843 be760ba8 Michael Hanselmann
        ],
844 be760ba8 Michael Hanselmann
      }
845 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
846 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
847 be760ba8 Michael Hanselmann
           for i in range(5)]
848 be760ba8 Michael Hanselmann
849 be760ba8 Michael Hanselmann
    # Create job
850 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
851 be760ba8 Michael Hanselmann
852 26d3fd2f Michael Hanselmann
    def _BeforeStart(_):
853 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
854 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
855 be760ba8 Michael Hanselmann
856 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
857 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
858 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
859 be760ba8 Michael Hanselmann
860 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
861 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
862 be760ba8 Michael Hanselmann
863 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
864 be760ba8 Michael Hanselmann
        if log_type:
865 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
866 be760ba8 Michael Hanselmann
        else:
867 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
868 be760ba8 Michael Hanselmann
869 be760ba8 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
870 be760ba8 Michael Hanselmann
871 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
872 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
873 be760ba8 Michael Hanselmann
874 be760ba8 Michael Hanselmann
      if remaining == 0:
875 be760ba8 Michael Hanselmann
        # Last opcode
876 be760ba8 Michael Hanselmann
        self.assert_(result)
877 be760ba8 Michael Hanselmann
        break
878 be760ba8 Michael Hanselmann
879 be760ba8 Michael Hanselmann
      self.assertFalse(result)
880 be760ba8 Michael Hanselmann
881 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
882 be760ba8 Michael Hanselmann
883 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
884 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
885 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
886 be760ba8 Michael Hanselmann
887 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
888 be760ba8 Michael Hanselmann
889 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
890 be760ba8 Michael Hanselmann
891 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
892 be760ba8 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
893 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
894 be760ba8 Michael Hanselmann
895 be760ba8 Michael Hanselmann
    # Check each message
896 be760ba8 Michael Hanselmann
    prevserial = -1
897 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
898 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
899 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
900 be760ba8 Michael Hanselmann
        if exptype:
901 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
902 be760ba8 Michael Hanselmann
        else:
903 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
904 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
905 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
906 be760ba8 Michael Hanselmann
        prevserial = serial
907 be760ba8 Michael Hanselmann
908 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
909 be760ba8 Michael Hanselmann
    # Check serial
910 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
911 be760ba8 Michael Hanselmann
912 be760ba8 Michael Hanselmann
    # No filter
913 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
914 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
915 be760ba8 Michael Hanselmann
                      for entry in entries])
916 be760ba8 Michael Hanselmann
917 be760ba8 Michael Hanselmann
    # Filter with serial
918 be760ba8 Michael Hanselmann
    assert count > 3
919 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
920 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
921 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
922 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
923 be760ba8 Michael Hanselmann
924 be760ba8 Michael Hanselmann
    # No log message after highest serial
925 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
926 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
927 be760ba8 Michael Hanselmann
928 be760ba8 Michael Hanselmann
929 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
930 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
931 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
932 26d3fd2f Michael Hanselmann
    self.attempts = 0
933 26d3fd2f Michael Hanselmann
    self.last_timeout = None
934 26d3fd2f Michael Hanselmann
935 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
936 26d3fd2f Michael Hanselmann
    self.attempts += 1
937 26d3fd2f Michael Hanselmann
    if self.timeouts:
938 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
939 26d3fd2f Michael Hanselmann
    else:
940 26d3fd2f Michael Hanselmann
      timeout = None
941 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
942 26d3fd2f Michael Hanselmann
    return timeout
943 26d3fd2f Michael Hanselmann
944 26d3fd2f Michael Hanselmann
945 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
946 26d3fd2f Michael Hanselmann
  def setUp(self):
947 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
948 26d3fd2f Michael Hanselmann
    self.job = None
949 26d3fd2f Michael Hanselmann
    self.curop = None
950 26d3fd2f Michael Hanselmann
    self.opcounter = None
951 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None
952 26d3fd2f Michael Hanselmann
    self.retries = 0
953 26d3fd2f Michael Hanselmann
    self.prev_tsop = None
954 26d3fd2f Michael Hanselmann
    self.prev_prio = None
955 26d3fd2f Michael Hanselmann
    self.gave_lock = None
956 26d3fd2f Michael Hanselmann
    self.done_lock_before_blocking = False
957 26d3fd2f Michael Hanselmann
958 26d3fd2f Michael Hanselmann
  def _BeforeStart(self, timeout):
959 26d3fd2f Michael Hanselmann
    job = self.job
960 26d3fd2f Michael Hanselmann
961 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
962 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
963 26d3fd2f Michael Hanselmann
964 26d3fd2f Michael Hanselmann
    ts = self.timeout_strategy
965 26d3fd2f Michael Hanselmann
966 26d3fd2f Michael Hanselmann
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
967 26d3fd2f Michael Hanselmann
    self.assertEqual(timeout, ts.last_timeout)
968 26d3fd2f Michael Hanselmann
969 26d3fd2f Michael Hanselmann
    self.gave_lock = True
970 26d3fd2f Michael Hanselmann
971 26d3fd2f Michael Hanselmann
    if (self.curop == 3 and
972 26d3fd2f Michael Hanselmann
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
973 26d3fd2f Michael Hanselmann
      # Give locks before running into blocking acquire
974 26d3fd2f Michael Hanselmann
      assert self.retries == 7
975 26d3fd2f Michael Hanselmann
      self.retries = 0
976 26d3fd2f Michael Hanselmann
      self.done_lock_before_blocking = True
977 26d3fd2f Michael Hanselmann
      return
978 26d3fd2f Michael Hanselmann
979 26d3fd2f Michael Hanselmann
    if self.retries > 0:
980 26d3fd2f Michael Hanselmann
      self.assert_(timeout is not None)
981 26d3fd2f Michael Hanselmann
      self.retries -= 1
982 26d3fd2f Michael Hanselmann
      self.gave_lock = False
983 26d3fd2f Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
984 26d3fd2f Michael Hanselmann
985 26d3fd2f Michael Hanselmann
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
986 26d3fd2f Michael Hanselmann
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
987 26d3fd2f Michael Hanselmann
      assert not ts.timeouts
988 26d3fd2f Michael Hanselmann
      self.assert_(timeout is None)
989 26d3fd2f Michael Hanselmann
990 26d3fd2f Michael Hanselmann
  def _AfterStart(self, op, cbs):
991 26d3fd2f Michael Hanselmann
    job = self.job
992 26d3fd2f Michael Hanselmann
993 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
994 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
995 26d3fd2f Michael Hanselmann
996 26d3fd2f Michael Hanselmann
    # Job is running, cancelling shouldn't be possible
997 26d3fd2f Michael Hanselmann
    (success, _) = job.Cancel()
998 26d3fd2f Michael Hanselmann
    self.assertFalse(success)
999 26d3fd2f Michael Hanselmann
1000 26d3fd2f Michael Hanselmann
  def _NextOpcode(self):
1001 26d3fd2f Michael Hanselmann
    self.curop = self.opcounter.next()
1002 26d3fd2f Michael Hanselmann
    self.prev_prio = self.job.ops[self.curop].priority
1003 26d3fd2f Michael Hanselmann
1004 26d3fd2f Michael Hanselmann
  def _NewTimeoutStrategy(self):
1005 26d3fd2f Michael Hanselmann
    job = self.job
1006 26d3fd2f Michael Hanselmann
1007 26d3fd2f Michael Hanselmann
    self.assertEqual(self.retries, 0)
1008 26d3fd2f Michael Hanselmann
1009 26d3fd2f Michael Hanselmann
    if self.prev_tsop == self.curop:
1010 26d3fd2f Michael Hanselmann
      # Still on the same opcode, priority must've been increased
1011 26d3fd2f Michael Hanselmann
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1012 26d3fd2f Michael Hanselmann
1013 26d3fd2f Michael Hanselmann
    if self.curop == 1:
1014 26d3fd2f Michael Hanselmann
      # Normal retry
1015 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1016 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts) - 1
1017 26d3fd2f Michael Hanselmann
1018 26d3fd2f Michael Hanselmann
    elif self.curop == 2:
1019 26d3fd2f Michael Hanselmann
      # Let this run into a blocking acquire
1020 26d3fd2f Michael Hanselmann
      timeouts = range(11, 61, 12)
1021 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1022 26d3fd2f Michael Hanselmann
1023 26d3fd2f Michael Hanselmann
    elif self.curop == 3:
1024 26d3fd2f Michael Hanselmann
      # Wait for priority to increase, but give lock before blocking acquire
1025 26d3fd2f Michael Hanselmann
      timeouts = range(12, 100, 14)
1026 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1027 26d3fd2f Michael Hanselmann
1028 26d3fd2f Michael Hanselmann
      self.assertFalse(self.done_lock_before_blocking)
1029 26d3fd2f Michael Hanselmann
1030 26d3fd2f Michael Hanselmann
    elif self.curop == 4:
1031 26d3fd2f Michael Hanselmann
      self.assert_(self.done_lock_before_blocking)
1032 26d3fd2f Michael Hanselmann
1033 26d3fd2f Michael Hanselmann
      # Timeouts, but no need to retry
1034 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1035 26d3fd2f Michael Hanselmann
      self.retries = 0
1036 26d3fd2f Michael Hanselmann
1037 26d3fd2f Michael Hanselmann
    elif self.curop == 5:
1038 26d3fd2f Michael Hanselmann
      # Normal retry
1039 26d3fd2f Michael Hanselmann
      timeouts = range(19, 100, 11)
1040 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1041 26d3fd2f Michael Hanselmann
1042 26d3fd2f Michael Hanselmann
    else:
1043 26d3fd2f Michael Hanselmann
      timeouts = []
1044 26d3fd2f Michael Hanselmann
      self.retries = 0
1045 26d3fd2f Michael Hanselmann
1046 26d3fd2f Michael Hanselmann
    assert len(job.ops) == 10
1047 26d3fd2f Michael Hanselmann
    assert self.retries <= len(timeouts)
1048 26d3fd2f Michael Hanselmann
1049 26d3fd2f Michael Hanselmann
    ts = _FakeTimeoutStrategy(timeouts)
1050 26d3fd2f Michael Hanselmann
1051 26d3fd2f Michael Hanselmann
    self.timeout_strategy = ts
1052 26d3fd2f Michael Hanselmann
    self.prev_tsop = self.curop
1053 26d3fd2f Michael Hanselmann
    self.prev_prio = job.ops[self.curop].priority
1054 26d3fd2f Michael Hanselmann
1055 26d3fd2f Michael Hanselmann
    return ts
1056 26d3fd2f Michael Hanselmann
1057 26d3fd2f Michael Hanselmann
  def testTimeout(self):
1058 26d3fd2f Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1059 26d3fd2f Michael Hanselmann
           for i in range(10)]
1060 26d3fd2f Michael Hanselmann
1061 26d3fd2f Michael Hanselmann
    # Create job
1062 26d3fd2f Michael Hanselmann
    job_id = 15801
1063 26d3fd2f Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
1064 26d3fd2f Michael Hanselmann
    self.job = job
1065 26d3fd2f Michael Hanselmann
1066 26d3fd2f Michael Hanselmann
    self.opcounter = itertools.count(0)
1067 26d3fd2f Michael Hanselmann
1068 26d3fd2f Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
1069 26d3fd2f Michael Hanselmann
    tsf = self._NewTimeoutStrategy
1070 26d3fd2f Michael Hanselmann
1071 26d3fd2f Michael Hanselmann
    self.assertFalse(self.done_lock_before_blocking)
1072 26d3fd2f Michael Hanselmann
1073 26d3fd2f Michael Hanselmann
    for i in itertools.count(0):
1074 26d3fd2f Michael Hanselmann
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1075 26d3fd2f Michael Hanselmann
                                  _timeout_strategy_factory=tsf)
1076 26d3fd2f Michael Hanselmann
1077 26d3fd2f Michael Hanselmann
      result = proc(_nextop_fn=self._NextOpcode)
1078 26d3fd2f Michael Hanselmann
      if result:
1079 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1080 26d3fd2f Michael Hanselmann
        break
1081 26d3fd2f Michael Hanselmann
1082 26d3fd2f Michael Hanselmann
      self.assertFalse(result)
1083 26d3fd2f Michael Hanselmann
1084 26d3fd2f Michael Hanselmann
      if self.gave_lock:
1085 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1086 26d3fd2f Michael Hanselmann
      else:
1087 26d3fd2f Michael Hanselmann
        self.assert_(job.cur_opctx)
1088 26d3fd2f Michael Hanselmann
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1089 26d3fd2f Michael Hanselmann
                         self.timeout_strategy.NextAttempt)
1090 26d3fd2f Michael Hanselmann
1091 26d3fd2f Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1092 26d3fd2f Michael Hanselmann
      self.assert_(job.start_timestamp)
1093 26d3fd2f Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1094 26d3fd2f Michael Hanselmann
1095 26d3fd2f Michael Hanselmann
    self.assertEqual(self.curop, len(job.ops) - 1)
1096 26d3fd2f Michael Hanselmann
    self.assertEqual(self.job, job)
1097 26d3fd2f Michael Hanselmann
    self.assertEqual(self.opcounter.next(), len(job.ops))
1098 26d3fd2f Michael Hanselmann
    self.assert_(self.done_lock_before_blocking)
1099 26d3fd2f Michael Hanselmann
1100 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1101 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1102 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1103 26d3fd2f Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1104 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1105 26d3fd2f Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1106 26d3fd2f Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1107 26d3fd2f Michael Hanselmann
                            for op in job.ops))
1108 26d3fd2f Michael Hanselmann
1109 26d3fd2f Michael Hanselmann
    # Finished jobs can't be processed any further
1110 26d3fd2f Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1111 26d3fd2f Michael Hanselmann
                      jqueue._JobProcessor(self.queue, opexec, job))
1112 26d3fd2f Michael Hanselmann
1113 26d3fd2f Michael Hanselmann
1114 989a8bee Michael Hanselmann
if __name__ == "__main__":
1115 989a8bee Michael Hanselmann
  testutils.GanetiTestProgram()