Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.jqueue_unittest.py @ a9e1819b

History | View | Annotate | Download (95.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31
import random
32
import operator
33

    
34
try:
35
  # pylint: disable=E0611
36
  from pyinotify import pyinotify
37
except ImportError:
38
  import pyinotify
39

    
40
from ganeti import constants
41
from ganeti import utils
42
from ganeti import errors
43
from ganeti import jqueue
44
from ganeti import opcodes
45
from ganeti import compat
46
from ganeti import mcpu
47
from ganeti import query
48
from ganeti import workerpool
49

    
50
import testutils
51

    
52

    
53
class _FakeJob:
54
  def __init__(self, job_id, status):
55
    self.id = job_id
56
    self.writable = False
57
    self._status = status
58
    self._log = []
59

    
60
  def SetStatus(self, status):
61
    self._status = status
62

    
63
  def AddLogEntry(self, msg):
64
    self._log.append((len(self._log), msg))
65

    
66
  def CalcStatus(self):
67
    return self._status
68

    
69
  def GetInfo(self, fields):
70
    result = []
71

    
72
    for name in fields:
73
      if name == "status":
74
        result.append(self._status)
75
      else:
76
        raise Exception("Unknown field")
77

    
78
    return result
79

    
80
  def GetLogEntries(self, newer_than):
81
    assert newer_than is None or newer_than >= 0
82

    
83
    if newer_than is None:
84
      return self._log
85

    
86
    return self._log[newer_than:]
87

    
88

    
89
class TestJobChangesChecker(unittest.TestCase):
  """Tests for L{jqueue._JobChangesChecker}, driven by L{_FakeJob} objects.

  """
  def testStatus(self):
    # A checker created without previous information reports each new status
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.SetStatus(constants.JOB_STATUS_SUCCESS)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    # The previous status equals the current one, hence nothing is reported
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # Same job information and log position as just seen: no change expected
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A checker without previous information must see the complete log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
145

    
146

    
147
class TestJobChangesWaiter(unittest.TestCase):
148
  def setUp(self):
149
    self.tmpdir = tempfile.mkdtemp()
150
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
151
    utils.WriteFile(self.filename, data="")
152

    
153
  def tearDown(self):
154
    shutil.rmtree(self.tmpdir)
155

    
156
  def _EnsureNotifierClosed(self, notifier):
157
    try:
158
      os.fstat(notifier._fd)
159
    except EnvironmentError, err:
160
      self.assertEqual(err.errno, errno.EBADF)
161
    else:
162
      self.fail("File descriptor wasn't closed")
163

    
164
  def testClose(self):
165
    for wait in [False, True]:
166
      waiter = jqueue._JobFileChangesWaiter(self.filename)
167
      try:
168
        if wait:
169
          waiter.Wait(0.001)
170
      finally:
171
        waiter.Close()
172

    
173
      # Ensure file descriptor was closed
174
      self._EnsureNotifierClosed(waiter._notifier)
175

    
176
  def testChangingFile(self):
177
    waiter = jqueue._JobFileChangesWaiter(self.filename)
178
    try:
179
      self.assertFalse(waiter.Wait(0.1))
180
      utils.WriteFile(self.filename, data="changed")
181
      self.assert_(waiter.Wait(60))
182
    finally:
183
      waiter.Close()
184

    
185
    self._EnsureNotifierClosed(waiter._notifier)
186

    
187
  def testChangingFile2(self):
188
    waiter = jqueue._JobChangesWaiter(self.filename)
189
    try:
190
      self.assertFalse(waiter._filewaiter)
191
      self.assert_(waiter.Wait(0.1))
192
      self.assert_(waiter._filewaiter)
193

    
194
      # File waiter is now used, but there have been no changes
195
      self.assertFalse(waiter.Wait(0.1))
196
      utils.WriteFile(self.filename, data="changed")
197
      self.assert_(waiter.Wait(60))
198
    finally:
199
      waiter.Close()
200

    
201
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
202

    
203

    
204
class _FailingWatchManager(pyinotify.WatchManager):
  """Subclass of L{pyinotify.WatchManager} which always fails to register.

  """
  def add_watch(self, filename, mask):
    """Checks the requested event mask and reports -1 for the file.

    @param filename: Path for which a watch was requested
    @param mask: Event mask; must be C{IN_MODIFY | IN_IGNORED}
    @return: Dictionary mapping C{filename} to -1

    """
    all_flags = pyinotify.EventsCodes.ALL_FLAGS
    expected_mask = all_flags["IN_MODIFY"] | all_flags["IN_IGNORED"]
    assert mask == expected_mask

    # NOTE(review): -1 is presumably what the code under test treats as a
    # failed registration (see testInotifyError) -- confirm against jqueue
    return {filename: -1}
215

    
216

    
217
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for L{jqueue._WaitForJobChangesHelper}.

  Each test works on an empty job file created inside a fresh temporary
  directory.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader callback returning a fake job in "waiting" status.

    """
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)

  def _LoadLostJob(self):
    """Job loader callback which doesn't find any job.

    """
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITING], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITING], []))

  def testLostJob(self):
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)

  def testNonExistentFile(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    filename = utils.PathJoin(self.tmpdir, "does-not-exist")
    self.assertFalse(os.path.exists(filename))

    # NotImplemented is passed as file waiter class; the expectation is that
    # it's never used as the job can't be loaded
    result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0,
                  _waiter_cls=compat.partial(jqueue._JobChangesWaiter,
                                             _waiter_cls=NotImplemented))
    self.assertTrue(result is None)

  def testInotifyError(self):
    jobfile_waiter_cls = \
      compat.partial(jqueue._JobFileChangesWaiter,
                     _inotify_wm_cls=_FailingWatchManager)

    jobchange_waiter_cls = \
      compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls)

    wfjc = jqueue._WaitForJobChangesHelper()

    # Test if failing to watch a job file (e.g. due to
    # fs.inotify.max_user_watches being too low) raises errors.InotifyError
    self.assertRaises(errors.InotifyError, wfjc,
                      self.filename, self._LoadWaitingJob,
                      ["status"], [constants.JOB_STATUS_WAITING], None, 1.0,
                      _waiter_cls=jobchange_waiter_cls)
277

    
278

    
279
class TestEncodeOpError(unittest.TestCase):
  """Tests for L{jqueue._EncodeOpError}.

  Every encoded error must be a tuple which L{errors.MaybeRaise} turns back
  into an exception.

  """
  def test(self):
    # Ganeti exceptions are re-raised with their own class
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)

    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)

    # Anything else is expected to come back as OpExecError
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)

    encerr = jqueue._EncodeOpError("Hello World")
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
296

    
297

    
298
class TestQueuedOpCode(unittest.TestCase):
  """Tests for L{jqueue._QueuedOpCode}.

  Both tests check a freshly created object as well as a copy restored from
  its serialized form.

  """
  def testDefaults(self):
    def _Check(op):
      self.assertFalse(op.input.dry_run)
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    # The priority given to the input opcode must be used by the queued one
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
329

    
330

    
331
class TestQueuedJob(unittest.TestCase):
  """Tests for L{jqueue._QueuedJob}.

  Covers construction and restoring, the writable and archived flags, priority
  calculation and changes, and status calculation.

  """
  def testNoOpCodes(self):
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [], False)

  def testDefaults(self):
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertTrue(job.writable)
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])
      self.assertFalse(job.archived)

    # A restored job must look exactly like the original one
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job1)
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testWritable(self):
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
    self.assertFalse(job.writable)

    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
    self.assertTrue(job.writable)

  def testArchived(self):
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
    self.assertFalse(job.archived)

    newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
    self.assertTrue(newjob.archived)

    # Serializing an archived job and restoring it as non-archived must not
    # carry over the flag
    newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
    self.assertFalse(newjob2.archived)

  def testPriority(self):
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def _JobForPriority(self, job_id):
    """Creates a job with four opcodes for the priority change tests.

    Also verifies that all opcodes and the job start with default priority.

    @param job_id: ID for the new job
    @return: The newly created job

    """
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    job = jqueue._QueuedJob(None, job_id, ops, True)

    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    return job

  def testChangePriorityAllQueued(self):
    job = self._JobForPriority(24984)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                               for op in job.ops))
    result = job.ChangePriority(-10)
    self.assertEqual(job.CalcPriority(), -10)
    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job 24984 have"
                             " been changed to -10")))

  def testChangePriorityAllFinished(self):
    job = self._JobForPriority(16405)

    # Only the last opcode failed, all others succeeded
    for (idx, op) in enumerate(job.ops):
      if idx > 2:
        op.status = constants.OP_STATUS_ERROR
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(-10)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_ERROR,
      ])
    self.assertEqual(result, (False, "Job 16405 is finished"))

  def testChangePriorityCancelling(self):
    job = self._JobForPriority(31572)

    # First two opcodes succeeded, the remaining ones are being cancelled
    for (idx, op) in enumerate(job.ops):
      if idx > 1:
        op.status = constants.OP_STATUS_CANCELING
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(5)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_CANCELING,
      constants.OP_STATUS_CANCELING,
      ])
    self.assertEqual(result, (False, "Job 31572 is cancelling"))

  def testChangePriorityFirstRunning(self):
    job = self._JobForPriority(1716215889)

    for (idx, op) in enumerate(job.ops):
      if idx == 0:
        op.status = constants.OP_STATUS_RUNNING
      else:
        op.status = constants.OP_STATUS_QUEUED

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(7)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    # The running opcode keeps its priority, only queued ones are changed
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_RUNNING,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      ])
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job"
                             " 1716215889 have been changed to 7")))

  def testChangePriorityLastRunning(self):
    job = self._JobForPriority(1308)

    for (idx, op) in enumerate(job.ops):
      if idx == (len(job.ops) - 1):
        op.status = constants.OP_STATUS_RUNNING
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(-3)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_RUNNING,
      ])
    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))

  def testChangePrioritySecondOpcodeRunning(self):
    job = self._JobForPriority(27701)

    self.assertEqual(len(job.ops), 4)
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    job.ops[1].status = constants.OP_STATUS_RUNNING
    job.ops[2].status = constants.OP_STATUS_QUEUED
    job.ops[3].status = constants.OP_STATUS_QUEUED

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    result = job.ChangePriority(-19)
    self.assertEqual(job.CalcPriority(), -19)
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
                      -19, -19])
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_RUNNING,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      ])
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job"
                             " 27701 have been changed to -19")))

  def testChangePriorityWithInconsistentJob(self):
    job = self._JobForPriority(30097)

    self.assertEqual(len(job.ops), 4)

    # This job is invalid (as it has two opcodes marked as running) and makes
    # the call fail because an unprocessed opcode precedes a running one (which
    # should never happen in reality)
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    job.ops[1].status = constants.OP_STATUS_RUNNING
    job.ops[2].status = constants.OP_STATUS_QUEUED
    job.ops[3].status = constants.OP_STATUS_RUNNING

    self.assertRaises(AssertionError, job.ChangePriority, 19)

  def testCalcStatus(self):
    # Each helper below puts the opcodes of a fresh ten-opcode job into a
    # state which must result in the job status it's listed under in "tests"
    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITING

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITING

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)],
                              True)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # Looking up every known status ensures none is left without a test
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
659

    
660

    
661
class _FakeDependencyManager:
662
  def __init__(self):
663
    self._checks = []
664
    self._notifications = []
665
    self._waiting = set()
666

    
667
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
668
    self._checks.append((job, dep_job_id, dep_status, result))
669

    
670
  def CountPendingResults(self):
671
    return len(self._checks)
672

    
673
  def CountWaitingJobs(self):
674
    return len(self._waiting)
675

    
676
  def GetNextNotification(self):
677
    return self._notifications.pop(0)
678

    
679
  def JobWaiting(self, job):
680
    return job in self._waiting
681

    
682
  def CheckAndRegister(self, job, dep_job_id, dep_status):
683
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
684

    
685
    assert exp_job == job
686
    assert exp_dep_job_id == dep_job_id
687
    assert exp_dep_status == dep_status
688

    
689
    (result_status, _) = result
690

    
691
    if result_status == jqueue._JobDependencyManager.WAIT:
692
      self._waiting.add(job)
693
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
694
      self._waiting.remove(job)
695

    
696
    return result
697

    
698
  def NotifyWaiters(self, job_id):
699
    self._notifications.append(job_id)
700

    
701

    
702
class _DisabledFakeDependencyManager:
703
  def JobWaiting(self, _):
704
    return False
705

    
706
  def CheckAndRegister(self, *args):
707
    assert False, "Should not be called"
708

    
709
  def NotifyWaiters(self, _):
710
    pass
711

    
712

    
713
class _FakeQueueForProc:
714
  def __init__(self, depmgr=None):
715
    self._acquired = False
716
    self._updates = []
717
    self._submitted = []
718
    self._accepting_jobs = True
719

    
720
    self._submit_count = itertools.count(1000)
721

    
722
    if depmgr:
723
      self.depmgr = depmgr
724
    else:
725
      self.depmgr = _DisabledFakeDependencyManager()
726

    
727
  def IsAcquired(self):
728
    return self._acquired
729

    
730
  def GetNextUpdate(self):
731
    return self._updates.pop(0)
732

    
733
  def GetNextSubmittedJob(self):
734
    return self._submitted.pop(0)
735

    
736
  def acquire(self, shared=0):
737
    assert shared == 1
738
    self._acquired = True
739

    
740
  def release(self):
741
    assert self._acquired
742
    self._acquired = False
743

    
744
  def UpdateJobUnlocked(self, job, replicate=True):
745
    assert self._acquired, "Lock not acquired while updating job"
746
    self._updates.append((job, bool(replicate)))
747

    
748
  def SubmitManyJobs(self, jobs):
749
    assert not self._acquired, "Lock acquired while submitting jobs"
750
    job_ids = [self._submit_count.next() for _ in jobs]
751
    self._submitted.extend(zip(job_ids, jobs))
752
    return job_ids
753

    
754
  def StopAcceptingJobs(self):
755
    self._accepting_jobs = False
756

    
757
  def AcceptingJobsUnlocked(self):
758
    return self._accepting_jobs
759

    
760

    
761
class _FakeExecOpCodeForProc:
  """Callable used in place of real opcode execution.

  Only accepts L{opcodes.OpTestDummy} opcodes and invokes optional hooks
  before and after notifying the callbacks about the start.

  """
  def __init__(self, queue, before_start, after_start):
    """Initializes the fake executor.

    @param queue: Queue whose lock state is checked during execution
    @param before_start: If true, called as C{fn(timeout, priority)} before
      the start notification
    @param after_start: If true, called as C{fn(op, cbs)} after the start
      notification

    """
    self._after_start = after_start
    self._before_start = before_start
    self._queue = queue

  def __call__(self, op, cbs, timeout=None):
    """Pretends to execute an opcode.

    @return: IDs returned by C{cbs.SubmitManyJobs} if the opcode has a
      C{submit_jobs} attribute other than C{None}, otherwise C{op.result}
    @raise errors.OpExecError: If the opcode's C{fail} attribute is true

    """
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    if self._before_start:
      current_priority = cbs.CurrentPriority()
      self._before_start(timeout, current_priority)

    cbs.NotifyStart()

    if self._after_start:
      self._after_start(op, cbs)

    # Check again after the callbacks
    assert not self._queue.IsAcquired()

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    wants_submit = hasattr(op, "submit_jobs") and op.submit_jobs is not None
    if not wants_submit:
      return op.result

    return cbs.SubmitManyJobs(op.submit_jobs)
790

    
791

    
792
class _JobProcessorTestUtils:
  """Mix-in with helpers for job processor tests.

  Must be combined with L{unittest.TestCase} as the assertion methods are
  used.

  """
  def _CreateJob(self, queue, job_id, ops):
    """Creates a writable job and verifies its initial state.

    @param queue: Queue object passed to L{jqueue._QueuedJob}
    @param job_id: ID for the new job
    @param ops: List of input opcodes
    @return: The newly created job

    """
    job = jqueue._QueuedJob(queue, job_id, ops, True)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
802

    
803

    
804
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
805
  def _GenericCheckJob(self, job):
    """Runs checks common to all finished jobs.

    Verifies that the timestamp fields reported by C{job.GetInfo} agree with
    the attributes on the job and its opcodes, and that the job has both a
    start and an end timestamp.

    @param job: job whose opcodes all wrap L{opcodes.OpTestDummy}

    """
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                      for op in job.ops)

    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
                     [[op.start_timestamp for op in job.ops],
                      [op.exec_timestamp for op in job.ops],
                      [op.end_timestamp for op in job.ops]])
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
                     [job.received_timestamp,
                      job.start_timestamp,
                      job.end_timestamp])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)

    # The job's start time is the start time of its first opcode
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
820

    
821
  def testSuccess(self):
    # Runs jobs of several sizes to completion, one opcode per processor
    # call, checking queue updates and job status at every step
    queue = _FakeQueueForProc()

    for (job_id, opcount) in [(25351, 1), (6637, 3),
                              (24644, 10), (32207, 100)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      def _BeforeStart(timeout, priority):
        # Exactly one replicated update must be pending at this point
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertFalse(job.cur_opctx)

      def _AfterStart(op, cbs):
        # Exactly one further replicated update since the "before" hook
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
        self.assertFalse(job.cur_opctx)

        # Job is running, cancelling shouldn't be possible
        (success, _) = job.Cancel()
        self.assertFalse(success)

      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        # The hooks consumed the earlier updates; one is left for the result
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if idx == len(ops) - 1:
          # Last opcode
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        else:
          self.assertEqual(result, jqueue._JobProcessor.DEFER)

          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
          self.assertTrue(job.start_timestamp)
          self.assertFalse(job.end_timestamp)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(job.GetInfo(["opresult"]),
                       [[op.input.result for op in job.ops]])
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                       jqueue._JobProcessor.FINISHED)
      self.assertRaises(IndexError, queue.GetNextUpdate)
885

    
886
  def testOpcodeError(self):
    # Runs jobs in which a range of opcodes is set to fail and checks that
    # processing stops at the first failure with the job in error state
    queue = _FakeQueueForProc()

    # Tuples of (job ID, opcode count, first failing index, last failing
    # index)
    testdata = [
      (17077, 1, 0, 0),
      (1782, 5, 2, 2),
      (18179, 10, 9, 9),
      (4744, 10, 3, 8),
      (23816, 100, 39, 45),
      ]

    for (job_id, opcount, failfrom, failto) in testdata:
      # Prepare opcodes
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
                                 fail=(failfrom <= i <= failto))
             for i in range(opcount)]

      # Create job; the ID is passed as a string and compared against the
      # integer further down
      job = self._CreateJob(queue, str(job_id), ops)

      opexec = _FakeExecOpCodeForProc(queue, None, None)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        # queued to waitlock
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # waitlock to running
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # Opcode result
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        if idx in (failfrom, len(ops) - 1):
          # Last opcode
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
          break

        self.assertEqual(result, jqueue._JobProcessor.DEFER)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      # Check job status
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
      self.assertEqual(job.GetInfo(["id"]), [job_id])
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])

      # Check opcode status
      data = zip(job.ops,
                 job.GetInfo(["opstatus"])[0],
                 job.GetInfo(["opresult"])[0])

      for idx, (op, opstatus, opresult) in enumerate(data):
        if idx < failfrom:
          # Opcodes before the first failure ran successfully
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
          self.assertEqual(opresult, op.input.result)
        elif idx <= failto:
          # Opcodes configured to fail
          assert op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
        else:
          # Opcodes after the failing range are expected in error state too
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)

      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops[:failfrom]))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                       jqueue._JobProcessor.FINISHED)
      self.assertRaises(IndexError, queue.GetNextUpdate)
964

    
965
  def testCancelWhileInQueue(self):
    # Cancels a job before it was ever processed; the processor must then
    # report it as finished without changing or writing it
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 17045
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELED
                               for op in job.ops))

    # Serialize to check for differences
    before_proc = job.Serialize()

    # Simulate processor called in workerpool
    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

    # Must not have changed or written
    self.assertEqual(before_proc, job.Serialize())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1010

    
1011
  def testCancelWhileWaitlockInQueue(self):
    # Cancels a job whose first opcode is already marked as waiting although
    # the processor has not run yet
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 8645
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Force the first opcode into waiting state
    job.ops[0].status = constants.OP_STATUS_WAITING

    assert len(job.ops) == 5

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    # Mark as cancelling
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                               for op in job.ops))

    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
1052

    
1053
  def testCancelWhileWaitlock(self):
    # Cancels a job from the "before start" hook, i.e. while its first
    # opcode is in waiting state
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 11009
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))
      # Cancelling by itself must not have produced a queue update
      self.assertRaises(IndexError, queue.GetNextUpdate)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
1103

    
1104
  def _TestCancelWhileSomething(self, cb):
    """Cancels a waiting job and then lets a callback interfere.

    The job is cancelled from the "before start" hook; afterwards C{cb} is
    invoked with the fake queue. In all cases the job must end up canceled.

    @param cb: callable taking the fake queue as its only argument
    @return: the fake queue used for the test

    """
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 24314
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))

      cb(queue)

    def _AfterStart(op, cbs):
      self.fail("Should not reach this")

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

    return queue
1149

    
1150
  def testCancelWhileWaitlockWithTimeout(self):
    # Cancels a waiting job whose lock acquisition then times out
    def _RaiseAcquireTimeout(_unused_queue):
      # Fake an acquire attempt timing out
      raise mcpu.LockAcquireTimeout()

    self._TestCancelWhileSomething(_RaiseAcquireTimeout)
1156

    
1157
  def testCancelDuringQueueShutdown(self):
    # Cancels a waiting job while the queue stops accepting jobs
    def _Shutdown(fake_queue):
      return fake_queue.StopAcceptingJobs()

    used_queue = self._TestCancelWhileSomething(_Shutdown)
    self.assertFalse(used_queue.AcceptingJobsUnlocked())
1160

    
1161
  def testCancelWhileRunning(self):
    # Tests canceling a job with finished opcodes and more, unprocessed ones
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 28492
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Run one opcode
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    # Try processing another opcode (this will actually cancel the job)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result; the already finished opcode keeps its status and result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])
1206

    
1207
  def _TestQueueShutdown(self, queue, opexec, job, runcount):
    """Stops the queue and checks that the job is deferred untouched.

    @param queue: fake queue, must still be accepting jobs
    @param opexec: opcode executor passed to the processor
    @param job: job to run the processor on
    @param runcount: number of opcodes of C{job} that already succeeded

    """
    self.assertTrue(queue.AcceptingJobsUnlocked())

    # Simulate shutdown
    queue.StopAcceptingJobs()

    proc_result = jqueue._JobProcessor(queue, opexec, job)()
    self.assertEqual(proc_result, jqueue._JobProcessor.DEFER)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
    self.assertFalse(job.cur_opctx)
    self.assertTrue(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    # Opcodes run earlier have both timestamps, the current one has not
    # ended and the remaining ones have none
    done_ops = job.ops[:runcount]
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in done_ops))
    self.assertFalse(job.ops[runcount].end_timestamp)
    later_ops = job.ops[(runcount + 1):]
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in later_ops))

    pending = len(job.ops) - runcount
    want_status = (([constants.OP_STATUS_SUCCESS] * runcount) +
                   ([constants.OP_STATUS_QUEUED] * pending))
    want_result = (["Res%s" % i for i in range(runcount)] +
                   ([None] * pending))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [want_status, want_result])

    # Must have been written and replicated
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)
1238

    
1239
  def testQueueShutdownWhileRunning(self):
    # Tests shutting down the queue while a job is running
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job = self._CreateJob(queue, 2718211587, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Expected opcode status and results after the first and second run
    after_each_run = [
      ([constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_QUEUED,
        constants.OP_STATUS_QUEUED],
       ["Res0", None, None]),
      ([constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_SUCCESS,
        constants.OP_STATUS_QUEUED],
       ["Res0", "Res1", None]),
      ]

    for (want_status, want_result) in after_each_run:
      # Run one opcode
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                       jqueue._JobProcessor.DEFER)

      # Job goes back to queued
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                       [want_status, want_result])
      self.assertFalse(job.cur_opctx)

      # Writes for waiting, running and result
      for _ in range(3):
        self.assertEqual(queue.GetNextUpdate(), (job, True))

    self._TestQueueShutdown(queue, opexec, job, 2)
1291

    
1292
  def testQueueShutdownWithLockTimeout(self):
    # Tests shutting down while a lock acquire times out
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job = self._CreateJob(queue, 1304231178, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Read by the hook below every time it is called; switched on later
    acquire_timeout = False

    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      if not acquire_timeout:
        return
      raise mcpu.LockAcquireTimeout()

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Run one opcode
    first_result = jqueue._JobProcessor(queue, opexec, job)()
    self.assertEqual(first_result, jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    want_status = [constants.OP_STATUS_SUCCESS,
                   constants.OP_STATUS_QUEUED,
                   constants.OP_STATUS_QUEUED]
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [want_status, ["Res0", None, None]])
    self.assertFalse(job.cur_opctx)

    # Writes for waiting, running and result
    for _ in range(3):
      self.assertEqual(queue.GetNextUpdate(), (job, True))

    # The next opcode should have expiring lock acquires
    acquire_timeout = True

    self._TestQueueShutdown(queue, opexec, job, 1)
1338

    
1339
  def testQueueShutdownWhileInQueue(self):
    # This should never happen in reality (no new jobs are started by the
    # workerpool once a shutdown has been initiated), but it's better to test
    # the job processor for this scenario
    fake_queue = _FakeQueueForProc()

    dummy_ops = [opcodes.OpTestDummy(result="Res%s" % idx, fail=False)
                 for idx in range(5)]

    # Create job
    job = self._CreateJob(fake_queue, 2031, dummy_ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertRaises(IndexError, fake_queue.GetNextUpdate)

    # Nothing has run yet
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    op_states = [op.status for op in job.ops]
    self.assertTrue(compat.all(state == constants.OP_STATUS_QUEUED
                               for state in op_states))

    self._TestQueueShutdown(fake_queue,
                            _FakeExecOpCodeForProc(fake_queue, None, None),
                            job, 0)
1362

    
1363
  def testQueueShutdownWhileWaitlockInQueue(self):
    # Shuts the queue down for a job whose first opcode is already marked as
    # waiting although the processor has not run yet
    fake_queue = _FakeQueueForProc()

    dummy_ops = [opcodes.OpTestDummy(result="Res%s" % idx, fail=False)
                 for idx in range(5)]

    # Create job
    job = self._CreateJob(fake_queue, 53125685, dummy_ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Force the first opcode into waiting state
    first_op = job.ops[0]
    first_op.status = constants.OP_STATUS_WAITING

    assert len(job.ops) == 5

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    self.assertRaises(IndexError, fake_queue.GetNextUpdate)

    self._TestQueueShutdown(fake_queue,
                            _FakeExecOpCodeForProc(fake_queue, None, None),
                            job, 0)
1385

    
1386
  def testPartiallyRun(self):
    # Tests calling the processor on a job that's been partially run before the
    # program was restarted
    queue = _FakeQueueForProc()

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Tuples of (job ID, number of opcodes run before the simulated restart)
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(10)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      for _ in range(successcount):
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                         jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [[constants.OP_STATUS_SUCCESS
                         for _ in range(successcount)] +
                        [constants.OP_STATUS_QUEUED
                         for _ in range(len(ops) - successcount)]])

      self.assertTrue(job.ops_iter)

      # Serialize and restore (simulates program restart)
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
      # The restored job has no opcode iterator yet
      self.assertFalse(newjob.ops_iter)
      self._TestPartial(newjob, successcount)
1419

    
1420
  def _TestPartial(self, job, successcount):
    """Runs the remaining opcodes of a partially processed job.

    A new fake queue is used, so only updates caused by this method are
    seen.

    @param job: job in queued state whose first C{successcount} opcodes have
      already been run
    @param successcount: number of opcodes already processed

    """
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    for remaining in reversed(range(len(job.ops) - successcount)):
      result = jqueue._JobProcessor(queue, opexec, job)()
      # Three replicated updates are expected per opcode
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # ... also after being restored
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1466

    
1467
  def testProcessorOnRunningJob(self):
    # The processor must refuse to work on a job that is marked as running
    fake_queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(fake_queue, None, None)

    # Create job
    single_op = opcodes.OpTestDummy(result="result", fail=False)
    job = self._CreateJob(fake_queue, 9571, [single_op])

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Force the only opcode into running state
    job.ops[0].status = constants.OP_STATUS_RUNNING

    assert len(job.ops) == 1

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Calling on running job must fail
    processor = jqueue._JobProcessor(fake_queue, opexec, job)
    self.assertRaises(errors.ProgrammerError, processor)
1487

    
1488
  def testLogMessages(self):
    # Tests the "Feedback" callback function
    queue = _FakeQueueForProc()

    # Maps opcode index to the (log type, message) pairs that opcode emits;
    # a log type of None means Feedback is called with the message only
    messages = {
      1: [
        (None, "Hello"),
        (None, "World"),
        (constants.ELOG_MESSAGE, "there"),
        ],
      4: [
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
        ],
      }
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
                               messages=messages.get(i, []))
           for i in range(5)]

    # Create job
    job = self._CreateJob(queue, 29386, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

      # Feedback takes at most two arguments
      self.assertRaises(AssertionError, cbs.Feedback,
                        "too", "many", "arguments")

      for (log_type, msg) in op.messages:
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if log_type:
          cbs.Feedback(log_type, msg)
        else:
          cbs.Feedback(msg)
        # Check for job update without replication
        self.assertEqual(queue.GetNextUpdate(), (job, False))
        self.assertRaises(IndexError, queue.GetNextUpdate)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    for remaining in reversed(range(len(job.ops))):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])

    logmsgcount = sum(len(m) for m in messages.values())

    self._CheckLogMessages(job, logmsgcount)

    # Serialize and restore (simulates program restart)
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
    self._CheckLogMessages(newjob, logmsgcount)

    # Check each message; expected entries are consumed from "messages" in
    # order and serials must be strictly increasing across all opcodes
    prevserial = -1
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
      for (serial, timestamp, log_type, msg) in oplog:
        (exptype, expmsg) = messages.get(idx).pop(0)
        if exptype:
          self.assertEqual(log_type, exptype)
        else:
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
        self.assertEqual(expmsg, msg)
        self.assertTrue(serial > prevserial)
        prevserial = serial
1578

    
1579
  def _CheckLogMessages(self, job, count):
1580
    # Check serial
1581
    self.assertEqual(job.log_serial, count)
1582

    
1583
    # No filter
1584
    self.assertEqual(job.GetLogEntries(None),
1585
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1586
                      for entry in entries])
1587

    
1588
    # Filter with serial
1589
    assert count > 3
1590
    self.assert_(job.GetLogEntries(3))
1591
    self.assertEqual(job.GetLogEntries(3),
1592
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1593
                      for entry in entries][3:])
1594

    
1595
    # No log message after highest serial
1596
    self.assertFalse(job.GetLogEntries(count))
1597
    self.assertFalse(job.GetLogEntries(count + 3))
1598

    
1599
  def testSubmitManyJobs(self):
    """Runs a job whose opcodes submit further jobs.

    The three opcodes submit zero, one and three jobs respectively; the test
    checks that the fake queue received them in order and that the opcode
    results contain the IDs handed out for them (starting at 1000).

    """
    queue = _FakeQueueForProc()

    job_id = 15656
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          submit_jobs=[]),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
                            ]),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    def _BeforeStart(timeout, priority):
      # Exactly one replicated update must be pending at this point
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    # One processor invocation per opcode
    last_idx = len(ops) - 1
    for idx in range(len(ops)):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      if idx == last_idx:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
      else:
        self.assertEqual(result, jqueue._JobProcessor.DEFER)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Submitted jobs must show up in submission order with consecutive IDs
    for idx, submitted_ops in enumerate(job_ops
                                        for op in ops
                                        for job_ops in op.submit_jobs):
      self.assertEqual(queue.GetNextSubmittedJob(),
                       (1000 + idx, submitted_ops))
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[[], [1000], [1001, 1002, 1003]]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1683

    
1684
  def testJobDependency(self):
    """Runs a job whose first opcode depends on two other jobs.

    The fake dependency manager is fed one check result per processor
    invocation: "wait" results for the first dependency, then "continue"
    for it followed by "wait" results for the second one, and finally
    "continue" for the second one, after which the job must run to
    completion.

    """
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 22113
    prev_job_id2 = 28102
    job_id = 29929
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          depends=[
                            [prev_job_id2, None],
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res1", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Both callbacks read "attempt", the loop variable of the loop below
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Loop until the processor reports the job as finished
    for attempt in itertools.count():
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt < 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
      elif attempt == 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
        # The processor will ask for the next dependency immediately
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt < 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))

      # Number of check results queued above for this attempt
      if attempt == 2:
        self.assertEqual(depmgr.CountPendingResults(), 2)
      elif attempt > 5:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt == 0 or attempt >= 5:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt < 5:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())
    self.assertFalse(depmgr.CountWaitingJobs())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1808

    
1809
  def testJobDependencyCancel(self):
    """Runs a job whose second opcode depends on a job reported as cancelled.

    The first opcode runs normally. For the second one the fake dependency
    manager reports "wait" three times and then "cancel"; the job is
    expected to end up cancelled with only the first opcode successful.

    """
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 13623
    job_id = 30876
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Both callbacks read "attempt", the loop variable of the loop below
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Loop until the processor reports the job as finished
    for attempt in itertools.count():
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job was cancelled
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if 0 < attempt < 4:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1925

    
1926
  def testJobDependencyWrongstatus(self):
    """Runs a job whose second opcode depends on a job with the wrong status.

    The first opcode runs normally. For the second one the fake dependency
    manager reports "wait" three times and then "wrong status"; the job is
    expected to end in error, with encoded errors as results of the second
    and third opcode.

    """
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 9741
    job_id = 11763
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Both callbacks read "attempt", the loop variable of the loop below
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Loop until the processor reports the job as finished
    for attempt in itertools.count():
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job failed
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if 0 < attempt < 4:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_ERROR,
                       constants.OP_STATUS_ERROR]])

    # The failed opcodes carry an encoded error as their result
    (opresult, ) = job.GetInfo(["opresult"])
    self.assertEqual(len(opresult), len(ops))
    self.assertEqual(opresult[0], "Res0")
    self.assertTrue(errors.GetEncodedError(opresult[1]))
    self.assertTrue(errors.GetEncodedError(opresult[2]))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
2046

    
2047

    
2048
class TestEvaluateJobProcessorResult(unittest.TestCase):
  """Tests for C{jqueue._EvaluateJobProcessorResult}.

  Each test passes one processor result and checks which notifications the
  fake dependency manager recorded.

  """
  def testFinished(self):
    """A finished job must produce exactly one notification with its ID.

    """
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(30953)
    jqueue._EvaluateJobProcessorResult(depmgr, job,
                                       jqueue._JobProcessor.FINISHED)
    self.assertEqual(depmgr.GetNextNotification(), job.id)
    self.assertRaises(IndexError, depmgr.GetNextNotification)

  def testDefer(self):
    """A deferred job must raise C{DeferTask} carrying the job's priority.

    """
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(11326, priority=5463)
    try:
      jqueue._EvaluateJobProcessorResult(depmgr, job,
                                         jqueue._JobProcessor.DEFER)
    except workerpool.DeferTask:
      # Fetch the exception via sys.exc_info so that this parses with both
      # the old "except X, e" and the newer "except X as e" Python dialects
      err = sys.exc_info()[1]
      self.assertEqual(err.priority, 5463)
    else:
      self.fail("Didn't raise exception")
    self.assertRaises(IndexError, depmgr.GetNextNotification)

  def testWaitdep(self):
    """A job waiting for dependencies must not produce a notification.

    """
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(21317)
    jqueue._EvaluateJobProcessorResult(depmgr, job,
                                       jqueue._JobProcessor.WAITDEP)
    self.assertRaises(IndexError, depmgr.GetNextNotification)

  def testOther(self):
    """An unknown result must raise C{ProgrammerError} without notification.

    """
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(5813)
    self.assertRaises(errors.ProgrammerError,
                      jqueue._EvaluateJobProcessorResult,
                      depmgr, job, "Other result")
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2083

    
2084

    
2085
class _FakeTimeoutStrategy:
2086
  def __init__(self, timeouts):
2087
    self.timeouts = timeouts
2088
    self.attempts = 0
2089
    self.last_timeout = None
2090

    
2091
  def NextAttempt(self):
2092
    self.attempts += 1
2093
    if self.timeouts:
2094
      timeout = self.timeouts.pop(0)
2095
    else:
2096
      timeout = None
2097
    self.last_timeout = timeout
2098
    return timeout
2099

    
2100

    
2101
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests C{jqueue._JobProcessor} with a fake timeout strategy.

  The C{_BeforeStart} hook raises C{mcpu.LockAcquireTimeout} while
  C{self.retries} is positive; C{_NewTimeoutStrategy} is passed to the
  processor as timeout strategy factory and decides, per opcode index, how
  many timeouts are available and how many retries are forced.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    self.job = None
    # Index of the opcode currently being processed and its source counter
    self.curop = None
    self.opcounter = None
    self.timeout_strategy = None
    # Number of lock acquire timeouts still to be raised by _BeforeStart
    self.retries = 0
    # State recorded at earlier points, compared against in later callbacks
    self.prev_tsop = None
    self.prev_prio = None
    self.prev_status = None
    self.lock_acq_prio = None
    self.gave_lock = None
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Hook run before an opcode starts; may simulate a lock timeout.

    @param timeout: timeout passed in by the code under test
    @param priority: priority passed in by the code under test
    @raise mcpu.LockAcquireTimeout: while retries are left for this opcode

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    ts = self.timeout_strategy

    self.assertTrue(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    # Assume success; reset below if a timeout is simulated
    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    """Hook run once an opcode is running; checks the job's state.

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode index and records its current state.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Creates the fake timeout strategy for the current opcode.

    @return: new L{_FakeTimeoutStrategy} instance, also stored in
      C{self.timeout_strategy}

    """
    job = self.job

    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    """Processes a ten-opcode job with simulated lock acquire timeouts.

    """
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    # Invoke the processor until it reports the job as finished
    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assertTrue(job.ops[self.curop].exec_timestamp)

      if result == jqueue._JobProcessor.FINISHED:
        self.assertFalse(job.cur_opctx)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # All opcodes must have been visited exactly once
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2316

    
2317

    
2318
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
  """Tests changing a job's priority in between job processor runs.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    # Priorities handed to L{_BeforeStart}, in call order
    self.opexecprio = []

  def _BeforeStart(self, timeout, priority):
    """Callback passed to the fake opcode executor.

    Verifies the queue is not acquired and records C{priority} for later
    inspection by the test; C{timeout} is not used.

    """
    self.assertFalse(self.queue.IsAcquired())
    self.opexecprio.append(priority)

  def _CheckRecordedPriority(self, expected):
    """Asserts exactly one priority was recorded, and consumes it.

    """
    self.assertEqual(self.opexecprio.pop(0), expected)
    # Nothing else must have been recorded
    self.assertRaises(IndexError, self.opexecprio.pop, 0)

  def testChangePriorityWhileRunning(self):
    # Tests changing the priority on a job while it has finished opcodes
    # (successful) and more, unprocessed ones
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 3499
    job = self._CreateJob(self.queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)

    # Run first opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    self._CheckRecordedPriority(constants.OP_PRIO_DEFAULT)

    # Change priority
    self.assertEqual(job.ChangePriority(-10),
                     (True,
                      ("Priorities of pending opcodes for job 3499 have"
                       " been changed to -10")))
    self.assertEqual(job.CalcPriority(), -10)

    # Process second opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    self._CheckRecordedPriority(-10)

    # Check status
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.CalcPriority(), -10)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", "Res1", None]])

    # Change priority once more
    self.assertEqual(job.ChangePriority(5),
                     (True,
                      ("Priorities of pending opcodes for job 3499 have"
                       " been changed to 5")))
    self.assertEqual(job.CalcPriority(), 5)

    # Process third opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    self._CheckRecordedPriority(5)

    # Check status
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS],
                      ["Res0", "Res1", "Res2"]])
    # Each opcode keeps the priority that was in effect when it was processed;
    # build a real list so the comparison does not depend on what map() returns
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, -10, 5])
2408

    
2409

    
2410
class _IdOnlyFakeJob:
2411
  def __init__(self, job_id, priority=NotImplemented):
