Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 66bd7445

History | View | Annotate | Download (43.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import jqueue
36
from ganeti import opcodes
37
from ganeti import compat
38
from ganeti import mcpu
39

    
40
import testutils
41

    
42

    
43
class _FakeJob:
44
  def __init__(self, job_id, status):
45
    self.id = job_id
46
    self._status = status
47
    self._log = []
48

    
49
  def SetStatus(self, status):
50
    self._status = status
51

    
52
  def AddLogEntry(self, msg):
53
    self._log.append((len(self._log), msg))
54

    
55
  def CalcStatus(self):
56
    return self._status
57

    
58
  def GetInfo(self, fields):
59
    result = []
60

    
61
    for name in fields:
62
      if name == "status":
63
        result.append(self._status)
64
      else:
65
        raise Exception("Unknown field")
66

    
67
    return result
68

    
69
  def GetLogEntries(self, newer_than):
70
    assert newer_than is None or newer_than >= 0
71

    
72
    if newer_than is None:
73
      return self._log
74

    
75
    return self._log[newer_than:]
76

    
77

    
78
class TestJobChangesChecker(unittest.TestCase):
79
  def testStatus(self):
80
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81
    checker = jqueue._JobChangesChecker(["status"], None, None)
82
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
83

    
84
    job.SetStatus(constants.JOB_STATUS_RUNNING)
85
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
86

    
87
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
88
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
89

    
90
    # job.id is used by checker
91
    self.assertEqual(job.id, 9094)
92

    
93
  def testStatusWithPrev(self):
94
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
95
    checker = jqueue._JobChangesChecker(["status"],
96
                                        [constants.JOB_STATUS_QUEUED], None)
97
    self.assert_(checker(job) is None)
98

    
99
    job.SetStatus(constants.JOB_STATUS_RUNNING)
100
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
101

    
102
  def testFinalStatus(self):
103
    for status in constants.JOBS_FINALIZED:
104
      job = _FakeJob(2178711, status)
105
      checker = jqueue._JobChangesChecker(["status"], [status], None)
106
      # There won't be any changes in this status, hence it should signal
107
      # a change immediately
108
      self.assertEqual(checker(job), ([status], []))
109

    
110
  def testLog(self):
111
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112
    checker = jqueue._JobChangesChecker(["status"], None, None)
113
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
114

    
115
    job.AddLogEntry("Hello World")
116
    (job_info, log_entries) = checker(job)
117
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118
    self.assertEqual(log_entries, [[0, "Hello World"]])
119

    
120
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121
    self.assert_(checker2(job) is None)
122

    
123
    job.AddLogEntry("Foo Bar")
124
    job.SetStatus(constants.JOB_STATUS_ERROR)
125

    
126
    (job_info, log_entries) = checker2(job)
127
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
129

    
130
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
131
    (job_info, log_entries) = checker3(job)
132
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134

    
135

    
136
class TestJobChangesWaiter(unittest.TestCase):
137
  def setUp(self):
138
    self.tmpdir = tempfile.mkdtemp()
139
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
140
    utils.WriteFile(self.filename, data="")
141

    
142
  def tearDown(self):
143
    shutil.rmtree(self.tmpdir)
144

    
145
  def _EnsureNotifierClosed(self, notifier):
146
    try:
147
      os.fstat(notifier._fd)
148
    except EnvironmentError, err:
149
      self.assertEqual(err.errno, errno.EBADF)
150
    else:
151
      self.fail("File descriptor wasn't closed")
152

    
153
  def testClose(self):
154
    for wait in [False, True]:
155
      waiter = jqueue._JobFileChangesWaiter(self.filename)
156
      try:
157
        if wait:
158
          waiter.Wait(0.001)
159
      finally:
160
        waiter.Close()
161

    
162
      # Ensure file descriptor was closed
163
      self._EnsureNotifierClosed(waiter._notifier)
164

    
165
  def testChangingFile(self):
166
    waiter = jqueue._JobFileChangesWaiter(self.filename)
167
    try:
168
      self.assertFalse(waiter.Wait(0.1))
169
      utils.WriteFile(self.filename, data="changed")
170
      self.assert_(waiter.Wait(60))
171
    finally:
172
      waiter.Close()
173

    
174
    self._EnsureNotifierClosed(waiter._notifier)
175

    
176
  def testChangingFile2(self):
177
    waiter = jqueue._JobChangesWaiter(self.filename)
178
    try:
179
      self.assertFalse(waiter._filewaiter)
180
      self.assert_(waiter.Wait(0.1))
181
      self.assert_(waiter._filewaiter)
182

    
183
      # File waiter is now used, but there have been no changes
184
      self.assertFalse(waiter.Wait(0.1))
185
      utils.WriteFile(self.filename, data="changed")
186
      self.assert_(waiter.Wait(60))
187
    finally:
188
      waiter.Close()
189

    
190
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191

    
192

    
193
class TestWaitForJobChangesHelper(unittest.TestCase):
194
  def setUp(self):
195
    self.tmpdir = tempfile.mkdtemp()
196
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197
    utils.WriteFile(self.filename, data="")
198

    
199
  def tearDown(self):
200
    shutil.rmtree(self.tmpdir)
201

    
202
  def _LoadWaitingJob(self):
203
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
204

    
205
  def _LoadLostJob(self):
206
    return None
207

    
208
  def testNoChanges(self):
209
    wfjc = jqueue._WaitForJobChangesHelper()
210

    
211
    # No change
212
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
213
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
214
                     constants.JOB_NOTCHANGED)
