Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 7578ab0a

History | View | Annotate | Download (46.8 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 989a8bee Michael Hanselmann
# Copyright (C) 2010 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 989a8bee Michael Hanselmann
32 989a8bee Michael Hanselmann
from ganeti import constants
33 989a8bee Michael Hanselmann
from ganeti import utils
34 989a8bee Michael Hanselmann
from ganeti import errors
35 989a8bee Michael Hanselmann
from ganeti import jqueue
36 8f5c488d Michael Hanselmann
from ganeti import opcodes
37 8f5c488d Michael Hanselmann
from ganeti import compat
38 26d3fd2f Michael Hanselmann
from ganeti import mcpu
39 989a8bee Michael Hanselmann
40 989a8bee Michael Hanselmann
import testutils
41 989a8bee Michael Hanselmann
42 989a8bee Michael Hanselmann
43 989a8bee Michael Hanselmann
class _FakeJob:
44 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
45 989a8bee Michael Hanselmann
    self.id = job_id
46 989a8bee Michael Hanselmann
    self._status = status
47 989a8bee Michael Hanselmann
    self._log = []
48 989a8bee Michael Hanselmann
49 989a8bee Michael Hanselmann
  def SetStatus(self, status):
50 989a8bee Michael Hanselmann
    self._status = status
51 989a8bee Michael Hanselmann
52 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
53 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
54 989a8bee Michael Hanselmann
55 989a8bee Michael Hanselmann
  def CalcStatus(self):
56 989a8bee Michael Hanselmann
    return self._status
57 989a8bee Michael Hanselmann
58 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
59 989a8bee Michael Hanselmann
    result = []
60 989a8bee Michael Hanselmann
61 989a8bee Michael Hanselmann
    for name in fields:
62 989a8bee Michael Hanselmann
      if name == "status":
63 989a8bee Michael Hanselmann
        result.append(self._status)
64 989a8bee Michael Hanselmann
      else:
65 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
66 989a8bee Michael Hanselmann
67 989a8bee Michael Hanselmann
    return result
68 989a8bee Michael Hanselmann
69 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
70 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
71 989a8bee Michael Hanselmann
72 989a8bee Michael Hanselmann
    if newer_than is None:
73 989a8bee Michael Hanselmann
      return self._log
74 989a8bee Michael Hanselmann
75 989a8bee Michael Hanselmann
    return self._log[newer_than:]
76 989a8bee Michael Hanselmann
77 989a8bee Michael Hanselmann
78 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
79 989a8bee Michael Hanselmann
  def testStatus(self):
80 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
82 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
83 989a8bee Michael Hanselmann
84 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
85 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
86 989a8bee Michael Hanselmann
87 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
88 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
89 989a8bee Michael Hanselmann
90 989a8bee Michael Hanselmann
    # job.id is used by checker
91 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
92 989a8bee Michael Hanselmann
93 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
94 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
95 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
96 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
97 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
98 989a8bee Michael Hanselmann
99 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
100 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
101 989a8bee Michael Hanselmann
102 989a8bee Michael Hanselmann
  def testFinalStatus(self):
103 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
104 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
105 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
106 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
107 989a8bee Michael Hanselmann
      # a change immediately
108 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
109 989a8bee Michael Hanselmann
110 989a8bee Michael Hanselmann
  def testLog(self):
111 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
113 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
114 989a8bee Michael Hanselmann
115 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
116 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
117 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
119 989a8bee Michael Hanselmann
120 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
122 989a8bee Michael Hanselmann
123 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
124 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
125 989a8bee Michael Hanselmann
126 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
127 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
129 989a8bee Michael Hanselmann
130 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
131 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
132 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134 989a8bee Michael Hanselmann
135 989a8bee Michael Hanselmann
136 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
137 989a8bee Michael Hanselmann
  def setUp(self):
138 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
139 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
140 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
141 989a8bee Michael Hanselmann
142 989a8bee Michael Hanselmann
  def tearDown(self):
143 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
144 989a8bee Michael Hanselmann
145 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
146 989a8bee Michael Hanselmann
    try:
147 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
148 989a8bee Michael Hanselmann
    except EnvironmentError, err:
149 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
150 989a8bee Michael Hanselmann
    else:
151 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
152 989a8bee Michael Hanselmann
153 989a8bee Michael Hanselmann
  def testClose(self):
154 989a8bee Michael Hanselmann
    for wait in [False, True]:
155 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
156 989a8bee Michael Hanselmann
      try:
157 989a8bee Michael Hanselmann
        if wait:
158 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
159 989a8bee Michael Hanselmann
      finally:
160 989a8bee Michael Hanselmann
        waiter.Close()
161 989a8bee Michael Hanselmann
162 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
163 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
164 989a8bee Michael Hanselmann
165 989a8bee Michael Hanselmann
  def testChangingFile(self):
166 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
167 989a8bee Michael Hanselmann
    try:
168 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
169 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
170 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
171 989a8bee Michael Hanselmann
    finally:
172 989a8bee Michael Hanselmann
      waiter.Close()
173 989a8bee Michael Hanselmann
174 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
175 989a8bee Michael Hanselmann
176 989a8bee Michael Hanselmann
  def testChangingFile2(self):
177 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
178 989a8bee Michael Hanselmann
    try:
179 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
180 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
181 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
182 989a8bee Michael Hanselmann
183 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
184 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
185 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
186 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
187 989a8bee Michael Hanselmann
    finally:
188 989a8bee Michael Hanselmann
      waiter.Close()
189 989a8bee Michael Hanselmann
190 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191 989a8bee Michael Hanselmann
192 989a8bee Michael Hanselmann
193 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
194 989a8bee Michael Hanselmann
  def setUp(self):
195 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
196 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
198 989a8bee Michael Hanselmann
199 989a8bee Michael Hanselmann
  def tearDown(self):
200 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
201 989a8bee Michael Hanselmann
202 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
203 989a8bee Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
204 989a8bee Michael Hanselmann
205 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
206 989a8bee Michael Hanselmann
    return None
207 989a8bee Michael Hanselmann
208 989a8bee Michael Hanselmann
  def testNoChanges(self):
209 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
210 989a8bee Michael Hanselmann
211 989a8bee Michael Hanselmann
    # No change
212 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
213 989a8bee Michael Hanselmann
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
214 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
215 989a8bee Michael Hanselmann
216 989a8bee Michael Hanselmann
    # No previous information
217 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
218 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
219 989a8bee Michael Hanselmann
                     ([constants.JOB_STATUS_WAITLOCK], []))
220 989a8bee Michael Hanselmann
221 989a8bee Michael Hanselmann
  def testLostJob(self):
222 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
223 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
224 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
225 989a8bee Michael Hanselmann
226 989a8bee Michael Hanselmann
227 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
228 6760e4ed Michael Hanselmann
  def test(self):
