Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 76b62028

History | View | Annotate | Download (76 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31
import random
32

    
33
from ganeti import constants
34
from ganeti import utils
35
from ganeti import errors
36
from ganeti import jqueue
37
from ganeti import opcodes
38
from ganeti import compat
39
from ganeti import mcpu
40
from ganeti import query
41
from ganeti import workerpool
42

    
43
import testutils
44

    
45

    
46
class _FakeJob:
47
  def __init__(self, job_id, status):
48
    self.id = job_id
49
    self.writable = False
50
    self._status = status
51
    self._log = []
52

    
53
  def SetStatus(self, status):
54
    self._status = status
55

    
56
  def AddLogEntry(self, msg):
57
    self._log.append((len(self._log), msg))
58

    
59
  def CalcStatus(self):
60
    return self._status
61

    
62
  def GetInfo(self, fields):
63
    result = []
64

    
65
    for name in fields:
66
      if name == "status":
67
        result.append(self._status)
68
      else:
69
        raise Exception("Unknown field")
70

    
71
    return result
72

    
73
  def GetLogEntries(self, newer_than):
74
    assert newer_than is None or newer_than >= 0
75

    
76
    if newer_than is None:
77
      return self._log
78

    
79
    return self._log[newer_than:]
80

    
81

    
82
class TestJobChangesChecker(unittest.TestCase):
83
  def testStatus(self):
84
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
85
    checker = jqueue._JobChangesChecker(["status"], None, None)
86
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
87

    
88
    job.SetStatus(constants.JOB_STATUS_RUNNING)
89
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
90

    
91
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
92
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
93

    
94
    # job.id is used by checker
95
    self.assertEqual(job.id, 9094)
96

    
97
  def testStatusWithPrev(self):
98
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
99
    checker = jqueue._JobChangesChecker(["status"],
100
                                        [constants.JOB_STATUS_QUEUED], None)
101
    self.assert_(checker(job) is None)
102

    
103
    job.SetStatus(constants.JOB_STATUS_RUNNING)
104
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
105

    
106
  def testFinalStatus(self):
107
    for status in constants.JOBS_FINALIZED:
108
      job = _FakeJob(2178711, status)
109
      checker = jqueue._JobChangesChecker(["status"], [status], None)
110
      # There won't be any changes in this status, hence it should signal
111
      # a change immediately
112
      self.assertEqual(checker(job), ([status], []))
113

    
114
  def testLog(self):
115
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
116
    checker = jqueue._JobChangesChecker(["status"], None, None)
117
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
118

    
119
    job.AddLogEntry("Hello World")
120
    (job_info, log_entries) = checker(job)
121
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
122
    self.assertEqual(log_entries, [[0, "Hello World"]])
123

    
124
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
125
    self.assert_(checker2(job) is None)
126

    
127
    job.AddLogEntry("Foo Bar")
128
    job.SetStatus(constants.JOB_STATUS_ERROR)
129

    
130
    (job_info, log_entries) = checker2(job)
131
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
132
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
133

    
134
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
135
    (job_info, log_entries) = checker3(job)
136
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
137
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
138

    
139

    
140
class TestJobChangesWaiter(unittest.TestCase):
141
  def setUp(self):
142
    self.tmpdir = tempfile.mkdtemp()
143
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
144
    utils.WriteFile(self.filename, data="")
145

    
146
  def tearDown(self):
147
    shutil.rmtree(self.tmpdir)
148

    
149
  def _EnsureNotifierClosed(self, notifier):
150
    try:
151
      os.fstat(notifier._fd)
152
    except EnvironmentError, err:
153
      self.assertEqual(err.errno, errno.EBADF)
154
    else:
155
      self.fail("File descriptor wasn't closed")
156

    
157
  def testClose(self):
158
    for wait in [False, True]:
159
      waiter = jqueue._JobFileChangesWaiter(self.filename)
160
      try:
161
        if wait:
162
          waiter.Wait(0.001)
163
      finally:
164
        waiter.Close()
165

    
166
      # Ensure file descriptor was closed
167
      self._EnsureNotifierClosed(waiter._notifier)
168

    
169
  def testChangingFile(self):
170
    waiter = jqueue._JobFileChangesWaiter(self.filename)
171
    try:
172
      self.assertFalse(waiter.Wait(0.1))
173
      utils.WriteFile(self.filename, data="changed")
174
      self.assert_(waiter.Wait(60))
175
    finally:
176
      waiter.Close()
177

    
178
    self._EnsureNotifierClosed(waiter._notifier)
179

    
180
  def testChangingFile2(self):
181
    waiter = jqueue._JobChangesWaiter(self.filename)
182
    try:
183
      self.assertFalse(waiter._filewaiter)
184
      self.assert_(waiter.Wait(0.1))
185
      self.assert_(waiter._filewaiter)
186

    
187
      # File waiter is now used, but there have been no changes
188
      self.assertFalse(waiter.Wait(0.1))
189
      utils.WriteFile(self.filename, data="changed")
190
      self.assert_(waiter.Wait(60))
191
    finally:
192
      waiter.Close()
193

    
194
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
195

    
196

    
197
class TestWaitForJobChangesHelper(unittest.TestCase):
198
  def setUp(self):
199
    self.tmpdir = tempfile.mkdtemp()
200
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
201
    utils.WriteFile(self.filename, data="")
202

    
203
  def tearDown(self):
204
    shutil.rmtree(self.tmpdir)
205

    
206
  def _LoadWaitingJob(self):
207
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
208

    
209
  def _LoadLostJob(self):
210
    return None
211

    
212
  def testNoChanges(self):
213
    wfjc = jqueue._WaitForJobChangesHelper()
214

    
215
    # No change
216
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
217
                          [constants.JOB_STATUS_WAITING], None, 0.1),
218
                     constants.JOB_NOTCHANGED)
219

    
220
    # No previous information
221
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
222
                          ["status"], None, None, 1.0),
223
                     ([constants.JOB_STATUS_WAITING], []))
224

    
225
  def testLostJob(self):
226
    wfjc = jqueue._WaitForJobChangesHelper()
227
    self.assert_(wfjc(self.filename, self._LoadLostJob,
228
                      ["status"], None, None, 1.0) is None)
229

    
230

    
231
class TestEncodeOpError(unittest.TestCase):
232
  def test(self):
233
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
234
    self.assert_(isinstance(encerr, tuple))
235
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
236

    
237
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
238
    self.assert_(isinstance(encerr, tuple))
239
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
240

    
241
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
242
    self.assert_(isinstance(encerr, tuple))
243
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244

    
245
    encerr = jqueue._EncodeOpError("Hello World")
246
    self.assert_(isinstance(encerr, tuple))
247
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
248

    
249

    
250
class TestQueuedOpCode(unittest.TestCase):
251
  def testDefaults(self):
252
    def _Check(op):
253
      self.assertFalse(hasattr(op.input, "dry_run"))
254
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
255
      self.assertFalse(op.log)
256
      self.assert_(op.start_timestamp is None)
257
      self.assert_(op.exec_timestamp is None)
258
      self.assert_(op.end_timestamp is None)
259
      self.assert_(op.result is None)
260
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
261

    
262
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
263
    _Check(op1)
264
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
265
    _Check(op2)
266
    self.assertEqual(op1.Serialize(), op2.Serialize())
267

    
268
  def testPriority(self):
269
    def _Check(op):
270
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
271
             "Default priority equals high priority; test can't work"
272
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
273
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
274

    
275
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
276
    op1 = jqueue._QueuedOpCode(inpop)
277
    _Check(op1)
278
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
279
    _Check(op2)
280
    self.assertEqual(op1.Serialize(), op2.Serialize())
281

    
282

    
283
class TestQueuedJob(unittest.TestCase):
284
  def test(self):
285
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
286
                      None, 1, [], False)
287

    
288
  def testDefaults(self):
289
    job_id = 4260
290
    ops = [
291
      opcodes.OpTagsGet(),
292
      opcodes.OpTestDelay(),
293
      ]
294

    
295
    def _Check(job):
296
      self.assertTrue(job.writable)
297
      self.assertEqual(job.id, job_id)
298
      self.assertEqual(job.log_serial, 0)
299
      self.assert_(job.received_timestamp)
300
      self.assert_(job.start_timestamp is None)
301
      self.assert_(job.end_timestamp is None)
302
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
303
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
304
      self.assert_(repr(job).startswith("<"))
305
      self.assertEqual(len(job.ops), len(ops))
306
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
307
                              for (inp, op) in zip(ops, job.ops)))
308
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
309
                        ["unknown-field"])
310
      self.assertEqual(job.GetInfo(["summary"]),
311
                       [[op.input.Summary() for op in job.ops]])
312

    
313
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
314
    _Check(job1)
315
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
316
    _Check(job2)
317
    self.assertEqual(job1.Serialize(), job2.Serialize())
318

    
319
  def testWritable(self):
320
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
321
    self.assertFalse(job.writable)
322

    
323
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
324
    self.assertTrue(job.writable)
325

    
326
  def testPriority(self):
