Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 30c945d0

History | View | Annotate | Download (42.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import jqueue
36
from ganeti import opcodes
37
from ganeti import compat
38
from ganeti import mcpu
39

    
40
import testutils
41

    
42

    
43
class _FakeJob:
44
  def __init__(self, job_id, status):
45
    self.id = job_id
46
    self._status = status
47
    self._log = []
48

    
49
  def SetStatus(self, status):
50
    self._status = status
51

    
52
  def AddLogEntry(self, msg):
53
    self._log.append((len(self._log), msg))
54

    
55
  def CalcStatus(self):
56
    return self._status
57

    
58
  def GetInfo(self, fields):
59
    result = []
60

    
61
    for name in fields:
62
      if name == "status":
63
        result.append(self._status)
64
      else:
65
        raise Exception("Unknown field")
66

    
67
    return result
68

    
69
  def GetLogEntries(self, newer_than):
70
    assert newer_than is None or newer_than >= 0
71

    
72
    if newer_than is None:
73
      return self._log
74

    
75
    return self._log[newer_than:]
76

    
77

    
78
class TestJobChangesChecker(unittest.TestCase):
  """Tests for jqueue._JobChangesChecker, driven by _FakeJob instances.

  """
  def testStatus(self):
    # Without previous information every call reports the current status
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.SetStatus(constants.JOB_STATUS_SUCCESS)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    # With matching previous information nothing is reported until the
    # status changes
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # Second checker starts from the state reported by the first one
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A checker without previous information gets the whole log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134

    
135

    
136
class TestJobChangesWaiter(unittest.TestCase):
  """Tests for jqueue._JobFileChangesWaiter and jqueue._JobChangesWaiter.

  Each test works on an empty job file inside a fresh temporary directory.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _EnsureNotifierClosed(self, notifier):
    """Fails the test unless the notifier's file descriptor is closed.

    @param notifier: Object whose C{_fd} attribute holds the descriptor

    """
    try:
      os.fstat(notifier._fd)
    except EnvironmentError:
      # Fetch the exception via sys.exc_info(); "except X, err" is only
      # valid in Python 2 and "except X as err" isn't valid in old versions
      err = sys.exc_info()[1]
      self.assertEqual(err.errno, errno.EBADF)
    else:
      self.fail("File descriptor wasn't closed")

  def testClose(self):
    for wait in [False, True]:
      waiter = jqueue._JobFileChangesWaiter(self.filename)
      try:
        if wait:
          waiter.Wait(0.001)
      finally:
        waiter.Close()

      # Ensure file descriptor was closed
      self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile(self):
    waiter = jqueue._JobFileChangesWaiter(self.filename)
    try:
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile2(self):
    waiter = jqueue._JobChangesWaiter(self.filename)
    try:
      self.assertFalse(waiter._filewaiter)
      self.assertTrue(waiter.Wait(0.1))
      self.assertTrue(waiter._filewaiter)

      # File waiter is now used, but there have been no changes
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191

    
192

    
193
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for jqueue._WaitForJobChangesHelper.

  Each test works on an empty job file inside a fresh temporary directory.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader returning a fake job in "waitlock" status.

    """
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)

  def _LoadLostJob(self):
    """Job loader returning C{None} instead of a job.

    """
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITLOCK], []))

  def testLostJob(self):
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
225

    
226

    
227
class TestEncodeOpError(unittest.TestCase):
  """Tests for jqueue._EncodeOpError.

  """
  def test(self):
    # Ganeti errors are expected to be raised again as their own class
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)

    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)

    # Anything else is expected to come back as OpExecError
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)

    encerr = jqueue._EncodeOpError("Hello World")
    self.assertTrue(isinstance(encerr, tuple))
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244

    
245

    
246
class TestQueuedOpCode(unittest.TestCase):
  """Tests for jqueue._QueuedOpCode, including a serialization round trip.

  """
  def testDefaults(self):
    def _Check(op):
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)
    # The restored copy must pass the same checks and serialize identically
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
277

    
278

    
279
class TestQueuedJob(unittest.TestCase):
  """Tests for jqueue._QueuedJob: defaults, priority and status calculation.

  """
  def test(self):
    # A job without opcodes is rejected
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [])

  def testDefaults(self):
    job_id = 4260
    ops = [
      opcodes.OpGetTags(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpExecError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])

    job1 = jqueue._QueuedJob(None, job_id, ops)
    _Check(job1)
    # The restored copy must pass the same checks and serialize identically
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testPriority(self):
    job_id = 4283
    ops = [
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def testCalcStatus(self):
    # Each helper below puts the opcodes of a fresh ten-opcode job into a
    # state which must result in the job status it is registered for in
    # "tests"
    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITLOCK

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITLOCK

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)])
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # Every known job status must have at least one scenario
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
425

    
426

    
427
class _FakeQueueForProc:
428
  def __init__(self):
