Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 415feb2e

History | View | Annotate | Download (75.9 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 1e6d5750 Iustin Pop
# Copyright (C) 2010, 2011 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 fcb21ad7 Michael Hanselmann
import random
32 989a8bee Michael Hanselmann
33 989a8bee Michael Hanselmann
from ganeti import constants
34 989a8bee Michael Hanselmann
from ganeti import utils
35 989a8bee Michael Hanselmann
from ganeti import errors
36 989a8bee Michael Hanselmann
from ganeti import jqueue
37 8f5c488d Michael Hanselmann
from ganeti import opcodes
38 8f5c488d Michael Hanselmann
from ganeti import compat
39 26d3fd2f Michael Hanselmann
from ganeti import mcpu
40 fcb21ad7 Michael Hanselmann
from ganeti import query
41 df5a5730 Michael Hanselmann
from ganeti import workerpool
42 989a8bee Michael Hanselmann
43 989a8bee Michael Hanselmann
import testutils
44 989a8bee Michael Hanselmann
45 989a8bee Michael Hanselmann
46 989a8bee Michael Hanselmann
class _FakeJob:
47 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
48 989a8bee Michael Hanselmann
    self.id = job_id
49 c0f6d0d8 Michael Hanselmann
    self.writable = False
50 989a8bee Michael Hanselmann
    self._status = status
51 989a8bee Michael Hanselmann
    self._log = []
52 989a8bee Michael Hanselmann
53 989a8bee Michael Hanselmann
  def SetStatus(self, status):
54 989a8bee Michael Hanselmann
    self._status = status
55 989a8bee Michael Hanselmann
56 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
57 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
58 989a8bee Michael Hanselmann
59 989a8bee Michael Hanselmann
  def CalcStatus(self):
60 989a8bee Michael Hanselmann
    return self._status
61 989a8bee Michael Hanselmann
62 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
63 989a8bee Michael Hanselmann
    result = []
64 989a8bee Michael Hanselmann
65 989a8bee Michael Hanselmann
    for name in fields:
66 989a8bee Michael Hanselmann
      if name == "status":
67 989a8bee Michael Hanselmann
        result.append(self._status)
68 989a8bee Michael Hanselmann
      else:
69 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
70 989a8bee Michael Hanselmann
71 989a8bee Michael Hanselmann
    return result
72 989a8bee Michael Hanselmann
73 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
74 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
75 989a8bee Michael Hanselmann
76 989a8bee Michael Hanselmann
    if newer_than is None:
77 989a8bee Michael Hanselmann
      return self._log
78 989a8bee Michael Hanselmann
79 989a8bee Michael Hanselmann
    return self._log[newer_than:]
80 989a8bee Michael Hanselmann
81 989a8bee Michael Hanselmann
82 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
83 989a8bee Michael Hanselmann
  def testStatus(self):
84 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
85 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
86 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
87 989a8bee Michael Hanselmann
88 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
89 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
90 989a8bee Michael Hanselmann
91 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
92 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
93 989a8bee Michael Hanselmann
94 989a8bee Michael Hanselmann
    # job.id is used by checker
95 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
96 989a8bee Michael Hanselmann
97 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
98 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
99 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
100 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
101 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
102 989a8bee Michael Hanselmann
103 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
104 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
105 989a8bee Michael Hanselmann
106 989a8bee Michael Hanselmann
  def testFinalStatus(self):
107 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
108 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
109 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
110 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
111 989a8bee Michael Hanselmann
      # a change immediately
112 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
113 989a8bee Michael Hanselmann
114 989a8bee Michael Hanselmann
  def testLog(self):
115 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
116 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
117 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
118 989a8bee Michael Hanselmann
119 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
120 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
121 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
122 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
123 989a8bee Michael Hanselmann
124 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
125 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
126 989a8bee Michael Hanselmann
127 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
128 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
129 989a8bee Michael Hanselmann
130 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
131 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
132 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
133 989a8bee Michael Hanselmann
134 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
135 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
136 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
137 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
138 989a8bee Michael Hanselmann
139 989a8bee Michael Hanselmann
140 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
141 989a8bee Michael Hanselmann
  def setUp(self):
142 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
143 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
144 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
145 989a8bee Michael Hanselmann
146 989a8bee Michael Hanselmann
  def tearDown(self):
147 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
148 989a8bee Michael Hanselmann
149 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
150 989a8bee Michael Hanselmann
    try:
151 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
152 989a8bee Michael Hanselmann
    except EnvironmentError, err:
153 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
154 989a8bee Michael Hanselmann
    else:
155 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
156 989a8bee Michael Hanselmann
157 989a8bee Michael Hanselmann
  def testClose(self):
158 989a8bee Michael Hanselmann
    for wait in [False, True]:
159 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
160 989a8bee Michael Hanselmann
      try:
161 989a8bee Michael Hanselmann
        if wait:
162 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
163 989a8bee Michael Hanselmann
      finally:
164 989a8bee Michael Hanselmann
        waiter.Close()
165 989a8bee Michael Hanselmann
166 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
167 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
168 989a8bee Michael Hanselmann
169 989a8bee Michael Hanselmann
  def testChangingFile(self):
170 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
171 989a8bee Michael Hanselmann
    try:
172 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
173 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
174 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
175 989a8bee Michael Hanselmann
    finally:
176 989a8bee Michael Hanselmann
      waiter.Close()
177 989a8bee Michael Hanselmann
178 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
179 989a8bee Michael Hanselmann
180 989a8bee Michael Hanselmann
  def testChangingFile2(self):
181 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
182 989a8bee Michael Hanselmann
    try:
183 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
184 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
185 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
186 989a8bee Michael Hanselmann
187 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
188 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
189 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
190 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
191 989a8bee Michael Hanselmann
    finally:
192 989a8bee Michael Hanselmann
      waiter.Close()
193 989a8bee Michael Hanselmann
194 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
195 989a8bee Michael Hanselmann
196 989a8bee Michael Hanselmann
197 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
198 989a8bee Michael Hanselmann
  def setUp(self):
199 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
200 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
201 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
202 989a8bee Michael Hanselmann
203 989a8bee Michael Hanselmann
  def tearDown(self):
204 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
205 989a8bee Michael Hanselmann
206 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
207 47099cd1 Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
208 989a8bee Michael Hanselmann
209 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
210 989a8bee Michael Hanselmann
    return None
211 989a8bee Michael Hanselmann
212 989a8bee Michael Hanselmann
  def testNoChanges(self):
213 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
214 989a8bee Michael Hanselmann
215 989a8bee Michael Hanselmann
    # No change
216 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
217 47099cd1 Michael Hanselmann
                          [constants.JOB_STATUS_WAITING], None, 0.1),
218 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
219 989a8bee Michael Hanselmann
220 989a8bee Michael Hanselmann
    # No previous information
221 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
222 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
223 47099cd1 Michael Hanselmann
                     ([constants.JOB_STATUS_WAITING], []))
224 989a8bee Michael Hanselmann
225 989a8bee Michael Hanselmann
  def testLostJob(self):
226 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
227 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
228 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
229 989a8bee Michael Hanselmann
230 989a8bee Michael Hanselmann
231 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
232 6760e4ed Michael Hanselmann
  def test(self):
233 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
234 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
235 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
236 6760e4ed Michael Hanselmann
237 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
238 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
239 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
240 6760e4ed Michael Hanselmann
241 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
242 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
243 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244 6760e4ed Michael Hanselmann
245 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
246 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
247 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
248 6760e4ed Michael Hanselmann
249 6760e4ed Michael Hanselmann
250 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
251 8f5c488d Michael Hanselmann
  def testDefaults(self):
252 8f5c488d Michael Hanselmann
    def _Check(op):
253 8f5c488d Michael Hanselmann
      self.assertFalse(hasattr(op.input, "dry_run"))
254 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
255 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
256 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
257 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
258 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
259 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
260 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
261 8f5c488d Michael Hanselmann
262 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
263 8f5c488d Michael Hanselmann
    _Check(op1)
264 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
265 8f5c488d Michael Hanselmann
    _Check(op2)
266 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
267 8f5c488d Michael Hanselmann
268 8f5c488d Michael Hanselmann
  def testPriority(self):
269 8f5c488d Michael Hanselmann
    def _Check(op):
270 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
271 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
272 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
273 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
274 8f5c488d Michael Hanselmann
275 c6afb1ca Iustin Pop
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
276 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
277 8f5c488d Michael Hanselmann
    _Check(op1)
278 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
279 8f5c488d Michael Hanselmann
    _Check(op2)
280 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
281 8f5c488d Michael Hanselmann
282 8f5c488d Michael Hanselmann
283 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
284 5f6b0b71 Michael Hanselmann
  def test(self):
285 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
286 c0f6d0d8 Michael Hanselmann
                      None, 1, [], False)
287 5f6b0b71 Michael Hanselmann
288 8f5c488d Michael Hanselmann
  def testDefaults(self):
289 8f5c488d Michael Hanselmann
    job_id = 4260
290 8f5c488d Michael Hanselmann
    ops = [
291 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(),
292 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
293 8f5c488d Michael Hanselmann
      ]
294 8f5c488d Michael Hanselmann
295 8f5c488d Michael Hanselmann
    def _Check(job):
296 c0f6d0d8 Michael Hanselmann
      self.assertTrue(job.writable)
297 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
298 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
299 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
300 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
301 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
302 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
303 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
304 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
305 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
306 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
307 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
308 5f6b0b71 Michael Hanselmann
      self.assertRaises(errors.OpExecError, job.GetInfo,
309 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
310 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
311 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
312 8f5c488d Michael Hanselmann
313 c0f6d0d8 Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
314 8f5c488d Michael Hanselmann
    _Check(job1)
315 c0f6d0d8 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
316 8f5c488d Michael Hanselmann
    _Check(job2)
317 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
318 8f5c488d Michael Hanselmann
319 c0f6d0d8 Michael Hanselmann
  def testWritable(self):
320 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
321 c0f6d0d8 Michael Hanselmann
    self.assertFalse(job.writable)
322 c0f6d0d8 Michael Hanselmann
323 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
324 c0f6d0d8 Michael Hanselmann
    self.assertTrue(job.writable)
325 c0f6d0d8 Michael Hanselmann
326 8f5c488d Michael Hanselmann
  def testPriority(self):
327 8f5c488d Michael Hanselmann
    job_id = 4283
328 8f5c488d Michael Hanselmann
    ops = [
329 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
330 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
331 8f5c488d Michael Hanselmann
      ]
332 8f5c488d Michael Hanselmann
333 8f5c488d Michael Hanselmann
    def _Check(job):
334 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
335 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
336 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
337 8f5c488d Michael Hanselmann
338 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
339 8f5c488d Michael Hanselmann
    _Check(job)
340 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
341 8f5c488d Michael Hanselmann
                            for op in job.ops))