327
    job_id = 4283
328
    ops = [
329
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
330
      opcodes.OpTestDelay(),
331
      ]
332

    
333
    def _Check(job):
334
      self.assertEqual(job.id, job_id)
335
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
336
      self.assert_(repr(job).startswith("<"))
337

    
338
    job = jqueue._QueuedJob(None, job_id, ops, True)
339
    _Check(job)
340
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
341
                            for op in job.ops))
342
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
343

    
344
    # Increase first
345
    job.ops[0].priority -= 1
346
    _Check(job)
347
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
348

    
349
    # Mark opcode as finished
350
    job.ops[0].status = constants.OP_STATUS_SUCCESS
351
    _Check(job)
352
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
353

    
354
    # Increase second
355
    job.ops[1].priority -= 10
356
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
357

    
358
    # Test increasing first
359
    job.ops[0].status = constants.OP_STATUS_RUNNING
360
    job.ops[0].priority -= 19
361
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
362

    
363
  def testCalcStatus(self):
364
    def _Queued(ops):
365
      # The default status is "queued"
366
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
367
                              for op in ops))
368

    
369
    def _Waitlock1(ops):
370
      ops[0].status = constants.OP_STATUS_WAITING
371

    
372
    def _Waitlock2(ops):
373
      ops[0].status = constants.OP_STATUS_SUCCESS
374
      ops[1].status = constants.OP_STATUS_SUCCESS
375
      ops[2].status = constants.OP_STATUS_WAITING
376

    
377
    def _Running(ops):
378
      ops[0].status = constants.OP_STATUS_SUCCESS
379
      ops[1].status = constants.OP_STATUS_RUNNING
380
      for op in ops[2:]:
381
        op.status = constants.OP_STATUS_QUEUED
382

    
383
    def _Canceling1(ops):
384
      ops[0].status = constants.OP_STATUS_SUCCESS
385
      ops[1].status = constants.OP_STATUS_SUCCESS
386
      for op in ops[2:]:
387
        op.status = constants.OP_STATUS_CANCELING
388

    
389
    def _Canceling2(ops):
390
      for op in ops:
391
        op.status = constants.OP_STATUS_CANCELING
392

    
393
    def _Canceled(ops):
394
      for op in ops:
395
        op.status = constants.OP_STATUS_CANCELED
396

    
397
    def _Error1(ops):
398
      for idx, op in enumerate(ops):
399
        if idx > 3:
400
          op.status = constants.OP_STATUS_ERROR
401
        else:
402
          op.status = constants.OP_STATUS_SUCCESS
403

    
404
    def _Error2(ops):
405
      for op in ops:
406
        op.status = constants.OP_STATUS_ERROR
407

    
408
    def _Success(ops):
409
      for op in ops:
410
        op.status = constants.OP_STATUS_SUCCESS
411

    
412
    tests = {
413
      constants.JOB_STATUS_QUEUED: [_Queued],
414
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
415
      constants.JOB_STATUS_RUNNING: [_Running],
416
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
417
      constants.JOB_STATUS_CANCELED: [_Canceled],
418
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
419
      constants.JOB_STATUS_SUCCESS: [_Success],
420
      }
421

    
422
    def _NewJob():
423
      job = jqueue._QueuedJob(None, 1,
424
                              [opcodes.OpTestDelay() for _ in range(10)],
425
                              True)
426
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
427
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
428
                              for op in job.ops))
429
      return job
430

    
431
    for status in constants.JOB_STATUS_ALL:
432
      sttests = tests[status]
433
      assert sttests
434
      for fn in sttests:
435
        job = _NewJob()
436
        fn(job.ops)
437
        self.assertEqual(job.CalcStatus(), status)
438

    
439

    
440
class _FakeDependencyManager:
441
  def __init__(self):
442
    self._checks = []
443
    self._notifications = []
444
    self._waiting = set()
445

    
446
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
447
    self._checks.append((job, dep_job_id, dep_status, result))
448

    
449
  def CountPendingResults(self):
450
    return len(self._checks)
451

    
452
  def CountWaitingJobs(self):
453
    return len(self._waiting)
454

    
455
  def GetNextNotification(self):
456
    return self._notifications.pop(0)
457

    
458
  def JobWaiting(self, job):
459
    return job in self._waiting
460

    
461
  def CheckAndRegister(self, job, dep_job_id, dep_status):
462
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
463

    
464
    assert exp_job == job
465
    assert exp_dep_job_id == dep_job_id
466
    assert exp_dep_status == dep_status
467

    
468
    (result_status, _) = result
469

    
470
    if result_status == jqueue._JobDependencyManager.WAIT:
471
      self._waiting.add(job)
472
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
473
      self._waiting.remove(job)
474

    
475
    return result
476

    
477
  def NotifyWaiters(self, job_id):
478
    self._notifications.append(job_id)
479

    
480

    
481
class _DisabledFakeDependencyManager:
482
  def JobWaiting(self, _):
483
    return False
484

    
485
  def CheckAndRegister(self, *args):
486
    assert False, "Should not be called"
487

    
488
  def NotifyWaiters(self, _):
489
    pass
490

    
491

    
492
class _FakeQueueForProc:
493
  def __init__(self, depmgr=None):
494
    self._acquired = False
495
    self._updates = []
496
    self._submitted = []
497

    
498
    self._submit_count = itertools.count(1000)
499

    
500
    if depmgr:
501
      self.depmgr = depmgr
502
    else:
503
      self.depmgr = _DisabledFakeDependencyManager()
504

    
505
  def IsAcquired(self):
506
    return self._acquired
507

    
508
  def GetNextUpdate(self):
509
    return self._updates.pop(0)
510

    
511
  def GetNextSubmittedJob(self):
512
    return self._submitted.pop(0)
513

    
514
  def acquire(self, shared=0):
515
    assert shared == 1
516
    self._acquired = True
517

    
518
  def release(self):
519
    assert self._acquired
520
    self._acquired = False
521

    
522
  def UpdateJobUnlocked(self, job, replicate=True):
523
    assert self._acquired, "Lock not acquired while updating job"
524
    self._updates.append((job, bool(replicate)))
525

    
526
  def SubmitManyJobs(self, jobs):
527
    assert not self._acquired, "Lock acquired while submitting jobs"
528
    job_ids = [self._submit_count.next() for _ in jobs]
529
    self._submitted.extend(zip(job_ids, jobs))
530
    return job_ids
531

    
532

    
533
class _FakeExecOpCodeForProc:
534
  def __init__(self, queue, before_start, after_start):
535
    self._queue = queue
536
    self._before_start = before_start
537
    self._after_start = after_start
538

    
539
  def __call__(self, op, cbs, timeout=None, priority=None):
540
    assert isinstance(op, opcodes.OpTestDummy)
541
    assert not self._queue.IsAcquired(), \
542
           "Queue lock not released when executing opcode"
543

    
544
    if self._before_start:
545
      self._before_start(timeout, priority)
546

    
547
    cbs.NotifyStart()
548

    
549
    if self._after_start:
550
      self._after_start(op, cbs)
551

    
552
    # Check again after the callbacks
553
    assert not self._queue.IsAcquired()
554

    
555
    if op.fail:
556
      raise errors.OpExecError("Error requested (%s)" % op.result)
557

    
558
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
559
      return cbs.SubmitManyJobs(op.submit_jobs)
560

    
561
    return op.result
562

    
563

    
564
class _JobProcessorTestUtils:
565
  def _CreateJob(self, queue, job_id, ops):
566
    job = jqueue._QueuedJob(queue, job_id, ops, True)
567
    self.assertFalse(job.start_timestamp)
568
    self.assertFalse(job.end_timestamp)
569
    self.assertEqual(len(ops), len(job.ops))
570
    self.assert_(compat.all(op.input == inp
571
                            for (op, inp) in zip(job.ops, ops)))
572
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
573
    return job
574

    
575

    
576
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
577
  def _GenericCheckJob(self, job):
578
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
579
                      for op in job.ops)
580

    
581
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
582
                     [[op.start_timestamp for op in job.ops],
583
                      [op.exec_timestamp for op in job.ops],
584
                      [op.end_timestamp for op in job.ops]])
585
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
586
                     [job.received_timestamp,
587
                      job.start_timestamp,
588
                      job.end_timestamp])
589
    self.assert_(job.start_timestamp)
590
    self.assert_(job.end_timestamp)
591
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
592

    
593
  def testSuccess(self):
594
    queue = _FakeQueueForProc()
595

    
596
    for (job_id, opcount) in [(25351, 1), (6637, 3),
597
                              (24644, 10), (32207, 100)]:
598
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
599
             for i in range(opcount)]
600

    
601
      # Create job
602
      job = self._CreateJob(queue, job_id, ops)
603

    
604
      def _BeforeStart(timeout, priority):
605
        self.assertEqual(queue.GetNextUpdate(), (job, True))
606
        self.assertRaises(IndexError, queue.GetNextUpdate)
607
        self.assertFalse(queue.IsAcquired())
