Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ fb60bc6a

History | View | Annotate | Download (94.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31
import random
32
import operator
33

    
34
from ganeti import constants
35
from ganeti import utils
36
from ganeti import errors
37
from ganeti import jqueue
38
from ganeti import opcodes
39
from ganeti import compat
40
from ganeti import mcpu
41
from ganeti import query
42
from ganeti import workerpool
43

    
44
import testutils
45

    
46

    
47
class _FakeJob:
48
  def __init__(self, job_id, status):
49
    self.id = job_id
50
    self.writable = False
51
    self._status = status
52
    self._log = []
53

    
54
  def SetStatus(self, status):
55
    self._status = status
56

    
57
  def AddLogEntry(self, msg):
58
    self._log.append((len(self._log), msg))
59

    
60
  def CalcStatus(self):
61
    return self._status
62

    
63
  def GetInfo(self, fields):
64
    result = []
65

    
66
    for name in fields:
67
      if name == "status":
68
        result.append(self._status)
69
      else:
70
        raise Exception("Unknown field")
71

    
72
    return result
73

    
74
  def GetLogEntries(self, newer_than):
75
    assert newer_than is None or newer_than >= 0
76

    
77
    if newer_than is None:
78
      return self._log
79

    
80
    return self._log[newer_than:]
81

    
82

    
83
class TestJobChangesChecker(unittest.TestCase):
  """Tests for C{jqueue._JobChangesChecker} using L{_FakeJob} instances.

  """
  def testStatus(self):
    """Without previous information every call reports the current status.

    """
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.SetStatus(constants.JOB_STATUS_SUCCESS)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    """An unchanged status yields C{None}, a changed one is reported.

    """
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    """Finalized statuses are reported even if equal to the previous one.

    """
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    """New log entries are reported relative to the previous log serial.

    """
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # Same information as just received, hence no change expected
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A checker without previous information gets the complete log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
139

    
140

    
141
class TestJobChangesWaiter(unittest.TestCase):
  """Tests for the job file waiters in C{jqueue}.

  Each test operates on an empty job file inside a private temporary
  directory.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _EnsureNotifierClosed(self, notifier):
    """Fails the test unless the notifier's file descriptor is closed.

    """
    try:
      os.fstat(notifier._fd)
    except EnvironmentError:
      # Fetch the exception via sys.exc_info() rather than "except X, err"
      # (Python 2 only) so that this parses on every Python version
      err = sys.exc_info()[1]
      self.assertEqual(err.errno, errno.EBADF)
    else:
      self.fail("File descriptor wasn't closed")

  def testClose(self):
    """Closing must release the descriptor, with or without a prior wait.

    """
    for wait in [False, True]:
      waiter = jqueue._JobFileChangesWaiter(self.filename)
      try:
        if wait:
          waiter.Wait(0.001)
      finally:
        waiter.Close()

      # Ensure file descriptor was closed
      self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile(self):
    """The file waiter reports a change only after the file is rewritten.

    """
    waiter = jqueue._JobFileChangesWaiter(self.filename)
    try:
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile2(self):
    """The generic waiter sets up its file waiter on the first wait.

    """
    waiter = jqueue._JobChangesWaiter(self.filename)
    try:
      self.assertFalse(waiter._filewaiter)
      self.assertTrue(waiter.Wait(0.1))
      self.assertTrue(waiter._filewaiter)

      # File waiter is now used, but there have been no changes
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
196

    
197

    
198
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for C{jqueue._WaitForJobChangesHelper}.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader callback returning a fake job in "waiting" status.

    """
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)

  def _LoadLostJob(self):
    """Job loader callback returning C{None} instead of a job.

    """
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITING], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITING], []))

  def testLostJob(self):
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
230

    
231

    
232
class TestEncodeOpError(unittest.TestCase):
  """Tests for C{jqueue._EncodeOpError}.

  """
  def test(self):
    """Encoded errors are tuples which C{errors.MaybeRaise} re-raises.

    """
    # (value to encode, exception class expected from errors.MaybeRaise)
    cases = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (value, expected) in cases:
      encerr = jqueue._EncodeOpError(value)
      self.assertTrue(isinstance(encerr, tuple))
      self.assertRaises(expected, errors.MaybeRaise, encerr)
