Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ c0f6d0d8

History | View | Annotate | Download (69.9 kB)

1 989a8bee Michael Hanselmann
#!/usr/bin/python
2 989a8bee Michael Hanselmann
#
3 989a8bee Michael Hanselmann
4 1e6d5750 Iustin Pop
# Copyright (C) 2010, 2011 Google Inc.
5 989a8bee Michael Hanselmann
#
6 989a8bee Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 989a8bee Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 989a8bee Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 989a8bee Michael Hanselmann
# (at your option) any later version.
10 989a8bee Michael Hanselmann
#
11 989a8bee Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 989a8bee Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 989a8bee Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 989a8bee Michael Hanselmann
# General Public License for more details.
15 989a8bee Michael Hanselmann
#
16 989a8bee Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 989a8bee Michael Hanselmann
# along with this program; if not, write to the Free Software
18 989a8bee Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 989a8bee Michael Hanselmann
# 02110-1301, USA.
20 989a8bee Michael Hanselmann
21 989a8bee Michael Hanselmann
22 989a8bee Michael Hanselmann
"""Script for testing ganeti.jqueue"""
23 989a8bee Michael Hanselmann
24 989a8bee Michael Hanselmann
import os
25 989a8bee Michael Hanselmann
import sys
26 989a8bee Michael Hanselmann
import unittest
27 989a8bee Michael Hanselmann
import tempfile
28 989a8bee Michael Hanselmann
import shutil
29 989a8bee Michael Hanselmann
import errno
30 26d3fd2f Michael Hanselmann
import itertools
31 989a8bee Michael Hanselmann
32 989a8bee Michael Hanselmann
from ganeti import constants
33 989a8bee Michael Hanselmann
from ganeti import utils
34 989a8bee Michael Hanselmann
from ganeti import errors
35 989a8bee Michael Hanselmann
from ganeti import jqueue
36 8f5c488d Michael Hanselmann
from ganeti import opcodes
37 8f5c488d Michael Hanselmann
from ganeti import compat
38 26d3fd2f Michael Hanselmann
from ganeti import mcpu
39 989a8bee Michael Hanselmann
40 989a8bee Michael Hanselmann
import testutils
41 989a8bee Michael Hanselmann
42 989a8bee Michael Hanselmann
43 989a8bee Michael Hanselmann
class _FakeJob:
44 989a8bee Michael Hanselmann
  def __init__(self, job_id, status):
45 989a8bee Michael Hanselmann
    self.id = job_id
46 c0f6d0d8 Michael Hanselmann
    self.writable = False
47 989a8bee Michael Hanselmann
    self._status = status
48 989a8bee Michael Hanselmann
    self._log = []
49 989a8bee Michael Hanselmann
50 989a8bee Michael Hanselmann
  def SetStatus(self, status):
51 989a8bee Michael Hanselmann
    self._status = status
52 989a8bee Michael Hanselmann
53 989a8bee Michael Hanselmann
  def AddLogEntry(self, msg):
54 989a8bee Michael Hanselmann
    self._log.append((len(self._log), msg))
55 989a8bee Michael Hanselmann
56 989a8bee Michael Hanselmann
  def CalcStatus(self):
57 989a8bee Michael Hanselmann
    return self._status
58 989a8bee Michael Hanselmann
59 989a8bee Michael Hanselmann
  def GetInfo(self, fields):
60 989a8bee Michael Hanselmann
    result = []
61 989a8bee Michael Hanselmann
62 989a8bee Michael Hanselmann
    for name in fields:
63 989a8bee Michael Hanselmann
      if name == "status":
64 989a8bee Michael Hanselmann
        result.append(self._status)
65 989a8bee Michael Hanselmann
      else:
66 989a8bee Michael Hanselmann
        raise Exception("Unknown field")
67 989a8bee Michael Hanselmann
68 989a8bee Michael Hanselmann
    return result
69 989a8bee Michael Hanselmann
70 989a8bee Michael Hanselmann
  def GetLogEntries(self, newer_than):
71 989a8bee Michael Hanselmann
    assert newer_than is None or newer_than >= 0
72 989a8bee Michael Hanselmann
73 989a8bee Michael Hanselmann
    if newer_than is None:
74 989a8bee Michael Hanselmann
      return self._log
75 989a8bee Michael Hanselmann
76 989a8bee Michael Hanselmann
    return self._log[newer_than:]
77 989a8bee Michael Hanselmann
78 989a8bee Michael Hanselmann
79 989a8bee Michael Hanselmann
class TestJobChangesChecker(unittest.TestCase):
80 989a8bee Michael Hanselmann
  def testStatus(self):
81 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
82 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
83 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
84 989a8bee Michael Hanselmann
85 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
86 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
87 989a8bee Michael Hanselmann
88 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
89 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
90 989a8bee Michael Hanselmann
91 989a8bee Michael Hanselmann
    # job.id is used by checker
92 989a8bee Michael Hanselmann
    self.assertEqual(job.id, 9094)
93 989a8bee Michael Hanselmann
94 989a8bee Michael Hanselmann
  def testStatusWithPrev(self):
95 989a8bee Michael Hanselmann
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
96 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"],
97 989a8bee Michael Hanselmann
                                        [constants.JOB_STATUS_QUEUED], None)
98 989a8bee Michael Hanselmann
    self.assert_(checker(job) is None)
99 989a8bee Michael Hanselmann
100 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_RUNNING)
101 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
102 989a8bee Michael Hanselmann
103 989a8bee Michael Hanselmann
  def testFinalStatus(self):
104 989a8bee Michael Hanselmann
    for status in constants.JOBS_FINALIZED:
105 989a8bee Michael Hanselmann
      job = _FakeJob(2178711, status)
106 989a8bee Michael Hanselmann
      checker = jqueue._JobChangesChecker(["status"], [status], None)
107 989a8bee Michael Hanselmann
      # There won't be any changes in this status, hence it should signal
108 989a8bee Michael Hanselmann
      # a change immediately
109 989a8bee Michael Hanselmann
      self.assertEqual(checker(job), ([status], []))
110 989a8bee Michael Hanselmann
111 989a8bee Michael Hanselmann
  def testLog(self):
112 989a8bee Michael Hanselmann
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
113 989a8bee Michael Hanselmann
    checker = jqueue._JobChangesChecker(["status"], None, None)
114 989a8bee Michael Hanselmann
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
115 989a8bee Michael Hanselmann
116 989a8bee Michael Hanselmann
    job.AddLogEntry("Hello World")
117 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker(job)
118 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
119 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"]])
120 989a8bee Michael Hanselmann
121 989a8bee Michael Hanselmann
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
122 989a8bee Michael Hanselmann
    self.assert_(checker2(job) is None)
123 989a8bee Michael Hanselmann
124 989a8bee Michael Hanselmann
    job.AddLogEntry("Foo Bar")
125 989a8bee Michael Hanselmann
    job.SetStatus(constants.JOB_STATUS_ERROR)
126 989a8bee Michael Hanselmann
127 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker2(job)
128 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
129 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
130 989a8bee Michael Hanselmann
131 989a8bee Michael Hanselmann
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
132 989a8bee Michael Hanselmann
    (job_info, log_entries) = checker3(job)
133 989a8bee Michael Hanselmann
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
134 989a8bee Michael Hanselmann
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
135 989a8bee Michael Hanselmann
136 989a8bee Michael Hanselmann
137 989a8bee Michael Hanselmann
class TestJobChangesWaiter(unittest.TestCase):
138 989a8bee Michael Hanselmann
  def setUp(self):
139 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
140 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
141 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
142 989a8bee Michael Hanselmann
143 989a8bee Michael Hanselmann
  def tearDown(self):
144 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
145 989a8bee Michael Hanselmann
146 989a8bee Michael Hanselmann
  def _EnsureNotifierClosed(self, notifier):
147 989a8bee Michael Hanselmann
    try:
148 989a8bee Michael Hanselmann
      os.fstat(notifier._fd)
149 989a8bee Michael Hanselmann
    except EnvironmentError, err:
150 989a8bee Michael Hanselmann
      self.assertEqual(err.errno, errno.EBADF)
151 989a8bee Michael Hanselmann
    else:
152 989a8bee Michael Hanselmann
      self.fail("File descriptor wasn't closed")
153 989a8bee Michael Hanselmann
154 989a8bee Michael Hanselmann
  def testClose(self):
155 989a8bee Michael Hanselmann
    for wait in [False, True]:
156 989a8bee Michael Hanselmann
      waiter = jqueue._JobFileChangesWaiter(self.filename)
157 989a8bee Michael Hanselmann
      try:
158 989a8bee Michael Hanselmann
        if wait:
159 989a8bee Michael Hanselmann
          waiter.Wait(0.001)
160 989a8bee Michael Hanselmann
      finally:
161 989a8bee Michael Hanselmann
        waiter.Close()
162 989a8bee Michael Hanselmann
163 989a8bee Michael Hanselmann
      # Ensure file descriptor was closed
164 989a8bee Michael Hanselmann
      self._EnsureNotifierClosed(waiter._notifier)
165 989a8bee Michael Hanselmann
166 989a8bee Michael Hanselmann
  def testChangingFile(self):
167 989a8bee Michael Hanselmann
    waiter = jqueue._JobFileChangesWaiter(self.filename)
168 989a8bee Michael Hanselmann
    try:
169 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
170 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
171 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
172 989a8bee Michael Hanselmann
    finally:
173 989a8bee Michael Hanselmann
      waiter.Close()
174 989a8bee Michael Hanselmann
175 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._notifier)
176 989a8bee Michael Hanselmann
177 989a8bee Michael Hanselmann
  def testChangingFile2(self):
178 989a8bee Michael Hanselmann
    waiter = jqueue._JobChangesWaiter(self.filename)
179 989a8bee Michael Hanselmann
    try:
180 989a8bee Michael Hanselmann
      self.assertFalse(waiter._filewaiter)
181 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(0.1))
182 989a8bee Michael Hanselmann
      self.assert_(waiter._filewaiter)
183 989a8bee Michael Hanselmann
184 989a8bee Michael Hanselmann
      # File waiter is now used, but there have been no changes
185 989a8bee Michael Hanselmann
      self.assertFalse(waiter.Wait(0.1))
186 989a8bee Michael Hanselmann
      utils.WriteFile(self.filename, data="changed")
187 989a8bee Michael Hanselmann
      self.assert_(waiter.Wait(60))
188 989a8bee Michael Hanselmann
    finally:
189 989a8bee Michael Hanselmann
      waiter.Close()
190 989a8bee Michael Hanselmann
191 989a8bee Michael Hanselmann
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
192 989a8bee Michael Hanselmann
193 989a8bee Michael Hanselmann
194 989a8bee Michael Hanselmann
class TestWaitForJobChangesHelper(unittest.TestCase):
195 989a8bee Michael Hanselmann
  def setUp(self):
196 989a8bee Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
197 989a8bee Michael Hanselmann
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
198 989a8bee Michael Hanselmann
    utils.WriteFile(self.filename, data="")
199 989a8bee Michael Hanselmann
200 989a8bee Michael Hanselmann
  def tearDown(self):
201 989a8bee Michael Hanselmann
    shutil.rmtree(self.tmpdir)
202 989a8bee Michael Hanselmann
203 989a8bee Michael Hanselmann
  def _LoadWaitingJob(self):
204 989a8bee Michael Hanselmann
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
205 989a8bee Michael Hanselmann
206 989a8bee Michael Hanselmann
  def _LoadLostJob(self):
207 989a8bee Michael Hanselmann
    return None
208 989a8bee Michael Hanselmann
209 989a8bee Michael Hanselmann
  def testNoChanges(self):
210 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
211 989a8bee Michael Hanselmann
212 989a8bee Michael Hanselmann
    # No change
213 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
214 989a8bee Michael Hanselmann
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
215 989a8bee Michael Hanselmann
                     constants.JOB_NOTCHANGED)
216 989a8bee Michael Hanselmann
217 989a8bee Michael Hanselmann
    # No previous information
218 989a8bee Michael Hanselmann
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
219 989a8bee Michael Hanselmann
                          ["status"], None, None, 1.0),
220 989a8bee Michael Hanselmann
                     ([constants.JOB_STATUS_WAITLOCK], []))
221 989a8bee Michael Hanselmann
222 989a8bee Michael Hanselmann
  def testLostJob(self):
223 989a8bee Michael Hanselmann
    wfjc = jqueue._WaitForJobChangesHelper()
224 989a8bee Michael Hanselmann
    self.assert_(wfjc(self.filename, self._LoadLostJob,
225 989a8bee Michael Hanselmann
                      ["status"], None, None, 1.0) is None)
226 989a8bee Michael Hanselmann
227 989a8bee Michael Hanselmann
228 6760e4ed Michael Hanselmann
class TestEncodeOpError(unittest.TestCase):
229 6760e4ed Michael Hanselmann
  def test(self):
230 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
231 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
232 6760e4ed Michael Hanselmann
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
233 6760e4ed Michael Hanselmann
234 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
235 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
236 6760e4ed Michael Hanselmann
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
237 6760e4ed Michael Hanselmann
238 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
239 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
240 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
241 6760e4ed Michael Hanselmann
242 6760e4ed Michael Hanselmann
    encerr = jqueue._EncodeOpError("Hello World")