608
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
609
        self.assertFalse(job.cur_opctx)
610

    
611
      def _AfterStart(op, cbs):
612
        self.assertEqual(queue.GetNextUpdate(), (job, True))
613
        self.assertRaises(IndexError, queue.GetNextUpdate)
614

    
615
        self.assertFalse(queue.IsAcquired())
616
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
617
        self.assertFalse(job.cur_opctx)
618

    
619
        # Job is running, cancelling shouldn't be possible
620
        (success, _) = job.Cancel()
621
        self.assertFalse(success)
622

    
623
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
624

    
625
      for idx in range(len(ops)):
626
        self.assertRaises(IndexError, queue.GetNextUpdate)
627
        result = jqueue._JobProcessor(queue, opexec, job)()
628
        self.assertEqual(queue.GetNextUpdate(), (job, True))
629
        self.assertRaises(IndexError, queue.GetNextUpdate)
630
        if idx == len(ops) - 1:
631
          # Last opcode
632
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
633
        else:
634
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
635

    
636
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
637
          self.assert_(job.start_timestamp)
638
          self.assertFalse(job.end_timestamp)
639

    
640
      self.assertRaises(IndexError, queue.GetNextUpdate)
641

    
642
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
643
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
644
      self.assertEqual(job.GetInfo(["opresult"]),
645
                       [[op.input.result for op in job.ops]])
646
      self.assertEqual(job.GetInfo(["opstatus"]),
647
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
648
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
649
                              for op in job.ops))
650

    
651
      self._GenericCheckJob(job)
652

    
653
      # Calling the processor on a finished job should be a no-op
654
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
655
                       jqueue._JobProcessor.FINISHED)
656
      self.assertRaises(IndexError, queue.GetNextUpdate)
657

    
658
  def testOpcodeError(self):
659
    queue = _FakeQueueForProc()
660

    
661
    testdata = [
662
      (17077, 1, 0, 0),
663
      (1782, 5, 2, 2),
664
      (18179, 10, 9, 9),
665
      (4744, 10, 3, 8),
666
      (23816, 100, 39, 45),
667
      ]
668

    
669
    for (job_id, opcount, failfrom, failto) in testdata:
670
      # Prepare opcodes
671
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
672
                                 fail=(failfrom <= i and
673
                                       i <= failto))
674
             for i in range(opcount)]
675

    
676
      # Create job
677
      job = self._CreateJob(queue, str(job_id), ops)
678

    
679
      opexec = _FakeExecOpCodeForProc(queue, None, None)
680

    
681
      for idx in range(len(ops)):
682
        self.assertRaises(IndexError, queue.GetNextUpdate)
683
        result = jqueue._JobProcessor(queue, opexec, job)()
684
        # queued to waitlock
685
        self.assertEqual(queue.GetNextUpdate(), (job, True))
686
        # waitlock to running
687
        self.assertEqual(queue.GetNextUpdate(), (job, True))
688
        # Opcode result
689
        self.assertEqual(queue.GetNextUpdate(), (job, True))
690
        self.assertRaises(IndexError, queue.GetNextUpdate)
691

    
692
        if idx in (failfrom, len(ops) - 1):
693
          # Last opcode
694
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
695
          break
696

    
697
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
698

    
699
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
700

    
701
      self.assertRaises(IndexError, queue.GetNextUpdate)
702

    
703
      # Check job status
704
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
705
      self.assertEqual(job.GetInfo(["id"]), [job_id])
706
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
707

    
708
      # Check opcode status
709
      data = zip(job.ops,
710
                 job.GetInfo(["opstatus"])[0],
711
                 job.GetInfo(["opresult"])[0])
712

    
713
      for idx, (op, opstatus, opresult) in enumerate(data):
714
        if idx < failfrom:
715
          assert not op.input.fail
716
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
717
          self.assertEqual(opresult, op.input.result)
718
        elif idx <= failto:
719
          assert op.input.fail
720
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
721
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
722
        else:
723
          assert not op.input.fail
724
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
725
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
726

    
727
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
728
                              for op in job.ops[:failfrom]))
729

    
730
      self._GenericCheckJob(job)
731

    
732
      # Calling the processor on a finished job should be a no-op
733
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
734
                       jqueue._JobProcessor.FINISHED)
735
      self.assertRaises(IndexError, queue.GetNextUpdate)
736

    
737
  def testCancelWhileInQueue(self):
738
    queue = _FakeQueueForProc()
739

    
740
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
741
           for i in range(5)]
742

    
743
    # Create job
744
    job_id = 17045
745
    job = self._CreateJob(queue, job_id, ops)
746

    
747
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
748

    
749
    # Mark as cancelled
750
    (success, _) = job.Cancel()
751
    self.assert_(success)
752

    
753
    self.assertRaises(IndexError, queue.GetNextUpdate)
754

    
755
    self.assertFalse(job.start_timestamp)
756
    self.assertTrue(job.end_timestamp)
757
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
758
                            for op in job.ops))
759

    
760
    # Serialize to check for differences
761
    before_proc = job.Serialize()
762

    
763
    # Simulate processor called in workerpool
764
    opexec = _FakeExecOpCodeForProc(queue, None, None)
765
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
766
                     jqueue._JobProcessor.FINISHED)
767

    
768
    # Check result
769
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
770
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
771
    self.assertFalse(job.start_timestamp)
772
    self.assertTrue(job.end_timestamp)
773
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
774
                                for op in job.ops))
775
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
776
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
777
                      ["Job canceled by request" for _ in job.ops]])
778

    
779
    # Must not have changed or written
780
    self.assertEqual(before_proc, job.Serialize())
781
    self.assertRaises(IndexError, queue.GetNextUpdate)
782

    
783
  def testCancelWhileWaitlockInQueue(self):
784
    queue = _FakeQueueForProc()
785

    
786
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
787
           for i in range(5)]
788

    
789
    # Create job
790
    job_id = 8645
791
    job = self._CreateJob(queue, job_id, ops)
792

    
793
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
794

    
795
    job.ops[0].status = constants.OP_STATUS_WAITING
796

    
797
    assert len(job.ops) == 5
798

    
799
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
800

    
801
    # Mark as cancelling
802
    (success, _) = job.Cancel()
803
    self.assert_(success)
804

    
805
    self.assertRaises(IndexError, queue.GetNextUpdate)
806

    
807
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
808
                            for op in job.ops))
809

    
810
    opexec = _FakeExecOpCodeForProc(queue, None, None)
811
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
812
                     jqueue._JobProcessor.FINISHED)
813

    
814
    # Check result
815
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
816
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
817
    self.assertFalse(job.start_timestamp)
818
    self.assert_(job.end_timestamp)
819
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
820
                                for op in job.ops))
821
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
822
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
823
                      ["Job canceled by request" for _ in job.ops]])
824

    
825
  def testCancelWhileWaitlock(self):
826
    queue = _FakeQueueForProc()
827

    
828
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
829
           for i in range(5)]
830

    
831
    # Create job
832
    job_id = 11009
833
    job = self._CreateJob(queue, job_id, ops)
834

    
835
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
836

    
837
    def _BeforeStart(timeout, priority):
838
      self.assertEqual(queue.GetNextUpdate(), (job, True))
839
      self.assertRaises(IndexError, queue.GetNextUpdate)
840
      self.assertFalse(queue.IsAcquired())
841
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
842

    
843
      # Mark as cancelled
844
      (success, _) = job.Cancel()
845
      self.assert_(success)
846

    
847
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
848
                              for op in job.ops))
849
      self.assertRaises(IndexError, queue.GetNextUpdate)
850

    
851
    def _AfterStart(op, cbs):
852
      self.assertEqual(queue.GetNextUpdate(), (job, True))
853
      self.assertRaises(IndexError, queue.GetNextUpdate)
854
      self.assertFalse(queue.IsAcquired())
855
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
856

    
857
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
858

    
859
    self.assertRaises(IndexError, queue.GetNextUpdate)
860
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
861
                     jqueue._JobProcessor.FINISHED)
862
    self.assertEqual(queue.GetNextUpdate(), (job, True))
863
    self.assertRaises(IndexError, queue.GetNextUpdate)
864

    
865
    # Check result
866
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
867
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
868
    self.assert_(job.start_timestamp)
869
    self.assert_(job.end_timestamp)
870
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
871
                                for op in job.ops))
872
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
873
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
874
                      ["Job canceled by request" for _ in job.ops]])
875

    
876
  def testCancelWhileWaitlockWithTimeout(self):
877
    queue = _FakeQueueForProc()
878

    
879
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
880
           for i in range(5)]
881

    
882
    # Create job
883
    job_id = 24314
884
    job = self._CreateJob(queue, job_id, ops)
885

    
886
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
887

    
888
    def _BeforeStart(timeout, priority):
889
      self.assertFalse(queue.IsAcquired())
890
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
891

    
892
      # Mark as cancelled
893
      (success, _) = job.Cancel()
894
      self.assert_(success)
895

    
896
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
897
                              for op in job.ops))
898

    
899
      # Fake an acquire attempt timing out