229 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
231 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
232 6760e4ed Michael Hanselmann
233 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
235 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
236 6760e4ed Michael Hanselmann
237 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
239 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
240 6760e4ed Michael Hanselmann
241 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
242 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
243 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244 6760e4ed Michael Hanselmann
245 6760e4ed Michael Hanselmann
246 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
247 8f5c488d Michael Hanselmann
  def testDefaults(self):
248 8f5c488d Michael Hanselmann
    def _Check(op):
249 8f5c488d Michael Hanselmann
      self.assertFalse(hasattr(op.input, "dry_run"))
250 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
252 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
253 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
254 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
255 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
256 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
257 8f5c488d Michael Hanselmann
258 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
259 8f5c488d Michael Hanselmann
    _Check(op1)
260 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
261 8f5c488d Michael Hanselmann
    _Check(op2)
262 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
263 8f5c488d Michael Hanselmann
264 8f5c488d Michael Hanselmann
  def testPriority(self):
265 8f5c488d Michael Hanselmann
    def _Check(op):
266 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
268 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
270 8f5c488d Michael Hanselmann
271 c6afb1ca Iustin Pop
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
272 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
273 8f5c488d Michael Hanselmann
    _Check(op1)
274 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
275 8f5c488d Michael Hanselmann
    _Check(op2)
276 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
277 8f5c488d Michael Hanselmann
278 8f5c488d Michael Hanselmann
279 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
280 5f6b0b71 Michael Hanselmann
  def test(self):
281 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
282 5f6b0b71 Michael Hanselmann
                      None, 1, [])
283 5f6b0b71 Michael Hanselmann
284 8f5c488d Michael Hanselmann
  def testDefaults(self):
285 8f5c488d Michael Hanselmann
    job_id = 4260
286 8f5c488d Michael Hanselmann
    ops = [
287 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(),
288 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
289 8f5c488d Michael Hanselmann
      ]
290 8f5c488d Michael Hanselmann
291 8f5c488d Michael Hanselmann
    def _Check(job):
292 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
293 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
294 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
295 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
296 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
297 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
300 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
301 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
303 5f6b0b71 Michael Hanselmann
      self.assertRaises(errors.OpExecError, job.GetInfo,
304 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
305 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
306 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
307 8f5c488d Michael Hanselmann
308 8f5c488d Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops)
309 8f5c488d Michael Hanselmann
    _Check(job1)
310 8f5c488d Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
311 8f5c488d Michael Hanselmann
    _Check(job2)
312 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
313 8f5c488d Michael Hanselmann
314 8f5c488d Michael Hanselmann
  def testPriority(self):
315 8f5c488d Michael Hanselmann
    job_id = 4283
316 8f5c488d Michael Hanselmann
    ops = [
317 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
318 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
319 8f5c488d Michael Hanselmann
      ]
320 8f5c488d Michael Hanselmann
321 8f5c488d Michael Hanselmann
    def _Check(job):
322 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
323 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
325 8f5c488d Michael Hanselmann
326 8f5c488d Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops)
327 8f5c488d Michael Hanselmann
    _Check(job)
328 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
329 8f5c488d Michael Hanselmann
                            for op in job.ops))
330 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
331 8f5c488d Michael Hanselmann
332 8f5c488d Michael Hanselmann
    # Increase first
333 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
334 8f5c488d Michael Hanselmann
    _Check(job)
335 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
336 8f5c488d Michael Hanselmann
337 8f5c488d Michael Hanselmann
    # Mark opcode as finished
338 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
339 8f5c488d Michael Hanselmann
    _Check(job)
340 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
341 8f5c488d Michael Hanselmann
342 8f5c488d Michael Hanselmann
    # Increase second
343 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
344 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
345 8f5c488d Michael Hanselmann
346 8f5c488d Michael Hanselmann
    # Test increasing first
347 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
348 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
349 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
350 8f5c488d Michael Hanselmann
351 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
352 db5bce34 Michael Hanselmann
    def _Queued(ops):
353 db5bce34 Michael Hanselmann
      # The default status is "queued"
354 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
355 db5bce34 Michael Hanselmann
                              for op in ops))
356 db5bce34 Michael Hanselmann
357 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
358 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITLOCK
359 db5bce34 Michael Hanselmann
360 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
361 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
362 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
363 db5bce34 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITLOCK
364 db5bce34 Michael Hanselmann
365 db5bce34 Michael Hanselmann
    def _Running(ops):
366 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
367 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
368 db5bce34 Michael Hanselmann
      for op in ops[2:]:
369 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
370 db5bce34 Michael Hanselmann
371 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
372 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
373 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
374 db5bce34 Michael Hanselmann
      for op in ops[2:]:
375 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
376 db5bce34 Michael Hanselmann
377 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
378 db5bce34 Michael Hanselmann
      for op in ops:
379 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
380 db5bce34 Michael Hanselmann
381 db5bce34 Michael Hanselmann
    def _Canceled(ops):
382 db5bce34 Michael Hanselmann
      for op in ops:
383 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
384 db5bce34 Michael Hanselmann
385 db5bce34 Michael Hanselmann
    def _Error1(ops):
386 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
387 db5bce34 Michael Hanselmann
        if idx > 3:
388 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
389 db5bce34 Michael Hanselmann
        else:
390 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
391 db5bce34 Michael Hanselmann
392 db5bce34 Michael Hanselmann
    def _Error2(ops):
393 db5bce34 Michael Hanselmann
      for op in ops:
394 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
395 db5bce34 Michael Hanselmann
396 db5bce34 Michael Hanselmann
    def _Success(ops):
397 db5bce34 Michael Hanselmann
      for op in ops:
398 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
399 db5bce34 Michael Hanselmann
400 db5bce34 Michael Hanselmann
    tests = {
401 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
402 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
404 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
406 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
408 db5bce34 Michael Hanselmann
      }
409 db5bce34 Michael Hanselmann
410 db5bce34 Michael Hanselmann
    def _NewJob():
411 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
412 db5bce34 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)])
413 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
415 db5bce34 Michael Hanselmann
                              for op in job.ops))
416 db5bce34 Michael Hanselmann
      return job
417 db5bce34 Michael Hanselmann
418 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
419 db5bce34 Michael Hanselmann
      sttests = tests[status]
420 db5bce34 Michael Hanselmann
      assert sttests
421 db5bce34 Michael Hanselmann
      for fn in sttests:
422 db5bce34 Michael Hanselmann
        job = _NewJob()
423 db5bce34 Michael Hanselmann
        fn(job.ops)
424 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
425 db5bce34 Michael Hanselmann
426 8f5c488d Michael Hanselmann
427 be760ba8 Michael Hanselmann
class _FakeQueueForProc:
428 be760ba8 Michael Hanselmann
  def __init__(self):
429 be760ba8 Michael Hanselmann
    self._acquired = False
430 ebb2a2a3 Michael Hanselmann
    self._updates = []
431 6a373640 Michael Hanselmann
    self._submitted = []