342 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
343 8f5c488d Michael Hanselmann
344 8f5c488d Michael Hanselmann
    # Increase first
345 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
346 8f5c488d Michael Hanselmann
    _Check(job)
347 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
348 8f5c488d Michael Hanselmann
349 8f5c488d Michael Hanselmann
    # Mark opcode as finished
350 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
351 8f5c488d Michael Hanselmann
    _Check(job)
352 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
353 8f5c488d Michael Hanselmann
354 8f5c488d Michael Hanselmann
    # Increase second
355 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
356 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
357 8f5c488d Michael Hanselmann
358 8f5c488d Michael Hanselmann
    # Test increasing first
359 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
360 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
361 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
362 8f5c488d Michael Hanselmann
363 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
364 db5bce34 Michael Hanselmann
    def _Queued(ops):
365 db5bce34 Michael Hanselmann
      # The default status is "queued"
366 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
367 db5bce34 Michael Hanselmann
                              for op in ops))
368 db5bce34 Michael Hanselmann
369 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
370 47099cd1 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITING
371 db5bce34 Michael Hanselmann
372 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
373 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
374 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
375 47099cd1 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITING
376 db5bce34 Michael Hanselmann
377 db5bce34 Michael Hanselmann
    def _Running(ops):
378 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
379 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
380 db5bce34 Michael Hanselmann
      for op in ops[2:]:
381 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
382 db5bce34 Michael Hanselmann
383 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
384 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
385 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
386 db5bce34 Michael Hanselmann
      for op in ops[2:]:
387 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
388 db5bce34 Michael Hanselmann
389 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
390 db5bce34 Michael Hanselmann
      for op in ops:
391 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
392 db5bce34 Michael Hanselmann
393 db5bce34 Michael Hanselmann
    def _Canceled(ops):
394 db5bce34 Michael Hanselmann
      for op in ops:
395 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
396 db5bce34 Michael Hanselmann
397 db5bce34 Michael Hanselmann
    def _Error1(ops):
398 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
399 db5bce34 Michael Hanselmann
        if idx > 3:
400 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
401 db5bce34 Michael Hanselmann
        else:
402 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
403 db5bce34 Michael Hanselmann
404 db5bce34 Michael Hanselmann
    def _Error2(ops):
405 db5bce34 Michael Hanselmann
      for op in ops:
406 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
407 db5bce34 Michael Hanselmann
408 db5bce34 Michael Hanselmann
    def _Success(ops):
409 db5bce34 Michael Hanselmann
      for op in ops:
410 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
411 db5bce34 Michael Hanselmann
412 db5bce34 Michael Hanselmann
    tests = {
413 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
414 47099cd1 Michael Hanselmann
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
415 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
416 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
417 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
418 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
419 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
420 db5bce34 Michael Hanselmann
      }
421 db5bce34 Michael Hanselmann
422 db5bce34 Michael Hanselmann
    def _NewJob():
423 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
424 c0f6d0d8 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)],
425 c0f6d0d8 Michael Hanselmann
                              True)
426 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
427 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
428 db5bce34 Michael Hanselmann
                              for op in job.ops))
429 db5bce34 Michael Hanselmann
      return job
430 db5bce34 Michael Hanselmann
431 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
432 db5bce34 Michael Hanselmann
      sttests = tests[status]
433 db5bce34 Michael Hanselmann
      assert sttests
434 db5bce34 Michael Hanselmann
      for fn in sttests:
435 db5bce34 Michael Hanselmann
        job = _NewJob()
436 db5bce34 Michael Hanselmann
        fn(job.ops)
437 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
438 db5bce34 Michael Hanselmann
439 8f5c488d Michael Hanselmann
440 b95479a5 Michael Hanselmann
class _FakeDependencyManager:
441 be760ba8 Michael Hanselmann
  def __init__(self):
442 b95479a5 Michael Hanselmann
    self._checks = []
443 b95479a5 Michael Hanselmann
    self._notifications = []
444 b95479a5 Michael Hanselmann
    self._waiting = set()
445 b95479a5 Michael Hanselmann
446 b95479a5 Michael Hanselmann
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
447 b95479a5 Michael Hanselmann
    self._checks.append((job, dep_job_id, dep_status, result))
448 b95479a5 Michael Hanselmann
449 b95479a5 Michael Hanselmann
  def CountPendingResults(self):
450 b95479a5 Michael Hanselmann
    return len(self._checks)
451 b95479a5 Michael Hanselmann
452 b95479a5 Michael Hanselmann
  def CountWaitingJobs(self):
453 b95479a5 Michael Hanselmann
    return len(self._waiting)
454 b95479a5 Michael Hanselmann
455 b95479a5 Michael Hanselmann
  def GetNextNotification(self):
456 b95479a5 Michael Hanselmann
    return self._notifications.pop(0)
457 b95479a5 Michael Hanselmann
458 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
459 b95479a5 Michael Hanselmann
    return job in self._waiting
460 b95479a5 Michael Hanselmann
461 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
462 b95479a5 Michael Hanselmann
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
463 b95479a5 Michael Hanselmann
464 b95479a5 Michael Hanselmann
    assert exp_job == job
465 b95479a5 Michael Hanselmann
    assert exp_dep_job_id == dep_job_id
466 b95479a5 Michael Hanselmann
    assert exp_dep_status == dep_status
467 b95479a5 Michael Hanselmann
468 b95479a5 Michael Hanselmann
    (result_status, _) = result
469 b95479a5 Michael Hanselmann
470 b95479a5 Michael Hanselmann
    if result_status == jqueue._JobDependencyManager.WAIT:
471 b95479a5 Michael Hanselmann
      self._waiting.add(job)
472 b95479a5 Michael Hanselmann
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
473 b95479a5 Michael Hanselmann
      self._waiting.remove(job)
474 b95479a5 Michael Hanselmann
475 b95479a5 Michael Hanselmann
    return result
476 b95479a5 Michael Hanselmann
477 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
478 b95479a5 Michael Hanselmann
    self._notifications.append(job_id)
479 b95479a5 Michael Hanselmann
480 b95479a5 Michael Hanselmann
481 b95479a5 Michael Hanselmann
class _DisabledFakeDependencyManager:
482 b95479a5 Michael Hanselmann
  def JobWaiting(self, _):
483 b95479a5 Michael Hanselmann
    return False
484 b95479a5 Michael Hanselmann
485 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, *args):
486 b95479a5 Michael Hanselmann
    assert False, "Should not be called"
487 b95479a5 Michael Hanselmann
488 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, _):
489 b95479a5 Michael Hanselmann
    pass
490 b95479a5 Michael Hanselmann
491 b95479a5 Michael Hanselmann
492 b95479a5 Michael Hanselmann
class _FakeQueueForProc:
493 b95479a5 Michael Hanselmann
  def __init__(self, depmgr=None):
494 be760ba8 Michael Hanselmann
    self._acquired = False
495 ebb2a2a3 Michael Hanselmann
    self._updates = []
496 6a373640 Michael Hanselmann
    self._submitted = []
497 6a373640 Michael Hanselmann
498 6a373640 Michael Hanselmann
    self._submit_count = itertools.count(1000)
499 be760ba8 Michael Hanselmann
500 b95479a5 Michael Hanselmann
    if depmgr:
501 b95479a5 Michael Hanselmann
      self.depmgr = depmgr
502 b95479a5 Michael Hanselmann
    else:
503 b95479a5 Michael Hanselmann
      self.depmgr = _DisabledFakeDependencyManager()
504 b95479a5 Michael Hanselmann
505 be760ba8 Michael Hanselmann
  def IsAcquired(self):
506 be760ba8 Michael Hanselmann
    return self._acquired
507 be760ba8 Michael Hanselmann
508 ebb2a2a3 Michael Hanselmann
  def GetNextUpdate(self):
509 ebb2a2a3 Michael Hanselmann
    return self._updates.pop(0)
510 ebb2a2a3 Michael Hanselmann
511 6a373640 Michael Hanselmann
  def GetNextSubmittedJob(self):
512 6a373640 Michael Hanselmann
    return self._submitted.pop(0)
513 6a373640 Michael Hanselmann
514 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
515 be760ba8 Michael Hanselmann
    assert shared == 1
516 be760ba8 Michael Hanselmann
    self._acquired = True
517 be760ba8 Michael Hanselmann
518 be760ba8 Michael Hanselmann
  def release(self):
519 be760ba8 Michael Hanselmann
    assert self._acquired
520 be760ba8 Michael Hanselmann
    self._acquired = False
521 be760ba8 Michael Hanselmann
522 ebb2a2a3 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=True):
523 ebb2a2a3 Michael Hanselmann
    assert self._acquired, "Lock not acquired while updating job"
524 ebb2a2a3 Michael Hanselmann
    self._updates.append((job, bool(replicate)))
525 be760ba8 Michael Hanselmann
526 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
527 6a373640 Michael Hanselmann
    assert not self._acquired, "Lock acquired while submitting jobs"
528 6a373640 Michael Hanselmann
    job_ids = [self._submit_count.next() for _ in jobs]
529 6a373640 Michael Hanselmann
    self._submitted.extend(zip(job_ids, jobs))
530 6a373640 Michael Hanselmann
    return job_ids
531 6a373640 Michael Hanselmann
532 be760ba8 Michael Hanselmann
533 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
534 ebb2a2a3 Michael Hanselmann
  def __init__(self, queue, before_start, after_start):
535 ebb2a2a3 Michael Hanselmann
    self._queue = queue
536 be760ba8 Michael Hanselmann
    self._before_start = before_start
537 be760ba8 Michael Hanselmann
    self._after_start = after_start
538 be760ba8 Michael Hanselmann
539 f23db633 Michael Hanselmann
  def __call__(self, op, cbs, timeout=None, priority=None):
540 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
541 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired(), \
542 ebb2a2a3 Michael Hanselmann
           "Queue lock not released when executing opcode"
543 be760ba8 Michael Hanselmann
544 be760ba8 Michael Hanselmann
    if self._before_start:
545 f23db633 Michael Hanselmann
      self._before_start(timeout, priority)
546 be760ba8 Michael Hanselmann
547 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
548 be760ba8 Michael Hanselmann
549 be760ba8 Michael Hanselmann
    if self._after_start:
550 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
551 be760ba8 Michael Hanselmann
552 ebb2a2a3 Michael Hanselmann
    # Check again after the callbacks