900
      raise mcpu.LockAcquireTimeout()
901

    
902
    def _AfterStart(op, cbs):
903
      self.fail("Should not reach this")
904

    
905
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
906

    
907
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
908
                     jqueue._JobProcessor.FINISHED)
909

    
910
    # Check result
911
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
912
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
913
    self.assert_(job.start_timestamp)
914
    self.assert_(job.end_timestamp)
915
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
916
                                for op in job.ops))
917
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
918
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
919
                      ["Job canceled by request" for _ in job.ops]])
920

    
921
  def testCancelWhileRunning(self):
922
    # Tests canceling a job with finished opcodes and more, unprocessed ones
923
    queue = _FakeQueueForProc()
924

    
925
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
926
           for i in range(3)]
927

    
928
    # Create job
929
    job_id = 28492
930
    job = self._CreateJob(queue, job_id, ops)
931

    
932
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
933

    
934
    opexec = _FakeExecOpCodeForProc(queue, None, None)
935

    
936
    # Run one opcode
937
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
938
                     jqueue._JobProcessor.DEFER)
939

    
940
    # Job goes back to queued
941
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
942
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
943
                     [[constants.OP_STATUS_SUCCESS,
944
                       constants.OP_STATUS_QUEUED,
945
                       constants.OP_STATUS_QUEUED],
946
                      ["Res0", None, None]])
947

    
948
    # Mark as cancelled
949
    (success, _) = job.Cancel()
950
    self.assert_(success)
951

    
952
    # Try processing another opcode (this will actually cancel the job)
953
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
954
                     jqueue._JobProcessor.FINISHED)
955

    
956
    # Check result
957
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
958
    self.assertEqual(job.GetInfo(["id"]), [job_id])
959
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
960
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
961
                     [[constants.OP_STATUS_SUCCESS,
962
                       constants.OP_STATUS_CANCELED,
963
                       constants.OP_STATUS_CANCELED],
964
                      ["Res0", "Job canceled by request",
965
                       "Job canceled by request"]])
966

    
967
  def testPartiallyRun(self):
968
    # Tests calling the processor on a job that's been partially run before the
969
    # program was restarted
970
    queue = _FakeQueueForProc()
971

    
972
    opexec = _FakeExecOpCodeForProc(queue, None, None)
973

    
974
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
975
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
976
             for i in range(10)]
977

    
978
      # Create job
979
      job = self._CreateJob(queue, job_id, ops)
980

    
981
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
982

    
983
      for _ in range(successcount):
984
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
985
                         jqueue._JobProcessor.DEFER)
986

    
987
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
988
      self.assertEqual(job.GetInfo(["opstatus"]),
989
                       [[constants.OP_STATUS_SUCCESS
990
                         for _ in range(successcount)] +
991
                        [constants.OP_STATUS_QUEUED
992
                         for _ in range(len(ops) - successcount)]])
993

    
994
      self.assert_(job.ops_iter)
995

    
996
      # Serialize and restore (simulates program restart)
997
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
998
      self.assertFalse(newjob.ops_iter)
999
      self._TestPartial(newjob, successcount)
1000

    
1001
  def _TestPartial(self, job, successcount):
1002
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1003
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1004

    
1005
    queue = _FakeQueueForProc()
1006
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1007

    
1008
    for remaining in reversed(range(len(job.ops) - successcount)):
1009
      result = jqueue._JobProcessor(queue, opexec, job)()
1010
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1011
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1012
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1013
      self.assertRaises(IndexError, queue.GetNextUpdate)
1014

    
1015
      if remaining == 0:
1016
        # Last opcode
1017
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1018
        break
1019

    
1020
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1021

    
1022
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1023

    
1024
    self.assertRaises(IndexError, queue.GetNextUpdate)
1025
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1026
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1027
    self.assertEqual(job.GetInfo(["opresult"]),
1028
                     [[op.input.result for op in job.ops]])
1029
    self.assertEqual(job.GetInfo(["opstatus"]),
1030
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1031
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1032
                            for op in job.ops))
1033

    
1034
    self._GenericCheckJob(job)
1035

    
1036
    # Calling the processor on a finished job should be a no-op
1037
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1038
                     jqueue._JobProcessor.FINISHED)
1039
    self.assertRaises(IndexError, queue.GetNextUpdate)
1040

    
1041
    # ... also after being restored
1042
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1043
    # Calling the processor on a finished job should be a no-op
1044
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1045
                     jqueue._JobProcessor.FINISHED)
1046
    self.assertRaises(IndexError, queue.GetNextUpdate)
1047

    
1048
  def testProcessorOnRunningJob(self):
1049
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1050

    
1051
    queue = _FakeQueueForProc()
1052
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1053

    
1054
    # Create job
1055
    job = self._CreateJob(queue, 9571, ops)
1056

    
1057
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1058

    
1059
    job.ops[0].status = constants.OP_STATUS_RUNNING
1060

    
1061
    assert len(job.ops) == 1
1062

    
1063
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1064

    
1065
    # Calling on running job must fail
1066
    self.assertRaises(errors.ProgrammerError,
1067
                      jqueue._JobProcessor(queue, opexec, job))
1068

    
1069
  def testLogMessages(self):
1070
    # Tests the "Feedback" callback function
1071
    queue = _FakeQueueForProc()
1072

    
1073
    messages = {
1074
      1: [
1075
        (None, "Hello"),
1076
        (None, "World"),
1077
        (constants.ELOG_MESSAGE, "there"),
1078
        ],
1079
      4: [
1080
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1081
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1082
        ],
1083
      }
1084
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1085
                               messages=messages.get(i, []))
1086
           for i in range(5)]
1087

    
1088
    # Create job
1089
    job = self._CreateJob(queue, 29386, ops)
1090

    
1091
    def _BeforeStart(timeout, priority):
1092
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1093
      self.assertRaises(IndexError, queue.GetNextUpdate)
1094
      self.assertFalse(queue.IsAcquired())
1095
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1096

    
1097
    def _AfterStart(op, cbs):
1098
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1099
      self.assertRaises(IndexError, queue.GetNextUpdate)
1100
      self.assertFalse(queue.IsAcquired())
1101
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1102

    
1103
      self.assertRaises(AssertionError, cbs.Feedback,
1104
                        "too", "many", "arguments")
1105

    
1106
      for (log_type, msg) in op.messages:
1107
        self.assertRaises(IndexError, queue.GetNextUpdate)
1108
        if log_type:
1109
          cbs.Feedback(log_type, msg)
1110
        else:
1111
          cbs.Feedback(msg)
1112
        # Check for job update without replication
1113
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1114
        self.assertRaises(IndexError, queue.GetNextUpdate)
1115

    
1116
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1117

    
1118
    for remaining in reversed(range(len(job.ops))):
1119
      self.assertRaises(IndexError, queue.GetNextUpdate)
1120
      result = jqueue._JobProcessor(queue, opexec, job)()
1121
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1122
      self.assertRaises(IndexError, queue.GetNextUpdate)
1123

    
1124
      if remaining == 0:
1125
        # Last opcode
1126
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1127
        break
1128

    
1129
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1130

    
1131
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1132

    
1133
    self.assertRaises(IndexError, queue.GetNextUpdate)
1134

    
1135
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1136
    self.assertEqual(job.GetInfo(["opresult"]),
1137
                     [[op.input.result for op in job.ops]])
1138

    
1139
    logmsgcount = sum(len(m) for m in messages.values())
1140

    
1141
    self._CheckLogMessages(job, logmsgcount)
1142

    
1143
    # Serialize and restore (simulates program restart)
1144
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1145
    self._CheckLogMessages(newjob, logmsgcount)
1146

    
1147
    # Check each message
1148
    prevserial = -1
1149
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1150
      for (serial, timestamp, log_type, msg) in oplog:
1151
        (exptype, expmsg) = messages.get(idx).pop(0)
1152
        if exptype:
1153
          self.assertEqual(log_type, exptype)
1154
        else:
1155
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1156
        self.assertEqual(expmsg, msg)
1157
        self.assert_(serial > prevserial)
1158
        prevserial = serial
1159

    
1160
  def _CheckLogMessages(self, job, count):
1161
    # Check serial
1162
    self.assertEqual(job.log_serial, count)
1163

    
1164
    # No filter
1165
    self.assertEqual(job.GetLogEntries(None),
1166
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1167
                      for entry in entries])
1168

    
1169
    # Filter with serial
1170
    assert count > 3
1171
    self.assert_(job.GetLogEntries(3))
1172
    self.assertEqual(job.GetLogEntries(3),
1173
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1174
                      for entry in entries][3:])
1175

    
1176
    # No log message after highest serial
1177
    self.assertFalse(job.GetLogEntries(count))
1178
    self.assertFalse(job.GetLogEntries(count + 3))
1179

    
1180
  def testSubmitManyJobs(self):
1181
    queue = _FakeQueueForProc()
1182

    
1183
    job_id = 15656