432 6a373640 Michael Hanselmann
433 6a373640 Michael Hanselmann
    self._submit_count = itertools.count(1000)
434 be760ba8 Michael Hanselmann
435 be760ba8 Michael Hanselmann
  def IsAcquired(self):
436 be760ba8 Michael Hanselmann
    return self._acquired
437 be760ba8 Michael Hanselmann
438 ebb2a2a3 Michael Hanselmann
  def GetNextUpdate(self):
439 ebb2a2a3 Michael Hanselmann
    return self._updates.pop(0)
440 ebb2a2a3 Michael Hanselmann
441 6a373640 Michael Hanselmann
  def GetNextSubmittedJob(self):
442 6a373640 Michael Hanselmann
    return self._submitted.pop(0)
443 6a373640 Michael Hanselmann
444 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
445 be760ba8 Michael Hanselmann
    assert shared == 1
446 be760ba8 Michael Hanselmann
    self._acquired = True
447 be760ba8 Michael Hanselmann
448 be760ba8 Michael Hanselmann
  def release(self):
449 be760ba8 Michael Hanselmann
    assert self._acquired
450 be760ba8 Michael Hanselmann
    self._acquired = False
451 be760ba8 Michael Hanselmann
452 ebb2a2a3 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=True):
453 ebb2a2a3 Michael Hanselmann
    assert self._acquired, "Lock not acquired while updating job"
454 ebb2a2a3 Michael Hanselmann
    self._updates.append((job, bool(replicate)))
455 be760ba8 Michael Hanselmann
456 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
457 6a373640 Michael Hanselmann
    assert not self._acquired, "Lock acquired while submitting jobs"
458 6a373640 Michael Hanselmann
    job_ids = [self._submit_count.next() for _ in jobs]
459 6a373640 Michael Hanselmann
    self._submitted.extend(zip(job_ids, jobs))
460 6a373640 Michael Hanselmann
    return job_ids
461 6a373640 Michael Hanselmann
462 be760ba8 Michael Hanselmann
463 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
464 ebb2a2a3 Michael Hanselmann
  def __init__(self, queue, before_start, after_start):
465 ebb2a2a3 Michael Hanselmann
    self._queue = queue
466 be760ba8 Michael Hanselmann
    self._before_start = before_start
467 be760ba8 Michael Hanselmann
    self._after_start = after_start
468 be760ba8 Michael Hanselmann
469 f23db633 Michael Hanselmann
  def __call__(self, op, cbs, timeout=None, priority=None):
470 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
471 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired(), \
472 ebb2a2a3 Michael Hanselmann
           "Queue lock not released when executing opcode"
473 be760ba8 Michael Hanselmann
474 be760ba8 Michael Hanselmann
    if self._before_start:
475 f23db633 Michael Hanselmann
      self._before_start(timeout, priority)
476 be760ba8 Michael Hanselmann
477 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
478 be760ba8 Michael Hanselmann
479 be760ba8 Michael Hanselmann
    if self._after_start:
480 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
481 be760ba8 Michael Hanselmann
482 ebb2a2a3 Michael Hanselmann
    # Check again after the callbacks
483 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired()
484 ebb2a2a3 Michael Hanselmann
485 be760ba8 Michael Hanselmann
    if op.fail:
486 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
487 be760ba8 Michael Hanselmann
488 6a373640 Michael Hanselmann
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
489 6a373640 Michael Hanselmann
      return cbs.SubmitManyJobs(op.submit_jobs)
490 6a373640 Michael Hanselmann
491 be760ba8 Michael Hanselmann
    return op.result
492 be760ba8 Michael Hanselmann
493 be760ba8 Michael Hanselmann
494 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
495 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
496 be760ba8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops)
497 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
498 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
499 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
500 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
501 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
502 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
503 be760ba8 Michael Hanselmann
    return job
504 be760ba8 Michael Hanselmann
505 26d3fd2f Michael Hanselmann
506 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
507 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
508 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
509 be760ba8 Michael Hanselmann
                      for op in job.ops)
510 be760ba8 Michael Hanselmann
511 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
512 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
513 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
514 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
515 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
516 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
517 be760ba8 Michael Hanselmann
                      job.start_timestamp,
518 be760ba8 Michael Hanselmann
                      job.end_timestamp])
519 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
520 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
521 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
522 be760ba8 Michael Hanselmann
523 be760ba8 Michael Hanselmann
  def testSuccess(self):
524 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
525 be760ba8 Michael Hanselmann
526 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
527 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
528 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
529 be760ba8 Michael Hanselmann
             for i in range(opcount)]
530 be760ba8 Michael Hanselmann
531 be760ba8 Michael Hanselmann
      # Create job
532 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
533 be760ba8 Michael Hanselmann
534 f23db633 Michael Hanselmann
      def _BeforeStart(timeout, priority):
535 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
536 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
537 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
538 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
539 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
540 be760ba8 Michael Hanselmann
541 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
542 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
543 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
544 ebb2a2a3 Michael Hanselmann
545 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
546 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
547 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
548 be760ba8 Michael Hanselmann
549 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
550 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
551 be760ba8 Michael Hanselmann
        self.assertFalse(success)
552 be760ba8 Michael Hanselmann
553 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
554 be760ba8 Michael Hanselmann
555 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
556 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
557 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
558 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
559 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
560 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
561 be760ba8 Michael Hanselmann
          # Last opcode
562 be760ba8 Michael Hanselmann
          self.assert_(result)
563 be760ba8 Michael Hanselmann
        else:
564 be760ba8 Michael Hanselmann
          self.assertFalse(result)
565 be760ba8 Michael Hanselmann
566 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
567 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
568 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
569 be760ba8 Michael Hanselmann
570 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
571 ebb2a2a3 Michael Hanselmann
572 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
573 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
574 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
575 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
576 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
577 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
578 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
579 be760ba8 Michael Hanselmann
                              for op in job.ops))
580 be760ba8 Michael Hanselmann
581 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
582 be760ba8 Michael Hanselmann
583 be760ba8 Michael Hanselmann
      # Finished jobs can't be processed any further
584 be760ba8 Michael Hanselmann
      self.assertRaises(errors.ProgrammerError,
585 be760ba8 Michael Hanselmann
                        jqueue._JobProcessor(queue, opexec, job))
586 be760ba8 Michael Hanselmann
587 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
588 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
589 be760ba8 Michael Hanselmann
590 be760ba8 Michael Hanselmann
    testdata = [
591 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
592 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
593 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
594 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
595 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
596 be760ba8 Michael Hanselmann
      ]
597 be760ba8 Michael Hanselmann
598 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
599 be760ba8 Michael Hanselmann
      # Prepare opcodes
600 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
601 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
602 be760ba8 Michael Hanselmann
                                       i <= failto))
603 be760ba8 Michael Hanselmann
             for i in range(opcount)]
604 be760ba8 Michael Hanselmann
605 be760ba8 Michael Hanselmann
      # Create job