249

    
250

    
251
class TestQueuedOpCode(unittest.TestCase):
  """Tests for C{jqueue._QueuedOpCode}, including a serialization round-trip.

  """
  def testDefaults(self):
    def _Check(op):
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)
    # A restored opcode must behave and serialize exactly like the original
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
282

    
283

    
284
class TestQueuedJob(unittest.TestCase):
  """Tests for C{jqueue._QueuedJob}.

  Covers construction, serialization round-trips, priority handling and
  the mapping from opcode statuses to the job status.

  """
  def testNoOpCodes(self):
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [], False)

  def testDefaults(self):
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertTrue(job.writable)
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])
      self.assertFalse(job.archived)

    job1 = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job1)
    # A restored job must behave and serialize exactly like the original
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testWritable(self):
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
    self.assertFalse(job.writable)

    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
    self.assertTrue(job.writable)

  def testArchived(self):
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
    self.assertFalse(job.archived)

    newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
    self.assertTrue(newjob.archived)

    newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
    self.assertFalse(newjob2.archived)

  def testPriority(self):
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def _JobForPriority(self, job_id):
    """Creates a four-opcode job and checks its priorities are at default.

    """
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    job = jqueue._QueuedJob(None, job_id, ops, True)

    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))

    return job

  def testChangePriorityAllQueued(self):
    job = self._JobForPriority(24984)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                               for op in job.ops))
    result = job.ChangePriority(-10)
    self.assertEqual(job.CalcPriority(), -10)
    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
    self.assertTrue(compat.all(op.input.priority == -10 for op in job.ops))
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job 24984 have"
                             " been changed to -10")))

  def testChangePriorityAllFinished(self):
    job = self._JobForPriority(16405)

    for (idx, op) in enumerate(job.ops):
      if idx > 2:
        op.status = constants.OP_STATUS_ERROR
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(-10)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    # A real list is compared; map() is lazy on newer Python versions
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_ERROR,
      ])
    self.assertEqual(result, (False, "Job 16405 is finished"))

  def testChangePriorityCancelling(self):
    job = self._JobForPriority(31572)

    for (idx, op) in enumerate(job.ops):
      if idx > 1:
        op.status = constants.OP_STATUS_CANCELING
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(5)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_CANCELING,
      constants.OP_STATUS_CANCELING,
      ])
    self.assertEqual(result, (False, "Job 31572 is cancelling"))

  def testChangePriorityFirstRunning(self):
    job = self._JobForPriority(1716215889)

    for (idx, op) in enumerate(job.ops):
      if idx == 0:
        op.status = constants.OP_STATUS_RUNNING
      else:
        op.status = constants.OP_STATUS_QUEUED

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(7)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
                     [None, 7, 7, 7])
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_RUNNING,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      ])
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job"
                             " 1716215889 have been changed to 7")))

  def testChangePriorityLastRunning(self):
    job = self._JobForPriority(1308)

    for (idx, op) in enumerate(job.ops):
      if idx == (len(job.ops) - 1):
        op.status = constants.OP_STATUS_RUNNING
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(-3)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_RUNNING,
      ])
    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))

  def testChangePrioritySecondOpcodeRunning(self):
    job = self._JobForPriority(27701)

    self.assertEqual(len(job.ops), 4)
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    job.ops[1].status = constants.OP_STATUS_RUNNING
    job.ops[2].status = constants.OP_STATUS_QUEUED
    job.ops[3].status = constants.OP_STATUS_QUEUED

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    result = job.ChangePriority(-19)
    self.assertEqual(job.CalcPriority(), -19)
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
                      -19, -19])
    self.assertEqual([getattr(op.input, "priority", None) for op in job.ops],
                     [None, None, -19, -19])
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_RUNNING,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      ])
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job"
                             " 27701 have been changed to -19")))

  def testChangePriorityWithInconsistentJob(self):
    job = self._JobForPriority(30097)

    self.assertEqual(len(job.ops), 4)

    # This job is invalid (as it has two opcodes marked as running) and makes
    # the call fail because an unprocessed opcode precedes a running one (which
    # should never happen in reality)
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    job.ops[1].status = constants.OP_STATUS_RUNNING
    job.ops[2].status = constants.OP_STATUS_QUEUED
    job.ops[3].status = constants.OP_STATUS_RUNNING

    self.assertRaises(AssertionError, job.ChangePriority, 19)

  def testCalcStatus(self):
    """Checks the job status computed for various opcode status patterns.

    Every helper below rewrites the opcode statuses of a fresh ten-opcode
    job; the job status is then compared with the key the helper is
    registered under in C{tests}.

    """
    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITING

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITING

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)],
                              True)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # The lookup raises KeyError if a job status has no test registered
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
625

    
626

    
627
class _FakeDependencyManager:
628
  def __init__(self):
629
    self._checks = []
630
    self._notifications = []
631
    self._waiting = set()
632

    
633
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
634
    self._checks.append((job, dep_job_id, dep_status, result))
635

    
636
  def CountPendingResults(self):
637
    return len(self._checks)
638

    
639
  def CountWaitingJobs(self):
640
    return len(self._waiting)
641

    
642
  def GetNextNotification(self):
643
    return self._notifications.pop(0)
644

    
645
  def JobWaiting(self, job):
646
    return job in self._waiting
647

    
648
  def CheckAndRegister(self, job, dep_job_id, dep_status):
649
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
650

    
651
    assert exp_job == job
652
    assert exp_dep_job_id == dep_job_id
653
    assert exp_dep_status == dep_status
654

    
655
    (result_status, _) = result
656

    
657
    if result_status == jqueue._JobDependencyManager.WAIT:
658
      self._waiting.add(job)
659
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
660
      self._waiting.remove(job)
661

    
662
    return result
663

    
664
  def NotifyWaiters(self, job_id):
665
    self._notifications.append(job_id)
666

    
667

    
668
class _DisabledFakeDependencyManager:
669
  def JobWaiting(self, _):
670
    return False
671

    
672
  def CheckAndRegister(self, *args):
673
    assert False, "Should not be called"
674

    
675
  def NotifyWaiters(self, _):
676
    pass
677

    
678

    
679
class _FakeQueueForProc:
680
  def __init__(self, depmgr=None):
681
    self._acquired = False
682
    self._updates = []
683
    self._submitted = []
684
    self._accepting_jobs = True
685

    
686
    self._submit_count = itertools.count(1000)
687

    
688
    if depmgr:
689
      self.depmgr = depmgr
690
    else:
691
      self.depmgr = _DisabledFakeDependencyManager()
692

    
693
  def IsAcquired(self):
694
    return self._acquired
695

    
696
  def GetNextUpdate(self):
697
    return self._updates.pop(0)
698

    
699
  def GetNextSubmittedJob(self):
700
    return self._submitted.pop(0)
701

    
702
  def acquire(self, shared=0):
703
    assert shared == 1
704
    self._acquired = True
705

    
706
  def release(self):
707
    assert self._acquired
708
    self._acquired = False
709

    
710
  def UpdateJobUnlocked(self, job, replicate=True):
711
    assert self._acquired, "Lock not acquired while updating job"
712
    self._updates.append((job, bool(replicate)))
713

    
714
  def SubmitManyJobs(self, jobs):
715
    assert not self._acquired, "Lock acquired while submitting jobs"