243 6760e4ed Michael Hanselmann
    self.assert_(isinstance(encerr, tuple))
244 6760e4ed Michael Hanselmann
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
245 6760e4ed Michael Hanselmann
246 6760e4ed Michael Hanselmann
247 8f5c488d Michael Hanselmann
class TestQueuedOpCode(unittest.TestCase):
248 8f5c488d Michael Hanselmann
  def testDefaults(self):
249 8f5c488d Michael Hanselmann
    def _Check(op):
250 8f5c488d Michael Hanselmann
      self.assertFalse(hasattr(op.input, "dry_run"))
251 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
252 8f5c488d Michael Hanselmann
      self.assertFalse(op.log)
253 8f5c488d Michael Hanselmann
      self.assert_(op.start_timestamp is None)
254 8f5c488d Michael Hanselmann
      self.assert_(op.exec_timestamp is None)
255 8f5c488d Michael Hanselmann
      self.assert_(op.end_timestamp is None)
256 8f5c488d Michael Hanselmann
      self.assert_(op.result is None)
257 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
258 8f5c488d Michael Hanselmann
259 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
260 8f5c488d Michael Hanselmann
    _Check(op1)
261 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
262 8f5c488d Michael Hanselmann
    _Check(op2)
263 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
264 8f5c488d Michael Hanselmann
265 8f5c488d Michael Hanselmann
  def testPriority(self):
266 8f5c488d Michael Hanselmann
    def _Check(op):
267 8f5c488d Michael Hanselmann
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
268 8f5c488d Michael Hanselmann
             "Default priority equals high priority; test can't work"
269 8f5c488d Michael Hanselmann
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
270 8f5c488d Michael Hanselmann
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
271 8f5c488d Michael Hanselmann
272 c6afb1ca Iustin Pop
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
273 8f5c488d Michael Hanselmann
    op1 = jqueue._QueuedOpCode(inpop)
274 8f5c488d Michael Hanselmann
    _Check(op1)
275 8f5c488d Michael Hanselmann
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
276 8f5c488d Michael Hanselmann
    _Check(op2)
277 8f5c488d Michael Hanselmann
    self.assertEqual(op1.Serialize(), op2.Serialize())
278 8f5c488d Michael Hanselmann
279 8f5c488d Michael Hanselmann
280 8f5c488d Michael Hanselmann
class TestQueuedJob(unittest.TestCase):
281 5f6b0b71 Michael Hanselmann
  def test(self):
282 5f6b0b71 Michael Hanselmann
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
283 c0f6d0d8 Michael Hanselmann
                      None, 1, [], False)
284 5f6b0b71 Michael Hanselmann
285 8f5c488d Michael Hanselmann
  def testDefaults(self):
286 8f5c488d Michael Hanselmann
    job_id = 4260
287 8f5c488d Michael Hanselmann
    ops = [
288 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(),
289 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
290 8f5c488d Michael Hanselmann
      ]
291 8f5c488d Michael Hanselmann
292 8f5c488d Michael Hanselmann
    def _Check(job):
293 c0f6d0d8 Michael Hanselmann
      self.assertTrue(job.writable)
294 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
295 8f5c488d Michael Hanselmann
      self.assertEqual(job.log_serial, 0)
296 8f5c488d Michael Hanselmann
      self.assert_(job.received_timestamp)
297 8f5c488d Michael Hanselmann
      self.assert_(job.start_timestamp is None)
298 8f5c488d Michael Hanselmann
      self.assert_(job.end_timestamp is None)
299 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
300 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
301 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
302 8f5c488d Michael Hanselmann
      self.assertEqual(len(job.ops), len(ops))
303 8f5c488d Michael Hanselmann
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
304 8f5c488d Michael Hanselmann
                              for (inp, op) in zip(ops, job.ops)))
305 5f6b0b71 Michael Hanselmann
      self.assertRaises(errors.OpExecError, job.GetInfo,
306 5f6b0b71 Michael Hanselmann
                        ["unknown-field"])
307 5f6b0b71 Michael Hanselmann
      self.assertEqual(job.GetInfo(["summary"]),
308 5f6b0b71 Michael Hanselmann
                       [[op.input.Summary() for op in job.ops]])
309 8f5c488d Michael Hanselmann
310 c0f6d0d8 Michael Hanselmann
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
311 8f5c488d Michael Hanselmann
    _Check(job1)
312 c0f6d0d8 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
313 8f5c488d Michael Hanselmann
    _Check(job2)
314 8f5c488d Michael Hanselmann
    self.assertEqual(job1.Serialize(), job2.Serialize())
315 8f5c488d Michael Hanselmann
316 c0f6d0d8 Michael Hanselmann
  def testWritable(self):
317 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
318 c0f6d0d8 Michael Hanselmann
    self.assertFalse(job.writable)
319 c0f6d0d8 Michael Hanselmann
320 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
321 c0f6d0d8 Michael Hanselmann
    self.assertTrue(job.writable)
322 c0f6d0d8 Michael Hanselmann
323 8f5c488d Michael Hanselmann
  def testPriority(self):
324 8f5c488d Michael Hanselmann
    job_id = 4283
325 8f5c488d Michael Hanselmann
    ops = [
326 c6afb1ca Iustin Pop
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
327 8f5c488d Michael Hanselmann
      opcodes.OpTestDelay(),
328 8f5c488d Michael Hanselmann
      ]
329 8f5c488d Michael Hanselmann
330 8f5c488d Michael Hanselmann
    def _Check(job):
331 8f5c488d Michael Hanselmann
      self.assertEqual(job.id, job_id)
332 8f5c488d Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
333 8f5c488d Michael Hanselmann
      self.assert_(repr(job).startswith("<"))
334 8f5c488d Michael Hanselmann
335 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(None, job_id, ops, True)
336 8f5c488d Michael Hanselmann
    _Check(job)
337 8f5c488d Michael Hanselmann
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
338 8f5c488d Michael Hanselmann
                            for op in job.ops))
339 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
340 8f5c488d Michael Hanselmann
341 8f5c488d Michael Hanselmann
    # Increase first
342 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 1
343 8f5c488d Michael Hanselmann
    _Check(job)
344 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
345 8f5c488d Michael Hanselmann
346 8f5c488d Michael Hanselmann
    # Mark opcode as finished
347 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_SUCCESS
348 8f5c488d Michael Hanselmann
    _Check(job)
349 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
350 8f5c488d Michael Hanselmann
351 8f5c488d Michael Hanselmann
    # Increase second
352 8f5c488d Michael Hanselmann
    job.ops[1].priority -= 10
353 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
354 8f5c488d Michael Hanselmann
355 8f5c488d Michael Hanselmann
    # Test increasing first
356 8f5c488d Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
357 8f5c488d Michael Hanselmann
    job.ops[0].priority -= 19
358 8f5c488d Michael Hanselmann
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
359 8f5c488d Michael Hanselmann
360 db5bce34 Michael Hanselmann
  def testCalcStatus(self):
361 db5bce34 Michael Hanselmann
    def _Queued(ops):
362 db5bce34 Michael Hanselmann
      # The default status is "queued"
363 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
364 db5bce34 Michael Hanselmann
                              for op in ops))
365 db5bce34 Michael Hanselmann
366 db5bce34 Michael Hanselmann
    def _Waitlock1(ops):
367 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_WAITLOCK
368 db5bce34 Michael Hanselmann
369 db5bce34 Michael Hanselmann
    def _Waitlock2(ops):
370 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
371 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
372 db5bce34 Michael Hanselmann
      ops[2].status = constants.OP_STATUS_WAITLOCK
373 db5bce34 Michael Hanselmann
374 db5bce34 Michael Hanselmann
    def _Running(ops):
375 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
376 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_RUNNING
377 db5bce34 Michael Hanselmann
      for op in ops[2:]:
378 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_QUEUED
379 db5bce34 Michael Hanselmann
380 db5bce34 Michael Hanselmann
    def _Canceling1(ops):
381 db5bce34 Michael Hanselmann
      ops[0].status = constants.OP_STATUS_SUCCESS
382 db5bce34 Michael Hanselmann
      ops[1].status = constants.OP_STATUS_SUCCESS
383 db5bce34 Michael Hanselmann
      for op in ops[2:]:
384 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
385 db5bce34 Michael Hanselmann
386 db5bce34 Michael Hanselmann
    def _Canceling2(ops):
387 db5bce34 Michael Hanselmann
      for op in ops:
388 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELING
389 db5bce34 Michael Hanselmann
390 db5bce34 Michael Hanselmann
    def _Canceled(ops):
391 db5bce34 Michael Hanselmann
      for op in ops:
392 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_CANCELED
393 db5bce34 Michael Hanselmann
394 db5bce34 Michael Hanselmann
    def _Error1(ops):
395 db5bce34 Michael Hanselmann
      for idx, op in enumerate(ops):
396 db5bce34 Michael Hanselmann
        if idx > 3:
397 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_ERROR
398 db5bce34 Michael Hanselmann
        else:
399 db5bce34 Michael Hanselmann
          op.status = constants.OP_STATUS_SUCCESS
400 db5bce34 Michael Hanselmann
401 db5bce34 Michael Hanselmann
    def _Error2(ops):
402 db5bce34 Michael Hanselmann
      for op in ops:
403 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_ERROR
404 db5bce34 Michael Hanselmann
405 db5bce34 Michael Hanselmann
    def _Success(ops):
406 db5bce34 Michael Hanselmann
      for op in ops:
407 db5bce34 Michael Hanselmann
        op.status = constants.OP_STATUS_SUCCESS
408 db5bce34 Michael Hanselmann
409 db5bce34 Michael Hanselmann
    tests = {
410 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_QUEUED: [_Queued],
411 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
412 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_RUNNING: [_Running],
413 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
414 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_CANCELED: [_Canceled],
415 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
416 db5bce34 Michael Hanselmann
      constants.JOB_STATUS_SUCCESS: [_Success],
417 db5bce34 Michael Hanselmann
      }
418 db5bce34 Michael Hanselmann
419 db5bce34 Michael Hanselmann
    def _NewJob():
420 db5bce34 Michael Hanselmann
      job = jqueue._QueuedJob(None, 1,
421 c0f6d0d8 Michael Hanselmann
                              [opcodes.OpTestDelay() for _ in range(10)],
422 c0f6d0d8 Michael Hanselmann
                              True)
423 db5bce34 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
424 db5bce34 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
425 db5bce34 Michael Hanselmann
                              for op in job.ops))
426 db5bce34 Michael Hanselmann
      return job
427 db5bce34 Michael Hanselmann
428 db5bce34 Michael Hanselmann
    for status in constants.JOB_STATUS_ALL:
429 db5bce34 Michael Hanselmann
      sttests = tests[status]
430 db5bce34 Michael Hanselmann
      assert sttests
431 db5bce34 Michael Hanselmann
      for fn in sttests:
432 db5bce34 Michael Hanselmann
        job = _NewJob()
433 db5bce34 Michael Hanselmann
        fn(job.ops)
434 db5bce34 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), status)
435 db5bce34 Michael Hanselmann
436 8f5c488d Michael Hanselmann
437 b95479a5 Michael Hanselmann
class _FakeDependencyManager:
438 be760ba8 Michael Hanselmann
  def __init__(self):
439 b95479a5 Michael Hanselmann
    self._checks = []
440 b95479a5 Michael Hanselmann
    self._notifications = []
441 b95479a5 Michael Hanselmann
    self._waiting = set()
442 b95479a5 Michael Hanselmann
443 b95479a5 Michael Hanselmann
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
444 b95479a5 Michael Hanselmann
    self._checks.append((job, dep_job_id, dep_status, result))
445 b95479a5 Michael Hanselmann
446 b95479a5 Michael Hanselmann
  def CountPendingResults(self):
447 b95479a5 Michael Hanselmann
    return len(self._checks)
448 b95479a5 Michael Hanselmann
449 b95479a5 Michael Hanselmann
  def CountWaitingJobs(self):
450 b95479a5 Michael Hanselmann
    return len(self._waiting)
451 b95479a5 Michael Hanselmann
452 b95479a5 Michael Hanselmann
  def GetNextNotification(self):
453 b95479a5 Michael Hanselmann
    return self._notifications.pop(0)
454 b95479a5 Michael Hanselmann
455 b95479a5 Michael Hanselmann
  def JobWaiting(self, job):
456 b95479a5 Michael Hanselmann
    return job in self._waiting
457 b95479a5 Michael Hanselmann
458 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, job, dep_job_id, dep_status):
459 b95479a5 Michael Hanselmann
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
460 b95479a5 Michael Hanselmann
461 b95479a5 Michael Hanselmann
    assert exp_job == job
462 b95479a5 Michael Hanselmann
    assert exp_dep_job_id == dep_job_id
463 b95479a5 Michael Hanselmann
    assert exp_dep_status == dep_status
464 b95479a5 Michael Hanselmann
465 b95479a5 Michael Hanselmann
    (result_status, _) = result
466 b95479a5 Michael Hanselmann
467 b95479a5 Michael Hanselmann
    if result_status == jqueue._JobDependencyManager.WAIT:
468 b95479a5 Michael Hanselmann
      self._waiting.add(job)
469 b95479a5 Michael Hanselmann
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
470 b95479a5 Michael Hanselmann
      self._waiting.remove(job)