429
    self._acquired = False
430
    self._updates = []
431

    
432
  def IsAcquired(self):
433
    return self._acquired
434

    
435
  def GetNextUpdate(self):
436
    return self._updates.pop(0)
437

    
438
  def acquire(self, shared=0):
439
    assert shared == 1
440
    self._acquired = True
441

    
442
  def release(self):
443
    assert self._acquired
444
    self._acquired = False
445

    
446
  def UpdateJobUnlocked(self, job, replicate=True):
447
    assert self._acquired, "Lock not acquired while updating job"
448
    self._updates.append((job, bool(replicate)))
449

    
450

    
451
class _FakeExecOpCodeForProc:
  """Fake opcode execution function for job processor tests.

  Instances are callable; optional hooks run before and after the start
  notification.

  """
  def __init__(self, queue, before_start, after_start):
    """Initializes the fake executor.

    @param queue: Queue object providing C{IsAcquired}
    @param before_start: C{None} or callable taking C{(timeout, priority)},
      run before C{cbs.NotifyStart()}
    @param after_start: C{None} or callable taking C{(op, cbs)}, run after
      C{cbs.NotifyStart()}

    """
    self._queue = queue
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs, timeout=None, priority=None):
    """Pretends to execute a test dummy opcode.

    @param op: Opcode, must be an C{opcodes.OpTestDummy}
    @param cbs: Callback object providing C{NotifyStart}
    @return: C{op.result}
    @raise errors.OpExecError: If C{op.fail} is set

    """
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    if self._before_start:
      self._before_start(timeout, priority)

    cbs.NotifyStart()

    if self._after_start:
      self._after_start(op, cbs)

    # Check again after the callbacks
    assert not self._queue.IsAcquired(), \
           "Queue lock held after running callbacks"

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    return op.result
477

    
478

    
479
class _JobProcessorTestUtils:
  """Mix-in with helpers for job processor tests.

  Relies on the assertion methods of the test case class it is combined with.

  """
  def _CreateJob(self, queue, job_id, ops):
    """Creates a queued job and verifies its initial state.

    @param queue: Queue object passed to the job
    @param job_id: Job ID
    @param ops: List of opcodes
    @return: The new C{jqueue._QueuedJob}

    """
    job = jqueue._QueuedJob(queue, job_id, ops)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
489

    
490

    
491
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
492
  def _GenericCheckJob(self, job):
    """Runs consistency checks on a job which has been processed.

    Compares the timestamp fields returned by C{job.GetInfo} with the job's
    and opcodes' attributes and requires start and end timestamps to be set.

    """
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                      for op in job.ops)

    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
                     [[op.start_timestamp for op in job.ops],
                      [op.exec_timestamp for op in job.ops],
                      [op.end_timestamp for op in job.ops]])
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
                     [job.received_timestamp,
                      job.start_timestamp,
                      job.end_timestamp])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
507

    
508
  def testSuccess(self):
    # Processes jobs of various sizes one opcode per processor call and
    # checks the recorded queue updates and the final job state
    queue = _FakeQueueForProc()

    for (job_id, opcount) in [(25351, 1), (6637, 3),
                              (24644, 10), (32207, 100)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      def _BeforeStart(timeout, priority):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertFalse(job.cur_opctx)

      def _AfterStart(op, cbs):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
        self.assertFalse(job.cur_opctx)

        # Job is running, cancelling shouldn't be possible
        (success, _) = job.Cancel()
        self.assertFalse(success)

      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if idx == len(ops) - 1:
          # Last opcode
          self.assertTrue(result)
        else:
          self.assertFalse(result)

          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
          self.assertTrue(job.start_timestamp)
          self.assertFalse(job.end_timestamp)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(job.GetInfo(["opresult"]),
                       [[op.input.result for op in job.ops]])
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops))

      self._GenericCheckJob(job)

      # Finished jobs can't be processed any further
      self.assertRaises(errors.ProgrammerError,
                        jqueue._JobProcessor(queue, opexec, job))
