Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 3c631ea2

History | View | Annotate | Download (94.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31
import random
32
import operator
33

    
34
from ganeti import constants
35
from ganeti import utils
36
from ganeti import errors
37
from ganeti import jqueue
38
from ganeti import opcodes
39
from ganeti import compat
40
from ganeti import mcpu
41
from ganeti import query
42
from ganeti import workerpool
43

    
44
import testutils
45

    
46

    
47
class _FakeJob:
48
  def __init__(self, job_id, status):
49
    self.id = job_id
50
    self.writable = False
51
    self._status = status
52
    self._log = []
53

    
54
  def SetStatus(self, status):
55
    self._status = status
56

    
57
  def AddLogEntry(self, msg):
58
    self._log.append((len(self._log), msg))
59

    
60
  def CalcStatus(self):
61
    return self._status
62

    
63
  def GetInfo(self, fields):
64
    result = []
65

    
66
    for name in fields:
67
      if name == "status":
68
        result.append(self._status)
69
      else:
70
        raise Exception("Unknown field")
71

    
72
    return result
73

    
74
  def GetLogEntries(self, newer_than):
75
    assert newer_than is None or newer_than >= 0
76

    
77
    if newer_than is None:
78
      return self._log
79

    
80
    return self._log[newer_than:]
81

    
82

    
83
class TestJobChangesChecker(unittest.TestCase):
84
  def testStatus(self):
85
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
86
    checker = jqueue._JobChangesChecker(["status"], None, None)
87
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
88

    
89
    job.SetStatus(constants.JOB_STATUS_RUNNING)
90
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
91

    
92
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
93
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
94

    
95
    # job.id is used by checker
96
    self.assertEqual(job.id, 9094)
97

    
98
  def testStatusWithPrev(self):
99
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
100
    checker = jqueue._JobChangesChecker(["status"],
101
                                        [constants.JOB_STATUS_QUEUED], None)
102
    self.assert_(checker(job) is None)
103

    
104
    job.SetStatus(constants.JOB_STATUS_RUNNING)
105
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
106

    
107
  def testFinalStatus(self):
108
    for status in constants.JOBS_FINALIZED:
109
      job = _FakeJob(2178711, status)
110
      checker = jqueue._JobChangesChecker(["status"], [status], None)
111
      # There won't be any changes in this status, hence it should signal
112
      # a change immediately
113
      self.assertEqual(checker(job), ([status], []))
114

    
115
  def testLog(self):
116
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
117
    checker = jqueue._JobChangesChecker(["status"], None, None)
118
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
119

    
120
    job.AddLogEntry("Hello World")
121
    (job_info, log_entries) = checker(job)
122
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
123
    self.assertEqual(log_entries, [[0, "Hello World"]])
124

    
125
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
126
    self.assert_(checker2(job) is None)
127

    
128
    job.AddLogEntry("Foo Bar")
129
    job.SetStatus(constants.JOB_STATUS_ERROR)
130

    
131
    (job_info, log_entries) = checker2(job)
132
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
134

    
135
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
136
    (job_info, log_entries) = checker3(job)
137
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
138
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
139

    
140

    
141
class TestJobChangesWaiter(unittest.TestCase):
142
  def setUp(self):
143
    self.tmpdir = tempfile.mkdtemp()
144
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
145
    utils.WriteFile(self.filename, data="")
146

    
147
  def tearDown(self):
148
    shutil.rmtree(self.tmpdir)
149

    
150
  def _EnsureNotifierClosed(self, notifier):
151
    try:
152
      os.fstat(notifier._fd)
153
    except EnvironmentError, err:
154
      self.assertEqual(err.errno, errno.EBADF)
155
    else:
156
      self.fail("File descriptor wasn't closed")
157

    
158
  def testClose(self):
159
    for wait in [False, True]:
160
      waiter = jqueue._JobFileChangesWaiter(self.filename)
161
      try:
162
        if wait:
163
          waiter.Wait(0.001)
164
      finally:
165
        waiter.Close()
166

    
167
      # Ensure file descriptor was closed
168
      self._EnsureNotifierClosed(waiter._notifier)
169

    
170
  def testChangingFile(self):
171
    waiter = jqueue._JobFileChangesWaiter(self.filename)
172
    try:
173
      self.assertFalse(waiter.Wait(0.1))
174
      utils.WriteFile(self.filename, data="changed")
175
      self.assert_(waiter.Wait(60))
176
    finally:
177
      waiter.Close()
178

    
179
    self._EnsureNotifierClosed(waiter._notifier)
180

    
181
  def testChangingFile2(self):
182
    waiter = jqueue._JobChangesWaiter(self.filename)
183
    try:
184
      self.assertFalse(waiter._filewaiter)
185
      self.assert_(waiter.Wait(0.1))
186
      self.assert_(waiter._filewaiter)
187

    
188
      # File waiter is now used, but there have been no changes
189
      self.assertFalse(waiter.Wait(0.1))
190
      utils.WriteFile(self.filename, data="changed")
191
      self.assert_(waiter.Wait(60))
192
    finally:
193
      waiter.Close()
194

    
195
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
196

    
197

    
198
class TestWaitForJobChangesHelper(unittest.TestCase):
199
  def setUp(self):
200
    self.tmpdir = tempfile.mkdtemp()
201
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
202
    utils.WriteFile(self.filename, data="")
203

    
204
  def tearDown(self):
205
    shutil.rmtree(self.tmpdir)
206

    
207
  def _LoadWaitingJob(self):
208
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
209

    
210
  def _LoadLostJob(self):
211
    return None
212

    
213
  def testNoChanges(self):
214
    wfjc = jqueue._WaitForJobChangesHelper()
215

    
216
    # No change
217
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
218
                          [constants.JOB_STATUS_WAITING], None, 0.1),
219
                     constants.JOB_NOTCHANGED)
220

    
221
    # No previous information
222
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
223
                          ["status"], None, None, 1.0),
224
                     ([constants.JOB_STATUS_WAITING], []))
225

    
226
  def testLostJob(self):
227
    wfjc = jqueue._WaitForJobChangesHelper()
228
    self.assert_(wfjc(self.filename, self._LoadLostJob,
229
                      ["status"], None, None, 1.0) is None)
230

    
231

    
232
class TestEncodeOpError(unittest.TestCase):
233
  def test(self):
234
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
235
    self.assert_(isinstance(encerr, tuple))
236
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
237

    
238
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
239
    self.assert_(isinstance(encerr, tuple))
240
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
241

    
242
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
243
    self.assert_(isinstance(encerr, tuple))
244
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
245

    
246
    encerr = jqueue._EncodeOpError("Hello World")
247
    self.assert_(isinstance(encerr, tuple))
248
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
249

    
250

    
251
class TestQueuedOpCode(unittest.TestCase):
252
  def testDefaults(self):
253
    def _Check(op):
254
      self.assertFalse(hasattr(op.input, "dry_run"))
255
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
256
      self.assertFalse(op.log)
257
      self.assert_(op.start_timestamp is None)
258
      self.assert_(op.exec_timestamp is None)
259
      self.assert_(op.end_timestamp is None)
260
      self.assert_(op.result is None)
261
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
262

    
263
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
264
    _Check(op1)
265
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
266
    _Check(op2)
267
    self.assertEqual(op1.Serialize(), op2.Serialize())
268

    
269
  def testPriority(self):
270
    def _Check(op):
271
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
272
             "Default priority equals high priority; test can't work"
273
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
274
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
275

    
276
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
277
    op1 = jqueue._QueuedOpCode(inpop)
278
    _Check(op1)
279
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
280
    _Check(op2)
281
    self.assertEqual(op1.Serialize(), op2.Serialize())
282

    
283

    
284
class TestQueuedJob(unittest.TestCase):
285
  def testNoOpCodes(self):
286
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
287
                      None, 1, [], False)
288

    
289
  def testDefaults(self):
290
    job_id = 4260
291
    ops = [
292
      opcodes.OpTagsGet(),
293
      opcodes.OpTestDelay(),
294
      ]
295

    
296
    def _Check(job):
297
      self.assertTrue(job.writable)
298
      self.assertEqual(job.id, job_id)
299
      self.assertEqual(job.log_serial, 0)
300
      self.assert_(job.received_timestamp)
301
      self.assert_(job.start_timestamp is None)
302
      self.assert_(job.end_timestamp is None)
303
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
304
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
305
      self.assert_(repr(job).startswith("<"))
306
      self.assertEqual(len(job.ops), len(ops))
307
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
308
                              for (inp, op) in zip(ops, job.ops)))
309
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
310
                        ["unknown-field"])
311
      self.assertEqual(job.GetInfo(["summary"]),
312
                       [[op.input.Summary() for op in job.ops]])
313
      self.assertFalse(job.archived)
314

    
315
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
316
    _Check(job1)
317
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
318
    _Check(job2)
319
    self.assertEqual(job1.Serialize(), job2.Serialize())
320

    
321
  def testWritable(self):
322
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
323
    self.assertFalse(job.writable)
324

    
325
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
326
    self.assertTrue(job.writable)
327

    
328
  def testArchived(self):
329
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
330
    self.assertFalse(job.archived)
331

    
332
    newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
333
    self.assertTrue(newjob.archived)
334

    
335
    newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
336
    self.assertFalse(newjob2.archived)
337

    
338
  def testPriority(self):
339
    job_id = 4283
340
    ops = [
341
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
342
      opcodes.OpTestDelay(),
343
      ]
344

    
345
    def _Check(job):
346
      self.assertEqual(job.id, job_id)
347
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
348
      self.assert_(repr(job).startswith("<"))
349

    
350
    job = jqueue._QueuedJob(None, job_id, ops, True)
351
    _Check(job)
352
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
353
                            for op in job.ops))
354
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
355

    
356
    # Increase first
357
    job.ops[0].priority -= 1
358
    _Check(job)
359
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
360

    
361
    # Mark opcode as finished
362
    job.ops[0].status = constants.OP_STATUS_SUCCESS
363
    _Check(job)
364
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
365

    
366
    # Increase second
367
    job.ops[1].priority -= 10
368
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
369

    
370
    # Test increasing first
371
    job.ops[0].status = constants.OP_STATUS_RUNNING
372
    job.ops[0].priority -= 19
373
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
374

    
375
  def _JobForPriority(self, job_id):
376
    ops = [
377
      opcodes.OpTagsGet(),
378
      opcodes.OpTestDelay(),
379
      opcodes.OpTagsGet(),
380
      opcodes.OpTestDelay(),
381
      ]
382

    
383
    job = jqueue._QueuedJob(None, job_id, ops, True)
384

    
385
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
386
                               for op in job.ops))
387
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
388
    self.assertFalse(compat.any(hasattr(op.input, "priority")
389
                                for op in job.ops))
390

    
391
    return job
392

    
393
  def testChangePriorityAllQueued(self):
394
    job = self._JobForPriority(24984)
395
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
396
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
397
                               for op in job.ops))
398
    result = job.ChangePriority(-10)
399
    self.assertEqual(job.CalcPriority(), -10)
400
    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
401
    self.assertFalse(compat.any(hasattr(op.input, "priority")
402
                                for op in job.ops))