471 b95479a5 Michael Hanselmann
472 b95479a5 Michael Hanselmann
    return result
473 b95479a5 Michael Hanselmann
474 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, job_id):
475 b95479a5 Michael Hanselmann
    self._notifications.append(job_id)
476 b95479a5 Michael Hanselmann
477 b95479a5 Michael Hanselmann
478 b95479a5 Michael Hanselmann
class _DisabledFakeDependencyManager:
479 b95479a5 Michael Hanselmann
  def JobWaiting(self, _):
480 b95479a5 Michael Hanselmann
    return False
481 b95479a5 Michael Hanselmann
482 b95479a5 Michael Hanselmann
  def CheckAndRegister(self, *args):
483 b95479a5 Michael Hanselmann
    assert False, "Should not be called"
484 b95479a5 Michael Hanselmann
485 b95479a5 Michael Hanselmann
  def NotifyWaiters(self, _):
486 b95479a5 Michael Hanselmann
    pass
487 b95479a5 Michael Hanselmann
488 b95479a5 Michael Hanselmann
489 b95479a5 Michael Hanselmann
class _FakeQueueForProc:
490 b95479a5 Michael Hanselmann
  def __init__(self, depmgr=None):
491 be760ba8 Michael Hanselmann
    self._acquired = False
492 ebb2a2a3 Michael Hanselmann
    self._updates = []
493 6a373640 Michael Hanselmann
    self._submitted = []
494 6a373640 Michael Hanselmann
495 6a373640 Michael Hanselmann
    self._submit_count = itertools.count(1000)
496 be760ba8 Michael Hanselmann
497 b95479a5 Michael Hanselmann
    if depmgr:
498 b95479a5 Michael Hanselmann
      self.depmgr = depmgr
499 b95479a5 Michael Hanselmann
    else:
500 b95479a5 Michael Hanselmann
      self.depmgr = _DisabledFakeDependencyManager()
501 b95479a5 Michael Hanselmann
502 be760ba8 Michael Hanselmann
  def IsAcquired(self):
503 be760ba8 Michael Hanselmann
    return self._acquired
504 be760ba8 Michael Hanselmann
505 ebb2a2a3 Michael Hanselmann
  def GetNextUpdate(self):
506 ebb2a2a3 Michael Hanselmann
    return self._updates.pop(0)
507 ebb2a2a3 Michael Hanselmann
508 6a373640 Michael Hanselmann
  def GetNextSubmittedJob(self):
509 6a373640 Michael Hanselmann
    return self._submitted.pop(0)
510 6a373640 Michael Hanselmann
511 be760ba8 Michael Hanselmann
  def acquire(self, shared=0):
512 be760ba8 Michael Hanselmann
    assert shared == 1
513 be760ba8 Michael Hanselmann
    self._acquired = True
514 be760ba8 Michael Hanselmann
515 be760ba8 Michael Hanselmann
  def release(self):
516 be760ba8 Michael Hanselmann
    assert self._acquired
517 be760ba8 Michael Hanselmann
    self._acquired = False
518 be760ba8 Michael Hanselmann
519 ebb2a2a3 Michael Hanselmann
  def UpdateJobUnlocked(self, job, replicate=True):
520 ebb2a2a3 Michael Hanselmann
    assert self._acquired, "Lock not acquired while updating job"
521 ebb2a2a3 Michael Hanselmann
    self._updates.append((job, bool(replicate)))
522 be760ba8 Michael Hanselmann
523 6a373640 Michael Hanselmann
  def SubmitManyJobs(self, jobs):
524 6a373640 Michael Hanselmann
    assert not self._acquired, "Lock acquired while submitting jobs"
525 6a373640 Michael Hanselmann
    job_ids = [self._submit_count.next() for _ in jobs]
526 6a373640 Michael Hanselmann
    self._submitted.extend(zip(job_ids, jobs))
527 6a373640 Michael Hanselmann
    return job_ids
528 6a373640 Michael Hanselmann
529 be760ba8 Michael Hanselmann
530 be760ba8 Michael Hanselmann
class _FakeExecOpCodeForProc:
531 ebb2a2a3 Michael Hanselmann
  def __init__(self, queue, before_start, after_start):
532 ebb2a2a3 Michael Hanselmann
    self._queue = queue
533 be760ba8 Michael Hanselmann
    self._before_start = before_start
534 be760ba8 Michael Hanselmann
    self._after_start = after_start
535 be760ba8 Michael Hanselmann
536 f23db633 Michael Hanselmann
  def __call__(self, op, cbs, timeout=None, priority=None):
537 be760ba8 Michael Hanselmann
    assert isinstance(op, opcodes.OpTestDummy)
538 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired(), \
539 ebb2a2a3 Michael Hanselmann
           "Queue lock not released when executing opcode"
540 be760ba8 Michael Hanselmann
541 be760ba8 Michael Hanselmann
    if self._before_start:
542 f23db633 Michael Hanselmann
      self._before_start(timeout, priority)
543 be760ba8 Michael Hanselmann
544 be760ba8 Michael Hanselmann
    cbs.NotifyStart()
545 be760ba8 Michael Hanselmann
546 be760ba8 Michael Hanselmann
    if self._after_start:
547 be760ba8 Michael Hanselmann
      self._after_start(op, cbs)
548 be760ba8 Michael Hanselmann
549 ebb2a2a3 Michael Hanselmann
    # Check again after the callbacks
550 ebb2a2a3 Michael Hanselmann
    assert not self._queue.IsAcquired()
551 ebb2a2a3 Michael Hanselmann
552 be760ba8 Michael Hanselmann
    if op.fail:
553 be760ba8 Michael Hanselmann
      raise errors.OpExecError("Error requested (%s)" % op.result)
554 be760ba8 Michael Hanselmann
555 6a373640 Michael Hanselmann
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
556 6a373640 Michael Hanselmann
      return cbs.SubmitManyJobs(op.submit_jobs)
557 6a373640 Michael Hanselmann
558 be760ba8 Michael Hanselmann
    return op.result
559 be760ba8 Michael Hanselmann
560 be760ba8 Michael Hanselmann
561 26d3fd2f Michael Hanselmann
class _JobProcessorTestUtils:
562 be760ba8 Michael Hanselmann
  def _CreateJob(self, queue, job_id, ops):
563 c0f6d0d8 Michael Hanselmann
    job = jqueue._QueuedJob(queue, job_id, ops, True)
564 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
565 be760ba8 Michael Hanselmann
    self.assertFalse(job.end_timestamp)
566 be760ba8 Michael Hanselmann
    self.assertEqual(len(ops), len(job.ops))
567 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.input == inp
568 be760ba8 Michael Hanselmann
                            for (op, inp) in zip(job.ops, ops)))
569 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
570 be760ba8 Michael Hanselmann
    return job
571 be760ba8 Michael Hanselmann
572 26d3fd2f Michael Hanselmann
573 26d3fd2f Michael Hanselmann
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
574 be760ba8 Michael Hanselmann
  def _GenericCheckJob(self, job):
575 be760ba8 Michael Hanselmann
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
576 be760ba8 Michael Hanselmann
                      for op in job.ops)
577 be760ba8 Michael Hanselmann
578 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
579 be760ba8 Michael Hanselmann
                     [[op.start_timestamp for op in job.ops],
580 be760ba8 Michael Hanselmann
                      [op.exec_timestamp for op in job.ops],
581 be760ba8 Michael Hanselmann
                      [op.end_timestamp for op in job.ops]])
582 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
583 be760ba8 Michael Hanselmann
                     [job.received_timestamp,
584 be760ba8 Michael Hanselmann
                      job.start_timestamp,
585 be760ba8 Michael Hanselmann
                      job.end_timestamp])
586 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
587 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
588 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
589 be760ba8 Michael Hanselmann
590 be760ba8 Michael Hanselmann
  def testSuccess(self):
591 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
592 be760ba8 Michael Hanselmann
593 be760ba8 Michael Hanselmann
    for (job_id, opcount) in [(25351, 1), (6637, 3),
594 be760ba8 Michael Hanselmann
                              (24644, 10), (32207, 100)]:
595 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
596 be760ba8 Michael Hanselmann
             for i in range(opcount)]
597 be760ba8 Michael Hanselmann
598 be760ba8 Michael Hanselmann
      # Create job
599 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
600 be760ba8 Michael Hanselmann
601 f23db633 Michael Hanselmann
      def _BeforeStart(timeout, priority):
602 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
603 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
604 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
605 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
606 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
607 be760ba8 Michael Hanselmann
608 be760ba8 Michael Hanselmann
      def _AfterStart(op, cbs):
609 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
610 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
611 ebb2a2a3 Michael Hanselmann
612 be760ba8 Michael Hanselmann
        self.assertFalse(queue.IsAcquired())
613 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
614 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
615 be760ba8 Michael Hanselmann
616 be760ba8 Michael Hanselmann
        # Job is running, cancelling shouldn't be possible
617 be760ba8 Michael Hanselmann
        (success, _) = job.Cancel()
618 be760ba8 Michael Hanselmann
        self.assertFalse(success)
619 be760ba8 Michael Hanselmann
620 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
621 be760ba8 Michael Hanselmann
622 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
623 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
624 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
625 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
626 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
627 be760ba8 Michael Hanselmann
        if idx == len(ops) - 1:
628 be760ba8 Michael Hanselmann
          # Last opcode
629 be760ba8 Michael Hanselmann
          self.assert_(result)
630 be760ba8 Michael Hanselmann
        else:
631 be760ba8 Michael Hanselmann
          self.assertFalse(result)
632 be760ba8 Michael Hanselmann
633 be760ba8 Michael Hanselmann
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
634 be760ba8 Michael Hanselmann
          self.assert_(job.start_timestamp)
635 be760ba8 Michael Hanselmann
          self.assertFalse(job.end_timestamp)
636 be760ba8 Michael Hanselmann
637 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
638 ebb2a2a3 Michael Hanselmann
639 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
640 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
641 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opresult"]),
642 be760ba8 Michael Hanselmann
                       [[op.input.result for op in job.ops]])
643 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
644 be760ba8 Michael Hanselmann
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
645 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
646 be760ba8 Michael Hanselmann
                              for op in job.ops))
647 be760ba8 Michael Hanselmann
648 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
649 be760ba8 Michael Hanselmann
650 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
651 66bd7445 Michael Hanselmann
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
652 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
653 be760ba8 Michael Hanselmann
654 be760ba8 Michael Hanselmann
  def testOpcodeError(self):
655 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
656 be760ba8 Michael Hanselmann
657 be760ba8 Michael Hanselmann
    testdata = [
658 be760ba8 Michael Hanselmann
      (17077, 1, 0, 0),
659 be760ba8 Michael Hanselmann
      (1782, 5, 2, 2),
660 be760ba8 Michael Hanselmann
      (18179, 10, 9, 9),
661 be760ba8 Michael Hanselmann
      (4744, 10, 3, 8),
662 be760ba8 Michael Hanselmann
      (23816, 100, 39, 45),
663 be760ba8 Michael Hanselmann
      ]
664 be760ba8 Michael Hanselmann
665 be760ba8 Michael Hanselmann
    for (job_id, opcount, failfrom, failto) in testdata:
666 be760ba8 Michael Hanselmann
      # Prepare opcodes
667 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
668 be760ba8 Michael Hanselmann
                                 fail=(failfrom <= i and
669 be760ba8 Michael Hanselmann
                                       i <= failto))
670 be760ba8 Michael Hanselmann
             for i in range(opcount)]
671 be760ba8 Michael Hanselmann
672 be760ba8 Michael Hanselmann
      # Create job
673 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
674 be760ba8 Michael Hanselmann
675 ebb2a2a3 Michael Hanselmann
      opexec = _FakeExecOpCodeForProc(queue, None, None)
676 be760ba8 Michael Hanselmann
677 be760ba8 Michael Hanselmann
      for idx in range(len(ops)):
678 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
679 be760ba8 Michael Hanselmann
        result = jqueue._JobProcessor(queue, opexec, job)()
680 ebb2a2a3 Michael Hanselmann
        # queued to waitlock
681 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
682 ebb2a2a3 Michael Hanselmann
        # waitlock to running
683 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
684 ebb2a2a3 Michael Hanselmann
        # Opcode result
685 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
686 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
687 be760ba8 Michael Hanselmann
688 be760ba8 Michael Hanselmann
        if idx in (failfrom, len(ops) - 1):
689 be760ba8 Michael Hanselmann
          # Last opcode
690 be760ba8 Michael Hanselmann
          self.assert_(result)
691 be760ba8 Michael Hanselmann
          break
692 be760ba8 Michael Hanselmann
693 be760ba8 Michael Hanselmann
        self.assertFalse(result)
694 be760ba8 Michael Hanselmann
695 be760ba8 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
696 be760ba8 Michael Hanselmann
697 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
698 ebb2a2a3 Michael Hanselmann
699 be760ba8 Michael Hanselmann
      # Check job status
700 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
701 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["id"]), [job_id])
702 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
703 be760ba8 Michael Hanselmann
704 be760ba8 Michael Hanselmann
      # Check opcode status
705 be760ba8 Michael Hanselmann
      data = zip(job.ops,
706 be760ba8 Michael Hanselmann
                 job.GetInfo(["opstatus"])[0],
707 be760ba8 Michael Hanselmann
                 job.GetInfo(["opresult"])[0])