716
    job_ids = [self._submit_count.next() for _ in jobs]
717
    self._submitted.extend(zip(job_ids, jobs))
718
    return job_ids
719

    
720
  def StopAcceptingJobs(self):
721
    self._accepting_jobs = False
722

    
723
  def AcceptingJobsUnlocked(self):
724
    return self._accepting_jobs
725

    
726

    
727
class _FakeExecOpCodeForProc:
728
  def __init__(self, queue, before_start, after_start):
729
    self._queue = queue
730
    self._before_start = before_start
731
    self._after_start = after_start
732

    
733
  def __call__(self, op, cbs, timeout=None):
734
    assert isinstance(op, opcodes.OpTestDummy)
735
    assert not self._queue.IsAcquired(), \
736
           "Queue lock not released when executing opcode"
737

    
738
    if self._before_start:
739
      self._before_start(timeout, cbs.CurrentPriority())
740

    
741
    cbs.NotifyStart()
742

    
743
    if self._after_start:
744
      self._after_start(op, cbs)
745

    
746
    # Check again after the callbacks
747
    assert not self._queue.IsAcquired()
748

    
749
    if op.fail:
750
      raise errors.OpExecError("Error requested (%s)" % op.result)
751

    
752
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
753
      return cbs.SubmitManyJobs(op.submit_jobs)
754

    
755
    return op.result
756

    
757

    
758
class _JobProcessorTestUtils:
  """Mix-in with helpers for job processor test cases.

  The C{assert*} methods used here must be provided by the class it is
  mixed into.

  """
  def _CreateJob(self, queue, job_id, ops):
    """Creates a writable job from the opcodes and checks its initial state.

    @return: The newly created C{jqueue._QueuedJob}

    """
    job = jqueue._QueuedJob(queue, job_id, ops, True)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
768

    
769

    
770
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
771
  def _GenericCheckJob(self, job):
    """Runs checks shared by tests on jobs that have been processed.

    Verifies that the timestamp fields returned by C{GetInfo} match the
    attributes of the job and its opcodes, that the job has both a start and
    an end timestamp and that the job's start timestamp equals the one of its
    first opcode.

    @param job: Job to check; all opcode inputs must be
      L{opcodes.OpTestDummy} instances

    """
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                      for op in job.ops)

    # Per-opcode timestamps as reported by the query interface
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
                     [[op.start_timestamp for op in job.ops],
                      [op.exec_timestamp for op in job.ops],
                      [op.end_timestamp for op in job.ops]])

    # Job-level timestamps as reported by the query interface
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
                     [job.received_timestamp,
                      job.start_timestamp,
                      job.end_timestamp])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
786

    
787
  def testSuccess(self):
    # Tests jobs of various lengths whose opcodes all succeed; the processor
    # is called once per opcode
    queue = _FakeQueueForProc()

    for (job_id, opcount) in [(25351, 1), (6637, 3),
                              (24644, 10), (32207, 100)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      def _BeforeStart(timeout, priority):
        # Exactly one replicated update must be pending at this point
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertFalse(job.cur_opctx)

      def _AfterStart(op, cbs):
        # Exactly one more replicated update must be pending after the start
        # notification
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
        self.assertFalse(job.cur_opctx)

        # Job is running, cancelling shouldn't be possible
        (success, _) = job.Cancel()
        self.assertFalse(success)

      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        # The callbacks consumed the earlier updates; one remains
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if idx == len(ops) - 1:
          # Last opcode
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        else:
          self.assertEqual(result, jqueue._JobProcessor.DEFER)

          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
          self.assertTrue(job.start_timestamp)
          self.assertFalse(job.end_timestamp)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(job.GetInfo(["opresult"]),
                       [[op.input.result for op in job.ops]])
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                       jqueue._JobProcessor.FINISHED)
      self.assertRaises(IndexError, queue.GetNextUpdate)
851

    
852
  def testOpcodeError(self):
    # Tests jobs in which a range of opcodes is set to fail; processing must
    # stop at the first failing opcode
    queue = _FakeQueueForProc()

    # Tuples of (job ID, opcode count, first failing index, last failing
    # index)
    testdata = [
      (17077, 1, 0, 0),
      (1782, 5, 2, 2),
      (18179, 10, 9, 9),
      (4744, 10, 3, 8),
      (23816, 100, 39, 45),
      ]

    for (job_id, opcount, failfrom, failto) in testdata:
      # Prepare opcodes
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
                                 fail=(failfrom <= i <= failto))
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, str(job_id), ops)

      opexec = _FakeExecOpCodeForProc(queue, None, None)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        # queued to waitlock
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # waitlock to running
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # Opcode result
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        if idx in (failfrom, len(ops) - 1):
          # Last opcode
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
          break

        self.assertEqual(result, jqueue._JobProcessor.DEFER)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      # Check job status
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
      self.assertEqual(job.GetInfo(["id"]), [job_id])
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])

      # Check opcode status
      data = zip(job.ops,
                 job.GetInfo(["opstatus"])[0],
                 job.GetInfo(["opresult"])[0])

      for idx, (op, opstatus, opresult) in enumerate(data):
        if idx < failfrom:
          # Opcodes before the first failure must have succeeded
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
          self.assertEqual(opresult, op.input.result)
        elif idx <= failto:
          # Opcodes configured to fail
          assert op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
        else:
          # Opcodes after the failing range are expected in error state too
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)

      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops[:failfrom]))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                       jqueue._JobProcessor.FINISHED)
      self.assertRaises(IndexError, queue.GetNextUpdate)
930

    
931
  def testCancelWhileInQueue(self):
    # Tests cancelling a job that is still queued and then running the
    # processor on it
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 17045
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    # Cancelling must not have produced a queue update
    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELED
                               for op in job.ops))

    # Serialize to check for differences
    before_proc = job.Serialize()

    # Simulate processor called in workerpool
    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

    # Must not have changed or written
    self.assertEqual(before_proc, job.Serialize())
    self.assertRaises(IndexError, queue.GetNextUpdate)
