Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 5fd6b694

History | View | Annotate | Download (41.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import jqueue
36
from ganeti import opcodes
37
from ganeti import compat
38
from ganeti import mcpu
39

    
40
import testutils
41

    
42

    
43
class _FakeJob:
44
  def __init__(self, job_id, status):
45
    self.id = job_id
46
    self._status = status
47
    self._log = []
48

    
49
  def SetStatus(self, status):
50
    self._status = status
51

    
52
  def AddLogEntry(self, msg):
53
    self._log.append((len(self._log), msg))
54

    
55
  def CalcStatus(self):
56
    return self._status
57

    
58
  def GetInfo(self, fields):
59
    result = []
60

    
61
    for name in fields:
62
      if name == "status":
63
        result.append(self._status)
64
      else:
65
        raise Exception("Unknown field")
66

    
67
    return result
68

    
69
  def GetLogEntries(self, newer_than):
70
    assert newer_than is None or newer_than >= 0
71

    
72
    if newer_than is None:
73
      return self._log
74

    
75
    return self._log[newer_than:]
76

    
77

    
78
class TestJobChangesChecker(unittest.TestCase):
79
  def testStatus(self):
80
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81
    checker = jqueue._JobChangesChecker(["status"], None, None)
82
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
83

    
84
    job.SetStatus(constants.JOB_STATUS_RUNNING)
85
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
86

    
87
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
88
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
89

    
90
    # job.id is used by checker
91
    self.assertEqual(job.id, 9094)
92

    
93
  def testStatusWithPrev(self):
94
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
95
    checker = jqueue._JobChangesChecker(["status"],
96
                                        [constants.JOB_STATUS_QUEUED], None)
97
    self.assert_(checker(job) is None)
98

    
99
    job.SetStatus(constants.JOB_STATUS_RUNNING)
100
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
101

    
102
  def testFinalStatus(self):
103
    for status in constants.JOBS_FINALIZED:
104
      job = _FakeJob(2178711, status)
105
      checker = jqueue._JobChangesChecker(["status"], [status], None)
106
      # There won't be any changes in this status, hence it should signal
107
      # a change immediately
108
      self.assertEqual(checker(job), ([status], []))
109

    
110
  def testLog(self):
111
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112
    checker = jqueue._JobChangesChecker(["status"], None, None)
113
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
114

    
115
    job.AddLogEntry("Hello World")
116
    (job_info, log_entries) = checker(job)
117
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118
    self.assertEqual(log_entries, [[0, "Hello World"]])
119

    
120
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121
    self.assert_(checker2(job) is None)
122

    
123
    job.AddLogEntry("Foo Bar")
124
    job.SetStatus(constants.JOB_STATUS_ERROR)
125

    
126
    (job_info, log_entries) = checker2(job)
127
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
129

    
130
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
131
    (job_info, log_entries) = checker3(job)
132
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134

    
135

    
136
class TestJobChangesWaiter(unittest.TestCase):
137
  def setUp(self):
138
    self.tmpdir = tempfile.mkdtemp()
139
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
140
    utils.WriteFile(self.filename, data="")
141

    
142
  def tearDown(self):
143
    shutil.rmtree(self.tmpdir)
144

    
145
  def _EnsureNotifierClosed(self, notifier):
146
    try:
147
      os.fstat(notifier._fd)
148
    except EnvironmentError, err:
149
      self.assertEqual(err.errno, errno.EBADF)
150
    else:
151
      self.fail("File descriptor wasn't closed")
152

    
153
  def testClose(self):
154
    for wait in [False, True]:
155
      waiter = jqueue._JobFileChangesWaiter(self.filename)
156
      try:
157
        if wait:
158
          waiter.Wait(0.001)
159
      finally:
160
        waiter.Close()
161

    
162
      # Ensure file descriptor was closed
163
      self._EnsureNotifierClosed(waiter._notifier)
164

    
165
  def testChangingFile(self):
166
    waiter = jqueue._JobFileChangesWaiter(self.filename)
167
    try:
168
      self.assertFalse(waiter.Wait(0.1))
169
      utils.WriteFile(self.filename, data="changed")
170
      self.assert_(waiter.Wait(60))
171
    finally:
172
      waiter.Close()
173

    
174
    self._EnsureNotifierClosed(waiter._notifier)
175

    
176
  def testChangingFile2(self):
177
    waiter = jqueue._JobChangesWaiter(self.filename)
178
    try:
179
      self.assertFalse(waiter._filewaiter)
180
      self.assert_(waiter.Wait(0.1))
181
      self.assert_(waiter._filewaiter)
182

    
183
      # File waiter is now used, but there have been no changes
184
      self.assertFalse(waiter.Wait(0.1))
185
      utils.WriteFile(self.filename, data="changed")
186
      self.assert_(waiter.Wait(60))
187
    finally:
188
      waiter.Close()
189

    
190
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191

    
192

    
193
class TestWaitForJobChangesHelper(unittest.TestCase):
194
  def setUp(self):
195
    self.tmpdir = tempfile.mkdtemp()
196
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197
    utils.WriteFile(self.filename, data="")
198

    
199
  def tearDown(self):
200
    shutil.rmtree(self.tmpdir)
201

    
202
  def _LoadWaitingJob(self):
203
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
204

    
205
  def _LoadLostJob(self):
206
    return None
207

    
208
  def testNoChanges(self):
209
    wfjc = jqueue._WaitForJobChangesHelper()
210

    
211
    # No change
212
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
213
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
214
                     constants.JOB_NOTCHANGED)