571

    
572
  def testOpcodeError(self):
    # Each entry is (job ID, number of opcodes, index of the first failing
    # opcode, index of the last failing opcode)
    queue = _FakeQueueForProc()

    testdata = [
      (17077, 1, 0, 0),
      (1782, 5, 2, 2),
      (18179, 10, 9, 9),
      (4744, 10, 3, 8),
      (23816, 100, 39, 45),
      ]

    for (job_id, opcount, failfrom, failto) in testdata:
      # Prepare opcodes
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
                                 fail=(failfrom <= i and
                                       i <= failto))
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      opexec = _FakeExecOpCodeForProc(queue, None, None)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        # queued to waitlock
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # waitlock to running
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # Opcode result
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        if idx in (failfrom, len(ops) - 1):
          # Last opcode
          self.assertTrue(result)
          break

        self.assertFalse(result)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      # Check job status
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
      self.assertEqual(job.GetInfo(["id"]), [job_id])
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])

      # Check opcode status
      data = zip(job.ops,
                 job.GetInfo(["opstatus"])[0],
                 job.GetInfo(["opresult"])[0])

      for idx, (op, opstatus, opresult) in enumerate(data):
        if idx < failfrom:
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
          self.assertEqual(opresult, op.input.result)
        elif idx <= failto:
          assert op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
        else:
          # Opcodes after the failing range are expected in error state too
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)

      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops[:failfrom]))

      self._GenericCheckJob(job)

      # Finished jobs can't be processed any further
      self.assertRaises(errors.ProgrammerError,
                        jqueue._JobProcessor(queue, opexec, job))
649

    
650
  def testCancelWhileInQueue(self):
    # Cancels a job which is still queued, then runs the processor on it
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 17045
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELED
                               for op in job.ops))

    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
684

    
685
  def testCancelWhileWaitlockInQueue(self):
    # Cancels a job whose first opcode was manually put into "waitlock"
    # status, then runs the processor on it
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 8645
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    job.ops[0].status = constants.OP_STATUS_WAITLOCK

    assert len(job.ops) == 5

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    # Mark as cancelling
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                               for op in job.ops))

    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
725

    
726
  def testCancelWhileWaitlock(self):
    # Cancels the job from the hook which runs before the opcode start is
    # signalled, i.e. while the job is in "waitlock" status
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 11009
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))
      self.assertRaises(IndexError, queue.GetNextUpdate)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
775

    
776
  def testCancelWhileWaitlockWithTimeout(self):
    # Like testCancelWhileWaitlock, but the hook also fakes a lock acquire
    # timeout after cancelling the job
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 24314
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))

      # Fake an acquire attempt timing out
      raise mcpu.LockAcquireTimeout()

    def _AfterStart(op, cbs):
      self.fail("Should not reach this")

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
819

    
820
  def testCancelWhileRunning(self):
    # Tests canceling a job with finished opcodes and more, unprocessed ones
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 28492
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Run one opcode
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    # Try processing another opcode (this will actually cancel the job)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])
863

    
864
  def testPartiallyRun(self):
    # Tests calling the processor on a job that's been partially run before the
    # program was restarted
    fake_queue = _FakeQueueForProc()

    opexec = _FakeExecOpCodeForProc(fake_queue, None, None)

    # Each case is (job ID, number of opcodes to run before the "restart")
    cases = [(30697, 1), (2552, 4), (12489, 9)]

    for (job_id, successcount) in cases:
      ops = [opcodes.OpTestDummy(result="Res%s" % idx, fail=False)
             for idx in range(10)]

      # Create job
      job = self._CreateJob(fake_queue, job_id, ops)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      # Process the first opcodes; the job isn't finished after any of them
      for _ in range(successcount):
        self.assertFalse(jqueue._JobProcessor(fake_queue, opexec, job)())

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [successcount * [constants.OP_STATUS_SUCCESS] +
                        (len(ops) - successcount) *
                        [constants.OP_STATUS_QUEUED]])

      self.assertTrue(job.ops_iter)

      # Serialize and restore (simulates program restart)
      restored = jqueue._QueuedJob.Restore(fake_queue, job.Serialize())
      self.assertFalse(restored.ops_iter)
      self._TestPartial(restored, successcount)
