Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ db04ce5d

History | View | Annotate | Download (74.2 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 1e6d5750 Iustin Pop
# Copyright (C) 2010, 2011 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 fcb21ad7 Michael Hanselmann
import random
32 989a8bee Michael Hanselmann
33 989a8bee Michael Hanselmann
from ganeti import constants
34 989a8bee Michael Hanselmann
from ganeti import utils
35 989a8bee Michael Hanselmann
from ganeti import errors
36 989a8bee Michael Hanselmann
from ganeti import jqueue
37 8f5c488d Michael Hanselmann
from ganeti import opcodes
38 8f5c488d Michael Hanselmann
from ganeti import compat
39 26d3fd2f Michael Hanselmann
from ganeti import mcpu
40 fcb21ad7 Michael Hanselmann
from ganeti import query
41 989a8bee Michael Hanselmann
42 989a8bee Michael Hanselmann
import testutils
43 989a8bee Michael Hanselmann
44 989a8bee Michael Hanselmann
45 989a8bee Michael Hanselmann
class _FakeJob:
46 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
47 989a8bee Michael Hanselmann
    self.id = job_id
48 c0f6d0d8 Michael Hanselmann
    self.writable = False
49 989a8bee Michael Hanselmann
    self._status = status
50 989a8bee Michael Hanselmann
    self._log = []
51 989a8bee Michael Hanselmann
52 989a8bee Michael Hanselmann
  def SetStatus(self, status):
53 989a8bee Michael Hanselmann
    self._status = status
54 989a8bee Michael Hanselmann
55 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
56 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
57 989a8bee Michael Hanselmann
58 989a8bee Michael Hanselmann
  def CalcStatus(self):
59 989a8bee Michael Hanselmann
    return self._status
60 989a8bee Michael Hanselmann
61 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
62 989a8bee Michael Hanselmann
    result = []
63 989a8bee Michael Hanselmann
64 989a8bee Michael Hanselmann
    for name in fields:
65 989a8bee Michael Hanselmann
      if name == "status":
66 989a8bee Michael Hanselmann
        result.append(self._status)
67 989a8bee Michael Hanselmann
      else:
68 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
69 989a8bee Michael Hanselmann
70 989a8bee Michael Hanselmann
    return result
71 989a8bee Michael Hanselmann
72 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
73 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
74 989a8bee Michael Hanselmann
75 989a8bee Michael Hanselmann
    if newer_than is None:
76 989a8bee Michael Hanselmann
      return self._log
77 989a8bee Michael Hanselmann
78 989a8bee Michael Hanselmann
    return self._log[newer_than:]
79 989a8bee Michael Hanselmann
80 989a8bee Michael Hanselmann
81 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
82 989a8bee Michael Hanselmann
  def testStatus(self):
83 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
84 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
85 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
86 989a8bee Michael Hanselmann
87 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
88 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
89 989a8bee Michael Hanselmann
90 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
91 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
92 989a8bee Michael Hanselmann
93 989a8bee Michael Hanselmann
    # job.id is used by checker
94 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
95 989a8bee Michael Hanselmann
96 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
97 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
98 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
99 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
100 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
101 989a8bee Michael Hanselmann
102 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
103 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
104 989a8bee Michael Hanselmann
105 989a8bee Michael Hanselmann
  def testFinalStatus(self):
106 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
107 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
108 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
109 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
110 989a8bee Michael Hanselmann
      # a change immediately
111 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
112 989a8bee Michael Hanselmann
113 989a8bee Michael Hanselmann
  def testLog(self):
114 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
115 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
116 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
117 989a8bee Michael Hanselmann
118 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
119 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
120 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
121 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
122 989a8bee Michael Hanselmann
123 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
124 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
125 989a8bee Michael Hanselmann
126 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
127 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
128 989a8bee Michael Hanselmann
129 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
130 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
131 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
132 989a8bee Michael Hanselmann
133 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
134 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
135 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
136 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
137 989a8bee Michael Hanselmann
138 989a8bee Michael Hanselmann
139 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
140 989a8bee Michael Hanselmann
  def setUp(self):
141 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
142 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
143 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
144 989a8bee Michael Hanselmann
145 989a8bee Michael Hanselmann
  def tearDown(self):
146 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
147 989a8bee Michael Hanselmann
148 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
149 989a8bee Michael Hanselmann
    try:
150 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
151 989a8bee Michael Hanselmann
    except EnvironmentError, err:
152 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
153 989a8bee Michael Hanselmann
    else:
154 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
155 989a8bee Michael Hanselmann
156 989a8bee Michael Hanselmann
  def testClose(self):
157 989a8bee Michael Hanselmann
    for wait in [False, True]:
158 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
159 989a8bee Michael Hanselmann
      try:
160 989a8bee Michael Hanselmann
        if wait:
161 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
162 989a8bee Michael Hanselmann
      finally:
163 989a8bee Michael Hanselmann
        waiter.Close()
164 989a8bee Michael Hanselmann
165 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
166 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
167 989a8bee Michael Hanselmann
168 989a8bee Michael Hanselmann
  def testChangingFile(self):
169 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
170 989a8bee Michael Hanselmann
    try:
171 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
172 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
173 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
174 989a8bee Michael Hanselmann
    finally:
175 989a8bee Michael Hanselmann
      waiter.Close()
176 989a8bee Michael Hanselmann
177 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
178 989a8bee Michael Hanselmann
179 989a8bee Michael Hanselmann
  def testChangingFile2(self):
180 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
181 989a8bee Michael Hanselmann
    try:
182 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
183 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
184 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
185 989a8bee Michael Hanselmann
186 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
187 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
188 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
189 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
190 989a8bee Michael Hanselmann
    finally:
191 989a8bee Michael Hanselmann
      waiter.Close()
192 989a8bee Michael Hanselmann
193 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
194 989a8bee Michael Hanselmann
195 989a8bee Michael Hanselmann
196 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
197 989a8bee Michael Hanselmann
  def setUp(self):
198 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
199 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
200 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
201 989a8bee Michael Hanselmann
202 989a8bee Michael Hanselmann
  def tearDown(self):
203 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
204 989a8bee Michael Hanselmann
205 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
206 47099cd1 Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
207 989a8bee Michael Hanselmann
208 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
209 989a8bee Michael Hanselmann
    return None
210 989a8bee Michael Hanselmann
211 989a8bee Michael Hanselmann
  def testNoChanges(self):
212 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
213 989a8bee Michael Hanselmann
214 989a8bee Michael Hanselmann
    # No change
215 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
216 47099cd1 Michael Hanselmann
                          [constants.JOB_STATUS_WAITING], None, 0.1),
217 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
218 989a8bee Michael Hanselmann
219 989a8bee Michael Hanselmann
    # No previous information
220 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
221 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
222 47099cd1 Michael Hanselmann
                     ([constants.JOB_STATUS_WAITING], []))
223 989a8bee Michael Hanselmann
224 989a8bee Michael Hanselmann
  def testLostJob(self):
225 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
226 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
227 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
228 989a8bee Michael Hanselmann
229 989a8bee Michael Hanselmann
230 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
231 6760e4ed Michael Hanselmann
  def test(self):
232 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
233 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
234 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
235 6760e4ed Michael Hanselmann
236 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
237 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
238 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
239 6760e4ed Michael Hanselmann
240 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
241 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
242 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
243 6760e4ed Michael Hanselmann
244 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
245 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
246 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
247 6760e4ed Michael Hanselmann
248 6760e4ed Michael Hanselmann
249 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
250 8f5c488d Michael Hanselmann
  def testDefaults(self):
251 8f5c488d Michael Hanselmann
    def _Check(op):
252 8f5c488d Michael Hanselmann
      self.assertFalse(hasattr(op.input, "dry_run"))
253 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
254 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
255 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
256 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
257 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
258 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
259 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
260 8f5c488d Michael Hanselmann
261 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
262 8f5c488d Michael Hanselmann
    _Check(op1)
263 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
264 8f5c488d Michael Hanselmann
    _Check(op2)
265 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
266 8f5c488d Michael Hanselmann
267 8f5c488d Michael Hanselmann
  def testPriority(self):
268 8f5c488d Michael Hanselmann
    def _Check(op):
269 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
270 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
271 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
272 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
273 8f5c488d Michael Hanselmann
274 c6afb1ca Iustin Pop
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
275 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
276 8f5c488d Michael Hanselmann
    _Check(op1)
277 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
278 8f5c488d Michael Hanselmann
    _Check(op2)
279 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
280 8f5c488d Michael Hanselmann
281 8f5c488d Michael Hanselmann
282 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
283 5f6b0b71 Michael Hanselmann
  def test(self):
284 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
285 c0f6d0d8 Michael Hanselmann
                      None, 1, [], False)
286 5f6b0b71 Michael Hanselmann
287 8f5c488d Michael Hanselmann
  def testDefaults(self):
288 8f5c488d Michael Hanselmann
    job_id = 4260
289 8f5c488d Michael Hanselmann
    ops = [
290 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(),
291 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
292 8f5c488d Michael Hanselmann
      ]
293 8f5c488d Michael Hanselmann
294 8f5c488d Michael Hanselmann
    def _Check(job):
295 c0f6d0d8 Michael Hanselmann
      self.assertTrue(job.writable)
296 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
297 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
298 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
299 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
300 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
301 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
302 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
303 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
304 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
305 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
306 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
307 5f6b0b71 Michael Hanselmann
      self.assertRaises(errors.OpExecError, job.GetInfo,
308 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
309 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
310 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
311 8f5c488d Michael Hanselmann
312 c0f6d0d8 Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
313 8f5c488d Michael Hanselmann
    _Check(job1)
314 c0f6d0d8 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
315 8f5c488d Michael Hanselmann
    _Check(job2)
316 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
317 8f5c488d Michael Hanselmann
318 c0f6d0d8 Michael Hanselmann
  def testWritable(self):
319 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
320 c0f6d0d8 Michael Hanselmann
    self.assertFalse(job.writable)
321 c0f6d0d8 Michael Hanselmann
322 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
323 c0f6d0d8 Michael Hanselmann
    self.assertTrue(job.writable)
324 c0f6d0d8 Michael Hanselmann
325 8f5c488d Michael Hanselmann
  def testPriority(self):
326 8f5c488d Michael Hanselmann
    job_id = 4283
327 8f5c488d Michael Hanselmann
    ops = [
328 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
329 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
330 8f5c488d Michael Hanselmann
      ]
331 8f5c488d Michael Hanselmann
332 8f5c488d Michael Hanselmann
    def _Check(job):
333 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
334 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
335 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
336 8f5c488d Michael Hanselmann
337 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
338 8f5c488d Michael Hanselmann
    _Check(job)
339 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
340 8f5c488d Michael Hanselmann
                            for op in job.ops))