215

    
216
    # No previous information
217
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
218
                          ["status"], None, None, 1.0),
219
                     ([constants.JOB_STATUS_WAITLOCK], []))
220

    
221
  def testLostJob(self):
222
    wfjc = jqueue._WaitForJobChangesHelper()
223
    self.assert_(wfjc(self.filename, self._LoadLostJob,
224
                      ["status"], None, None, 1.0) is None)
225

    
226

    
227
class TestEncodeOpError(unittest.TestCase):
228
  def test(self):
229
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230
    self.assert_(isinstance(encerr, tuple))
231
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
232

    
233
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234
    self.assert_(isinstance(encerr, tuple))
235
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
236

    
237
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238
    self.assert_(isinstance(encerr, tuple))
239
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
240

    
241
    encerr = jqueue._EncodeOpError("Hello World")
242
    self.assert_(isinstance(encerr, tuple))
243
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244

    
245

    
246
class TestQueuedOpCode(unittest.TestCase):
247
  def testDefaults(self):
248
    def _Check(op):
249
      self.assertFalse(hasattr(op.input, "dry_run"))
250
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251
      self.assertFalse(op.log)
252
      self.assert_(op.start_timestamp is None)
253
      self.assert_(op.exec_timestamp is None)
254
      self.assert_(op.end_timestamp is None)
255
      self.assert_(op.result is None)
256
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
257

    
258
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
259
    _Check(op1)
260
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
261
    _Check(op2)
262
    self.assertEqual(op1.Serialize(), op2.Serialize())
263

    
264
  def testPriority(self):
265
    def _Check(op):
266
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267
             "Default priority equals high priority; test can't work"
268
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
270

    
271
    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
272
    op1 = jqueue._QueuedOpCode(inpop)
273
    _Check(op1)
274
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
275
    _Check(op2)
276
    self.assertEqual(op1.Serialize(), op2.Serialize())
277

    
278

    
279
class TestQueuedJob(unittest.TestCase):
280
  def test(self):
281
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
282
                      None, 1, [])
283

    
284
  def testDefaults(self):
285
    job_id = 4260
286
    ops = [
287
      opcodes.OpGetTags(),
288
      opcodes.OpTestDelay(),
289
      ]
290

    
291
    def _Check(job):
292
      self.assertEqual(job.id, job_id)
293
      self.assertEqual(job.log_serial, 0)
294
      self.assert_(job.received_timestamp)
295
      self.assert_(job.start_timestamp is None)
296
      self.assert_(job.end_timestamp is None)
297
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299
      self.assert_(repr(job).startswith("<"))
300
      self.assertEqual(len(job.ops), len(ops))
301
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302
                              for (inp, op) in zip(ops, job.ops)))
303
      self.assertRaises(errors.OpExecError, job.GetInfo,
304
                        ["unknown-field"])
305
      self.assertEqual(job.GetInfo(["summary"]),
306
                       [[op.input.Summary() for op in job.ops]])
307

    
308
    job1 = jqueue._QueuedJob(None, job_id, ops)
309
    _Check(job1)
310
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
311
    _Check(job2)
312
    self.assertEqual(job1.Serialize(), job2.Serialize())
313

    
314
  def testPriority(self):
315
    job_id = 4283
316
    ops = [
317
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
318
      opcodes.OpTestDelay(),
319
      ]
320

    
321
    def _Check(job):
322
      self.assertEqual(job.id, job_id)
323
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324
      self.assert_(repr(job).startswith("<"))
325

    
326
    job = jqueue._QueuedJob(None, job_id, ops)
327
    _Check(job)
328
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
329
                            for op in job.ops))
330
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
331

    
332
    # Increase first
333
    job.ops[0].priority -= 1
334
    _Check(job)
335
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
336

    
337
    # Mark opcode as finished
338
    job.ops[0].status = constants.OP_STATUS_SUCCESS
339
    _Check(job)
340
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
341

    
342
    # Increase second
343
    job.ops[1].priority -= 10
344
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
345

    
346
    # Test increasing first
347
    job.ops[0].status = constants.OP_STATUS_RUNNING
348
    job.ops[0].priority -= 19
349
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
350

    
351
  def testCalcStatus(self):
352
    def _Queued(ops):
353
      # The default status is "queued"
354
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
355
                              for op in ops))
356

    
357
    def _Waitlock1(ops):