403
    self.assertEqual(result,
404
                     (True, ("Priorities of pending opcodes for job 24984 have"
405
                             " been changed to -10")))
406

    
407
  def testChangePriorityAllFinished(self):
408
    job = self._JobForPriority(16405)
409

    
410
    for (idx, op) in enumerate(job.ops):
411
      if idx > 2:
412
        op.status = constants.OP_STATUS_ERROR
413
      else:
414
        op.status = constants.OP_STATUS_SUCCESS
415

    
416
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
417
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
418
    result = job.ChangePriority(-10)
419
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
420
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
421
                               for op in job.ops))
422
    self.assertFalse(compat.any(hasattr(op.input, "priority")
423
                                for op in job.ops))
424
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
425
      constants.OP_STATUS_SUCCESS,
426
      constants.OP_STATUS_SUCCESS,
427
      constants.OP_STATUS_SUCCESS,
428
      constants.OP_STATUS_ERROR,
429
      ])
430
    self.assertEqual(result, (False, "Job 16405 is finished"))
431

    
432
  def testChangePriorityCancelling(self):
433
    job = self._JobForPriority(31572)
434

    
435
    for (idx, op) in enumerate(job.ops):
436
      if idx > 1:
437
        op.status = constants.OP_STATUS_CANCELING
438
      else:
439
        op.status = constants.OP_STATUS_SUCCESS
440

    
441
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
442
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
443
    result = job.ChangePriority(5)
444
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
445
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
446
                               for op in job.ops))
447
    self.assertFalse(compat.any(hasattr(op.input, "priority")
448
                                for op in job.ops))
449
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
450
      constants.OP_STATUS_SUCCESS,
451
      constants.OP_STATUS_SUCCESS,
452
      constants.OP_STATUS_CANCELING,
453
      constants.OP_STATUS_CANCELING,
454
      ])
455
    self.assertEqual(result, (False, "Job 31572 is cancelling"))
456

    
457
  def testChangePriorityFirstRunning(self):
458
    job = self._JobForPriority(1716215889)
459

    
460
    for (idx, op) in enumerate(job.ops):
461
      if idx == 0:
462
        op.status = constants.OP_STATUS_RUNNING
463
      else:
464
        op.status = constants.OP_STATUS_QUEUED
465

    
466
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
467
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
468
    result = job.ChangePriority(7)
469
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
470
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
471
                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
472
    self.assertFalse(compat.any(hasattr(op.input, "priority")
473
                                for op in job.ops))
474
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
475
      constants.OP_STATUS_RUNNING,
476
      constants.OP_STATUS_QUEUED,
477
      constants.OP_STATUS_QUEUED,
478
      constants.OP_STATUS_QUEUED,
479
      ])
480
    self.assertEqual(result,
481
                     (True, ("Priorities of pending opcodes for job"
482
                             " 1716215889 have been changed to 7")))
483

    
484
  def testChangePriorityLastRunning(self):
485
    job = self._JobForPriority(1308)
486

    
487
    for (idx, op) in enumerate(job.ops):
488
      if idx == (len(job.ops) - 1):
489
        op.status = constants.OP_STATUS_RUNNING
490
      else:
491
        op.status = constants.OP_STATUS_SUCCESS
492

    
493
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
494
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
495
    result = job.ChangePriority(-3)
496
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
497
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
498
                               for op in job.ops))
499
    self.assertFalse(compat.any(hasattr(op.input, "priority")
500
                                for op in job.ops))
501
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
502
      constants.OP_STATUS_SUCCESS,
503
      constants.OP_STATUS_SUCCESS,
504
      constants.OP_STATUS_SUCCESS,
505
      constants.OP_STATUS_RUNNING,
506
      ])
507
    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
508

    
509
  def testChangePrioritySecondOpcodeRunning(self):
510
    job = self._JobForPriority(27701)
511

    
512
    self.assertEqual(len(job.ops), 4)
513
    job.ops[0].status = constants.OP_STATUS_SUCCESS
514
    job.ops[1].status = constants.OP_STATUS_RUNNING
515
    job.ops[2].status = constants.OP_STATUS_QUEUED
516
    job.ops[3].status = constants.OP_STATUS_QUEUED
517

    
518
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
519
    result = job.ChangePriority(-19)
520
    self.assertEqual(job.CalcPriority(), -19)
521
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
522
                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
523
                      -19, -19])
524
    self.assertFalse(compat.any(hasattr(op.input, "priority")
525
                                for op in job.ops))
526
    self.assertEqual(map(operator.attrgetter("status"), job.ops), [
527
      constants.OP_STATUS_SUCCESS,
528
      constants.OP_STATUS_RUNNING,
529
      constants.OP_STATUS_QUEUED,
530
      constants.OP_STATUS_QUEUED,
531
      ])
532
    self.assertEqual(result,
533
                     (True, ("Priorities of pending opcodes for job"
534
                             " 27701 have been changed to -19")))
535

    
536
  def testChangePriorityWithInconsistentJob(self):
537
    job = self._JobForPriority(30097)
538

    
539
    self.assertEqual(len(job.ops), 4)
540

    
541
    # This job is invalid (as it has two opcodes marked as running) and make
542
    # the call fail because an unprocessed opcode precedes a running one (which
543
    # should never happen in reality)
544
    job.ops[0].status = constants.OP_STATUS_SUCCESS
545
    job.ops[1].status = constants.OP_STATUS_RUNNING
546
    job.ops[2].status = constants.OP_STATUS_QUEUED
547
    job.ops[3].status = constants.OP_STATUS_RUNNING
548

    
549
    self.assertRaises(AssertionError, job.ChangePriority, 19)
550

    
551
  def testCalcStatus(self):
552
    def _Queued(ops):
553
      # The default status is "queued"
554
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
555
                              for op in ops))
556

    
557
    def _Waitlock1(ops):
558
      ops[0].status = constants.OP_STATUS_WAITING
559

    
560
    def _Waitlock2(ops):
561
      ops[0].status = constants.OP_STATUS_SUCCESS
562
      ops[1].status = constants.OP_STATUS_SUCCESS
563
      ops[2].status = constants.OP_STATUS_WAITING
564

    
565
    def _Running(ops):
566
      ops[0].status = constants.OP_STATUS_SUCCESS
567
      ops[1].status = constants.OP_STATUS_RUNNING
568
      for op in ops[2:]:
569
        op.status = constants.OP_STATUS_QUEUED
570

    
571
    def _Canceling1(ops):
572
      ops[0].status = constants.OP_STATUS_SUCCESS
573
      ops[1].status = constants.OP_STATUS_SUCCESS
574
      for op in ops[2:]:
575
        op.status = constants.OP_STATUS_CANCELING
576

    
577
    def _Canceling2(ops):
578
      for op in ops:
579
        op.status = constants.OP_STATUS_CANCELING
580

    
581
    def _Canceled(ops):
582
      for op in ops:
583
        op.status = constants.OP_STATUS_CANCELED
584

    
585
    def _Error1(ops):
586
      for idx, op in enumerate(ops):
587
        if idx > 3:
588
          op.status = constants.OP_STATUS_ERROR
589
        else:
590
          op.status = constants.OP_STATUS_SUCCESS
591

    
592
    def _Error2(ops):
593
      for op in ops:
594
        op.status = constants.OP_STATUS_ERROR
595

    
596
    def _Success(ops):
597
      for op in ops:
598
        op.status = constants.OP_STATUS_SUCCESS
599

    
600
    tests = {
601
      constants.JOB_STATUS_QUEUED: [_Queued],
602
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
603
      constants.JOB_STATUS_RUNNING: [_Running],
604
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
605
      constants.JOB_STATUS_CANCELED: [_Canceled],
606
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
607
      constants.JOB_STATUS_SUCCESS: [_Success],
608
      }
609

    
610
    def _NewJob():
611
      job = jqueue._QueuedJob(None, 1,
612
                              [opcodes.OpTestDelay() for _ in range(10)],
613
                              True)
614
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
615
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
616
                              for op in job.ops))
617
      return job
618

    
619
    for status in constants.JOB_STATUS_ALL:
620
      sttests = tests[status]
621
      assert sttests
622
      for fn in sttests:
623
        job = _NewJob()
624
        fn(job.ops)
625
        self.assertEqual(job.CalcStatus(), status)
626

    
627

    
628
class _FakeDependencyManager:
629
  def __init__(self):
630
    self._checks = []
631
    self._notifications = []
632
    self._waiting = set()
633

    
634
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
635
    self._checks.append((job, dep_job_id, dep_status, result))
636

    
637
  def CountPendingResults(self):
638
    return len(self._checks)
639

    
640
  def CountWaitingJobs(self):
641
    return len(self._waiting)
642

    
643
  def GetNextNotification(self):
644
    return self._notifications.pop(0)
645

    
646
  def JobWaiting(self, job):
647
    return job in self._waiting
648

    
649
  def CheckAndRegister(self, job, dep_job_id, dep_status):
650
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
651

    
652
    assert exp_job == job
653
    assert exp_dep_job_id == dep_job_id
654
    assert exp_dep_status == dep_status
655

    
656
    (result_status, _) = result
657

    
658
    if result_status == jqueue._JobDependencyManager.WAIT:
659
      self._waiting.add(job)
660
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
661
      self._waiting.remove(job)
662

    
663
    return result
664

    
665
  def NotifyWaiters(self, job_id):
666
    self._notifications.append(job_id)
667

    
668

    
669
class _DisabledFakeDependencyManager:
670
  def JobWaiting(self, _):
671
    return False
672

    
673
  def CheckAndRegister(self, *args):
674
    assert False, "Should not be called"
675

    
676
  def NotifyWaiters(self, _):
677
    pass
678

    
679

    
680
class _FakeQueueForProc:
681
  def __init__(self, depmgr=None):
682
    self._acquired = False
683
    self._updates = []
684
    self._submitted = []
685
    self._accepting_jobs = True
686

    
687
    self._submit_count = itertools.count(1000)
688

    
689
    if depmgr:
690
      self.depmgr = depmgr
691
    else:
692
      self.depmgr = _DisabledFakeDependencyManager()
693

    
694
  def IsAcquired(self):
695
    return self._acquired
696

    
697
  def GetNextUpdate(self):
698
    return self._updates.pop(0)
699

    
700
  def GetNextSubmittedJob(self):
701
    return self._submitted.pop(0)
702

    
703
  def acquire(self, shared=0):
704
    assert shared == 1
705
    self._acquired = True
706

    
707
  def release(self):
708
    assert self._acquired
709
    self._acquired = False
710

    
711
  def UpdateJobUnlocked(self, job, replicate=True):
712
    assert self._acquired, "Lock not acquired while updating job"
713
    self._updates.append((job, bool(replicate)))
714

    
715
  def SubmitManyJobs(self, jobs):
716
    assert not self._acquired, "Lock acquired while submitting jobs"
717
    job_ids = [self._submit_count.next() for _ in jobs]
718
    self._submitted.extend(zip(job_ids, jobs))
719
    return job_ids
720

    
721
  def StopAcceptingJobs(self):
722
    self._accepting_jobs = False
723

    
724
  def AcceptingJobsUnlocked(self):