896

    
897
  def _TestPartial(self, job, successcount):
    """Runs the remaining opcodes of a partially processed job and checks it.

    @param job: Queued job whose first C{successcount} opcodes already ran
    @param successcount: Number of opcodes processed before this call

    """
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    fake_queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(fake_queue, None, None)

    pending = len(job.ops) - successcount

    for remaining in reversed(range(pending)):
      finished = jqueue._JobProcessor(fake_queue, opexec, job)()

      if remaining == 0:
        # Last opcode
        self.assertTrue(finished)
        break

      # More opcodes to go, job must be queued again
      self.assertFalse(finished)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    # Finished jobs can't be processed any further
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(fake_queue, opexec, job))

    # ... also after being restored
    restored = jqueue._QueuedJob.Restore(fake_queue, job.Serialize())
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(fake_queue, opexec, restored))
935

    
936
  def testProcessorOnRunningJob(self):
    # Tests that the processor refuses a job whose opcode is marked as running
    single_op = [opcodes.OpTestDummy(result="result", fail=False)]

    fake_queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(fake_queue, None, None)

    # Create job
    job = self._CreateJob(fake_queue, 9571, single_op)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Pretend the only opcode is being executed right now
    job.ops[0].status = constants.OP_STATUS_RUNNING

    assert len(job.ops) == 1

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Calling on running job must fail
    processor = jqueue._JobProcessor(fake_queue, opexec, job)
    self.assertRaises(errors.ProgrammerError, processor)
956

    
957
  def testLogMessages(self):
    # Tests the "Feedback" callback function
    queue = _FakeQueueForProc()

    # Log messages to emit, keyed by opcode index; a log type of None means
    # the message is passed to the callback without an explicit type
    messages = {
      1: [
        (None, "Hello"),
        (None, "World"),
        (constants.ELOG_MESSAGE, "there"),
        ],
      4: [
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
        ],
      }
    ops = [opcodes.OpTestDummy(result="Logtest%s" % idx, fail=False,
                               messages=messages.get(idx, []))
           for idx in range(5)]

    # Create job
    job = self._CreateJob(queue, 29386, ops)

    def _CheckWaitlock(timeout, priority):
      # Exactly one replicated job update must be pending
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    def _SendFeedback(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

      self.assertRaises(AssertionError, cbs.Feedback,
                        "too", "many", "arguments")

      for (log_type, msg) in op.messages:
        self.assertRaises(IndexError, queue.GetNextUpdate)

        if log_type:
          feedback_args = (log_type, msg)
        else:
          feedback_args = (msg, )
        cbs.Feedback(*feedback_args)

        # Check for job update without replication
        self.assertEqual(queue.GetNextUpdate(), (job, False))
        self.assertRaises(IndexError, queue.GetNextUpdate)

    opexec = _FakeExecOpCodeForProc(queue, _CheckWaitlock, _SendFeedback)

    for remaining in reversed(range(len(job.ops))):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      finished = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertTrue(finished)
        break

      self.assertFalse(finished)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])

    logmsgcount = sum(len(entries) for entries in messages.values())

    self._CheckLogMessages(job, logmsgcount)

    # Serialize and restore (simulates program restart)
    restored = jqueue._QueuedJob.Restore(queue, job.Serialize())
    self._CheckLogMessages(restored, logmsgcount)

    # Check each message
    prevserial = -1
    for (idx, oplog) in enumerate(job.GetInfo(["oplog"])[0]):
      for (serial, timestamp, log_type, msg) in oplog:
        (exptype, expmsg) = messages.get(idx).pop(0)
        # Messages sent without a type are logged as ELOG_MESSAGE
        self.assertEqual(log_type, exptype or constants.ELOG_MESSAGE)
        self.assertEqual(expmsg, msg)
        # Serial numbers must be strictly increasing
        self.assertTrue(serial > prevserial)
        prevserial = serial
1047

    
1048
  def _CheckLogMessages(self, job, count):
    """Verifies a job's log serial and its serial-filtered log entries.

    @param job: Job to check
    @param count: Expected number of log messages; must be larger than 3

    """
    def _AllEntries():
      # Log entries of all opcodes as one flat list, in opcode order
      flat = []
      for entries in job.GetInfo(["oplog"])[0]:
        if entries:
          flat.extend(entries)
      return flat

    # Check serial
    self.assertEqual(job.log_serial, count)

    # No filter
    self.assertEqual(job.GetLogEntries(None), _AllEntries())

    # Filter with serial
    assert count > 3
    self.assertTrue(job.GetLogEntries(3))
    self.assertEqual(job.GetLogEntries(3), _AllEntries()[3:])

    # No log message after highest serial
    for serial in (count, count + 3):
      self.assertFalse(job.GetLogEntries(serial))