341 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
342 8f5c488d Michael Hanselmann
343 8f5c488d Michael Hanselmann
    # Increase first
344 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
345 8f5c488d Michael Hanselmann
    _Check(job)
346 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
347 8f5c488d Michael Hanselmann
348 8f5c488d Michael Hanselmann
    # Mark opcode as finished
349 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
350 8f5c488d Michael Hanselmann
    _Check(job)
351 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
352 8f5c488d Michael Hanselmann
353 8f5c488d Michael Hanselmann
    # Increase second
354 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
355 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
356 8f5c488d Michael Hanselmann
357 8f5c488d Michael Hanselmann
    # Test increasing first
358 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
359 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
360 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
361 8f5c488d Michael Hanselmann
362 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
363 db5bce34 Michael Hanselmann
    def _Queued(ops):
364 db5bce34 Michael Hanselmann
      # The default status is "queued"
365 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
366 db5bce34 Michael Hanselmann
                              for op in ops))
367 db5bce34 Michael Hanselmann
368 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
369 47099cd1 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITING
370 db5bce34 Michael Hanselmann
371 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
372 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
373 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
374 47099cd1 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITING
375 db5bce34 Michael Hanselmann
376 db5bce34 Michael Hanselmann
    def _Running(ops):
377 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
378 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
379 db5bce34 Michael Hanselmann
      for op in ops[2:]:
380 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
381 db5bce34 Michael Hanselmann
382 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
383 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
384 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
385 db5bce34 Michael Hanselmann
      for op in ops[2:]:
386 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
387 db5bce34 Michael Hanselmann
388 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
389 db5bce34 Michael Hanselmann
      for op in ops:
390 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
391 db5bce34 Michael Hanselmann
392 db5bce34 Michael Hanselmann
    def _Canceled(ops):
393 db5bce34 Michael Hanselmann
      for op in ops:
394 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
395 db5bce34 Michael Hanselmann
396 db5bce34 Michael Hanselmann
    def _Error1(ops):
397 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
398 db5bce34 Michael Hanselmann
        if idx > 3:
399 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
400 db5bce34 Michael Hanselmann
        else:
401 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
402 db5bce34 Michael Hanselmann
403 db5bce34 Michael Hanselmann
    def _Error2(ops):
404 db5bce34 Michael Hanselmann
      for op in ops:
405 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
406 db5bce34 Michael Hanselmann
407 db5bce34 Michael Hanselmann
    def _Success(ops):
408 db5bce34 Michael Hanselmann
      for op in ops:
409 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
410 db5bce34 Michael Hanselmann
411 db5bce34 Michael Hanselmann
    tests = {
412 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
413 47099cd1 Michael Hanselmann
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
414 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
415 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
416 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
417 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
418 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
419 db5bce34 Michael Hanselmann
      }
420 db5bce34 Michael Hanselmann
421 db5bce34 Michael Hanselmann
    def _NewJob():
422 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
423 c0f6d0d8 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)],
424 c0f6d0d8 Michael Hanselmann
                              True)
425 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
426 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
427 db5bce34 Michael Hanselmann
                              for op in job.ops))
428 db5bce34 Michael Hanselmann
      return job
429 db5bce34 Michael Hanselmann
430 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
431 db5bce34 Michael Hanselmann
      sttests = tests[status]
432 db5bce34 Michael Hanselmann
      assert sttests
433 db5bce34 Michael Hanselmann
      for fn in sttests:
434 db5bce34 Michael Hanselmann
        job = _NewJob()
435 db5bce34 Michael Hanselmann
        fn(job.ops)
436 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
437 db5bce34 Michael Hanselmann
438 8f5c488d Michael Hanselmann
439 b95479a5 Michael Hanselmann
class _FakeDependencyManager:
440 be760ba8 Michael Hanselmann
  def __init__(self):
441 b95479a5 Michael Hanselmann
    self._checks = []
442 b95479a5 Michael Hanselmann
    self._notifications = []
443 b95479a5 Michael Hanselmann
    self._waiting = set()
444 b95479a5 Michael Hanselmann
445 b95479a5 Michael Hanselmann
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
446 b95479a5 Michael Hanselmann
    self._checks.append((job, dep_job_id, dep_status, result))
447 b95479a5 Michael Hanselmann
448 b95479a5 Michael Hanselmann
  def CountPendingResults(self):
449 b95479a5 Michael Hanselmann
    return len(self._checks)
450 b95479a5 Michael Hanselmann
451 b95479a5 Michael Hanselmann
  def CountWaitingJobs(self):
452 b95479a5 Michael Hanselmann
    return len(self._waiting)
453 b95479a5 Michael Hanselmann
454 b95479a5 Michael Hanselmann
  def GetNextNotification(self):
455 b95479a5 Michael Hanselmann
    return self._notifications.pop(0)
456 b95479a5 Michael Hanselmann
457 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
458 b95479a5 Michael Hanselmann
    return job in self._waiting
459 b95479a5 Michael Hanselmann
460 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
461 b95479a5 Michael Hanselmann
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
462 b95479a5 Michael Hanselmann
463 b95479a5 Michael Hanselmann
    assert exp_job == job
464 b95479a5 Michael Hanselmann
    assert exp_dep_job_id == dep_job_id
465 b95479a5 Michael Hanselmann
    assert exp_dep_status == dep_status
466 b95479a5 Michael Hanselmann
467 b95479a5 Michael Hanselmann
    (result_status, _) = result
468 b95479a5 Michael Hanselmann
469 b95479a5 Michael Hanselmann
    if result_status == jqueue._JobDependencyManager.WAIT:
470 b95479a5 Michael Hanselmann
      self._waiting.add(job)
471 b95479a5 Michael Hanselmann
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
472 b95479a5 Michael Hanselmann
      self._waiting.remove(job)
473 b95479a5 Michael Hanselmann
474 b95479a5 Michael Hanselmann
    return result
475 b95479a5 Michael Hanselmann
476 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
477 b95479a5 Michael Hanselmann
    self._notifications.append(job_id)
478 b95479a5 Michael Hanselmann
479 b95479a5 Michael Hanselmann
480 b95479a5 Michael Hanselmann
class _DisabledFakeDependencyManager:
481 b95479a5 Michael Hanselmann
  def JobWaiting(self, _):
482 b95479a5 Michael Hanselmann
    return False
483 b95479a5 Michael Hanselmann
484 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, *args):
485 b95479a5 Michael Hanselmann
    assert False, "Should not be called"
486 b95479a5 Michael Hanselmann
487 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, _):
488 b95479a5 Michael Hanselmann
    pass
489 b95479a5 Michael Hanselmann
490 b95479a5 Michael Hanselmann
491 b95479a5 Michael Hanselmann
class _FakeQueueForProc:
492 b95479a5 Michael Hanselmann
  def __init__(self, depmgr=None):
493 be760ba8 Michael Hanselmann
    self._acquired = False
494 ebb2a2a3 Michael Hanselmann
    self._updates = []
495 6a373640 Michael Hanselmann
    self._submitted = []
496 6a373640 Michael Hanselmann
497 6a373640 Michael Hanselmann
    self._submit_count = itertools.count(1000)
498 be760ba8 Michael Hanselmann
499 b95479a5 Michael Hanselmann
    if depmgr:
500 b95479a5 Michael Hanselmann
      self.depmgr = depmgr
501 b95479a5 Michael Hanselmann
    else:
502 b95479a5 Michael Hanselmann
      self.depmgr = _DisabledFakeDependencyManager()
503 b95479a5 Michael Hanselmann
504 be760ba8 Michael Hanselmann
  def IsAcquired(self):
505 be760ba8 Michael Hanselmann
    return self._acquired
506 be760ba8 Michael Hanselmann
507 ebb2a2a3 Michael Hanselmann
  def GetNextUpdate(self):
508 ebb2a2a3 Michael Hanselmann
    return self._updates.pop(0)
509 ebb2a2a3 Michael Hanselmann
510 6a373640 Michael Hanselmann
  def GetNextSubmittedJob(self):
511 6a373640 Michael Hanselmann
    return self._submitted.pop(0)
512 6a373640 Michael Hanselmann
513 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
514 be760ba8 Michael Hanselmann
    assert shared == 1
515 be760ba8 Michael Hanselmann
    self._acquired = True
516 be760ba8 Michael Hanselmann
517 be760ba8 Michael Hanselmann
  def release(self):
518 be760ba8 Michael Hanselmann
    assert self._acquired
519 be760ba8 Michael Hanselmann
    self._acquired = False
520 be760ba8 Michael Hanselmann
521 ebb2a2a3 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=True):
522 ebb2a2a3 Michael Hanselmann
    assert self._acquired, "Lock not acquired while updating job"
523 ebb2a2a3 Michael Hanselmann
    self._updates.append((job, bool(replicate)))
524 be760ba8 Michael Hanselmann
525 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
526 6a373640 Michael Hanselmann
    assert not self._acquired, "Lock acquired while submitting jobs"
527 6a373640 Michael Hanselmann
    job_ids = [self._submit_count.next() for _ in jobs]
528 6a373640 Michael Hanselmann
    self._submitted.extend(zip(job_ids, jobs))
529 6a373640 Michael Hanselmann
    return job_ids
530 6a373640 Michael Hanselmann
531 be760ba8 Michael Hanselmann
532 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
533 ebb2a2a3 Michael Hanselmann
  def __init__(self, queue, before_start, after_start):
534 ebb2a2a3 Michael Hanselmann
    self._queue = queue
535 be760ba8 Michael Hanselmann
    self._before_start = before_start
536 be760ba8 Michael Hanselmann
    self._after_start = after_start
537 be760ba8 Michael Hanselmann
538 f23db633 Michael Hanselmann
  def __call__(self, op, cbs, timeout=None, priority=None):
539 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
540 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired(), \
541 ebb2a2a3 Michael Hanselmann
           "Queue lock not released when executing opcode"
542 be760ba8 Michael Hanselmann
543 be760ba8 Michael Hanselmann
    if self._before_start:
544 f23db633 Michael Hanselmann
      self._before_start(timeout, priority)