2412
    self.id = str(job_id)
2413
    self._priority = priority
2414

    
2415
  def CalcPriority(self):
2416
    return self._priority
2417

    
2418

    
2419
class TestJobDependencyManager(unittest.TestCase):
  """Tests for C{jqueue._JobDependencyManager}.

  The manager is created with L{_GetStatus} and L{_Enqueue} as callbacks, so
  tests can script the status answers and observe re-enqueued jobs.

  """
  def setUp(self):
    # Scripted (expected job ID, status) answers for _GetStatus, in call order
    self._status = []
    # Arguments received by _Enqueue, in call order
    self._queue = []
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)

  def _GetStatus(self, job_id):
    """Returns the next scripted status, checking it is for C{job_id}.

    """
    (exp_job_id, result) = self._status.pop(0)
    self.assertEqual(exp_job_id, job_id)
    return result

  def _Enqueue(self, jobs):
    """Records re-enqueued jobs, checking the manager lock is not held.

    """
    self.assertFalse(self.jdm._lock.is_owned(),
                     msg=("Must not own manager lock while re-adding jobs"
                          " (potential deadlock)"))
    self._queue.append(jobs)

  def _CheckRegister(self, job, dep_job_id, dep_status, cur_status, expected):
    """Registers a dependency while the other job reports C{cur_status}.

    Scripts C{cur_status} as the next answer of L{_GetStatus}, calls
    C{CheckAndRegister} and asserts that it returned C{expected}, that the
    scripted status was consumed and that nothing was re-enqueued.

    """
    self._status.append((dep_job_id, cur_status))
    (result, _) = self.jdm.CheckAndRegister(job, dep_job_id, dep_status)
    self.assertEqual(result, expected)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def _CheckWaiting(self, job, dep_job_id):
    """Asserts C{job} is the only registered waiter, waiting on C{dep_job_id}.

    """
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      dep_job_id: set([job]),
      })

  def testNotFinalizedThenCancel(self):
    # Waits while the dependency is running; expects CANCEL once the
    # dependency is reported as canceled
    job = _IdOnlyFakeJob(17697)
    job_id = str(28625)

    self._CheckRegister(job, job_id, [], constants.JOB_STATUS_RUNNING,
                        self.jdm.WAIT)
    self._CheckWaiting(job, job_id)
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/28625", None, None, [("job", [job.id])])
      ])

    self._CheckRegister(job, job_id, [], constants.JOB_STATUS_CANCELED,
                        self.jdm.CANCEL)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testNotFinalizedThenQueued(self):
    # This can happen on a queue shutdown
    job = _IdOnlyFakeJob(1320)
    job_id = str(22971)

    for i in range(5):
      # First three rounds report "running", the remaining ones "queued"
      if i > 2:
        cur_status = constants.JOB_STATUS_QUEUED
      else:
        cur_status = constants.JOB_STATUS_RUNNING

      self._CheckRegister(job, job_id, [], cur_status, self.jdm.WAIT)
      self._CheckWaiting(job, job_id)
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
        ("job/22971", None, None, [("job", [job.id])])
        ])

  def testRequireCancel(self):
    # Dependency must end as "canceled"; expects CONTINUE once it does
    job = _IdOnlyFakeJob(5278)
    job_id = str(9610)
    dep_status = [constants.JOB_STATUS_CANCELED]

    self._CheckRegister(job, job_id, dep_status, constants.JOB_STATUS_WAITING,
                        self.jdm.WAIT)
    self._CheckWaiting(job, job_id)
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/9610", None, None, [("job", [job.id])])
      ])

    self._CheckRegister(job, job_id, dep_status, constants.JOB_STATUS_CANCELED,
                        self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireError(self):
    # Dependency must end as "error"; expects CONTINUE once it does
    job = _IdOnlyFakeJob(21459)
    job_id = str(25519)
    dep_status = [constants.JOB_STATUS_ERROR]

    self._CheckRegister(job, job_id, dep_status, constants.JOB_STATUS_WAITING,
                        self.jdm.WAIT)
    self._CheckWaiting(job, job_id)

    self._CheckRegister(job, job_id, dep_status, constants.JOB_STATUS_ERROR,
                        self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireMultiple(self):
    # Any finalized status is acceptable; each one must yield CONTINUE
    dep_status = list(constants.JOBS_FINALIZED)

    for end_status in dep_status:
      job = _IdOnlyFakeJob(21343)
      job_id = str(14609)

      self._CheckRegister(job, job_id, dep_status,
                          constants.JOB_STATUS_WAITING, self.jdm.WAIT)
      self._CheckWaiting(job, job_id)
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
        ("job/14609", None, None, [("job", [job.id])])
        ])

      self._CheckRegister(job, job_id, dep_status, end_status,
                          self.jdm.CONTINUE)
      self.assertFalse(self.jdm.JobWaiting(job))
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testNotify(self):
    # Notifying waiters of a dependency must re-enqueue the waiting job
    job = _IdOnlyFakeJob(8227)
    job_id = str(4113)

    self._CheckRegister(job, job_id, [], constants.JOB_STATUS_RUNNING,
                        self.jdm.WAIT)
    self._CheckWaiting(job, job_id)

    self.jdm.NotifyWaiters(job_id)
    self.assertFalse(self._status)
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertEqual(self._queue, [set([job])])

  def testWrongStatus(self):
    # Dependency must succeed but fails; expects WRONGSTATUS
    job = _IdOnlyFakeJob(10102)
    job_id = str(1271)
    dep_status = [constants.JOB_STATUS_SUCCESS]

    self._CheckRegister(job, job_id, dep_status, constants.JOB_STATUS_QUEUED,
                        self.jdm.WAIT)
    self._CheckWaiting(job, job_id)

    self._CheckRegister(job, job_id, dep_status, constants.JOB_STATUS_ERROR,
                        self.jdm.WRONGSTATUS)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testCorrectStatus(self):
    # Dependency must succeed and does; expects CONTINUE
    job = _IdOnlyFakeJob(24273)
    job_id = str(23885)
    dep_status = [constants.JOB_STATUS_SUCCESS]

    self._CheckRegister(job, job_id, dep_status, constants.JOB_STATUS_QUEUED,
                        self.jdm.WAIT)
    self._CheckWaiting(job, job_id)

    self._CheckRegister(job, job_id, dep_status, constants.JOB_STATUS_SUCCESS,
                        self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testFinalizedRightAway(self):
    # Dependency is already successful on the first check
    job = _IdOnlyFakeJob(224)
    job_id = str(3081)

    self._CheckRegister(job, job_id, [constants.JOB_STATUS_SUCCESS],
                        constants.JOB_STATUS_SUCCESS, self.jdm.CONTINUE)
    self.assertFalse(self.jdm.JobWaiting(job))
    # An empty waiter set is left behind for the dependency
    self.assertEqual(self.jdm._waiters, {
      job_id: set(),
      })

    # Force cleanup
    self.jdm.NotifyWaiters("0")
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def testMultipleWaiting(self):
    # Use a deterministic random generator
    rnd = random.Random(21402)

    # A real list is needed as IDs are consumed via pop() below
    job_ids = [str(i) for i in rnd.sample(range(1, 10000), 150)]

    # Ten dependencies, each with between 1 and 20 waiting jobs
    waiters = {}
    for _ in range(10):
      dep_id = job_ids.pop()
      count = rnd.randint(1, 20)
      waiters[dep_id] = set([_IdOnlyFakeJob(job_ids.pop())
                             for _ in range(count)])

    # Ensure there are no duplicate job IDs
    self.assertFalse(utils.FindDuplicates(list(waiters.keys()) +
                                          [job.id
                                           for jobs in waiters.values()
                                           for job in jobs]))

    # Register all jobs as waiters
    for (dep_id, jobs) in waiters.items():
      for job in jobs:
        self._CheckRegister(job, dep_id, [constants.JOB_STATUS_SUCCESS],
                            constants.JOB_STATUS_QUEUED, self.jdm.WAIT)
        self.assertTrue(self.jdm.JobWaiting(job))

    self.assertEqual(self.jdm._waiters, waiters)

    def _MakeSet(entry):
      # Turn the pending job lists into sets so their order is irrelevant
      (name, mode, owner_names, pending) = entry
      return (name, mode, owner_names,
              [(pendmode, set(pend)) for (pendmode, pend) in pending])

    def _CheckLockInfo():
      # Reported lock info must match what is still left in "waiters"
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
      self.assertEqual(sorted([_MakeSet(entry) for entry in info]), sorted([
        ("job/%s" % job_id, None, None,
         [("job", set([job.id for job in jobs]))])
        for job_id, jobs in waiters.items()
        if jobs
        ]))

    _CheckLockInfo()

    # Notify in random order
    for dep_id in rnd.sample(list(waiters), len(waiters)):
      # Remove from pending waiter list
      jobs = waiters.pop(dep_id)
      for job in jobs:
        self._CheckRegister(job, dep_id, [constants.JOB_STATUS_SUCCESS],
                            constants.JOB_STATUS_SUCCESS, self.jdm.CONTINUE)
        self.assertFalse(self.jdm.JobWaiting(job))

      _CheckLockInfo()

    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

    self.assertFalse(waiters)

  def testSelfDependency(self):
    # A job depending on itself must be reported as an error
    job = _IdOnlyFakeJob(18937)

    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
    self.assertEqual(result, self.jdm.ERROR)

  def testJobDisappears(self):
    # The status callback raising JobLost must be reported as an error
    job = _IdOnlyFakeJob(30540)
    job_id = str(23769)

    def _FakeStatus(_):
      raise errors.JobLost("#msg#")

    # Separate manager, as the status callback differs from the one in setUp
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, jdm.ERROR)
    self.assertFalse(jdm.JobWaiting(job))
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2732

    
2733

    
2734
# Hand control to the shared test program when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()