606 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
607 be760ba8 Michael Hanselmann
608 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, None, None)
609 be760ba8 Michael Hanselmann
610 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
611 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
612 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
613 ebb2a2a3 Michael Hanselmann
        # queued to waitlock
614 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
615 ebb2a2a3 Michael Hanselmann
        # waitlock to running
616 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
617 ebb2a2a3 Michael Hanselmann
        # Opcode result
618 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
619 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
620 be760ba8 Michael Hanselmann
621 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
622 be760ba8 Michael Hanselmann
          # Last opcode
623 be760ba8 Michael Hanselmann
          self.assert_(result)
624 be760ba8 Michael Hanselmann
          break
625 be760ba8 Michael Hanselmann
626 be760ba8 Michael Hanselmann
        self.assertFalse(result)
627 be760ba8 Michael Hanselmann
628 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
629 be760ba8 Michael Hanselmann
630 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
631 ebb2a2a3 Michael Hanselmann
632 be760ba8 Michael Hanselmann
      # Check job status
633 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
634 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["id"]), [job_id])
635 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
636 be760ba8 Michael Hanselmann
637 be760ba8 Michael Hanselmann
      # Check opcode status
638 be760ba8 Michael Hanselmann
      data = zip(job.ops,
639 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
640 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
641 be760ba8 Michael Hanselmann
642 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
643 be760ba8 Michael Hanselmann
        if idx < failfrom:
644 be760ba8 Michael Hanselmann
          assert not op.input.fail
645 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
646 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
647 be760ba8 Michael Hanselmann
        elif idx <= failto:
648 be760ba8 Michael Hanselmann
          assert op.input.fail
649 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
650 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
651 be760ba8 Michael Hanselmann
        else:
652 be760ba8 Michael Hanselmann
          assert not op.input.fail
653 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
654 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
655 be760ba8 Michael Hanselmann
656 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
657 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
658 be760ba8 Michael Hanselmann
659 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
660 be760ba8 Michael Hanselmann
661 be760ba8 Michael Hanselmann
      # Finished jobs can't be processed any further
662 be760ba8 Michael Hanselmann
      self.assertRaises(errors.ProgrammerError,
663 be760ba8 Michael Hanselmann
                        jqueue._JobProcessor(queue, opexec, job))
664 be760ba8 Michael Hanselmann
665 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
666 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
667 be760ba8 Michael Hanselmann
668 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
669 be760ba8 Michael Hanselmann
           for i in range(5)]
670 be760ba8 Michael Hanselmann
671 be760ba8 Michael Hanselmann
    # Create job
672 be760ba8 Michael Hanselmann
    job_id = 17045
673 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
674 be760ba8 Michael Hanselmann
675 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
676 be760ba8 Michael Hanselmann
677 be760ba8 Michael Hanselmann
    # Mark as cancelled
678 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
679 be760ba8 Michael Hanselmann
    self.assert_(success)
680 be760ba8 Michael Hanselmann
681 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
682 ebb2a2a3 Michael Hanselmann
683 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
684 be760ba8 Michael Hanselmann
                            for op in job.ops))
685 be760ba8 Michael Hanselmann
686 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
687 30c945d0 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
688 30c945d0 Michael Hanselmann
689 30c945d0 Michael Hanselmann
    # Check result
690 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
691 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
692 30c945d0 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
693 30c945d0 Michael Hanselmann
    self.assert_(job.end_timestamp)
694 30c945d0 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
695 30c945d0 Michael Hanselmann
                                for op in job.ops))
696 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
697 30c945d0 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
698 30c945d0 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
699 30c945d0 Michael Hanselmann
700 30c945d0 Michael Hanselmann
  def testCancelWhileWaitlockInQueue(self):
701 30c945d0 Michael Hanselmann
    queue = _FakeQueueForProc()
702 30c945d0 Michael Hanselmann
703 30c945d0 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
704 30c945d0 Michael Hanselmann
           for i in range(5)]
705 30c945d0 Michael Hanselmann
706 30c945d0 Michael Hanselmann
    # Create job
707 30c945d0 Michael Hanselmann
    job_id = 8645
708 30c945d0 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
709 30c945d0 Michael Hanselmann
710 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
711 30c945d0 Michael Hanselmann
712 30c945d0 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITLOCK
713 30c945d0 Michael Hanselmann
714 30c945d0 Michael Hanselmann
    assert len(job.ops) == 5
715 30c945d0 Michael Hanselmann
716 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
717 30c945d0 Michael Hanselmann
718 30c945d0 Michael Hanselmann
    # Mark as cancelling
719 30c945d0 Michael Hanselmann
    (success, _) = job.Cancel()
720 30c945d0 Michael Hanselmann
    self.assert_(success)
721 30c945d0 Michael Hanselmann
722 30c945d0 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
723 30c945d0 Michael Hanselmann
724 30c945d0 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
725 30c945d0 Michael Hanselmann
                            for op in job.ops))
726 30c945d0 Michael Hanselmann
727 30c945d0 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
728 30c945d0 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
729 be760ba8 Michael Hanselmann
730 be760ba8 Michael Hanselmann
    # Check result
731 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
732 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
733 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
734 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
735 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
736 be760ba8 Michael Hanselmann
                                for op in job.ops))
737 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
738 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
739 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
740 be760ba8 Michael Hanselmann
741 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
742 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
743 be760ba8 Michael Hanselmann
744 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
745 be760ba8 Michael Hanselmann
           for i in range(5)]
746 be760ba8 Michael Hanselmann
747 be760ba8 Michael Hanselmann
    # Create job
748 be760ba8 Michael Hanselmann
    job_id = 11009
749 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
750 be760ba8 Michael Hanselmann
751 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
752 be760ba8 Michael Hanselmann
753 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
754 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
755 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
756 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
757 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
758 be760ba8 Michael Hanselmann
759 be760ba8 Michael Hanselmann
      # Mark as cancelled
760 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
761 be760ba8 Michael Hanselmann
      self.assert_(success)
762 be760ba8 Michael Hanselmann
763 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
764 be760ba8 Michael Hanselmann
                              for op in job.ops))
765 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
766 be760ba8 Michael Hanselmann
767 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
768 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
769 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
770 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
771 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
772 be760ba8 Michael Hanselmann
773 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
774 be760ba8 Michael Hanselmann
775 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
776 ebb2a2a3 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
777 ebb2a2a3 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
778 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
779 be760ba8 Michael Hanselmann
780 be760ba8 Michael Hanselmann
    # Check result
781 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
782 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
783 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
784 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
785 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
786 be760ba8 Michael Hanselmann
                                for op in job.ops))
787 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
788 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
789 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
790 be760ba8 Michael Hanselmann
791 9e49dfc5 Michael Hanselmann
  def testCancelWhileWaitlockWithTimeout(self):
792 9e49dfc5 Michael Hanselmann
    queue = _FakeQueueForProc()