553 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired()
554 ebb2a2a3 Michael Hanselmann
555 be760ba8 Michael Hanselmann
    if op.fail:
556 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
557 be760ba8 Michael Hanselmann
558 6a373640 Michael Hanselmann
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
559 6a373640 Michael Hanselmann
      return cbs.SubmitManyJobs(op.submit_jobs)
560 6a373640 Michael Hanselmann
561 be760ba8 Michael Hanselmann
    return op.result
562 be760ba8 Michael Hanselmann
563 be760ba8 Michael Hanselmann
564 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
565 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
566 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops, True)
567 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
568 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
569 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
570 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
571 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
572 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
573 be760ba8 Michael Hanselmann
    return job
574 be760ba8 Michael Hanselmann
575 26d3fd2f Michael Hanselmann
576 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
577 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
578 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
579 be760ba8 Michael Hanselmann
                      for op in job.ops)
580 be760ba8 Michael Hanselmann
581 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
582 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
583 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
584 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
585 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
586 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
587 be760ba8 Michael Hanselmann
                      job.start_timestamp,
588 be760ba8 Michael Hanselmann
                      job.end_timestamp])
589 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
590 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
591 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
592 be760ba8 Michael Hanselmann
593 be760ba8 Michael Hanselmann
  def testSuccess(self):
594 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
595 be760ba8 Michael Hanselmann
596 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
597 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
598 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
599 be760ba8 Michael Hanselmann
             for i in range(opcount)]
600 be760ba8 Michael Hanselmann
601 be760ba8 Michael Hanselmann
      # Create job
602 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
603 be760ba8 Michael Hanselmann
604 f23db633 Michael Hanselmann
      def _BeforeStart(timeout, priority):
605 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
606 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
607 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
608 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
609 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
610 be760ba8 Michael Hanselmann
611 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
612 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
613 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
614 ebb2a2a3 Michael Hanselmann
615 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
616 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
617 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
618 be760ba8 Michael Hanselmann
619 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
620 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
621 be760ba8 Michael Hanselmann
        self.assertFalse(success)
622 be760ba8 Michael Hanselmann
623 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
624 be760ba8 Michael Hanselmann
625 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
626 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
627 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
628 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
629 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
630 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
631 be760ba8 Michael Hanselmann
          # Last opcode
632 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
633 be760ba8 Michael Hanselmann
        else:
634 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
635 be760ba8 Michael Hanselmann
636 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
637 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
638 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
639 be760ba8 Michael Hanselmann
640 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
641 ebb2a2a3 Michael Hanselmann
642 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
643 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
644 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
645 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
646 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
647 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
648 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
649 be760ba8 Michael Hanselmann
                              for op in job.ops))
650 be760ba8 Michael Hanselmann
651 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
652 be760ba8 Michael Hanselmann
653 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
654 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
655 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
656 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
657 be760ba8 Michael Hanselmann
658 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
659 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
660 be760ba8 Michael Hanselmann
661 be760ba8 Michael Hanselmann
    testdata = [
662 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
663 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
664 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
665 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
666 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
667 be760ba8 Michael Hanselmann
      ]
668 be760ba8 Michael Hanselmann
669 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
670 be760ba8 Michael Hanselmann
      # Prepare opcodes
671 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
672 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
673 be760ba8 Michael Hanselmann
                                       i <= failto))
674 be760ba8 Michael Hanselmann
             for i in range(opcount)]
675 be760ba8 Michael Hanselmann
676 be760ba8 Michael Hanselmann
      # Create job
677 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
678 be760ba8 Michael Hanselmann
679 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, None, None)
680 be760ba8 Michael Hanselmann
681 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
682 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
683 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
684 ebb2a2a3 Michael Hanselmann
        # queued to waitlock
685 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
686 ebb2a2a3 Michael Hanselmann
        # waitlock to running
687 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
688 ebb2a2a3 Michael Hanselmann
        # Opcode result
689 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
690 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
691 be760ba8 Michael Hanselmann
692 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
693 be760ba8 Michael Hanselmann
          # Last opcode
694 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
695 be760ba8 Michael Hanselmann
          break
696 be760ba8 Michael Hanselmann
697 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
698 be760ba8 Michael Hanselmann
699 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
700 be760ba8 Michael Hanselmann
701 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
702 ebb2a2a3 Michael Hanselmann
703 be760ba8 Michael Hanselmann
      # Check job status
704 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
705 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["id"]), [job_id])
706 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
707 be760ba8 Michael Hanselmann
708 be760ba8 Michael Hanselmann
      # Check opcode status
709 be760ba8 Michael Hanselmann
      data = zip(job.ops,
710 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
711 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
712 be760ba8 Michael Hanselmann
713 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
714 be760ba8 Michael Hanselmann
        if idx < failfrom:
715 be760ba8 Michael Hanselmann
          assert not op.input.fail
716 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
717 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
718 be760ba8 Michael Hanselmann
        elif idx <= failto:
719 be760ba8 Michael Hanselmann
          assert op.input.fail
720 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
721 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
722 be760ba8 Michael Hanselmann
        else:
723 be760ba8 Michael Hanselmann
          assert not op.input.fail
724 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
725 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
726 be760ba8 Michael Hanselmann
727 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
728 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
729 be760ba8 Michael Hanselmann
730 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
731 be760ba8 Michael Hanselmann
732 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
733 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
734 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
735 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
736 be760ba8 Michael Hanselmann
737 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
738 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
739 be760ba8 Michael Hanselmann
740 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
741 be760ba8 Michael Hanselmann
           for i in range(5)]
742 be760ba8 Michael Hanselmann
743 be760ba8 Michael Hanselmann
    # Create job
744 be760ba8 Michael Hanselmann
    job_id = 17045
745 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
746 be760ba8 Michael Hanselmann
747 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
748 be760ba8 Michael Hanselmann
749 be760ba8 Michael Hanselmann
    # Mark as cancelled
750 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
751 be760ba8 Michael Hanselmann
    self.assert_(success)
752 be760ba8 Michael Hanselmann
753 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
754 ebb2a2a3 Michael Hanselmann
755 66bd7445 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
756 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
757 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
758 be760ba8 Michael Hanselmann
                            for op in job.ops))
759 be760ba8 Michael Hanselmann
760 66bd7445 Michael Hanselmann
    # Serialize to check for differences
761 66bd7445 Michael Hanselmann
    before_proc = job.Serialize()
762 66bd7445 Michael Hanselmann
763 66bd7445 Michael Hanselmann
    # Simulate processor called in workerpool
764 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
765 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
766 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
767 30c945d0 Michael Hanselmann
768 30c945d0 Michael Hanselmann
    # Check result
769 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
770 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
771 30c945d0 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
772 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
773 30c945d0 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
774 30c945d0 Michael Hanselmann
                                for op in job.ops))
775 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
776 30c945d0 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
777 30c945d0 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
778 30c945d0 Michael Hanselmann
779 66bd7445 Michael Hanselmann
    # Must not have changed or written
780 66bd7445 Michael Hanselmann
    self.assertEqual(before_proc, job.Serialize())
781 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
782 66bd7445 Michael Hanselmann
783 30c945d0 Michael Hanselmann
  def testCancelWhileWaitlockInQueue(self):
784 30c945d0 Michael Hanselmann
    queue = _FakeQueueForProc()
785 30c945d0 Michael Hanselmann
786 30c945d0 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
787 30c945d0 Michael Hanselmann
           for i in range(5)]
788 30c945d0 Michael Hanselmann
789 30c945d0 Michael Hanselmann
    # Create job
790 30c945d0 Michael Hanselmann
    job_id = 8645
791 30c945d0 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
792 30c945d0 Michael Hanselmann
793 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
794 30c945d0 Michael Hanselmann
795 47099cd1 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
796 30c945d0 Michael Hanselmann
797 30c945d0 Michael Hanselmann
    assert len(job.ops) == 5
798 30c945d0 Michael Hanselmann
799 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
800 30c945d0 Michael Hanselmann
801 30c945d0 Michael Hanselmann
    # Mark as cancelling
802 30c945d0 Michael Hanselmann
    (success, _) = job.Cancel()
803 30c945d0 Michael Hanselmann
    self.assert_(success)
804 30c945d0 Michael Hanselmann
805 30c945d0 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
806 30c945d0 Michael Hanselmann
807 30c945d0 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
808 30c945d0 Michael Hanselmann
                            for op in job.ops))
809 30c945d0 Michael Hanselmann
810 30c945d0 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
811 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
812 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
813 be760ba8 Michael Hanselmann
814 be760ba8 Michael Hanselmann
    # Check result
815 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
816 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
817 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
818 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
819 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
820 be760ba8 Michael Hanselmann
                                for op in job.ops))
821 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
822 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
823 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
824 be760ba8 Michael Hanselmann
825 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
826 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
827 be760ba8 Michael Hanselmann
828 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
829 be760ba8 Michael Hanselmann
           for i in range(5)]
830 be760ba8 Michael Hanselmann
831 be760ba8 Michael Hanselmann
    # Create job
832 be760ba8 Michael Hanselmann
    job_id = 11009
833 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
834 be760ba8 Michael Hanselmann
835 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
836 be760ba8 Michael Hanselmann
837 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
838 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
839 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
840 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
841 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
842 be760ba8 Michael Hanselmann
843 be760ba8 Michael Hanselmann
      # Mark as cancelled
844 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
845 be760ba8 Michael Hanselmann
      self.assert_(success)
846 be760ba8 Michael Hanselmann
847 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
848 be760ba8 Michael Hanselmann
                              for op in job.ops))
849 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
850 be760ba8 Michael Hanselmann
851 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
852 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
853 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
854 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
855 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
856 be760ba8 Michael Hanselmann
857 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
858 be760ba8 Michael Hanselmann
859 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
860 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
861 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
862 ebb2a2a3 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
863 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
864 be760ba8 Michael Hanselmann
865 be760ba8 Michael Hanselmann
    # Check result
866 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
867 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
868 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
869 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
870 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
871 be760ba8 Michael Hanselmann
                                for op in job.ops))
872 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
873 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
874 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
875 be760ba8 Michael Hanselmann
876 9e49dfc5 Michael Hanselmann
  def testCancelWhileWaitlockWithTimeout(self):
877 9e49dfc5 Michael Hanselmann
    queue = _FakeQueueForProc()
878 9e49dfc5 Michael Hanselmann
879 9e49dfc5 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
880 9e49dfc5 Michael Hanselmann
           for i in range(5)]
881 9e49dfc5 Michael Hanselmann
882 9e49dfc5 Michael Hanselmann
    # Create job
883 9e49dfc5 Michael Hanselmann
    job_id = 24314