976

    
977
  def testCancelWhileWaitlockInQueue(self):
    # Tests cancelling a job whose first opcode was manually put into waiting
    # status before the processor is called
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 8645
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Fake the first opcode waiting
    job.ops[0].status = constants.OP_STATUS_WAITING

    assert len(job.ops) == 5

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    # Mark as cancelling
    (success, _) = job.Cancel()
    self.assertTrue(success)

    # Cancelling must not have produced a queue update
    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                               for op in job.ops))

    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
1018

    
1019
  def testCancelWhileWaitlock(self):
    # Tests cancelling a job from the before-start callback, i.e. while its
    # first opcode is in waiting status
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 11009
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      # Exactly one replicated update must be pending at this point
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))
      # Cancelling must not have produced a queue update
      self.assertRaises(IndexError, queue.GetNextUpdate)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
1069

    
1070
  def _TestCancelWhileSomething(self, cb):
    """Cancels a job from the before-start hook and then runs a callback.

    The job is cancelled while its first opcode is waiting; the processor is
    expected to finish the job as cancelled without ever reaching the
    after-start hook.

    @param cb: Callable invoked with the queue right after the job has been
      marked as cancelling
    @return: The fake queue used for the test

    """
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 24314
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))

      # Let the caller-supplied scenario happen while cancelling
      cb(queue)

    def _AfterStart(op, cbs):
      self.fail("Should not reach this")

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

    return queue
1115

    
1116
  def testCancelWhileWaitlockWithTimeout(self):
    # Cancels a job and lets the subsequent lock acquire time out
    def _RaiseAcquireTimeout(_queue):
      # Fake an acquire attempt timing out
      raise mcpu.LockAcquireTimeout()

    self._TestCancelWhileSomething(_RaiseAcquireTimeout)
1122

    
1123
  def testCancelDuringQueueShutdown(self):
    # Cancels a job while the queue stops accepting jobs
    def _StopQueue(fake_queue):
      fake_queue.StopAcceptingJobs()

    used_queue = self._TestCancelWhileSomething(_StopQueue)
    self.assertFalse(used_queue.AcceptingJobsUnlocked())
1126

    
1127
  def testCancelWhileRunning(self):
    # Tests canceling a job with finished opcodes and more, unprocessed ones
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 28492
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Run one opcode
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    # Try processing another opcode (this will actually cancel the job)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result; the opcode that already finished keeps its result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])
1172

    
1173
  def _TestQueueShutdown(self, queue, opexec, job, runcount):
    """Stops the queue, runs the processor once and checks the job.

    The processor is expected to defer the job, leaving it queued with the
    first C{runcount} opcodes successful and all others still queued.

    @param queue: Fake queue, must still be accepting jobs
    @param opexec: Opcode executor passed to the job processor
    @param job: Job to process
    @param runcount: Number of opcodes that have already run successfully

    """
    self.assertTrue(queue.AcceptingJobsUnlocked())

    # Simulate shutdown
    queue.StopAcceptingJobs()

    proc_result = jqueue._JobProcessor(queue, opexec, job)()
    self.assertEqual(proc_result, jqueue._JobProcessor.DEFER)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
    self.assertFalse(job.cur_opctx)
    self.assertTrue(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    # Opcodes which already ran have both timestamps, the current one has no
    # end timestamp and the ones after it have none at all
    finished_ops = job.ops[:runcount]
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in finished_ops))
    self.assertFalse(job.ops[runcount].end_timestamp)
    untouched_ops = job.ops[(runcount + 1):]
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in untouched_ops))

    actual = job.GetInfo(["opstatus", "opresult"])
    pending = len(job.ops) - runcount
    expected_status = (([constants.OP_STATUS_SUCCESS] * runcount) +
                       ([constants.OP_STATUS_QUEUED] * pending))
    expected_result = (["Res%s" % i for i in range(runcount)] +
                       ([None] * pending))
    self.assertEqual(actual, [expected_status, expected_result])

    # Must have been written and replicated
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)
1204

    
1205
  def testQueueShutdownWhileRunning(self):
    # Runs two of three opcodes and then shuts down the queue before the
    # third one is processed
    queue = _FakeQueueForProc()

    ops = []
    for i in range(3):
      ops.append(opcodes.OpTestDummy(result="Res%s" % i, fail=False))

    # Create job
    job = self._CreateJob(queue, 2718211587, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Expected opcode status and results after the first and second run
    expected_after_run = [
      ([constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_QUEUED,
        constants.OP_STATUS_QUEUED],
       ["Res0", None, None]),
      ([constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_QUEUED],
       ["Res0", "Res1", None]),
      ]

    for (exp_status, exp_result) in expected_after_run:
      # Run one opcode
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                       jqueue._JobProcessor.DEFER)

      # Job goes back to queued
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                       [exp_status, exp_result])
      self.assertFalse(job.cur_opctx)

      # Writes for waiting, running and result
      for _ in range(3):
        self.assertEqual(queue.GetNextUpdate(), (job, True))

    self._TestQueueShutdown(queue, opexec, job, 2)