545 be760ba8 Michael Hanselmann
546 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
547 be760ba8 Michael Hanselmann
548 be760ba8 Michael Hanselmann
    if self._after_start:
549 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
550 be760ba8 Michael Hanselmann
551 ebb2a2a3 Michael Hanselmann
    # Check again after the callbacks
552 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired()
553 ebb2a2a3 Michael Hanselmann
554 be760ba8 Michael Hanselmann
    if op.fail:
555 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
556 be760ba8 Michael Hanselmann
557 6a373640 Michael Hanselmann
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
558 6a373640 Michael Hanselmann
      return cbs.SubmitManyJobs(op.submit_jobs)
559 6a373640 Michael Hanselmann
560 be760ba8 Michael Hanselmann
    return op.result
561 be760ba8 Michael Hanselmann
562 be760ba8 Michael Hanselmann
563 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
564 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
565 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops, True)
566 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
567 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
568 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
569 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
570 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
571 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
572 be760ba8 Michael Hanselmann
    return job
573 be760ba8 Michael Hanselmann
574 26d3fd2f Michael Hanselmann
575 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
576 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
577 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
578 be760ba8 Michael Hanselmann
                      for op in job.ops)
579 be760ba8 Michael Hanselmann
580 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
581 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
582 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
583 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
584 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
585 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
586 be760ba8 Michael Hanselmann
                      job.start_timestamp,
587 be760ba8 Michael Hanselmann
                      job.end_timestamp])
588 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
589 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
590 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
591 be760ba8 Michael Hanselmann
592 be760ba8 Michael Hanselmann
  def testSuccess(self):
593 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
594 be760ba8 Michael Hanselmann
595 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
596 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
597 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
598 be760ba8 Michael Hanselmann
             for i in range(opcount)]
599 be760ba8 Michael Hanselmann
600 be760ba8 Michael Hanselmann
      # Create job
601 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
602 be760ba8 Michael Hanselmann
603 f23db633 Michael Hanselmann
      def _BeforeStart(timeout, priority):
604 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
605 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
606 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
607 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
608 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
609 be760ba8 Michael Hanselmann
610 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
611 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
612 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
613 ebb2a2a3 Michael Hanselmann
614 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
615 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
616 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
617 be760ba8 Michael Hanselmann
618 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
619 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
620 be760ba8 Michael Hanselmann
        self.assertFalse(success)
621 be760ba8 Michael Hanselmann
622 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
623 be760ba8 Michael Hanselmann
624 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
625 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
626 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
627 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
628 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
629 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
630 be760ba8 Michael Hanselmann
          # Last opcode
631 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
632 be760ba8 Michael Hanselmann
        else:
633 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
634 be760ba8 Michael Hanselmann
635 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
636 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
637 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
638 be760ba8 Michael Hanselmann
639 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
640 ebb2a2a3 Michael Hanselmann
641 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
642 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
643 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
644 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
645 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
646 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
647 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
648 be760ba8 Michael Hanselmann
                              for op in job.ops))
649 be760ba8 Michael Hanselmann
650 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
651 be760ba8 Michael Hanselmann
652 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
653 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
654 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
655 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
656 be760ba8 Michael Hanselmann
657 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
658 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
659 be760ba8 Michael Hanselmann
660 be760ba8 Michael Hanselmann
    testdata = [
661 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
662 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
663 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
664 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
665 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
666 be760ba8 Michael Hanselmann
      ]
667 be760ba8 Michael Hanselmann
668 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
669 be760ba8 Michael Hanselmann
      # Prepare opcodes
670 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
671 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
672 be760ba8 Michael Hanselmann
                                       i <= failto))
673 be760ba8 Michael Hanselmann
             for i in range(opcount)]
674 be760ba8 Michael Hanselmann
675 be760ba8 Michael Hanselmann
      # Create job
676 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
677 be760ba8 Michael Hanselmann
678 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, None, None)
679 be760ba8 Michael Hanselmann
680 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
681 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
682 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
683 ebb2a2a3 Michael Hanselmann
        # queued to waitlock
684 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
685 ebb2a2a3 Michael Hanselmann
        # waitlock to running
686 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
687 ebb2a2a3 Michael Hanselmann
        # Opcode result
688 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
689 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
690 be760ba8 Michael Hanselmann
691 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
692 be760ba8 Michael Hanselmann
          # Last opcode
693 75d81fc8 Michael Hanselmann
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
694 be760ba8 Michael Hanselmann
          break
695 be760ba8 Michael Hanselmann
696 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
697 be760ba8 Michael Hanselmann
698 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
699 be760ba8 Michael Hanselmann
700 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
701 ebb2a2a3 Michael Hanselmann
702 be760ba8 Michael Hanselmann
      # Check job status
703 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
704 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["id"]), [job_id])
705 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
706 be760ba8 Michael Hanselmann
707 be760ba8 Michael Hanselmann
      # Check opcode status
708 be760ba8 Michael Hanselmann
      data = zip(job.ops,
709 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
710 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
711 be760ba8 Michael Hanselmann
712 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
713 be760ba8 Michael Hanselmann
        if idx < failfrom:
714 be760ba8 Michael Hanselmann
          assert not op.input.fail
715 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
716 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
717 be760ba8 Michael Hanselmann
        elif idx <= failto:
718 be760ba8 Michael Hanselmann
          assert op.input.fail
719 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
720 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
721 be760ba8 Michael Hanselmann
        else:
722 be760ba8 Michael Hanselmann
          assert not op.input.fail
723 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
724 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
725 be760ba8 Michael Hanselmann
726 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
727 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
728 be760ba8 Michael Hanselmann
729 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
730 be760ba8 Michael Hanselmann
731 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
732 75d81fc8 Michael Hanselmann
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
733 75d81fc8 Michael Hanselmann
                       jqueue._JobProcessor.FINISHED)
734 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
735 be760ba8 Michael Hanselmann
736 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
737 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
738 be760ba8 Michael Hanselmann
739 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
740 be760ba8 Michael Hanselmann
           for i in range(5)]
741 be760ba8 Michael Hanselmann
742 be760ba8 Michael Hanselmann
    # Create job
743 be760ba8 Michael Hanselmann
    job_id = 17045
744 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
745 be760ba8 Michael Hanselmann
746 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
747 be760ba8 Michael Hanselmann
748 be760ba8 Michael Hanselmann
    # Mark as cancelled
749 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
750 be760ba8 Michael Hanselmann
    self.assert_(success)
751 be760ba8 Michael Hanselmann
752 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
753 ebb2a2a3 Michael Hanselmann
754 66bd7445 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
755 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
756 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
757 be760ba8 Michael Hanselmann
                            for op in job.ops))
758 be760ba8 Michael Hanselmann
759 66bd7445 Michael Hanselmann
    # Serialize to check for differences
760 66bd7445 Michael Hanselmann
    before_proc = job.Serialize()
761 66bd7445 Michael Hanselmann
762 66bd7445 Michael Hanselmann
    # Simulate processor called in workerpool
763 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
764 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
765 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
766 30c945d0 Michael Hanselmann
767 30c945d0 Michael Hanselmann
    # Check result
768 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
769 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
770 30c945d0 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
771 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
772 30c945d0 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
773 30c945d0 Michael Hanselmann
                                for op in job.ops))
774 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
775 30c945d0 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
776 30c945d0 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
777 30c945d0 Michael Hanselmann
778 66bd7445 Michael Hanselmann
    # Must not have changed or written
779 66bd7445 Michael Hanselmann
    self.assertEqual(before_proc, job.Serialize())
780 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
781 66bd7445 Michael Hanselmann
782 30c945d0 Michael Hanselmann
  def testCancelWhileWaitlockInQueue(self):
783 30c945d0 Michael Hanselmann
    queue = _FakeQueueForProc()
784 30c945d0 Michael Hanselmann
785 30c945d0 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
786 30c945d0 Michael Hanselmann
           for i in range(5)]
787 30c945d0 Michael Hanselmann
788 30c945d0 Michael Hanselmann
    # Create job
789 30c945d0 Michael Hanselmann
    job_id = 8645
790 30c945d0 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
791 30c945d0 Michael Hanselmann
792 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
793 30c945d0 Michael Hanselmann
794 47099cd1 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITING
795 30c945d0 Michael Hanselmann
796 30c945d0 Michael Hanselmann
    assert len(job.ops) == 5
797 30c945d0 Michael Hanselmann
798 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
799 30c945d0 Michael Hanselmann
800 30c945d0 Michael Hanselmann
    # Mark as cancelling
801 30c945d0 Michael Hanselmann
    (success, _) = job.Cancel()
802 30c945d0 Michael Hanselmann
    self.assert_(success)
803 30c945d0 Michael Hanselmann
804 30c945d0 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
805 30c945d0 Michael Hanselmann
806 30c945d0 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
807 30c945d0 Michael Hanselmann
                            for op in job.ops))
808 30c945d0 Michael Hanselmann
809 30c945d0 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
810 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
811 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
812 be760ba8 Michael Hanselmann
813 be760ba8 Michael Hanselmann
    # Check result
814 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
815 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
816 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
817 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
818 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
819 be760ba8 Michael Hanselmann
                                for op in job.ops))
820 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
821 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
822 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
823 be760ba8 Michael Hanselmann
824 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
825 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
826 be760ba8 Michael Hanselmann
827 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
828 be760ba8 Michael Hanselmann
           for i in range(5)]
829 be760ba8 Michael Hanselmann
830 be760ba8 Michael Hanselmann
    # Create job
831 be760ba8 Michael Hanselmann
    job_id = 11009
832 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
833 be760ba8 Michael Hanselmann
834 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
835 be760ba8 Michael Hanselmann
836 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
837 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
838 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
839 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
840 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
841 be760ba8 Michael Hanselmann
842 be760ba8 Michael Hanselmann
      # Mark as cancelled
843 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
844 be760ba8 Michael Hanselmann
      self.assert_(success)
845 be760ba8 Michael Hanselmann
846 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
847 be760ba8 Michael Hanselmann
                              for op in job.ops))
848 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
849 be760ba8 Michael Hanselmann
850 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
851 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
852 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
853 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
854 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
855 be760ba8 Michael Hanselmann
856 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
857 be760ba8 Michael Hanselmann
858 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
859 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
860 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
861 ebb2a2a3 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
862 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
863 be760ba8 Michael Hanselmann
864 be760ba8 Michael Hanselmann
    # Check result
865 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
866 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
867 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
868 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
869 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
870 be760ba8 Michael Hanselmann
                                for op in job.ops))
