Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.jqueue_unittest.py @ 90066780

History | View | Annotate | Download (96.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31
import random
32
import operator
33

    
34
try:
35
  # pylint: disable=E0611
36
  from pyinotify import pyinotify
37
except ImportError:
38
  import pyinotify
39

    
40
from ganeti import constants
41
from ganeti import utils
42
from ganeti import errors
43
from ganeti import jqueue
44
from ganeti import opcodes
45
from ganeti import compat
46
from ganeti import mcpu
47
from ganeti import query
48
from ganeti import workerpool
49

    
50
import testutils
51

    
52

    
53
class _FakeJob:
  """Minimal in-memory stand-in for a job object.

  It keeps a job ID, a status and a list of log entries, which is all the
  job change tests in this file require.

  """
  def __init__(self, job_id, status):
    """Initializes this class.

    @param job_id: Value exposed through the C{id} attribute
    @param status: Initial job status

    """
    self.id = job_id
    self.writable = False
    self._status = status
    self._log = []

  def SetStatus(self, status):
    """Replaces the job status.

    """
    self._status = status

  def AddLogEntry(self, msg):
    """Appends a log entry.

    Entries are stored as C{(serial, msg)} tuples; the serial is the number
    of entries recorded before this one.

    """
    self._log.append((len(self._log), msg))

  def CalcStatus(self):
    """Returns the current job status.

    """
    return self._status

  def GetInfo(self, fields):
    """Returns the values for the requested fields.

    Only the C{"status"} field is known.

    @param fields: Iterable of field names
    @return: List with one value per requested field
    @raise Exception: If any other field is requested

    """
    result = []

    for name in fields:
      if name != "status":
        raise Exception("Unknown field")
      result.append(self._status)

    return result

  def GetLogEntries(self, newer_than):
    """Returns log entries.

    @param newer_than: C{None} to get all entries, otherwise the index of
      the first entry to return (must not be negative)

    """
    assert newer_than is None or newer_than >= 0

    if newer_than is None:
      # The internal list itself is returned, not a copy
      return self._log

    return self._log[newer_than:]
87

    
88

    
89
class TestJobChangesChecker(unittest.TestCase):
  """Tests for L{jqueue._JobChangesChecker} using L{_FakeJob} objects.

  """
  def testStatus(self):
    """Without previous information the current status is always reported.

    """
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.SetStatus(constants.JOB_STATUS_SUCCESS)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    """An unchanged status yields C{None}, a changed one is reported.

    """
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    """Finalized jobs are reported even if the status matches the previous.

    """
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    """New log entries are reported relative to the previous log serial.

    """
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # Same status and no new log entries since the last check
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A checker without previous information gets the complete log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
145

    
146

    
147
class TestJobChangesWaiter(unittest.TestCase):
  """Tests for L{jqueue._JobFileChangesWaiter} and L{jqueue._JobChangesWaiter}.

  Every test works on an empty job file inside a temporary directory.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _EnsureNotifierClosed(self, notifier):
    """Fails the test unless the notifier's file descriptor is closed.

    @param notifier: Object whose C{_fd} attribute holds the descriptor

    """
    try:
      os.fstat(notifier._fd)
    except EnvironmentError:
      # The exception is fetched via sys.exc_info() as this form is accepted
      # by all Python versions, unlike binding it in the "except" clause
      err = sys.exc_info()[1]
      self.assertEqual(err.errno, errno.EBADF)
    else:
      self.fail("File descriptor wasn't closed")

  def testClose(self):
    """Closing works both with and without having waited before.

    """
    for wait in [False, True]:
      waiter = jqueue._JobFileChangesWaiter(self.filename)
      try:
        if wait:
          waiter.Wait(0.001)
      finally:
        waiter.Close()

      # Ensure file descriptor was closed
      self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile(self):
    """The file waiter only signals once the file has been written to.

    """
    waiter = jqueue._JobFileChangesWaiter(self.filename)
    try:
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile2(self):
    """The generic waiter creates its file waiter on the first wait.

    """
    waiter = jqueue._JobChangesWaiter(self.filename)
    try:
      self.assertFalse(waiter._filewaiter)
      self.assertTrue(waiter.Wait(0.1))
      self.assertTrue(waiter._filewaiter)

      # File waiter is now used, but there have been no changes
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
202

    
203

    
204
class _FailingWatchManager(pyinotify.WatchManager):
  """Subclass of L{pyinotify.WatchManager} which always fails to register.

  """
  def add_watch(self, filename, mask):
    """Pretends that adding a watch for C{filename} failed.

    @param filename: Path which should have been watched
    @param mask: Event mask; must be exactly C{IN_MODIFY | IN_IGNORED}
    @return: Dictionary mapping C{filename} to -1 in place of a watch
      descriptor

    """
    assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] |
                    pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"])

    return {
      filename: -1,
      }
215

    
216

    
217
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for L{jqueue._WaitForJobChangesHelper}.

  Every test works on an empty job file inside a temporary directory.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader returning a fake job in "waiting" status.

    """
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)

  def _LoadLostJob(self):
    """Job loader behaving as if the job could not be loaded.

    """
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITING], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITING], []))

  def testLostJob(self):
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)

  def testNonExistentFile(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    filename = utils.PathJoin(self.tmpdir, "does-not-exist")
    self.assertFalse(os.path.exists(filename))

    # NOTE(review): NotImplemented is presumably passed as file waiter class
    # so that any attempt to instantiate it fails -- confirm against jqueue
    result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0,
                  _waiter_cls=compat.partial(jqueue._JobChangesWaiter,
                                             _waiter_cls=NotImplemented))
    self.assertTrue(result is None)

  def testInotifyError(self):
    jobfile_waiter_cls = \
      compat.partial(jqueue._JobFileChangesWaiter,
                     _inotify_wm_cls=_FailingWatchManager)

    jobchange_waiter_cls = \
      compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls)

    wfjc = jqueue._WaitForJobChangesHelper()

    # Test if failing to watch a job file (e.g. due to
    # fs.inotify.max_user_watches being too low) raises errors.InotifyError
    self.assertRaises(errors.InotifyError, wfjc,
                      self.filename, self._LoadWaitingJob,
                      ["status"], [constants.JOB_STATUS_WAITING], None, 1.0,
                      _waiter_cls=jobchange_waiter_cls)