1257

    
1258
  def testQueueShutdownWithLockTimeout(self):
    # Runs one opcode normally, then shuts down the queue while the lock
    # acquire of the next opcode times out
    queue = _FakeQueueForProc()

    ops = []
    for i in range(3):
      ops.append(opcodes.OpTestDummy(result="Res%s" % i, fail=False))

    # Create job
    job = self._CreateJob(queue, 1304231178, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Read by the hook below at call time; switched on further down
    acquire_timeout = False

    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      if not acquire_timeout:
        return
      raise mcpu.LockAcquireTimeout()

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Run one opcode
    first_result = jqueue._JobProcessor(queue, opexec, job)()
    self.assertEqual(first_result, jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    expected_status = [constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED]
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [expected_status, ["Res0", None, None]])
    self.assertFalse(job.cur_opctx)

    # Writes for waiting, running and result
    for _ in range(3):
      self.assertEqual(queue.GetNextUpdate(), (job, True))

    # The next opcode should have expiring lock acquires
    acquire_timeout = True

    self._TestQueueShutdown(queue, opexec, job, 1)
1304

    
1305
  def testQueueShutdownWhileInQueue(self):
    # This should never happen in reality (no new jobs are started by the
    # workerpool once a shutdown has been initiated), but it's better to test
    # the job processor for this scenario
    queue = _FakeQueueForProc()

    ops = []
    for i in range(5):
      ops.append(opcodes.OpTestDummy(result="Res%s" % i, fail=False))

    # Create job
    job = self._CreateJob(queue, 2031, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Nothing has run yet: no timestamps and every opcode still queued
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    all_queued = compat.all(op.status == constants.OP_STATUS_QUEUED
                            for op in job.ops)
    self.assertTrue(all_queued)

    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self._TestQueueShutdown(queue, opexec, job, 0)
1328

    
1329
  def testQueueShutdownWhileWaitlockInQueue(self):
    # Shuts down the queue for a job whose first opcode was manually put into
    # waiting status
    queue = _FakeQueueForProc()

    ops = []
    for i in range(5):
      ops.append(opcodes.OpTestDummy(result="Res%s" % i, fail=False))

    # Create job
    job = self._CreateJob(queue, 53125685, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Fake the first opcode waiting
    first_op = job.ops[0]
    first_op.status = constants.OP_STATUS_WAITING

    assert len(job.ops) == 5

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self._TestQueueShutdown(queue, opexec, job, 0)
1351

    
1352
  def testPartiallyRun(self):
    # Tests calling the processor on a job that's been partially run before the
    # program was restarted
    queue = _FakeQueueForProc()

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(10)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      # Run only the first "successcount" opcodes
      for _ in range(successcount):
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                         jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [[constants.OP_STATUS_SUCCESS
                         for _ in range(successcount)] +
                        [constants.OP_STATUS_QUEUED
                         for _ in range(len(ops) - successcount)]])

      self.assertTrue(job.ops_iter)

      # Serialize and restore (simulates program restart)
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
      self.assertFalse(newjob.ops_iter)
      self._TestPartial(newjob, successcount)
1385

    
1386
  def _TestPartial(self, job, successcount):
    """Runs the remaining opcodes of a partially processed job.

    A new fake queue is used for processing. After the last opcode the job
    must be successful, and further processor calls must be no-ops.

    @param job: Queued job whose first C{successcount} opcodes have already
      been run
    @param successcount: Number of opcodes already run

    """
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    for remaining in reversed(range(len(job.ops) - successcount)):
      result = jqueue._JobProcessor(queue, opexec, job)()
      # Three replicated updates are expected per opcode
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # ... also after being restored
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1432

    
1433
  def testProcessorOnRunningJob(self):
    # Tests that the processor refuses a job which is already running
    single_op = opcodes.OpTestDummy(result="result", fail=False)
    ops = [single_op]

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Create job
    job = self._CreateJob(queue, 9571, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Fake the only opcode running
    job.ops[0].status = constants.OP_STATUS_RUNNING

    assert len(job.ops) == 1

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Calling on running job must fail
    processor = jqueue._JobProcessor(queue, opexec, job)
    self.assertRaises(errors.ProgrammerError, processor)
1453

    
1454
  def testLogMessages(self):
    # Tests the "Feedback" callback function
    queue = _FakeQueueForProc()

    # Messages to emit, keyed by opcode index; a log type of None means the
    # message is sent without an explicit type
    messages = {
      1: [
        (None, "Hello"),
        (None, "World"),
        (constants.ELOG_MESSAGE, "there"),
        ],
      4: [
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
        ],
      }
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
                               messages=messages.get(i, []))
           for i in range(5)]

    # Create job
    job = self._CreateJob(queue, 29386, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

      self.assertRaises(AssertionError, cbs.Feedback,
                        "too", "many", "arguments")

      for (log_type, msg) in op.messages:
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if log_type:
          cbs.Feedback(log_type, msg)
        else:
          cbs.Feedback(msg)
        # Check for job update without replication
        self.assertEqual(queue.GetNextUpdate(), (job, False))
        self.assertRaises(IndexError, queue.GetNextUpdate)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    for remaining in reversed(range(len(job.ops))):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])

    logmsgcount = sum(len(m) for m in messages.values())

    self._CheckLogMessages(job, logmsgcount)

    # Serialize and restore (simulates program restart)
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
    self._CheckLogMessages(newjob, logmsgcount)

    # Check each message; note that this consumes the expected messages
    prevserial = -1
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
      for (serial, timestamp, log_type, msg) in oplog:
        (exptype, expmsg) = messages.get(idx).pop(0)
        if exptype:
          self.assertEqual(log_type, exptype)
        else:
          # Messages sent without a type must be logged as plain messages
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
        self.assertEqual(expmsg, msg)
        # Serial numbers must be strictly increasing across all opcodes
        self.assertTrue(serial > prevserial)
        prevserial = serial