1184
    ops = [
1185
      opcodes.OpTestDummy(result="Res0", fail=False,
1186
                          submit_jobs=[]),
1187
      opcodes.OpTestDummy(result="Res1", fail=False,
1188
                          submit_jobs=[
1189
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1190
                            ]),
1191
      opcodes.OpTestDummy(result="Res2", fail=False,
1192
                          submit_jobs=[
1193
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1194
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1195
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1196
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1197
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1198
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1199
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1200
                            ]),
1201
      ]
1202

    
1203
    # Create job
1204
    job = self._CreateJob(queue, job_id, ops)
1205

    
1206
    def _BeforeStart(timeout, priority):
1207
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1208
      self.assertRaises(IndexError, queue.GetNextUpdate)
1209
      self.assertFalse(queue.IsAcquired())
1210
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1211
      self.assertFalse(job.cur_opctx)
1212

    
1213
    def _AfterStart(op, cbs):
1214
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1215
      self.assertRaises(IndexError, queue.GetNextUpdate)
1216

    
1217
      self.assertFalse(queue.IsAcquired())
1218
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1219
      self.assertFalse(job.cur_opctx)
1220

    
1221
      # Job is running, cancelling shouldn't be possible
1222
      (success, _) = job.Cancel()
1223
      self.assertFalse(success)
1224

    
1225
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1226

    
1227
    for idx in range(len(ops)):
1228
      self.assertRaises(IndexError, queue.GetNextUpdate)
1229
      result = jqueue._JobProcessor(queue, opexec, job)()
1230
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1231
      self.assertRaises(IndexError, queue.GetNextUpdate)
1232
      if idx == len(ops) - 1:
1233
        # Last opcode
1234
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1235
      else:
1236
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1237

    
1238
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1239
        self.assert_(job.start_timestamp)
1240
        self.assertFalse(job.end_timestamp)
1241

    
1242
    self.assertRaises(IndexError, queue.GetNextUpdate)
1243

    
1244
    for idx, submitted_ops in enumerate(job_ops
1245
                                        for op in ops
1246
                                        for job_ops in op.submit_jobs):
1247
      self.assertEqual(queue.GetNextSubmittedJob(),
1248
                       (1000 + idx, submitted_ops))
1249
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1250

    
1251
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1252
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1253
    self.assertEqual(job.GetInfo(["opresult"]),
1254
                     [[[], [1000], [1001, 1002, 1003]]])
1255
    self.assertEqual(job.GetInfo(["opstatus"]),
1256
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1257

    
1258
    self._GenericCheckJob(job)
1259

    
1260
    # Calling the processor on a finished job should be a no-op
1261
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1262
                     jqueue._JobProcessor.FINISHED)
1263
    self.assertRaises(IndexError, queue.GetNextUpdate)
1264

    
1265
  def testJobDependency(self):
1266
    depmgr = _FakeDependencyManager()
1267
    queue = _FakeQueueForProc(depmgr=depmgr)
1268

    
1269
    self.assertEqual(queue.depmgr, depmgr)
1270

    
1271
    prev_job_id = 22113
1272
    prev_job_id2 = 28102
1273
    job_id = 29929
1274
    ops = [
1275
      opcodes.OpTestDummy(result="Res0", fail=False,
1276
                          depends=[
1277
                            [prev_job_id2, None],
1278
                            [prev_job_id, None],
1279
                            ]),
1280
      opcodes.OpTestDummy(result="Res1", fail=False),
1281
      ]
1282

    
1283
    # Create job
1284
    job = self._CreateJob(queue, job_id, ops)
1285

    
1286
    def _BeforeStart(timeout, priority):
1287
      if attempt == 0 or attempt > 5:
1288
        # Job should only be updated when it wasn't waiting for another job
1289
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1290
      self.assertRaises(IndexError, queue.GetNextUpdate)
1291
      self.assertFalse(queue.IsAcquired())
1292
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1293
      self.assertFalse(job.cur_opctx)
1294

    
1295
    def _AfterStart(op, cbs):
1296
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1297
      self.assertRaises(IndexError, queue.GetNextUpdate)
1298

    
1299
      self.assertFalse(queue.IsAcquired())
1300
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1301
      self.assertFalse(job.cur_opctx)
1302

    
1303
      # Job is running, cancelling shouldn't be possible
1304
      (success, _) = job.Cancel()
1305
      self.assertFalse(success)
1306

    
1307
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1308

    
1309
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1310

    
1311
    counter = itertools.count()
1312
    while True:
1313
      attempt = counter.next()
1314

    
1315
      self.assertRaises(IndexError, queue.GetNextUpdate)
1316
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1317

    
1318
      if attempt < 2:
1319
        depmgr.AddCheckResult(job, prev_job_id2, None,
1320
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1321
      elif attempt == 2:
1322
        depmgr.AddCheckResult(job, prev_job_id2, None,
1323
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1324
        # The processor will ask for the next dependency immediately
1325
        depmgr.AddCheckResult(job, prev_job_id, None,
1326
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1327
      elif attempt < 5:
1328
        depmgr.AddCheckResult(job, prev_job_id, None,
1329
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1330
      elif attempt == 5:
1331
        depmgr.AddCheckResult(job, prev_job_id, None,
1332
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1333
      if attempt == 2:
1334
        self.assertEqual(depmgr.CountPendingResults(), 2)
1335
      elif attempt > 5:
1336
        self.assertEqual(depmgr.CountPendingResults(), 0)
1337
      else:
1338
        self.assertEqual(depmgr.CountPendingResults(), 1)
1339

    
1340
      result = jqueue._JobProcessor(queue, opexec, job)()
1341
      if attempt == 0 or attempt >= 5:
1342
        # Job should only be updated if there was an actual change
1343
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1344
      self.assertRaises(IndexError, queue.GetNextUpdate)
1345
      self.assertFalse(depmgr.CountPendingResults())
1346

    
1347
      if attempt < 5:
1348
        # Simulate waiting for other job
1349
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1350
        self.assertTrue(job.cur_opctx)
1351
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1352
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1353
        self.assert_(job.start_timestamp)
1354
        self.assertFalse(job.end_timestamp)
1355
        continue
1356

    
1357
      if result == jqueue._JobProcessor.FINISHED:
1358
        # Last opcode
1359
        self.assertFalse(job.cur_opctx)
1360
        break
1361

    
1362
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1363

    
1364
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1365
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1366
      self.assert_(job.start_timestamp)
1367
      self.assertFalse(job.end_timestamp)
1368

    
1369
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1370
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1371
    self.assertEqual(job.GetInfo(["opresult"]),
1372
                     [[op.input.result for op in job.ops]])
1373
    self.assertEqual(job.GetInfo(["opstatus"]),
1374
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1375
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1376
                               for op in job.ops))
1377

    
1378
    self._GenericCheckJob(job)
1379

    
1380
    self.assertRaises(IndexError, queue.GetNextUpdate)
1381
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1382
    self.assertFalse(depmgr.CountPendingResults())
1383
    self.assertFalse(depmgr.CountWaitingJobs())
1384

    
1385
    # Calling the processor on a finished job should be a no-op
1386
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1387
                     jqueue._JobProcessor.FINISHED)
1388
    self.assertRaises(IndexError, queue.GetNextUpdate)
1389

    
1390
  def testJobDependencyCancel(self):
1391
    depmgr = _FakeDependencyManager()
1392
    queue = _FakeQueueForProc(depmgr=depmgr)
1393

    
1394
    self.assertEqual(queue.depmgr, depmgr)
1395

    
1396
    prev_job_id = 13623
1397
    job_id = 30876
1398
    ops = [
1399
      opcodes.OpTestDummy(result="Res0", fail=False),
1400
      opcodes.OpTestDummy(result="Res1", fail=False,
1401
                          depends=[
1402
                            [prev_job_id, None],
1403
                            ]),
1404
      opcodes.OpTestDummy(result="Res2", fail=False),
1405
      ]
1406

    
1407
    # Create job
1408
    job = self._CreateJob(queue, job_id, ops)
1409

    
1410
    def _BeforeStart(timeout, priority):
1411
      if attempt == 0 or attempt > 5:
1412
        # Job should only be updated when it wasn't waiting for another job
1413
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1414
      self.assertRaises(IndexError, queue.GetNextUpdate)
1415
      self.assertFalse(queue.IsAcquired())
1416
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1417
      self.assertFalse(job.cur_opctx)
1418

    
1419
    def _AfterStart(op, cbs):
1420
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1421
      self.assertRaises(IndexError, queue.GetNextUpdate)
1422

    
1423
      self.assertFalse(queue.IsAcquired())
1424
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1425
      self.assertFalse(job.cur_opctx)
1426

    
1427
      # Job is running, cancelling shouldn't be possible
1428
      (success, _) = job.Cancel()
1429
      self.assertFalse(success)
1430

    
1431
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1432

    
1433
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1434

    
1435
    counter = itertools.count()
1436
    while True:
1437
      attempt = counter.next()
1438

    
1439
      self.assertRaises(IndexError, queue.GetNextUpdate)
1440
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1441

    
1442
      if attempt == 0:
1443
        # This will handle the first opcode
1444
        pass
1445
      elif attempt < 4:
1446
        depmgr.AddCheckResult(job, prev_job_id, None,
1447
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1448
      elif attempt == 4:
1449
        # Other job was cancelled
1450
        depmgr.AddCheckResult(job, prev_job_id, None,
1451
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1452

    
1453
      if attempt == 0:
1454
        self.assertEqual(depmgr.CountPendingResults(), 0)
1455
      else:
1456
        self.assertEqual(depmgr.CountPendingResults(), 1)
1457

    
1458
      result = jqueue._JobProcessor(queue, opexec, job)()
1459
      if attempt <= 1 or attempt >= 4:
1460
        # Job should only be updated if there was an actual change
1461
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1462
      self.assertRaises(IndexError, queue.GetNextUpdate)
1463
      self.assertFalse(depmgr.CountPendingResults())
1464

    
1465
      if attempt > 0 and attempt < 4:
1466
        # Simulate waiting for other job
1467
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1468
        self.assertTrue(job.cur_opctx)
1469
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1470
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1471
        self.assert_(job.start_timestamp)
1472
        self.assertFalse(job.end_timestamp)
1473
        continue
1474

    
1475
      if result == jqueue._JobProcessor.FINISHED:
1476
        # Last opcode
1477
        self.assertFalse(job.cur_opctx)
1478
        break
1479

    
1480
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1481

    
1482
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1483
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1484
      self.assert_(job.start_timestamp)
1485
      self.assertFalse(job.end_timestamp)
1486

    
1487
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1488
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1489
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1490
                     [[constants.OP_STATUS_SUCCESS,
1491
                       constants.OP_STATUS_CANCELED,
1492
                       constants.OP_STATUS_CANCELED],
1493
                      ["Res0", "Job canceled by request",
1494
                       "Job canceled by request"]])
1495

    
1496
    self._GenericCheckJob(job)
1497

    
1498
    self.assertRaises(IndexError, queue.GetNextUpdate)
1499
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1500
    self.assertFalse(depmgr.CountPendingResults())
1501

    
1502
    # Calling the processor on a finished job should be a no-op
1503
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1504
                     jqueue._JobProcessor.FINISHED)
1505
    self.assertRaises(IndexError, queue.GetNextUpdate)
1506

    
1507
  def testJobDependencyWrongstatus(self):
1508
    depmgr = _FakeDependencyManager()
1509
    queue = _FakeQueueForProc(depmgr=depmgr)
1510

    
1511
    self.assertEqual(queue.depmgr, depmgr)
1512

    
1513
    prev_job_id = 9741
1514
    job_id = 11763
1515
    ops = [
1516
      opcodes.OpTestDummy(result="Res0", fail=False),
1517
      opcodes.OpTestDummy(result="Res1", fail=False,
1518
                          depends=[
1519
                            [prev_job_id, None],
1520
                            ]),
1521
      opcodes.OpTestDummy(result="Res2", fail=False),
1522
      ]
1523

    
1524
    # Create job
1525
    job = self._CreateJob(queue, job_id, ops)
1526

    
1527
    def _BeforeStart(timeout, priority):
1528
      if attempt == 0 or attempt > 5:
1529
        # Job should only be updated when it wasn't waiting for another job
1530
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1531
      self.assertRaises(IndexError, queue.GetNextUpdate)
1532
      self.assertFalse(queue.IsAcquired())
1533
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1534
      self.assertFalse(job.cur_opctx)
1535

    
1536
    def _AfterStart(op, cbs):
1537
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1538
      self.assertRaises(IndexError, queue.GetNextUpdate)
1539

    
1540
      self.assertFalse(queue.IsAcquired())
1541
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1542
      self.assertFalse(job.cur_opctx)
1543

    
1544
      # Job is running, cancelling shouldn't be possible
1545
      (success, _) = job.Cancel()
1546
      self.assertFalse(success)
1547

    
1548
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1549

    
1550
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1551

    
1552
    counter = itertools.count()
1553
    while True:
1554
      attempt = counter.next()
1555

    
1556
      self.assertRaises(IndexError, queue.GetNextUpdate)
1557
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1558

    
1559
      if attempt == 0:
1560
        # This will handle the first opcode
1561
        pass
1562
      elif attempt < 4:
1563
        depmgr.AddCheckResult(job, prev_job_id, None,
1564
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1565
      elif attempt == 4:
1566
        # Other job failed
1567
        depmgr.AddCheckResult(job, prev_job_id, None,
1568
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1569

    
1570
      if attempt == 0:
1571
        self.assertEqual(depmgr.CountPendingResults(), 0)
1572
      else:
1573
        self.assertEqual(depmgr.CountPendingResults(), 1)
1574

    
1575
      result = jqueue._JobProcessor(queue, opexec, job)()
1576
      if attempt <= 1 or attempt >= 4:
1577
        # Job should only be updated if there was an actual change
1578
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1579
      self.assertRaises(IndexError, queue.GetNextUpdate)
1580
      self.assertFalse(depmgr.CountPendingResults())
1581

    
1582
      if attempt > 0 and attempt < 4:
1583
        # Simulate waiting for other job
1584
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1585
        self.assertTrue(job.cur_opctx)
1586
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1587
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1588
        self.assert_(job.start_timestamp)
1589
        self.assertFalse(job.end_timestamp)
1590
        continue
1591

    
1592
      if result == jqueue._JobProcessor.FINISHED:
1593
        # Last opcode
1594
        self.assertFalse(job.cur_opctx)
1595
        break
1596

    
1597
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1598

    
1599
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1600
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1601
      self.assert_(job.start_timestamp)
1602
      self.assertFalse(job.end_timestamp)
1603

    
1604
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1605
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1606
    self.assertEqual(job.GetInfo(["opstatus"]),
1607
                     [[constants.OP_STATUS_SUCCESS,
1608
                       constants.OP_STATUS_ERROR,
1609
                       constants.OP_STATUS_ERROR]]),
1610

    
1611
    (opresult, ) = job.GetInfo(["opresult"])
1612
    self.assertEqual(len(opresult), len(ops))
1613
    self.assertEqual(opresult[0], "Res0")
1614
    self.assertTrue(errors.GetEncodedError(opresult[1]))
1615
    self.assertTrue(errors.GetEncodedError(opresult[2]))
1616

    
1617
    self._GenericCheckJob(job)
1618

    
1619
    self.assertRaises(IndexError, queue.GetNextUpdate)
1620
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1621
    self.assertFalse(depmgr.CountPendingResults())
1622

    
1623
    # Calling the processor on a finished job should be a no-op
1624
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1625
                     jqueue._JobProcessor.FINISHED)
1626
    self.assertRaises(IndexError, queue.GetNextUpdate)
1627

    
1628

    
1629
class TestEvaluateJobProcessorResult(unittest.TestCase):
1630
  def testFinished(self):
1631
    depmgr = _FakeDependencyManager()
1632
    job = _IdOnlyFakeJob(30953)
1633
    jqueue._EvaluateJobProcessorResult(depmgr, job,
1634
                                       jqueue._JobProcessor.FINISHED)
1635
    self.assertEqual(depmgr.GetNextNotification(), job.id)
1636
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1637

    
1638
  def testDefer(self):
1639
    depmgr = _FakeDependencyManager()
1640
    job = _IdOnlyFakeJob(11326, priority=5463)
1641
    try:
1642
      jqueue._EvaluateJobProcessorResult(depmgr, job,
1643
                                         jqueue._JobProcessor.DEFER)
1644
    except workerpool.DeferTask, err:
1645
      self.assertEqual(err.priority, 5463)
1646
    else:
1647
      self.fail("Didn't raise exception")
1648
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1649

    
1650
  def testWaitdep(self):
1651
    depmgr = _FakeDependencyManager()
1652
    job = _IdOnlyFakeJob(21317)
1653
    jqueue._EvaluateJobProcessorResult(depmgr, job,
1654
                                       jqueue._JobProcessor.WAITDEP)
1655
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1656

    
1657
  def testOther(self):
1658
    depmgr = _FakeDependencyManager()
1659
    job = _IdOnlyFakeJob(5813)
1660
    self.assertRaises(errors.ProgrammerError,
1661
                      jqueue._EvaluateJobProcessorResult,
1662
                      depmgr, job, "Other result")
1663
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1664

    
1665

    
1666
class _FakeTimeoutStrategy:
1667
  def __init__(self, timeouts):
1668
    self.timeouts = timeouts
1669
    self.attempts = 0
1670
    self.last_timeout = None
1671

    
1672
  def NextAttempt(self):
1673
    self.attempts += 1
1674
    if self.timeouts:
1675
      timeout = self.timeouts.pop(0)
1676
    else:
1677
      timeout = None
1678
    self.last_timeout = timeout
1679
    return timeout
1680

    
1681

    
1682
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1683
  def setUp(self):
1684
    self.queue = _FakeQueueForProc()
1685
    self.job = None
1686
    self.curop = None
1687
    self.opcounter = None
1688
    self.timeout_strategy = None
1689
    self.retries = 0
1690
    self.prev_tsop = None
1691
    self.prev_prio = None
1692
    self.prev_status = None
1693
    self.lock_acq_prio = None
1694
    self.gave_lock = None
1695
    self.done_lock_before_blocking = False
1696

    
1697
  def _BeforeStart(self, timeout, priority):
1698
    job = self.job
1699

    
1700
    # If status has changed, job must've been written
1701
    if self.prev_status != self.job.ops[self.curop].status:
1702
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1703
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1704

    
1705
    self.assertFalse(self.queue.IsAcquired())
1706
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1707

    
1708
    ts = self.timeout_strategy
1709

    
1710
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1711
    self.assertEqual(timeout, ts.last_timeout)
1712
    self.assertEqual(priority, job.ops[self.curop].priority)
1713

    
1714
    self.gave_lock = True
1715
    self.lock_acq_prio = priority
1716

    
1717
    if (self.curop == 3 and
1718
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1719
      # Give locks before running into blocking acquire
1720
      assert self.retries == 7
1721
      self.retries = 0
1722
      self.done_lock_before_blocking = True
1723
      return
1724

    
1725
    if self.retries > 0:
1726
      self.assert_(timeout is not None)
1727
      self.retries -= 1
1728
      self.gave_lock = False
1729
      raise mcpu.LockAcquireTimeout()
1730

    
1731
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1732
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1733
      assert not ts.timeouts
1734
      self.assert_(timeout is None)
1735

    
1736
  def _AfterStart(self, op, cbs):
1737
    job = self.job
1738

    
1739
    # Setting to "running" requires an update
1740
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1741
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1742

    
1743
    self.assertFalse(self.queue.IsAcquired())
1744
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1745

    
1746
    # Job is running, cancelling shouldn't be possible
1747
    (success, _) = job.Cancel()
1748
    self.assertFalse(success)
1749

    
1750
  def _NextOpcode(self):
1751
    self.curop = self.opcounter.next()
1752
    self.prev_prio = self.job.ops[self.curop].priority
1753
    self.prev_status = self.job.ops[self.curop].status
1754

    
1755
  def _NewTimeoutStrategy(self):
1756
    job = self.job
1757

    
1758
    self.assertEqual(self.retries, 0)
1759

    
1760
    if self.prev_tsop == self.curop:
1761
      # Still on the same opcode, priority must've been increased
1762
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1763

    
1764
    if self.curop == 1:
1765
      # Normal retry
1766
      timeouts = range(10, 31, 10)
1767
      self.retries = len(timeouts) - 1
1768

    
1769
    elif self.curop == 2:
1770
      # Let this run into a blocking acquire
1771
      timeouts = range(11, 61, 12)
1772
      self.retries = len(timeouts)
1773

    
1774
    elif self.curop == 3:
1775
      # Wait for priority to increase, but give lock before blocking acquire
1776
      timeouts = range(12, 100, 14)
1777
      self.retries = len(timeouts)
1778

    
1779
      self.assertFalse(self.done_lock_before_blocking)
1780

    
1781
    elif self.curop == 4:
1782
      self.assert_(self.done_lock_before_blocking)
1783

    
1784
      # Timeouts, but no need to retry
1785
      timeouts = range(10, 31, 10)
1786
      self.retries = 0
1787

    
1788
    elif self.curop == 5:
1789
      # Normal retry
1790
      timeouts = range(19, 100, 11)
1791
      self.retries = len(timeouts)
1792

    
1793
    else:
1794
      timeouts = []
1795
      self.retries = 0
1796

    
1797
    assert len(job.ops) == 10
1798
    assert self.retries <= len(timeouts)
1799

    
1800
    ts = _FakeTimeoutStrategy(timeouts)
1801

    
1802
    self.timeout_strategy = ts
1803
    self.prev_tsop = self.curop
1804
    self.prev_prio = job.ops[self.curop].priority
1805

    
1806
    return ts
1807

    
1808
  def testTimeout(self):
1809
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1810
           for i in range(10)]
1811

    
1812
    # Create job
1813
    job_id = 15801
1814
    job = self._CreateJob(self.queue, job_id, ops)
1815
    self.job = job
1816

    
1817
    self.opcounter = itertools.count(0)
1818

    
1819
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1820
                                    self._AfterStart)
1821
    tsf = self._NewTimeoutStrategy
1822

    
1823
    self.assertFalse(self.done_lock_before_blocking)
1824

    
1825
    while True:
1826
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1827
                                  _timeout_strategy_factory=tsf)
1828

    
1829
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1830

    
1831
      if self.curop is not None:
1832
        self.prev_status = self.job.ops[self.curop].status
1833

    
1834
      self.lock_acq_prio = None
1835

    
1836
      result = proc(_nextop_fn=self._NextOpcode)
1837
      assert self.curop is not None
1838

    
1839
      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
1840
        # Got lock and/or job is done, result must've been written
1841
        self.assertFalse(job.cur_opctx)
1842
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1843
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1844
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1845
        self.assert_(job.ops[self.curop].exec_timestamp)
1846

    
1847
      if result == jqueue._JobProcessor.FINISHED:
1848
        self.assertFalse(job.cur_opctx)
1849
        break
1850

    
1851
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1852

    
1853
      if self.curop == 0:
1854
        self.assertEqual(job.ops[self.curop].start_timestamp,
1855
                         job.start_timestamp)
1856

    
1857
      if self.gave_lock:
1858
        # Opcode finished, but job not yet done
1859
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1860
      else:
1861
        # Did not get locks
1862
        self.assert_(job.cur_opctx)
1863
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1864
                         self.timeout_strategy.NextAttempt)
1865
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1866
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1867

    
1868
        # If priority has changed since acquiring locks, the job must've been
1869
        # updated
1870
        if self.lock_acq_prio != job.ops[self.curop].priority:
1871
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1872

    
1873
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1874

    
1875
      self.assert_(job.start_timestamp)
1876
      self.assertFalse(job.end_timestamp)
1877

    
1878
    self.assertEqual(self.curop, len(job.ops) - 1)
1879
    self.assertEqual(self.job, job)
1880
    self.assertEqual(self.opcounter.next(), len(job.ops))
1881
    self.assert_(self.done_lock_before_blocking)
1882

    
1883
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1884
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1885
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1886
    self.assertEqual(job.GetInfo(["opresult"]),
1887
                     [[op.input.result for op in job.ops]])
1888
    self.assertEqual(job.GetInfo(["opstatus"]),
1889
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1890
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1891
                            for op in job.ops))