277

    
278

    
279
class TestEncodeOpError(unittest.TestCase):
  """Tests for L{jqueue._EncodeOpError}.

  """
  def test(self):
    """Encoded errors are tuples which L{errors.MaybeRaise} re-raises.

    """
    # (value to encode, exception class expected from errors.MaybeRaise)
    cases = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (value, exp_cls) in cases:
      encerr = jqueue._EncodeOpError(value)
      self.assertTrue(isinstance(encerr, tuple))
      self.assertRaises(exp_cls, errors.MaybeRaise, encerr)
296

    
297

    
298
class TestQueuedOpCode(unittest.TestCase):
  """Tests for L{jqueue._QueuedOpCode}.

  """
  def testDefaults(self):
    """Checks default values, before and after a serialization round trip.

    """
    def _Check(op):
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    """Checks that the input opcode's priority is taken over and restored.

    """
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
329

    
330

    
331
class TestQueuedJob(unittest.TestCase):
  """Tests for L{jqueue._QueuedJob}.

  Covers construction, serialization, priority calculation, priority
  changes and job status calculation.

  """
  def testNoOpCodes(self):
    """A job without opcodes must be rejected.

    """
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [], False)

  def testDefaults(self):
    """Checks default values, before and after a serialization round trip.

    """
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertTrue(job.writable)
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])
      self.assertFalse(job.archived)

    job1 = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job1)
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testWritable(self):
    """The last constructor argument controls the C{writable} attribute.

    """
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
    self.assertFalse(job.writable)

    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
    self.assertTrue(job.writable)

  def testArchived(self):
    """The last argument to C{Restore} controls the C{archived} attribute.

    """
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
    self.assertFalse(job.archived)

    newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
    self.assertTrue(newjob.archived)

    newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
    self.assertFalse(newjob2.archived)

  def testPriority(self):
    """Checks C{CalcPriority} while opcode priorities and statuses change.

    """
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def _JobForPriority(self, job_id):
    """Creates a job with four default-priority opcodes.

    @param job_id: ID for the new job
    @return: The new job, after verifying its initial priorities

    """
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    job = jqueue._QueuedJob(None, job_id, ops, True)

    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))

    return job

  def testChangePriorityAllQueued(self):
    job = self._JobForPriority(24984)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                               for op in job.ops))
    result = job.ChangePriority(-10)
    self.assertEqual(job.CalcPriority(), -10)
    self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job 24984 have"
                             " been changed to -10")))

  def testChangePriorityAllFinished(self):
    job = self._JobForPriority(16405)

    for (idx, op) in enumerate(job.ops):
      if idx > 2:
        op.status = constants.OP_STATUS_ERROR
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(-10)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    # A real list is built as map() doesn't return one on all Python versions
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_ERROR,
      ])
    self.assertEqual(result, (False, "Job 16405 is finished"))

  def testChangePriorityCancelling(self):
    job = self._JobForPriority(31572)

    for (idx, op) in enumerate(job.ops):
      if idx > 1:
        op.status = constants.OP_STATUS_CANCELING
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(5)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_CANCELING,
      constants.OP_STATUS_CANCELING,
      ])
    self.assertEqual(result, (False, "Job 31572 is cancelling"))

  def testChangePriorityFirstRunning(self):
    job = self._JobForPriority(1716215889)

    for (idx, op) in enumerate(job.ops):
      if idx == 0:
        op.status = constants.OP_STATUS_RUNNING
      else:
        op.status = constants.OP_STATUS_QUEUED

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(7)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, 7, 7, 7])
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_RUNNING,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      ])
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job"
                             " 1716215889 have been changed to 7")))

  def testChangePriorityLastRunning(self):
    job = self._JobForPriority(1308)

    for (idx, op) in enumerate(job.ops):
      if idx == (len(job.ops) - 1):
        op.status = constants.OP_STATUS_RUNNING
      else:
        op.status = constants.OP_STATUS_SUCCESS

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    result = job.ChangePriority(-3)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_RUNNING,
      ])
    self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))

  def testChangePrioritySecondOpcodeRunning(self):
    job = self._JobForPriority(27701)

    self.assertEqual(len(job.ops), 4)
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    job.ops[1].status = constants.OP_STATUS_RUNNING
    job.ops[2].status = constants.OP_STATUS_QUEUED
    job.ops[3].status = constants.OP_STATUS_QUEUED

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
    result = job.ChangePriority(-19)
    self.assertEqual(job.CalcPriority(), -19)
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
                      -19, -19])
    self.assertFalse(compat.any(hasattr(op.input, "priority")
                                for op in job.ops))
    self.assertEqual([op.status for op in job.ops], [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_RUNNING,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      ])
    self.assertEqual(result,
                     (True, ("Priorities of pending opcodes for job"
                             " 27701 have been changed to -19")))

  def testChangePriorityWithInconsistentJob(self):
    job = self._JobForPriority(30097)

    self.assertEqual(len(job.ops), 4)

    # This job is invalid (as it has two opcodes marked as running) and makes
    # the call fail because an unprocessed opcode precedes a running one (which
    # should never happen in reality)
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    job.ops[1].status = constants.OP_STATUS_RUNNING
    job.ops[2].status = constants.OP_STATUS_QUEUED
    job.ops[3].status = constants.OP_STATUS_RUNNING

    self.assertRaises(AssertionError, job.ChangePriority, 19)

  def testCalcStatus(self):
    """Checks C{CalcStatus} for every job status.

    Each helper below puts the opcodes of a fresh ten-opcode job into a
    combination of statuses which must result in a given job status.

    """
    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITING

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITING

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    # Expected job status mapped to the opcode setups leading to it
    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)],
                              True)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # Looking up every known status ensures none is left without a test
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
673

    
674

    
675
class _FakeDependencyManager:
  """Scripted replacement for a job dependency manager.

  Tests queue the expected dependency checks together with the result to
  return; notifications are recorded for later inspection.

  """
  def __init__(self):
    """Initializes this class.

    """
    self._checks = []
    self._notifications = []
    self._waiting = set()

  def AddCheckResult(self, job, dep_job_id, dep_status, result):
    """Queues the expected arguments and result of a dependency check.

    @param job: Job expected to be passed to L{CheckAndRegister}
    @param dep_job_id: Expected dependency job ID
    @param dep_status: Expected dependency status
    @param result: Value to return; unpacked as a two-element sequence with
      the result status coming first

    """
    self._checks.append((job, dep_job_id, dep_status, result))

  def CountPendingResults(self):
    """Returns the number of queued check results not yet consumed.

    """
    return len(self._checks)

  def CountWaitingJobs(self):
    """Returns the number of jobs currently marked as waiting.

    """
    return len(self._waiting)

  def GetNextNotification(self):
    """Removes and returns the oldest recorded notification.

    @raise IndexError: If no notification is left

    """
    return self._notifications.pop(0)

  def JobWaiting(self, job):
    """Tells whether a job is marked as waiting.

    """
    return job in self._waiting

  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Consumes the oldest queued check and returns its result.

    The arguments must match those given to L{AddCheckResult}. A result
    status of C{WAIT} marks the job as waiting; C{CONTINUE} removes the
    mark again, which requires the job to have been waiting before.

    """
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)

    assert exp_job == job
    assert exp_dep_job_id == dep_job_id
    assert exp_dep_status == dep_status

    (result_status, _) = result

    if result_status == jqueue._JobDependencyManager.WAIT:
      self._waiting.add(job)
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
      self._waiting.remove(job)

    return result

  def NotifyWaiters(self, job_id):
    """Records a notification for the given job ID.

    """
    self._notifications.append(job_id)
714

    
715

    
716
class _DisabledFakeDependencyManager:
  """Dependency manager for tests which must not use dependencies.

  """
  def JobWaiting(self, _):
    """Reports that no job is ever waiting.

    """
    return False

  def CheckAndRegister(self, *args):
    """Fails unconditionally as dependencies must not be checked.

    @raise AssertionError: Always

    """
    # Raised explicitly so the check isn't removed when running with "-O"
    raise AssertionError("Should not be called")

  def NotifyWaiters(self, _):
    """Ignores the notification.

    """
    pass
725

    
726

    
727
class _FakeQueueForProc:
  """Fake job queue recording job updates and job submissions.

  It also tracks whether its lock is held, which lets tests verify the
  locking around updates and submissions.

  """
  def __init__(self, depmgr=None):
    """Initializes this class.

    @param depmgr: Dependency manager to expose as C{depmgr}; if not given,
      a L{_DisabledFakeDependencyManager} is used

    """
    self._acquired = False
    self._updates = []
    self._submitted = []
    self._accepting_jobs = True

    # IDs for submitted jobs start at 1000
    self._submit_count = itertools.count(1000)

    if depmgr:
      self.depmgr = depmgr
    else:
      self.depmgr = _DisabledFakeDependencyManager()

  def IsAcquired(self):
    """Tells whether the queue lock is currently held.

    """
    return self._acquired

  def GetNextUpdate(self):
    """Removes and returns the oldest recorded C{(job, replicate)} update.

    @raise IndexError: If no update is left

    """
    return self._updates.pop(0)

  def GetNextSubmittedJob(self):
    """Removes and returns the oldest recorded C{(job_id, job)} submission.

    @raise IndexError: If no submission is left

    """
    return self._submitted.pop(0)

  def acquire(self, shared=0):
    """Marks the lock as held; only shared acquisition is accepted.

    """
    assert shared == 1
    self._acquired = True

  def release(self):
    """Marks the lock as released; it must be held.

    """
    assert self._acquired
    self._acquired = False

  def UpdateJobUnlocked(self, job, replicate=True):
    """Records a job update; the lock must be held.

    """
    assert self._acquired, "Lock not acquired while updating job"
    self._updates.append((job, bool(replicate)))

  def SubmitManyJobs(self, jobs):
    """Records submitted jobs; the lock must not be held.

    @param jobs: Jobs to submit
    @return: List of IDs assigned to the jobs, in order

    """
    assert not self._acquired, "Lock acquired while submitting jobs"

    # "jobs" must come first: zip() stops once its first argument is
    # exhausted, hence exactly one ID per job is taken from the counter. This
    # avoids the ".next()" iterator method not available in all Python
    # versions.
    pairs = [(job_id, job) for (job, job_id) in zip(jobs, self._submit_count)]

    self._submitted.extend(pairs)

    return [job_id for (job_id, _) in pairs]

  def StopAcceptingJobs(self):
    """Makes L{AcceptingJobsUnlocked} return C{False} from now on.

    """
    self._accepting_jobs = False

  def AcceptingJobsUnlocked(self):
    """Tells whether jobs are still being accepted.

    """
    return self._accepting_jobs
773

    
774

    
775
class _FakeExecOpCodeForProc:
  """Callable standing in for opcode execution.

  It accepts L{opcodes.OpTestDummy} opcodes only and invokes optional hooks
  before and after notifying the callbacks object about the start.

  """
  def __init__(self, queue, before_start, after_start):
    """Initializes this class.

    @param queue: Queue whose lock must not be held during execution
    @param before_start: C{None} or callable invoked with the timeout and
      the current priority before the start notification
    @param after_start: C{None} or callable invoked with the opcode and the
      callbacks object after the start notification

    """
    self._queue = queue
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs, timeout=None):
    """Pretends to execute an opcode.

    @param op: Opcode, must be an L{opcodes.OpTestDummy}
    @param cbs: Callbacks object
    @param timeout: Passed on to the C{before_start} hook
    @return: Result of C{cbs.SubmitManyJobs} if the opcode has a
      C{submit_jobs} value other than C{None}, C{op.result} otherwise
    @raise errors.OpExecError: If C{op.fail} is set

    """
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    if self._before_start:
      self._before_start(timeout, cbs.CurrentPriority())

    cbs.NotifyStart()

    if self._after_start:
      self._after_start(op, cbs)

    # Check again after the callbacks
    assert not self._queue.IsAcquired()

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
      return cbs.SubmitManyJobs(op.submit_jobs)

    return op.result
804

    
805

    
806
class _JobProcessorTestUtils:
  """Mix-in with helpers for job processor tests.

  The helpers use assertion methods and hence must be combined with
  C{unittest.TestCase}.

  """
  def _CreateJob(self, queue, job_id, ops):
    """Creates a writable job and verifies its initial state.

    @param queue: Queue object passed on to the job
    @param job_id: Job ID
    @param ops: List of opcodes
    @return: The new L{jqueue._QueuedJob}

    """
    job = jqueue._QueuedJob(queue, job_id, ops, True)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
816

    
817

    
818
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
819
  def _GenericCheckJob(self, job):
    """Runs checks shared by the tests on a job that has ended.

    Verifies that the timestamps reported through C{GetInfo} match the
    attributes of the job and its opcodes, that the job has start and end
    timestamps and that its start timestamp equals the first opcode's.

    @param job: job whose opcodes all wrap C{opcodes.OpTestDummy} input

    """
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                      for op in job.ops)

    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
                     [[op.start_timestamp for op in job.ops],
                      [op.exec_timestamp for op in job.ops],
                      [op.end_timestamp for op in job.ops]])
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
                     [job.received_timestamp,
                      job.start_timestamp,
                      job.end_timestamp])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
834

    
835
  def testSuccess(self):
    # Runs jobs with varying numbers of opcodes to completion, one opcode per
    # processor invocation, checking job status and queue updates on the way
    queue = _FakeQueueForProc()

    for (job_id, opcount) in [(25351, 1), (6637, 3),
                              (24644, 10), (32207, 100)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      def _BeforeStart(timeout, priority):
        # Exactly one replicated update must be pending at this point
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertFalse(job.cur_opctx)

      def _AfterStart(op, cbs):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
        self.assertFalse(job.cur_opctx)

        # Job is running, cancelling shouldn't be possible
        (success, _) = job.Cancel()
        self.assertFalse(success)

      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        # The hooks consumed the first two updates, one is left
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if idx == len(ops) - 1:
          # Last opcode
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        else:
          self.assertEqual(result, jqueue._JobProcessor.DEFER)

          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
          self.assertTrue(job.start_timestamp)
          self.assertFalse(job.end_timestamp)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(job.GetInfo(["opresult"]),
                       [[op.input.result for op in job.ops]])
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                       jqueue._JobProcessor.FINISHED)
      self.assertRaises(IndexError, queue.GetNextUpdate)
899

    
900
  def testOpcodeError(self):
    # Runs jobs in which a range of opcodes is set up to fail and checks that
    # the job ends with an error at the first failing opcode
    queue = _FakeQueueForProc()

    # Tuples of (job ID, number of opcodes, first failing index, last failing
    # index)
    testdata = [
      (17077, 1, 0, 0),
      (1782, 5, 2, 2),
      (18179, 10, 9, 9),
      (4744, 10, 3, 8),
      (23816, 100, 39, 45),
      ]

    for (job_id, opcount, failfrom, failto) in testdata:
      # Prepare opcodes
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
                                 fail=(failfrom <= i <= failto))
             for i in range(opcount)]

      # Create job; the ID is passed as a string, see the "id" check below
      job = self._CreateJob(queue, str(job_id), ops)

      opexec = _FakeExecOpCodeForProc(queue, None, None)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        # queued to waitlock
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # waitlock to running
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # Opcode result
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        if idx in (failfrom, len(ops) - 1):
          # First failing opcode or last opcode, the job must be finished
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
          break

        self.assertEqual(result, jqueue._JobProcessor.DEFER)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      # Check job status
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
      self.assertEqual(job.GetInfo(["id"]), [job_id])
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])

      # Check opcode status
      data = zip(job.ops,
                 job.GetInfo(["opstatus"])[0],
                 job.GetInfo(["opresult"])[0])

      for idx, (op, opstatus, opresult) in enumerate(data):
        if idx < failfrom:
          # Opcodes before the failure succeeded
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
          self.assertEqual(opresult, op.input.result)
        elif idx <= failto:
          # Opcodes set up to fail
          assert op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
        else:
          # Opcodes after the failing range are reported as errors as well
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)

      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops[:failfrom]))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                       jqueue._JobProcessor.FINISHED)
      self.assertRaises(IndexError, queue.GetNextUpdate)
978

    
979
  def testCancelWhileInQueue(self):
    # Cancels a job before any opcode was processed and checks that running
    # the processor afterwards neither changes nor writes the job
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 17045
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELED
                               for op in job.ops))

    # Serialize to check for differences
    before_proc = job.Serialize()

    # Simulate processor called in workerpool
    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

    # Must not have changed or written
    self.assertEqual(before_proc, job.Serialize())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1024

    
1025
  def testCancelWhileWaitlockInQueue(self):
    # Cancels a job whose first opcode is in waiting status before the
    # processor gets to run it
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 8645
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Force the first opcode into waiting status
    job.ops[0].status = constants.OP_STATUS_WAITING

    assert len(job.ops) == 5

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    # Mark as cancelling
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                               for op in job.ops))

    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
1066

    
1067
  def testCancelWhileWaitlock(self):
    # Cancels a job from within the before-start hook, i.e. while the job is
    # in waiting status; the opcode is still started in this test
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 11009
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))
      # Cancelling must not have caused a queue update
      self.assertRaises(IndexError, queue.GetNextUpdate)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
1117

    
1118
  def _TestCancelWhileSomething(self, cb):
    """Cancels a waiting job, runs a callback and expects a canceled job.

    The job is cancelled from within the before-start hook; afterwards C{cb}
    is invoked, which is expected to keep the opcode from being started (the
    after-start hook fails the test when reached).

    @param cb: callable invoked with the fake queue as its only argument
    @return: the fake queue used by the test

    """
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 24314
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))

      cb(queue)

    def _AfterStart(op, cbs):
      self.fail("Should not reach this")

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

    return queue
1163

    
1164
  def testCancelWhileWaitlockWithTimeout(self):
    # Cancels a job and then lets the lock acquire attempt time out
    def _RaiseAcquireTimeout(_queue):
      # Fake an acquire attempt timing out
      raise mcpu.LockAcquireTimeout()

    self._TestCancelWhileSomething(_RaiseAcquireTimeout)
1170

    
1171
  def testCancelDuringQueueShutdown(self):
    # Cancels a job and then stops the queue from accepting jobs
    def _StopQueue(fake_queue):
      return fake_queue.StopAcceptingJobs()

    used_queue = self._TestCancelWhileSomething(_StopQueue)
    self.assertFalse(used_queue.AcceptingJobsUnlocked())
1174

    
1175
  def testCancelWhileRunning(self):
    # Tests canceling a job with finished opcodes and more, unprocessed ones
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 28492
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Run one opcode
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    # Try processing another opcode (this will actually cancel the job)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result; the opcode finished earlier keeps its result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])
1220

    
1221
  def _TestQueueShutdown(self, queue, opexec, job, runcount):
    """Stops the queue and checks that processing the job only defers it.

    @param queue: fake queue which must still be accepting jobs on entry
    @param opexec: opcode executor handed to the job processor
    @param job: job to run through the processor after the shutdown
    @param runcount: number of opcodes of C{job} expected to have succeeded

    """
    self.assertTrue(queue.AcceptingJobsUnlocked())

    # Simulate shutdown
    queue.StopAcceptingJobs()

    processor = jqueue._JobProcessor(queue, opexec, job)
    self.assertEqual(processor(), jqueue._JobProcessor.DEFER)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
    self.assertFalse(job.cur_opctx)
    self.assertTrue(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    # Opcodes before "runcount" must have both timestamps set
    finished_ops = job.ops[:runcount]
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in finished_ops))

    # The opcode at "runcount" must not have ended
    current_op = job.ops[runcount]
    self.assertFalse(current_op.end_timestamp)

    # Later opcodes must not have any timestamp
    untouched_ops = job.ops[(runcount + 1):]
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in untouched_ops))

    pending = len(job.ops) - runcount
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [([constants.OP_STATUS_SUCCESS] * runcount +
                       [constants.OP_STATUS_QUEUED] * pending),
                      (["Res%s" % i for i in range(runcount)] +
                       [None] * pending)])

    # Must have been written and replicated
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)
1252

    
1253
  def testQueueShutdownWhileRunning(self):
    # Tests shutting down the queue while a job is running
    queue = _FakeQueueForProc()
    opcount = 3

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(opcount)]

    # Create job
    job = self._CreateJob(queue, 2718211587, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Run the first and the second opcode, one per processor invocation
    for done in (1, 2):
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                       jqueue._JobProcessor.DEFER)

      # Job goes back to queued
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                       [([constants.OP_STATUS_SUCCESS] * done +
                         [constants.OP_STATUS_QUEUED] * (opcount - done)),
                        (["Res%s" % i for i in range(done)] +
                         [None] * (opcount - done))])
      self.assertFalse(job.cur_opctx)

      # Writes for waiting, running and result
      for _ in range(3):
        self.assertEqual(queue.GetNextUpdate(), (job, True))

    self._TestQueueShutdown(queue, opexec, job, 2)
1305

    
1306
  def testQueueShutdownWithLockTimeout(self):
    # Tests shutting down while a lock acquire times out
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job = self._CreateJob(queue, 1304231178, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Mutable holder, the hook below has to observe the change made later
    lock_state = {"expire": False}

    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      if lock_state["expire"]:
        raise mcpu.LockAcquireTimeout()

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Run one opcode
    first_result = jqueue._JobProcessor(queue, opexec, job)()
    self.assertEqual(first_result, jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    expected_status = [constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED]
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [expected_status, ["Res0", None, None]])
    self.assertFalse(job.cur_opctx)

    # Writes for waiting, running and result
    for _ in range(3):
      self.assertEqual(queue.GetNextUpdate(), (job, True))

    # The next opcode should have expiring lock acquires
    lock_state["expire"] = True

    self._TestQueueShutdown(queue, opexec, job, 1)
1352

    
1353
  def testQueueShutdownWhileInQueue(self):
    # This should never happen in reality (no new jobs are started by the
    # workerpool once a shutdown has been initiated), but it's better to test
    # the job processor for this scenario
    queue = _FakeQueueForProc()

    # Create job
    job = self._CreateJob(queue, 2031,
                          [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
                           for i in range(5)])

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Nothing has been started or finished so far
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    op_states = [op.status for op in job.ops]
    self.assertTrue(compat.all(status == constants.OP_STATUS_QUEUED
                               for status in op_states))

    self._TestQueueShutdown(queue, _FakeExecOpCodeForProc(queue, None, None),
                            job, 0)
1376

    
1377
  def testQueueShutdownWhileWaitlockInQueue(self):
    # Shuts down the queue while the job's first opcode is in waiting status
    queue = _FakeQueueForProc()
    opcount = 5

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(opcount)]

    # Create job
    job = self._CreateJob(queue, 53125685, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Force the first opcode into waiting status
    first_op = job.ops[0]
    first_op.status = constants.OP_STATUS_WAITING

    assert len(job.ops) == opcount

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self._TestQueueShutdown(queue, _FakeExecOpCodeForProc(queue, None, None),
                            job, 0)
1399

    
1400
  def testPartiallyRun(self):
    # Tests calling the processor on a job that's been partially run before the
    # program was restarted
    queue = _FakeQueueForProc()

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(10)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      # Process the first "successcount" opcodes
      for _ in range(successcount):
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                         jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [[constants.OP_STATUS_SUCCESS
                         for _ in range(successcount)] +
                        [constants.OP_STATUS_QUEUED
                         for _ in range(len(ops) - successcount)]])

      self.assertTrue(job.ops_iter)

      # Serialize and restore (simulates program restart)
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
      self.assertFalse(newjob.ops_iter)
      self._TestPartial(newjob, successcount)
1433

    
1434
  def _TestPartial(self, job, successcount):
    """Runs the remaining opcodes of a partially processed job.

    A new fake queue is created for processing; the job is expected to end
    successfully and further processor calls must be no-ops, also on a copy
    restored from the serialized job.

    @param job: queued job of which some opcodes were already processed
    @param successcount: number of opcodes which were already processed

    """
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    for remaining in reversed(range(len(job.ops) - successcount)):
      result = jqueue._JobProcessor(queue, opexec, job)()
      # Three replicated updates are expected per processed opcode
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # ... also after being restored
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1480

    
1481
  def testProcessorOnRunningJob(self):
    # Invoking the processor on a job in running status must be rejected
    ops = [opcodes.OpTestDummy(result="result", fail=False)]

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Create job
    job = self._CreateJob(queue, 9571, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Force the only opcode into running status
    only_op = job.ops[0]
    only_op.status = constants.OP_STATUS_RUNNING

    assert len(job.ops) == 1

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Calling on running job must fail
    processor = jqueue._JobProcessor(queue, opexec, job)
    self.assertRaises(errors.ProgrammerError, processor)
1501

    
1502
  def testLogMessages(self):
    # Tests the "Feedback" callback function
    queue = _FakeQueueForProc()

    # Messages to be logged, keyed by opcode index; a log type of None means
    # Feedback is called with the message only
    messages = {
      1: [
        (None, "Hello"),
        (None, "World"),
        (constants.ELOG_MESSAGE, "there"),
        ],
      4: [
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
        ],
      }
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
                               messages=messages.get(i, []))
           for i in range(5)]

    # Create job
    job = self._CreateJob(queue, 29386, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

      self.assertRaises(AssertionError, cbs.Feedback,
                        "too", "many", "arguments")

      for (log_type, msg) in op.messages:
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if log_type:
          cbs.Feedback(log_type, msg)
        else:
          cbs.Feedback(msg)
        # Check for job update without replication
        self.assertEqual(queue.GetNextUpdate(), (job, False))
        self.assertRaises(IndexError, queue.GetNextUpdate)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    for remaining in reversed(range(len(job.ops))):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])

    # Must be computed before the per-message check below, which empties the
    # lists in "messages"
    logmsgcount = sum(len(m) for m in messages.values())

    self._CheckLogMessages(job, logmsgcount)

    # Serialize and restore (simulates program restart)
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
    self._CheckLogMessages(newjob, logmsgcount)

    # Check each message
    prevserial = -1
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
      for (serial, timestamp, log_type, msg) in oplog:
        (exptype, expmsg) = messages.get(idx).pop(0)
        if exptype:
          self.assertEqual(log_type, exptype)
        else:
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
        self.assertEqual(expmsg, msg)
        # Serial numbers must be strictly increasing across all opcodes
        self.assertTrue(serial > prevserial)
        prevserial = serial
1592

    
1593
  def _CheckLogMessages(self, job, count):
1594
    # Check serial
1595
    self.assertEqual(job.log_serial, count)
1596

    
1597
    # No filter
1598
    self.assertEqual(job.GetLogEntries(None),
1599
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1600
                      for entry in entries])
1601

    
1602
    # Filter with serial
1603
    assert count > 3
1604
    self.assert_(job.GetLogEntries(3))
1605
    self.assertEqual(job.GetLogEntries(3),
1606
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1607
                      for entry in entries][3:])
1608

    
1609
    # No log message after highest serial
1610
    self.assertFalse(job.GetLogEntries(count))
1611
    self.assertFalse(job.GetLogEntries(count + 3))
1612

    
1613
  def testSubmitManyJobs(self):
    # Processes a job whose opcodes submit further jobs. Verifies that every
    # submitted job can be fetched from the fake queue (with IDs counted up
    # from 1000, in submission order) and that the opcode results list the
    # IDs of the jobs each opcode submitted.
    queue = _FakeQueueForProc()

    job_id = 15656
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          submit_jobs=[]),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
                            ]),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    # One processor invocation per opcode
    for idx in range(len(ops)):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      if idx == len(ops) - 1:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
      else:
        self.assertEqual(result, jqueue._JobProcessor.DEFER)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Submitted jobs must come back in the order the opcodes listed them
    for idx, submitted_ops in enumerate(job_ops
                                        for op in ops
                                        for job_ops in op.submit_jobs):
      self.assertEqual(queue.GetNextSubmittedJob(),
                       (1000 + idx, submitted_ops))
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[[], [1000], [1001, 1002, 1003]]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1697

    
1698
  def testJobDependency(self):
    # Processes a job whose first opcode depends on two other jobs. The fake
    # dependency manager is fed "wait" results for several attempts before
    # both dependencies report "continue"; the job must then run through and
    # finish successfully.
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 22113
    prev_job_id2 = 28102
    job_id = 29929
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          depends=[
                            [prev_job_id2, None],
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res1", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Both callbacks read "attempt", which is (re)bound by the loop below
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      # Queue up the dependency check results for this attempt
      if attempt < 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
      elif attempt == 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
        # The processor will ask for the next dependency immediately
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt < 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
      if attempt == 2:
        self.assertEqual(depmgr.CountPendingResults(), 2)
      elif attempt > 5:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt == 0 or attempt >= 5:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt < 5:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())
    self.assertFalse(depmgr.CountWaitingJobs())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1822

    
1823
  def testJobDependencyCancel(self):
    # Processes a job whose second opcode depends on another job. After a
    # few "wait" results the fake dependency manager reports "cancel"; the
    # first opcode must remain successful while the remaining opcodes and
    # the job itself end up canceled.
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 13623
    job_id = 30876
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Both callbacks read "attempt", which is (re)bound by the loop below
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job was cancelled
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt > 0 and attempt < 4:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1939

    
1940
  def testJobDependencyWrongstatus(self):
    # Processes a job whose second opcode depends on another job. After a
    # few "wait" results the fake dependency manager reports "wrong status";
    # the first opcode must remain successful while the remaining opcodes
    # and the job itself end up in error state with encoded errors as
    # results.
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 9741
    job_id = 11763
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Both callbacks read "attempt", which is (re)bound by the loop below
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job failed
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt > 0 and attempt < 4:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_ERROR,
                       constants.OP_STATUS_ERROR]])

    (opresult, ) = job.GetInfo(["opresult"])
    self.assertEqual(len(opresult), len(ops))
    self.assertEqual(opresult[0], "Res0")
    self.assertTrue(errors.GetEncodedError(opresult[1]))
    self.assertTrue(errors.GetEncodedError(opresult[2]))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
2060

    
2061

    
2062
class TestEvaluateJobProcessorResult(unittest.TestCase):
  """Tests L{jqueue._EvaluateJobProcessorResult} for each processor result.

  """
  def testFinished(self):
    # A finished job must produce exactly one notification, carrying its ID
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(30953)
    jqueue._EvaluateJobProcessorResult(depmgr, job,
                                       jqueue._JobProcessor.FINISHED)
    self.assertEqual(depmgr.GetNextNotification(), job.id)
    self.assertRaises(IndexError, depmgr.GetNextNotification)

  def testDefer(self):
    # A deferred job must raise DeferTask with the job's priority and must
    # not produce a notification
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(11326, priority=5463)
    try:
      jqueue._EvaluateJobProcessorResult(depmgr, job,
                                         jqueue._JobProcessor.DEFER)
    except workerpool.DeferTask:
      # Fetch the exception via sys.exc_info so that this parses regardless
      # of which "except" binding syntax the interpreter supports
      err = sys.exc_info()[1]
      self.assertEqual(err.priority, 5463)
    else:
      self.fail("Didn't raise exception")
    self.assertRaises(IndexError, depmgr.GetNextNotification)

  def testWaitdep(self):
    # Waiting for a dependency must not produce a notification
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(21317)
    jqueue._EvaluateJobProcessorResult(depmgr, job,
                                       jqueue._JobProcessor.WAITDEP)
    self.assertRaises(IndexError, depmgr.GetNextNotification)

  def testOther(self):
    # Unknown results must be rejected with a ProgrammerError
    depmgr = _FakeDependencyManager()
    job = _IdOnlyFakeJob(5813)
    self.assertRaises(errors.ProgrammerError,
                      jqueue._EvaluateJobProcessorResult,
                      depmgr, job, "Other result")
    self.assertRaises(IndexError, depmgr.GetNextNotification)
2097

    
2098

    
2099
class _FakeTimeoutStrategy:
2100
  def __init__(self, timeouts):
2101
    self.timeouts = timeouts
2102
    self.attempts = 0
2103
    self.last_timeout = None
2104

    
2105
  def NextAttempt(self):
2106
    self.attempts += 1
2107
    if self.timeouts:
2108
      timeout = self.timeouts.pop(0)
2109
    else:
2110
      timeout = None
2111
    self.last_timeout = timeout
2112
    return timeout
2113

    
2114

    
2115
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests the job processor with simulated lock acquisition timeouts.

  The test case itself provides the callbacks run around opcode execution
  and acts as factory for L{_FakeTimeoutStrategy} instances. State is shared
  between the callbacks and the test method through instance attributes.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    self.job = None
    # Index of the opcode currently being processed
    self.curop = None
    self.opcounter = None
    self.timeout_strategy = None
    # Number of lock acquisitions which still have to time out
    self.retries = 0
    self.prev_tsop = None
    self.prev_prio = None
    self.prev_status = None
    self.lock_acq_prio = None
    self.gave_lock = None
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Callback simulating the lock acquisition for the current opcode.

    @param timeout: Timeout passed for the acquisition, C{None} or a number
    @param priority: Priority passed for the acquisition
    @raise mcpu.LockAcquireTimeout: While there are retries left

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    ts = self.timeout_strategy

    self.assertTrue(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    """Callback checking the job's state once an opcode is running.

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode and records its priority and status.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Creates the fake timeout strategy for the current opcode.

    The timeouts and the number of acquisitions which have to time out
    depend on the index of the current opcode.

    @return: New L{_FakeTimeoutStrategy} instance

    """
    job = self.job

    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    # Runs a job with ten opcodes through the processor while some lock
    # acquisitions time out, checking job status, priorities and queue
    # updates after every processor invocation
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      # Input priority should never be set or modified
      self.assertFalse(compat.any(hasattr(op.input, "priority")
                                  for op in job.ops))

      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assertTrue(job.ops[self.curop].exec_timestamp)

      if result == jqueue._JobProcessor.FINISHED:
        self.assertFalse(job.cur_opctx)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
2334

    
2335

    
2336
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
  """Checks that priority changes made between opcodes are picked up.

  A three-opcode job is processed one opcode at a time; between runs the
  job's priority is changed and the priority seen by the executor callback
  is compared against the expected value.

  """
  def setUp(self):
    """Creates a fresh fake queue and an empty priority log.

    """
    self.queue = _FakeQueueForProc()
    # Priorities passed to L{_BeforeStart}, in call order
    self.opexecprio = []

  def _BeforeStart(self, timeout, priority):
    """Callback handed to the fake opcode executor.

    Verifies the queue is not acquired and records the priority received.
    NOTE(review): presumably invoked once before each opcode starts; confirm
    against C{_FakeExecOpCodeForProc}.

    @param timeout: unused here
    @param priority: priority value appended to C{self.opexecprio}

    """
    self.assertFalse(self.queue.IsAcquired())
    self.opexecprio.append(priority)

  def testChangePriorityWhileRunning(self):
    """Changes the priority of a job which is partially processed.

    """
    # Tests changing the priority on a job while it has finished opcodes
    # (successful) and more, unprocessed ones
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 3499
    job = self._CreateJob(self.queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)

    # Run first opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Exactly one callback invocation, with the default priority
    self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
    self.assertRaises(IndexError, self.opexecprio.pop, 0)

    # Change priority
    self.assertEqual(job.ChangePriority(-10),
                     (True,
                      ("Priorities of pending opcodes for job 3499 have"
                       " been changed to -10")))
    self.assertEqual(job.CalcPriority(), -10)

    # Process second opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    self.assertEqual(self.opexecprio.pop(0), -10)
    self.assertRaises(IndexError, self.opexecprio.pop, 0)

    # Check status
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.CalcPriority(), -10)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", "Res1", None]])

    # Change priority once more
    self.assertEqual(job.ChangePriority(5),
                     (True,
                      ("Priorities of pending opcodes for job 3499 have"
                       " been changed to 5")))
    self.assertEqual(job.CalcPriority(), 5)

    # Process third opcode
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    self.assertEqual(self.opexecprio.pop(0), 5)
    self.assertRaises(IndexError, self.opexecprio.pop, 0)

    # Check status; the finished job reports the default priority again while
    # each opcode keeps the priority it was processed with
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_SUCCESS],
                      ["Res0", "Res1", "Res2"]])
    # Build a real list so the comparison does not depend on map() returning
    # one
    self.assertEqual([op.priority for op in job.ops],
                     [constants.OP_PRIO_DEFAULT, -10, 5])
2426

    
2427

    
2428
class _IdOnlyFakeJob:
2429
  def __init__(self, job_id, priority=NotImplemented):
2430
    self.id = str(job_id)
2431
    self._priority = priority
2432

    
2433
  def CalcPriority(self):
2434
    return self._priority
2435

    
2436

    
2437
class TestJobDependencyManager(unittest.TestCase):
  """Tests for L{jqueue._JobDependencyManager}.

  The manager is created with two callbacks provided by this class:
  L{_GetStatus} replays pre-recorded job statuses and L{_Enqueue} records
  the jobs handed back for re-queuing.

  """
  def setUp(self):
    """Creates the manager under test with empty status/queue lists.

    """
    # Expected status lookups as (job_id, status) tuples, consumed in order
    self._status = []
    # Every argument passed to L{_Enqueue}, in call order
    self._queue = []
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)

  def _GetStatus(self, job_id):
    """Returns the next pre-recorded status.

    Fails the test if the manager asks for a different job ID than the one
    recorded.

    """
    (exp_job_id, result) = self._status.pop(0)
    self.assertEqual(exp_job_id, job_id)
    return result

  def _Enqueue(self, jobs):
    """Records jobs passed for re-queuing.

    Also verifies that the manager's lock is not owned while this callback
    runs.

    """
    self.assertFalse(self.jdm._lock.is_owned(),
                     msg=("Must not own manager lock while re-adding jobs"
                          " (potential deadlock)"))
    self._queue.append(jobs)

  def testNotFinalizedThenCancel(self):
    """Dependency is running first and canceled on the second check.

    """
    job = _IdOnlyFakeJob(17697)
    job_id = str(28625)

    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/28625", None, None, [("job", [job.id])])
      ])

    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.CANCEL)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testNotFinalizedThenQueued(self):
    """Dependency goes from running back to queued; waiter keeps waiting.

    """
    # This can happen on a queue shutdown
    job = _IdOnlyFakeJob(1320)
    job_id = str(22971)

    for i in range(5):
      # First three checks see "running", the remaining ones "queued"
      if i > 2:
        self._status.append((job_id, constants.JOB_STATUS_QUEUED))
      else:
        self._status.append((job_id, constants.JOB_STATUS_RUNNING))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
      self.assertEqual(result, self.jdm.WAIT)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertTrue(self.jdm.JobWaiting(job))
      self.assertEqual(self.jdm._waiters, {
        job_id: set([job]),
        })
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
        ("job/22971", None, None, [("job", [job.id])])
        ])

  def testRequireCancel(self):
    """Waiter explicitly requires the dependency to end up canceled.

    """
    job = _IdOnlyFakeJob(5278)
    job_id = str(9610)
    dep_status = [constants.JOB_STATUS_CANCELED]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/9610", None, None, [("job", [job.id])])
      ])

    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireError(self):
    """Waiter explicitly requires the dependency to end up in error.

    """
    job = _IdOnlyFakeJob(21459)
    job_id = str(25519)
    dep_status = [constants.JOB_STATUS_ERROR]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireMultiple(self):
    """Waiter accepts every finalized status of the dependency.

    """
    dep_status = list(constants.JOBS_FINALIZED)

    for end_status in dep_status:
      job = _IdOnlyFakeJob(21343)
      job_id = str(14609)

      self._status.append((job_id, constants.JOB_STATUS_WAITING))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.WAIT)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertTrue(self.jdm.JobWaiting(job))
      self.assertEqual(self.jdm._waiters, {
        job_id: set([job]),
        })
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
        ("job/14609", None, None, [("job", [job.id])])
        ])

      self._status.append((job_id, end_status))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.CONTINUE)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertFalse(self.jdm.JobWaiting(job))
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testNotify(self):
    """Notifying waiters hands the waiting jobs to the enqueue callback.

    """
    job = _IdOnlyFakeJob(8227)
    job_id = str(4113)

    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self.jdm.NotifyWaiters(job_id)
    self.assertFalse(self._status)
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertEqual(self._queue, [set([job])])

  def testWrongStatus(self):
    """Dependency finishes with a status the waiter did not ask for.

    """
    job = _IdOnlyFakeJob(10102)
    job_id = str(1271)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WRONGSTATUS)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testCorrectStatus(self):
    """Dependency finishes with exactly the status the waiter asked for.

    """
    job = _IdOnlyFakeJob(24273)
    job_id = str(23885)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testFinalizedRightAway(self):
    """Dependency already has the required status on the first check.

    """
    job = _IdOnlyFakeJob(224)
    job_id = str(3081)

    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    # An empty waiter set is left behind for the dependency
    self.assertEqual(self.jdm._waiters, {
      job_id: set(),
      })

    # Force cleanup; notifying for an unrelated job ID is expected to drop
    # the empty entry
    self.jdm.NotifyWaiters("0")
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def testMultipleWaiting(self):
    """Many jobs wait for several dependencies resolved in random order.

    """
    # Use a deterministic random generator
    rnd = random.Random(21402)

    # Build a real list as IDs are popped off it below.
    # NOTE(review): up to 10 + 10 * 20 = 210 IDs may be popped, but only 150
    # are drawn; this relies on the fixed seed keeping the total lower.
    job_ids = [str(i) for i in rnd.sample(range(1, 10000), 150)]

    # Maps dependency job ID to the set of fake jobs waiting for it; the key
    # is popped before the waiters' IDs
    waiters = dict((job_ids.pop(),
                    set(map(_IdOnlyFakeJob,
                            [job_ids.pop()
                             for _ in range(rnd.randint(1, 20))])))
                   for _ in range(10))

    # Ensure there are no duplicate job IDs
    self.assertFalse(utils.FindDuplicates(list(waiters.keys()) +
                                          [job.id
                                           for jobs in waiters.values()
                                           for job in jobs]))

    # Register all jobs as waiters
    for (job_id, jobs) in waiters.items():
      for job in jobs:
        self._status.append((job_id, constants.JOB_STATUS_QUEUED))
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                                [constants.JOB_STATUS_SUCCESS])
        self.assertEqual(result, self.jdm.WAIT)
        self.assertFalse(self._status)
        self.assertFalse(self._queue)
        self.assertTrue(self.jdm.JobWaiting(job))

    self.assertEqual(self.jdm._waiters, waiters)

    def _MakeSet(entry):
      # Converts the pending lists of a lock info entry to sets so the
      # comparison does not depend on their order
      (name, mode, owner_names, pending) = entry
      return (name, mode, owner_names,
              [(pendmode, set(pend)) for (pendmode, pend) in pending])

    def _CheckLockInfo():
      # Compares the reported lock info with the not yet resolved waiters
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
      self.assertEqual(sorted([_MakeSet(entry) for entry in info]), sorted([
        ("job/%s" % job_id, None, None,
         [("job", set([job.id for job in jobs]))])
        for job_id, jobs in waiters.items()
        if jobs
        ]))

    _CheckLockInfo()

    # Resolve dependencies in random order; sample from a list snapshot as
    # entries are removed from the dictionary inside the loop
    for job_id in rnd.sample(list(waiters), len(waiters)):
      # Remove from pending waiter list
      jobs = waiters.pop(job_id)
      for job in jobs:
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                                [constants.JOB_STATUS_SUCCESS])
        self.assertEqual(result, self.jdm.CONTINUE)
        self.assertFalse(self._status)
        self.assertFalse(self._queue)
        self.assertFalse(self.jdm.JobWaiting(job))

      _CheckLockInfo()

    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

    self.assertFalse(waiters)

  def testSelfDependency(self):
    """A job depending on itself is reported as an error.

    """
    job = _IdOnlyFakeJob(18937)

    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
    self.assertEqual(result, self.jdm.ERROR)

  def testJobDisappears(self):
    """Status lookup raising L{errors.JobLost} is reported as an error.

    """
    job = _IdOnlyFakeJob(30540)
    job_id = str(23769)

    def _FakeStatus(_):
      raise errors.JobLost("#msg#")

    # Separate manager whose status callback always fails; no enqueue
    # callback is provided
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.ERROR)
    self.assertFalse(jdm.JobWaiting(job))
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2750

    
2751

    
2752
# Only run the tests when executed directly, not when imported
if __name__ == "__main__":
  testutils.GanetiTestProgram()