215

    
216
    # No previous information
217
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
218
                          ["status"], None, None, 1.0),
219
                     ([constants.JOB_STATUS_WAITLOCK], []))
220

    
221
  def testLostJob(self):
222
    wfjc = jqueue._WaitForJobChangesHelper()
223
    self.assert_(wfjc(self.filename, self._LoadLostJob,
224
                      ["status"], None, None, 1.0) is None)
225

    
226

    
227
class TestEncodeOpError(unittest.TestCase):
228
  def test(self):
229
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230
    self.assert_(isinstance(encerr, tuple))
231
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
232

    
233
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234
    self.assert_(isinstance(encerr, tuple))
235
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
236

    
237
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238
    self.assert_(isinstance(encerr, tuple))
239
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
240

    
241
    encerr = jqueue._EncodeOpError("Hello World")
242
    self.assert_(isinstance(encerr, tuple))
243
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244

    
245

    
246
class TestQueuedOpCode(unittest.TestCase):
247
  def testDefaults(self):
248
    def _Check(op):
249
      self.assertFalse(hasattr(op.input, "dry_run"))
250
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251
      self.assertFalse(op.log)
252
      self.assert_(op.start_timestamp is None)
253
      self.assert_(op.exec_timestamp is None)
254
      self.assert_(op.end_timestamp is None)
255
      self.assert_(op.result is None)
256
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
257

    
258
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
259
    _Check(op1)
260
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
261
    _Check(op2)
262
    self.assertEqual(op1.Serialize(), op2.Serialize())
263

    
264
  def testPriority(self):
265
    def _Check(op):
266
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267
             "Default priority equals high priority; test can't work"
268
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
270

    
271
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
272
    op1 = jqueue._QueuedOpCode(inpop)
273
    _Check(op1)
274
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
275
    _Check(op2)
276
    self.assertEqual(op1.Serialize(), op2.Serialize())
277

    
278

    
279
class TestQueuedJob(unittest.TestCase):
280
  def test(self):
281
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
282
                      None, 1, [])
283

    
284
  def testDefaults(self):
285
    job_id = 4260
286
    ops = [
287
      opcodes.OpTagsGet(),
288
      opcodes.OpTestDelay(),
289
      ]
290

    
291
    def _Check(job):
292
      self.assertEqual(job.id, job_id)
293
      self.assertEqual(job.log_serial, 0)
294
      self.assert_(job.received_timestamp)
295
      self.assert_(job.start_timestamp is None)
296
      self.assert_(job.end_timestamp is None)
297
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299
      self.assert_(repr(job).startswith("<"))
300
      self.assertEqual(len(job.ops), len(ops))
301
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302
                              for (inp, op) in zip(ops, job.ops)))
303
      self.assertRaises(errors.OpExecError, job.GetInfo,
304
                        ["unknown-field"])
305
      self.assertEqual(job.GetInfo(["summary"]),
306
                       [[op.input.Summary() for op in job.ops]])
307

    
308
    job1 = jqueue._QueuedJob(None, job_id, ops)
309
    _Check(job1)
310
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
311
    _Check(job2)
312
    self.assertEqual(job1.Serialize(), job2.Serialize())
313

    
314
  def testPriority(self):
315
    job_id = 4283
316
    ops = [
317
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
318
      opcodes.OpTestDelay(),
319
      ]
320

    
321
    def _Check(job):
322
      self.assertEqual(job.id, job_id)
323
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324
      self.assert_(repr(job).startswith("<"))
325

    
326
    job = jqueue._QueuedJob(None, job_id, ops)
327
    _Check(job)
328
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
329
                            for op in job.ops))
330
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
331

    
332
    # Increase first
333
    job.ops[0].priority -= 1
334
    _Check(job)
335
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
336

    
337
    # Mark opcode as finished
338
    job.ops[0].status = constants.OP_STATUS_SUCCESS
339
    _Check(job)
340
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
341

    
342
    # Increase second
343
    job.ops[1].priority -= 10
344
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
345

    
346
    # Test increasing first
347
    job.ops[0].status = constants.OP_STATUS_RUNNING
348
    job.ops[0].priority -= 19
349
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
350

    
351
  def testCalcStatus(self):
352
    def _Queued(ops):
353
      # The default status is "queued"
354
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
355
                              for op in ops))
356

    
357
    def _Waitlock1(ops):
358
      ops[0].status = constants.OP_STATUS_WAITLOCK
359

    
360
    def _Waitlock2(ops):
361
      ops[0].status = constants.OP_STATUS_SUCCESS
362
      ops[1].status = constants.OP_STATUS_SUCCESS
363
      ops[2].status = constants.OP_STATUS_WAITLOCK
364

    
365
    def _Running(ops):