358
      ops[0].status = constants.OP_STATUS_WAITLOCK
359

    
360
    def _Waitlock2(ops):
361
      ops[0].status = constants.OP_STATUS_SUCCESS
362
      ops[1].status = constants.OP_STATUS_SUCCESS
363
      ops[2].status = constants.OP_STATUS_WAITLOCK
364

    
365
    def _Running(ops):
366
      ops[0].status = constants.OP_STATUS_SUCCESS
367
      ops[1].status = constants.OP_STATUS_RUNNING
368
      for op in ops[2:]:
369
        op.status = constants.OP_STATUS_QUEUED
370

    
371
    def _Canceling1(ops):
372
      ops[0].status = constants.OP_STATUS_SUCCESS
373
      ops[1].status = constants.OP_STATUS_SUCCESS
374
      for op in ops[2:]:
375
        op.status = constants.OP_STATUS_CANCELING
376

    
377
    def _Canceling2(ops):
378
      for op in ops:
379
        op.status = constants.OP_STATUS_CANCELING
380

    
381
    def _Canceled(ops):
382
      for op in ops:
383
        op.status = constants.OP_STATUS_CANCELED
384

    
385
    def _Error1(ops):
386
      for idx, op in enumerate(ops):
387
        if idx > 3:
388
          op.status = constants.OP_STATUS_ERROR
389
        else:
390
          op.status = constants.OP_STATUS_SUCCESS
391

    
392
    def _Error2(ops):
393
      for op in ops:
394
        op.status = constants.OP_STATUS_ERROR
395

    
396
    def _Success(ops):
397
      for op in ops:
398
        op.status = constants.OP_STATUS_SUCCESS
399

    
400
    tests = {
401
      constants.JOB_STATUS_QUEUED: [_Queued],
402
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403
      constants.JOB_STATUS_RUNNING: [_Running],
404
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405
      constants.JOB_STATUS_CANCELED: [_Canceled],
406
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407
      constants.JOB_STATUS_SUCCESS: [_Success],
408
      }
409

    
410
    def _NewJob():
411
      job = jqueue._QueuedJob(None, 1,
412
                              [opcodes.OpTestDelay() for _ in range(10)])
413
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
415
                              for op in job.ops))
416
      return job
417

    
418
    for status in constants.JOB_STATUS_ALL:
419
      sttests = tests[status]
420
      assert sttests
421
      for fn in sttests:
422
        job = _NewJob()
423
        fn(job.ops)
424
        self.assertEqual(job.CalcStatus(), status)
425

    
426

    
427
class _FakeQueueForProc:
428
  def __init__(self):
429
    self._acquired = False
430
    self._updates = []
431

    
432
  def IsAcquired(self):
433
    return self._acquired
434

    
435
  def GetNextUpdate(self):
436
    return self._updates.pop(0)
437

    
438
  def acquire(self, shared=0):
439
    assert shared == 1
440
    self._acquired = True
441

    
442
  def release(self):
443
    assert self._acquired
444
    self._acquired = False
445

    
446
  def UpdateJobUnlocked(self, job, replicate=True):
447
    assert self._acquired, "Lock not acquired while updating job"
448
    self._updates.append((job, bool(replicate)))
449

    
450

    
451
class _FakeExecOpCodeForProc:
452
  def __init__(self, queue, before_start, after_start):
453
    self._queue = queue
454
    self._before_start = before_start
455
    self._after_start = after_start
456

    
457
  def __call__(self, op, cbs, timeout=None, priority=None):
458
    assert isinstance(op, opcodes.OpTestDummy)
459
    assert not self._queue.IsAcquired(), \
460
           "Queue lock not released when executing opcode"
461

    
462
    if self._before_start:
463
      self._before_start(timeout, priority)
464

    
465
    cbs.NotifyStart()
466

    
467
    if self._after_start:
468
      self._after_start(op, cbs)
469

    
470
    # Check again after the callbacks
471
    assert not self._queue.IsAcquired()
472

    
473
    if op.fail:
474
      raise errors.OpExecError("Error requested (%s)" % op.result)
475

    
476
    return op.result
477

    
478

    
479
class _JobProcessorTestUtils:
480
  def _CreateJob(self, queue, job_id, ops):
481
    job = jqueue._QueuedJob(queue, job_id, ops)
482
    self.assertFalse(job.start_timestamp)
483
    self.assertFalse(job.end_timestamp)
484
    self.assertEqual(len(ops), len(job.ops))
485
    self.assert_(compat.all(op.input == inp
486
                            for (op, inp) in zip(job.ops, ops)))
487
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
488
    return job
489

    
490

    
491
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
492
  def _GenericCheckJob(self, job):
493
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
494
                      for op in job.ops)
495

    
496
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
497
                     [[op.start_timestamp for op in job.ops],
498
                      [op.exec_timestamp for op in job.ops],
499
                      [op.end_timestamp for op in job.ops]])