725
    return self._accepting_jobs
726

    
727

    
728
class _FakeExecOpCodeForProc:
729
  def __init__(self, queue, before_start, after_start):
730
    self._queue = queue
731
    self._before_start = before_start
732
    self._after_start = after_start
733

    
734
  def __call__(self, op, cbs, timeout=None):
735
    assert isinstance(op, opcodes.OpTestDummy)
736
    assert not self._queue.IsAcquired(), \
737
           "Queue lock not released when executing opcode"
738

    
739
    if self._before_start:
740
      self._before_start(timeout, cbs.CurrentPriority())
741

    
742
    cbs.NotifyStart()
743

    
744
    if self._after_start:
745
      self._after_start(op, cbs)
746

    
747
    # Check again after the callbacks
748
    assert not self._queue.IsAcquired()
749

    
750
    if op.fail:
751
      raise errors.OpExecError("Error requested (%s)" % op.result)
752

    
753
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
754
      return cbs.SubmitManyJobs(op.submit_jobs)
755

    
756
    return op.result
757

    
758

    
759
class _JobProcessorTestUtils:
760
  def _CreateJob(self, queue, job_id, ops):
761
    job = jqueue._QueuedJob(queue, job_id, ops, True)
762
    self.assertFalse(job.start_timestamp)
763
    self.assertFalse(job.end_timestamp)
764
    self.assertEqual(len(ops), len(job.ops))
765
    self.assert_(compat.all(op.input == inp
766
                            for (op, inp) in zip(job.ops, ops)))
767
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
768
    return job
769

    
770

    
771
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
772
  def _GenericCheckJob(self, job):
773
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
774
                      for op in job.ops)
775

    
776
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
777
                     [[op.start_timestamp for op in job.ops],
778
                      [op.exec_timestamp for op in job.ops],
779
                      [op.end_timestamp for op in job.ops]])
780
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
781
                     [job.received_timestamp,
782
                      job.start_timestamp,
783
                      job.end_timestamp])
784
    self.assert_(job.start_timestamp)
785
    self.assert_(job.end_timestamp)
786
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
787

    
788
  def testSuccess(self):
789
    queue = _FakeQueueForProc()
790

    
791
    for (job_id, opcount) in [(25351, 1), (6637, 3),
792
                              (24644, 10), (32207, 100)]:
793
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
794
             for i in range(opcount)]
795

    
796
      # Create job
797
      job = self._CreateJob(queue, job_id, ops)
798

    
799
      def _BeforeStart(timeout, priority):
800
        self.assertEqual(queue.GetNextUpdate(), (job, True))
801
        self.assertRaises(IndexError, queue.GetNextUpdate)
802
        self.assertFalse(queue.IsAcquired())
803
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
804
        self.assertFalse(job.cur_opctx)
805

    
806
      def _AfterStart(op, cbs):
807
        self.assertEqual(queue.GetNextUpdate(), (job, True))
808
        self.assertRaises(IndexError, queue.GetNextUpdate)
809

    
810
        self.assertFalse(queue.IsAcquired())
811
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
812
        self.assertFalse(job.cur_opctx)
813

    
814
        # Job is running, cancelling shouldn't be possible
815
        (success, _) = job.Cancel()
816
        self.assertFalse(success)
817

    
818
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
819

    
820
      for idx in range(len(ops)):
821
        self.assertRaises(IndexError, queue.GetNextUpdate)
822
        result = jqueue._JobProcessor(queue, opexec, job)()
823
        self.assertEqual(queue.GetNextUpdate(), (job, True))
824
        self.assertRaises(IndexError, queue.GetNextUpdate)
825
        if idx == len(ops) - 1:
826
          # Last opcode
827
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
828
        else:
829
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
830

    
831
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
832
          self.assert_(job.start_timestamp)
833
          self.assertFalse(job.end_timestamp)
834

    
835
      self.assertRaises(IndexError, queue.GetNextUpdate)
836

    
837
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
838
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
839
      self.assertEqual(job.GetInfo(["opresult"]),
840
                       [[op.input.result for op in job.ops]])
841
      self.assertEqual(job.GetInfo(["opstatus"]),
842
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
843
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
844
                              for op in job.ops))
845

    
846
      self._GenericCheckJob(job)
847

    
848
      # Calling the processor on a finished job should be a no-op
849
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
850
                       jqueue._JobProcessor.FINISHED)
851
      self.assertRaises(IndexError, queue.GetNextUpdate)
852

    
853
  def testOpcodeError(self):
854
    queue = _FakeQueueForProc()
855

    
856
    testdata = [
857
      (17077, 1, 0, 0),
858
      (1782, 5, 2, 2),
859
      (18179, 10, 9, 9),
860
      (4744, 10, 3, 8),
861
      (23816, 100, 39, 45),
862
      ]
863

    
864
    for (job_id, opcount, failfrom, failto) in testdata:
865
      # Prepare opcodes
866
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
867
                                 fail=(failfrom <= i and
868
                                       i <= failto))
869
             for i in range(opcount)]
870

    
871
      # Create job
872
      job = self._CreateJob(queue, str(job_id), ops)
873

    
874
      opexec = _FakeExecOpCodeForProc(queue, None, None)
875

    
876
      for idx in range(len(ops)):
877
        self.assertRaises(IndexError, queue.GetNextUpdate)
878
        result = jqueue._JobProcessor(queue, opexec, job)()
879
        # queued to waitlock
880
        self.assertEqual(queue.GetNextUpdate(), (job, True))
881
        # waitlock to running
882
        self.assertEqual(queue.GetNextUpdate(), (job, True))
883
        # Opcode result
884
        self.assertEqual(queue.GetNextUpdate(), (job, True))
885
        self.assertRaises(IndexError, queue.GetNextUpdate)
886

    
887
        if idx in (failfrom, len(ops) - 1):
888
          # Last opcode
889
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
890
          break
891

    
892
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
893

    
894
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
895

    
896
      self.assertRaises(IndexError, queue.GetNextUpdate)
897

    
898
      # Check job status
899
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
900
      self.assertEqual(job.GetInfo(["id"]), [job_id])
901
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
902

    
903
      # Check opcode status
904
      data = zip(job.ops,
905
                 job.GetInfo(["opstatus"])[0],
906
                 job.GetInfo(["opresult"])[0])
907

    
908
      for idx, (op, opstatus, opresult) in enumerate(data):
909
        if idx < failfrom:
910
          assert not op.input.fail
911
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
912
          self.assertEqual(opresult, op.input.result)
913
        elif idx <= failto:
914
          assert op.input.fail
915
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
916
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
917
        else:
918
          assert not op.input.fail
919
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
920
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
921

    
922
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
923
                              for op in job.ops[:failfrom]))
924

    
925
      self._GenericCheckJob(job)
926

    
927
      # Calling the processor on a finished job should be a no-op
928
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
929
                       jqueue._JobProcessor.FINISHED)
930
      self.assertRaises(IndexError, queue.GetNextUpdate)
931

    
932
  def testCancelWhileInQueue(self):
933
    queue = _FakeQueueForProc()
934

    
935
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
936
           for i in range(5)]
937

    
938
    # Create job
939
    job_id = 17045
940
    job = self._CreateJob(queue, job_id, ops)
941

    
942
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
943

    
944
    # Mark as cancelled
945
    (success, _) = job.Cancel()
946
    self.assert_(success)
947

    
948
    self.assertRaises(IndexError, queue.GetNextUpdate)
949

    
950
    self.assertFalse(job.start_timestamp)
951
    self.assertTrue(job.end_timestamp)
952
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
953
                            for op in job.ops))
954

    
955
    # Serialize to check for differences
956
    before_proc = job.Serialize()
957

    
958
    # Simulate processor called in workerpool
959
    opexec = _FakeExecOpCodeForProc(queue, None, None)
960
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
961
                     jqueue._JobProcessor.FINISHED)
962

    
963
    # Check result
964
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
965
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
966
    self.assertFalse(job.start_timestamp)
967
    self.assertTrue(job.end_timestamp)
968
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
969
                                for op in job.ops))
970
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
971
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
972
                      ["Job canceled by request" for _ in job.ops]])
973

    
974
    # Must not have changed or written
975
    self.assertEqual(before_proc, job.Serialize())
976
    self.assertRaises(IndexError, queue.GetNextUpdate)
977

    
978
  def testCancelWhileWaitlockInQueue(self):
979
    queue = _FakeQueueForProc()
980

    
981
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
982
           for i in range(5)]
983

    
984
    # Create job
985
    job_id = 8645
986
    job = self._CreateJob(queue, job_id, ops)
987

    
988
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
989

    
990
    job.ops[0].status = constants.OP_STATUS_WAITING
991

    
992
    assert len(job.ops) == 5
993

    
994
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
995

    
996
    # Mark as cancelling
997
    (success, _) = job.Cancel()
998
    self.assert_(success)
999

    
1000
    self.assertRaises(IndexError, queue.GetNextUpdate)
1001

    
1002
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1003
                            for op in job.ops))
1004

    
1005
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1006
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1007
                     jqueue._JobProcessor.FINISHED)
1008

    
1009
    # Check result
1010
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1011
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1012
    self.assertFalse(job.start_timestamp)
1013
    self.assert_(job.end_timestamp)
1014
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1015
                                for op in job.ops))
1016
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1017
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1018
                      ["Job canceled by request" for _ in job.ops]])
1019

    
1020
  def testCancelWhileWaitlock(self):
1021
    queue = _FakeQueueForProc()
1022

    
1023
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1024
           for i in range(5)]
1025

    
1026
    # Create job
1027
    job_id = 11009
1028
    job = self._CreateJob(queue, job_id, ops)
1029

    
1030
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1031

    
1032
    def _BeforeStart(timeout, priority):
1033
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1034
      self.assertRaises(IndexError, queue.GetNextUpdate)
1035
      self.assertFalse(queue.IsAcquired())
1036
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1037

    
1038
      # Mark as cancelled
1039
      (success, _) = job.Cancel()
1040
      self.assert_(success)
1041

    
1042
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1043
                              for op in job.ops))
1044
      self.assertRaises(IndexError, queue.GetNextUpdate)
1045

    
1046
    def _AfterStart(op, cbs):
1047
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1048
      self.assertRaises(IndexError, queue.GetNextUpdate)
1049
      self.assertFalse(queue.IsAcquired())
1050
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1051

    
1052
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1053

    
1054
    self.assertRaises(IndexError, queue.GetNextUpdate)
1055
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1056
                     jqueue._JobProcessor.FINISHED)
1057
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1058
    self.assertRaises(IndexError, queue.GetNextUpdate)
1059

    
1060
    # Check result
1061
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1062
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1063
    self.assert_(job.start_timestamp)
1064
    self.assert_(job.end_timestamp)
1065
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1066
                                for op in job.ops))
1067
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1068
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1069
                      ["Job canceled by request" for _ in job.ops]])
1070

    
1071
  def _TestCancelWhileSomething(self, cb):
1072
    queue = _FakeQueueForProc()
1073

    
1074
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1075
           for i in range(5)]
1076

    
1077
    # Create job
1078
    job_id = 24314
1079
    job = self._CreateJob(queue, job_id, ops)