366
      ops[0].status = constants.OP_STATUS_SUCCESS
367
      ops[1].status = constants.OP_STATUS_RUNNING
368
      for op in ops[2:]:
369
        op.status = constants.OP_STATUS_QUEUED
370

    
371
    def _Canceling1(ops):
372
      ops[0].status = constants.OP_STATUS_SUCCESS
373
      ops[1].status = constants.OP_STATUS_SUCCESS
374
      for op in ops[2:]:
375
        op.status = constants.OP_STATUS_CANCELING
376

    
377
    def _Canceling2(ops):
378
      for op in ops:
379
        op.status = constants.OP_STATUS_CANCELING
380

    
381
    def _Canceled(ops):
382
      for op in ops:
383
        op.status = constants.OP_STATUS_CANCELED
384

    
385
    def _Error1(ops):
386
      for idx, op in enumerate(ops):
387
        if idx > 3:
388
          op.status = constants.OP_STATUS_ERROR
389
        else:
390
          op.status = constants.OP_STATUS_SUCCESS
391

    
392
    def _Error2(ops):
393
      for op in ops:
394
        op.status = constants.OP_STATUS_ERROR
395

    
396
    def _Success(ops):
397
      for op in ops:
398
        op.status = constants.OP_STATUS_SUCCESS
399

    
400
    tests = {
401
      constants.JOB_STATUS_QUEUED: [_Queued],
402
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403
      constants.JOB_STATUS_RUNNING: [_Running],
404
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405
      constants.JOB_STATUS_CANCELED: [_Canceled],
406
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407
      constants.JOB_STATUS_SUCCESS: [_Success],
408
      }
409

    
410
    def _NewJob():
411
      job = jqueue._QueuedJob(None, 1,
412
                              [opcodes.OpTestDelay() for _ in range(10)])
413
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
415
                              for op in job.ops))
416
      return job
417

    
418
    for status in constants.JOB_STATUS_ALL:
419
      sttests = tests[status]
420
      assert sttests
421
      for fn in sttests:
422
        job = _NewJob()
423
        fn(job.ops)
424
        self.assertEqual(job.CalcStatus(), status)
425

    
426

    
427
class _FakeQueueForProc:
428
  def __init__(self):
429
    self._acquired = False
430
    self._updates = []
431

    
432
  def IsAcquired(self):
433
    return self._acquired
434

    
435
  def GetNextUpdate(self):
436
    return self._updates.pop(0)
437

    
438
  def acquire(self, shared=0):
439
    assert shared == 1
440
    self._acquired = True
441

    
442
  def release(self):
443
    assert self._acquired
444
    self._acquired = False
445

    
446
  def UpdateJobUnlocked(self, job, replicate=True):
447
    assert self._acquired, "Lock not acquired while updating job"
448
    self._updates.append((job, bool(replicate)))
449

    
450

    
451
class _FakeExecOpCodeForProc:
452
  def __init__(self, queue, before_start, after_start):
453
    self._queue = queue
454
    self._before_start = before_start
455
    self._after_start = after_start
456

    
457
  def __call__(self, op, cbs, timeout=None, priority=None):
458
    assert isinstance(op, opcodes.OpTestDummy)
459
    assert not self._queue.IsAcquired(), \
460
           "Queue lock not released when executing opcode"
461

    
462
    if self._before_start:
463
      self._before_start(timeout, priority)
464

    
465
    cbs.NotifyStart()
466

    
467
    if self._after_start:
468
      self._after_start(op, cbs)
469

    
470
    # Check again after the callbacks
471
    assert not self._queue.IsAcquired()
472

    
473
    if op.fail:
474
      raise errors.OpExecError("Error requested (%s)" % op.result)
475

    
476
    return op.result
477

    
478

    
479
class _JobProcessorTestUtils:
480
  def _CreateJob(self, queue, job_id, ops):
481
    job = jqueue._QueuedJob(queue, job_id, ops)
482
    self.assertFalse(job.start_timestamp)
483
    self.assertFalse(job.end_timestamp)
484
    self.assertEqual(len(ops), len(job.ops))
485
    self.assert_(compat.all(op.input == inp
486
                            for (op, inp) in zip(job.ops, ops)))
487
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
488
    return job
489

    
490

    
491
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
492
  def _GenericCheckJob(self, job):
493
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
494
                      for op in job.ops)
495

    
496
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
497
                     [[op.start_timestamp for op in job.ops],
498
                      [op.exec_timestamp for op in job.ops],
499
                      [op.end_timestamp for op in job.ops]])
500
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
501
                     [job.received_timestamp,
502
                      job.start_timestamp,
503
                      job.end_timestamp])
504
    self.assert_(job.start_timestamp)
505
    self.assert_(job.end_timestamp)
506
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
507

    
508
  def testSuccess(self):
509
    queue = _FakeQueueForProc()
510

    
511
    for (job_id, opcount) in [(25351, 1), (6637, 3),
512
                              (24644, 10), (32207, 100)]:
513
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
514
             for i in range(opcount)]
515

    
516
      # Create job
517
      job = self._CreateJob(queue, job_id, ops)