708 be760ba8 Michael Hanselmann
709 be760ba8 Michael Hanselmann
      for idx, (op, opstatus, opresult) in enumerate(data):
710 be760ba8 Michael Hanselmann
        if idx < failfrom:
711 be760ba8 Michael Hanselmann
          assert not op.input.fail
712 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
713 be760ba8 Michael Hanselmann
          self.assertEqual(opresult, op.input.result)
714 be760ba8 Michael Hanselmann
        elif idx <= failto:
715 be760ba8 Michael Hanselmann
          assert op.input.fail
716 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
717 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
718 be760ba8 Michael Hanselmann
        else:
719 be760ba8 Michael Hanselmann
          assert not op.input.fail
720 be760ba8 Michael Hanselmann
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
721 be760ba8 Michael Hanselmann
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
722 be760ba8 Michael Hanselmann
723 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
724 be760ba8 Michael Hanselmann
                              for op in job.ops[:failfrom]))
725 be760ba8 Michael Hanselmann
726 be760ba8 Michael Hanselmann
      self._GenericCheckJob(job)
727 be760ba8 Michael Hanselmann
728 66bd7445 Michael Hanselmann
      # Calling the processor on a finished job should be a no-op
729 66bd7445 Michael Hanselmann
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
730 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
731 be760ba8 Michael Hanselmann
732 be760ba8 Michael Hanselmann
  def testCancelWhileInQueue(self):
733 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
734 be760ba8 Michael Hanselmann
735 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
736 be760ba8 Michael Hanselmann
           for i in range(5)]
737 be760ba8 Michael Hanselmann
738 be760ba8 Michael Hanselmann
    # Create job
739 be760ba8 Michael Hanselmann
    job_id = 17045
740 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
741 be760ba8 Michael Hanselmann
742 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
743 be760ba8 Michael Hanselmann
744 be760ba8 Michael Hanselmann
    # Mark as cancelled
745 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
746 be760ba8 Michael Hanselmann
    self.assert_(success)
747 be760ba8 Michael Hanselmann
748 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
749 ebb2a2a3 Michael Hanselmann
750 66bd7445 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
751 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
752 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
753 be760ba8 Michael Hanselmann
                            for op in job.ops))
754 be760ba8 Michael Hanselmann
755 66bd7445 Michael Hanselmann
    # Serialize to check for differences
756 66bd7445 Michael Hanselmann
    before_proc = job.Serialize()
757 66bd7445 Michael Hanselmann
758 66bd7445 Michael Hanselmann
    # Simulate processor called in workerpool
759 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
760 30c945d0 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
761 30c945d0 Michael Hanselmann
762 30c945d0 Michael Hanselmann
    # Check result
763 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
764 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
765 30c945d0 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
766 66bd7445 Michael Hanselmann
    self.assertTrue(job.end_timestamp)
767 30c945d0 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
768 30c945d0 Michael Hanselmann
                                for op in job.ops))
769 30c945d0 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
770 30c945d0 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
771 30c945d0 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
772 30c945d0 Michael Hanselmann
773 66bd7445 Michael Hanselmann
    # Must not have changed or written
774 66bd7445 Michael Hanselmann
    self.assertEqual(before_proc, job.Serialize())
775 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
776 66bd7445 Michael Hanselmann
777 30c945d0 Michael Hanselmann
  def testCancelWhileWaitlockInQueue(self):
778 30c945d0 Michael Hanselmann
    queue = _FakeQueueForProc()
779 30c945d0 Michael Hanselmann
780 30c945d0 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
781 30c945d0 Michael Hanselmann
           for i in range(5)]
782 30c945d0 Michael Hanselmann
783 30c945d0 Michael Hanselmann
    # Create job
784 30c945d0 Michael Hanselmann
    job_id = 8645
785 30c945d0 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
786 30c945d0 Michael Hanselmann
787 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
788 30c945d0 Michael Hanselmann
789 30c945d0 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_WAITLOCK
790 30c945d0 Michael Hanselmann
791 30c945d0 Michael Hanselmann
    assert len(job.ops) == 5
792 30c945d0 Michael Hanselmann
793 30c945d0 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
794 30c945d0 Michael Hanselmann
795 30c945d0 Michael Hanselmann
    # Mark as cancelling
796 30c945d0 Michael Hanselmann
    (success, _) = job.Cancel()
797 30c945d0 Michael Hanselmann
    self.assert_(success)
798 30c945d0 Michael Hanselmann
799 30c945d0 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
800 30c945d0 Michael Hanselmann
801 30c945d0 Michael Hanselmann
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
802 30c945d0 Michael Hanselmann
                            for op in job.ops))
803 30c945d0 Michael Hanselmann
804 30c945d0 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
805 30c945d0 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
806 be760ba8 Michael Hanselmann
807 be760ba8 Michael Hanselmann
    # Check result
808 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
809 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
810 be760ba8 Michael Hanselmann
    self.assertFalse(job.start_timestamp)
811 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
812 be760ba8 Michael Hanselmann
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
813 be760ba8 Michael Hanselmann
                                for op in job.ops))
814 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
815 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
816 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
817 be760ba8 Michael Hanselmann
818 be760ba8 Michael Hanselmann
  def testCancelWhileWaitlock(self):
819 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
820 be760ba8 Michael Hanselmann
821 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
822 be760ba8 Michael Hanselmann
           for i in range(5)]
823 be760ba8 Michael Hanselmann
824 be760ba8 Michael Hanselmann
    # Create job
825 be760ba8 Michael Hanselmann
    job_id = 11009
826 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
827 be760ba8 Michael Hanselmann
828 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
829 be760ba8 Michael Hanselmann
830 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
831 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
832 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
833 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
834 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
835 be760ba8 Michael Hanselmann
836 be760ba8 Michael Hanselmann
      # Mark as cancelled
837 be760ba8 Michael Hanselmann
      (success, _) = job.Cancel()
838 be760ba8 Michael Hanselmann
      self.assert_(success)
839 be760ba8 Michael Hanselmann
840 be760ba8 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
841 be760ba8 Michael Hanselmann
                              for op in job.ops))
842 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
843 be760ba8 Michael Hanselmann
844 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
845 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
846 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
847 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
848 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
849 be760ba8 Michael Hanselmann
850 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
851 be760ba8 Michael Hanselmann
852 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
853 ebb2a2a3 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
854 ebb2a2a3 Michael Hanselmann
    self.assertEqual(queue.GetNextUpdate(), (job, True))
855 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
856 be760ba8 Michael Hanselmann
857 be760ba8 Michael Hanselmann
    # Check result
858 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
859 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
860 be760ba8 Michael Hanselmann
    self.assert_(job.start_timestamp)
861 be760ba8 Michael Hanselmann
    self.assert_(job.end_timestamp)
862 be760ba8 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
863 be760ba8 Michael Hanselmann
                                for op in job.ops))
864 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
865 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
866 be760ba8 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
867 be760ba8 Michael Hanselmann
868 9e49dfc5 Michael Hanselmann
  def testCancelWhileWaitlockWithTimeout(self):
869 9e49dfc5 Michael Hanselmann
    queue = _FakeQueueForProc()
870 9e49dfc5 Michael Hanselmann
871 9e49dfc5 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
872 9e49dfc5 Michael Hanselmann
           for i in range(5)]
873 9e49dfc5 Michael Hanselmann
874 9e49dfc5 Michael Hanselmann
    # Create job
875 9e49dfc5 Michael Hanselmann
    job_id = 24314
876 9e49dfc5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
877 9e49dfc5 Michael Hanselmann
878 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
879 9e49dfc5 Michael Hanselmann
880 9e49dfc5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
881 9e49dfc5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
882 9e49dfc5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
883 9e49dfc5 Michael Hanselmann
884 9e49dfc5 Michael Hanselmann
      # Mark as cancelled
885 9e49dfc5 Michael Hanselmann
      (success, _) = job.Cancel()
886 9e49dfc5 Michael Hanselmann
      self.assert_(success)
887 9e49dfc5 Michael Hanselmann
888 9e49dfc5 Michael Hanselmann
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
889 9e49dfc5 Michael Hanselmann
                              for op in job.ops))
890 9e49dfc5 Michael Hanselmann
891 9e49dfc5 Michael Hanselmann
      # Fake an acquire attempt timing out
892 9e49dfc5 Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
893 9e49dfc5 Michael Hanselmann
894 9e49dfc5 Michael Hanselmann
    def _AfterStart(op, cbs):
895 9e49dfc5 Michael Hanselmann
      self.fail("Should not reach this")
896 9e49dfc5 Michael Hanselmann
897 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
898 9e49dfc5 Michael Hanselmann
899 9e49dfc5 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
900 9e49dfc5 Michael Hanselmann
901 9e49dfc5 Michael Hanselmann
    # Check result
902 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
903 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
904 9e49dfc5 Michael Hanselmann
    self.assert_(job.start_timestamp)
905 9e49dfc5 Michael Hanselmann
    self.assert_(job.end_timestamp)
906 9e49dfc5 Michael Hanselmann
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
907 9e49dfc5 Michael Hanselmann
                                for op in job.ops))
908 9e49dfc5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
909 9e49dfc5 Michael Hanselmann
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
910 9e49dfc5 Michael Hanselmann
                      ["Job canceled by request" for _ in job.ops]])
911 9e49dfc5 Michael Hanselmann
912 be760ba8 Michael Hanselmann
  def testCancelWhileRunning(self):
913 be760ba8 Michael Hanselmann
    # Tests canceling a job with finished opcodes and more, unprocessed ones
914 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
915 be760ba8 Michael Hanselmann
916 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
917 be760ba8 Michael Hanselmann
           for i in range(3)]
918 be760ba8 Michael Hanselmann
919 be760ba8 Michael Hanselmann
    # Create job
920 be760ba8 Michael Hanselmann
    job_id = 28492
921 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
922 be760ba8 Michael Hanselmann
923 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
924 be760ba8 Michael Hanselmann
925 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
926 be760ba8 Michael Hanselmann
927 be760ba8 Michael Hanselmann
    # Run one opcode
928 be760ba8 Michael Hanselmann
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
929 be760ba8 Michael Hanselmann
930 be760ba8 Michael Hanselmann
    # Job goes back to queued
931 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
932 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
933 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
934 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED,
935 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_QUEUED],
936 be760ba8 Michael Hanselmann
                      ["Res0", None, None]])
937 be760ba8 Michael Hanselmann
938 be760ba8 Michael Hanselmann
    # Mark as cancelled
939 be760ba8 Michael Hanselmann
    (success, _) = job.Cancel()
940 be760ba8 Michael Hanselmann
    self.assert_(success)
941 be760ba8 Michael Hanselmann
942 be760ba8 Michael Hanselmann
    # Try processing another opcode (this will actually cancel the job)
943 be760ba8 Michael Hanselmann
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
944 be760ba8 Michael Hanselmann
945 be760ba8 Michael Hanselmann
    # Check result
946 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
947 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["id"]), [job_id])
948 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
949 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
950 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
951 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
952 be760ba8 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
953 be760ba8 Michael Hanselmann
                      ["Res0", "Job canceled by request",
954 be760ba8 Michael Hanselmann
                       "Job canceled by request"]])
955 be760ba8 Michael Hanselmann
956 be760ba8 Michael Hanselmann
  def testPartiallyRun(self):
957 be760ba8 Michael Hanselmann
    # Tests calling the processor on a job that's been partially run before the
958 be760ba8 Michael Hanselmann
    # program was restarted
959 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
960 be760ba8 Michael Hanselmann
961 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
962 be760ba8 Michael Hanselmann
963 be760ba8 Michael Hanselmann
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
964 be760ba8 Michael Hanselmann
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
965 be760ba8 Michael Hanselmann
             for i in range(10)]
966 be760ba8 Michael Hanselmann
967 be760ba8 Michael Hanselmann
      # Create job
968 be760ba8 Michael Hanselmann
      job = self._CreateJob(queue, job_id, ops)
969 be760ba8 Michael Hanselmann
970 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
971 be760ba8 Michael Hanselmann
972 be760ba8 Michael Hanselmann
      for _ in range(successcount):
973 be760ba8 Michael Hanselmann
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
974 be760ba8 Michael Hanselmann
975 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
976 be760ba8 Michael Hanselmann
      self.assertEqual(job.GetInfo(["opstatus"]),
977 be760ba8 Michael Hanselmann
                       [[constants.OP_STATUS_SUCCESS
978 be760ba8 Michael Hanselmann
                         for _ in range(successcount)] +
979 be760ba8 Michael Hanselmann
                        [constants.OP_STATUS_QUEUED
980 be760ba8 Michael Hanselmann
                         for _ in range(len(ops) - successcount)]])
981 be760ba8 Michael Hanselmann
982 03b63608 Michael Hanselmann
      self.assert_(job.ops_iter)
983 be760ba8 Michael Hanselmann
984 be760ba8 Michael Hanselmann
      # Serialize and restore (simulates program restart)
985 c0f6d0d8 Michael Hanselmann
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
986 03b63608 Michael Hanselmann
      self.assertFalse(newjob.ops_iter)
987 be760ba8 Michael Hanselmann
      self._TestPartial(newjob, successcount)
988 be760ba8 Michael Hanselmann
989 be760ba8 Michael Hanselmann
  def _TestPartial(self, job, successcount):