500
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
501
                     [job.received_timestamp,
502
                      job.start_timestamp,
503
                      job.end_timestamp])
504
    self.assert_(job.start_timestamp)
505
    self.assert_(job.end_timestamp)
506
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
507

    
508
  def testSuccess(self):
509
    queue = _FakeQueueForProc()
510

    
511
    for (job_id, opcount) in [(25351, 1), (6637, 3),
512
                              (24644, 10), (32207, 100)]:
513
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
514
             for i in range(opcount)]
515

    
516
      # Create job
517
      job = self._CreateJob(queue, job_id, ops)
518

    
519
      def _BeforeStart(timeout, priority):
520
        self.assertEqual(queue.GetNextUpdate(), (job, True))
521
        self.assertRaises(IndexError, queue.GetNextUpdate)
522
        self.assertFalse(queue.IsAcquired())
523
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
524
        self.assertFalse(job.cur_opctx)
525

    
526
      def _AfterStart(op, cbs):
527
        self.assertEqual(queue.GetNextUpdate(), (job, True))
528
        self.assertRaises(IndexError, queue.GetNextUpdate)
529

    
530
        self.assertFalse(queue.IsAcquired())
531
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
532
        self.assertFalse(job.cur_opctx)
533

    
534
        # Job is running, cancelling shouldn't be possible
535
        (success, _) = job.Cancel()
536
        self.assertFalse(success)
537

    
538
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
539

    
540
      for idx in range(len(ops)):
541
        self.assertRaises(IndexError, queue.GetNextUpdate)
542
        result = jqueue._JobProcessor(queue, opexec, job)()
543
        self.assertEqual(queue.GetNextUpdate(), (job, True))
544
        self.assertRaises(IndexError, queue.GetNextUpdate)
545
        if idx == len(ops) - 1:
546
          # Last opcode
547
          self.assert_(result)
548
        else:
549
          self.assertFalse(result)
550

    
551
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
552
          self.assert_(job.start_timestamp)
553
          self.assertFalse(job.end_timestamp)
554

    
555
      self.assertRaises(IndexError, queue.GetNextUpdate)
556

    
557
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
558
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
559
      self.assertEqual(job.GetInfo(["opresult"]),
560
                       [[op.input.result for op in job.ops]])
561
      self.assertEqual(job.GetInfo(["opstatus"]),
562
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
563
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
564
                              for op in job.ops))
565

    
566
      self._GenericCheckJob(job)
567

    
568
      # Finished jobs can't be processed any further
569
      self.assertRaises(errors.ProgrammerError,
570
                        jqueue._JobProcessor(queue, opexec, job))
571

    
572
  def testOpcodeError(self):
573
    queue = _FakeQueueForProc()
574

    
575
    testdata = [
576
      (17077, 1, 0, 0),
577
      (1782, 5, 2, 2),
578
      (18179, 10, 9, 9),
579
      (4744, 10, 3, 8),
580
      (23816, 100, 39, 45),
581
      ]
582

    
583
    for (job_id, opcount, failfrom, failto) in testdata:
584
      # Prepare opcodes
585
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
586
                                 fail=(failfrom <= i and
587
                                       i <= failto))
588
             for i in range(opcount)]
589

    
590
      # Create job
591
      job = self._CreateJob(queue, job_id, ops)
592

    
593
      opexec = _FakeExecOpCodeForProc(queue, None, None)
594

    
595
      for idx in range(len(ops)):
596
        self.assertRaises(IndexError, queue.GetNextUpdate)
597
        result = jqueue._JobProcessor(queue, opexec, job)()
598
        # queued to waitlock
599
        self.assertEqual(queue.GetNextUpdate(), (job, True))
600
        # waitlock to running
601
        self.assertEqual(queue.GetNextUpdate(), (job, True))
602
        # Opcode result
603
        self.assertEqual(queue.GetNextUpdate(), (job, True))
604
        self.assertRaises(IndexError, queue.GetNextUpdate)
605

    
606
        if idx in (failfrom, len(ops) - 1):
607
          # Last opcode
608
          self.assert_(result)
609
          break
610

    
611
        self.assertFalse(result)
612

    
613
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
614

    
615
      self.assertRaises(IndexError, queue.GetNextUpdate)
616

    
617
      # Check job status
618
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
619
      self.assertEqual(job.GetInfo(["id"]), [job_id])
620
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
621

    
622
      # Check opcode status
623
      data = zip(job.ops,
624
                 job.GetInfo(["opstatus"])[0],
625
                 job.GetInfo(["opresult"])[0])
626

    
627
      for idx, (op, opstatus, opresult) in enumerate(data):
628
        if idx < failfrom:
629
          assert not op.input.fail
630
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
631
          self.assertEqual(opresult, op.input.result)
632
        elif idx <= failto:
633
          assert op.input.fail
634
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
635
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
636
        else:
637
          assert not op.input.fail
638
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
639
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
640

    
641
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
642
                              for op in job.ops[:failfrom]))