871 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
872 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
873 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
874 be760ba8 Michael Hanselmann
875 9e49dfc5 Michael Hanselmann
  def testCancelWhileWaitlockWithTimeout(self):
876 9e49dfc5 Michael Hanselmann
    queue = _FakeQueueForProc()
877 9e49dfc5 Michael Hanselmann
878 9e49dfc5 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
879 9e49dfc5 Michael Hanselmann
           for i in range(5)]
880 9e49dfc5 Michael Hanselmann
881 9e49dfc5 Michael Hanselmann
    # Create job
882 9e49dfc5 Michael Hanselmann
    job_id = 24314
883 9e49dfc5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
884 9e49dfc5 Michael Hanselmann
885 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
886 9e49dfc5 Michael Hanselmann
887 9e49dfc5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
888 9e49dfc5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
889 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
890 9e49dfc5 Michael Hanselmann
891 9e49dfc5 Michael Hanselmann
      # Mark as cancelled
892 9e49dfc5 Michael Hanselmann
      (success, _) = job.Cancel()
893 9e49dfc5 Michael Hanselmann
      self.assert_(success)
894 9e49dfc5 Michael Hanselmann
895 9e49dfc5 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
896 9e49dfc5 Michael Hanselmann
                              for op in job.ops))
897 9e49dfc5 Michael Hanselmann
898 9e49dfc5 Michael Hanselmann
      # Fake an acquire attempt timing out
899 9e49dfc5 Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
900 9e49dfc5 Michael Hanselmann
901 9e49dfc5 Michael Hanselmann
    def _AfterStart(op, cbs):
902 9e49dfc5 Michael Hanselmann
      self.fail("Should not reach this")
903 9e49dfc5 Michael Hanselmann
904 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
905 9e49dfc5 Michael Hanselmann
906 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
907 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
908 9e49dfc5 Michael Hanselmann
909 9e49dfc5 Michael Hanselmann
    # Check result
910 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
911 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
912 9e49dfc5 Michael Hanselmann
    self.assert_(job.start_timestamp)
913 9e49dfc5 Michael Hanselmann
    self.assert_(job.end_timestamp)
914 9e49dfc5 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
915 9e49dfc5 Michael Hanselmann
                                for op in job.ops))
916 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
917 9e49dfc5 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
918 9e49dfc5 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
919 9e49dfc5 Michael Hanselmann
920 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
921 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
922 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
923 be760ba8 Michael Hanselmann
924 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
925 be760ba8 Michael Hanselmann
           for i in range(3)]
926 be760ba8 Michael Hanselmann
927 be760ba8 Michael Hanselmann
    # Create job
928 be760ba8 Michael Hanselmann
    job_id = 28492
929 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
930 be760ba8 Michael Hanselmann
931 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
932 be760ba8 Michael Hanselmann
933 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
934 be760ba8 Michael Hanselmann
935 be760ba8 Michael Hanselmann
    # Run one opcode
936 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
937 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.DEFER)
938 be760ba8 Michael Hanselmann
939 be760ba8 Michael Hanselmann
    # Job goes back to queued
940 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
941 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
942 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
943 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
944 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
945 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
946 be760ba8 Michael Hanselmann
947 be760ba8 Michael Hanselmann
    # Mark as cancelled
948 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
949 be760ba8 Michael Hanselmann
    self.assert_(success)
950 be760ba8 Michael Hanselmann
951 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
952 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
953 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
954 be760ba8 Michael Hanselmann
955 be760ba8 Michael Hanselmann
    # Check result
956 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
957 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
958 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
959 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
960 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
961 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
962 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
963 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
964 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
965 be760ba8 Michael Hanselmann
966 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
967 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
968 be760ba8 Michael Hanselmann
    # program was restarted
969 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
970 be760ba8 Michael Hanselmann
971 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
972 be760ba8 Michael Hanselmann
973 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
974 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
975 be760ba8 Michael Hanselmann
             for i in range(10)]
976 be760ba8 Michael Hanselmann
977 be760ba8 Michael Hanselmann
      # Create job
978 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
979 be760ba8 Michael Hanselmann
980 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
981 be760ba8 Michael Hanselmann
982 be760ba8 Michael Hanselmann
      for _ in range(successcount):
983 75d81fc8 Michael Hanselmann
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
984 75d81fc8 Michael Hanselmann
                         jqueue._JobProcessor.DEFER)
985 be760ba8 Michael Hanselmann
986 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
987 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
988 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
989 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
990 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
991 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
992 be760ba8 Michael Hanselmann
993 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
994 be760ba8 Michael Hanselmann
995 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
996 c0f6d0d8 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
997 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
998 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
999 be760ba8 Michael Hanselmann
1000 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
1001 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1002 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1003 be760ba8 Michael Hanselmann
1004 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1005 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1006 be760ba8 Michael Hanselmann
1007 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
1008 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1009 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1010 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1011 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1012 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1013 be760ba8 Michael Hanselmann
1014 be760ba8 Michael Hanselmann
      if remaining == 0:
1015 be760ba8 Michael Hanselmann
        # Last opcode
1016 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1017 be760ba8 Michael Hanselmann
        break
1018 be760ba8 Michael Hanselmann
1019 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1020 be760ba8 Michael Hanselmann
1021 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1022 be760ba8 Michael Hanselmann
1023 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1024 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1025 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1026 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1027 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1028 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1029 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1030 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1031 be760ba8 Michael Hanselmann
                            for op in job.ops))
1032 be760ba8 Michael Hanselmann
1033 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
1034 be760ba8 Michael Hanselmann
1035 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1036 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1037 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1038 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1039 be760ba8 Michael Hanselmann
1040 be760ba8 Michael Hanselmann
    # ... also after being restored
1041 c0f6d0d8 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1042 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1043 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1044 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1045 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1046 be760ba8 Michael Hanselmann
1047 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
1048 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1049 be760ba8 Michael Hanselmann
1050 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1051 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1052 be760ba8 Michael Hanselmann
1053 be760ba8 Michael Hanselmann
    # Create job
1054 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
1055 be760ba8 Michael Hanselmann
1056 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1057 be760ba8 Michael Hanselmann
1058 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
1059 be760ba8 Michael Hanselmann
1060 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
1061 be760ba8 Michael Hanselmann
1062 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1063 be760ba8 Michael Hanselmann
1064 be760ba8 Michael Hanselmann
    # Calling on running job must fail
1065 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1066 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
1067 be760ba8 Michael Hanselmann
1068 be760ba8 Michael Hanselmann
  def testLogMessages(self):
1069 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
1070 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1071 be760ba8 Michael Hanselmann
1072 be760ba8 Michael Hanselmann
    messages = {
1073 be760ba8 Michael Hanselmann
      1: [
1074 be760ba8 Michael Hanselmann
        (None, "Hello"),
1075 be760ba8 Michael Hanselmann
        (None, "World"),
1076 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
1077 be760ba8 Michael Hanselmann
        ],
1078 be760ba8 Michael Hanselmann
      4: [
1079 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1080 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1081 be760ba8 Michael Hanselmann
        ],
1082 be760ba8 Michael Hanselmann
      }
1083 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1084 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
1085 be760ba8 Michael Hanselmann
           for i in range(5)]
1086 be760ba8 Michael Hanselmann
1087 be760ba8 Michael Hanselmann
    # Create job
1088 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
1089 be760ba8 Michael Hanselmann
1090 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1091 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1092 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1093 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1094 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1095 be760ba8 Michael Hanselmann
1096 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1097 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1098 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1099 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1100 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1101 be760ba8 Michael Hanselmann
1102 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
1103 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
1104 be760ba8 Michael Hanselmann
1105 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
1106 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1107 be760ba8 Michael Hanselmann
        if log_type:
1108 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
1109 be760ba8 Michael Hanselmann
        else:
1110 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
1111 ebb2a2a3 Michael Hanselmann
        # Check for job update without replication
1112 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1113 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1114 be760ba8 Michael Hanselmann
1115 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1116 be760ba8 Michael Hanselmann
1117 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
1118 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1119 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1120 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1121 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1122 be760ba8 Michael Hanselmann
1123 be760ba8 Michael Hanselmann
      if remaining == 0:
1124 be760ba8 Michael Hanselmann
        # Last opcode
1125 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1126 be760ba8 Michael Hanselmann
        break
1127 be760ba8 Michael Hanselmann
1128 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1129 be760ba8 Michael Hanselmann
1130 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1131 be760ba8 Michael Hanselmann
1132 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1133 ebb2a2a3 Michael Hanselmann
1134 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1135 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1136 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1137 be760ba8 Michael Hanselmann
1138 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
1139 be760ba8 Michael Hanselmann
1140 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
1141 be760ba8 Michael Hanselmann
1142 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
1143 c0f6d0d8 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1144 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
1145 be760ba8 Michael Hanselmann
1146 be760ba8 Michael Hanselmann
    # Check each message
1147 be760ba8 Michael Hanselmann
    prevserial = -1
1148 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1149 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
1150 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
1151 be760ba8 Michael Hanselmann
        if exptype:
1152 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
1153 be760ba8 Michael Hanselmann
        else:
1154 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1155 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
1156 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
1157 be760ba8 Michael Hanselmann
        prevserial = serial
1158 be760ba8 Michael Hanselmann
1159 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
1160 be760ba8 Michael Hanselmann
    # Check serial
1161 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
1162 be760ba8 Michael Hanselmann
1163 be760ba8 Michael Hanselmann
    # No filter
1164 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
1165 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1166 be760ba8 Michael Hanselmann
                      for entry in entries])
1167 be760ba8 Michael Hanselmann
1168 be760ba8 Michael Hanselmann
    # Filter with serial
1169 be760ba8 Michael Hanselmann
    assert count > 3
1170 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
1171 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
1172 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1173 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
1174 be760ba8 Michael Hanselmann
1175 be760ba8 Michael Hanselmann
    # No log message after highest serial
1176 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
1177 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
1178 be760ba8 Michael Hanselmann
1179 6a373640 Michael Hanselmann
  def testSubmitManyJobs(self):
1180 6a373640 Michael Hanselmann
    queue = _FakeQueueForProc()
1181 6a373640 Michael Hanselmann
1182 6a373640 Michael Hanselmann
    job_id = 15656
1183 6a373640 Michael Hanselmann
    ops = [
1184 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1185 6a373640 Michael Hanselmann
                          submit_jobs=[]),