990 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
991 be760ba8 Michael Hanselmann
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
992 be760ba8 Michael Hanselmann
993 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
994 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
995 be760ba8 Michael Hanselmann
996 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops) - successcount)):
997 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
998 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
999 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1000 66bd7445 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1001 66bd7445 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1002 be760ba8 Michael Hanselmann
1003 be760ba8 Michael Hanselmann
      if remaining == 0:
1004 be760ba8 Michael Hanselmann
        # Last opcode
1005 be760ba8 Michael Hanselmann
        self.assert_(result)
1006 be760ba8 Michael Hanselmann
        break
1007 be760ba8 Michael Hanselmann
1008 be760ba8 Michael Hanselmann
      self.assertFalse(result)
1009 be760ba8 Michael Hanselmann
1010 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1011 be760ba8 Michael Hanselmann
1012 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1013 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1014 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1015 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1016 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1017 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1018 be760ba8 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1019 be760ba8 Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1020 be760ba8 Michael Hanselmann
                            for op in job.ops))
1021 be760ba8 Michael Hanselmann
1022 be760ba8 Michael Hanselmann
    self._GenericCheckJob(job)
1023 be760ba8 Michael Hanselmann
1024 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1025 66bd7445 Michael Hanselmann
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1026 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1027 be760ba8 Michael Hanselmann
1028 be760ba8 Michael Hanselmann
    # ... also after being restored
1029 c0f6d0d8 Michael Hanselmann
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1030 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1031 66bd7445 Michael Hanselmann
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
1032 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1033 be760ba8 Michael Hanselmann
1034 be760ba8 Michael Hanselmann
  def testProcessorOnRunningJob(self):
1035 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1036 be760ba8 Michael Hanselmann
1037 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1038 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1039 be760ba8 Michael Hanselmann
1040 be760ba8 Michael Hanselmann
    # Create job
1041 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 9571, ops)
1042 be760ba8 Michael Hanselmann
1043 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1044 be760ba8 Michael Hanselmann
1045 be760ba8 Michael Hanselmann
    job.ops[0].status = constants.OP_STATUS_RUNNING
1046 be760ba8 Michael Hanselmann
1047 be760ba8 Michael Hanselmann
    assert len(job.ops) == 1
1048 be760ba8 Michael Hanselmann
1049 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1050 be760ba8 Michael Hanselmann
1051 be760ba8 Michael Hanselmann
    # Calling on running job must fail
1052 be760ba8 Michael Hanselmann
    self.assertRaises(errors.ProgrammerError,
1053 be760ba8 Michael Hanselmann
                      jqueue._JobProcessor(queue, opexec, job))
1054 be760ba8 Michael Hanselmann
1055 be760ba8 Michael Hanselmann
  def testLogMessages(self):
1056 be760ba8 Michael Hanselmann
    # Tests the "Feedback" callback function
1057 be760ba8 Michael Hanselmann
    queue = _FakeQueueForProc()
1058 be760ba8 Michael Hanselmann
1059 be760ba8 Michael Hanselmann
    messages = {
1060 be760ba8 Michael Hanselmann
      1: [
1061 be760ba8 Michael Hanselmann
        (None, "Hello"),
1062 be760ba8 Michael Hanselmann
        (None, "World"),
1063 be760ba8 Michael Hanselmann
        (constants.ELOG_MESSAGE, "there"),
1064 be760ba8 Michael Hanselmann
        ],
1065 be760ba8 Michael Hanselmann
      4: [
1066 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1067 be760ba8 Michael Hanselmann
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1068 be760ba8 Michael Hanselmann
        ],
1069 be760ba8 Michael Hanselmann
      }
1070 be760ba8 Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1071 be760ba8 Michael Hanselmann
                               messages=messages.get(i, []))
1072 be760ba8 Michael Hanselmann
           for i in range(5)]
1073 be760ba8 Michael Hanselmann
1074 be760ba8 Michael Hanselmann
    # Create job
1075 be760ba8 Michael Hanselmann
    job = self._CreateJob(queue, 29386, ops)
1076 be760ba8 Michael Hanselmann
1077 f23db633 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1078 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1079 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1080 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1081 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1082 be760ba8 Michael Hanselmann
1083 be760ba8 Michael Hanselmann
    def _AfterStart(op, cbs):
1084 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1085 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1086 be760ba8 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1087 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1088 be760ba8 Michael Hanselmann
1089 be760ba8 Michael Hanselmann
      self.assertRaises(AssertionError, cbs.Feedback,
1090 be760ba8 Michael Hanselmann
                        "too", "many", "arguments")
1091 be760ba8 Michael Hanselmann
1092 be760ba8 Michael Hanselmann
      for (log_type, msg) in op.messages:
1093 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1094 be760ba8 Michael Hanselmann
        if log_type:
1095 be760ba8 Michael Hanselmann
          cbs.Feedback(log_type, msg)
1096 be760ba8 Michael Hanselmann
        else:
1097 be760ba8 Michael Hanselmann
          cbs.Feedback(msg)
1098 ebb2a2a3 Michael Hanselmann
        # Check for job update without replication
1099 ebb2a2a3 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1100 ebb2a2a3 Michael Hanselmann
        self.assertRaises(IndexError, queue.GetNextUpdate)
1101 be760ba8 Michael Hanselmann
1102 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1103 be760ba8 Michael Hanselmann
1104 be760ba8 Michael Hanselmann
    for remaining in reversed(range(len(job.ops))):
1105 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1106 be760ba8 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1107 ebb2a2a3 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1108 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1109 be760ba8 Michael Hanselmann
1110 be760ba8 Michael Hanselmann
      if remaining == 0:
1111 be760ba8 Michael Hanselmann
        # Last opcode
1112 be760ba8 Michael Hanselmann
        self.assert_(result)
1113 be760ba8 Michael Hanselmann
        break
1114 be760ba8 Michael Hanselmann
1115 be760ba8 Michael Hanselmann
      self.assertFalse(result)
1116 be760ba8 Michael Hanselmann
1117 be760ba8 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1118 be760ba8 Michael Hanselmann
1119 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1120 ebb2a2a3 Michael Hanselmann
1121 be760ba8 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1122 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1123 be760ba8 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1124 be760ba8 Michael Hanselmann
1125 be760ba8 Michael Hanselmann
    logmsgcount = sum(len(m) for m in messages.values())
1126 be760ba8 Michael Hanselmann
1127 be760ba8 Michael Hanselmann
    self._CheckLogMessages(job, logmsgcount)
1128 be760ba8 Michael Hanselmann
1129 be760ba8 Michael Hanselmann
    # Serialize and restore (simulates program restart)
1130 c0f6d0d8 Michael Hanselmann
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1131 be760ba8 Michael Hanselmann
    self._CheckLogMessages(newjob, logmsgcount)
1132 be760ba8 Michael Hanselmann
1133 be760ba8 Michael Hanselmann
    # Check each message
1134 be760ba8 Michael Hanselmann
    prevserial = -1
1135 be760ba8 Michael Hanselmann
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1136 be760ba8 Michael Hanselmann
      for (serial, timestamp, log_type, msg) in oplog:
1137 be760ba8 Michael Hanselmann
        (exptype, expmsg) = messages.get(idx).pop(0)
1138 be760ba8 Michael Hanselmann
        if exptype:
1139 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, exptype)
1140 be760ba8 Michael Hanselmann
        else:
1141 be760ba8 Michael Hanselmann
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1142 be760ba8 Michael Hanselmann
        self.assertEqual(expmsg, msg)
1143 be760ba8 Michael Hanselmann
        self.assert_(serial > prevserial)
1144 be760ba8 Michael Hanselmann
        prevserial = serial
1145 be760ba8 Michael Hanselmann
1146 be760ba8 Michael Hanselmann
  def _CheckLogMessages(self, job, count):
1147 be760ba8 Michael Hanselmann
    # Check serial
1148 be760ba8 Michael Hanselmann
    self.assertEqual(job.log_serial, count)
1149 be760ba8 Michael Hanselmann
1150 be760ba8 Michael Hanselmann
    # No filter
1151 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(None),
1152 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1153 be760ba8 Michael Hanselmann
                      for entry in entries])
1154 be760ba8 Michael Hanselmann
1155 be760ba8 Michael Hanselmann
    # Filter with serial
1156 be760ba8 Michael Hanselmann
    assert count > 3
1157 be760ba8 Michael Hanselmann
    self.assert_(job.GetLogEntries(3))
1158 be760ba8 Michael Hanselmann
    self.assertEqual(job.GetLogEntries(3),
1159 be760ba8 Michael Hanselmann
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1160 be760ba8 Michael Hanselmann
                      for entry in entries][3:])
1161 be760ba8 Michael Hanselmann
1162 be760ba8 Michael Hanselmann
    # No log message after highest serial
1163 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count))
1164 be760ba8 Michael Hanselmann
    self.assertFalse(job.GetLogEntries(count + 3))
1165 be760ba8 Michael Hanselmann
1166 6a373640 Michael Hanselmann
  def testSubmitManyJobs(self):
1167 6a373640 Michael Hanselmann
    queue = _FakeQueueForProc()
1168 6a373640 Michael Hanselmann
1169 6a373640 Michael Hanselmann
    job_id = 15656
1170 6a373640 Michael Hanselmann
    ops = [
1171 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1172 6a373640 Michael Hanselmann
                          submit_jobs=[]),
1173 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1174 6a373640 Michael Hanselmann
                          submit_jobs=[
1175 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1176 6a373640 Michael Hanselmann
                            ]),
1177 6a373640 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False,
1178 6a373640 Michael Hanselmann
                          submit_jobs=[
1179 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1180 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1181 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1182 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1183 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1184 6a373640 Michael Hanselmann
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1185 6a373640 Michael Hanselmann
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1186 6a373640 Michael Hanselmann
                            ]),
1187 6a373640 Michael Hanselmann
      ]
1188 6a373640 Michael Hanselmann
1189 6a373640 Michael Hanselmann
    # Create job
1190 6a373640 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1191 6a373640 Michael Hanselmann
1192 6a373640 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1193 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1194 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1195 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1196 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1197 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1198 6a373640 Michael Hanselmann
1199 6a373640 Michael Hanselmann
    def _AfterStart(op, cbs):
1200 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1201 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1202 6a373640 Michael Hanselmann
1203 6a373640 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1204 6a373640 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1205 6a373640 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1206 6a373640 Michael Hanselmann
1207 6a373640 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1208 6a373640 Michael Hanselmann
      (success, _) = job.Cancel()
1209 6a373640 Michael Hanselmann
      self.assertFalse(success)
1210 6a373640 Michael Hanselmann
1211 6a373640 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1212 6a373640 Michael Hanselmann
1213 6a373640 Michael Hanselmann
    for idx in range(len(ops)):
1214 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1215 6a373640 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1216 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1217 6a373640 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1218 6a373640 Michael Hanselmann
      if idx == len(ops) - 1:
1219 6a373640 Michael Hanselmann
        # Last opcode
1220 6a373640 Michael Hanselmann
        self.assert_(result)
1221 6a373640 Michael Hanselmann
      else:
1222 6a373640 Michael Hanselmann
        self.assertFalse(result)
1223 6a373640 Michael Hanselmann
1224 6a373640 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1225 6a373640 Michael Hanselmann
        self.assert_(job.start_timestamp)
1226 6a373640 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1227 6a373640 Michael Hanselmann
1228 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1229 6a373640 Michael Hanselmann
1230 6a373640 Michael Hanselmann
    for idx, submitted_ops in enumerate(job_ops
1231 6a373640 Michael Hanselmann
                                        for op in ops
1232 6a373640 Michael Hanselmann
                                        for job_ops in op.submit_jobs):
1233 6a373640 Michael Hanselmann
      self.assertEqual(queue.GetNextSubmittedJob(),
1234 6a373640 Michael Hanselmann
                       (1000 + idx, submitted_ops))
1235 6a373640 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1236 6a373640 Michael Hanselmann
1237 6a373640 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1238 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1239 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1240 6a373640 Michael Hanselmann
                     [[[], [1000], [1001, 1002, 1003]]])
1241 6a373640 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1242 6a373640 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1243 6a373640 Michael Hanselmann
1244 6a373640 Michael Hanselmann
    self._GenericCheckJob(job)
1245 6a373640 Michael Hanselmann
1246 1e6d5750 Iustin Pop
    # Calling the processor on a finished job should be a no-op
1247 1e6d5750 Iustin Pop
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1248 1e6d5750 Iustin Pop
    self.assertRaises(IndexError, queue.GetNextUpdate)
1249 6a373640 Michael Hanselmann
1250 b95479a5 Michael Hanselmann
  def testJobDependency(self):
1251 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1252 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1253 b95479a5 Michael Hanselmann
1254 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1255 b95479a5 Michael Hanselmann
1256 b95479a5 Michael Hanselmann
    prev_job_id = 22113
1257 b95479a5 Michael Hanselmann
    prev_job_id2 = 28102
1258 b95479a5 Michael Hanselmann
    job_id = 29929
1259 b95479a5 Michael Hanselmann
    ops = [
1260 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False,
1261 b95479a5 Michael Hanselmann
                          depends=[
1262 b95479a5 Michael Hanselmann
                            [prev_job_id2, None],
1263 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1264 b95479a5 Michael Hanselmann
                            ]),