1080

    
1081
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1082

    
1083
    def _BeforeStart(timeout, priority):
1084
      self.assertFalse(queue.IsAcquired())
1085
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1086

    
1087
      # Mark as cancelled
1088
      (success, _) = job.Cancel()
1089
      self.assert_(success)
1090

    
1091
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
1092
                              for op in job.ops))
1093

    
1094
      cb(queue)
1095

    
1096
    def _AfterStart(op, cbs):
1097
      self.fail("Should not reach this")
1098

    
1099
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1100

    
1101
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1102
                     jqueue._JobProcessor.FINISHED)
1103

    
1104
    # Check result
1105
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1106
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1107
    self.assert_(job.start_timestamp)
1108
    self.assert_(job.end_timestamp)
1109
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
1110
                                for op in job.ops))
1111
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1112
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
1113
                      ["Job canceled by request" for _ in job.ops]])
1114

    
1115
    return queue
1116

    
1117
  def testCancelWhileWaitlockWithTimeout(self):
1118
    def fn(_):
1119
      # Fake an acquire attempt timing out
1120
      raise mcpu.LockAcquireTimeout()
1121

    
1122
    self._TestCancelWhileSomething(fn)
1123

    
1124
  def testCancelDuringQueueShutdown(self):
1125
    queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
1126
    self.assertFalse(queue.AcceptingJobsUnlocked())
1127

    
1128
  def testCancelWhileRunning(self):
1129
    # Tests canceling a job with finished opcodes and more, unprocessed ones
1130
    queue = _FakeQueueForProc()
1131

    
1132
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1133
           for i in range(3)]
1134

    
1135
    # Create job
1136
    job_id = 28492
1137
    job = self._CreateJob(queue, job_id, ops)
1138

    
1139
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1140

    
1141
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1142

    
1143
    # Run one opcode
1144
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1145
                     jqueue._JobProcessor.DEFER)
1146

    
1147
    # Job goes back to queued
1148
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1149
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1150
                     [[constants.OP_STATUS_SUCCESS,
1151
                       constants.OP_STATUS_QUEUED,
1152
                       constants.OP_STATUS_QUEUED],
1153
                      ["Res0", None, None]])
1154

    
1155
    # Mark as cancelled
1156
    (success, _) = job.Cancel()
1157
    self.assert_(success)
1158

    
1159
    # Try processing another opcode (this will actually cancel the job)
1160
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1161
                     jqueue._JobProcessor.FINISHED)
1162

    
1163
    # Check result
1164
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1165
    self.assertEqual(job.GetInfo(["id"]), [job_id])
1166
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1167
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1168
                     [[constants.OP_STATUS_SUCCESS,
1169
                       constants.OP_STATUS_CANCELED,
1170
                       constants.OP_STATUS_CANCELED],
1171
                      ["Res0", "Job canceled by request",
1172
                       "Job canceled by request"]])
1173

    
1174
  def _TestQueueShutdown(self, queue, opexec, job, runcount):
1175
    self.assertTrue(queue.AcceptingJobsUnlocked())
1176

    
1177
    # Simulate shutdown
1178
    queue.StopAcceptingJobs()
1179

    
1180
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1181
                     jqueue._JobProcessor.DEFER)
1182

    
1183
    # Check result
1184
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1185
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
1186
    self.assertFalse(job.cur_opctx)
1187
    self.assertTrue(job.start_timestamp)
1188
    self.assertFalse(job.end_timestamp)
1189
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1190
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1191
                               for op in job.ops[:runcount]))
1192
    self.assertFalse(job.ops[runcount].end_timestamp)
1193
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
1194
                                for op in job.ops[(runcount + 1):]))
1195
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1196
                     [(([constants.OP_STATUS_SUCCESS] * runcount) +
1197
                       ([constants.OP_STATUS_QUEUED] *
1198
                        (len(job.ops) - runcount))),
1199
                      (["Res%s" % i for i in range(runcount)] +
1200
                       ([None] * (len(job.ops) - runcount)))])
1201

    
1202
    # Must have been written and replicated
1203
    self.assertEqual(queue.GetNextUpdate(), (job, True))
1204
    self.assertRaises(IndexError, queue.GetNextUpdate)
1205

    
1206
  def testQueueShutdownWhileRunning(self):
1207
    # Tests shutting down the queue while a job is running
1208
    queue = _FakeQueueForProc()
1209

    
1210
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1211
           for i in range(3)]
1212

    
1213
    # Create job
1214
    job_id = 2718211587
1215
    job = self._CreateJob(queue, job_id, ops)
1216

    
1217
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1218

    
1219
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1220

    
1221
    self.assertRaises(IndexError, queue.GetNextUpdate)
1222

    
1223
    # Run one opcode
1224
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1225
                     jqueue._JobProcessor.DEFER)
1226

    
1227
    # Job goes back to queued
1228
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1229
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1230
                     [[constants.OP_STATUS_SUCCESS,
1231
                       constants.OP_STATUS_QUEUED,
1232
                       constants.OP_STATUS_QUEUED],
1233
                      ["Res0", None, None]])
1234
    self.assertFalse(job.cur_opctx)
1235

    
1236
    # Writes for waiting, running and result
1237
    for _ in range(3):
1238
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1239

    
1240
    # Run second opcode
1241
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1242
                     jqueue._JobProcessor.DEFER)
1243

    
1244
    # Job goes back to queued
1245
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1246
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1247
                     [[constants.OP_STATUS_SUCCESS,
1248
                       constants.OP_STATUS_SUCCESS,
1249
                       constants.OP_STATUS_QUEUED],
1250
                      ["Res0", "Res1", None]])
1251
    self.assertFalse(job.cur_opctx)
1252

    
1253
    # Writes for waiting, running and result
1254
    for _ in range(3):
1255
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1256

    
1257
    self._TestQueueShutdown(queue, opexec, job, 2)
1258

    
1259
  def testQueueShutdownWithLockTimeout(self):
1260
    # Tests shutting down while a lock acquire times out
1261
    queue = _FakeQueueForProc()
1262

    
1263
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1264
           for i in range(3)]
1265

    
1266
    # Create job
1267
    job_id = 1304231178
1268
    job = self._CreateJob(queue, job_id, ops)
1269

    
1270
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1271

    
1272
    acquire_timeout = False
1273

    
1274
    def _BeforeStart(timeout, priority):
1275
      self.assertFalse(queue.IsAcquired())
1276
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1277
      if acquire_timeout:
1278
        raise mcpu.LockAcquireTimeout()
1279

    
1280
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
1281

    
1282
    self.assertRaises(IndexError, queue.GetNextUpdate)
1283

    
1284
    # Run one opcode
1285
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1286
                     jqueue._JobProcessor.DEFER)
1287

    
1288
    # Job goes back to queued
1289
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1290
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1291
                     [[constants.OP_STATUS_SUCCESS,
1292
                       constants.OP_STATUS_QUEUED,
1293
                       constants.OP_STATUS_QUEUED],
1294
                      ["Res0", None, None]])
1295
    self.assertFalse(job.cur_opctx)
1296

    
1297
    # Writes for waiting, running and result
1298
    for _ in range(3):
1299
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1300

    
1301
    # The next opcode should have expiring lock acquires
1302
    acquire_timeout = True
1303

    
1304
    self._TestQueueShutdown(queue, opexec, job, 1)
1305

    
1306
  def testQueueShutdownWhileInQueue(self):
1307
    # This should never happen in reality (no new jobs are started by the
1308
    # workerpool once a shutdown has been initiated), but it's better to test
1309
    # the job processor for this scenario
1310
    queue = _FakeQueueForProc()
1311

    
1312
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1313
           for i in range(5)]
1314

    
1315
    # Create job
1316
    job_id = 2031
1317
    job = self._CreateJob(queue, job_id, ops)
1318

    
1319
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1320
    self.assertRaises(IndexError, queue.GetNextUpdate)
1321

    
1322
    self.assertFalse(job.start_timestamp)
1323
    self.assertFalse(job.end_timestamp)
1324
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
1325
                               for op in job.ops))
1326

    
1327
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1328
    self._TestQueueShutdown(queue, opexec, job, 0)
1329

    
1330
  def testQueueShutdownWhileWaitlockInQueue(self):
1331
    queue = _FakeQueueForProc()
1332

    
1333
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1334
           for i in range(5)]
1335

    
1336
    # Create job
1337
    job_id = 53125685
1338
    job = self._CreateJob(queue, job_id, ops)
1339

    
1340
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1341

    
1342
    job.ops[0].status = constants.OP_STATUS_WAITING
1343

    
1344
    assert len(job.ops) == 5
1345

    
1346
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1347

    
1348
    self.assertRaises(IndexError, queue.GetNextUpdate)
1349

    
1350
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1351
    self._TestQueueShutdown(queue, opexec, job, 0)
1352

    
1353
  def testPartiallyRun(self):
1354
    # Tests calling the processor on a job that's been partially run before the
1355
    # program was restarted
1356
    queue = _FakeQueueForProc()
1357

    
1358
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1359

    
1360
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
1361
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1362
             for i in range(10)]
1363

    
1364
      # Create job
1365
      job = self._CreateJob(queue, job_id, ops)
1366

    
1367
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1368

    
1369
      for _ in range(successcount):
1370
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1371
                         jqueue._JobProcessor.DEFER)
1372

    
1373
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1374
      self.assertEqual(job.GetInfo(["opstatus"]),
1375
                       [[constants.OP_STATUS_SUCCESS
1376
                         for _ in range(successcount)] +
1377
                        [constants.OP_STATUS_QUEUED
1378
                         for _ in range(len(ops) - successcount)]])
1379

    
1380
      self.assert_(job.ops_iter)
1381

    
1382
      # Serialize and restore (simulates program restart)
1383
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1384
      self.assertFalse(newjob.ops_iter)
1385
      self._TestPartial(newjob, successcount)
1386

    
1387
  def _TestPartial(self, job, successcount):
1388
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1389
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1390

    
1391
    queue = _FakeQueueForProc()
1392
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1393

    
1394
    for remaining in reversed(range(len(job.ops) - successcount)):
1395
      result = jqueue._JobProcessor(queue, opexec, job)()
1396
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1397
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1398
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1399
      self.assertRaises(IndexError, queue.GetNextUpdate)
1400

    
1401
      if remaining == 0:
1402
        # Last opcode
1403
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1404
        break
1405

    
1406
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1407

    
1408
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1409

    
1410
    self.assertRaises(IndexError, queue.GetNextUpdate)
1411
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1412
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1413
    self.assertEqual(job.GetInfo(["opresult"]),
1414
                     [[op.input.result for op in job.ops]])
1415
    self.assertEqual(job.GetInfo(["opstatus"]),
1416
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1417
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1418
                            for op in job.ops))
1419

    
1420
    self._GenericCheckJob(job)
1421

    
1422
    # Calling the processor on a finished job should be a no-op
1423
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1424
                     jqueue._JobProcessor.FINISHED)
1425
    self.assertRaises(IndexError, queue.GetNextUpdate)
1426

    
1427
    # ... also after being restored
1428
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1429
    # Calling the processor on a finished job should be a no-op