518

    
519
      def _BeforeStart(timeout, priority):
520
        self.assertEqual(queue.GetNextUpdate(), (job, True))
521
        self.assertRaises(IndexError, queue.GetNextUpdate)
522
        self.assertFalse(queue.IsAcquired())
523
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
524
        self.assertFalse(job.cur_opctx)
525

    
526
      def _AfterStart(op, cbs):
527
        self.assertEqual(queue.GetNextUpdate(), (job, True))
528
        self.assertRaises(IndexError, queue.GetNextUpdate)
529

    
530
        self.assertFalse(queue.IsAcquired())
531
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
532
        self.assertFalse(job.cur_opctx)
533

    
534
        # Job is running, cancelling shouldn't be possible
535
        (success, _) = job.Cancel()
536
        self.assertFalse(success)
537

    
538
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
539

    
540
      for idx in range(len(ops)):
541
        self.assertRaises(IndexError, queue.GetNextUpdate)
542
        result = jqueue._JobProcessor(queue, opexec, job)()
543
        self.assertEqual(queue.GetNextUpdate(), (job, True))
544
        self.assertRaises(IndexError, queue.GetNextUpdate)
545
        if idx == len(ops) - 1:
546
          # Last opcode
547
          self.assert_(result)
548
        else:
549
          self.assertFalse(result)
550

    
551
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
552
          self.assert_(job.start_timestamp)
553
          self.assertFalse(job.end_timestamp)
554

    
555
      self.assertRaises(IndexError, queue.GetNextUpdate)
556

    
557
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
558
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
559
      self.assertEqual(job.GetInfo(["opresult"]),
560
                       [[op.input.result for op in job.ops]])
561
      self.assertEqual(job.GetInfo(["opstatus"]),
562
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
563
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
564
                              for op in job.ops))
565

    
566
      self._GenericCheckJob(job)
567

    
568
      # Calling the processor on a finished job should be a no-op
569
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
570
      self.assertRaises(IndexError, queue.GetNextUpdate)
571

    
572
  def testOpcodeError(self):
573
    queue = _FakeQueueForProc()
574

    
575
    testdata = [
576
      (17077, 1, 0, 0),
577
      (1782, 5, 2, 2),
578
      (18179, 10, 9, 9),
579
      (4744, 10, 3, 8),
580
      (23816, 100, 39, 45),
581
      ]
582

    
583
    for (job_id, opcount, failfrom, failto) in testdata:
584
      # Prepare opcodes
585
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
586
                                 fail=(failfrom <= i and
587
                                       i <= failto))
588
             for i in range(opcount)]
589

    
590
      # Create job
591
      job = self._CreateJob(queue, job_id, ops)
592

    
593
      opexec = _FakeExecOpCodeForProc(queue, None, None)
594

    
595
      for idx in range(len(ops)):
596
        self.assertRaises(IndexError, queue.GetNextUpdate)
597
        result = jqueue._JobProcessor(queue, opexec, job)()
598
        # queued to waitlock
599
        self.assertEqual(queue.GetNextUpdate(), (job, True))
600
        # waitlock to running
601
        self.assertEqual(queue.GetNextUpdate(), (job, True))
602
        # Opcode result
603
        self.assertEqual(queue.GetNextUpdate(), (job, True))
604
        self.assertRaises(IndexError, queue.GetNextUpdate)
605

    
606
        if idx in (failfrom, len(ops) - 1):
607
          # Last opcode
608
          self.assert_(result)
609
          break
610

    
611
        self.assertFalse(result)
612

    
613
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
614

    
615
      self.assertRaises(IndexError, queue.GetNextUpdate)
616

    
617
      # Check job status
618
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
619
      self.assertEqual(job.GetInfo(["id"]), [job_id])
620
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
621

    
622
      # Check opcode status
623
      data = zip(job.ops,
624
                 job.GetInfo(["opstatus"])[0],
625
                 job.GetInfo(["opresult"])[0])
626

    
627
      for idx, (op, opstatus, opresult) in enumerate(data):
628
        if idx < failfrom:
629
          assert not op.input.fail
630
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
631
          self.assertEqual(opresult, op.input.result)
632
        elif idx <= failto:
633
          assert op.input.fail
634
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
635
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
636
        else:
637
          assert not op.input.fail
638
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
639
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
640

    
641
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
642
                              for op in job.ops[:failfrom]))
643

    
644
      self._GenericCheckJob(job)
645

    
646
      # Calling the processor on a finished job should be a no-op
647
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
648
      self.assertRaises(IndexError, queue.GetNextUpdate)
649

    
650
  def testCancelWhileInQueue(self):
651
    queue = _FakeQueueForProc()
652

    
653
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
654
           for i in range(5)]
655

    
656
    # Create job
657
    job_id = 17045
658
    job = self._CreateJob(queue, job_id, ops)
659

    
660
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
661

    
662
    # Mark as cancelled
663
    (success, _) = job.Cancel()
664
    self.assert_(success)
665

    
666
    self.assertRaises(IndexError, queue.GetNextUpdate)
667

    
668
    self.assertFalse(job.start_timestamp)