1544

    
1545
  def _CheckLogMessages(self, job, count):
    """Checks a job's log serial and the results of C{GetLogEntries}.

    @param job: Job object providing C{log_serial}, C{GetLogEntries} and
      C{GetInfo}
    @param count: Expected value of C{job.log_serial}; must be greater
      than 3 as the serial filter is exercised with a value of 3

    """
    # Check serial
    self.assertEqual(job.log_serial, count)

    # All log entries in opcode order, skipping opcodes without entries
    all_entries = [entry
                   for entries in job.GetInfo(["oplog"])[0] if entries
                   for entry in entries]

    # No filter
    self.assertEqual(job.GetLogEntries(None), all_entries)

    # Filter with serial
    assert count > 3
    self.assertTrue(job.GetLogEntries(3))
    self.assertEqual(job.GetLogEntries(3), all_entries[3:])

    # No log message after highest serial
    self.assertFalse(job.GetLogEntries(count))
    self.assertFalse(job.GetLogEntries(count + 3))
1564

    
1565
  def testSubmitManyJobs(self):
    """Processes a job whose opcodes submit further jobs.

    The three opcodes submit zero, one and three jobs respectively. The
    fake queue is expected to hand the submitted jobs back with IDs counting
    up from 1000, and each opcode's result must be the list of job IDs it
    submitted.

    """
    queue = _FakeQueueForProc()

    job_id = 15656
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          submit_jobs=[]),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
                            ]),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    # One processor invocation per opcode
    for idx in range(len(ops)):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      if idx == len(ops) - 1:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
      else:
        self.assertEqual(result, jqueue._JobProcessor.DEFER)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Submitted jobs must come back in submission order
    for idx, submitted_ops in enumerate(job_ops
                                        for op in ops
                                        for job_ops in op.submit_jobs):
      self.assertEqual(queue.GetNextSubmittedJob(),
                       (1000 + idx, submitted_ops))
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[[], [1000], [1001, 1002, 1003]]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1649

    
1650
  def testJobDependency(self):
    """Processes a job whose first opcode depends on two other jobs.

    The fake dependency manager is fed one or two check results per
    attempt: WAIT for the first dependency on attempts 0 and 1, CONTINUE for
    it plus WAIT for the second dependency on attempt 2, WAIT again on
    attempts 3 and 4 and finally CONTINUE on attempt 5. The processor must
    return WAITDEP for attempts 0 to 4 and the job must succeed afterwards.

    """
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 22113
    prev_job_id2 = 28102
    job_id = 29929
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          depends=[
                            [prev_job_id2, None],
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res1", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Both callbacks read "attempt", which is bound by the loop further down
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt < 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
      elif attempt == 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
        # The processor will ask for the next dependency immediately
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt < 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))

      if attempt == 2:
        self.assertEqual(depmgr.CountPendingResults(), 2)
      elif attempt > 5:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt == 0 or attempt >= 5:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt < 5:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())
    self.assertFalse(depmgr.CountWaitingJobs())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1774

    
1775
  def testJobDependencyCancel(self):
    """Processes a job whose dependency check ends with CANCEL.

    The second of three opcodes depends on another job. The fake dependency
    manager answers WAIT on attempts 1 to 3 and CANCEL on attempt 4. The job
    must end up canceled, with the first opcode successful and the remaining
    two canceled.

    """
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 13623
    job_id = 30876
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Both callbacks read "attempt", which is bound by the loop further down
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job was cancelled
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt > 0 and attempt < 4:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1891

    
1892
  def testJobDependencyWrongstatus(self):
    """Processes a job whose dependency check ends with WRONGSTATUS.

    The second of three opcodes depends on another job. The fake dependency
    manager answers WAIT on attempts 1 to 3 and WRONGSTATUS on attempt 4.
    The job must end up in error status, with the first opcode successful
    and the remaining two carrying encoded errors as their result.

    """
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 9741
    job_id = 11763
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Both callbacks read "attempt", which is bound by the loop further down
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job failed
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt > 0 and attempt < 4:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
    # No trailing comma here: it would wrap the call in a discarded tuple
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_ERROR,
                       constants.OP_STATUS_ERROR]])

    (opresult, ) = job.GetInfo(["opresult"])
    self.assertEqual(len(opresult), len(ops))
    self.assertEqual(opresult[0], "Res0")
    self.assertTrue(errors.GetEncodedError(opresult[1]))
    self.assertTrue(errors.GetEncodedError(opresult[2]))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
2012

    
2013

    
2014
class TestEvaluateJobProcessorResult(unittest.TestCase):
2015
  def testFinished(self):
2016
    depmgr = _FakeDependencyManager()
2017
    job = _IdOnlyFakeJob(30953)
2018
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2019
                                       jqueue._JobProcessor.FINISHED)
2020
    self.assertEqual(depmgr.GetNextNotification(), job.id)
2021
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2022

    
2023
  def testDefer(self):
2024
    depmgr = _FakeDependencyManager()
2025
    job = _IdOnlyFakeJob(11326, priority=5463)
2026
    try:
2027
      jqueue._EvaluateJobProcessorResult(depmgr, job,
2028
                                         jqueue._JobProcessor.DEFER)
2029
    except workerpool.DeferTask, err:
2030
      self.assertEqual(err.priority, 5463)
2031
    else:
2032
      self.fail("Didn't raise exception")
2033
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2034

    
2035
  def testWaitdep(self):
2036
    depmgr = _FakeDependencyManager()
2037
    job = _IdOnlyFakeJob(21317)
2038
    jqueue._EvaluateJobProcessorResult(depmgr, job,
2039
                                       jqueue._JobProcessor.WAITDEP)
2040
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2041

    
2042
  def testOther(self):
2043
    depmgr = _FakeDependencyManager()
2044
    job = _IdOnlyFakeJob(5813)
2045
    self.assertRaises(errors.ProgrammerError,
2046
                      jqueue._EvaluateJobProcessorResult,
2047
                      depmgr, job, "Other result")
2048
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2049

    
2050

    
2051
class _FakeTimeoutStrategy:
2052
  def __init__(self, timeouts):
2053
    self.timeouts = timeouts
2054
    self.attempts = 0
2055
    self.last_timeout = None
2056

    
2057
  def NextAttempt(self):