1430
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1431
                     jqueue._JobProcessor.FINISHED)
1432
    self.assertRaises(IndexError, queue.GetNextUpdate)
1433

    
1434
  def testProcessorOnRunningJob(self):
1435
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1436

    
1437
    queue = _FakeQueueForProc()
1438
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1439

    
1440
    # Create job
1441
    job = self._CreateJob(queue, 9571, ops)
1442

    
1443
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1444

    
1445
    job.ops[0].status = constants.OP_STATUS_RUNNING
1446

    
1447
    assert len(job.ops) == 1
1448

    
1449
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1450

    
1451
    # Calling on running job must fail
1452
    self.assertRaises(errors.ProgrammerError,
1453
                      jqueue._JobProcessor(queue, opexec, job))
1454

    
1455
  def testLogMessages(self):
1456
    # Tests the "Feedback" callback function
1457
    queue = _FakeQueueForProc()
1458

    
1459
    messages = {
1460
      1: [
1461
        (None, "Hello"),
1462
        (None, "World"),
1463
        (constants.ELOG_MESSAGE, "there"),
1464
        ],
1465
      4: [
1466
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1467
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1468
        ],
1469
      }
1470
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1471
                               messages=messages.get(i, []))
1472
           for i in range(5)]
1473

    
1474
    # Create job
1475
    job = self._CreateJob(queue, 29386, ops)
1476

    
1477
    def _BeforeStart(timeout, priority):
1478
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1479
      self.assertRaises(IndexError, queue.GetNextUpdate)
1480
      self.assertFalse(queue.IsAcquired())
1481
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1482

    
1483
    def _AfterStart(op, cbs):
1484
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1485
      self.assertRaises(IndexError, queue.GetNextUpdate)
1486
      self.assertFalse(queue.IsAcquired())
1487
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1488

    
1489
      self.assertRaises(AssertionError, cbs.Feedback,
1490
                        "too", "many", "arguments")
1491

    
1492
      for (log_type, msg) in op.messages:
1493
        self.assertRaises(IndexError, queue.GetNextUpdate)
1494
        if log_type:
1495
          cbs.Feedback(log_type, msg)
1496
        else:
1497
          cbs.Feedback(msg)
1498
        # Check for job update without replication
1499
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1500
        self.assertRaises(IndexError, queue.GetNextUpdate)
1501

    
1502
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1503

    
1504
    for remaining in reversed(range(len(job.ops))):
1505
      self.assertRaises(IndexError, queue.GetNextUpdate)
1506
      result = jqueue._JobProcessor(queue, opexec, job)()
1507
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1508
      self.assertRaises(IndexError, queue.GetNextUpdate)
1509

    
1510
      if remaining == 0:
1511
        # Last opcode
1512
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1513
        break
1514

    
1515
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1516

    
1517
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1518

    
1519
    self.assertRaises(IndexError, queue.GetNextUpdate)
1520

    
1521
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1522
    self.assertEqual(job.GetInfo(["opresult"]),
1523
                     [[op.input.result for op in job.ops]])
1524

    
1525
    logmsgcount = sum(len(m) for m in messages.values())
1526

    
1527
    self._CheckLogMessages(job, logmsgcount)
1528

    
1529
    # Serialize and restore (simulates program restart)
1530
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
1531
    self._CheckLogMessages(newjob, logmsgcount)
1532

    
1533
    # Check each message
1534
    prevserial = -1
1535
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1536
      for (serial, timestamp, log_type, msg) in oplog:
1537
        (exptype, expmsg) = messages.get(idx).pop(0)
1538
        if exptype:
1539
          self.assertEqual(log_type, exptype)
1540
        else:
1541
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1542
        self.assertEqual(expmsg, msg)
1543
        self.assert_(serial > prevserial)
1544
        prevserial = serial
1545

    
1546
  def _CheckLogMessages(self, job, count):
1547
    # Check serial
1548
    self.assertEqual(job.log_serial, count)
1549

    
1550
    # No filter
1551
    self.assertEqual(job.GetLogEntries(None),
1552
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1553
                      for entry in entries])
1554

    
1555
    # Filter with serial
1556
    assert count > 3
1557
    self.assert_(job.GetLogEntries(3))
1558
    self.assertEqual(job.GetLogEntries(3),
1559
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1560
                      for entry in entries][3:])
1561

    
1562
    # No log message after highest serial
1563
    self.assertFalse(job.GetLogEntries(count))
1564
    self.assertFalse(job.GetLogEntries(count + 3))
1565

    
1566
  def testSubmitManyJobs(self):
1567
    queue = _FakeQueueForProc()
1568

    
1569
    job_id = 15656
1570
    ops = [
1571
      opcodes.OpTestDummy(result="Res0", fail=False,
1572
                          submit_jobs=[]),
1573
      opcodes.OpTestDummy(result="Res1", fail=False,
1574
                          submit_jobs=[
1575
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1576
                            ]),
1577
      opcodes.OpTestDummy(result="Res2", fail=False,
1578
                          submit_jobs=[
1579
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1580
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1581
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1582
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1583
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1584
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1585
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1586
                            ]),
1587
      ]
1588

    
1589
    # Create job
1590
    job = self._CreateJob(queue, job_id, ops)
1591

    
1592
    def _BeforeStart(timeout, priority):
1593
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1594
      self.assertRaises(IndexError, queue.GetNextUpdate)
1595
      self.assertFalse(queue.IsAcquired())
1596
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1597
      self.assertFalse(job.cur_opctx)
1598

    
1599
    def _AfterStart(op, cbs):
1600
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1601
      self.assertRaises(IndexError, queue.GetNextUpdate)
1602

    
1603
      self.assertFalse(queue.IsAcquired())
1604
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1605
      self.assertFalse(job.cur_opctx)
1606

    
1607
      # Job is running, cancelling shouldn't be possible
1608
      (success, _) = job.Cancel()
1609
      self.assertFalse(success)
1610

    
1611
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1612

    
1613
    for idx in range(len(ops)):
1614
      self.assertRaises(IndexError, queue.GetNextUpdate)
1615
      result = jqueue._JobProcessor(queue, opexec, job)()
1616
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1617
      self.assertRaises(IndexError, queue.GetNextUpdate)
1618
      if idx == len(ops) - 1:
1619
        # Last opcode
1620
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1621
      else:
1622
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1623

    
1624
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1625
        self.assert_(job.start_timestamp)
1626
        self.assertFalse(job.end_timestamp)
1627

    
1628
    self.assertRaises(IndexError, queue.GetNextUpdate)
1629

    
1630
    for idx, submitted_ops in enumerate(job_ops
1631
                                        for op in ops
1632
                                        for job_ops in op.submit_jobs):
1633
      self.assertEqual(queue.GetNextSubmittedJob(),
1634
                       (1000 + idx, submitted_ops))
1635
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1636

    
1637
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1638
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1639
    self.assertEqual(job.GetInfo(["opresult"]),
1640
                     [[[], [1000], [1001, 1002, 1003]]])
1641
    self.assertEqual(job.GetInfo(["opstatus"]),
1642
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1643

    
1644
    self._GenericCheckJob(job)
1645

    
1646
    # Calling the processor on a finished job should be a no-op
1647
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1648
                     jqueue._JobProcessor.FINISHED)
1649
    self.assertRaises(IndexError, queue.GetNextUpdate)
1650

    
1651
  def testJobDependency(self):
1652
    depmgr = _FakeDependencyManager()
1653
    queue = _FakeQueueForProc(depmgr=depmgr)
1654

    
1655
    self.assertEqual(queue.depmgr, depmgr)
1656

    
1657
    prev_job_id = 22113
1658
    prev_job_id2 = 28102
1659
    job_id = 29929
1660
    ops = [
1661
      opcodes.OpTestDummy(result="Res0", fail=False,
1662
                          depends=[
1663
                            [prev_job_id2, None],
1664
                            [prev_job_id, None],
1665
                            ]),
1666
      opcodes.OpTestDummy(result="Res1", fail=False),
1667
      ]
1668

    
1669
    # Create job
1670
    job = self._CreateJob(queue, job_id, ops)
1671

    
1672
    def _BeforeStart(timeout, priority):
1673
      if attempt == 0 or attempt > 5:
1674
        # Job should only be updated when it wasn't waiting for another job
1675
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1676
      self.assertRaises(IndexError, queue.GetNextUpdate)
1677
      self.assertFalse(queue.IsAcquired())
1678
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1679
      self.assertFalse(job.cur_opctx)
1680

    
1681
    def _AfterStart(op, cbs):
1682
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1683
      self.assertRaises(IndexError, queue.GetNextUpdate)
1684

    
1685
      self.assertFalse(queue.IsAcquired())
1686
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1687
      self.assertFalse(job.cur_opctx)
1688

    
1689
      # Job is running, cancelling shouldn't be possible
1690
      (success, _) = job.Cancel()
1691
      self.assertFalse(success)
1692

    
1693
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1694

    
1695
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1696

    
1697
    counter = itertools.count()
1698
    while True:
1699
      attempt = counter.next()
1700

    
1701
      self.assertRaises(IndexError, queue.GetNextUpdate)
1702
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1703

    
1704
      if attempt < 2:
1705
        depmgr.AddCheckResult(job, prev_job_id2, None,
1706
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1707
      elif attempt == 2:
1708
        depmgr.AddCheckResult(job, prev_job_id2, None,
1709
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1710
        # The processor will ask for the next dependency immediately
1711
        depmgr.AddCheckResult(job, prev_job_id, None,
1712
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1713
      elif attempt < 5:
1714
        depmgr.AddCheckResult(job, prev_job_id, None,
1715
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1716
      elif attempt == 5:
1717
        depmgr.AddCheckResult(job, prev_job_id, None,
1718
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1719
      if attempt == 2:
1720
        self.assertEqual(depmgr.CountPendingResults(), 2)
1721
      elif attempt > 5:
1722
        self.assertEqual(depmgr.CountPendingResults(), 0)
1723
      else:
1724
        self.assertEqual(depmgr.CountPendingResults(), 1)
1725

    
1726
      result = jqueue._JobProcessor(queue, opexec, job)()
1727
      if attempt == 0 or attempt >= 5:
1728
        # Job should only be updated if there was an actual change
1729
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1730
      self.assertRaises(IndexError, queue.GetNextUpdate)
1731
      self.assertFalse(depmgr.CountPendingResults())
1732

    
1733
      if attempt < 5:
1734
        # Simulate waiting for other job
1735
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1736
        self.assertTrue(job.cur_opctx)
1737
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1738
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1739
        self.assert_(job.start_timestamp)
1740
        self.assertFalse(job.end_timestamp)
1741
        continue
1742

    
1743
      if result == jqueue._JobProcessor.FINISHED:
1744
        # Last opcode
1745
        self.assertFalse(job.cur_opctx)
1746
        break
1747

    
1748
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1749

    
1750
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1751
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1752
      self.assert_(job.start_timestamp)
1753
      self.assertFalse(job.end_timestamp)
1754

    
1755
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1756
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1757
    self.assertEqual(job.GetInfo(["opresult"]),
1758
                     [[op.input.result for op in job.ops]])
1759
    self.assertEqual(job.GetInfo(["opstatus"]),
1760
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1761
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1762
                               for op in job.ops))
1763

    
1764
    self._GenericCheckJob(job)
1765

    
1766
    self.assertRaises(IndexError, queue.GetNextUpdate)
1767
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1768
    self.assertFalse(depmgr.CountPendingResults())
1769
    self.assertFalse(depmgr.CountWaitingJobs())
1770

    
1771
    # Calling the processor on a finished job should be a no-op
1772
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1773
                     jqueue._JobProcessor.FINISHED)
1774
    self.assertRaises(IndexError, queue.GetNextUpdate)
1775

    
1776
  def testJobDependencyCancel(self):
1777
    depmgr = _FakeDependencyManager()
1778
    queue = _FakeQueueForProc(depmgr=depmgr)
1779

    
1780
    self.assertEqual(queue.depmgr, depmgr)
1781

    
1782
    prev_job_id = 13623
1783
    job_id = 30876
1784
    ops = [
1785
      opcodes.OpTestDummy(result="Res0", fail=False),
1786
      opcodes.OpTestDummy(result="Res1", fail=False,
1787
                          depends=[
1788
                            [prev_job_id, None],
1789
                            ]),
1790
      opcodes.OpTestDummy(result="Res2", fail=False),
1791
      ]
1792

    
1793
    # Create job
1794
    job = self._CreateJob(queue, job_id, ops)
1795

    
1796
    def _BeforeStart(timeout, priority):
1797
      if attempt == 0 or attempt > 5:
1798
        # Job should only be updated when it wasn't waiting for another job
1799
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1800
      self.assertRaises(IndexError, queue.GetNextUpdate)
1801
      self.assertFalse(queue.IsAcquired())
1802
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1803
      self.assertFalse(job.cur_opctx)
1804

    
1805
    def _AfterStart(op, cbs):
1806
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1807
      self.assertRaises(IndexError, queue.GetNextUpdate)
1808

    
1809
      self.assertFalse(queue.IsAcquired())
1810
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1811
      self.assertFalse(job.cur_opctx)
1812

    
1813
      # Job is running, cancelling shouldn't be possible
1814
      (success, _) = job.Cancel()
1815
      self.assertFalse(success)
1816

    
1817
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1818

    
1819
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1820

    
1821
    counter = itertools.count()
1822
    while True:
1823
      attempt = counter.next()
1824

    
1825
      self.assertRaises(IndexError, queue.GetNextUpdate)
1826
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1827

    
1828
      if attempt == 0:
1829
        # This will handle the first opcode
1830
        pass
1831
      elif attempt < 4:
1832
        depmgr.AddCheckResult(job, prev_job_id, None,
1833
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1834
      elif attempt == 4:
1835
        # Other job was cancelled
1836
        depmgr.AddCheckResult(job, prev_job_id, None,
1837
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1838

    
1839
      if attempt == 0:
1840
        self.assertEqual(depmgr.CountPendingResults(), 0)
1841
      else:
1842
        self.assertEqual(depmgr.CountPendingResults(), 1)
1843

    
1844
      result = jqueue._JobProcessor(queue, opexec, job)()
1845
      if attempt <= 1 or attempt >= 4:
1846
        # Job should only be updated if there was an actual change
1847
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1848
      self.assertRaises(IndexError, queue.GetNextUpdate)
1849
      self.assertFalse(depmgr.CountPendingResults())
1850

    
1851
      if attempt > 0 and attempt < 4:
1852
        # Simulate waiting for other job
1853
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1854
        self.assertTrue(job.cur_opctx)
1855
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1856
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1857
        self.assert_(job.start_timestamp)
1858
        self.assertFalse(job.end_timestamp)
1859
        continue
1860

    
1861
      if result == jqueue._JobProcessor.FINISHED:
1862
        # Last opcode
1863
        self.assertFalse(job.cur_opctx)
1864
        break
1865

    
1866
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1867

    
1868
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1869
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1870
      self.assert_(job.start_timestamp)
1871
      self.assertFalse(job.end_timestamp)
1872

    
1873
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1874
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1875
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1876
                     [[constants.OP_STATUS_SUCCESS,
1877
                       constants.OP_STATUS_CANCELED,
1878
                       constants.OP_STATUS_CANCELED],
1879
                      ["Res0", "Job canceled by request",
1880
                       "Job canceled by request"]])
1881

    
1882
    self._GenericCheckJob(job)
1883

    
1884
    self.assertRaises(IndexError, queue.GetNextUpdate)
1885
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1886
    self.assertFalse(depmgr.CountPendingResults())
1887

    
1888
    # Calling the processor on a finished job should be a no-op
1889
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1890
                     jqueue._JobProcessor.FINISHED)
1891
    self.assertRaises(IndexError, queue.GetNextUpdate)
1892

    
1893
  def testJobDependencyWrongstatus(self):
1894
    depmgr = _FakeDependencyManager()
1895
    queue = _FakeQueueForProc(depmgr=depmgr)
1896

    
1897
    self.assertEqual(queue.depmgr, depmgr)
1898

    
1899
    prev_job_id = 9741
1900
    job_id = 11763
1901
    ops = [
1902
      opcodes.OpTestDummy(result="Res0", fail=False),
1903
      opcodes.OpTestDummy(result="Res1", fail=False,
1904
                          depends=[
1905
                            [prev_job_id, None],
1906
                            ]),
1907
      opcodes.OpTestDummy(result="Res2", fail=False),
1908
      ]
1909

    
1910
    # Create job
1911
    job = self._CreateJob(queue, job_id, ops)
1912

    
1913
    def _BeforeStart(timeout, priority):
1914
      if attempt == 0 or attempt > 5:
1915
        # Job should only be updated when it wasn't waiting for another job
1916
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1917
      self.assertRaises(IndexError, queue.GetNextUpdate)
1918
      self.assertFalse(queue.IsAcquired())
1919
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1920
      self.assertFalse(job.cur_opctx)
1921

    
1922
    def _AfterStart(op, cbs):
1923
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1924
      self.assertRaises(IndexError, queue.GetNextUpdate)
1925

    
1926
      self.assertFalse(queue.IsAcquired())
1927
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1928
      self.assertFalse(job.cur_opctx)
1929

    
1930
      # Job is running, cancelling shouldn't be possible
1931
      (success, _) = job.Cancel()
1932
      self.assertFalse(success)
1933

    
1934
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1935

    
1936
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1937

    
1938
    counter = itertools.count()
1939
    while True:
1940
      attempt = counter.next()
1941

    
1942
      self.assertRaises(IndexError, queue.GetNextUpdate)
1943
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1944

    
1945
      if attempt == 0:
1946
        # This will handle the first opcode
1947
        pass
1948
      elif attempt < 4:
1949
        depmgr.AddCheckResult(job, prev_job_id, None,
1950
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1951
      elif attempt == 4:
1952
        # Other job failed
1953
        depmgr.AddCheckResult(job, prev_job_id, None,
1954
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1955

    
1956
      if attempt == 0:
1957
        self.assertEqual(depmgr.CountPendingResults(), 0)
1958
      else:
1959
        self.assertEqual(depmgr.CountPendingResults(), 1)
1960

    
1961
      result = jqueue._JobProcessor(queue, opexec, job)()
1962
      if attempt <= 1 or attempt >= 4:
1963
        # Job should only be updated if there was an actual change
1964
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1965
      self.assertRaises(IndexError, queue.GetNextUpdate)
1966
      self.assertFalse(depmgr.CountPendingResults())
1967

    
1968
      if attempt > 0 and attempt < 4:
1969
        # Simulate waiting for other job
1970
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1971
        self.assertTrue(job.cur_opctx)
1972
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1973
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1974
        self.assert_(job.start_timestamp)
1975
        self.assertFalse(job.end_timestamp)
1976
        continue
1977

    
1978
      if result == jqueue._JobProcessor.FINISHED:
1979
        # Last opcode
1980
        self.assertFalse(job.cur_opctx)
1981
        break
1982

    
1983
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1984

    
1985
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1986
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1987
      self.assert_(job.start_timestamp)
1988
      self.assertFalse(job.end_timestamp)
1989

    
1990
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1991
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1992
    self.assertEqual(job.GetInfo(["opstatus"]),
1993
                     [[constants.OP_STATUS_SUCCESS,
1994
                       constants.OP_STATUS_ERROR,
1995
                       constants.OP_STATUS_ERROR]]),
1996

    
1997
    (opresult, ) = job.GetInfo(["opresult"])
1998
    self.assertEqual(len(opresult), len(ops))
1999
    self.assertEqual(opresult[0], "Res0")
2000
    self.assertTrue(errors.GetEncodedError(opresult[1]))
2001
    self.assertTrue(errors.GetEncodedError(opresult[2]))
2002

    
2003
    self._GenericCheckJob(job)
2004

    
2005
    self.assertRaises(IndexError, queue.GetNextUpdate)
2006
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2007
    self.assertFalse(depmgr.CountPendingResults())
2008

    
2009
    # Calling the processor on a finished job should be a no-op
2010
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
2011
                     jqueue._JobProcessor.FINISHED)
2012
    self.assertRaises(IndexError, queue.GetNextUpdate)
2013

    
2014

    
2015
class TestEvaluateJobProcessorResult(unittest.TestCase):
2016
  def testFinished(self):
2017
    depmgr = _FakeDependencyManager()
2018
    job = _IdOnlyFakeJob(30953)
2019
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2020
                                       jqueue._JobProcessor.FINISHED)
2021
    self.assertEqual(depmgr.GetNextNotification(), job.id)
2022
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2023

    
2024
  def testDefer(self):
2025
    depmgr = _FakeDependencyManager()
2026
    job = _IdOnlyFakeJob(11326, priority=5463)
2027
    try:
2028
      jqueue._EvaluateJobProcessorResult(depmgr, job,
2029
                                         jqueue._JobProcessor.DEFER)
2030
    except workerpool.DeferTask, err:
2031
      self.assertEqual(err.priority, 5463)
2032
    else:
2033
      self.fail("Didn't raise exception")
2034
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2035

    
2036
  def testWaitdep(self):
2037
    depmgr = _FakeDependencyManager()
2038
    job = _IdOnlyFakeJob(21317)
2039
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2040
                                       jqueue._JobProcessor.WAITDEP)
2041
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2042

    
2043
  def testOther(self):
2044
    depmgr = _FakeDependencyManager()
2045
    job = _IdOnlyFakeJob(5813)
2046
    self.assertRaises(errors.ProgrammerError,
2047
                      jqueue._EvaluateJobProcessorResult,
2048
                      depmgr, job, "Other result")
2049
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2050

    
2051

    
2052
class _FakeTimeoutStrategy:
2053
  def __init__(self, timeouts):
2054
    self.timeouts = timeouts
2055
    self.attempts = 0
2056
    self.last_timeout = None
2057

    
2058
  def NextAttempt(self):
2059
    self.attempts += 1
2060
    if self.timeouts:
2061
      timeout = self.timeouts.pop(0)
2062
    else:
2063
      timeout = None
2064
    self.last_timeout = timeout
2065
    return timeout
2066

    
2067

    
2068
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
2069
  def setUp(self):
2070
    self.queue = _FakeQueueForProc()
2071
    self.job = None
2072
    self.curop = None
2073
    self.opcounter = None
2074
    self.timeout_strategy = None
2075
    self.retries = 0
2076
    self.prev_tsop = None
2077
    self.prev_prio = None
2078
    self.prev_status = None
2079
    self.lock_acq_prio = None
2080
    self.gave_lock = None
2081
    self.done_lock_before_blocking = False
2082

    
2083
  def _BeforeStart(self, timeout, priority):
2084
    job = self.job
2085

    
2086
    # If status has changed, job must've been written
2087
    if self.prev_status != self.job.ops[self.curop].status:
2088
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2089
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2090

    
2091
    self.assertFalse(self.queue.IsAcquired())
2092
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2093

    
2094
    ts = self.timeout_strategy
2095

    
2096
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
2097
    self.assertEqual(timeout, ts.last_timeout)
2098
    self.assertEqual(priority, job.ops[self.curop].priority)
2099

    
2100
    self.gave_lock = True
2101
    self.lock_acq_prio = priority
2102

    
2103
    if (self.curop == 3 and
2104
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
2105
      # Give locks before running into blocking acquire
2106
      assert self.retries == 7
2107
      self.retries = 0
2108
      self.done_lock_before_blocking = True
2109
      return
2110

    
2111
    if self.retries > 0:
2112
      self.assert_(timeout is not None)
2113
      self.retries -= 1
2114
      self.gave_lock = False
2115
      raise mcpu.LockAcquireTimeout()
2116

    
2117
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
2118
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
2119
      assert not ts.timeouts
2120
      self.assert_(timeout is None)
2121

    
2122
  def _AfterStart(self, op, cbs):
2123
    job = self.job
2124

    
2125
    # Setting to "running" requires an update
2126
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2127
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2128

    
2129
    self.assertFalse(self.queue.IsAcquired())
2130
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
2131

    
2132
    # Job is running, cancelling shouldn't be possible
2133
    (success, _) = job.Cancel()
2134
    self.assertFalse(success)
2135

    
2136
  def _NextOpcode(self):
2137
    self.curop = self.opcounter.next()
2138
    self.prev_prio = self.job.ops[self.curop].priority
2139
    self.prev_status = self.job.ops[self.curop].status
2140

    
2141
  def _NewTimeoutStrategy(self):
2142
    job = self.job
2143

    
2144
    self.assertEqual(self.retries, 0)
2145

    
2146
    if self.prev_tsop == self.curop:
2147
      # Still on the same opcode, priority must've been increased
2148
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
2149

    
2150
    if self.curop == 1:
2151
      # Normal retry
2152
      timeouts = range(10, 31, 10)
2153
      self.retries = len(timeouts) - 1
2154

    
2155
    elif self.curop == 2:
2156
      # Let this run into a blocking acquire
2157
      timeouts = range(11, 61, 12)
2158
      self.retries = len(timeouts)
2159

    
2160
    elif self.curop == 3:
2161
      # Wait for priority to increase, but give lock before blocking acquire
2162
      timeouts = range(12, 100, 14)
2163
      self.retries = len(timeouts)
2164

    
2165
      self.assertFalse(self.done_lock_before_blocking)
2166

    
2167
    elif self.curop == 4:
2168
      self.assert_(self.done_lock_before_blocking)
2169

    
2170
      # Timeouts, but no need to retry
2171
      timeouts = range(10, 31, 10)
2172
      self.retries = 0
2173

    
2174
    elif self.curop == 5:
2175
      # Normal retry
2176
      timeouts = range(19, 100, 11)
2177
      self.retries = len(timeouts)
2178

    
2179
    else:
2180
      timeouts = []
2181
      self.retries = 0
2182

    
2183
    assert len(job.ops) == 10
2184
    assert self.retries <= len(timeouts)
2185

    
2186
    ts = _FakeTimeoutStrategy(timeouts)
2187

    
2188
    self.timeout_strategy = ts
2189
    self.prev_tsop = self.curop
2190
    self.prev_prio = job.ops[self.curop].priority
2191

    
2192
    return ts
2193

    
2194
  def testTimeout(self):
2195
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2196
           for i in range(10)]
2197

    
2198
    # Create job
2199
    job_id = 15801
2200
    job = self._CreateJob(self.queue, job_id, ops)
2201
    self.job = job
2202

    
2203
    self.opcounter = itertools.count(0)
2204

    
2205
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
2206
                                    self._AfterStart)
2207
    tsf = self._NewTimeoutStrategy
2208

    
2209
    self.assertFalse(self.done_lock_before_blocking)
2210

    
2211
    while True:
2212
      proc = jqueue._JobProcessor(self.queue, opexec, job,
2213
                                  _timeout_strategy_factory=tsf)
2214

    
2215
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
2216

    
2217
      if self.curop is not None:
2218
        self.prev_status = self.job.ops[self.curop].status
2219

    
2220
      self.lock_acq_prio = None
2221

    
2222
      result = proc(_nextop_fn=self._NextOpcode)
2223
      assert self.curop is not None
2224

    
2225
      # Input priority should never be set or modified
2226
      self.assertFalse(compat.any(hasattr(op.input, "priority")
2227
                                  for op in job.ops))
2228

    
2229
      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
2230
        # Got lock and/or job is done, result must've been written
2231
        self.assertFalse(job.cur_opctx)
2232
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2233
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
2234
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
2235
        self.assert_(job.ops[self.curop].exec_timestamp)
2236

    
2237
      if result == jqueue._JobProcessor.FINISHED:
2238
        self.assertFalse(job.cur_opctx)
2239
        break
2240

    
2241
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
2242

    
2243
      if self.curop == 0:
2244
        self.assertEqual(job.ops[self.curop].start_timestamp,
2245
                         job.start_timestamp)
2246

    
2247
      if self.gave_lock:
2248
        # Opcode finished, but job not yet done
2249
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2250
      else:
2251
        # Did not get locks
2252
        self.assert_(job.cur_opctx)
2253
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
2254
                         self.timeout_strategy.NextAttempt)