1265 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False),
1266 b95479a5 Michael Hanselmann
      ]
1267 b95479a5 Michael Hanselmann
1268 b95479a5 Michael Hanselmann
    # Create job
1269 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1270 b95479a5 Michael Hanselmann
1271 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1272 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1273 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1274 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1275 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1276 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1277 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1278 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1279 b95479a5 Michael Hanselmann
1280 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1281 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1282 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1283 b95479a5 Michael Hanselmann
1284 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1285 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1286 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1287 b95479a5 Michael Hanselmann
1288 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1289 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1290 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1291 b95479a5 Michael Hanselmann
1292 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1293 b95479a5 Michael Hanselmann
1294 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1295 b95479a5 Michael Hanselmann
1296 b95479a5 Michael Hanselmann
    counter = itertools.count()
1297 b95479a5 Michael Hanselmann
    while True:
1298 b95479a5 Michael Hanselmann
      attempt = counter.next()
1299 b95479a5 Michael Hanselmann
1300 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1301 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1302 b95479a5 Michael Hanselmann
1303 b95479a5 Michael Hanselmann
      if attempt < 2:
1304 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1305 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1306 b95479a5 Michael Hanselmann
      elif attempt == 2:
1307 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id2, None,
1308 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1309 b95479a5 Michael Hanselmann
        # The processor will ask for the next dependency immediately
1310 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1311 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1312 b95479a5 Michael Hanselmann
      elif attempt < 5:
1313 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1314 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1315 b95479a5 Michael Hanselmann
      elif attempt == 5:
1316 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1317 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1318 b95479a5 Michael Hanselmann
      if attempt == 2:
1319 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 2)
1320 b95479a5 Michael Hanselmann
      elif attempt > 5:
1321 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1322 b95479a5 Michael Hanselmann
      else:
1323 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1324 b95479a5 Michael Hanselmann
1325 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1326 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt >= 5:
1327 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1328 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1329 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1330 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1331 b95479a5 Michael Hanselmann
1332 b95479a5 Michael Hanselmann
      if attempt < 5:
1333 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1334 b95479a5 Michael Hanselmann
        self.assertTrue(result)
1335 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1336 b95479a5 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1337 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1338 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1339 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1340 b95479a5 Michael Hanselmann
        continue
1341 b95479a5 Michael Hanselmann
1342 b95479a5 Michael Hanselmann
      if result:
1343 b95479a5 Michael Hanselmann
        # Last opcode
1344 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1345 b95479a5 Michael Hanselmann
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1346 b95479a5 Michael Hanselmann
        break
1347 b95479a5 Michael Hanselmann
1348 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1349 b95479a5 Michael Hanselmann
1350 b95479a5 Michael Hanselmann
      self.assertFalse(result)
1351 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1352 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1353 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1354 b95479a5 Michael Hanselmann
1355 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1356 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1357 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1358 b95479a5 Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1359 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1360 b95479a5 Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1361 b95479a5 Michael Hanselmann
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1362 b95479a5 Michael Hanselmann
                               for op in job.ops))
1363 b95479a5 Michael Hanselmann
1364 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1365 b95479a5 Michael Hanselmann
1366 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1367 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1368 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1369 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountWaitingJobs())
1370 b95479a5 Michael Hanselmann
1371 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1372 b95479a5 Michael Hanselmann
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1373 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1374 b95479a5 Michael Hanselmann
1375 b95479a5 Michael Hanselmann
  def testJobDependencyCancel(self):
1376 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1377 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1378 b95479a5 Michael Hanselmann
1379 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1380 b95479a5 Michael Hanselmann
1381 b95479a5 Michael Hanselmann
    prev_job_id = 13623
1382 b95479a5 Michael Hanselmann
    job_id = 30876
1383 b95479a5 Michael Hanselmann
    ops = [
1384 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1385 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1386 b95479a5 Michael Hanselmann
                          depends=[
1387 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1388 b95479a5 Michael Hanselmann
                            ]),
1389 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1390 b95479a5 Michael Hanselmann
      ]
1391 b95479a5 Michael Hanselmann
1392 b95479a5 Michael Hanselmann
    # Create job
1393 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1394 b95479a5 Michael Hanselmann
1395 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1396 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1397 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1398 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1399 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1400 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1401 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1402 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1403 b95479a5 Michael Hanselmann
1404 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1405 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1406 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1407 b95479a5 Michael Hanselmann
1408 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1409 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1410 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1411 b95479a5 Michael Hanselmann
1412 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1413 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1414 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1415 b95479a5 Michael Hanselmann
1416 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1417 b95479a5 Michael Hanselmann
1418 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1419 b95479a5 Michael Hanselmann
1420 b95479a5 Michael Hanselmann
    counter = itertools.count()
1421 b95479a5 Michael Hanselmann
    while True:
1422 b95479a5 Michael Hanselmann
      attempt = counter.next()
1423 b95479a5 Michael Hanselmann
1424 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1425 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1426 b95479a5 Michael Hanselmann
1427 b95479a5 Michael Hanselmann
      if attempt == 0:
1428 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1429 b95479a5 Michael Hanselmann
        pass
1430 b95479a5 Michael Hanselmann
      elif attempt < 4:
1431 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1432 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1433 b95479a5 Michael Hanselmann
      elif attempt == 4:
1434 b95479a5 Michael Hanselmann
        # Other job was cancelled
1435 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1436 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1437 b95479a5 Michael Hanselmann
1438 b95479a5 Michael Hanselmann
      if attempt == 0:
1439 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1440 b95479a5 Michael Hanselmann
      else:
1441 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1442 b95479a5 Michael Hanselmann
1443 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1444 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1445 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1446 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1447 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1448 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1449 b95479a5 Michael Hanselmann
1450 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1451 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1452 b95479a5 Michael Hanselmann
        self.assertTrue(result)
1453 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1454 b95479a5 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1455 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1456 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1457 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1458 b95479a5 Michael Hanselmann
        continue
1459 b95479a5 Michael Hanselmann
1460 b95479a5 Michael Hanselmann
      if result:
1461 b95479a5 Michael Hanselmann
        # Last opcode
1462 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1463 b95479a5 Michael Hanselmann
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1464 b95479a5 Michael Hanselmann
        break
1465 b95479a5 Michael Hanselmann
1466 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1467 b95479a5 Michael Hanselmann
1468 b95479a5 Michael Hanselmann
      self.assertFalse(result)
1469 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1470 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1471 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1472 b95479a5 Michael Hanselmann
1473 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1474 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1475 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1476 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1477 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED,
1478 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_CANCELED],
1479 b95479a5 Michael Hanselmann
                      ["Res0", "Job canceled by request",
1480 b95479a5 Michael Hanselmann
                       "Job canceled by request"]])
1481 b95479a5 Michael Hanselmann
1482 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1483 b95479a5 Michael Hanselmann
1484 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1485 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1486 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1487 b95479a5 Michael Hanselmann
1488 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1489 b95479a5 Michael Hanselmann
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1490 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1491 b95479a5 Michael Hanselmann
1492 b95479a5 Michael Hanselmann
  def testJobDependencyWrongstatus(self):
1493 b95479a5 Michael Hanselmann
    depmgr = _FakeDependencyManager()
1494 b95479a5 Michael Hanselmann
    queue = _FakeQueueForProc(depmgr=depmgr)
1495 b95479a5 Michael Hanselmann
1496 b95479a5 Michael Hanselmann
    self.assertEqual(queue.depmgr, depmgr)
1497 b95479a5 Michael Hanselmann
1498 b95479a5 Michael Hanselmann
    prev_job_id = 9741
1499 b95479a5 Michael Hanselmann
    job_id = 11763
1500 b95479a5 Michael Hanselmann
    ops = [
1501 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res0", fail=False),
1502 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res1", fail=False,
1503 b95479a5 Michael Hanselmann
                          depends=[
1504 b95479a5 Michael Hanselmann
                            [prev_job_id, None],
1505 b95479a5 Michael Hanselmann
                            ]),
1506 b95479a5 Michael Hanselmann
      opcodes.OpTestDummy(result="Res2", fail=False),
1507 b95479a5 Michael Hanselmann
      ]
1508 b95479a5 Michael Hanselmann
1509 b95479a5 Michael Hanselmann
    # Create job
1510 b95479a5 Michael Hanselmann
    job = self._CreateJob(queue, job_id, ops)
1511 b95479a5 Michael Hanselmann
1512 b95479a5 Michael Hanselmann
    def _BeforeStart(timeout, priority):
1513 b95479a5 Michael Hanselmann
      if attempt == 0 or attempt > 5:
1514 b95479a5 Michael Hanselmann
        # Job should only be updated when it wasn't waiting for another job
1515 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1516 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1517 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1518 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1519 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1520 b95479a5 Michael Hanselmann
1521 b95479a5 Michael Hanselmann
    def _AfterStart(op, cbs):
1522 b95479a5 Michael Hanselmann
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1523 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1524 b95479a5 Michael Hanselmann
1525 b95479a5 Michael Hanselmann
      self.assertFalse(queue.IsAcquired())
1526 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1527 b95479a5 Michael Hanselmann
      self.assertFalse(job.cur_opctx)
1528 b95479a5 Michael Hanselmann
1529 b95479a5 Michael Hanselmann
      # Job is running, cancelling shouldn't be possible
1530 b95479a5 Michael Hanselmann
      (success, _) = job.Cancel()
1531 b95479a5 Michael Hanselmann
      self.assertFalse(success)
1532 b95479a5 Michael Hanselmann
1533 b95479a5 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1534 b95479a5 Michael Hanselmann
1535 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1536 b95479a5 Michael Hanselmann
1537 b95479a5 Michael Hanselmann
    counter = itertools.count()
1538 b95479a5 Michael Hanselmann
    while True:
1539 b95479a5 Michael Hanselmann
      attempt = counter.next()
1540 b95479a5 Michael Hanselmann
1541 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1542 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1543 b95479a5 Michael Hanselmann
1544 b95479a5 Michael Hanselmann
      if attempt == 0:
1545 b95479a5 Michael Hanselmann
        # This will handle the first opcode
1546 b95479a5 Michael Hanselmann
        pass
1547 b95479a5 Michael Hanselmann
      elif attempt < 4:
1548 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1549 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1550 b95479a5 Michael Hanselmann
      elif attempt == 4:
1551 b95479a5 Michael Hanselmann
        # Other job failed
1552 b95479a5 Michael Hanselmann
        depmgr.AddCheckResult(job, prev_job_id, None,
1553 b95479a5 Michael Hanselmann
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1554 b95479a5 Michael Hanselmann
1555 b95479a5 Michael Hanselmann
      if attempt == 0:
1556 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 0)
1557 b95479a5 Michael Hanselmann
      else:
1558 b95479a5 Michael Hanselmann
        self.assertEqual(depmgr.CountPendingResults(), 1)
1559 b95479a5 Michael Hanselmann
1560 b95479a5 Michael Hanselmann
      result = jqueue._JobProcessor(queue, opexec, job)()
1561 b95479a5 Michael Hanselmann
      if attempt <= 1 or attempt >= 4:
1562 b95479a5 Michael Hanselmann
        # Job should only be updated if there was an actual change
1563 b95479a5 Michael Hanselmann
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1564 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, queue.GetNextUpdate)
1565 b95479a5 Michael Hanselmann
      self.assertFalse(depmgr.CountPendingResults())
1566 b95479a5 Michael Hanselmann
1567 b95479a5 Michael Hanselmann
      if attempt > 0 and attempt < 4:
1568 b95479a5 Michael Hanselmann
        # Simulate waiting for other job
1569 b95479a5 Michael Hanselmann
        self.assertTrue(result)
1570 b95479a5 Michael Hanselmann
        self.assertTrue(job.cur_opctx)
1571 b95479a5 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1572 b95479a5 Michael Hanselmann
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1573 b95479a5 Michael Hanselmann
        self.assert_(job.start_timestamp)
1574 b95479a5 Michael Hanselmann
        self.assertFalse(job.end_timestamp)
1575 b95479a5 Michael Hanselmann
        continue
1576 b95479a5 Michael Hanselmann
1577 b95479a5 Michael Hanselmann
      if result:
1578 b95479a5 Michael Hanselmann
        # Last opcode
1579 b95479a5 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1580 b95479a5 Michael Hanselmann
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1581 b95479a5 Michael Hanselmann
        break
1582 b95479a5 Michael Hanselmann
1583 b95479a5 Michael Hanselmann
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1584 b95479a5 Michael Hanselmann
1585 b95479a5 Michael Hanselmann
      self.assertFalse(result)
1586 b95479a5 Michael Hanselmann
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1587 b95479a5 Michael Hanselmann
      self.assert_(job.start_timestamp)
1588 b95479a5 Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1589 b95479a5 Michael Hanselmann
1590 b95479a5 Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1591 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1592 b95479a5 Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1593 b95479a5 Michael Hanselmann
                     [[constants.OP_STATUS_SUCCESS,
1594 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR,
1595 b95479a5 Michael Hanselmann
                       constants.OP_STATUS_ERROR]]),
1596 b95479a5 Michael Hanselmann
1597 b95479a5 Michael Hanselmann
    (opresult, ) = job.GetInfo(["opresult"])