1186 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1187 6a373640 Michael Hanselmann
                          submit_jobs=[
1188 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1189 6a373640 Michael Hanselmann
                            ]),
1190 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False,
1191 6a373640 Michael Hanselmann
                          submit_jobs=[
1192 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1193 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1194 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1195 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1196 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1197 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1198 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1199 6a373640 Michael Hanselmann
                            ]),
1200 6a373640 Michael Hanselmann
      ]
1201 6a373640 Michael Hanselmann
1202 6a373640 Michael Hanselmann
    # Create job
1203 6a373640 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1204 6a373640 Michael Hanselmann
1205 6a373640 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1206 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1207 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1208 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1209 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1210 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1211 6a373640 Michael Hanselmann
1212 6a373640 Michael Hanselmann
    def _AfterStart(op, cbs):
1213 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1214 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1215 6a373640 Michael Hanselmann
1216 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1217 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1218 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1219 6a373640 Michael Hanselmann
1220 6a373640 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1221 6a373640 Michael Hanselmann
      (success, _) = job.Cancel()
1222 6a373640 Michael Hanselmann
      self.assertFalse(success)
1223 6a373640 Michael Hanselmann
1224 6a373640 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1225 6a373640 Michael Hanselmann
1226 6a373640 Michael Hanselmann
    for idx in range(len(ops)):
1227 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1228 6a373640 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1229 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1230 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1231 6a373640 Michael Hanselmann
      if idx == len(ops) - 1:
1232 6a373640 Michael Hanselmann
        # Last opcode
1233 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1234 6a373640 Michael Hanselmann
      else:
1235 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1236 6a373640 Michael Hanselmann
1237 6a373640 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1238 6a373640 Michael Hanselmann
        self.assert_(job.start_timestamp)
1239 6a373640 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1240 6a373640 Michael Hanselmann
1241 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1242 6a373640 Michael Hanselmann
1243 6a373640 Michael Hanselmann
    for idx, submitted_ops in enumerate(job_ops
1244 6a373640 Michael Hanselmann
                                        for op in ops
1245 6a373640 Michael Hanselmann
                                        for job_ops in op.submit_jobs):
1246 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextSubmittedJob(),
1247 6a373640 Michael Hanselmann
                       (1000 + idx, submitted_ops))
1248 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1249 6a373640 Michael Hanselmann
1250 6a373640 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1251 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1252 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1253 6a373640 Michael Hanselmann
                     [[[], [1000], [1001, 1002, 1003]]])
1254 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1255 6a373640 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1256 6a373640 Michael Hanselmann
1257 6a373640 Michael Hanselmann
    self._GenericCheckJob(job)
1258 6a373640 Michael Hanselmann
1259 1e6d5750 Iustin Pop
    # Calling the processor on a finished job should be a no-op
1260 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1261 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1262 1e6d5750 Iustin Pop
    self.assertRaises(IndexError, queue.GetNextUpdate)
1263 6a373640 Michael Hanselmann
1264 b95479a5 Michael Hanselmann
  def testJobDependency(self):
1265 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1266 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1267 b95479a5 Michael Hanselmann
1268 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1269 b95479a5 Michael Hanselmann
1270 b95479a5 Michael Hanselmann
    prev_job_id = 22113
1271 b95479a5 Michael Hanselmann
    prev_job_id2 = 28102
1272 b95479a5 Michael Hanselmann
    job_id = 29929
1273 b95479a5 Michael Hanselmann
    ops = [
1274 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1275 b95479a5 Michael Hanselmann
                          depends=[
1276 b95479a5 Michael Hanselmann
                            [prev_job_id2, None],
1277 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1278 b95479a5 Michael Hanselmann
                            ]),
1279 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False),
1280 b95479a5 Michael Hanselmann
      ]
1281 b95479a5 Michael Hanselmann
1282 b95479a5 Michael Hanselmann
    # Create job
1283 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1284 b95479a5 Michael Hanselmann
1285 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1286 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1287 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1288 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1289 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1290 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1291 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1292 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1293 b95479a5 Michael Hanselmann
1294 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1295 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1296 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1297 b95479a5 Michael Hanselmann
1298 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1299 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1300 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1301 b95479a5 Michael Hanselmann
1302 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1303 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1304 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1305 b95479a5 Michael Hanselmann
1306 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1307 b95479a5 Michael Hanselmann
1308 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1309 b95479a5 Michael Hanselmann
1310 b95479a5 Michael Hanselmann
    counter = itertools.count()
1311 b95479a5 Michael Hanselmann
    while True:
1312 b95479a5 Michael Hanselmann
      attempt = counter.next()
1313 b95479a5 Michael Hanselmann
1314 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1315 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1316 b95479a5 Michael Hanselmann
1317 b95479a5 Michael Hanselmann
      if attempt < 2:
1318 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1319 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1320 b95479a5 Michael Hanselmann
      elif attempt == 2:
1321 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1322 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1323 b95479a5 Michael Hanselmann
        # The processor will ask for the next dependency immediately
1324 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1325 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1326 b95479a5 Michael Hanselmann
      elif attempt < 5:
1327 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1328 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1329 b95479a5 Michael Hanselmann
      elif attempt == 5:
1330 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1331 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1332 b95479a5 Michael Hanselmann
      if attempt == 2:
1333 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 2)
1334 b95479a5 Michael Hanselmann
      elif attempt > 5:
1335 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1336 b95479a5 Michael Hanselmann
      else:
1337 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1338 b95479a5 Michael Hanselmann
1339 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1340 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt >= 5:
1341 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1342 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1343 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1344 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1345 b95479a5 Michael Hanselmann
1346 b95479a5 Michael Hanselmann
      if attempt < 5:
1347 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1348 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1349 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1350 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1351 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1352 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1353 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1354 b95479a5 Michael Hanselmann
        continue
1355 b95479a5 Michael Hanselmann
1356 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1357 b95479a5 Michael Hanselmann
        # Last opcode
1358 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1359 b95479a5 Michael Hanselmann
        break
1360 b95479a5 Michael Hanselmann
1361 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1362 b95479a5 Michael Hanselmann
1363 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1364 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1365 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1366 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1367 b95479a5 Michael Hanselmann
1368 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1369 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1370 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1371 b95479a5 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1372 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1373 b95479a5 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1374 b95479a5 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1375 b95479a5 Michael Hanselmann
                               for op in job.ops))
1376 b95479a5 Michael Hanselmann
1377 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1378 b95479a5 Michael Hanselmann
1379 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1380 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1381 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1382 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountWaitingJobs())
1383 b95479a5 Michael Hanselmann
1384 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1385 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1386 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1387 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1388 b95479a5 Michael Hanselmann
1389 b95479a5 Michael Hanselmann
  def testJobDependencyCancel(self):
1390 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1391 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1392 b95479a5 Michael Hanselmann
1393 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1394 b95479a5 Michael Hanselmann
1395 b95479a5 Michael Hanselmann
    prev_job_id = 13623
1396 b95479a5 Michael Hanselmann
    job_id = 30876
1397 b95479a5 Michael Hanselmann
    ops = [
1398 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1399 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1400 b95479a5 Michael Hanselmann
                          depends=[
1401 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1402 b95479a5 Michael Hanselmann
                            ]),
1403 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1404 b95479a5 Michael Hanselmann
      ]
1405 b95479a5 Michael Hanselmann
1406 b95479a5 Michael Hanselmann
    # Create job
1407 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1408 b95479a5 Michael Hanselmann
1409 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1410 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1411 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1412 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1413 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1414 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1415 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1416 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1417 b95479a5 Michael Hanselmann
1418 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1419 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1420 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1421 b95479a5 Michael Hanselmann
1422 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1423 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1424 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1425 b95479a5 Michael Hanselmann
1426 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1427 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1428 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1429 b95479a5 Michael Hanselmann
1430 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1431 b95479a5 Michael Hanselmann
1432 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1433 b95479a5 Michael Hanselmann
1434 b95479a5 Michael Hanselmann
    counter = itertools.count()
1435 b95479a5 Michael Hanselmann
    while True:
1436 b95479a5 Michael Hanselmann
      attempt = counter.next()
1437 b95479a5 Michael Hanselmann
1438 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1439 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1440 b95479a5 Michael Hanselmann
1441 b95479a5 Michael Hanselmann
      if attempt == 0:
1442 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1443 b95479a5 Michael Hanselmann
        pass
1444 b95479a5 Michael Hanselmann
      elif attempt < 4:
1445 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1446 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1447 b95479a5 Michael Hanselmann
      elif attempt == 4:
1448 b95479a5 Michael Hanselmann
        # Other job was cancelled
1449 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1450 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1451 b95479a5 Michael Hanselmann
1452 b95479a5 Michael Hanselmann
      if attempt == 0:
1453 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1454 b95479a5 Michael Hanselmann
      else:
1455 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1456 b95479a5 Michael Hanselmann
1457 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1458 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1459 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1460 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1461 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1462 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1463 b95479a5 Michael Hanselmann
1464 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1465 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1466 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1467 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1468 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1469 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1470 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1471 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1472 b95479a5 Michael Hanselmann
        continue
1473 b95479a5 Michael Hanselmann
1474 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1475 b95479a5 Michael Hanselmann
        # Last opcode
1476 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1477 b95479a5 Michael Hanselmann
        break
1478 b95479a5 Michael Hanselmann
1479 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1480 b95479a5 Michael Hanselmann
1481 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1482 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1483 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1484 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1485 b95479a5 Michael Hanselmann
1486 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1487 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1488 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1489 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1490 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1491 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1492 b95479a5 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1493 b95479a5 Michael Hanselmann
                       "Job canceled by request"]])
1494 b95479a5 Michael Hanselmann
1495 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1496 b95479a5 Michael Hanselmann
1497 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1498 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1499 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1500 b95479a5 Michael Hanselmann
1501 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1502 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1503 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1504 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1505 b95479a5 Michael Hanselmann
1506 b95479a5 Michael Hanselmann
  def testJobDependencyWrongstatus(self):
1507 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1508 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1509 b95479a5 Michael Hanselmann
1510 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1511 b95479a5 Michael Hanselmann
1512 b95479a5 Michael Hanselmann
    prev_job_id = 9741
1513 b95479a5 Michael Hanselmann
    job_id = 11763
1514 b95479a5 Michael Hanselmann
    ops = [
1515 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1516 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1517 b95479a5 Michael Hanselmann
                          depends=[
1518 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1519 b95479a5 Michael Hanselmann
                            ]),