2255
        self.assertFalse(job.ops[self.curop].exec_timestamp)
2256
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
2257

    
2258
        # If priority has changed since acquiring locks, the job must've been
2259
        # updated
2260
        if self.lock_acq_prio != job.ops[self.curop].priority:
2261
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
2262

    
2263
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
2264

    
2265
      self.assert_(job.start_timestamp)
2266
      self.assertFalse(job.end_timestamp)
2267

    
2268
    self.assertEqual(self.curop, len(job.ops) - 1)
2269
    self.assertEqual(self.job, job)
2270
    self.assertEqual(self.opcounter.next(), len(job.ops))
2271
    self.assert_(self.done_lock_before_blocking)
2272

    
2273
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2274
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2275
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2276
    self.assertEqual(job.GetInfo(["opresult"]),
2277
                     [[op.input.result for op in job.ops]])
2278
    self.assertEqual(job.GetInfo(["opstatus"]),
2279
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
2280
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
2281
                            for op in job.ops))
2282

    
2283
    # Calling the processor on a finished job should be a no-op
2284
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2285
                     jqueue._JobProcessor.FINISHED)
2286
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2287

    
2288

    
2289
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
2290
  def setUp(self):
2291
    self.queue = _FakeQueueForProc()
2292
    self.opexecprio = []
2293

    
2294
  def _BeforeStart(self, timeout, priority):
2295
    self.assertFalse(self.queue.IsAcquired())
2296
    self.opexecprio.append(priority)
2297

    
2298
  def testChangePriorityWhileRunning(self):
2299
    # Tests changing the priority on a job while it has finished opcodes
2300
    # (successful) and more, unprocessed ones
2301
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
2302
           for i in range(3)]
2303

    
2304
    # Create job
2305
    job_id = 3499
2306
    job = self._CreateJob(self.queue, job_id, ops)
2307

    
2308
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2309

    
2310
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
2311

    
2312
    # Run first opcode
2313
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2314
                     jqueue._JobProcessor.DEFER)
2315

    
2316
    # Job goes back to queued
2317
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2318
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2319
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2320
                     [[constants.OP_STATUS_SUCCESS,
2321
                       constants.OP_STATUS_QUEUED,
2322
                       constants.OP_STATUS_QUEUED],
2323
                      ["Res0", None, None]])
2324

    
2325
    self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
2326
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2327

    
2328
    # Change priority
2329
    self.assertEqual(job.ChangePriority(-10),
2330
                     (True,
2331
                      ("Priorities of pending opcodes for job 3499 have"
2332
                       " been changed to -10")))
2333
    self.assertEqual(job.CalcPriority(), -10)
2334

    
2335
    # Process second opcode
2336
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2337
                     jqueue._JobProcessor.DEFER)
2338

    
2339
    self.assertEqual(self.opexecprio.pop(0), -10)
2340
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2341

    
2342
    # Check status
2343
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
2344
    self.assertEqual(job.CalcPriority(), -10)
2345
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2346
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
2347
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2348
                     [[constants.OP_STATUS_SUCCESS,
2349
                       constants.OP_STATUS_SUCCESS,
2350
                       constants.OP_STATUS_QUEUED],
2351
                      ["Res0", "Res1", None]])
2352

    
2353
    # Change priority once more
2354
    self.assertEqual(job.ChangePriority(5),
2355
                     (True,
2356
                      ("Priorities of pending opcodes for job 3499 have"
2357
                       " been changed to 5")))
2358
    self.assertEqual(job.CalcPriority(), 5)
2359

    
2360
    # Process third opcode
2361
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
2362
                     jqueue._JobProcessor.FINISHED)
2363

    
2364
    self.assertEqual(self.opexecprio.pop(0), 5)
2365
    self.assertRaises(IndexError, self.opexecprio.pop, 0)
2366

    
2367
    # Check status
2368
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
2369
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
2370
    self.assertEqual(job.GetInfo(["id"]), [job_id])
2371
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
2372
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
2373
                     [[constants.OP_STATUS_SUCCESS,
2374
                       constants.OP_STATUS_SUCCESS,
2375
                       constants.OP_STATUS_SUCCESS],
2376
                      ["Res0", "Res1", "Res2"]])
2377
    self.assertEqual(map(operator.attrgetter("priority"), job.ops),
2378
                     [constants.OP_PRIO_DEFAULT, -10, 5])
2379

    
2380

    
2381
class _IdOnlyFakeJob:
2382
  def __init__(self, job_id, priority=NotImplemented):
2383
    self.id = str(job_id)
2384
    self._priority = priority
2385

    
2386
  def CalcPriority(self):
2387
    return self._priority
2388

    
2389

    
2390
class TestJobDependencyManager(unittest.TestCase):
2391
  def setUp(self):
2392
    self._status = []
2393
    self._queue = []
2394
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
2395

    
2396
  def _GetStatus(self, job_id):
2397
    (exp_job_id, result) = self._status.pop(0)
2398
    self.assertEqual(exp_job_id, job_id)
2399
    return result
2400

    
2401
  def _Enqueue(self, jobs):
2402
    self.assertFalse(self.jdm._lock.is_owned(),
2403
                     msg=("Must not own manager lock while re-adding jobs"
2404
                          " (potential deadlock)"))