1598 b95479a5 Michael Hanselmann
    self.assertEqual(len(opresult), len(ops))
1599 b95479a5 Michael Hanselmann
    self.assertEqual(opresult[0], "Res0")
1600 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[1]))
1601 b95479a5 Michael Hanselmann
    self.assertTrue(errors.GetEncodedError(opresult[2]))
1602 b95479a5 Michael Hanselmann
1603 b95479a5 Michael Hanselmann
    self._GenericCheckJob(job)
1604 b95479a5 Michael Hanselmann
1605 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1606 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1607 b95479a5 Michael Hanselmann
    self.assertFalse(depmgr.CountPendingResults())
1608 b95479a5 Michael Hanselmann
1609 b95479a5 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1610 b95479a5 Michael Hanselmann
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1611 b95479a5 Michael Hanselmann
    self.assertRaises(IndexError, queue.GetNextUpdate)
1612 b95479a5 Michael Hanselmann
1613 be760ba8 Michael Hanselmann
1614 26d3fd2f Michael Hanselmann
class _FakeTimeoutStrategy:
1615 26d3fd2f Michael Hanselmann
  def __init__(self, timeouts):
1616 26d3fd2f Michael Hanselmann
    self.timeouts = timeouts
1617 26d3fd2f Michael Hanselmann
    self.attempts = 0
1618 26d3fd2f Michael Hanselmann
    self.last_timeout = None
1619 26d3fd2f Michael Hanselmann
1620 26d3fd2f Michael Hanselmann
  def NextAttempt(self):
1621 26d3fd2f Michael Hanselmann
    self.attempts += 1
1622 26d3fd2f Michael Hanselmann
    if self.timeouts:
1623 26d3fd2f Michael Hanselmann
      timeout = self.timeouts.pop(0)
1624 26d3fd2f Michael Hanselmann
    else:
1625 26d3fd2f Michael Hanselmann
      timeout = None
1626 26d3fd2f Michael Hanselmann
    self.last_timeout = timeout
1627 26d3fd2f Michael Hanselmann
    return timeout
1628 26d3fd2f Michael Hanselmann
1629 26d3fd2f Michael Hanselmann
1630 26d3fd2f Michael Hanselmann
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1631 26d3fd2f Michael Hanselmann
  def setUp(self):
1632 26d3fd2f Michael Hanselmann
    self.queue = _FakeQueueForProc()
1633 26d3fd2f Michael Hanselmann
    self.job = None
1634 26d3fd2f Michael Hanselmann
    self.curop = None
1635 26d3fd2f Michael Hanselmann
    self.opcounter = None
1636 26d3fd2f Michael Hanselmann
    self.timeout_strategy = None
1637 26d3fd2f Michael Hanselmann
    self.retries = 0
1638 26d3fd2f Michael Hanselmann
    self.prev_tsop = None
1639 26d3fd2f Michael Hanselmann
    self.prev_prio = None
1640 5fd6b694 Michael Hanselmann
    self.prev_status = None
1641 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = None
1642 26d3fd2f Michael Hanselmann
    self.gave_lock = None
1643 26d3fd2f Michael Hanselmann
    self.done_lock_before_blocking = False
1644 26d3fd2f Michael Hanselmann
1645 f23db633 Michael Hanselmann
  def _BeforeStart(self, timeout, priority):
1646 26d3fd2f Michael Hanselmann
    job = self.job
1647 26d3fd2f Michael Hanselmann
1648 5fd6b694 Michael Hanselmann
    # If status has changed, job must've been written
1649 5fd6b694 Michael Hanselmann
    if self.prev_status != self.job.ops[self.curop].status:
1650 5fd6b694 Michael Hanselmann
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1651 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1652 5fd6b694 Michael Hanselmann
1653 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1654 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1655 26d3fd2f Michael Hanselmann
1656 26d3fd2f Michael Hanselmann
    ts = self.timeout_strategy
1657 26d3fd2f Michael Hanselmann
1658 26d3fd2f Michael Hanselmann
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1659 26d3fd2f Michael Hanselmann
    self.assertEqual(timeout, ts.last_timeout)
1660 f23db633 Michael Hanselmann
    self.assertEqual(priority, job.ops[self.curop].priority)
1661 26d3fd2f Michael Hanselmann
1662 26d3fd2f Michael Hanselmann
    self.gave_lock = True
1663 5fd6b694 Michael Hanselmann
    self.lock_acq_prio = priority
1664 26d3fd2f Michael Hanselmann
1665 26d3fd2f Michael Hanselmann
    if (self.curop == 3 and
1666 26d3fd2f Michael Hanselmann
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1667 26d3fd2f Michael Hanselmann
      # Give locks before running into blocking acquire
1668 26d3fd2f Michael Hanselmann
      assert self.retries == 7
1669 26d3fd2f Michael Hanselmann
      self.retries = 0
1670 26d3fd2f Michael Hanselmann
      self.done_lock_before_blocking = True
1671 26d3fd2f Michael Hanselmann
      return
1672 26d3fd2f Michael Hanselmann
1673 26d3fd2f Michael Hanselmann
    if self.retries > 0:
1674 26d3fd2f Michael Hanselmann
      self.assert_(timeout is not None)
1675 26d3fd2f Michael Hanselmann
      self.retries -= 1
1676 26d3fd2f Michael Hanselmann
      self.gave_lock = False
1677 26d3fd2f Michael Hanselmann
      raise mcpu.LockAcquireTimeout()
1678 26d3fd2f Michael Hanselmann
1679 26d3fd2f Michael Hanselmann
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1680 26d3fd2f Michael Hanselmann
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1681 26d3fd2f Michael Hanselmann
      assert not ts.timeouts
1682 26d3fd2f Michael Hanselmann
      self.assert_(timeout is None)
1683 26d3fd2f Michael Hanselmann
1684 26d3fd2f Michael Hanselmann
  def _AfterStart(self, op, cbs):
1685 26d3fd2f Michael Hanselmann
    job = self.job
1686 26d3fd2f Michael Hanselmann
1687 5fd6b694 Michael Hanselmann
    # Setting to "running" requires an update
1688 ebb2a2a3 Michael Hanselmann
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1689 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1690 5fd6b694 Michael Hanselmann
1691 26d3fd2f Michael Hanselmann
    self.assertFalse(self.queue.IsAcquired())
1692 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1693 26d3fd2f Michael Hanselmann
1694 26d3fd2f Michael Hanselmann
    # Job is running, cancelling shouldn't be possible
1695 26d3fd2f Michael Hanselmann
    (success, _) = job.Cancel()
1696 26d3fd2f Michael Hanselmann
    self.assertFalse(success)
1697 26d3fd2f Michael Hanselmann
1698 26d3fd2f Michael Hanselmann
  def _NextOpcode(self):
1699 26d3fd2f Michael Hanselmann
    self.curop = self.opcounter.next()
1700 26d3fd2f Michael Hanselmann
    self.prev_prio = self.job.ops[self.curop].priority
1701 5fd6b694 Michael Hanselmann
    self.prev_status = self.job.ops[self.curop].status
1702 26d3fd2f Michael Hanselmann
1703 26d3fd2f Michael Hanselmann
  def _NewTimeoutStrategy(self):
1704 26d3fd2f Michael Hanselmann
    job = self.job
1705 26d3fd2f Michael Hanselmann
1706 26d3fd2f Michael Hanselmann
    self.assertEqual(self.retries, 0)
1707 26d3fd2f Michael Hanselmann
1708 26d3fd2f Michael Hanselmann
    if self.prev_tsop == self.curop:
1709 26d3fd2f Michael Hanselmann
      # Still on the same opcode, priority must've been increased
1710 26d3fd2f Michael Hanselmann
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1711 26d3fd2f Michael Hanselmann
1712 26d3fd2f Michael Hanselmann
    if self.curop == 1:
1713 26d3fd2f Michael Hanselmann
      # Normal retry
1714 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1715 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts) - 1
1716 26d3fd2f Michael Hanselmann
1717 26d3fd2f Michael Hanselmann
    elif self.curop == 2:
1718 26d3fd2f Michael Hanselmann
      # Let this run into a blocking acquire
1719 26d3fd2f Michael Hanselmann
      timeouts = range(11, 61, 12)
1720 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1721 26d3fd2f Michael Hanselmann
1722 26d3fd2f Michael Hanselmann
    elif self.curop == 3:
1723 26d3fd2f Michael Hanselmann
      # Wait for priority to increase, but give lock before blocking acquire
1724 26d3fd2f Michael Hanselmann
      timeouts = range(12, 100, 14)
1725 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1726 26d3fd2f Michael Hanselmann
1727 26d3fd2f Michael Hanselmann
      self.assertFalse(self.done_lock_before_blocking)
1728 26d3fd2f Michael Hanselmann
1729 26d3fd2f Michael Hanselmann
    elif self.curop == 4:
1730 26d3fd2f Michael Hanselmann
      self.assert_(self.done_lock_before_blocking)
1731 26d3fd2f Michael Hanselmann
1732 26d3fd2f Michael Hanselmann
      # Timeouts, but no need to retry
1733 26d3fd2f Michael Hanselmann
      timeouts = range(10, 31, 10)
1734 26d3fd2f Michael Hanselmann
      self.retries = 0
1735 26d3fd2f Michael Hanselmann
1736 26d3fd2f Michael Hanselmann
    elif self.curop == 5:
1737 26d3fd2f Michael Hanselmann
      # Normal retry
1738 26d3fd2f Michael Hanselmann
      timeouts = range(19, 100, 11)
1739 26d3fd2f Michael Hanselmann
      self.retries = len(timeouts)
1740 26d3fd2f Michael Hanselmann
1741 26d3fd2f Michael Hanselmann
    else:
1742 26d3fd2f Michael Hanselmann
      timeouts = []
1743 26d3fd2f Michael Hanselmann
      self.retries = 0
1744 26d3fd2f Michael Hanselmann
1745 26d3fd2f Michael Hanselmann
    assert len(job.ops) == 10
1746 26d3fd2f Michael Hanselmann
    assert self.retries <= len(timeouts)
1747 26d3fd2f Michael Hanselmann
1748 26d3fd2f Michael Hanselmann
    ts = _FakeTimeoutStrategy(timeouts)
1749 26d3fd2f Michael Hanselmann
1750 26d3fd2f Michael Hanselmann
    self.timeout_strategy = ts
1751 26d3fd2f Michael Hanselmann
    self.prev_tsop = self.curop
1752 26d3fd2f Michael Hanselmann
    self.prev_prio = job.ops[self.curop].priority
1753 26d3fd2f Michael Hanselmann
1754 26d3fd2f Michael Hanselmann
    return ts
1755 26d3fd2f Michael Hanselmann
1756 26d3fd2f Michael Hanselmann
  def testTimeout(self):
1757 26d3fd2f Michael Hanselmann
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1758 26d3fd2f Michael Hanselmann
           for i in range(10)]
1759 26d3fd2f Michael Hanselmann
1760 26d3fd2f Michael Hanselmann
    # Create job
1761 26d3fd2f Michael Hanselmann
    job_id = 15801
1762 26d3fd2f Michael Hanselmann
    job = self._CreateJob(self.queue, job_id, ops)
1763 26d3fd2f Michael Hanselmann
    self.job = job
1764 26d3fd2f Michael Hanselmann
1765 26d3fd2f Michael Hanselmann
    self.opcounter = itertools.count(0)
1766 26d3fd2f Michael Hanselmann
1767 ebb2a2a3 Michael Hanselmann
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1768 ebb2a2a3 Michael Hanselmann
                                    self._AfterStart)
1769 26d3fd2f Michael Hanselmann
    tsf = self._NewTimeoutStrategy
1770 26d3fd2f Michael Hanselmann
1771 26d3fd2f Michael Hanselmann
    self.assertFalse(self.done_lock_before_blocking)
1772 26d3fd2f Michael Hanselmann
1773 5fd6b694 Michael Hanselmann
    while True:
1774 26d3fd2f Michael Hanselmann
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1775 26d3fd2f Michael Hanselmann
                                  _timeout_strategy_factory=tsf)
1776 26d3fd2f Michael Hanselmann
1777 ebb2a2a3 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1778 5fd6b694 Michael Hanselmann
1779 5fd6b694 Michael Hanselmann
      if self.curop is not None:
1780 5fd6b694 Michael Hanselmann
        self.prev_status = self.job.ops[self.curop].status
1781 5fd6b694 Michael Hanselmann
1782 5fd6b694 Michael Hanselmann
      self.lock_acq_prio = None
1783 5fd6b694 Michael Hanselmann
1784 26d3fd2f Michael Hanselmann
      result = proc(_nextop_fn=self._NextOpcode)
1785 5fd6b694 Michael Hanselmann
      assert self.curop is not None
1786 5fd6b694 Michael Hanselmann
1787 5fd6b694 Michael Hanselmann
      if result or self.gave_lock:
1788 5fd6b694 Michael Hanselmann
        # Got lock and/or job is done, result must've been written
1789 5fd6b694 Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1790 5fd6b694 Michael Hanselmann
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1791 5fd6b694 Michael Hanselmann
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1792 5fd6b694 Michael Hanselmann
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1793 5fd6b694 Michael Hanselmann
        self.assert_(job.ops[self.curop].exec_timestamp)