1520 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1521 b95479a5 Michael Hanselmann
      ]
1522 b95479a5 Michael Hanselmann
1523 b95479a5 Michael Hanselmann
    # Create job
1524 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1525 b95479a5 Michael Hanselmann
1526 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1527 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1528 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1529 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1530 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1531 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1532 47099cd1 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1533 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1534 b95479a5 Michael Hanselmann
1535 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1536 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1537 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1538 b95479a5 Michael Hanselmann
1539 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1540 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1541 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1542 b95479a5 Michael Hanselmann
1543 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1544 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1545 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1546 b95479a5 Michael Hanselmann
1547 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1548 b95479a5 Michael Hanselmann
1549 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1550 b95479a5 Michael Hanselmann
1551 b95479a5 Michael Hanselmann
    counter = itertools.count()
1552 b95479a5 Michael Hanselmann
    while True:
1553 b95479a5 Michael Hanselmann
      attempt = counter.next()
1554 b95479a5 Michael Hanselmann
1555 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1556 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1557 b95479a5 Michael Hanselmann
1558 b95479a5 Michael Hanselmann
      if attempt == 0:
1559 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1560 b95479a5 Michael Hanselmann
        pass
1561 b95479a5 Michael Hanselmann
      elif attempt < 4:
1562 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1563 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1564 b95479a5 Michael Hanselmann
      elif attempt == 4:
1565 b95479a5 Michael Hanselmann
        # Other job failed
1566 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1567 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1568 b95479a5 Michael Hanselmann
1569 b95479a5 Michael Hanselmann
      if attempt == 0:
1570 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1571 b95479a5 Michael Hanselmann
      else:
1572 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1573 b95479a5 Michael Hanselmann
1574 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1575 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1576 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1577 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1578 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1579 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1580 b95479a5 Michael Hanselmann
1581 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1582 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1583 75d81fc8 Michael Hanselmann
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1584 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1585 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1586 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1587 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1588 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1589 b95479a5 Michael Hanselmann
        continue
1590 b95479a5 Michael Hanselmann
1591 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1592 b95479a5 Michael Hanselmann
        # Last opcode
1593 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1594 b95479a5 Michael Hanselmann
        break
1595 b95479a5 Michael Hanselmann
1596 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1597 b95479a5 Michael Hanselmann
1598 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1599 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1600 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1601 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1602 b95479a5 Michael Hanselmann
1603 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1604 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1605 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1606 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1607 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR,
1608 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR]]),
1609 b95479a5 Michael Hanselmann
1610 b95479a5 Michael Hanselmann
    (opresult, ) = job.GetInfo(["opresult"])
1611 b95479a5 Michael Hanselmann
    self.assertEqual(len(opresult), len(ops))
1612 b95479a5 Michael Hanselmann
    self.assertEqual(opresult[0], "Res0")
1613 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[1]))
1614 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[2]))
1615 b95479a5 Michael Hanselmann
1616 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1617 b95479a5 Michael Hanselmann
1618 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1619 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1620 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1621 b95479a5 Michael Hanselmann
1622 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1623 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1624 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1625 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1626 b95479a5 Michael Hanselmann
1627 be760ba8 Michael Hanselmann
1628 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
1629 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
1630 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
1631 26d3fd2f Michael Hanselmann
    self.attempts = 0
1632 26d3fd2f Michael Hanselmann
    self.last_timeout = None
1633 26d3fd2f Michael Hanselmann
1634 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
1635 26d3fd2f Michael Hanselmann
    self.attempts += 1
1636 26d3fd2f Michael Hanselmann
    if self.timeouts:
1637 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
1638 26d3fd2f Michael Hanselmann
    else:
1639 26d3fd2f Michael Hanselmann
      timeout = None
1640 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
1641 26d3fd2f Michael Hanselmann
    return timeout
1642 26d3fd2f Michael Hanselmann
1643 26d3fd2f Michael Hanselmann
1644 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1645 26d3fd2f Michael Hanselmann
  def setUp(self):
1646 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
1647 26d3fd2f Michael Hanselmann
    self.job = None
1648 26d3fd2f Michael Hanselmann
    self.curop = None
1649 26d3fd2f Michael Hanselmann
    self.opcounter = None
1650 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None
1651 26d3fd2f Michael Hanselmann
    self.retries = 0
1652 26d3fd2f Michael Hanselmann
    self.prev_tsop = None
1653 26d3fd2f Michael Hanselmann
    self.prev_prio = None
1654 5fd6b694 Michael Hanselmann
    self.prev_status = None
1655 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = None
1656 26d3fd2f Michael Hanselmann
    self.gave_lock = None
1657 26d3fd2f Michael Hanselmann
    self.done_lock_before_blocking = False
1658 26d3fd2f Michael Hanselmann
1659 f23db633 Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
1660 26d3fd2f Michael Hanselmann
    job = self.job
1661 26d3fd2f Michael Hanselmann
1662 5fd6b694 Michael Hanselmann
    # If status has changed, job must've been written
1663 5fd6b694 Michael Hanselmann
    if self.prev_status != self.job.ops[self.curop].status:
1664 5fd6b694 Michael Hanselmann
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1665 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1666 5fd6b694 Michael Hanselmann
1667 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1668 47099cd1 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1669 26d3fd2f Michael Hanselmann
1670 26d3fd2f Michael Hanselmann
    ts = self.timeout_strategy
1671 26d3fd2f Michael Hanselmann
1672 26d3fd2f Michael Hanselmann
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1673 26d3fd2f Michael Hanselmann
    self.assertEqual(timeout, ts.last_timeout)
1674 f23db633 Michael Hanselmann
    self.assertEqual(priority, job.ops[self.curop].priority)
1675 26d3fd2f Michael Hanselmann
1676 26d3fd2f Michael Hanselmann
    self.gave_lock = True
1677 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = priority
1678 26d3fd2f Michael Hanselmann
1679 26d3fd2f Michael Hanselmann
    if (self.curop == 3 and
1680 26d3fd2f Michael Hanselmann
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1681 26d3fd2f Michael Hanselmann
      # Give locks before running into blocking acquire
1682 26d3fd2f Michael Hanselmann
      assert self.retries == 7
1683 26d3fd2f Michael Hanselmann
      self.retries = 0
1684 26d3fd2f Michael Hanselmann
      self.done_lock_before_blocking = True
1685 26d3fd2f Michael Hanselmann
      return
1686 26d3fd2f Michael Hanselmann
1687 26d3fd2f Michael Hanselmann
    if self.retries > 0:
1688 26d3fd2f Michael Hanselmann
      self.assert_(timeout is not None)
1689 26d3fd2f Michael Hanselmann
      self.retries -= 1
1690 26d3fd2f Michael Hanselmann
      self.gave_lock = False
1691 26d3fd2f Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
1692 26d3fd2f Michael Hanselmann
1693 26d3fd2f Michael Hanselmann
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1694 26d3fd2f Michael Hanselmann
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1695 26d3fd2f Michael Hanselmann
      assert not ts.timeouts
1696 26d3fd2f Michael Hanselmann
      self.assert_(timeout is None)
1697 26d3fd2f Michael Hanselmann
1698 26d3fd2f Michael Hanselmann
  def _AfterStart(self, op, cbs):
1699 26d3fd2f Michael Hanselmann
    job = self.job
1700 26d3fd2f Michael Hanselmann
1701 5fd6b694 Michael Hanselmann
    # Setting to "running" requires an update
1702 ebb2a2a3 Michael Hanselmann
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1703 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1704 5fd6b694 Michael Hanselmann
1705 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1706 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1707 26d3fd2f Michael Hanselmann
1708 26d3fd2f Michael Hanselmann
    # Job is running, cancelling shouldn't be possible
1709 26d3fd2f Michael Hanselmann
    (success, _) = job.Cancel()
1710 26d3fd2f Michael Hanselmann
    self.assertFalse(success)
1711 26d3fd2f Michael Hanselmann
1712 26d3fd2f Michael Hanselmann
  def _NextOpcode(self):
1713 26d3fd2f Michael Hanselmann
    self.curop = self.opcounter.next()
1714 26d3fd2f Michael Hanselmann
    self.prev_prio = self.job.ops[self.curop].priority
1715 5fd6b694 Michael Hanselmann
    self.prev_status = self.job.ops[self.curop].status
1716 26d3fd2f Michael Hanselmann
1717 26d3fd2f Michael Hanselmann
  def _NewTimeoutStrategy(self):
1718 26d3fd2f Michael Hanselmann
    job = self.job
1719 26d3fd2f Michael Hanselmann
1720 26d3fd2f Michael Hanselmann
    self.assertEqual(self.retries, 0)
1721 26d3fd2f Michael Hanselmann
1722 26d3fd2f Michael Hanselmann
    if self.prev_tsop == self.curop:
1723 26d3fd2f Michael Hanselmann
      # Still on the same opcode, priority must've been increased
1724 26d3fd2f Michael Hanselmann
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1725 26d3fd2f Michael Hanselmann
1726 26d3fd2f Michael Hanselmann
    if self.curop == 1:
1727 26d3fd2f Michael Hanselmann
      # Normal retry
1728 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1729 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts) - 1
1730 26d3fd2f Michael Hanselmann
1731 26d3fd2f Michael Hanselmann
    elif self.curop == 2:
1732 26d3fd2f Michael Hanselmann
      # Let this run into a blocking acquire
1733 26d3fd2f Michael Hanselmann
      timeouts = range(11, 61, 12)
1734 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1735 26d3fd2f Michael Hanselmann
1736 26d3fd2f Michael Hanselmann
    elif self.curop == 3:
1737 26d3fd2f Michael Hanselmann
      # Wait for priority to increase, but give lock before blocking acquire
1738 26d3fd2f Michael Hanselmann
      timeouts = range(12, 100, 14)
1739 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1740 26d3fd2f Michael Hanselmann
1741 26d3fd2f Michael Hanselmann
      self.assertFalse(self.done_lock_before_blocking)
1742 26d3fd2f Michael Hanselmann
1743 26d3fd2f Michael Hanselmann
    elif self.curop == 4:
1744 26d3fd2f Michael Hanselmann
      self.assert_(self.done_lock_before_blocking)
1745 26d3fd2f Michael Hanselmann
1746 26d3fd2f Michael Hanselmann
      # Timeouts, but no need to retry
1747 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1748 26d3fd2f Michael Hanselmann
      self.retries = 0