643

    
644
      self._GenericCheckJob(job)
645

    
646
      # Finished jobs can't be processed any further
647
      self.assertRaises(errors.ProgrammerError,
648
                        jqueue._JobProcessor(queue, opexec, job))
649

    
650
  def testCancelWhileInQueue(self):
651
    queue = _FakeQueueForProc()
652

    
653
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
654
           for i in range(5)]
655

    
656
    # Create job
657
    job_id = 17045
658
    job = self._CreateJob(queue, job_id, ops)
659

    
660
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
661

    
662
    # Mark as cancelled
663
    (success, _) = job.Cancel()
664
    self.assert_(success)
665

    
666
    self.assertRaises(IndexError, queue.GetNextUpdate)
667

    
668
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
669
                            for op in job.ops))
670

    
671
    opexec = _FakeExecOpCodeForProc(queue, None, None)
672
    jqueue._JobProcessor(queue, opexec, job)()
673

    
674
    # Check result
675
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
676
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
677
    self.assertFalse(job.start_timestamp)
678
    self.assert_(job.end_timestamp)
679
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
680
                                for op in job.ops))
681
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
682
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
683
                      ["Job canceled by request" for _ in job.ops]])
684

    
685
  def testCancelWhileWaitlock(self):
686
    queue = _FakeQueueForProc()
687

    
688
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
689
           for i in range(5)]
690

    
691
    # Create job
692
    job_id = 11009
693
    job = self._CreateJob(queue, job_id, ops)
694

    
695
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
696

    
697
    def _BeforeStart(timeout, priority):
698
      self.assertEqual(queue.GetNextUpdate(), (job, True))
699
      self.assertRaises(IndexError, queue.GetNextUpdate)
700
      self.assertFalse(queue.IsAcquired())
701
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
702

    
703
      # Mark as cancelled
704
      (success, _) = job.Cancel()
705
      self.assert_(success)
706

    
707
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
708
                              for op in job.ops))
709
      self.assertRaises(IndexError, queue.GetNextUpdate)
710

    
711
    def _AfterStart(op, cbs):
712
      self.assertEqual(queue.GetNextUpdate(), (job, True))
713
      self.assertRaises(IndexError, queue.GetNextUpdate)
714
      self.assertFalse(queue.IsAcquired())
715
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
716

    
717
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
718

    
719
    self.assertRaises(IndexError, queue.GetNextUpdate)
720
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
721
    self.assertEqual(queue.GetNextUpdate(), (job, True))
722
    self.assertRaises(IndexError, queue.GetNextUpdate)
723

    
724
    # Check result
725
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
726
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
727
    self.assert_(job.start_timestamp)
728
    self.assert_(job.end_timestamp)
729
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
730
                                for op in job.ops))
731
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
732
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
733
                      ["Job canceled by request" for _ in job.ops]])
734

    
735
  def testCancelWhileWaitlockWithTimeout(self):
736
    queue = _FakeQueueForProc()
737

    
738
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
739
           for i in range(5)]
740

    
741
    # Create job
742
    job_id = 24314
743
    job = self._CreateJob(queue, job_id, ops)
744

    
745
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
746

    
747
    def _BeforeStart(timeout, priority):
748
      self.assertFalse(queue.IsAcquired())
749
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
750

    
751
      # Mark as cancelled
752
      (success, _) = job.Cancel()
753
      self.assert_(success)
754

    
755
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
756
                              for op in job.ops))
757

    
758
      # Fake an acquire attempt timing out
759
      raise mcpu.LockAcquireTimeout()
760

    
761
    def _AfterStart(op, cbs):
762
      self.fail("Should not reach this")
763

    
764
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
765

    
766
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
767

    
768
    # Check result
769
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
770
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
771
    self.assert_(job.start_timestamp)
772
    self.assert_(job.end_timestamp)
773
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
774
                                for op in job.ops))
775
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
776
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
777
                      ["Job canceled by request" for _ in job.ops]])
778

    
779
  def testCancelWhileRunning(self):
780
    # Tests canceling a job with finished opcodes and more, unprocessed ones
781
    queue = _FakeQueueForProc()
782

    
783
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
784
           for i in range(3)]
785

    
786
    # Create job
787
    job_id = 28492
788
    job = self._CreateJob(queue, job_id, ops)
789

    
790
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
791

    
792
    opexec = _FakeExecOpCodeForProc(queue, None, None)
793

    
794
    # Run one opcode
795
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
796

    
797
    # Job goes back to queued
798
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
799
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
800
                     [[constants.OP_STATUS_SUCCESS,
801
                       constants.OP_STATUS_QUEUED,
802
                       constants.OP_STATUS_QUEUED],
803
                      ["Res0", None, None]])
804

    
805
    # Mark as cancelled
806
    (success, _) = job.Cancel()
807
    self.assert_(success)
808

    
809
    # Try processing another opcode (this will actually cancel the job)