793 9e49dfc5 Michael Hanselmann
794 9e49dfc5 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
795 9e49dfc5 Michael Hanselmann
           for i in range(5)]
796 9e49dfc5 Michael Hanselmann
797 9e49dfc5 Michael Hanselmann
    # Create job
798 9e49dfc5 Michael Hanselmann
    job_id = 24314
799 9e49dfc5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
800 9e49dfc5 Michael Hanselmann
801 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
802 9e49dfc5 Michael Hanselmann
803 9e49dfc5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
804 9e49dfc5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
805 9e49dfc5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
806 9e49dfc5 Michael Hanselmann
807 9e49dfc5 Michael Hanselmann
      # Mark as cancelled
808 9e49dfc5 Michael Hanselmann
      (success, _) = job.Cancel()
809 9e49dfc5 Michael Hanselmann
      self.assert_(success)
810 9e49dfc5 Michael Hanselmann
811 9e49dfc5 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
812 9e49dfc5 Michael Hanselmann
                              for op in job.ops))
813 9e49dfc5 Michael Hanselmann
814 9e49dfc5 Michael Hanselmann
      # Fake an acquire attempt timing out
815 9e49dfc5 Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
816 9e49dfc5 Michael Hanselmann
817 9e49dfc5 Michael Hanselmann
    def _AfterStart(op, cbs):
818 9e49dfc5 Michael Hanselmann
      self.fail("Should not reach this")
819 9e49dfc5 Michael Hanselmann
820 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
821 9e49dfc5 Michael Hanselmann
822 9e49dfc5 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
823 9e49dfc5 Michael Hanselmann
824 9e49dfc5 Michael Hanselmann
    # Check result
825 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
826 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
827 9e49dfc5 Michael Hanselmann
    self.assert_(job.start_timestamp)
828 9e49dfc5 Michael Hanselmann
    self.assert_(job.end_timestamp)
829 9e49dfc5 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
830 9e49dfc5 Michael Hanselmann
                                for op in job.ops))
831 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
832 9e49dfc5 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
833 9e49dfc5 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
834 9e49dfc5 Michael Hanselmann
835 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
836 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
837 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
838 be760ba8 Michael Hanselmann
839 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
840 be760ba8 Michael Hanselmann
           for i in range(3)]
841 be760ba8 Michael Hanselmann
842 be760ba8 Michael Hanselmann
    # Create job
843 be760ba8 Michael Hanselmann
    job_id = 28492
844 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
845 be760ba8 Michael Hanselmann
846 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
847 be760ba8 Michael Hanselmann
848 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
849 be760ba8 Michael Hanselmann
850 be760ba8 Michael Hanselmann
    # Run one opcode
851 be760ba8 Michael Hanselmann
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
852 be760ba8 Michael Hanselmann
853 be760ba8 Michael Hanselmann
    # Job goes back to queued
854 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
855 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
856 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
857 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
858 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
859 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
860 be760ba8 Michael Hanselmann
861 be760ba8 Michael Hanselmann
    # Mark as cancelled
862 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
863 be760ba8 Michael Hanselmann
    self.assert_(success)
864 be760ba8 Michael Hanselmann
865 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
866 be760ba8 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
867 be760ba8 Michael Hanselmann
868 be760ba8 Michael Hanselmann
    # Check result
869 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
870 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
871 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
872 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
873 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
874 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
875 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
876 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
877 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
878 be760ba8 Michael Hanselmann
879 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
880 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
881 be760ba8 Michael Hanselmann
    # program was restarted
882 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
883 be760ba8 Michael Hanselmann
884 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
885 be760ba8 Michael Hanselmann
886 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
887 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
888 be760ba8 Michael Hanselmann
             for i in range(10)]
889 be760ba8 Michael Hanselmann
890 be760ba8 Michael Hanselmann
      # Create job
891 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
892 be760ba8 Michael Hanselmann
893 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
894 be760ba8 Michael Hanselmann
895 be760ba8 Michael Hanselmann
      for _ in range(successcount):
896 be760ba8 Michael Hanselmann
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
897 be760ba8 Michael Hanselmann
898 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
899 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
900 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
901 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
902 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
903 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
904 be760ba8 Michael Hanselmann
905 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
906 be760ba8 Michael Hanselmann
907 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
908 be760ba8 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
909 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
910 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
911 be760ba8 Michael Hanselmann
912 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
913 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
914 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
915 be760ba8 Michael Hanselmann
916 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
917 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
918 be760ba8 Michael Hanselmann
919 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
920 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
921 be760ba8 Michael Hanselmann
922 be760ba8 Michael Hanselmann
      if remaining == 0:
923 be760ba8 Michael Hanselmann
        # Last opcode
924 be760ba8 Michael Hanselmann
        self.assert_(result)
925 be760ba8 Michael Hanselmann
        break
926 be760ba8 Michael Hanselmann
927 be760ba8 Michael Hanselmann
      self.assertFalse(result)
928 be760ba8 Michael Hanselmann
929 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
930 be760ba8 Michael Hanselmann
931 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
932 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
933 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
934 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
935 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
936 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
937 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
938 be760ba8 Michael Hanselmann
                            for op in job.ops))
939 be760ba8 Michael Hanselmann
940 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
941 be760ba8 Michael Hanselmann
942 be760ba8 Michael Hanselmann
    # Finished jobs can't be processed any further
943 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
944 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
945 be760ba8 Michael Hanselmann
946 be760ba8 Michael Hanselmann
    # ... also after being restored
947 be760ba8 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
948 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
949 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job2))
950 be760ba8 Michael Hanselmann
951 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
952 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
953 be760ba8 Michael Hanselmann
954 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
955 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
956 be760ba8 Michael Hanselmann
957 be760ba8 Michael Hanselmann
    # Create job
958 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
959 be760ba8 Michael Hanselmann
960 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
961 be760ba8 Michael Hanselmann
962 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
963 be760ba8 Michael Hanselmann
964 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
965 be760ba8 Michael Hanselmann
966 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
967 be760ba8 Michael Hanselmann
968 be760ba8 Michael Hanselmann
    # Calling on running job must fail
969 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
970 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
971 be760ba8 Michael Hanselmann
972 be760ba8 Michael Hanselmann
  def testLogMessages(self):
973 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
974 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
975 be760ba8 Michael Hanselmann
976 be760ba8 Michael Hanselmann
    messages = {
977 be760ba8 Michael Hanselmann
      1: [
978 be760ba8 Michael Hanselmann
        (None, "Hello"),
979 be760ba8 Michael Hanselmann
        (None, "World"),
980 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
981 be760ba8 Michael Hanselmann
        ],
982 be760ba8 Michael Hanselmann
      4: [
983 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
984 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
985 be760ba8 Michael Hanselmann
        ],
986 be760ba8 Michael Hanselmann
      }
987 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
988 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
989 be760ba8 Michael Hanselmann
           for i in range(5)]