669
    self.assertTrue(job.end_timestamp)
670
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
671
                            for op in job.ops))
672

    
673
    # Serialize to check for differences
674
    before_proc = job.Serialize()
675

    
676
    # Simulate processor called in workerpool
677
    opexec = _FakeExecOpCodeForProc(queue, None, None)
678
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
679

    
680
    # Check result
681
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
682
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
683
    self.assertFalse(job.start_timestamp)
684
    self.assertTrue(job.end_timestamp)
685
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
686
                                for op in job.ops))
687
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
688
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
689
                      ["Job canceled by request" for _ in job.ops]])
690

    
691
    # Must not have changed or written
692
    self.assertEqual(before_proc, job.Serialize())
693
    self.assertRaises(IndexError, queue.GetNextUpdate)
694

    
695
  def testCancelWhileWaitlockInQueue(self):
696
    queue = _FakeQueueForProc()
697

    
698
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
699
           for i in range(5)]
700

    
701
    # Create job
702
    job_id = 8645
703
    job = self._CreateJob(queue, job_id, ops)
704

    
705
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
706

    
707
    job.ops[0].status = constants.OP_STATUS_WAITLOCK
708

    
709
    assert len(job.ops) == 5
710

    
711
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
712

    
713
    # Mark as cancelling
714
    (success, _) = job.Cancel()
715
    self.assert_(success)
716

    
717
    self.assertRaises(IndexError, queue.GetNextUpdate)
718

    
719
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
720
                            for op in job.ops))
721

    
722
    opexec = _FakeExecOpCodeForProc(queue, None, None)
723
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
724

    
725
    # Check result
726
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
727
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
728
    self.assertFalse(job.start_timestamp)
729
    self.assert_(job.end_timestamp)
730
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
731
                                for op in job.ops))
732
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
733
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
734
                      ["Job canceled by request" for _ in job.ops]])
735

    
736
  def testCancelWhileWaitlock(self):
737
    queue = _FakeQueueForProc()
738

    
739
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
740
           for i in range(5)]
741

    
742
    # Create job
743
    job_id = 11009
744
    job = self._CreateJob(queue, job_id, ops)
745

    
746
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
747

    
748
    def _BeforeStart(timeout, priority):
749
      self.assertEqual(queue.GetNextUpdate(), (job, True))
750
      self.assertRaises(IndexError, queue.GetNextUpdate)
751
      self.assertFalse(queue.IsAcquired())
752
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
753

    
754
      # Mark as cancelled
755
      (success, _) = job.Cancel()
756
      self.assert_(success)
757

    
758
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
759
                              for op in job.ops))
760
      self.assertRaises(IndexError, queue.GetNextUpdate)
761

    
762
    def _AfterStart(op, cbs):
763
      self.assertEqual(queue.GetNextUpdate(), (job, True))
764
      self.assertRaises(IndexError, queue.GetNextUpdate)
765
      self.assertFalse(queue.IsAcquired())
766
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
767

    
768
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
769

    
770
    self.assertRaises(IndexError, queue.GetNextUpdate)
771
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
772
    self.assertEqual(queue.GetNextUpdate(), (job, True))
773
    self.assertRaises(IndexError, queue.GetNextUpdate)
774

    
775
    # Check result
776
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
777
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
778
    self.assert_(job.start_timestamp)
779
    self.assert_(job.end_timestamp)
780
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
781
                                for op in job.ops))
782
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
783
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
784
                      ["Job canceled by request" for _ in job.ops]])
785

    
786
  def testCancelWhileWaitlockWithTimeout(self):
787
    queue = _FakeQueueForProc()
788

    
789
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
790
           for i in range(5)]
791

    
792
    # Create job
793
    job_id = 24314
794
    job = self._CreateJob(queue, job_id, ops)
795

    
796
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
797

    
798
    def _BeforeStart(timeout, priority):
799
      self.assertFalse(queue.IsAcquired())
800
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
801

    
802
      # Mark as cancelled
803
      (success, _) = job.Cancel()
804
      self.assert_(success)
805

    
806
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
807
                              for op in job.ops))
808

    
809
      # Fake an acquire attempt timing out
810
      raise mcpu.LockAcquireTimeout()
811

    
812
    def _AfterStart(op, cbs):
813
      self.fail("Should not reach this")
814

    
815
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
816

    
817
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
818

    
819
    # Check result
820
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
821
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
822
    self.assert_(job.start_timestamp)
823
    self.assert_(job.end_timestamp)
824
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
825
                                for op in job.ops))
826
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
827
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
828
                      ["Job canceled by request" for _ in job.ops]])
829

    
830
  def testCancelWhileRunning(self):
831
    # Tests canceling a job with finished opcodes and more, unprocessed ones
832
    queue = _FakeQueueForProc()
833

    
834
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
835
           for i in range(3)]
836

    
837
    # Create job
838
    job_id = 28492
839
    job = self._CreateJob(queue, job_id, ops)
840

    
841
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
842

    
843
    opexec = _FakeExecOpCodeForProc(queue, None, None)
844

    
845
    # Run one opcode
846
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
847

    
848
    # Job goes back to queued