810
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
811

    
812
    # Check result
813
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
814
    self.assertEqual(job.GetInfo(["id"]), [job_id])
815
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
816
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
817
                     [[constants.OP_STATUS_SUCCESS,
818
                       constants.OP_STATUS_CANCELED,
819
                       constants.OP_STATUS_CANCELED],
820
                      ["Res0", "Job canceled by request",
821
                       "Job canceled by request"]])
822

    
823
  def testPartiallyRun(self):
824
    # Tests calling the processor on a job that's been partially run before the
825
    # program was restarted
826
    queue = _FakeQueueForProc()
827

    
828
    opexec = _FakeExecOpCodeForProc(queue, None, None)
829

    
830
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
831
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
832
             for i in range(10)]
833

    
834
      # Create job
835
      job = self._CreateJob(queue, job_id, ops)
836

    
837
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
838

    
839
      for _ in range(successcount):
840
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
841

    
842
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
843
      self.assertEqual(job.GetInfo(["opstatus"]),
844
                       [[constants.OP_STATUS_SUCCESS
845
                         for _ in range(successcount)] +
846
                        [constants.OP_STATUS_QUEUED
847
                         for _ in range(len(ops) - successcount)]])
848

    
849
      self.assert_(job.ops_iter)
850

    
851
      # Serialize and restore (simulates program restart)
852
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
853
      self.assertFalse(newjob.ops_iter)
854
      self._TestPartial(newjob, successcount)
855

    
856
  def _TestPartial(self, job, successcount):
857
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
858
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
859

    
860
    queue = _FakeQueueForProc()
861
    opexec = _FakeExecOpCodeForProc(queue, None, None)
862

    
863
    for remaining in reversed(range(len(job.ops) - successcount)):
864
      result = jqueue._JobProcessor(queue, opexec, job)()
865

    
866
      if remaining == 0:
867
        # Last opcode
868
        self.assert_(result)
869
        break
870

    
871
      self.assertFalse(result)
872

    
873
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
874

    
875
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
876
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
877
    self.assertEqual(job.GetInfo(["opresult"]),
878
                     [[op.input.result for op in job.ops]])
879
    self.assertEqual(job.GetInfo(["opstatus"]),
880
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
881
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
882
                            for op in job.ops))
883

    
884
    self._GenericCheckJob(job)
885

    
886
    # Finished jobs can't be processed any further
887
    self.assertRaises(errors.ProgrammerError,
888
                      jqueue._JobProcessor(queue, opexec, job))
889

    
890
    # ... also after being restored
891
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
892
    self.assertRaises(errors.ProgrammerError,
893
                      jqueue._JobProcessor(queue, opexec, job2))
894

    
895
  def testProcessorOnRunningJob(self):
896
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
897

    
898
    queue = _FakeQueueForProc()
899
    opexec = _FakeExecOpCodeForProc(queue, None, None)
900

    
901
    # Create job
902
    job = self._CreateJob(queue, 9571, ops)
903

    
904
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
905

    
906
    job.ops[0].status = constants.OP_STATUS_RUNNING
907

    
908
    assert len(job.ops) == 1
909

    
910
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
911

    
912
    # Calling on running job must fail
913
    self.assertRaises(errors.ProgrammerError,
914
                      jqueue._JobProcessor(queue, opexec, job))
915

    
916
  def testLogMessages(self):
917
    # Tests the "Feedback" callback function
918
    queue = _FakeQueueForProc()
919

    
920
    messages = {
921
      1: [
922
        (None, "Hello"),
923
        (None, "World"),
924
        (constants.ELOG_MESSAGE, "there"),
925
        ],
926
      4: [
927
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
928
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
929
        ],
930
      }
931
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
932
                               messages=messages.get(i, []))
933
           for i in range(5)]
934

    
935
    # Create job
936
    job = self._CreateJob(queue, 29386, ops)
937

    
938
    def _BeforeStart(timeout, priority):
939
      self.assertEqual(queue.GetNextUpdate(), (job, True))
940
      self.assertRaises(IndexError, queue.GetNextUpdate)
941
      self.assertFalse(queue.IsAcquired())
942
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
943

    
944
    def _AfterStart(op, cbs):
945
      self.assertEqual(queue.GetNextUpdate(), (job, True))
946
      self.assertRaises(IndexError, queue.GetNextUpdate)
947
      self.assertFalse(queue.IsAcquired())
948
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
949

    
950
      self.assertRaises(AssertionError, cbs.Feedback,
951
                        "too", "many", "arguments")
952

    
953
      for (log_type, msg) in op.messages:
954
        self.assertRaises(IndexError, queue.GetNextUpdate)
955
        if log_type:
956
          cbs.Feedback(log_type, msg)
957
        else:
958
          cbs.Feedback(msg)
959
        # Check for job update without replication
960
        self.assertEqual(queue.GetNextUpdate(), (job, False))
961
        self.assertRaises(IndexError, queue.GetNextUpdate)