1749 26d3fd2f Michael Hanselmann
1750 26d3fd2f Michael Hanselmann
    elif self.curop == 5:
1751 26d3fd2f Michael Hanselmann
      # Normal retry
1752 26d3fd2f Michael Hanselmann
      timeouts = range(19, 100, 11)
1753 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1754 26d3fd2f Michael Hanselmann
1755 26d3fd2f Michael Hanselmann
    else:
1756 26d3fd2f Michael Hanselmann
      timeouts = []
1757 26d3fd2f Michael Hanselmann
      self.retries = 0
1758 26d3fd2f Michael Hanselmann
1759 26d3fd2f Michael Hanselmann
    assert len(job.ops) == 10
1760 26d3fd2f Michael Hanselmann
    assert self.retries <= len(timeouts)
1761 26d3fd2f Michael Hanselmann
1762 26d3fd2f Michael Hanselmann
    ts = _FakeTimeoutStrategy(timeouts)
1763 26d3fd2f Michael Hanselmann
1764 26d3fd2f Michael Hanselmann
    self.timeout_strategy = ts
1765 26d3fd2f Michael Hanselmann
    self.prev_tsop = self.curop
1766 26d3fd2f Michael Hanselmann
    self.prev_prio = job.ops[self.curop].priority
1767 26d3fd2f Michael Hanselmann
1768 26d3fd2f Michael Hanselmann
    return ts
1769 26d3fd2f Michael Hanselmann
1770 26d3fd2f Michael Hanselmann
  def testTimeout(self):
1771 26d3fd2f Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1772 26d3fd2f Michael Hanselmann
           for i in range(10)]
1773 26d3fd2f Michael Hanselmann
1774 26d3fd2f Michael Hanselmann
    # Create job
1775 26d3fd2f Michael Hanselmann
    job_id = 15801
1776 26d3fd2f Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
1777 26d3fd2f Michael Hanselmann
    self.job = job
1778 26d3fd2f Michael Hanselmann
1779 26d3fd2f Michael Hanselmann
    self.opcounter = itertools.count(0)
1780 26d3fd2f Michael Hanselmann
1781 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1782 ebb2a2a3 Michael Hanselmann
                                    self._AfterStart)
1783 26d3fd2f Michael Hanselmann
    tsf = self._NewTimeoutStrategy
1784 26d3fd2f Michael Hanselmann
1785 26d3fd2f Michael Hanselmann
    self.assertFalse(self.done_lock_before_blocking)
1786 26d3fd2f Michael Hanselmann
1787 5fd6b694 Michael Hanselmann
    while True:
1788 26d3fd2f Michael Hanselmann
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1789 26d3fd2f Michael Hanselmann
                                  _timeout_strategy_factory=tsf)
1790 26d3fd2f Michael Hanselmann
1791 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1792 5fd6b694 Michael Hanselmann
1793 5fd6b694 Michael Hanselmann
      if self.curop is not None:
1794 5fd6b694 Michael Hanselmann
        self.prev_status = self.job.ops[self.curop].status
1795 5fd6b694 Michael Hanselmann
1796 5fd6b694 Michael Hanselmann
      self.lock_acq_prio = None
1797 5fd6b694 Michael Hanselmann
1798 26d3fd2f Michael Hanselmann
      result = proc(_nextop_fn=self._NextOpcode)
1799 5fd6b694 Michael Hanselmann
      assert self.curop is not None
1800 5fd6b694 Michael Hanselmann
1801 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
1802 5fd6b694 Michael Hanselmann
        # Got lock and/or job is done, result must've been written
1803 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1804 5fd6b694 Michael Hanselmann
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1805 5fd6b694 Michael Hanselmann
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1806 5fd6b694 Michael Hanselmann
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1807 5fd6b694 Michael Hanselmann
        self.assert_(job.ops[self.curop].exec_timestamp)
1808 5fd6b694 Michael Hanselmann
1809 75d81fc8 Michael Hanselmann
      if result == jqueue._JobProcessor.FINISHED:
1810 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1811 26d3fd2f Michael Hanselmann
        break
1812 26d3fd2f Michael Hanselmann
1813 75d81fc8 Michael Hanselmann
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1814 26d3fd2f Michael Hanselmann
1815 5fd6b694 Michael Hanselmann
      if self.curop == 0:
1816 5fd6b694 Michael Hanselmann
        self.assertEqual(job.ops[self.curop].start_timestamp,
1817 5fd6b694 Michael Hanselmann
                         job.start_timestamp)
1818 5fd6b694 Michael Hanselmann
1819 26d3fd2f Michael Hanselmann
      if self.gave_lock:
1820 5fd6b694 Michael Hanselmann
        # Opcode finished, but job not yet done
1821 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1822 26d3fd2f Michael Hanselmann
      else:
1823 5fd6b694 Michael Hanselmann
        # Did not get locks
1824 26d3fd2f Michael Hanselmann
        self.assert_(job.cur_opctx)
1825 26d3fd2f Michael Hanselmann
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1826 26d3fd2f Michael Hanselmann
                         self.timeout_strategy.NextAttempt)
1827 5fd6b694 Michael Hanselmann
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1828 47099cd1 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1829 5fd6b694 Michael Hanselmann
1830 5fd6b694 Michael Hanselmann
        # If priority has changed since acquiring locks, the job must've been
1831 5fd6b694 Michael Hanselmann
        # updated
1832 5fd6b694 Michael Hanselmann
        if self.lock_acq_prio != job.ops[self.curop].priority:
1833 5fd6b694 Michael Hanselmann
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1834 5fd6b694 Michael Hanselmann
1835 5fd6b694 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1836 26d3fd2f Michael Hanselmann
1837 26d3fd2f Michael Hanselmann
      self.assert_(job.start_timestamp)
1838 26d3fd2f Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1839 26d3fd2f Michael Hanselmann
1840 26d3fd2f Michael Hanselmann
    self.assertEqual(self.curop, len(job.ops) - 1)
1841 26d3fd2f Michael Hanselmann
    self.assertEqual(self.job, job)
1842 26d3fd2f Michael Hanselmann
    self.assertEqual(self.opcounter.next(), len(job.ops))
1843 26d3fd2f Michael Hanselmann
    self.assert_(self.done_lock_before_blocking)
1844 26d3fd2f Michael Hanselmann
1845 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1846 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1847 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1848 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1849 26d3fd2f Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1850 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1851 26d3fd2f Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1852 26d3fd2f Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1853 26d3fd2f Michael Hanselmann
                            for op in job.ops))
1854 26d3fd2f Michael Hanselmann
1855 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1856 75d81fc8 Michael Hanselmann
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
1857 75d81fc8 Michael Hanselmann
                     jqueue._JobProcessor.FINISHED)
1858 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1859 26d3fd2f Michael Hanselmann
1860 26d3fd2f Michael Hanselmann
1861 b95479a5 Michael Hanselmann
class TestJobDependencyManager(unittest.TestCase):
1862 b95479a5 Michael Hanselmann
  class _FakeJob:
1863 b95479a5 Michael Hanselmann
    def __init__(self, job_id):
1864 b95479a5 Michael Hanselmann
      self.id = str(job_id)
1865 b95479a5 Michael Hanselmann
1866 b95479a5 Michael Hanselmann
  def setUp(self):
1867 b95479a5 Michael Hanselmann
    self._status = []
1868 b95479a5 Michael Hanselmann
    self._queue = []
1869 b95479a5 Michael Hanselmann
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1870 b95479a5 Michael Hanselmann
1871 b95479a5 Michael Hanselmann
  def _GetStatus(self, job_id):
1872 b95479a5 Michael Hanselmann
    (exp_job_id, result) = self._status.pop(0)
1873 b95479a5 Michael Hanselmann
    self.assertEqual(exp_job_id, job_id)
1874 b95479a5 Michael Hanselmann
    return result
1875 b95479a5 Michael Hanselmann
1876 b95479a5 Michael Hanselmann
  def _Enqueue(self, jobs):
1877 b95479a5 Michael Hanselmann
    self._queue.append(jobs)
1878 b95479a5 Michael Hanselmann
1879 b95479a5 Michael Hanselmann
  def testNotFinalizedThenCancel(self):
1880 b95479a5 Michael Hanselmann
    job = self._FakeJob(17697)
1881 b95479a5 Michael Hanselmann
    job_id = str(28625)
1882 b95479a5 Michael Hanselmann
1883 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1884 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1885 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1886 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1887 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1888 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1889 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1890 b95479a5 Michael Hanselmann
      job_id: set([job]),
1891 b95479a5 Michael Hanselmann
      })
1892 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1893 fcb21ad7 Michael Hanselmann
      ("job/28625", None, None, [("job", [job.id])])
1894 fcb21ad7 Michael Hanselmann
      ])
1895 b95479a5 Michael Hanselmann
1896 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1897 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1898 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CANCEL)
1899 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1900 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1901 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1902 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1903 b95479a5 Michael Hanselmann
1904 b95479a5 Michael Hanselmann
  def testRequireCancel(self):
1905 b95479a5 Michael Hanselmann
    job = self._FakeJob(5278)
1906 b95479a5 Michael Hanselmann
    job_id = str(9610)
1907 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_CANCELED]
1908 b95479a5 Michael Hanselmann
1909 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
1910 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1911 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1912 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1913 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1914 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1915 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1916 b95479a5 Michael Hanselmann
      job_id: set([job]),
1917 b95479a5 Michael Hanselmann
      })
1918 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1919 fcb21ad7 Michael Hanselmann
      ("job/9610", None, None, [("job", [job.id])])
1920 fcb21ad7 Michael Hanselmann
      ])
1921 b95479a5 Michael Hanselmann
1922 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1923 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1924 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
1925 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1926 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1927 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1928 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1929 b95479a5 Michael Hanselmann
1930 b95479a5 Michael Hanselmann
  def testRequireError(self):
1931 b95479a5 Michael Hanselmann
    job = self._FakeJob(21459)
1932 b95479a5 Michael Hanselmann
    job_id = str(25519)
1933 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_ERROR]
1934 b95479a5 Michael Hanselmann
1935 47099cd1 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
1936 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1937 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1938 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1939 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1940 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1941 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1942 b95479a5 Michael Hanselmann
      job_id: set([job]),
1943 b95479a5 Michael Hanselmann
      })
1944 b95479a5 Michael Hanselmann
1945 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1946 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1947 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
1948 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1949 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1950 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1951 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1952 b95479a5 Michael Hanselmann
1953 b95479a5 Michael Hanselmann
  def testRequireMultiple(self):