1794 5fd6b694 Michael Hanselmann
1795 26d3fd2f Michael Hanselmann
      if result:
1796 26d3fd2f Michael Hanselmann
        self.assertFalse(job.cur_opctx)
1797 26d3fd2f Michael Hanselmann
        break
1798 26d3fd2f Michael Hanselmann
1799 26d3fd2f Michael Hanselmann
      self.assertFalse(result)
1800 26d3fd2f Michael Hanselmann
1801 5fd6b694 Michael Hanselmann
      if self.curop == 0:
1802 5fd6b694 Michael Hanselmann
        self.assertEqual(job.ops[self.curop].start_timestamp,
1803 5fd6b694 Michael Hanselmann
                         job.start_timestamp)
1804 5fd6b694 Michael Hanselmann
1805 26d3fd2f Michael Hanselmann
      if self.gave_lock:
1806 5fd6b694 Michael Hanselmann
        # Opcode finished, but job not yet done
1807 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1808 26d3fd2f Michael Hanselmann
      else:
1809 5fd6b694 Michael Hanselmann
        # Did not get locks
1810 26d3fd2f Michael Hanselmann
        self.assert_(job.cur_opctx)
1811 26d3fd2f Michael Hanselmann
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1812 26d3fd2f Michael Hanselmann
                         self.timeout_strategy.NextAttempt)
1813 5fd6b694 Michael Hanselmann
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1814 5fd6b694 Michael Hanselmann
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1815 5fd6b694 Michael Hanselmann
1816 5fd6b694 Michael Hanselmann
        # If priority has changed since acquiring locks, the job must've been
1817 5fd6b694 Michael Hanselmann
        # updated
1818 5fd6b694 Michael Hanselmann
        if self.lock_acq_prio != job.ops[self.curop].priority:
1819 5fd6b694 Michael Hanselmann
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1820 5fd6b694 Michael Hanselmann
1821 5fd6b694 Michael Hanselmann
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1822 26d3fd2f Michael Hanselmann
1823 26d3fd2f Michael Hanselmann
      self.assert_(job.start_timestamp)
1824 26d3fd2f Michael Hanselmann
      self.assertFalse(job.end_timestamp)
1825 26d3fd2f Michael Hanselmann
1826 26d3fd2f Michael Hanselmann
    self.assertEqual(self.curop, len(job.ops) - 1)
1827 26d3fd2f Michael Hanselmann
    self.assertEqual(self.job, job)
1828 26d3fd2f Michael Hanselmann
    self.assertEqual(self.opcounter.next(), len(job.ops))
1829 26d3fd2f Michael Hanselmann
    self.assert_(self.done_lock_before_blocking)
1830 26d3fd2f Michael Hanselmann
1831 ebb2a2a3 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1832 26d3fd2f Michael Hanselmann
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1833 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1834 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opresult"]),
1835 26d3fd2f Michael Hanselmann
                     [[op.input.result for op in job.ops]])
1836 26d3fd2f Michael Hanselmann
    self.assertEqual(job.GetInfo(["opstatus"]),
1837 26d3fd2f Michael Hanselmann
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1838 26d3fd2f Michael Hanselmann
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1839 26d3fd2f Michael Hanselmann
                            for op in job.ops))
1840 26d3fd2f Michael Hanselmann
1841 66bd7445 Michael Hanselmann
    # Calling the processor on a finished job should be a no-op
1842 66bd7445 Michael Hanselmann
    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
1843 66bd7445 Michael Hanselmann
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1844 26d3fd2f Michael Hanselmann
1845 26d3fd2f Michael Hanselmann
1846 b95479a5 Michael Hanselmann
class TestJobDependencyManager(unittest.TestCase):
1847 b95479a5 Michael Hanselmann
  class _FakeJob:
1848 b95479a5 Michael Hanselmann
    def __init__(self, job_id):
1849 b95479a5 Michael Hanselmann
      self.id = str(job_id)
1850 b95479a5 Michael Hanselmann
1851 b95479a5 Michael Hanselmann
  def setUp(self):
1852 b95479a5 Michael Hanselmann
    self._status = []
1853 b95479a5 Michael Hanselmann
    self._queue = []
1854 b95479a5 Michael Hanselmann
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1855 b95479a5 Michael Hanselmann
1856 b95479a5 Michael Hanselmann
  def _GetStatus(self, job_id):
1857 b95479a5 Michael Hanselmann
    (exp_job_id, result) = self._status.pop(0)
1858 b95479a5 Michael Hanselmann
    self.assertEqual(exp_job_id, job_id)
1859 b95479a5 Michael Hanselmann
    return result
1860 b95479a5 Michael Hanselmann
1861 b95479a5 Michael Hanselmann
  def _Enqueue(self, jobs):
1862 b95479a5 Michael Hanselmann
    self._queue.append(jobs)
1863 b95479a5 Michael Hanselmann
1864 b95479a5 Michael Hanselmann
  def testNotFinalizedThenCancel(self):
1865 b95479a5 Michael Hanselmann
    job = self._FakeJob(17697)
1866 b95479a5 Michael Hanselmann
    job_id = str(28625)
1867 b95479a5 Michael Hanselmann
1868 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1869 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1870 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1871 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1872 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1873 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1874 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1875 b95479a5 Michael Hanselmann
      job_id: set([job]),
1876 b95479a5 Michael Hanselmann
      })
1877 b95479a5 Michael Hanselmann
1878 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1879 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1880 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CANCEL)
1881 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1882 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1883 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1884 b95479a5 Michael Hanselmann
1885 b95479a5 Michael Hanselmann
  def testRequireCancel(self):
1886 b95479a5 Michael Hanselmann
    job = self._FakeJob(5278)
1887 b95479a5 Michael Hanselmann
    job_id = str(9610)
1888 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_CANCELED]
1889 b95479a5 Michael Hanselmann
1890 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1891 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1892 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1893 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1894 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1895 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1896 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1897 b95479a5 Michael Hanselmann
      job_id: set([job]),
1898 b95479a5 Michael Hanselmann
      })
1899 b95479a5 Michael Hanselmann
1900 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1901 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1902 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
1903 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1904 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1905 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1906 b95479a5 Michael Hanselmann
1907 b95479a5 Michael Hanselmann
  def testRequireError(self):
1908 b95479a5 Michael Hanselmann
    job = self._FakeJob(21459)
1909 b95479a5 Michael Hanselmann
    job_id = str(25519)
1910 b95479a5 Michael Hanselmann
    dep_status = [constants.JOB_STATUS_ERROR]
1911 b95479a5 Michael Hanselmann
1912 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1913 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1914 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1915 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1916 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1917 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1918 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1919 b95479a5 Michael Hanselmann
      job_id: set([job]),
1920 b95479a5 Michael Hanselmann
      })
1921 b95479a5 Michael Hanselmann
1922 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1923 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1924 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
1925 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1926 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1927 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1928 b95479a5 Michael Hanselmann
1929 b95479a5 Michael Hanselmann
  def testRequireMultiple(self):
1930 b95479a5 Michael Hanselmann
    dep_status = list(constants.JOBS_FINALIZED)
1931 b95479a5 Michael Hanselmann
1932 b95479a5 Michael Hanselmann
    for end_status in dep_status:
1933 b95479a5 Michael Hanselmann
      job = self._FakeJob(21343)
1934 b95479a5 Michael Hanselmann
      job_id = str(14609)
1935 b95479a5 Michael Hanselmann
1936 b95479a5 Michael Hanselmann
      self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1937 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1938 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.WAIT)
1939 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
1940 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
1941 b95479a5 Michael Hanselmann
      self.assertTrue(self.jdm.JobWaiting(job))
1942 b95479a5 Michael Hanselmann
      self.assertEqual(self.jdm._waiters, {
1943 b95479a5 Michael Hanselmann
        job_id: set([job]),
1944 b95479a5 Michael Hanselmann
        })
1945 b95479a5 Michael Hanselmann
1946 b95479a5 Michael Hanselmann
      self._status.append((job_id, end_status))
1947 b95479a5 Michael Hanselmann
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1948 b95479a5 Michael Hanselmann
      self.assertEqual(result, self.jdm.CONTINUE)
1949 b95479a5 Michael Hanselmann
      self.assertFalse(self._status)
1950 b95479a5 Michael Hanselmann
      self.assertFalse(self._queue)
1951 b95479a5 Michael Hanselmann
      self.assertFalse(self.jdm.JobWaiting(job))
1952 b95479a5 Michael Hanselmann
1953 b95479a5 Michael Hanselmann
  def testNotify(self):
1954 b95479a5 Michael Hanselmann
    job = self._FakeJob(8227)
1955 b95479a5 Michael Hanselmann
    job_id = str(4113)
1956 b95479a5 Michael Hanselmann
1957 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1958 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1959 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1960 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1961 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1962 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1963 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1964 b95479a5 Michael Hanselmann
      job_id: set([job]),
1965 b95479a5 Michael Hanselmann
      })
1966 b95479a5 Michael Hanselmann
1967 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters(job_id)
1968 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1969 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
1970 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1971 b95479a5 Michael Hanselmann
    self.assertEqual(self._queue, [set([job])])
1972 b95479a5 Michael Hanselmann
1973 b95479a5 Michael Hanselmann
  def testWrongStatus(self):
1974 b95479a5 Michael Hanselmann
    job = self._FakeJob(10102)
1975 b95479a5 Michael Hanselmann
    job_id = str(1271)
1976 b95479a5 Michael Hanselmann
1977 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
1978 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1979 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
1980 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
1981 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1982 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1983 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
1984 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
1985 b95479a5 Michael Hanselmann
      job_id: set([job]),
1986 b95479a5 Michael Hanselmann
      })
1987 b95479a5 Michael Hanselmann
1988 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1989 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1990 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
1991 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WRONGSTATUS)
1992 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
1993 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
1994 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
1995 b95479a5 Michael Hanselmann
1996 b95479a5 Michael Hanselmann
  def testCorrectStatus(self):
1997 b95479a5 Michael Hanselmann
    job = self._FakeJob(24273)
1998 b95479a5 Michael Hanselmann
    job_id = str(23885)
1999 b95479a5 Michael Hanselmann
2000 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2001 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2002 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2003 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.WAIT)
2004 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2005 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2006 b95479a5 Michael Hanselmann
    self.assertTrue(self.jdm.JobWaiting(job))
2007 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2008 b95479a5 Michael Hanselmann
      job_id: set([job]),
2009 b95479a5 Michael Hanselmann
      })
2010 b95479a5 Michael Hanselmann
2011 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2012 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2013 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2014 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2015 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2016 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2017 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2018 b95479a5 Michael Hanselmann
2019 b95479a5 Michael Hanselmann
  def testFinalizedRightAway(self):
2020 b95479a5 Michael Hanselmann
    job = self._FakeJob(224)
2021 b95479a5 Michael Hanselmann
    job_id = str(3081)
2022 b95479a5 Michael Hanselmann
2023 b95479a5 Michael Hanselmann
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2024 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2025 b95479a5 Michael Hanselmann
                                            [constants.JOB_STATUS_SUCCESS])
2026 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.CONTINUE)
2027 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2028 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2029 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm.JobWaiting(job))
2030 b95479a5 Michael Hanselmann
    self.assertEqual(self.jdm._waiters, {
2031 b95479a5 Michael Hanselmann
      job_id: set(),
2032 b95479a5 Michael Hanselmann
      })
2033 b95479a5 Michael Hanselmann
2034 b95479a5 Michael Hanselmann
    # Force cleanup
2035 b95479a5 Michael Hanselmann
    self.jdm.NotifyWaiters("0")
2036 b95479a5 Michael Hanselmann
    self.assertFalse(self.jdm._waiters)
2037 b95479a5 Michael Hanselmann
    self.assertFalse(self._status)
2038 b95479a5 Michael Hanselmann
    self.assertFalse(self._queue)
2039 b95479a5 Michael Hanselmann
2040 b95479a5 Michael Hanselmann
  def testSelfDependency(self):
2041 b95479a5 Michael Hanselmann
    job = self._FakeJob(18937)
2042 b95479a5 Michael Hanselmann
2043 b95479a5 Michael Hanselmann
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2044 b95479a5 Michael Hanselmann
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2045 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2046 b95479a5 Michael Hanselmann
2047 b95479a5 Michael Hanselmann
  def testJobDisappears(self):
2048 b95479a5 Michael Hanselmann
    job = self._FakeJob(30540)
2049 b95479a5 Michael Hanselmann
    job_id = str(23769)
2050 b95479a5 Michael Hanselmann
2051 b95479a5 Michael Hanselmann
    def _FakeStatus(_):
2052 b95479a5 Michael Hanselmann
      raise errors.JobLost("#msg#")
2053 b95479a5 Michael Hanselmann
2054 b95479a5 Michael Hanselmann
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2055 b95479a5 Michael Hanselmann
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2056 b95479a5 Michael Hanselmann
    self.assertEqual(result, self.jdm.ERROR)
2057 b95479a5 Michael Hanselmann
    self.assertFalse(jdm.JobWaiting(job))
2058 b95479a5 Michael Hanselmann
2059 b95479a5 Michael Hanselmann
2060 989a8bee Michael Hanselmann
if __name__ == "__main__":
2061 989a8bee Michael Hanselmann
  testutils.GanetiTestProgram()