884 9e49dfc5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
885 9e49dfc5 Michael Hanselmann
886 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
887 9e49dfc5 Michael Hanselmann
888 9e49dfc5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
889 9e49dfc5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
890 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
891 9e49dfc5 Michael Hanselmann
892 9e49dfc5 Michael Hanselmann
      # Mark as cancelled
893 9e49dfc5 Michael Hanselmann
      (success, _) = job.Cancel()
894 9e49dfc5 Michael Hanselmann
      self.assert_(success)
895 9e49dfc5 Michael Hanselmann
896 9e49dfc5 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
897 9e49dfc5 Michael Hanselmann
                              for op in job.ops))
898 9e49dfc5 Michael Hanselmann
899 9e49dfc5 Michael Hanselmann
      # Fake an acquire attempt timing out
900 9e49dfc5 Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
901 9e49dfc5 Michael Hanselmann
902 9e49dfc5 Michael Hanselmann
    def _AfterStart(op, cbs):
903 9e49dfc5 Michael Hanselmann
      self.fail("Should not reach this")
904 9e49dfc5 Michael Hanselmann
905 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
906 9e49dfc5 Michael Hanselmann
907 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
908 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
909 9e49dfc5 Michael Hanselmann
910 9e49dfc5 Michael Hanselmann
    # Check result
911 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
912 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
913 9e49dfc5 Michael Hanselmann
    self.assert_(job.start_timestamp)
914 9e49dfc5 Michael Hanselmann
    self.assert_(job.end_timestamp)
915 9e49dfc5 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
916 9e49dfc5 Michael Hanselmann
                                for op in job.ops))
917 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
918 9e49dfc5 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
919 9e49dfc5 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
920 9e49dfc5 Michael Hanselmann
921 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
922 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
923 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
924 be760ba8 Michael Hanselmann
925 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
926 be760ba8 Michael Hanselmann
           for i in range(3)]
927 be760ba8 Michael Hanselmann
928 be760ba8 Michael Hanselmann
    # Create job
929 be760ba8 Michael Hanselmann
    job_id = 28492
930 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
931 be760ba8 Michael Hanselmann
932 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
933 be760ba8 Michael Hanselmann
934 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
935 be760ba8 Michael Hanselmann
936 be760ba8 Michael Hanselmann
    # Run one opcode
937 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
938 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
939 be760ba8 Michael Hanselmann
940 be760ba8 Michael Hanselmann
    # Job goes back to queued
941 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
942 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
943 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
944 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
945 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
946 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
947 be760ba8 Michael Hanselmann
948 be760ba8 Michael Hanselmann
    # Mark as cancelled
949 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
950 be760ba8 Michael Hanselmann
    self.assert_(success)
951 be760ba8 Michael Hanselmann
952 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
953 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
954 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
955 be760ba8 Michael Hanselmann
956 be760ba8 Michael Hanselmann
    # Check result
957 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
958 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
959 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
960 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
961 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
962 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
963 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
964 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
965 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
966 be760ba8 Michael Hanselmann
967 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
968 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
969 be760ba8 Michael Hanselmann
    # program was restarted
970 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
971 be760ba8 Michael Hanselmann
972 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
973 be760ba8 Michael Hanselmann
974 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
975 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
976 be760ba8 Michael Hanselmann
             for i in range(10)]
977 be760ba8 Michael Hanselmann
978 be760ba8 Michael Hanselmann
      # Create job
979 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
980 be760ba8 Michael Hanselmann
981 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
982 be760ba8 Michael Hanselmann
983 be760ba8 Michael Hanselmann
      for _ in range(successcount):
984 75d81fc8 Michael Hanselmann
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
985 75d81fc8 Michael Hanselmann
                         jqueue._JobProcessor.DEFER)
986 be760ba8 Michael Hanselmann
987 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
988 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
989 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
990 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
991 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
992 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
993 be760ba8 Michael Hanselmann
994 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
995 be760ba8 Michael Hanselmann
996 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
997 c0f6d0d8 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
998 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
999 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
1000 be760ba8 Michael Hanselmann
1001 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
1002 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1003 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1004 be760ba8 Michael Hanselmann
1005 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1006 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1007 be760ba8 Michael Hanselmann
1008 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
1009 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1010 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1011 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1012 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1013 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1014 be760ba8 Michael Hanselmann
1015 be760ba8 Michael Hanselmann
      if remaining == 0:
1016 be760ba8 Michael Hanselmann
        # Last opcode
1017 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1018 be760ba8 Michael Hanselmann
        break
1019 be760ba8 Michael Hanselmann
1020 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1021 be760ba8 Michael Hanselmann
1022 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1023 be760ba8 Michael Hanselmann
1024 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1025 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1026 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1027 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1028 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1029 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1030 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1031 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1032 be760ba8 Michael Hanselmann
                            for op in job.ops))
1033 be760ba8 Michael Hanselmann
1034 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
1035 be760ba8 Michael Hanselmann
1036 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1037 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1038 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1039 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1040 be760ba8 Michael Hanselmann
1041 be760ba8 Michael Hanselmann
    # ... also after being restored
1042 c0f6d0d8 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1043 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1044 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1045 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1046 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1047 be760ba8 Michael Hanselmann
1048 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
1049 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1050 be760ba8 Michael Hanselmann
1051 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1052 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1053 be760ba8 Michael Hanselmann
1054 be760ba8 Michael Hanselmann
    # Create job
1055 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
1056 be760ba8 Michael Hanselmann
1057 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1058 be760ba8 Michael Hanselmann
1059 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
1060 be760ba8 Michael Hanselmann
1061 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
1062 be760ba8 Michael Hanselmann
1063 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1064 be760ba8 Michael Hanselmann
1065 be760ba8 Michael Hanselmann
    # Calling on running job must fail
1066 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1067 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
1068 be760ba8 Michael Hanselmann
1069 be760ba8 Michael Hanselmann
  def testLogMessages(self):
1070 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
1071 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1072 be760ba8 Michael Hanselmann
1073 be760ba8 Michael Hanselmann
    messages = {
1074 be760ba8 Michael Hanselmann
      1: [
1075 be760ba8 Michael Hanselmann
        (None, "Hello"),
1076 be760ba8 Michael Hanselmann
        (None, "World"),
1077 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
1078 be760ba8 Michael Hanselmann
        ],
1079 be760ba8 Michael Hanselmann
      4: [
1080 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1081 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1082 be760ba8 Michael Hanselmann
        ],
1083 be760ba8 Michael Hanselmann
      }
1084 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1085 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
1086 be760ba8 Michael Hanselmann
           for i in range(5)]
1087 be760ba8 Michael Hanselmann
1088 be760ba8 Michael Hanselmann
    # Create job
1089 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
1090 be760ba8 Michael Hanselmann
1091 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1092 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1093 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1094 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1095 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1096 be760ba8 Michael Hanselmann
1097 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1098 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1099 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1100 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1101 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1102 be760ba8 Michael Hanselmann
1103 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
1104 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
1105 be760ba8 Michael Hanselmann
1106 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
1107 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1108 be760ba8 Michael Hanselmann
        if log_type:
1109 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
1110 be760ba8 Michael Hanselmann
        else:
1111 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
1112 ebb2a2a3 Michael Hanselmann
        # Check for job update without replication
1113 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1114 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1115 be760ba8 Michael Hanselmann
1116 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1117 be760ba8 Michael Hanselmann
1118 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
1119 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1120 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1121 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1122 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1123 be760ba8 Michael Hanselmann
1124 be760ba8 Michael Hanselmann
      if remaining == 0:
1125 be760ba8 Michael Hanselmann
        # Last opcode
1126 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1127 be760ba8 Michael Hanselmann
        break
1128 be760ba8 Michael Hanselmann
1129 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1130 be760ba8 Michael Hanselmann
1131 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1132 be760ba8 Michael Hanselmann
1133 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1134 ebb2a2a3 Michael Hanselmann
1135 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1136 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1137 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1138 be760ba8 Michael Hanselmann
1139 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
1140 be760ba8 Michael Hanselmann
1141 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
1142 be760ba8 Michael Hanselmann
1143 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
1144 c0f6d0d8 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1145 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
1146 be760ba8 Michael Hanselmann
1147 be760ba8 Michael Hanselmann
    # Check each message
1148 be760ba8 Michael Hanselmann
    prevserial = -1
1149 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1150 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
1151 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
1152 be760ba8 Michael Hanselmann
        if exptype:
1153 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
1154 be760ba8 Michael Hanselmann
        else:
1155 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1156 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
1157 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
1158 be760ba8 Michael Hanselmann
        prevserial = serial
1159 be760ba8 Michael Hanselmann
1160 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
1161 be760ba8 Michael Hanselmann
    # Check serial
1162 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
1163 be760ba8 Michael Hanselmann
1164 be760ba8 Michael Hanselmann
    # No filter
1165 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
1166 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1167 be760ba8 Michael Hanselmann
                      for entry in entries])
1168 be760ba8 Michael Hanselmann
1169 be760ba8 Michael Hanselmann
    # Filter with serial
1170 be760ba8 Michael Hanselmann
    assert count > 3
1171 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
1172 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
1173 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1174 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
1175 be760ba8 Michael Hanselmann
1176 be760ba8 Michael Hanselmann
    # No log message after highest serial
1177 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
1178 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
1179 be760ba8 Michael Hanselmann
1180 6a373640 Michael Hanselmann
  def testSubmitManyJobs(self):
1181 6a373640 Michael Hanselmann
    queue = _FakeQueueForProc()
1182 6a373640 Michael Hanselmann
1183 6a373640 Michael Hanselmann
    job_id = 15656
1184 6a373640 Michael Hanselmann
    ops = [
1185 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1186 6a373640 Michael Hanselmann
                          submit_jobs=[]),
1187 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1188 6a373640 Michael Hanselmann
                          submit_jobs=[
1189 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1190 6a373640 Michael Hanselmann
                            ]),
1191 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False,
1192 6a373640 Michael Hanselmann
                          submit_jobs=[
1193 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1194 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1195 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1196 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1197 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1198 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1199 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1200 6a373640 Michael Hanselmann
                            ]),
1201 6a373640 Michael Hanselmann
      ]
1202 6a373640 Michael Hanselmann
1203 6a373640 Michael Hanselmann
    # Create job
1204 6a373640 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1205 6a373640 Michael Hanselmann
1206 6a373640 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1207 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1208 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1209 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1210 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1211 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1212 6a373640 Michael Hanselmann
1213 6a373640 Michael Hanselmann
    def _AfterStart(op, cbs):
1214 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1215 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1216 6a373640 Michael Hanselmann
1217 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1218 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1219 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1220 6a373640 Michael Hanselmann
1221 6a373640 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1222 6a373640 Michael Hanselmann
      (success, _) = job.Cancel()