1892

    
1893
    # Calling the processor on a finished job should be a no-op
1894
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
1895
                     jqueue._JobProcessor.FINISHED)
1896
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1897

    
1898

    
1899
class _IdOnlyFakeJob:
1900
  def __init__(self, job_id, priority=NotImplemented):
1901
    self.id = str(job_id)
1902
    self._priority = priority
1903

    
1904
  def CalcPriority(self):
1905
    return self._priority
1906

    
1907

    
1908
class TestJobDependencyManager(unittest.TestCase):
1909
  def setUp(self):
1910
    self._status = []
1911
    self._queue = []
1912
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1913

    
1914
  def _GetStatus(self, job_id):
1915
    (exp_job_id, result) = self._status.pop(0)
1916
    self.assertEqual(exp_job_id, job_id)
1917
    return result
1918

    
1919
  def _Enqueue(self, jobs):
1920
    self.assertFalse(self.jdm._lock.is_owned(),
1921
                     msg=("Must not own manager lock while re-adding jobs"
1922
                          " (potential deadlock)"))
1923
    self._queue.append(jobs)
1924

    
1925
  def testNotFinalizedThenCancel(self):
1926
    job = _IdOnlyFakeJob(17697)
1927
    job_id = str(28625)
1928

    
1929
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1930
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1931
    self.assertEqual(result, self.jdm.WAIT)
1932
    self.assertFalse(self._status)
1933
    self.assertFalse(self._queue)
1934
    self.assertTrue(self.jdm.JobWaiting(job))
1935
    self.assertEqual(self.jdm._waiters, {
1936
      job_id: set([job]),
1937
      })
1938
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1939
      ("job/28625", None, None, [("job", [job.id])])
1940
      ])
1941

    
1942
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1943
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1944
    self.assertEqual(result, self.jdm.CANCEL)
1945
    self.assertFalse(self._status)
1946
    self.assertFalse(self._queue)
1947
    self.assertFalse(self.jdm.JobWaiting(job))
1948
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1949

    
1950
  def testRequireCancel(self):
1951
    job = _IdOnlyFakeJob(5278)
1952
    job_id = str(9610)
1953
    dep_status = [constants.JOB_STATUS_CANCELED]
1954

    
1955
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
1956
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1957
    self.assertEqual(result, self.jdm.WAIT)
1958
    self.assertFalse(self._status)
1959
    self.assertFalse(self._queue)
1960
    self.assertTrue(self.jdm.JobWaiting(job))