2058
    self.attempts += 1
2059
    if self.timeouts:
2060
      timeout = self.timeouts.pop(0)
2061
    else:
2062
      timeout = None
2063
    self.last_timeout = timeout
2064
    return timeout
2065

    
2066

    
2067
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests the job processor's handling of lock acquisition timeouts.

  The test case itself provides the callbacks passed to the fake opcode
  executor and the timeout strategy factory; state shared between them is
  kept in instance attributes.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    self.job = None
    # Index of the opcode currently being processed
    self.curop = None
    self.opcounter = None
    self.timeout_strategy = None
    # Number of lock acquisitions which should still time out
    self.retries = 0
    self.prev_tsop = None
    self.prev_prio = None
    self.prev_status = None
    self.lock_acq_prio = None
    self.gave_lock = None
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Checks state before an opcode starts; may simulate a lock timeout.

    @param timeout: Timeout passed in by the opcode executor; must be
      C{None} or a number
    @param priority: Priority passed in by the opcode executor
    @raise mcpu.LockAcquireTimeout: While C{self.retries} is positive

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    ts = self.timeout_strategy

    self.assertTrue(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    """Checks state after an opcode was set to running.

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode, recording its priority and status.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Creates the fake timeout strategy for the current opcode.

    The timeouts and the number of simulated lock timeouts depend on the
    index of the current opcode.

    @return: New L{_FakeTimeoutStrategy} instance

    """
    job = self.job

    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    """Runs a ten-opcode job through simulated lock acquisition timeouts.

    """
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assertTrue(job.ops[self.curop].exec_timestamp)

      if result == jqueue._JobProcessor.FINISHED:
        self.assertFalse(job.cur_opctx)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2282

    
2283

    
2284
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
  """Tests changing a job's priority between runs of the job processor.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    # Priorities recorded by _BeforeStart, in call order
    self.opexecprio = []

  def _BeforeStart(self, timeout, priority):
    """Records the given priority and checks the queue is not acquired.

    Handed to C{_FakeExecOpCodeForProc} by the test below.

    @param timeout: Not used by this callback
    @param priority: Value appended to C{self.opexecprio}

    """
    self.assertFalse(self.queue.IsAcquired())
    self.opexecprio.append(priority)

  def testChangePriorityWhileRunning(self):
    # Tests changing the priority on a job while it has finished opcodes
    # (successful) and more, unprocessed ones
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 3499
    job = self._CreateJob(self.queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)

    # Run first opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Exactly one priority must have been recorded (the default one); the
    # second pop has to fail on the now-empty list
    self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
    self.assertRaises(IndexError, self.opexecprio.pop, 0)

    # Change priority
    self.assertEqual(job.ChangePriority(-10),
                     (True,
                      ("Priorities of pending opcodes for job 3499 have"
                       " been changed to -10")))
    self.assertEqual(job.CalcPriority(), -10)

    # Process second opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    # The second opcode must have been started with the new priority
    self.assertEqual(self.opexecprio.pop(0), -10)
    self.assertRaises(IndexError, self.opexecprio.pop, 0)

    # Check status
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.CalcPriority(), -10)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", "Res1", None]])

    # Change priority once more
    self.assertEqual(job.ChangePriority(5),
                     (True,
                      ("Priorities of pending opcodes for job 3499 have"
                       " been changed to 5")))
    self.assertEqual(job.CalcPriority(), 5)

    # Process third opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    self.assertEqual(self.opexecprio.pop(0), 5)
    self.assertRaises(IndexError, self.opexecprio.pop, 0)

    # Check status; the finished job is expected to report the default
    # priority again
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS],
                      ["Res0", "Res1", "Res2"]])
    # Opcodes that had already finished must not have been touched by later
    # priority changes. An explicit list is built so the comparison does not
    # depend on map() returning a list.
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, -10, 5])
2374

    
2375

    
2376
class _IdOnlyFakeJob:
2377
  def __init__(self, job_id, priority=NotImplemented):
2378
    self.id = str(job_id)
2379
    self._priority = priority
2380

    
2381
  def CalcPriority(self):
2382
    return self._priority
2383

    
2384

    
2385
class TestJobDependencyManager(unittest.TestCase):
  """Tests for C{jqueue._JobDependencyManager}.

  The manager under test is created with two callbacks provided by this
  class: L{_GetStatus} replays pre-recorded job statuses and L{_Enqueue}
  records the job collections handed back for re-adding.

  """
  def setUp(self):
    # Expected status lookups as (job ID, status to return), in call order
    self._status = []
    # Collections of jobs passed to _Enqueue, in call order
    self._queue = []
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)

  def _GetStatus(self, job_id):
    """Returns the next pre-recorded status.

    Fails the test if the requested job ID is not the one recorded next.

    @param job_id: Job ID whose status is requested
    @return: Status recorded in C{self._status} for this lookup

    """
    (exp_job_id, result) = self._status.pop(0)
    self.assertEqual(exp_job_id, job_id)
    return result

  def _Enqueue(self, jobs):
    """Records jobs handed back by the dependency manager.

    @param jobs: Collection of jobs; appended as-is to C{self._queue}

    """
    self.assertFalse(self.jdm._lock.is_owned(),
                     msg=("Must not own manager lock while re-adding jobs"
                          " (potential deadlock)"))
    self._queue.append(jobs)

  def testNotFinalizedThenCancel(self):
    job = _IdOnlyFakeJob(17697)
    job_id = str(28625)

    # Dependency is still running, job has to wait
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/28625", None, None, [("job", [job.id])])
      ])

    # Dependency was canceled; with no explicitly accepted statuses the
    # waiting job is expected to be canceled as well
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.CANCEL)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testNotFinalizedThenQueued(self):
    # This can happen on a queue shutdown
    job = _IdOnlyFakeJob(1320)
    job_id = str(22971)

    # Three lookups report "running", the following two "queued"; the job
    # must keep waiting in all cases
    for i in range(5):
      if i > 2:
        self._status.append((job_id, constants.JOB_STATUS_QUEUED))
      else:
        self._status.append((job_id, constants.JOB_STATUS_RUNNING))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
      self.assertEqual(result, self.jdm.WAIT)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertTrue(self.jdm.JobWaiting(job))
      self.assertEqual(self.jdm._waiters, {
        job_id: set([job]),
        })
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
        ("job/22971", None, None, [("job", [job.id])])
        ])

  def testRequireCancel(self):
    job = _IdOnlyFakeJob(5278)
    job_id = str(9610)
    # Only a canceled dependency is acceptable
    dep_status = [constants.JOB_STATUS_CANCELED]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/9610", None, None, [("job", [job.id])])
      ])

    # Dependency reached the required status
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireError(self):
    job = _IdOnlyFakeJob(21459)
    job_id = str(25519)
    # Only a failed dependency is acceptable
    dep_status = [constants.JOB_STATUS_ERROR]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    # Dependency reached the required status
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireMultiple(self):
    # Every finalized status is acceptable
    dep_status = list(constants.JOBS_FINALIZED)

    for end_status in dep_status:
      job = _IdOnlyFakeJob(21343)
      job_id = str(14609)

      self._status.append((job_id, constants.JOB_STATUS_WAITING))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.WAIT)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertTrue(self.jdm.JobWaiting(job))
      self.assertEqual(self.jdm._waiters, {
        job_id: set([job]),
        })
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
        ("job/14609", None, None, [("job", [job.id])])
        ])

      self._status.append((job_id, end_status))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.CONTINUE)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertFalse(self.jdm.JobWaiting(job))
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testNotify(self):
    job = _IdOnlyFakeJob(8227)
    job_id = str(4113)

    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    # Notifying must hand the waiting job to the enqueue callback and drop
    # it from the waiter list
    self.jdm.NotifyWaiters(job_id)
    self.assertFalse(self._status)
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertEqual(self._queue, [set([job])])

  def testWrongStatus(self):
    job = _IdOnlyFakeJob(10102)
    job_id = str(1271)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    # Dependency finished, but with a status other than the required one
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WRONGSTATUS)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testCorrectStatus(self):
    job = _IdOnlyFakeJob(24273)
    job_id = str(23885)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    # Dependency finished with the required status
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testFinalizedRightAway(self):
    job = _IdOnlyFakeJob(224)
    job_id = str(3081)

    # Dependency has the required status on the very first check; an empty
    # waiter set is expected to be left behind for it
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set(),
      })

    # Force cleanup; notifying for an unrelated job ID must remove the
    # empty entry without enqueuing anything
    self.jdm.NotifyWaiters("0")
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def testMultipleWaiting(self):
    # Use a deterministic random generator
    rnd = random.Random(21402)

    # An explicit list is needed as IDs are popped off below
    job_ids = list(map(str, rnd.sample(range(1, 10000), 150)))

    # Ten dependency job IDs, each with between 1 and 20 waiting jobs
    waiters = dict((job_ids.pop(),
                    set(map(_IdOnlyFakeJob,
                            [job_ids.pop()
                             for _ in range(rnd.randint(1, 20))])))
                   for _ in range(10))

    # Ensure there are no duplicate job IDs
    assert not utils.FindDuplicates(list(waiters.keys()) +
                                    [job.id
                                     for jobs in waiters.values()
                                     for job in jobs])

    # Register all jobs as waiters
    for job_id, job in [(job_id, job)
                        for (job_id, jobs) in waiters.items()
                        for job in jobs]:
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                              [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(result, self.jdm.WAIT)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertTrue(self.jdm.JobWaiting(job))

    self.assertEqual(self.jdm._waiters, waiters)

    def _MakeSet(lock_info):
      # Converts the pending names of one lock info entry to sets, making
      # the comparison below independent of their order. The tuple is
      # unpacked in the body as tuple parameters are not portable.
      (name, mode, owner_names, pending) = lock_info
      return (name, mode, owner_names,
              [(pendmode, set(pend)) for (pendmode, pend) in pending])

    def _CheckLockInfo():
      # Compares the reported lock information with the jobs still in
      # "waiters"
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
        ("job/%s" % job_id, None, None,
         [("job", set([job.id for job in jobs]))])
        for job_id, jobs in waiters.items()
        if jobs
        ]))

    _CheckLockInfo()

    # Notify in random order; the keys are copied to a list as sampling
    # needs a sequence
    for job_id in rnd.sample(list(waiters), len(waiters)):
      # Remove from pending waiter list
      jobs = waiters.pop(job_id)
      for job in jobs:
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                                [constants.JOB_STATUS_SUCCESS])
        self.assertEqual(result, self.jdm.CONTINUE)
        self.assertFalse(self._status)
        self.assertFalse(self._queue)
        self.assertFalse(self.jdm.JobWaiting(job))

      _CheckLockInfo()

    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

    assert not waiters

  def testSelfDependency(self):
    job = _IdOnlyFakeJob(18937)

    # A job depending on its own ID must be reported as an error
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
    self.assertEqual(result, self.jdm.ERROR)

  def testJobDisappears(self):
    job = _IdOnlyFakeJob(30540)
    job_id = str(23769)

    def _FakeStatus(_):
      raise errors.JobLost("#msg#")

    # Separate manager whose status callback always fails; the enqueue
    # callback is None and must therefore never be called
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
    # The expected result constant is read from the manager created in setUp
    self.assertEqual(result, self.jdm.ERROR)
    self.assertFalse(jdm.JobWaiting(job))
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2698

    
2699

    
2700
# Run the test program when this file is executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()