990 be760ba8 Michael Hanselmann
991 be760ba8 Michael Hanselmann
    # Create job
992 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
993 be760ba8 Michael Hanselmann
994 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
995 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
996 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
997 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
998 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
999 be760ba8 Michael Hanselmann
1000 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1001 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1002 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1003 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1004 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1005 be760ba8 Michael Hanselmann
1006 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
1007 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
1008 be760ba8 Michael Hanselmann
1009 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
1010 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1011 be760ba8 Michael Hanselmann
        if log_type:
1012 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
1013 be760ba8 Michael Hanselmann
        else:
1014 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
1015 ebb2a2a3 Michael Hanselmann
        # Check for job update without replication
1016 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1017 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1018 be760ba8 Michael Hanselmann
1019 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1020 be760ba8 Michael Hanselmann
1021 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
1022 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1023 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1024 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1025 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1026 be760ba8 Michael Hanselmann
1027 be760ba8 Michael Hanselmann
      if remaining == 0:
1028 be760ba8 Michael Hanselmann
        # Last opcode
1029 be760ba8 Michael Hanselmann
        self.assert_(result)
1030 be760ba8 Michael Hanselmann
        break
1031 be760ba8 Michael Hanselmann
1032 be760ba8 Michael Hanselmann
      self.assertFalse(result)
1033 be760ba8 Michael Hanselmann
1034 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1035 be760ba8 Michael Hanselmann
1036 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1037 ebb2a2a3 Michael Hanselmann
1038 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1039 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1040 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1041 be760ba8 Michael Hanselmann
1042 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
1043 be760ba8 Michael Hanselmann
1044 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
1045 be760ba8 Michael Hanselmann
1046 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
1047 be760ba8 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1048 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
1049 be760ba8 Michael Hanselmann
1050 be760ba8 Michael Hanselmann
    # Check each message
1051 be760ba8 Michael Hanselmann
    prevserial = -1
1052 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1053 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
1054 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
1055 be760ba8 Michael Hanselmann
        if exptype:
1056 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
1057 be760ba8 Michael Hanselmann
        else:
1058 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1059 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
1060 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
1061 be760ba8 Michael Hanselmann
        prevserial = serial
1062 be760ba8 Michael Hanselmann
1063 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
1064 be760ba8 Michael Hanselmann
    # Check serial
1065 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
1066 be760ba8 Michael Hanselmann
1067 be760ba8 Michael Hanselmann
    # No filter
1068 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
1069 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1070 be760ba8 Michael Hanselmann
                      for entry in entries])
1071 be760ba8 Michael Hanselmann
1072 be760ba8 Michael Hanselmann
    # Filter with serial
1073 be760ba8 Michael Hanselmann
    assert count > 3
1074 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
1075 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
1076 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1077 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
1078 be760ba8 Michael Hanselmann
1079 be760ba8 Michael Hanselmann
    # No log message after highest serial
1080 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
1081 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
1082 be760ba8 Michael Hanselmann
1083 6a373640 Michael Hanselmann
  def testSubmitManyJobs(self):
1084 6a373640 Michael Hanselmann
    queue = _FakeQueueForProc()
1085 6a373640 Michael Hanselmann
1086 6a373640 Michael Hanselmann
    job_id = 15656
1087 6a373640 Michael Hanselmann
    ops = [
1088 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1089 6a373640 Michael Hanselmann
                          submit_jobs=[]),
1090 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1091 6a373640 Michael Hanselmann
                          submit_jobs=[
1092 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1093 6a373640 Michael Hanselmann
                            ]),
1094 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False,
1095 6a373640 Michael Hanselmann
                          submit_jobs=[
1096 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1097 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1098 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1099 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1100 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1101 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1102 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1103 6a373640 Michael Hanselmann
                            ]),
1104 6a373640 Michael Hanselmann
      ]
1105 6a373640 Michael Hanselmann
1106 6a373640 Michael Hanselmann
    # Create job
1107 6a373640 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1108 6a373640 Michael Hanselmann
1109 6a373640 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1110 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1111 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1112 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1113 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1114 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1115 6a373640 Michael Hanselmann
1116 6a373640 Michael Hanselmann
    def _AfterStart(op, cbs):
1117 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1118 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1119 6a373640 Michael Hanselmann
1120 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1121 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1122 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1123 6a373640 Michael Hanselmann
1124 6a373640 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1125 6a373640 Michael Hanselmann
      (success, _) = job.Cancel()
1126 6a373640 Michael Hanselmann
      self.assertFalse(success)
1127 6a373640 Michael Hanselmann
1128 6a373640 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1129 6a373640 Michael Hanselmann
1130 6a373640 Michael Hanselmann
    for idx in range(len(ops)):
1131 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1132 6a373640 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1133 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1134 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1135 6a373640 Michael Hanselmann
      if idx == len(ops) - 1:
1136 6a373640 Michael Hanselmann
        # Last opcode
1137 6a373640 Michael Hanselmann
        self.assert_(result)
1138 6a373640 Michael Hanselmann
      else:
1139 6a373640 Michael Hanselmann
        self.assertFalse(result)
1140 6a373640 Michael Hanselmann
1141 6a373640 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1142 6a373640 Michael Hanselmann
        self.assert_(job.start_timestamp)
1143 6a373640 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1144 6a373640 Michael Hanselmann
1145 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1146 6a373640 Michael Hanselmann
1147 6a373640 Michael Hanselmann
    for idx, submitted_ops in enumerate(job_ops
1148 6a373640 Michael Hanselmann
                                        for op in ops
1149 6a373640 Michael Hanselmann
                                        for job_ops in op.submit_jobs):
1150 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextSubmittedJob(),
1151 6a373640 Michael Hanselmann
                       (1000 + idx, submitted_ops))
1152 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1153 6a373640 Michael Hanselmann
1154 6a373640 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1155 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1156 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1157 6a373640 Michael Hanselmann
                     [[[], [1000], [1001, 1002, 1003]]])
1158 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1159 6a373640 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1160 6a373640 Michael Hanselmann
1161 6a373640 Michael Hanselmann
    self._GenericCheckJob(job)
1162 6a373640 Michael Hanselmann
1163 6a373640 Michael Hanselmann
    # Finished jobs can't be processed any further
1164 6a373640 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1165 6a373640 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
1166 6a373640 Michael Hanselmann
1167 be760ba8 Michael Hanselmann
1168 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
1169 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
1170 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
1171 26d3fd2f Michael Hanselmann
    self.attempts = 0
1172 26d3fd2f Michael Hanselmann
    self.last_timeout = None
1173 26d3fd2f Michael Hanselmann
1174 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
1175 26d3fd2f Michael Hanselmann
    self.attempts += 1
1176 26d3fd2f Michael Hanselmann
    if self.timeouts:
1177 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
1178 26d3fd2f Michael Hanselmann
    else:
1179 26d3fd2f Michael Hanselmann
      timeout = None