2405
    self._queue.append(jobs)
2406

    
2407
  def testNotFinalizedThenCancel(self):
2408
    job = _IdOnlyFakeJob(17697)
2409
    job_id = str(28625)
2410

    
2411
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2412
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2413
    self.assertEqual(result, self.jdm.WAIT)
2414
    self.assertFalse(self._status)
2415
    self.assertFalse(self._queue)
2416
    self.assertTrue(self.jdm.JobWaiting(job))
2417
    self.assertEqual(self.jdm._waiters, {
2418
      job_id: set([job]),
2419
      })
2420
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2421
      ("job/28625", None, None, [("job", [job.id])])
2422
      ])
2423

    
2424
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2425
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2426
    self.assertEqual(result, self.jdm.CANCEL)
2427
    self.assertFalse(self._status)
2428
    self.assertFalse(self._queue)
2429
    self.assertFalse(self.jdm.JobWaiting(job))
2430
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2431

    
2432
  def testNotFinalizedThenQueued(self):
2433
    # This can happen on a queue shutdown
2434
    job = _IdOnlyFakeJob(1320)
2435
    job_id = str(22971)
2436

    
2437
    for i in range(5):
2438
      if i > 2:
2439
        self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2440
      else:
2441
        self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2442
      (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2443
      self.assertEqual(result, self.jdm.WAIT)
2444
      self.assertFalse(self._status)
2445
      self.assertFalse(self._queue)
2446
      self.assertTrue(self.jdm.JobWaiting(job))
2447
      self.assertEqual(self.jdm._waiters, {
2448
        job_id: set([job]),
2449
        })
2450
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2451
        ("job/22971", None, None, [("job", [job.id])])
2452
        ])
2453

    
2454
  def testRequireCancel(self):
2455
    job = _IdOnlyFakeJob(5278)
2456
    job_id = str(9610)
2457
    dep_status = [constants.JOB_STATUS_CANCELED]
2458

    
2459
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
2460
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2461
    self.assertEqual(result, self.jdm.WAIT)
2462
    self.assertFalse(self._status)
2463
    self.assertFalse(self._queue)
2464
    self.assertTrue(self.jdm.JobWaiting(job))
2465
    self.assertEqual(self.jdm._waiters, {
2466
      job_id: set([job]),
2467
      })
2468
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2469
      ("job/9610", None, None, [("job", [job.id])])
2470
      ])
2471

    
2472
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
2473
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2474
    self.assertEqual(result, self.jdm.CONTINUE)
2475
    self.assertFalse(self._status)
2476
    self.assertFalse(self._queue)
2477
    self.assertFalse(self.jdm.JobWaiting(job))
2478
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2479

    
2480
  def testRequireError(self):
2481
    job = _IdOnlyFakeJob(21459)
2482
    job_id = str(25519)
2483
    dep_status = [constants.JOB_STATUS_ERROR]
2484

    
2485
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
2486
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2487
    self.assertEqual(result, self.jdm.WAIT)
2488
    self.assertFalse(self._status)
2489
    self.assertFalse(self._queue)
2490
    self.assertTrue(self.jdm.JobWaiting(job))
2491
    self.assertEqual(self.jdm._waiters, {
2492
      job_id: set([job]),
2493
      })
2494

    
2495
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2496
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2497
    self.assertEqual(result, self.jdm.CONTINUE)
2498
    self.assertFalse(self._status)
2499
    self.assertFalse(self._queue)
2500
    self.assertFalse(self.jdm.JobWaiting(job))
2501
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2502

    
2503
  def testRequireMultiple(self):
2504
    dep_status = list(constants.JOBS_FINALIZED)
2505

    
2506
    for end_status in dep_status:
2507
      job = _IdOnlyFakeJob(21343)
2508
      job_id = str(14609)
2509

    
2510
      self._status.append((job_id, constants.JOB_STATUS_WAITING))
2511
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2512
      self.assertEqual(result, self.jdm.WAIT)
2513
      self.assertFalse(self._status)
2514
      self.assertFalse(self._queue)
2515
      self.assertTrue(self.jdm.JobWaiting(job))
2516
      self.assertEqual(self.jdm._waiters, {
2517
        job_id: set([job]),
2518
        })
2519
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2520
        ("job/14609", None, None, [("job", [job.id])])
2521
        ])
2522

    
2523
      self._status.append((job_id, end_status))
2524
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2525
      self.assertEqual(result, self.jdm.CONTINUE)
2526
      self.assertFalse(self._status)
2527
      self.assertFalse(self._queue)
2528
      self.assertFalse(self.jdm.JobWaiting(job))
2529
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2530

    
2531
  def testNotify(self):
2532
    job = _IdOnlyFakeJob(8227)
2533
    job_id = str(4113)
2534

    
2535
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2536
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2537
    self.assertEqual(result, self.jdm.WAIT)
2538
    self.assertFalse(self._status)
2539
    self.assertFalse(self._queue)
2540
    self.assertTrue(self.jdm.JobWaiting(job))
2541
    self.assertEqual(self.jdm._waiters, {
2542
      job_id: set([job]),
2543
      })
2544

    
2545
    self.jdm.NotifyWaiters(job_id)
2546
    self.assertFalse(self._status)
2547
    self.assertFalse(self.jdm._waiters)
2548
    self.assertFalse(self.jdm.JobWaiting(job))
2549
    self.assertEqual(self._queue, [set([job])])
2550

    
2551
  def testWrongStatus(self):
2552
    job = _IdOnlyFakeJob(10102)
2553
    job_id = str(1271)
2554

    
2555
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2556
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2557
                                            [constants.JOB_STATUS_SUCCESS])
2558
    self.assertEqual(result, self.jdm.WAIT)
2559
    self.assertFalse(self._status)
2560
    self.assertFalse(self._queue)
2561
    self.assertTrue(self.jdm.JobWaiting(job))
2562
    self.assertEqual(self.jdm._waiters, {
2563
      job_id: set([job]),
2564
      })
2565

    
2566
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2567
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2568
                                            [constants.JOB_STATUS_SUCCESS])
2569
    self.assertEqual(result, self.jdm.WRONGSTATUS)
2570
    self.assertFalse(self._status)
2571
    self.assertFalse(self._queue)
2572
    self.assertFalse(self.jdm.JobWaiting(job))
2573

    
2574
  def testCorrectStatus(self):
2575
    job = _IdOnlyFakeJob(24273)
2576
    job_id = str(23885)
2577

    
2578
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2579
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2580
                                            [constants.JOB_STATUS_SUCCESS])
2581
    self.assertEqual(result, self.jdm.WAIT)
2582
    self.assertFalse(self._status)
2583
    self.assertFalse(self._queue)
2584
    self.assertTrue(self.jdm.JobWaiting(job))
2585
    self.assertEqual(self.jdm._waiters, {
2586
      job_id: set([job]),
2587
      })
2588

    
2589
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2590
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2591
                                            [constants.JOB_STATUS_SUCCESS])
2592
    self.assertEqual(result, self.jdm.CONTINUE)
2593
    self.assertFalse(self._status)
2594
    self.assertFalse(self._queue)
2595
    self.assertFalse(self.jdm.JobWaiting(job))
2596

    
2597
  def testFinalizedRightAway(self):
2598
    job = _IdOnlyFakeJob(224)
2599
    job_id = str(3081)
2600

    
2601
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2602
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2603
                                            [constants.JOB_STATUS_SUCCESS])
2604
    self.assertEqual(result, self.jdm.CONTINUE)
2605
    self.assertFalse(self._status)
2606
    self.assertFalse(self._queue)
2607
    self.assertFalse(self.jdm.JobWaiting(job))
2608
    self.assertEqual(self.jdm._waiters, {
2609
      job_id: set(),
2610
      })
2611

    
2612
    # Force cleanup
2613
    self.jdm.NotifyWaiters("0")
2614
    self.assertFalse(self.jdm._waiters)
2615
    self.assertFalse(self._status)
2616
    self.assertFalse(self._queue)
2617

    
2618
  def testMultipleWaiting(self):
2619
    # Use a deterministic random generator
2620
    rnd = random.Random(21402)
2621

    
2622
    job_ids = map(str, rnd.sample(range(1, 10000), 150))
2623

    
2624
    waiters = dict((job_ids.pop(),
2625
                    set(map(_IdOnlyFakeJob,
2626
                            [job_ids.pop()
2627
                             for _ in range(rnd.randint(1, 20))])))
2628
                   for _ in range(10))
2629

    
2630
    # Ensure there are no duplicate job IDs
2631
    assert not utils.FindDuplicates(waiters.keys() +
2632
                                    [job.id
2633
                                     for jobs in waiters.values()
2634
                                     for job in jobs])
2635

    
2636
    # Register all jobs as waiters
2637
    for job_id, job in [(job_id, job)
2638
                        for (job_id, jobs) in waiters.items()
2639
                        for job in jobs]:
2640
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2641
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
2642
                                              [constants.JOB_STATUS_SUCCESS])
2643
      self.assertEqual(result, self.jdm.WAIT)
2644
      self.assertFalse(self._status)
2645
      self.assertFalse(self._queue)
2646
      self.assertTrue(self.jdm.JobWaiting(job))
2647

    
2648
    self.assertEqual(self.jdm._waiters, waiters)
2649

    
2650
    def _MakeSet((name, mode, owner_names, pending)):
2651
      return (name, mode, owner_names,
2652
              [(pendmode, set(pend)) for (pendmode, pend) in pending])
2653

    
2654
    def _CheckLockInfo():
2655
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
2656
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2657
        ("job/%s" % job_id, None, None,
2658
         [("job", set([job.id for job in jobs]))])
2659
        for job_id, jobs in waiters.items()
2660
        if jobs
2661
        ]))
2662

    
2663
    _CheckLockInfo()
2664

    
2665
    # Notify in random order
2666
    for job_id in rnd.sample(waiters, len(waiters)):
2667
      # Remove from pending waiter list
2668
      jobs = waiters.pop(job_id)
2669
      for job in jobs:
2670
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2671
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
2672
                                                [constants.JOB_STATUS_SUCCESS])
2673
        self.assertEqual(result, self.jdm.CONTINUE)
2674
        self.assertFalse(self._status)
2675
        self.assertFalse(self._queue)
2676
        self.assertFalse(self.jdm.JobWaiting(job))
2677

    
2678
      _CheckLockInfo()
2679

    
2680
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2681

    
2682
    assert not waiters
2683

    
2684
  def testSelfDependency(self):
2685
    job = _IdOnlyFakeJob(18937)
2686

    
2687
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2688
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2689
    self.assertEqual(result, self.jdm.ERROR)
2690

    
2691
  def testJobDisappears(self):
2692
    job = _IdOnlyFakeJob(30540)
2693
    job_id = str(23769)
2694

    
2695
    def _FakeStatus(_):
2696
      raise errors.JobLost("#msg#")
2697

    
2698
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2699
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2700
    self.assertEqual(result, self.jdm.ERROR)
2701
    self.assertFalse(jdm.JobWaiting(job))
2702
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2703

    
2704

    
2705
if __name__ == "__main__":
2706
  testutils.GanetiTestProgram()