1223 6a373640 Michael Hanselmann
      self.assertFalse(success)
1224 6a373640 Michael Hanselmann
1225 6a373640 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1226 6a373640 Michael Hanselmann
1227 6a373640 Michael Hanselmann
    for idx in range(len(ops)):
1228 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1229 6a373640 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1230 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1231 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1232 6a373640 Michael Hanselmann
      if idx == len(ops) - 1:
1233 6a373640 Michael Hanselmann
        # Last opcode
1234 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1235 6a373640 Michael Hanselmann
      else:
1236 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1237 6a373640 Michael Hanselmann
1238 6a373640 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1239 6a373640 Michael Hanselmann
        self.assert_(job.start_timestamp)
1240 6a373640 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1241 6a373640 Michael Hanselmann
1242 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1243 6a373640 Michael Hanselmann
1244 6a373640 Michael Hanselmann
    for idx, submitted_ops in enumerate(job_ops
1245 6a373640 Michael Hanselmann
                                        for op in ops
1246 6a373640 Michael Hanselmann
                                        for job_ops in op.submit_jobs):
1247 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextSubmittedJob(),
1248 6a373640 Michael Hanselmann
                       (1000 + idx, submitted_ops))
1249 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1250 6a373640 Michael Hanselmann
1251 6a373640 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1252 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1253 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1254 6a373640 Michael Hanselmann
                     [[[], [1000], [1001, 1002, 1003]]])
1255 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1256 6a373640 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1257 6a373640 Michael Hanselmann
1258 6a373640 Michael Hanselmann
    self._GenericCheckJob(job)
1259 6a373640 Michael Hanselmann
1260 1e6d5750 Iustin Pop
    # Calling the processor on a finished job should be a no-op
1261 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1262 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1263 1e6d5750 Iustin Pop
    self.assertRaises(IndexError, queue.GetNextUpdate)
1264 6a373640 Michael Hanselmann
1265 b95479a5 Michael Hanselmann
  def testJobDependency(self):
1266 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1267 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1268 b95479a5 Michael Hanselmann
1269 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1270 b95479a5 Michael Hanselmann
1271 b95479a5 Michael Hanselmann
    prev_job_id = 22113
1272 b95479a5 Michael Hanselmann
    prev_job_id2 = 28102
1273 b95479a5 Michael Hanselmann
    job_id = 29929
1274 b95479a5 Michael Hanselmann
    ops = [
1275 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1276 b95479a5 Michael Hanselmann
                          depends=[
1277 b95479a5 Michael Hanselmann
                            [prev_job_id2, None],
1278 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1279 b95479a5 Michael Hanselmann
                            ]),
1280 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False),
1281 b95479a5 Michael Hanselmann
      ]
1282 b95479a5 Michael Hanselmann
1283 b95479a5 Michael Hanselmann
    # Create job
1284 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1285 b95479a5 Michael Hanselmann
1286 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1287 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1288 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1289 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1290 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1291 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1292 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1293 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1294 b95479a5 Michael Hanselmann
1295 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1296 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1297 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1298 b95479a5 Michael Hanselmann
1299 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1300 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1301 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1302 b95479a5 Michael Hanselmann
1303 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1304 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1305 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1306 b95479a5 Michael Hanselmann
1307 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1308 b95479a5 Michael Hanselmann
1309 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1310 b95479a5 Michael Hanselmann
1311 b95479a5 Michael Hanselmann
    counter = itertools.count()
1312 b95479a5 Michael Hanselmann
    while True:
1313 b95479a5 Michael Hanselmann
      attempt = counter.next()
1314 b95479a5 Michael Hanselmann
1315 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1316 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1317 b95479a5 Michael Hanselmann
1318 b95479a5 Michael Hanselmann
      if attempt < 2:
1319 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1320 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1321 b95479a5 Michael Hanselmann
      elif attempt == 2:
1322 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1323 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1324 b95479a5 Michael Hanselmann
        # The processor will ask for the next dependency immediately
1325 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1326 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1327 b95479a5 Michael Hanselmann
      elif attempt < 5:
1328 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1329 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1330 b95479a5 Michael Hanselmann
      elif attempt == 5:
1331 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1332 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1333 b95479a5 Michael Hanselmann
      if attempt == 2:
1334 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 2)
1335 b95479a5 Michael Hanselmann
      elif attempt > 5:
1336 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1337 b95479a5 Michael Hanselmann
      else:
1338 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1339 b95479a5 Michael Hanselmann
1340 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1341 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt >= 5:
1342 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1343 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1344 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1345 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1346 b95479a5 Michael Hanselmann
1347 b95479a5 Michael Hanselmann
      if attempt < 5:
1348 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1349 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1350 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1351 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1352 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1353 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1354 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1355 b95479a5 Michael Hanselmann
        continue
1356 b95479a5 Michael Hanselmann
1357 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1358 b95479a5 Michael Hanselmann
        # Last opcode
1359 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1360 b95479a5 Michael Hanselmann
        break
1361 b95479a5 Michael Hanselmann
1362 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1363 b95479a5 Michael Hanselmann
1364 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1365 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1366 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1367 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1368 b95479a5 Michael Hanselmann
1369 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1370 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1371 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1372 b95479a5 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1373 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1374 b95479a5 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1375 b95479a5 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1376 b95479a5 Michael Hanselmann
                               for op in job.ops))
1377 b95479a5 Michael Hanselmann
1378 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1379 b95479a5 Michael Hanselmann
1380 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1381 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1382 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1383 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountWaitingJobs())
1384 b95479a5 Michael Hanselmann
1385 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1386 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1387 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1388 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1389 b95479a5 Michael Hanselmann
1390 b95479a5 Michael Hanselmann
  def testJobDependencyCancel(self):
1391 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1392 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1393 b95479a5 Michael Hanselmann
1394 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1395 b95479a5 Michael Hanselmann
1396 b95479a5 Michael Hanselmann
    prev_job_id = 13623
1397 b95479a5 Michael Hanselmann
    job_id = 30876
1398 b95479a5 Michael Hanselmann
    ops = [
1399 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1400 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1401 b95479a5 Michael Hanselmann
                          depends=[
1402 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1403 b95479a5 Michael Hanselmann
                            ]),
1404 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1405 b95479a5 Michael Hanselmann
      ]
1406 b95479a5 Michael Hanselmann
1407 b95479a5 Michael Hanselmann
    # Create job
1408 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1409 b95479a5 Michael Hanselmann
1410 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1411 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1412 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1413 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1414 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1415 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1416 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1417 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1418 b95479a5 Michael Hanselmann
1419 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1420 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1421 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1422 b95479a5 Michael Hanselmann
1423 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1424 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1425 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1426 b95479a5 Michael Hanselmann
1427 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1428 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1429 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1430 b95479a5 Michael Hanselmann
1431 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1432 b95479a5 Michael Hanselmann
1433 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1434 b95479a5 Michael Hanselmann
1435 b95479a5 Michael Hanselmann
    counter = itertools.count()
1436 b95479a5 Michael Hanselmann
    while True:
1437 b95479a5 Michael Hanselmann
      attempt = counter.next()
1438 b95479a5 Michael Hanselmann
1439 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1440 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1441 b95479a5 Michael Hanselmann
1442 b95479a5 Michael Hanselmann
      if attempt == 0:
1443 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1444 b95479a5 Michael Hanselmann
        pass
1445 b95479a5 Michael Hanselmann
      elif attempt < 4:
1446 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1447 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1448 b95479a5 Michael Hanselmann
      elif attempt == 4:
1449 b95479a5 Michael Hanselmann
        # Other job was cancelled
1450 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1451 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1452 b95479a5 Michael Hanselmann
1453 b95479a5 Michael Hanselmann
      if attempt == 0:
1454 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1455 b95479a5 Michael Hanselmann
      else:
1456 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1457 b95479a5 Michael Hanselmann
1458 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1459 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1460 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1461 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1462 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1463 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1464 b95479a5 Michael Hanselmann
1465 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1466 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1467 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1468 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1469 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1470 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1471 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1472 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1473 b95479a5 Michael Hanselmann
        continue
1474 b95479a5 Michael Hanselmann
1475 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1476 b95479a5 Michael Hanselmann
        # Last opcode
1477 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1478 b95479a5 Michael Hanselmann
        break
1479 b95479a5 Michael Hanselmann
1480 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1481 b95479a5 Michael Hanselmann
1482 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1483 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1484 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1485 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1486 b95479a5 Michael Hanselmann
1487 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1488 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1489 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1490 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1491 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1492 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1493 b95479a5 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1494 b95479a5 Michael Hanselmann
                       "Job canceled by request"]])
1495 b95479a5 Michael Hanselmann
1496 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1497 b95479a5 Michael Hanselmann
1498 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1499 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1500 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1501 b95479a5 Michael Hanselmann
1502 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1503 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1504 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1505 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1506 b95479a5 Michael Hanselmann
1507 b95479a5 Michael Hanselmann
  def testJobDependencyWrongstatus(self):
1508 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1509 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1510 b95479a5 Michael Hanselmann
1511 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1512 b95479a5 Michael Hanselmann
1513 b95479a5 Michael Hanselmann
    prev_job_id = 9741
1514 b95479a5 Michael Hanselmann
    job_id = 11763
1515 b95479a5 Michael Hanselmann
    ops = [
1516 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1517 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1518 b95479a5 Michael Hanselmann
                          depends=[
1519 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1520 b95479a5 Michael Hanselmann
                            ]),
1521 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1522 b95479a5 Michael Hanselmann
      ]
1523 b95479a5 Michael Hanselmann
1524 b95479a5 Michael Hanselmann
    # Create job
1525 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1526 b95479a5 Michael Hanselmann
1527 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1528 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1529 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1530 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1531 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1532 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1533 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1534 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1535 b95479a5 Michael Hanselmann
1536 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1537 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1538 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1539 b95479a5 Michael Hanselmann
1540 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1541 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1542 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1543 b95479a5 Michael Hanselmann
1544 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1545 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1546 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1547 b95479a5 Michael Hanselmann
1548 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1549 b95479a5 Michael Hanselmann
1550 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1551 b95479a5 Michael Hanselmann
1552 b95479a5 Michael Hanselmann
    counter = itertools.count()
1553 b95479a5 Michael Hanselmann
    while True:
1554 b95479a5 Michael Hanselmann
      attempt = counter.next()
1555 b95479a5 Michael Hanselmann
1556 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1557 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1558 b95479a5 Michael Hanselmann
1559 b95479a5 Michael Hanselmann
      if attempt == 0:
1560 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1561 b95479a5 Michael Hanselmann
        pass
1562 b95479a5 Michael Hanselmann
      elif attempt < 4:
1563 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1564 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1565 b95479a5 Michael Hanselmann
      elif attempt == 4:
1566 b95479a5 Michael Hanselmann
        # Other job failed
1567 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1568 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1569 b95479a5 Michael Hanselmann
1570 b95479a5 Michael Hanselmann
      if attempt == 0:
1571 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1572 b95479a5 Michael Hanselmann
      else:
1573 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1574 b95479a5 Michael Hanselmann
1575 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1576 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1577 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1578 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1579 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1580 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1581 b95479a5 Michael Hanselmann
1582 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1583 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1584 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1585 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1586 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1587 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1588 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1589 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1590 b95479a5 Michael Hanselmann
        continue
1591 b95479a5 Michael Hanselmann
1592 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1593 b95479a5 Michael Hanselmann
        # Last opcode
1594 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1595 b95479a5 Michael Hanselmann
        break
1596 b95479a5 Michael Hanselmann
1597 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1598 b95479a5 Michael Hanselmann
1599 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1600 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1601 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1602 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1603 b95479a5 Michael Hanselmann
1604 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1605 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1606 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1607 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1608 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR,
1609 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR]]),
1610 b95479a5 Michael Hanselmann
1611 b95479a5 Michael Hanselmann
    (opresult, ) = job.GetInfo(["opresult"])
1612 b95479a5 Michael Hanselmann
    self.assertEqual(len(opresult), len(ops))
1613 b95479a5 Michael Hanselmann
    self.assertEqual(opresult[0], "Res0")
1614 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[1]))
1615 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[2]))
1616 b95479a5 Michael Hanselmann
1617 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1618 b95479a5 Michael Hanselmann
1619 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1620 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1621 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1622 b95479a5 Michael Hanselmann
1623 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1624 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1625 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1626 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1627 b95479a5 Michael Hanselmann
1628 be760ba8 Michael Hanselmann
1629 df5a5730 Michael Hanselmann
class TestEvaluateJobProcessorResult(unittest.TestCase):
1630 df5a5730 Michael Hanselmann
  def testFinished(self):
1631 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1632 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(30953)
1633 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
1634 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.FINISHED)
1635 df5a5730 Michael Hanselmann
    self.assertEqual(depmgr.GetNextNotification(), job.id)
1636 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1637 df5a5730 Michael Hanselmann
1638 df5a5730 Michael Hanselmann
  def testDefer(self):
1639 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1640 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(11326, priority=5463)
1641 df5a5730 Michael Hanselmann
    try:
1642 df5a5730 Michael Hanselmann
      jqueue._EvaluateJobProcessorResult(depmgr, job,
1643 df5a5730 Michael Hanselmann
                                         jqueue._JobProcessor.DEFER)
1644 df5a5730 Michael Hanselmann
    except workerpool.DeferTask, err:
1645 df5a5730 Michael Hanselmann
      self.assertEqual(err.priority, 5463)
1646 df5a5730 Michael Hanselmann
    else:
1647 df5a5730 Michael Hanselmann
      self.fail("Didn't raise exception")
1648 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1649 df5a5730 Michael Hanselmann
1650 df5a5730 Michael Hanselmann
  def testWaitdep(self):
1651 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1652 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(21317)
1653 df5a5730 Michael Hanselmann
    jqueue._EvaluateJobProcessorResult(depmgr, job,
1654 df5a5730 Michael Hanselmann
                                       jqueue._JobProcessor.WAITDEP)
1655 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1656 df5a5730 Michael Hanselmann
1657 df5a5730 Michael Hanselmann
  def testOther(self):
1658 df5a5730 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1659 df5a5730 Michael Hanselmann
    job = _IdOnlyFakeJob(5813)
1660 df5a5730 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1661 df5a5730 Michael Hanselmann
                      jqueue._EvaluateJobProcessorResult,
1662 df5a5730 Michael Hanselmann
                      depmgr, job, "Other result")
1663 df5a5730 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1664 df5a5730 Michael Hanselmann
1665 df5a5730 Michael Hanselmann
1666 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
1667 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
1668 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
1669 26d3fd2f Michael Hanselmann
    self.attempts = 0
1670 26d3fd2f Michael Hanselmann
    self.last_timeout = None
1671 26d3fd2f Michael Hanselmann
1672 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
1673 26d3fd2f Michael Hanselmann
    self.attempts += 1
1674 26d3fd2f Michael Hanselmann
    if self.timeouts:
1675 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
1676 26d3fd2f Michael Hanselmann
    else:
1677 26d3fd2f Michael Hanselmann
      timeout = None
1678 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
1679 26d3fd2f Michael Hanselmann
    return timeout
1680 26d3fd2f Michael Hanselmann
1681 26d3fd2f Michael Hanselmann
1682 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1683 26d3fd2f Michael Hanselmann
  def setUp(self):
1684 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
1685 26d3fd2f Michael Hanselmann
    self.job = None
1686 26d3fd2f Michael Hanselmann
    self.curop = None
1687 26d3fd2f Michael Hanselmann
    self.opcounter = None
1688 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None
1689 26d3fd2f Michael Hanselmann
    self.retries = 0
1690 26d3fd2f Michael Hanselmann
    self.prev_tsop = None
1691 26d3fd2f Michael Hanselmann
    self.prev_prio = None
1692 5fd6b694 Michael Hanselmann
    self.prev_status = None
1693 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = None
1694 26d3fd2f Michael Hanselmann
    self.gave_lock = None
1695 26d3fd2f Michael Hanselmann
    self.done_lock_before_blocking = False
1696 26d3fd2f Michael Hanselmann
1697 f23db633 Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
1698 26d3fd2f Michael Hanselmann
    job = self.job
1699 26d3fd2f Michael Hanselmann
1700 5fd6b694 Michael Hanselmann
    # If status has changed, job must've been written
1701 5fd6b694 Michael Hanselmann
    if self.prev_status != self.job.ops[self.curop].status:
1702 5fd6b694 Michael Hanselmann
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1703 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1704 5fd6b694 Michael Hanselmann
1705 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1706 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1707 26d3fd2f Michael Hanselmann
1708 26d3fd2f Michael Hanselmann
    ts = self.timeout_strategy
1709 26d3fd2f Michael Hanselmann
1710 26d3fd2f Michael Hanselmann
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1711 26d3fd2f Michael Hanselmann
    self.assertEqual(timeout, ts.last_timeout)
1712 f23db633 Michael Hanselmann
    self.assertEqual(priority, job.ops[self.curop].priority)
1713 26d3fd2f Michael Hanselmann
1714 26d3fd2f Michael Hanselmann
    self.gave_lock = True
1715 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = priority
1716 26d3fd2f Michael Hanselmann
1717 26d3fd2f Michael Hanselmann
    if (self.curop == 3 and
1718 26d3fd2f Michael Hanselmann
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1719 26d3fd2f Michael Hanselmann
      # Give locks before running into blocking acquire
1720 26d3fd2f Michael Hanselmann
      assert self.retries == 7
1721 26d3fd2f Michael Hanselmann
      self.retries = 0
1722 26d3fd2f Michael Hanselmann
      self.done_lock_before_blocking = True
1723 26d3fd2f Michael Hanselmann
      return
1724 26d3fd2f Michael Hanselmann
1725 26d3fd2f Michael Hanselmann
    if self.retries > 0:
1726 26d3fd2f Michael Hanselmann
      self.assert_(timeout is not None)
1727 26d3fd2f Michael Hanselmann
      self.retries -= 1
1728 26d3fd2f Michael Hanselmann
      self.gave_lock = False
1729 26d3fd2f Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
1730 26d3fd2f Michael Hanselmann
1731 26d3fd2f Michael Hanselmann
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1732 26d3fd2f Michael Hanselmann
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1733 26d3fd2f Michael Hanselmann
      assert not ts.timeouts
1734 26d3fd2f Michael Hanselmann
      self.assert_(timeout is None)
1735 26d3fd2f Michael Hanselmann
1736 26d3fd2f Michael Hanselmann
  def _AfterStart(self, op, cbs):
1737 26d3fd2f Michael Hanselmann
    job = self.job
1738 26d3fd2f Michael Hanselmann
1739 5fd6b694 Michael Hanselmann
    # Setting to "running" requires an update
1740 ebb2a2a3 Michael Hanselmann
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1741 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1742 5fd6b694 Michael Hanselmann
1743 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1744 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1745 26d3fd2f Michael Hanselmann
1746 26d3fd2f Michael Hanselmann
    # Job is running, cancelling shouldn't be possible
1747 26d3fd2f Michael Hanselmann
    (success, _) = job.Cancel()
1748 26d3fd2f Michael Hanselmann
    self.assertFalse(success)
1749 26d3fd2f Michael Hanselmann
1750 26d3fd2f Michael Hanselmann
  def _NextOpcode(self):
1751 26d3fd2f Michael Hanselmann
    self.curop = self.opcounter.next()
1752 26d3fd2f Michael Hanselmann
    self.prev_prio = self.job.ops[self.curop].priority
1753 5fd6b694 Michael Hanselmann
    self.prev_status = self.job.ops[self.curop].status
1754 26d3fd2f Michael Hanselmann
1755 26d3fd2f Michael Hanselmann
  def _NewTimeoutStrategy(self):
1756 26d3fd2f Michael Hanselmann
    job = self.job
1757 26d3fd2f Michael Hanselmann
1758 26d3fd2f Michael Hanselmann
    self.assertEqual(self.retries, 0)
1759 26d3fd2f Michael Hanselmann
1760 26d3fd2f Michael Hanselmann
    if self.prev_tsop == self.curop:
1761 26d3fd2f Michael Hanselmann
      # Still on the same opcode, priority must've been increased
1762 26d3fd2f Michael Hanselmann
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1763 26d3fd2f Michael Hanselmann
1764 26d3fd2f Michael Hanselmann
    if self.curop == 1:
1765 26d3fd2f Michael Hanselmann
      # Normal retry
1766 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1767 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts) - 1
1768 26d3fd2f Michael Hanselmann
1769 26d3fd2f Michael Hanselmann
    elif self.curop == 2:
1770 26d3fd2f Michael Hanselmann
      # Let this run into a blocking acquire
1771 26d3fd2f Michael Hanselmann
      timeouts = range(11, 61, 12)
1772 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1773 26d3fd2f Michael Hanselmann
1774 26d3fd2f Michael Hanselmann
    elif self.curop == 3:
1775 26d3fd2f Michael Hanselmann
      # Wait for priority to increase, but give lock before blocking acquire