1961
    self.assertEqual(self.jdm._waiters, {
1962
      job_id: set([job]),
1963
      })
1964
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1965
      ("job/9610", None, None, [("job", [job.id])])
1966
      ])
1967

    
1968
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1969
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1970
    self.assertEqual(result, self.jdm.CONTINUE)
1971
    self.assertFalse(self._status)
1972
    self.assertFalse(self._queue)
1973
    self.assertFalse(self.jdm.JobWaiting(job))
1974
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1975

    
1976
  def testRequireError(self):
1977
    job = _IdOnlyFakeJob(21459)
1978
    job_id = str(25519)
1979
    dep_status = [constants.JOB_STATUS_ERROR]
1980

    
1981
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
1982
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1983
    self.assertEqual(result, self.jdm.WAIT)
1984
    self.assertFalse(self._status)
1985
    self.assertFalse(self._queue)
1986
    self.assertTrue(self.jdm.JobWaiting(job))
1987
    self.assertEqual(self.jdm._waiters, {
1988
      job_id: set([job]),
1989
      })
1990

    
1991
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1992
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1993
    self.assertEqual(result, self.jdm.CONTINUE)
1994
    self.assertFalse(self._status)
1995
    self.assertFalse(self._queue)
1996
    self.assertFalse(self.jdm.JobWaiting(job))
1997
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1998

    
1999
  def testRequireMultiple(self):
2000
    dep_status = list(constants.JOBS_FINALIZED)
2001

    
2002
    for end_status in dep_status:
2003
      job = _IdOnlyFakeJob(21343)
2004
      job_id = str(14609)
2005

    
2006
      self._status.append((job_id, constants.JOB_STATUS_WAITING))
2007
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2008
      self.assertEqual(result, self.jdm.WAIT)
2009
      self.assertFalse(self._status)
2010
      self.assertFalse(self._queue)
2011
      self.assertTrue(self.jdm.JobWaiting(job))
2012
      self.assertEqual(self.jdm._waiters, {
2013
        job_id: set([job]),
2014
        })
2015
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
2016
        ("job/14609", None, None, [("job", [job.id])])
2017
        ])
2018

    
2019
      self._status.append((job_id, end_status))
2020
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
2021
      self.assertEqual(result, self.jdm.CONTINUE)
2022
      self.assertFalse(self._status)
2023
      self.assertFalse(self._queue)
2024
      self.assertFalse(self.jdm.JobWaiting(job))
2025
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2026

    
2027
  def testNotify(self):
2028
    job = _IdOnlyFakeJob(8227)
2029
    job_id = str(4113)
2030

    
2031
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
2032
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
2033
    self.assertEqual(result, self.jdm.WAIT)
2034
    self.assertFalse(self._status)
2035
    self.assertFalse(self._queue)
2036
    self.assertTrue(self.jdm.JobWaiting(job))
2037
    self.assertEqual(self.jdm._waiters, {
2038
      job_id: set([job]),
2039
      })
2040

    
2041
    self.jdm.NotifyWaiters(job_id)
2042
    self.assertFalse(self._status)
2043
    self.assertFalse(self.jdm._waiters)
2044
    self.assertFalse(self.jdm.JobWaiting(job))
2045
    self.assertEqual(self._queue, [set([job])])
2046

    
2047
  def testWrongStatus(self):
2048
    job = _IdOnlyFakeJob(10102)
2049
    job_id = str(1271)
2050

    
2051
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2052
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2053
                                            [constants.JOB_STATUS_SUCCESS])
2054
    self.assertEqual(result, self.jdm.WAIT)
2055
    self.assertFalse(self._status)
2056
    self.assertFalse(self._queue)
2057
    self.assertTrue(self.jdm.JobWaiting(job))
2058
    self.assertEqual(self.jdm._waiters, {
2059
      job_id: set([job]),
2060
      })
2061

    
2062
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2063
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2064
                                            [constants.JOB_STATUS_SUCCESS])
2065
    self.assertEqual(result, self.jdm.WRONGSTATUS)
2066
    self.assertFalse(self._status)
2067
    self.assertFalse(self._queue)
2068
    self.assertFalse(self.jdm.JobWaiting(job))
2069

    
2070
  def testCorrectStatus(self):
2071
    job = _IdOnlyFakeJob(24273)
2072
    job_id = str(23885)
2073

    
2074
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2075
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2076
                                            [constants.JOB_STATUS_SUCCESS])
2077
    self.assertEqual(result, self.jdm.WAIT)
2078
    self.assertFalse(self._status)
2079
    self.assertFalse(self._queue)
2080
    self.assertTrue(self.jdm.JobWaiting(job))
2081
    self.assertEqual(self.jdm._waiters, {
2082
      job_id: set([job]),
2083
      })
2084

    
2085
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2086
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2087
                                            [constants.JOB_STATUS_SUCCESS])
2088
    self.assertEqual(result, self.jdm.CONTINUE)
2089
    self.assertFalse(self._status)
2090
    self.assertFalse(self._queue)
2091
    self.assertFalse(self.jdm.JobWaiting(job))
2092

    
2093
  def testFinalizedRightAway(self):
2094
    job = _IdOnlyFakeJob(224)
2095
    job_id = str(3081)
2096

    
2097
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2098
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2099
                                            [constants.JOB_STATUS_SUCCESS])
2100
    self.assertEqual(result, self.jdm.CONTINUE)
2101
    self.assertFalse(self._status)
2102
    self.assertFalse(self._queue)
2103
    self.assertFalse(self.jdm.JobWaiting(job))
2104
    self.assertEqual(self.jdm._waiters, {
2105
      job_id: set(),
2106
      })
2107

    
2108
    # Force cleanup
2109
    self.jdm.NotifyWaiters("0")
2110
    self.assertFalse(self.jdm._waiters)
2111
    self.assertFalse(self._status)
2112
    self.assertFalse(self._queue)
2113

    
2114
  def testMultipleWaiting(self):
2115
    # Use a deterministic random generator
2116
    rnd = random.Random(21402)
2117

    
2118
    job_ids = map(str, rnd.sample(range(1, 10000), 150))
2119

    
2120
    waiters = dict((job_ids.pop(),
2121
                    set(map(_IdOnlyFakeJob,
2122
                            [job_ids.pop()
2123
                             for _ in range(rnd.randint(1, 20))])))
2124
                   for _ in range(10))
2125

    
2126
    # Ensure there are no duplicate job IDs
2127
    assert not utils.FindDuplicates(waiters.keys() +
2128
                                    [job.id
2129
                                     for jobs in waiters.values()
2130
                                     for job in jobs])
2131

    
2132
    # Register all jobs as waiters
2133
    for job_id, job in [(job_id, job)
2134
                        for (job_id, jobs) in waiters.items()
2135
                        for job in jobs]:
2136
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2137
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
2138
                                              [constants.JOB_STATUS_SUCCESS])
2139
      self.assertEqual(result, self.jdm.WAIT)
2140
      self.assertFalse(self._status)
2141
      self.assertFalse(self._queue)
2142
      self.assertTrue(self.jdm.JobWaiting(job))
2143

    
2144
    self.assertEqual(self.jdm._waiters, waiters)
2145

    
2146
    def _MakeSet((name, mode, owner_names, pending)):
2147
      return (name, mode, owner_names,
2148
              [(pendmode, set(pend)) for (pendmode, pend) in pending])
2149

    
2150
    def _CheckLockInfo():
2151
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
2152
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2153
        ("job/%s" % job_id, None, None,
2154
         [("job", set([job.id for job in jobs]))])
2155
        for job_id, jobs in waiters.items()
2156
        if jobs
2157
        ]))
2158

    
2159
    _CheckLockInfo()
2160

    
2161
    # Notify in random order
2162
    for job_id in rnd.sample(waiters, len(waiters)):
2163
      # Remove from pending waiter list
2164
      jobs = waiters.pop(job_id)
2165
      for job in jobs:
2166
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2167
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
2168
                                                [constants.JOB_STATUS_SUCCESS])
2169
        self.assertEqual(result, self.jdm.CONTINUE)
2170
        self.assertFalse(self._status)
2171
        self.assertFalse(self._queue)
2172
        self.assertFalse(self.jdm.JobWaiting(job))
2173

    
2174
      _CheckLockInfo()
2175

    
2176
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2177

    
2178
    assert not waiters
2179

    
2180
  def testSelfDependency(self):
2181
    job = _IdOnlyFakeJob(18937)
2182

    
2183
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2184
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2185
    self.assertEqual(result, self.jdm.ERROR)
2186

    
2187
  def testJobDisappears(self):
2188
    job = _IdOnlyFakeJob(30540)
2189
    job_id = str(23769)
2190

    
2191
    def _FakeStatus(_):
2192
      raise errors.JobLost("#msg#")
2193

    
2194
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2195
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2196
    self.assertEqual(result, self.jdm.ERROR)
2197
    self.assertFalse(jdm.JobWaiting(job))
2198
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2199

    
2200

    
2201
if __name__ == "__main__":
2202
  testutils.GanetiTestProgram()