962

    
963
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
964

    
965
    for remaining in reversed(range(len(job.ops))):
966
      self.assertRaises(IndexError, queue.GetNextUpdate)
967
      result = jqueue._JobProcessor(queue, opexec, job)()
968
      self.assertEqual(queue.GetNextUpdate(), (job, True))
969
      self.assertRaises(IndexError, queue.GetNextUpdate)
970

    
971
      if remaining == 0:
972
        # Last opcode
973
        self.assert_(result)
974
        break
975

    
976
      self.assertFalse(result)
977

    
978
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
979

    
980
    self.assertRaises(IndexError, queue.GetNextUpdate)
981

    
982
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
983
    self.assertEqual(job.GetInfo(["opresult"]),
984
                     [[op.input.result for op in job.ops]])
985

    
986
    logmsgcount = sum(len(m) for m in messages.values())
987

    
988
    self._CheckLogMessages(job, logmsgcount)
989

    
990
    # Serialize and restore (simulates program restart)
991
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
992
    self._CheckLogMessages(newjob, logmsgcount)
993

    
994
    # Check each message
995
    prevserial = -1
996
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
997
      for (serial, timestamp, log_type, msg) in oplog:
998
        (exptype, expmsg) = messages.get(idx).pop(0)
999
        if exptype:
1000
          self.assertEqual(log_type, exptype)
1001
        else:
1002
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1003
        self.assertEqual(expmsg, msg)
1004
        self.assert_(serial > prevserial)
1005
        prevserial = serial
1006

    
1007
  def _CheckLogMessages(self, job, count):
1008
    # Check serial
1009
    self.assertEqual(job.log_serial, count)
1010

    
1011
    # No filter
1012
    self.assertEqual(job.GetLogEntries(None),
1013
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1014
                      for entry in entries])
1015

    
1016
    # Filter with serial
1017
    assert count > 3
1018
    self.assert_(job.GetLogEntries(3))
1019
    self.assertEqual(job.GetLogEntries(3),
1020
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1021
                      for entry in entries][3:])
1022

    
1023
    # No log message after highest serial
1024
    self.assertFalse(job.GetLogEntries(count))
1025
    self.assertFalse(job.GetLogEntries(count + 3))
1026

    
1027

    
1028
class _FakeTimeoutStrategy:
1029
  def __init__(self, timeouts):
1030
    self.timeouts = timeouts
1031
    self.attempts = 0
1032
    self.last_timeout = None
1033

    
1034
  def NextAttempt(self):
1035
    self.attempts += 1
1036
    if self.timeouts:
1037
      timeout = self.timeouts.pop(0)
1038
    else:
1039
      timeout = None
1040
    self.last_timeout = timeout
1041
    return timeout
1042

    
1043

    
1044
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1045
  def setUp(self):
1046
    self.queue = _FakeQueueForProc()
1047
    self.job = None
1048
    self.curop = None
1049
    self.opcounter = None
1050
    self.timeout_strategy = None
1051
    self.retries = 0
1052
    self.prev_tsop = None
1053
    self.prev_prio = None
1054
    self.prev_status = None
1055
    self.lock_acq_prio = None
1056
    self.gave_lock = None
1057
    self.done_lock_before_blocking = False
1058

    
1059
  def _BeforeStart(self, timeout, priority):
1060
    job = self.job
1061

    
1062
    # If status has changed, job must've been written
1063
    if self.prev_status != self.job.ops[self.curop].status:
1064
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1065
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1066

    
1067
    self.assertFalse(self.queue.IsAcquired())
1068
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1069

    
1070
    ts = self.timeout_strategy
1071

    
1072
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1073
    self.assertEqual(timeout, ts.last_timeout)
1074
    self.assertEqual(priority, job.ops[self.curop].priority)
1075

    
1076
    self.gave_lock = True
1077
    self.lock_acq_prio = priority