1954 b95479a5 Michael Hanselmann
    dep_status = list(constants.JOBS_FINALIZED)
1955 b95479a5 Michael Hanselmann
1956 b95479a5 Michael Hanselmann
    for end_status in dep_status:
1957 b95479a5 Michael Hanselmann
      job = self._FakeJob(21343)
1958 b95479a5 Michael Hanselmann
      job_id = str(14609)
1959 b95479a5 Michael Hanselmann
1960 47099cd1 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_WAITING))
1961 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1962 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
1963 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
1964 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
1965 b95479a5 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
1966 b95479a5 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
1967 b95479a5 Michael Hanselmann
        job_id: set([job]),
1968 b95479a5 Michael Hanselmann
        })
1969 fcb21ad7 Michael Hanselmann
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1970 fcb21ad7 Michael Hanselmann
        ("job/14609", None, None, [("job", [job.id])])
1971 fcb21ad7 Michael Hanselmann
        ])
1972 b95479a5 Michael Hanselmann
1973 b95479a5 Michael Hanselmann
      self._status.append((job_id, end_status))
1974 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1975 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.CONTINUE)
1976 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
1977 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
1978 b95479a5 Michael Hanselmann
      self.assertFalse(self.jdm.JobWaiting(job))
1979 fcb21ad7 Michael Hanselmann
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1980 b95479a5 Michael Hanselmann
1981 b95479a5 Michael Hanselmann
  def testNotify(self):
1982 b95479a5 Michael Hanselmann
    job = self._FakeJob(8227)
1983 b95479a5 Michael Hanselmann
    job_id = str(4113)
1984 b95479a5 Michael Hanselmann
1985 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1986 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1987 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1988 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1989 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1990 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1991 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1992 b95479a5 Michael Hanselmann
      job_id: set([job]),
1993 b95479a5 Michael Hanselmann
      })
1994 b95479a5 Michael Hanselmann
1995 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters(job_id)
1996 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1997 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
1998 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1999 b95479a5 Michael Hanselmann
    self.assertEqual(self._queue, [set([job])])
2000 b95479a5 Michael Hanselmann
2001 b95479a5 Michael Hanselmann
  def testWrongStatus(self):
2002 b95479a5 Michael Hanselmann
    job = self._FakeJob(10102)
2003 b95479a5 Michael Hanselmann
    job_id = str(1271)
2004 b95479a5 Michael Hanselmann
2005 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2006 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2007 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2008 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2009 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2010 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2011 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2012 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2013 b95479a5 Michael Hanselmann
      job_id: set([job]),
2014 b95479a5 Michael Hanselmann
      })
2015 b95479a5 Michael Hanselmann
2016 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2017 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2018 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2019 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WRONGSTATUS)
2020 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2021 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2022 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2023 b95479a5 Michael Hanselmann
2024 b95479a5 Michael Hanselmann
  def testCorrectStatus(self):
2025 b95479a5 Michael Hanselmann
    job = self._FakeJob(24273)
2026 b95479a5 Michael Hanselmann
    job_id = str(23885)
2027 b95479a5 Michael Hanselmann
2028 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2029 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2030 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2031 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2032 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2033 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2034 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2035 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2036 b95479a5 Michael Hanselmann
      job_id: set([job]),
2037 b95479a5 Michael Hanselmann
      })
2038 b95479a5 Michael Hanselmann
2039 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2040 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2041 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2042 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2043 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2044 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2045 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2046 b95479a5 Michael Hanselmann
2047 b95479a5 Michael Hanselmann
  def testFinalizedRightAway(self):
2048 b95479a5 Michael Hanselmann
    job = self._FakeJob(224)
2049 b95479a5 Michael Hanselmann
    job_id = str(3081)
2050 b95479a5 Michael Hanselmann
2051 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2052 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2053 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2054 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2055 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2056 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2057 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2058 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2059 b95479a5 Michael Hanselmann
      job_id: set(),
2060 b95479a5 Michael Hanselmann
      })
2061 b95479a5 Michael Hanselmann
2062 b95479a5 Michael Hanselmann
    # Force cleanup
2063 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters("0")
2064 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2065 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2066 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2067 b95479a5 Michael Hanselmann
2068 fcb21ad7 Michael Hanselmann
  def testMultipleWaiting(self):
2069 fcb21ad7 Michael Hanselmann
    # Use a deterministic random generator
2070 fcb21ad7 Michael Hanselmann
    rnd = random.Random(21402)
2071 fcb21ad7 Michael Hanselmann
2072 fcb21ad7 Michael Hanselmann
    job_ids = map(str, rnd.sample(range(1, 10000), 150))
2073 fcb21ad7 Michael Hanselmann
2074 fcb21ad7 Michael Hanselmann
    waiters = dict((job_ids.pop(),
2075 fcb21ad7 Michael Hanselmann
                    set(map(self._FakeJob,
2076 fcb21ad7 Michael Hanselmann
                            [job_ids.pop()
2077 fcb21ad7 Michael Hanselmann
                             for _ in range(rnd.randint(1, 20))])))
2078 fcb21ad7 Michael Hanselmann
                   for _ in range(10))
2079 fcb21ad7 Michael Hanselmann
2080 fcb21ad7 Michael Hanselmann
    # Ensure there are no duplicate job IDs
2081 fcb21ad7 Michael Hanselmann
    assert not utils.FindDuplicates(waiters.keys() +
2082 fcb21ad7 Michael Hanselmann
                                    [job.id
2083 fcb21ad7 Michael Hanselmann
                                     for jobs in waiters.values()
2084 fcb21ad7 Michael Hanselmann
                                     for job in jobs])
2085 fcb21ad7 Michael Hanselmann
2086 fcb21ad7 Michael Hanselmann
    # Register all jobs as waiters
2087 fcb21ad7 Michael Hanselmann
    for job_id, job in [(job_id, job)
2088 fcb21ad7 Michael Hanselmann
                        for (job_id, jobs) in waiters.items()
2089 fcb21ad7 Michael Hanselmann
                        for job in jobs]:
2090 fcb21ad7 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2091 fcb21ad7 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
2092 fcb21ad7 Michael Hanselmann
                                              [constants.JOB_STATUS_SUCCESS])
2093 fcb21ad7 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
2094 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._status)
2095 fcb21ad7 Michael Hanselmann
      self.assertFalse(self._queue)
2096 fcb21ad7 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
2097 fcb21ad7 Michael Hanselmann
2098 fcb21ad7 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, waiters)
2099 fcb21ad7 Michael Hanselmann
2100 fcb21ad7 Michael Hanselmann
    def _MakeSet((name, mode, owner_names, pending)):
2101 fcb21ad7 Michael Hanselmann
      return (name, mode, owner_names,
2102 fcb21ad7 Michael Hanselmann
              [(pendmode, set(pend)) for (pendmode, pend) in pending])
2103 fcb21ad7 Michael Hanselmann
2104 fcb21ad7 Michael Hanselmann
    def _CheckLockInfo():
2105 fcb21ad7 Michael Hanselmann
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
2106 fcb21ad7 Michael Hanselmann
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2107 fcb21ad7 Michael Hanselmann
        ("job/%s" % job_id, None, None,
2108 fcb21ad7 Michael Hanselmann
         [("job", set([job.id for job in jobs]))])
2109 fcb21ad7 Michael Hanselmann
        for job_id, jobs in waiters.items()
2110 fcb21ad7 Michael Hanselmann
        if jobs
2111 fcb21ad7 Michael Hanselmann
        ]))
2112 fcb21ad7 Michael Hanselmann
2113 fcb21ad7 Michael Hanselmann
    _CheckLockInfo()
2114 fcb21ad7 Michael Hanselmann
2115 fcb21ad7 Michael Hanselmann
    # Notify in random order
2116 fcb21ad7 Michael Hanselmann
    for job_id in rnd.sample(waiters, len(waiters)):
2117 fcb21ad7 Michael Hanselmann
      # Remove from pending waiter list
2118 fcb21ad7 Michael Hanselmann
      jobs = waiters.pop(job_id)
2119 fcb21ad7 Michael Hanselmann
      for job in jobs:
2120 fcb21ad7 Michael Hanselmann
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2121 fcb21ad7 Michael Hanselmann
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
2122 fcb21ad7 Michael Hanselmann
                                                [constants.JOB_STATUS_SUCCESS])
2123 fcb21ad7 Michael Hanselmann
        self.assertEqual(result, self.jdm.CONTINUE)
2124 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._status)
2125 fcb21ad7 Michael Hanselmann
        self.assertFalse(self._queue)
2126 fcb21ad7 Michael Hanselmann
        self.assertFalse(self.jdm.JobWaiting(job))
2127 fcb21ad7 Michael Hanselmann
2128 fcb21ad7 Michael Hanselmann
      _CheckLockInfo()
2129 fcb21ad7 Michael Hanselmann
2130 fcb21ad7 Michael Hanselmann
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2131 fcb21ad7 Michael Hanselmann
2132 fcb21ad7 Michael Hanselmann
    assert not waiters
2133 fcb21ad7 Michael Hanselmann
2134 b95479a5 Michael Hanselmann
  def testSelfDependency(self):
2135 b95479a5 Michael Hanselmann
    job = self._FakeJob(18937)
2136 b95479a5 Michael Hanselmann
2137 b95479a5 Michael Hanselmann
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2138 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2139 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2140 b95479a5 Michael Hanselmann
2141 b95479a5 Michael Hanselmann
  def testJobDisappears(self):
2142 b95479a5 Michael Hanselmann
    job = self._FakeJob(30540)
2143 b95479a5 Michael Hanselmann
    job_id = str(23769)
2144 b95479a5 Michael Hanselmann
2145 b95479a5 Michael Hanselmann
    def _FakeStatus(_):
2146 b95479a5 Michael Hanselmann
      raise errors.JobLost("#msg#")
2147 b95479a5 Michael Hanselmann
2148 b95479a5 Michael Hanselmann
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2149 b95479a5 Michael Hanselmann
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2150 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2151 b95479a5 Michael Hanselmann
    self.assertFalse(jdm.JobWaiting(job))
2152 fcb21ad7 Michael Hanselmann
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2153 b95479a5 Michael Hanselmann
2154 b95479a5 Michael Hanselmann
2155 989a8bee Michael Hanselmann
if __name__ == "__main__":
2156 989a8bee Michael Hanselmann
  testutils.GanetiTestProgram()