849
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
850
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
851
                     [[constants.OP_STATUS_SUCCESS,
852
                       constants.OP_STATUS_QUEUED,
853
                       constants.OP_STATUS_QUEUED],
854
                      ["Res0", None, None]])
855

    
856
    # Mark as cancelled
857
    (success, _) = job.Cancel()
858
    self.assert_(success)
859

    
860
    # Try processing another opcode (this will actually cancel the job)
861
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
862

    
863
    # Check result
864
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
865
    self.assertEqual(job.GetInfo(["id"]), [job_id])
866
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
867
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
868
                     [[constants.OP_STATUS_SUCCESS,
869
                       constants.OP_STATUS_CANCELED,
870
                       constants.OP_STATUS_CANCELED],
871
                      ["Res0", "Job canceled by request",
872
                       "Job canceled by request"]])
873

    
874
  def testPartiallyRun(self):
875
    # Tests calling the processor on a job that's been partially run before the
876
    # program was restarted
877
    queue = _FakeQueueForProc()
878

    
879
    opexec = _FakeExecOpCodeForProc(queue, None, None)
880

    
881
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
882
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
883
             for i in range(10)]
884

    
885
      # Create job
886
      job = self._CreateJob(queue, job_id, ops)
887

    
888
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
889

    
890
      for _ in range(successcount):
891
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
892

    
893
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
894
      self.assertEqual(job.GetInfo(["opstatus"]),
895
                       [[constants.OP_STATUS_SUCCESS
896
                         for _ in range(successcount)] +
897
                        [constants.OP_STATUS_QUEUED
898
                         for _ in range(len(ops) - successcount)]])
899

    
900
      self.assert_(job.ops_iter)
901

    
902
      # Serialize and restore (simulates program restart)
903
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
904
      self.assertFalse(newjob.ops_iter)
905
      self._TestPartial(newjob, successcount)
906

    
907
  def _TestPartial(self, job, successcount):
908
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
909
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
910

    
911
    queue = _FakeQueueForProc()
912
    opexec = _FakeExecOpCodeForProc(queue, None, None)
913

    
914
    for remaining in reversed(range(len(job.ops) - successcount)):
915
      result = jqueue._JobProcessor(queue, opexec, job)()
916
      self.assertEqual(queue.GetNextUpdate(), (job, True))
917
      self.assertEqual(queue.GetNextUpdate(), (job, True))
918
      self.assertEqual(queue.GetNextUpdate(), (job, True))
919
      self.assertRaises(IndexError, queue.GetNextUpdate)
920

    
921
      if remaining == 0:
922
        # Last opcode
923
        self.assert_(result)
924
        break
925

    
926
      self.assertFalse(result)
927

    
928
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
929

    
930
    self.assertRaises(IndexError, queue.GetNextUpdate)
931
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
932
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
933
    self.assertEqual(job.GetInfo(["opresult"]),
934
                     [[op.input.result for op in job.ops]])
935
    self.assertEqual(job.GetInfo(["opstatus"]),
936
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
937
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
938
                            for op in job.ops))
939

    
940
    self._GenericCheckJob(job)
941

    
942
    # Calling the processor on a finished job should be a no-op
943
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
944
    self.assertRaises(IndexError, queue.GetNextUpdate)
945

    
946
    # ... also after being restored
947
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
948
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
949
    self.assertRaises(IndexError, queue.GetNextUpdate)
950

    
951
  def testProcessorOnRunningJob(self):
952
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
953

    
954
    queue = _FakeQueueForProc()
955
    opexec = _FakeExecOpCodeForProc(queue, None, None)
956

    
957
    # Create job
958
    job = self._CreateJob(queue, 9571, ops)
959

    
960
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
961

    
962
    job.ops[0].status = constants.OP_STATUS_RUNNING
963

    
964
    assert len(job.ops) == 1
965

    
966
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
967

    
968
    # Calling on running job must fail
969
    self.assertRaises(errors.ProgrammerError,
970
                      jqueue._JobProcessor(queue, opexec, job))
971

    
972
  def testLogMessages(self):
973
    # Tests the "Feedback" callback function
974
    queue = _FakeQueueForProc()
975

    
976
    messages = {
977
      1: [
978
        (None, "Hello"),
979
        (None, "World"),
980
        (constants.ELOG_MESSAGE, "there"),
981
        ],
982
      4: [
983
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
984
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
985
        ],
986
      }
987
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
988
                               messages=messages.get(i, []))
989
           for i in range(5)]
990

    
991
    # Create job
992
    job = self._CreateJob(queue, 29386, ops)
993

    
994
    def _BeforeStart(timeout, priority):
995
      self.assertEqual(queue.GetNextUpdate(), (job, True))
996
      self.assertRaises(IndexError, queue.GetNextUpdate)
997
      self.assertFalse(queue.IsAcquired())
998
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
999

    
1000
    def _AfterStart(op, cbs):
1001
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1002
      self.assertRaises(IndexError, queue.GetNextUpdate)
1003
      self.assertFalse(queue.IsAcquired())