1067

    
1068

    
1069
class _FakeTimeoutStrategy:
  """Timeout strategy handing out a predefined sequence of timeouts.

  @ivar timeouts: Timeouts not yet handed out
  @ivar attempts: Number of times L{NextAttempt} was called
  @ivar last_timeout: Value returned by the most recent L{NextAttempt} call

  """
  def __init__(self, timeouts):
    """Initializes this class.

    @param timeouts: Iterable of timeouts to return, in order

    """
    # Keep a private copy: NextAttempt consumes the list, which must neither
    # modify the caller's sequence nor require it to be a list (a tuple or a
    # Python 3 range object has no "pop" method)
    self.timeouts = list(timeouts)
    self.attempts = 0
    self.last_timeout = None

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @return: Next predefined timeout, or C{None} once all were handed out

    """
    self.attempts += 1
    if self.timeouts:
      timeout = self.timeouts.pop(0)
    else:
      timeout = None
    self.last_timeout = timeout
    return timeout
1083

    
1084

    
1085
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Runs a job through the processor while faking lock acquire timeouts.

  The instance attributes record which opcode is being processed and what the
  fake callbacks observed, so L{testTimeout} can verify queue updates and
  priorities after every processor invocation.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    self.job = None
    # Index of the opcode being processed and the iterator producing it
    self.curop = None
    self.opcounter = None
    # Most recently created fake timeout strategy
    self.timeout_strategy = None
    # Number of acquire attempts which are still going to time out
    self.retries = 0
    # Opcode index and priority seen when the last strategy was created
    self.prev_tsop = None
    self.prev_prio = None
    self.prev_status = None
    # Priority passed to the last acquire attempt and whether it succeeded
    self.lock_acq_prio = None
    self.gave_lock = None
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Callback run before an opcode is executed; fakes the lock acquire.

    @param timeout: Timeout for this acquire attempt (C{None} or a number)
    @param priority: Priority used for this acquire attempt
    @raise mcpu.LockAcquireTimeout: While faked retries are left

    """
    job = self.job
    cur_op = job.ops[self.curop]

    # If status has changed, job must've been written
    if self.prev_status != cur_op.status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    ts = self.timeout_strategy

    self.assertTrue(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, cur_op.priority)

    # Assume the acquire succeeds until one of the branches below says
    # otherwise
    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        cur_op.priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      # Fake a timed out attempt
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if cur_op.priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    """Callback run once an opcode is marked as running.

    @param op: Opcode being executed (not used)
    @param cbs: Processor callbacks (not used)

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (cancelled, _) = job.Cancel()
    self.assertFalse(cancelled)

  def _NextOpcode(self):
    """Advances to the next opcode and remembers its priority and status.

    """
    self.curop = self.opcounter.next()

    next_op = self.job.ops[self.curop]
    self.prev_prio = next_op.priority
    self.prev_status = next_op.status

  def _NewTimeoutStrategy(self):
    """Creates the fake timeout strategy for the current opcode.

    Also sets the number of acquire attempts which will time out.

    @return: New L{_FakeTimeoutStrategy} instance

    """
    job = self.job

    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      # All other opcodes get their locks on the first attempt
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    strategy = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = strategy
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return strategy

  def testTimeout(self):
    # Processes a job with ten opcodes, some of which run into faked lock
    # acquire timeouts (see _NewTimeoutStrategy)
    ops = [opcodes.OpTestDummy(result="Res%s" % idx, fail=False)
           for idx in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    strategy_factory = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=strategy_factory)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      finished = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if finished or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assertTrue(job.ops[self.curop].exec_timestamp)

      if finished:
        self.assertFalse(job.cur_opctx)
        break

      self.assertFalse(finished)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # All opcodes must have been processed
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Finished jobs can't be processed any further
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(self.queue, opexec, job))
1299

    
1300

    
1301
if __name__ == "__main__":
  # Hand control to the test program provided by the "testutils" module
  testutils.GanetiTestProgram()