1078

    
1079
    if (self.curop == 3 and
1080
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1081
      # Give locks before running into blocking acquire
1082
      assert self.retries == 7
1083
      self.retries = 0
1084
      self.done_lock_before_blocking = True
1085
      return
1086

    
1087
    if self.retries > 0:
1088
      self.assert_(timeout is not None)
1089
      self.retries -= 1
1090
      self.gave_lock = False
1091
      raise mcpu.LockAcquireTimeout()
1092

    
1093
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1094
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1095
      assert not ts.timeouts
1096
      self.assert_(timeout is None)
1097

    
1098
  def _AfterStart(self, op, cbs):
1099
    job = self.job
1100

    
1101
    # Setting to "running" requires an update
1102
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1103
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1104

    
1105
    self.assertFalse(self.queue.IsAcquired())
1106
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1107

    
1108
    # Job is running, cancelling shouldn't be possible
1109
    (success, _) = job.Cancel()
1110
    self.assertFalse(success)
1111

    
1112
  def _NextOpcode(self):
1113
    self.curop = self.opcounter.next()
1114
    self.prev_prio = self.job.ops[self.curop].priority
1115
    self.prev_status = self.job.ops[self.curop].status
1116

    
1117
  def _NewTimeoutStrategy(self):
1118
    job = self.job
1119

    
1120
    self.assertEqual(self.retries, 0)
1121

    
1122
    if self.prev_tsop == self.curop:
1123
      # Still on the same opcode, priority must've been increased
1124
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1125

    
1126
    if self.curop == 1:
1127
      # Normal retry
1128
      timeouts = range(10, 31, 10)
1129
      self.retries = len(timeouts) - 1
1130

    
1131
    elif self.curop == 2:
1132
      # Let this run into a blocking acquire
1133
      timeouts = range(11, 61, 12)
1134
      self.retries = len(timeouts)
1135

    
1136
    elif self.curop == 3:
1137
      # Wait for priority to increase, but give lock before blocking acquire
1138
      timeouts = range(12, 100, 14)
1139
      self.retries = len(timeouts)
1140

    
1141
      self.assertFalse(self.done_lock_before_blocking)
1142

    
1143
    elif self.curop == 4:
1144
      self.assert_(self.done_lock_before_blocking)
1145

    
1146
      # Timeouts, but no need to retry
1147
      timeouts = range(10, 31, 10)
1148
      self.retries = 0
1149

    
1150
    elif self.curop == 5:
1151
      # Normal retry
1152
      timeouts = range(19, 100, 11)
1153
      self.retries = len(timeouts)
1154

    
1155
    else:
1156
      timeouts = []
1157
      self.retries = 0
1158

    
1159
    assert len(job.ops) == 10
1160
    assert self.retries <= len(timeouts)
1161

    
1162
    ts = _FakeTimeoutStrategy(timeouts)
1163

    
1164
    self.timeout_strategy = ts
1165
    self.prev_tsop = self.curop
1166
    self.prev_prio = job.ops[self.curop].priority
1167

    
1168
    return ts
1169

    
1170
  def testTimeout(self):
1171
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1172
           for i in range(10)]
1173

    
1174
    # Create job
1175
    job_id = 15801
1176
    job = self._CreateJob(self.queue, job_id, ops)
1177
    self.job = job
1178

    
1179
    self.opcounter = itertools.count(0)
1180

    
1181
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1182
                                    self._AfterStart)
1183
    tsf = self._NewTimeoutStrategy
1184

    
1185
    self.assertFalse(self.done_lock_before_blocking)
1186

    
1187
    while True:
1188
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1189
                                  _timeout_strategy_factory=tsf)
1190

    
1191
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1192

    
1193
      if self.curop is not None:
1194
        self.prev_status = self.job.ops[self.curop].status
1195

    
1196
      self.lock_acq_prio = None
1197

    
1198
      result = proc(_nextop_fn=self._NextOpcode)
1199
      assert self.curop is not None
1200

    
1201
      if result or self.gave_lock:
1202
        # Got lock and/or job is done, result must've been written
1203
        self.assertFalse(job.cur_opctx)
1204
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1205
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1206
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1207
        self.assert_(job.ops[self.curop].exec_timestamp)
1208

    
1209
      if result:
1210
        self.assertFalse(job.cur_opctx)
1211
        break
1212

    
1213
      self.assertFalse(result)
1214

    
1215
      if self.curop == 0:
1216
        self.assertEqual(job.ops[self.curop].start_timestamp,
1217
                         job.start_timestamp)
1218

    
1219
      if self.gave_lock:
1220
        # Opcode finished, but job not yet done
1221
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1222
      else:
1223
        # Did not get locks
1224
        self.assert_(job.cur_opctx)
1225
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1226
                         self.timeout_strategy.NextAttempt)
1227
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1228
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1229

    
1230
        # If priority has changed since acquiring locks, the job must've been
1231
        # updated
1232
        if self.lock_acq_prio != job.ops[self.curop].priority:
1233
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1234

    
1235
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1236

    
1237
      self.assert_(job.start_timestamp)
1238
      self.assertFalse(job.end_timestamp)
1239

    
1240
    self.assertEqual(self.curop, len(job.ops) - 1)
1241
    self.assertEqual(self.job, job)
1242
    self.assertEqual(self.opcounter.next(), len(job.ops))
1243
    self.assert_(self.done_lock_before_blocking)
1244

    
1245
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1246
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1247
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1248
    self.assertEqual(job.GetInfo(["opresult"]),
1249
                     [[op.input.result for op in job.ops]])
1250
    self.assertEqual(job.GetInfo(["opstatus"]),
1251
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1252
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1253
                            for op in job.ops))
1254

    
1255
    # Finished jobs can't be processed any further
1256
    self.assertRaises(errors.ProgrammerError,
1257
                      jqueue._JobProcessor(self.queue, opexec, job))
1258

    
1259

    
1260
if __name__ == "__main__":
1261
  testutils.GanetiTestProgram()