1776 26d3fd2f Michael Hanselmann
      timeouts = range(12, 100, 14)
1777 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1778 26d3fd2f Michael Hanselmann
1779 26d3fd2f Michael Hanselmann
      self.assertFalse(self.done_lock_before_blocking)
1780 26d3fd2f Michael Hanselmann
1781 26d3fd2f Michael Hanselmann
    elif self.curop == 4:
1782 26d3fd2f Michael Hanselmann
      self.assert_(self.done_lock_before_blocking)
1783 26d3fd2f Michael Hanselmann
1784 26d3fd2f Michael Hanselmann
      # Timeouts, but no need to retry
1785 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1786 26d3fd2f Michael Hanselmann
      self.retries = 0
1787 26d3fd2f Michael Hanselmann
1788 26d3fd2f Michael Hanselmann
    elif self.curop == 5:
1789 26d3fd2f Michael Hanselmann
      # Normal retry
1790 26d3fd2f Michael Hanselmann
      timeouts = range(19, 100, 11)
1791 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1792 26d3fd2f Michael Hanselmann
1793 26d3fd2f Michael Hanselmann
    else:
1794 26d3fd2f Michael Hanselmann
      timeouts = []
1795 26d3fd2f Michael Hanselmann
      self.retries = 0
1796 26d3fd2f Michael Hanselmann
1797 26d3fd2f Michael Hanselmann
    assert len(job.ops) == 10
1798 26d3fd2f Michael Hanselmann
    assert self.retries <= len(timeouts)
1799 26d3fd2f Michael Hanselmann
1800 26d3fd2f Michael Hanselmann
    ts = _FakeTimeoutStrategy(timeouts)
1801 26d3fd2f Michael Hanselmann
1802 26d3fd2f Michael Hanselmann
    self.timeout_strategy = ts
1803 26d3fd2f Michael Hanselmann
    self.prev_tsop = self.curop
1804 26d3fd2f Michael Hanselmann
    self.prev_prio = job.ops[self.curop].priority
1805 26d3fd2f Michael Hanselmann
1806 26d3fd2f Michael Hanselmann
    return ts
1807 26d3fd2f Michael Hanselmann
1808 26d3fd2f Michael Hanselmann
  def testTimeout(self):
1809 26d3fd2f Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1810 26d3fd2f Michael Hanselmann
           for i in range(10)]
1811 26d3fd2f Michael Hanselmann
1812 26d3fd2f Michael Hanselmann
    # Create job
1813 26d3fd2f Michael Hanselmann
    job_id = 15801
1814 26d3fd2f Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
1815 26d3fd2f Michael Hanselmann
    self.job = job
1816 26d3fd2f Michael Hanselmann
1817 26d3fd2f Michael Hanselmann
    self.opcounter = itertools.count(0)
1818 26d3fd2f Michael Hanselmann
1819 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1820 ebb2a2a3 Michael Hanselmann
                                    self._AfterStart)
1821 26d3fd2f Michael Hanselmann
    tsf = self._NewTimeoutStrategy
1822 26d3fd2f Michael Hanselmann
1823 26d3fd2f Michael Hanselmann
    self.assertFalse(self.done_lock_before_blocking)
1824 26d3fd2f Michael Hanselmann
1825 5fd6b694 Michael Hanselmann
    while True:
1826 26d3fd2f Michael Hanselmann
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1827 26d3fd2f Michael Hanselmann
                                  _timeout_strategy_factory=tsf)
1828 26d3fd2f Michael Hanselmann
1829 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1830 5fd6b694 Michael Hanselmann
1831 5fd6b694 Michael Hanselmann
      if self.curop is not None:
1832 5fd6b694 Michael Hanselmann
        self.prev_status = self.job.ops[self.curop].status
1833 5fd6b694 Michael Hanselmann
1834 5fd6b694 Michael Hanselmann
      self.lock_acq_prio = None
1835 5fd6b694 Michael Hanselmann
1836 26d3fd2f Michael Hanselmann
      result = proc(_nextop_fn=self._NextOpcode)
1837 5fd6b694 Michael Hanselmann
      assert self.curop is not None
1838 5fd6b694 Michael Hanselmann
1839 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
1840 5fd6b694 Michael Hanselmann
        # Got lock and/or job is done, result must've been written
1841 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1842 5fd6b694 Michael Hanselmann
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1843 5fd6b694 Michael Hanselmann
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1844 5fd6b694 Michael Hanselmann
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1845 5fd6b694 Michael Hanselmann
        self.assert_(job.ops[self.curop].exec_timestamp)
1846 5fd6b694 Michael Hanselmann
1847 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1848 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1849 26d3fd2f Michael Hanselmann
        break
1850 26d3fd2f Michael Hanselmann
1851 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1852 26d3fd2f Michael Hanselmann
1853 5fd6b694 Michael Hanselmann
      if self.curop == 0:
1854 5fd6b694 Michael Hanselmann
        self.assertEqual(job.ops[self.curop].start_timestamp,
1855 5fd6b694 Michael Hanselmann
                         job.start_timestamp)
1856 5fd6b694 Michael Hanselmann
1857 26d3fd2f Michael Hanselmann
      if self.gave_lock:
1858 5fd6b694 Michael Hanselmann
        # Opcode finished, but job not yet done
1859 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1860 26d3fd2f Michael Hanselmann
      else:
1861 5fd6b694 Michael Hanselmann
        # Did not get locks
1862 26d3fd2f Michael Hanselmann
        self.assert_(job.cur_opctx)
1863 26d3fd2f Michael Hanselmann
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1864 26d3fd2f Michael Hanselmann
                         self.timeout_strategy.NextAttempt)
1865 5fd6b694 Michael Hanselmann
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1866 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1867 5fd6b694 Michael Hanselmann
1868 5fd6b694 Michael Hanselmann
        # If priority has changed since acquiring locks, the job must've been
1869 5fd6b694 Michael Hanselmann
        # updated
1870 5fd6b694 Michael Hanselmann
        if self.lock_acq_prio != job.ops[self.curop].priority:
1871 5fd6b694 Michael Hanselmann
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1872 5fd6b694 Michael Hanselmann
1873 5fd6b694 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1874 26d3fd2f Michael Hanselmann
1875 26d3fd2f Michael Hanselmann
      self.assert_(job.start_timestamp)
1876 26d3fd2f Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1877 26d3fd2f Michael Hanselmann
1878 26d3fd2f Michael Hanselmann
    self.assertEqual(self.curop, len(job.ops) - 1)
1879 26d3fd2f Michael Hanselmann
    self.assertEqual(self.job, job)
1880 26d3fd2f Michael Hanselmann
    self.assertEqual(self.opcounter.next(), len(job.ops))
1881 26d3fd2f Michael Hanselmann
    self.assert_(self.done_lock_before_blocking)
1882 26d3fd2f Michael Hanselmann
1883 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1884 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1885 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1886 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1887 26d3fd2f Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1888 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1889 26d3fd2f Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1890 26d3fd2f Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1891 26d3fd2f Michael Hanselmann
                            for op in job.ops))
1892 26d3fd2f Michael Hanselmann
1893 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1894 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
1895 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1896 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1897 26d3fd2f Michael Hanselmann
1898 26d3fd2f Michael Hanselmann
1899 1b5150ab Michael Hanselmann
class _IdOnlyFakeJob:
1900 1b5150ab Michael Hanselmann
  def __init__(self, job_id, priority=NotImplemented):
1901 1b5150ab Michael Hanselmann
    self.id = str(job_id)
1902 1b5150ab Michael Hanselmann
    self._priority = priority
1903 1b5150ab Michael Hanselmann
1904 1b5150ab Michael Hanselmann
  def CalcPriority(self):
1905 1b5150ab Michael Hanselmann
    return self._priority
1906 b95479a5 Michael Hanselmann
1907 1b5150ab Michael Hanselmann
1908 1b5150ab Michael Hanselmann
class TestJobDependencyManager(unittest.TestCase):
1909 b95479a5 Michael Hanselmann
  def setUp(self):
1910 b95479a5 Michael Hanselmann
    self._status = []
1911 b95479a5 Michael Hanselmann
    self._queue = []
1912 b95479a5 Michael Hanselmann
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1913 b95479a5 Michael Hanselmann
1914 b95479a5 Michael Hanselmann
  def _GetStatus(self, job_id):
1915 b95479a5 Michael Hanselmann
    (exp_job_id, result) = self._status.pop(0)
1916 b95479a5 Michael Hanselmann
    self.assertEqual(exp_job_id, job_id)
1917 b95479a5 Michael Hanselmann
    return result
1918 b95479a5 Michael Hanselmann
1919 b95479a5 Michael Hanselmann
  def _Enqueue(self, jobs):
1920 37d76f1e Michael Hanselmann
    self.assertFalse(self.jdm._lock.is_owned(),
1921 37d76f1e Michael Hanselmann
                     msg=("Must not own manager lock while re-adding jobs"
1922 37d76f1e Michael Hanselmann
                          " (potential deadlock)"))
1923 b95479a5 Michael Hanselmann
    self._queue.append(jobs)
1924 b95479a5 Michael Hanselmann
1925 b95479a5 Michael Hanselmann
  def testNotFinalizedThenCancel(self):
1926 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(17697)
1927 b95479a5 Michael Hanselmann
    job_id = str(28625)
1928 b95479a5 Michael Hanselmann
1929 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1930 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1931 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1932 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1933 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1934 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1935 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1936 b95479a5 Michael Hanselmann
      job_id: set([job]),
1937 b95479a5 Michael Hanselmann
      })
1938 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1939 fcb21ad7 Michael Hanselmann
      ("job/28625", None, None, [("job", [job.id])])
1940 fcb21ad7 Michael Hanselmann
      ])
1941 b95479a5 Michael Hanselmann
1942 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1943 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1944 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CANCEL)
1945 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1946 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1947 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1948 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1949 b95479a5 Michael Hanselmann
1950 b95479a5 Michael Hanselmann
  def testRequireCancel(self):
1951 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(5278)
1952 b95479a5 Michael Hanselmann
    job_id = str(9610)
1953 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_CANCELED]
1954 b95479a5 Michael Hanselmann
1955 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
1956 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1957 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1958 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1959 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1960 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1961 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1962 b95479a5 Michael Hanselmann
      job_id: set([job]),
1963 b95479a5 Michael Hanselmann
      })
1964 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1965 fcb21ad7 Michael Hanselmann
      ("job/9610", None, None, [("job", [job.id])])
1966 fcb21ad7 Michael Hanselmann
      ])