1180 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
1181 26d3fd2f Michael Hanselmann
    return timeout
1182 26d3fd2f Michael Hanselmann
1183 26d3fd2f Michael Hanselmann
1184 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1185 26d3fd2f Michael Hanselmann
  def setUp(self):
1186 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
1187 26d3fd2f Michael Hanselmann
    self.job = None
1188 26d3fd2f Michael Hanselmann
    self.curop = None
1189 26d3fd2f Michael Hanselmann
    self.opcounter = None
1190 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None
1191 26d3fd2f Michael Hanselmann
    self.retries = 0
1192 26d3fd2f Michael Hanselmann
    self.prev_tsop = None
1193 26d3fd2f Michael Hanselmann
    self.prev_prio = None
1194 5fd6b694 Michael Hanselmann
    self.prev_status = None
1195 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = None
1196 26d3fd2f Michael Hanselmann
    self.gave_lock = None
1197 26d3fd2f Michael Hanselmann
    self.done_lock_before_blocking = False
1198 26d3fd2f Michael Hanselmann
1199 f23db633 Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
1200 26d3fd2f Michael Hanselmann
    job = self.job
1201 26d3fd2f Michael Hanselmann
1202 5fd6b694 Michael Hanselmann
    # If status has changed, job must've been written
1203 5fd6b694 Michael Hanselmann
    if self.prev_status != self.job.ops[self.curop].status:
1204 5fd6b694 Michael Hanselmann
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1205 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1206 5fd6b694 Michael Hanselmann
1207 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1208 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1209 26d3fd2f Michael Hanselmann
1210 26d3fd2f Michael Hanselmann
    ts = self.timeout_strategy
1211 26d3fd2f Michael Hanselmann
1212 26d3fd2f Michael Hanselmann
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1213 26d3fd2f Michael Hanselmann
    self.assertEqual(timeout, ts.last_timeout)
1214 f23db633 Michael Hanselmann
    self.assertEqual(priority, job.ops[self.curop].priority)
1215 26d3fd2f Michael Hanselmann
1216 26d3fd2f Michael Hanselmann
    self.gave_lock = True
1217 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = priority
1218 26d3fd2f Michael Hanselmann
1219 26d3fd2f Michael Hanselmann
    if (self.curop == 3 and
1220 26d3fd2f Michael Hanselmann
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1221 26d3fd2f Michael Hanselmann
      # Give locks before running into blocking acquire
1222 26d3fd2f Michael Hanselmann
      assert self.retries == 7
1223 26d3fd2f Michael Hanselmann
      self.retries = 0
1224 26d3fd2f Michael Hanselmann
      self.done_lock_before_blocking = True
1225 26d3fd2f Michael Hanselmann
      return
1226 26d3fd2f Michael Hanselmann
1227 26d3fd2f Michael Hanselmann
    if self.retries > 0:
1228 26d3fd2f Michael Hanselmann
      self.assert_(timeout is not None)
1229 26d3fd2f Michael Hanselmann
      self.retries -= 1
1230 26d3fd2f Michael Hanselmann
      self.gave_lock = False
1231 26d3fd2f Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
1232 26d3fd2f Michael Hanselmann
1233 26d3fd2f Michael Hanselmann
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1234 26d3fd2f Michael Hanselmann
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1235 26d3fd2f Michael Hanselmann
      assert not ts.timeouts
1236 26d3fd2f Michael Hanselmann
      self.assert_(timeout is None)
1237 26d3fd2f Michael Hanselmann
1238 26d3fd2f Michael Hanselmann
  def _AfterStart(self, op, cbs):
1239 26d3fd2f Michael Hanselmann
    job = self.job
1240 26d3fd2f Michael Hanselmann
1241 5fd6b694 Michael Hanselmann
    # Setting to "running" requires an update
1242 ebb2a2a3 Michael Hanselmann
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1243 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1244 5fd6b694 Michael Hanselmann
1245 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1246 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1247 26d3fd2f Michael Hanselmann
1248 26d3fd2f Michael Hanselmann
    # Job is running, cancelling shouldn't be possible
1249 26d3fd2f Michael Hanselmann
    (success, _) = job.Cancel()
1250 26d3fd2f Michael Hanselmann
    self.assertFalse(success)
1251 26d3fd2f Michael Hanselmann
1252 26d3fd2f Michael Hanselmann
  def _NextOpcode(self):
1253 26d3fd2f Michael Hanselmann
    self.curop = self.opcounter.next()
1254 26d3fd2f Michael Hanselmann
    self.prev_prio = self.job.ops[self.curop].priority
1255 5fd6b694 Michael Hanselmann
    self.prev_status = self.job.ops[self.curop].status
1256 26d3fd2f Michael Hanselmann
1257 26d3fd2f Michael Hanselmann
  def _NewTimeoutStrategy(self):
1258 26d3fd2f Michael Hanselmann
    job = self.job
1259 26d3fd2f Michael Hanselmann
1260 26d3fd2f Michael Hanselmann
    self.assertEqual(self.retries, 0)
1261 26d3fd2f Michael Hanselmann
1262 26d3fd2f Michael Hanselmann
    if self.prev_tsop == self.curop:
1263 26d3fd2f Michael Hanselmann
      # Still on the same opcode, priority must've been increased
1264 26d3fd2f Michael Hanselmann
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1265 26d3fd2f Michael Hanselmann
1266 26d3fd2f Michael Hanselmann
    if self.curop == 1:
1267 26d3fd2f Michael Hanselmann
      # Normal retry
1268 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1269 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts) - 1
1270 26d3fd2f Michael Hanselmann
1271 26d3fd2f Michael Hanselmann
    elif self.curop == 2:
1272 26d3fd2f Michael Hanselmann
      # Let this run into a blocking acquire
1273 26d3fd2f Michael Hanselmann
      timeouts = range(11, 61, 12)
1274 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1275 26d3fd2f Michael Hanselmann
1276 26d3fd2f Michael Hanselmann
    elif self.curop == 3:
1277 26d3fd2f Michael Hanselmann
      # Wait for priority to increase, but give lock before blocking acquire
1278 26d3fd2f Michael Hanselmann
      timeouts = range(12, 100, 14)
1279 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1280 26d3fd2f Michael Hanselmann
1281 26d3fd2f Michael Hanselmann
      self.assertFalse(self.done_lock_before_blocking)
1282 26d3fd2f Michael Hanselmann
1283 26d3fd2f Michael Hanselmann
    elif self.curop == 4:
1284 26d3fd2f Michael Hanselmann
      self.assert_(self.done_lock_before_blocking)
1285 26d3fd2f Michael Hanselmann
1286 26d3fd2f Michael Hanselmann
      # Timeouts, but no need to retry
1287 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1288 26d3fd2f Michael Hanselmann
      self.retries = 0
1289 26d3fd2f Michael Hanselmann
1290 26d3fd2f Michael Hanselmann
    elif self.curop == 5:
1291 26d3fd2f Michael Hanselmann
      # Normal retry