1004
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1005

    
1006
      self.assertRaises(AssertionError, cbs.Feedback,
1007
                        "too", "many", "arguments")
1008

    
1009
      for (log_type, msg) in op.messages:
1010
        self.assertRaises(IndexError, queue.GetNextUpdate)
1011
        if log_type:
1012
          cbs.Feedback(log_type, msg)
1013
        else:
1014
          cbs.Feedback(msg)
1015
        # Check for job update without replication
1016
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1017
        self.assertRaises(IndexError, queue.GetNextUpdate)
1018

    
1019
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1020

    
1021
    for remaining in reversed(range(len(job.ops))):
1022
      self.assertRaises(IndexError, queue.GetNextUpdate)
1023
      result = jqueue._JobProcessor(queue, opexec, job)()
1024
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1025
      self.assertRaises(IndexError, queue.GetNextUpdate)
1026

    
1027
      if remaining == 0:
1028
        # Last opcode
1029
        self.assert_(result)
1030
        break
1031

    
1032
      self.assertFalse(result)
1033

    
1034
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1035

    
1036
    self.assertRaises(IndexError, queue.GetNextUpdate)
1037

    
1038
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1039
    self.assertEqual(job.GetInfo(["opresult"]),
1040
                     [[op.input.result for op in job.ops]])
1041

    
1042
    logmsgcount = sum(len(m) for m in messages.values())
1043

    
1044
    self._CheckLogMessages(job, logmsgcount)
1045

    
1046
    # Serialize and restore (simulates program restart)
1047
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1048
    self._CheckLogMessages(newjob, logmsgcount)
1049

    
1050
    # Check each message
1051
    prevserial = -1
1052
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1053
      for (serial, timestamp, log_type, msg) in oplog:
1054
        (exptype, expmsg) = messages.get(idx).pop(0)
1055
        if exptype:
1056
          self.assertEqual(log_type, exptype)
1057
        else:
1058
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1059
        self.assertEqual(expmsg, msg)
1060
        self.assert_(serial > prevserial)
1061
        prevserial = serial
1062

    
1063
  def _CheckLogMessages(self, job, count):
1064
    # Check serial
1065
    self.assertEqual(job.log_serial, count)
1066

    
1067
    # No filter
1068
    self.assertEqual(job.GetLogEntries(None),
1069
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1070
                      for entry in entries])
1071

    
1072
    # Filter with serial
1073
    assert count > 3
1074
    self.assert_(job.GetLogEntries(3))
1075
    self.assertEqual(job.GetLogEntries(3),
1076
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1077
                      for entry in entries][3:])
1078

    
1079
    # No log message after highest serial
1080
    self.assertFalse(job.GetLogEntries(count))
1081
    self.assertFalse(job.GetLogEntries(count + 3))
1082

    
1083

    
1084
class _FakeTimeoutStrategy:
1085
  def __init__(self, timeouts):
1086
    self.timeouts = timeouts
1087
    self.attempts = 0
1088
    self.last_timeout = None
1089

    
1090
  def NextAttempt(self):
1091
    self.attempts += 1
1092
    if self.timeouts:
1093
      timeout = self.timeouts.pop(0)
1094
    else:
1095
      timeout = None
1096
    self.last_timeout = timeout
1097
    return timeout
1098

    
1099

    
1100
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1101
  def setUp(self):
1102
    self.queue = _FakeQueueForProc()
1103
    self.job = None
1104
    self.curop = None
1105
    self.opcounter = None
1106
    self.timeout_strategy = None
1107
    self.retries = 0
1108
    self.prev_tsop = None
1109
    self.prev_prio = None
1110
    self.prev_status = None
1111
    self.lock_acq_prio = None
1112
    self.gave_lock = None
1113
    self.done_lock_before_blocking = False
1114

    
1115
  def _BeforeStart(self, timeout, priority):
1116
    job = self.job
1117

    
1118
    # If status has changed, job must've been written
1119
    if self.prev_status != self.job.ops[self.curop].status:
1120
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1121
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1122

    
1123
    self.assertFalse(self.queue.IsAcquired())
1124
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1125

    
1126
    ts = self.timeout_strategy
1127

    
1128
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1129
    self.assertEqual(timeout, ts.last_timeout)
1130
    self.assertEqual(priority, job.ops[self.curop].priority)
1131

    
1132
    self.gave_lock = True
1133
    self.lock_acq_prio = priority