1967 b95479a5 Michael Hanselmann
1968 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1969 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1970 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
1971 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1972 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1973 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1974 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1975 b95479a5 Michael Hanselmann
1976 b95479a5 Michael Hanselmann
  def testRequireError(self):
1977 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(21459)
1978 b95479a5 Michael Hanselmann
    job_id = str(25519)
1979 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_ERROR]
1980 b95479a5 Michael Hanselmann
1981 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
1982 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1983 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1984 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1985 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1986 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1987 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1988 b95479a5 Michael Hanselmann
      job_id: set([job]),
1989 b95479a5 Michael Hanselmann
      })
1990 b95479a5 Michael Hanselmann
1991 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1992 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1993 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
1994 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1995 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1996 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1997 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1998 b95479a5 Michael Hanselmann
1999 b95479a5 Michael Hanselmann
  def testRequireMultiple(self):
2000 b95479a5 Michael Hanselmann
    dep_status = list(constants.JOBS_FINALIZED)
2001 b95479a5 Michael Hanselmann
2002 b95479a5 Michael Hanselmann
    for end_status in dep_status:
2003 1b5150ab Michael Hanselmann
      job = _IdOnlyFakeJob(21343)
2004 b95479a5 Michael Hanselmann
      job_id = str(14609)
2005 b95479a5 Michael Hanselmann
2006 47099cd1 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_WAITING))
2007 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2008 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2009 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
2010 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
2011 b95479a5 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2012 b95479a5 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
2013 b95479a5 Michael Hanselmann
        job_id: set([job]),
2014 b95479a5 Michael Hanselmann
        })
2015 fcb21ad7 Michael Hanselmann
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2016 fcb21ad7 Michael Hanselmann
        ("job/14609", None, None, [("job", [job.id])])
2017 fcb21ad7 Michael Hanselmann
        ])
2018 b95479a5 Michael Hanselmann
2019 b95479a5 Michael Hanselmann
      self._status.append((job_id, end_status))
2020 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2021 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.CONTINUE)
2022 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
2023 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
2024 b95479a5 Michael Hanselmann
      self.assertFalse(self.jdm.JobWaiting(job))
2025 fcb21ad7 Michael Hanselmann
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2026 b95479a5 Michael Hanselmann
2027 b95479a5 Michael Hanselmann
  def testNotify(self):
2028 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(8227)
2029 b95479a5 Michael Hanselmann
    job_id = str(4113)
2030 b95479a5 Michael Hanselmann
2031 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2032 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2033 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2034 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2035 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2036 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2037 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2038 b95479a5 Michael Hanselmann
      job_id: set([job]),
2039 b95479a5 Michael Hanselmann
      })
2040 b95479a5 Michael Hanselmann
2041 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters(job_id)
2042 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2043 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2044 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2045 b95479a5 Michael Hanselmann
    self.assertEqual(self._queue, [set([job])])
2046 b95479a5 Michael Hanselmann
2047 b95479a5 Michael Hanselmann
  def testWrongStatus(self):
2048 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(10102)
2049 b95479a5 Michael Hanselmann
    job_id = str(1271)
2050 b95479a5 Michael Hanselmann
2051 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2052 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2053 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2054 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2055 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2056 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2057 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2058 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2059 b95479a5 Michael Hanselmann
      job_id: set([job]),
2060 b95479a5 Michael Hanselmann
      })
2061 b95479a5 Michael Hanselmann
2062 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2063 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2064 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2065 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WRONGSTATUS)
2066 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2067 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2068 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2069 b95479a5 Michael Hanselmann
2070 b95479a5 Michael Hanselmann
  def testCorrectStatus(self):
2071 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(24273)
2072 b95479a5 Michael Hanselmann
    job_id = str(23885)
2073 b95479a5 Michael Hanselmann
2074 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2075 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2076 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2077 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2078 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2079 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2080 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2081 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2082 b95479a5 Michael Hanselmann
      job_id: set([job]),
2083 b95479a5 Michael Hanselmann
      })
2084 b95479a5 Michael Hanselmann
2085 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2086 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2087 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2088 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2089 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2090 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2091 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2092 b95479a5 Michael Hanselmann
2093 b95479a5 Michael Hanselmann
  def testFinalizedRightAway(self):
2094 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(224)
2095 b95479a5 Michael Hanselmann
    job_id = str(3081)
2096 b95479a5 Michael Hanselmann
2097 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2098 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2099 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2100 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2101 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2102 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2103 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2104 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2105 b95479a5 Michael Hanselmann
      job_id: set(),
2106 b95479a5 Michael Hanselmann
      })
2107 b95479a5 Michael Hanselmann
2108 b95479a5 Michael Hanselmann
    # Force cleanup
2109 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters("0")
2110 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2111 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2112 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2113 b95479a5 Michael Hanselmann
2114 fcb21ad7 Michael Hanselmann
  def testMultipleWaiting(self):
2115 fcb21ad7 Michael Hanselmann
    # Use a deterministic random generator
2116 fcb21ad7 Michael Hanselmann
    rnd = random.Random(21402)
2117 fcb21ad7 Michael Hanselmann
2118 fcb21ad7 Michael Hanselmann
    job_ids = map(str, rnd.sample(range(1, 10000), 150))
2119 fcb21ad7 Michael Hanselmann
2120 fcb21ad7 Michael Hanselmann
    waiters = dict((job_ids.pop(),
2121 1b5150ab Michael Hanselmann
                    set(map(_IdOnlyFakeJob,
2122 fcb21ad7 Michael Hanselmann
                            [job_ids.pop()
2123 fcb21ad7 Michael Hanselmann
                             for _ in range(rnd.randint(1, 20))])))
2124 fcb21ad7 Michael Hanselmann
                   for _ in range(10))
2125 fcb21ad7 Michael Hanselmann
2126 fcb21ad7 Michael Hanselmann
    # Ensure there are no duplicate job IDs
2127 fcb21ad7 Michael Hanselmann
    assert not utils.FindDuplicates(waiters.keys() +
2128 fcb21ad7 Michael Hanselmann
                                    [job.id
2129 fcb21ad7 Michael Hanselmann
                                     for jobs in waiters.values()
2130 fcb21ad7 Michael Hanselmann
                                     for job in jobs])
2131 fcb21ad7 Michael Hanselmann
2132 fcb21ad7 Michael Hanselmann
    # Register all jobs as waiters
2133 fcb21ad7 Michael Hanselmann
    for job_id, job in [(job_id, job)
2134 fcb21ad7 Michael Hanselmann
                        for (job_id, jobs) in waiters.items()
2135 fcb21ad7 Michael Hanselmann
                        for job in jobs]:
2136 fcb21ad7 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2137 fcb21ad7 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
2138 fcb21ad7 Michael Hanselmann
                                              [constants.JOB_STATUS_SUCCESS])
2139 fcb21ad7 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2140 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._status)
2141 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._queue)
2142 fcb21ad7 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2143 fcb21ad7 Michael Hanselmann
2144 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, waiters)
2145 fcb21ad7 Michael Hanselmann
2146 fcb21ad7 Michael Hanselmann
    def _MakeSet((name, mode, owner_names, pending)):
2147 fcb21ad7 Michael Hanselmann
      return (name, mode, owner_names,
2148 fcb21ad7 Michael Hanselmann
              [(pendmode, set(pend)) for (pendmode, pend) in pending])
2149 fcb21ad7 Michael Hanselmann
2150 fcb21ad7 Michael Hanselmann
    def _CheckLockInfo():
2151 fcb21ad7 Michael Hanselmann
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
2152 fcb21ad7 Michael Hanselmann
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2153 fcb21ad7 Michael Hanselmann
        ("job/%s" % job_id, None, None,
2154 fcb21ad7 Michael Hanselmann
         [("job", set([job.id for job in jobs]))])
2155 fcb21ad7 Michael Hanselmann
        for job_id, jobs in waiters.items()
2156 fcb21ad7 Michael Hanselmann
        if jobs
2157 fcb21ad7 Michael Hanselmann
        ]))
2158 fcb21ad7 Michael Hanselmann
2159 fcb21ad7 Michael Hanselmann
    _CheckLockInfo()
2160 fcb21ad7 Michael Hanselmann
2161 fcb21ad7 Michael Hanselmann
    # Notify in random order
2162 fcb21ad7 Michael Hanselmann
    for job_id in rnd.sample(waiters, len(waiters)):
2163 fcb21ad7 Michael Hanselmann
      # Remove from pending waiter list
2164 fcb21ad7 Michael Hanselmann
      jobs = waiters.pop(job_id)
2165 fcb21ad7 Michael Hanselmann
      for job in jobs:
2166 fcb21ad7 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2167 fcb21ad7 Michael Hanselmann
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
2168 fcb21ad7 Michael Hanselmann
                                                [constants.JOB_STATUS_SUCCESS])
2169 fcb21ad7 Michael Hanselmann
        self.assertEqual(result, self.jdm.CONTINUE)
2170 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._status)
2171 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._queue)
2172 fcb21ad7 Michael Hanselmann
        self.assertFalse(self.jdm.JobWaiting(job))
2173 fcb21ad7 Michael Hanselmann
2174 fcb21ad7 Michael Hanselmann
      _CheckLockInfo()
2175 fcb21ad7 Michael Hanselmann
2176 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2177 fcb21ad7 Michael Hanselmann
2178 fcb21ad7 Michael Hanselmann
    assert not waiters
2179 fcb21ad7 Michael Hanselmann
2180 b95479a5 Michael Hanselmann
  def testSelfDependency(self):
2181 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(18937)
2182 b95479a5 Michael Hanselmann
2183 b95479a5 Michael Hanselmann
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2184 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2185 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2186 b95479a5 Michael Hanselmann
2187 b95479a5 Michael Hanselmann
  def testJobDisappears(self):
2188 1b5150ab Michael Hanselmann
    job = _IdOnlyFakeJob(30540)
2189 b95479a5 Michael Hanselmann
    job_id = str(23769)
2190 b95479a5 Michael Hanselmann
2191 b95479a5 Michael Hanselmann
    def _FakeStatus(_):
2192 b95479a5 Michael Hanselmann
      raise errors.JobLost("#msg#")
2193 b95479a5 Michael Hanselmann
2194 b95479a5 Michael Hanselmann
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2195 b95479a5 Michael Hanselmann
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2196 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2197 b95479a5 Michael Hanselmann
    self.assertFalse(jdm.JobWaiting(job))
2198 fcb21ad7 Michael Hanselmann
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2199 b95479a5 Michael Hanselmann
2200 b95479a5 Michael Hanselmann
2201 989a8bee Michael Hanselmann
if __name__ == "__main__":
2202 989a8bee Michael Hanselmann
  testutils.GanetiTestProgram()