1292 26d3fd2f Michael Hanselmann
      timeouts = range(19, 100, 11)
1293 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1294 26d3fd2f Michael Hanselmann
1295 26d3fd2f Michael Hanselmann
    else:
1296 26d3fd2f Michael Hanselmann
      timeouts = []
1297 26d3fd2f Michael Hanselmann
      self.retries = 0
1298 26d3fd2f Michael Hanselmann
1299 26d3fd2f Michael Hanselmann
    assert len(job.ops) == 10
1300 26d3fd2f Michael Hanselmann
    assert self.retries <= len(timeouts)
1301 26d3fd2f Michael Hanselmann
1302 26d3fd2f Michael Hanselmann
    ts = _FakeTimeoutStrategy(timeouts)
1303 26d3fd2f Michael Hanselmann
1304 26d3fd2f Michael Hanselmann
    self.timeout_strategy = ts
1305 26d3fd2f Michael Hanselmann
    self.prev_tsop = self.curop
1306 26d3fd2f Michael Hanselmann
    self.prev_prio = job.ops[self.curop].priority
1307 26d3fd2f Michael Hanselmann
1308 26d3fd2f Michael Hanselmann
    return ts
1309 26d3fd2f Michael Hanselmann
1310 26d3fd2f Michael Hanselmann
  def testTimeout(self):
1311 26d3fd2f Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1312 26d3fd2f Michael Hanselmann
           for i in range(10)]
1313 26d3fd2f Michael Hanselmann
1314 26d3fd2f Michael Hanselmann
    # Create job
1315 26d3fd2f Michael Hanselmann
    job_id = 15801
1316 26d3fd2f Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
1317 26d3fd2f Michael Hanselmann
    self.job = job
1318 26d3fd2f Michael Hanselmann
1319 26d3fd2f Michael Hanselmann
    self.opcounter = itertools.count(0)
1320 26d3fd2f Michael Hanselmann
1321 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1322 ebb2a2a3 Michael Hanselmann
                                    self._AfterStart)
1323 26d3fd2f Michael Hanselmann
    tsf = self._NewTimeoutStrategy
1324 26d3fd2f Michael Hanselmann
1325 26d3fd2f Michael Hanselmann
    self.assertFalse(self.done_lock_before_blocking)
1326 26d3fd2f Michael Hanselmann
1327 5fd6b694 Michael Hanselmann
    while True:
1328 26d3fd2f Michael Hanselmann
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1329 26d3fd2f Michael Hanselmann
                                  _timeout_strategy_factory=tsf)
1330 26d3fd2f Michael Hanselmann
1331 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1332 5fd6b694 Michael Hanselmann
1333 5fd6b694 Michael Hanselmann
      if self.curop is not None:
1334 5fd6b694 Michael Hanselmann
        self.prev_status = self.job.ops[self.curop].status
1335 5fd6b694 Michael Hanselmann
1336 5fd6b694 Michael Hanselmann
      self.lock_acq_prio = None
1337 5fd6b694 Michael Hanselmann
1338 26d3fd2f Michael Hanselmann
      result = proc(_nextop_fn=self._NextOpcode)
1339 5fd6b694 Michael Hanselmann
      assert self.curop is not None
1340 5fd6b694 Michael Hanselmann
1341 5fd6b694 Michael Hanselmann
      if result or self.gave_lock:
1342 5fd6b694 Michael Hanselmann
        # Got lock and/or job is done, result must've been written
1343 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1344 5fd6b694 Michael Hanselmann
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1345 5fd6b694 Michael Hanselmann
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1346 5fd6b694 Michael Hanselmann
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1347 5fd6b694 Michael Hanselmann
        self.assert_(job.ops[self.curop].exec_timestamp)
1348 5fd6b694 Michael Hanselmann
1349 26d3fd2f Michael Hanselmann
      if result:
1350 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1351 26d3fd2f Michael Hanselmann
        break
1352 26d3fd2f Michael Hanselmann
1353 26d3fd2f Michael Hanselmann
      self.assertFalse(result)
1354 26d3fd2f Michael Hanselmann
1355 5fd6b694 Michael Hanselmann
      if self.curop == 0:
1356 5fd6b694 Michael Hanselmann
        self.assertEqual(job.ops[self.curop].start_timestamp,
1357 5fd6b694 Michael Hanselmann
                         job.start_timestamp)
1358 5fd6b694 Michael Hanselmann
1359 26d3fd2f Michael Hanselmann
      if self.gave_lock:
1360 5fd6b694 Michael Hanselmann
        # Opcode finished, but job not yet done
1361 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1362 26d3fd2f Michael Hanselmann
      else:
1363 5fd6b694 Michael Hanselmann
        # Did not get locks
1364 26d3fd2f Michael Hanselmann
        self.assert_(job.cur_opctx)
1365 26d3fd2f Michael Hanselmann
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1366 26d3fd2f Michael Hanselmann
                         self.timeout_strategy.NextAttempt)
1367 5fd6b694 Michael Hanselmann
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1368 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1369 5fd6b694 Michael Hanselmann
1370 5fd6b694 Michael Hanselmann
        # If priority has changed since acquiring locks, the job must've been
1371 5fd6b694 Michael Hanselmann
        # updated
1372 5fd6b694 Michael Hanselmann
        if self.lock_acq_prio != job.ops[self.curop].priority:
1373 5fd6b694 Michael Hanselmann
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1374 5fd6b694 Michael Hanselmann
1375 5fd6b694 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1376 26d3fd2f Michael Hanselmann
1377 26d3fd2f Michael Hanselmann
      self.assert_(job.start_timestamp)
1378 26d3fd2f Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1379 26d3fd2f Michael Hanselmann
1380 26d3fd2f Michael Hanselmann
    self.assertEqual(self.curop, len(job.ops) - 1)
1381 26d3fd2f Michael Hanselmann
    self.assertEqual(self.job, job)
1382 26d3fd2f Michael Hanselmann
    self.assertEqual(self.opcounter.next(), len(job.ops))
1383 26d3fd2f Michael Hanselmann
    self.assert_(self.done_lock_before_blocking)
1384 26d3fd2f Michael Hanselmann
1385 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1386 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1387 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1388 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1389 26d3fd2f Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1390 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1391 26d3fd2f Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1392 26d3fd2f Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1393 26d3fd2f Michael Hanselmann
                            for op in job.ops))
1394 26d3fd2f Michael Hanselmann
1395 26d3fd2f Michael Hanselmann
    # Finished jobs can't be processed any further
1396 26d3fd2f Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1397 26d3fd2f Michael Hanselmann
                      jqueue._JobProcessor(self.queue, opexec, job))
1398 26d3fd2f Michael Hanselmann
1399 26d3fd2f Michael Hanselmann
1400 989a8bee Michael Hanselmann
if __name__ == "__main__":
1401 989a8bee Michael Hanselmann
  testutils.GanetiTestProgram()