1134

    
1135
    if (self.curop == 3 and
1136
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1137
      # Give locks before running into blocking acquire
1138
      assert self.retries == 7
1139
      self.retries = 0
1140
      self.done_lock_before_blocking = True
1141
      return
1142

    
1143
    if self.retries > 0:
1144
      self.assert_(timeout is not None)
1145
      self.retries -= 1
1146
      self.gave_lock = False
1147
      raise mcpu.LockAcquireTimeout()
1148

    
1149
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1150
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1151
      assert not ts.timeouts
1152
      self.assert_(timeout is None)
1153

    
1154
  def _AfterStart(self, op, cbs):
1155
    job = self.job
1156

    
1157
    # Setting to "running" requires an update
1158
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1159
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1160

    
1161
    self.assertFalse(self.queue.IsAcquired())
1162
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1163

    
1164
    # Job is running, cancelling shouldn't be possible
1165
    (success, _) = job.Cancel()
1166
    self.assertFalse(success)
1167

    
1168
  def _NextOpcode(self):
1169
    self.curop = self.opcounter.next()
1170
    self.prev_prio = self.job.ops[self.curop].priority
1171
    self.prev_status = self.job.ops[self.curop].status
1172

    
1173
  def _NewTimeoutStrategy(self):
1174
    job = self.job
1175

    
1176
    self.assertEqual(self.retries, 0)
1177

    
1178
    if self.prev_tsop == self.curop:
1179
      # Still on the same opcode, priority must've been increased
1180
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1181

    
1182
    if self.curop == 1:
1183
      # Normal retry
1184
      timeouts = range(10, 31, 10)
1185
      self.retries = len(timeouts) - 1
1186

    
1187
    elif self.curop == 2:
1188
      # Let this run into a blocking acquire
1189
      timeouts = range(11, 61, 12)
1190
      self.retries = len(timeouts)
1191

    
1192
    elif self.curop == 3:
1193
      # Wait for priority to increase, but give lock before blocking acquire
1194
      timeouts = range(12, 100, 14)
1195
      self.retries = len(timeouts)
1196

    
1197
      self.assertFalse(self.done_lock_before_blocking)
1198

    
1199
    elif self.curop == 4:
1200
      self.assert_(self.done_lock_before_blocking)
1201

    
1202
      # Timeouts, but no need to retry
1203
      timeouts = range(10, 31, 10)
1204
      self.retries = 0
1205

    
1206
    elif self.curop == 5:
1207
      # Normal retry
1208
      timeouts = range(19, 100, 11)
1209
      self.retries = len(timeouts)
1210

    
1211
    else:
1212
      timeouts = []
1213
      self.retries = 0
1214

    
1215
    assert len(job.ops) == 10
1216
    assert self.retries <= len(timeouts)
1217

    
1218
    ts = _FakeTimeoutStrategy(timeouts)
1219

    
1220
    self.timeout_strategy = ts
1221
    self.prev_tsop = self.curop
1222
    self.prev_prio = job.ops[self.curop].priority
1223

    
1224
    return ts
1225

    
1226
  def testTimeout(self):
1227
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1228
           for i in range(10)]
1229

    
1230
    # Create job
1231
    job_id = 15801
1232
    job = self._CreateJob(self.queue, job_id, ops)
1233
    self.job = job
1234

    
1235
    self.opcounter = itertools.count(0)
1236

    
1237
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1238
                                    self._AfterStart)
1239
    tsf = self._NewTimeoutStrategy
1240

    
1241
    self.assertFalse(self.done_lock_before_blocking)
1242

    
1243
    while True:
1244
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1245
                                  _timeout_strategy_factory=tsf)
1246

    
1247
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1248

    
1249
      if self.curop is not None:
1250
        self.prev_status = self.job.ops[self.curop].status
1251

    
1252
      self.lock_acq_prio = None
1253

    
1254
      result = proc(_nextop_fn=self._NextOpcode)
1255
      assert self.curop is not None
1256

    
1257
      if result or self.gave_lock:
1258
        # Got lock and/or job is done, result must've been written
1259
        self.assertFalse(job.cur_opctx)
1260
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1261
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1262
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1263
        self.assert_(job.ops[self.curop].exec_timestamp)
1264

    
1265
      if result:
1266
        self.assertFalse(job.cur_opctx)
1267
        break
1268

    
1269
      self.assertFalse(result)
1270

    
1271
      if self.curop == 0:
1272
        self.assertEqual(job.ops[self.curop].start_timestamp,
1273
                         job.start_timestamp)
1274

    
1275
      if self.gave_lock:
1276
        # Opcode finished, but job not yet done
1277
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1278
      else:
1279
        # Did not get locks
1280
        self.assert_(job.cur_opctx)
1281
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1282
                         self.timeout_strategy.NextAttempt)
1283
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1284
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1285

    
1286
        # If priority has changed since acquiring locks, the job must've been
1287
        # updated
1288
        if self.lock_acq_prio != job.ops[self.curop].priority:
1289
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1290

    
1291
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1292

    
1293
      self.assert_(job.start_timestamp)
1294
      self.assertFalse(job.end_timestamp)
1295

    
1296
    self.assertEqual(self.curop, len(job.ops) - 1)
1297
    self.assertEqual(self.job, job)
1298
    self.assertEqual(self.opcounter.next(), len(job.ops))
1299
    self.assert_(self.done_lock_before_blocking)
1300

    
1301
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1302
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1303
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1304
    self.assertEqual(job.GetInfo(["opresult"]),
1305
                     [[op.input.result for op in job.ops]])
1306
    self.assertEqual(job.GetInfo(["opstatus"]),
1307
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1308
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1309
                            for op in job.ops))
1310

    
1311
    # Calling the processor on a finished job should be a no-op
1312
    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
1313
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1314

    
1315

    
1316
if __name__ == "__main__":
1317
  testutils.GanetiTestProgram()