Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 1e6d5750

History | View | Annotate | Download (47.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import jqueue
36
from ganeti import opcodes
37
from ganeti import compat
38
from ganeti import mcpu
39

    
40
import testutils
41

    
42

    
43
class _FakeJob:
44
  def __init__(self, job_id, status):
45
    self.id = job_id
46
    self._status = status
47
    self._log = []
48

    
49
  def SetStatus(self, status):
50
    self._status = status
51

    
52
  def AddLogEntry(self, msg):
53
    self._log.append((len(self._log), msg))
54

    
55
  def CalcStatus(self):
56
    return self._status
57

    
58
  def GetInfo(self, fields):
59
    result = []
60

    
61
    for name in fields:
62
      if name == "status":
63
        result.append(self._status)
64
      else:
65
        raise Exception("Unknown field")
66

    
67
    return result
68

    
69
  def GetLogEntries(self, newer_than):
70
    assert newer_than is None or newer_than >= 0
71

    
72
    if newer_than is None:
73
      return self._log
74

    
75
    return self._log[newer_than:]
76

    
77

    
78
class TestJobChangesChecker(unittest.TestCase):
  """Tests for jqueue._JobChangesChecker, driven by _FakeJob instances."""

  def testStatus(self):
    # Without previous information every call reports the current status
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.SetStatus(constants.JOB_STATUS_SUCCESS)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    # Previous info equal to the current status: nothing to report
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # A checker primed with the state just seen must report no change
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    # Only the entry added after the primed serial is returned
    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A checker without previous information returns the whole log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134

    
135

    
136
class TestJobChangesWaiter(unittest.TestCase):
  """Tests for jqueue._JobFileChangesWaiter and jqueue._JobChangesWaiter.

  Every test works on an empty file named "job-1" inside a temporary
  directory that is created in setUp and removed in tearDown.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _EnsureNotifierClosed(self, notifier):
    """Fail unless os.fstat on the notifier's descriptor gives EBADF."""
    try:
      os.fstat(notifier._fd)
    except EnvironmentError:
      # Fetch the exception via sys.exc_info() instead of binding it in the
      # "except" clause; this form parses on every Python version
      err = sys.exc_info()[1]
      self.assertEqual(err.errno, errno.EBADF)
    else:
      self.fail("File descriptor wasn't closed")

  def testClose(self):
    # Closing must work both before and after the waiter has been used
    for wait in [False, True]:
      waiter = jqueue._JobFileChangesWaiter(self.filename)
      try:
        if wait:
          waiter.Wait(0.001)
      finally:
        waiter.Close()

      # Ensure file descriptor was closed
      self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile(self):
    waiter = jqueue._JobFileChangesWaiter(self.filename)
    try:
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._notifier)

  def testChangingFile2(self):
    waiter = jqueue._JobChangesWaiter(self.filename)
    try:
      # The file waiter only appears after the first Wait call
      self.assertFalse(waiter._filewaiter)
      self.assertTrue(waiter.Wait(0.1))
      self.assertTrue(waiter._filewaiter)

      # File waiter is now used, but there have been no changes
      self.assertFalse(waiter.Wait(0.1))
      utils.WriteFile(self.filename, data="changed")
      self.assertTrue(waiter.Wait(60))
    finally:
      waiter.Close()

    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191

    
192

    
193
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for jqueue._WaitForJobChangesHelper.

  The helper is called with the path of an empty temporary job file and a
  loader callback that returns either a fake job or None.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Loader callback returning a fake job in "waitlock" status."""
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)

  def _LoadLostJob(self):
    """Loader callback returning None, i.e. no job."""
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
                     constants.JOB_NOTCHANGED)

    # No previous information
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
                          ["status"], None, None, 1.0),
                     ([constants.JOB_STATUS_WAITLOCK], []))

  def testLostJob(self):
    wfjc = jqueue._WaitForJobChangesHelper()
    self.assertTrue(wfjc(self.filename, self._LoadLostJob,
                         ["status"], None, None, 1.0) is None)
225

    
226

    
227
class TestEncodeOpError(unittest.TestCase):
  """Tests for jqueue._EncodeOpError."""

  def test(self):
    # Each input must be encoded as a tuple from which errors.MaybeRaise
    # raises the exception class listed next to it
    cases = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (value, expected) in cases:
      encerr = jqueue._EncodeOpError(value)
      self.assertTrue(isinstance(encerr, tuple))
      self.assertRaises(expected, errors.MaybeRaise, encerr)
244

    
245

    
246
class TestQueuedOpCode(unittest.TestCase):
  """Tests for jqueue._QueuedOpCode, including a Serialize/Restore cycle."""

  def testDefaults(self):
    def _Check(op):
      # State expected of an opcode that was only wrapped, never processed
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
277

    
278

    
279
class TestQueuedJob(unittest.TestCase):
  """Tests for jqueue._QueuedJob: defaults, priority and status."""

  def test(self):
    # A job without any opcodes must be rejected
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [])

  def testDefaults(self):
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpExecError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])

    # The same checks must hold for a fresh job and for a restored copy
    job1 = jqueue._QueuedJob(None, job_id, ops)
    _Check(job1)
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testPriority(self):
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def testCalcStatus(self):
    # Each helper below puts a list of ten opcodes into a state for which
    # CalcStatus must return the job status it is registered under in
    # "tests"
    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITLOCK

    def _Waitlock2(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      ops[2].status = constants.OP_STATUS_WAITLOCK

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      for op in ops[2:]:
        op.status = constants.OP_STATUS_QUEUED

    def _Canceling1(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_SUCCESS
      for op in ops[2:]:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceling2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELING

    def _Canceled(ops):
      for op in ops:
        op.status = constants.OP_STATUS_CANCELED

    def _Error1(ops):
      # First four opcodes succeed, all later ones fail
      for idx, op in enumerate(ops):
        if idx > 3:
          op.status = constants.OP_STATUS_ERROR
        else:
          op.status = constants.OP_STATUS_SUCCESS

    def _Error2(ops):
      for op in ops:
        op.status = constants.OP_STATUS_ERROR

    def _Success(ops):
      for op in ops:
        op.status = constants.OP_STATUS_SUCCESS

    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)])
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # Looking up every known job status makes a status without a scenario
    # fail with KeyError instead of being skipped silently
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
425

    
426

    
427
class _FakeQueueForProc:
428
  def __init__(self):
429
    self._acquired = False
430
    self._updates = []
431
    self._submitted = []
432

    
433
    self._submit_count = itertools.count(1000)
434

    
435
  def IsAcquired(self):
436
    return self._acquired
437

    
438
  def GetNextUpdate(self):
439
    return self._updates.pop(0)
440

    
441
  def GetNextSubmittedJob(self):
442
    return self._submitted.pop(0)
443

    
444
  def acquire(self, shared=0):
445
    assert shared == 1
446
    self._acquired = True
447

    
448
  def release(self):
449
    assert self._acquired
450
    self._acquired = False
451

    
452
  def UpdateJobUnlocked(self, job, replicate=True):
453
    assert self._acquired, "Lock not acquired while updating job"
454
    self._updates.append((job, bool(replicate)))
455

    
456
  def SubmitManyJobs(self, jobs):
457
    assert not self._acquired, "Lock acquired while submitting jobs"
458
    job_ids = [self._submit_count.next() for _ in jobs]
459
    self._submitted.extend(zip(job_ids, jobs))
460
    return job_ids
461

    
462

    
463
class _FakeExecOpCodeForProc:
  """Callable used in place of the real opcode executor.

  Accepts only opcodes.OpTestDummy opcodes. Around the call to
  cbs.NotifyStart() it runs the optional before_start/after_start hooks and
  asserts that the queue lock is not held.

  """
  def __init__(self, queue, before_start, after_start):
    self._queue = queue
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs, timeout=None, priority=None):
    """Pretend to execute op.

    Raises errors.OpExecError if op.fail is set. Otherwise returns the
    result of cbs.SubmitManyJobs(op.submit_jobs) if the opcode has a
    submit_jobs attribute that is not None, else op.result.

    """
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    before_hook = self._before_start
    if before_hook:
      before_hook(timeout, priority)

    cbs.NotifyStart()

    after_hook = self._after_start
    if after_hook:
      after_hook(op, cbs)

    # Check again after the callbacks
    assert not self._queue.IsAcquired()

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    # The submit_jobs attribute is optional on the opcode
    if not hasattr(op, "submit_jobs") or op.submit_jobs is None:
      return op.result

    return cbs.SubmitManyJobs(op.submit_jobs)
492

    
493

    
494
class _JobProcessorTestUtils:
  """Mix-in with a job-creation helper; relies on TestCase assert methods."""

  def _CreateJob(self, queue, job_id, ops):
    """Create a jqueue._QueuedJob from ops and sanity-check its state.

    Checks that the job has neither a start nor an end timestamp, that it
    wraps exactly the given opcodes and that its "ops" info field equals
    their serialized state. Returns the new job.

    """
    job = jqueue._QueuedJob(queue, job_id, ops)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
504

    
505

    
506
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
507
  def _GenericCheckJob(self, job):
    """Run the checks shared by tests whose job went through the processor.

    Verifies that the timestamp info fields mirror the attributes of the
    job and its opcodes, that the job has a start and an end timestamp and
    that the job's start timestamp equals the first opcode's.

    """
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                      for op in job.ops)

    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
                     [[op.start_timestamp for op in job.ops],
                      [op.exec_timestamp for op in job.ops],
                      [op.end_timestamp for op in job.ops]])
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
                     [job.received_timestamp,
                      job.start_timestamp,
                      job.end_timestamp])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
522

    
523
  def testSuccess(self):
    # Jobs of several sizes whose opcodes all succeed; every processor call
    # handles exactly one opcode
    queue = _FakeQueueForProc()

    for (job_id, opcount) in [(25351, 1), (6637, 3),
                              (24644, 10), (32207, 100)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      def _BeforeStart(timeout, priority):
        # Exactly one update must have been recorded so far
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertFalse(job.cur_opctx)

      def _AfterStart(op, cbs):
        # Exactly one further update follows NotifyStart
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
        self.assertFalse(job.cur_opctx)

        # Job is running, cancelling shouldn't be possible
        (success, _) = job.Cancel()
        self.assertFalse(success)

      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if idx == len(ops) - 1:
          # Last opcode
          self.assertTrue(result)
        else:
          self.assertFalse(result)

          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
          self.assertTrue(job.start_timestamp)
          self.assertFalse(job.end_timestamp)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(job.GetInfo(["opresult"]),
                       [[op.input.result for op in job.ops]])
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
      self.assertRaises(IndexError, queue.GetNextUpdate)
586

    
587
  def testOpcodeError(self):
    queue = _FakeQueueForProc()

    # Tuples of (job ID, number of opcodes, index of first failing opcode,
    # index of last failing opcode)
    testdata = [
      (17077, 1, 0, 0),
      (1782, 5, 2, 2),
      (18179, 10, 9, 9),
      (4744, 10, 3, 8),
      (23816, 100, 39, 45),
      ]

    for (job_id, opcount, failfrom, failto) in testdata:
      # Prepare opcodes
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
                                 fail=(failfrom <= i and
                                       i <= failto))
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      opexec = _FakeExecOpCodeForProc(queue, None, None)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        # queued to waitlock
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # waitlock to running
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # Opcode result
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        if idx in (failfrom, len(ops) - 1):
          # Last opcode
          self.assertTrue(result)
          break

        self.assertFalse(result)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      # Check job status
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
      self.assertEqual(job.GetInfo(["id"]), [job_id])
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])

      # Check opcode status
      data = zip(job.ops,
                 job.GetInfo(["opstatus"])[0],
                 job.GetInfo(["opresult"])[0])

      for idx, (op, opstatus, opresult) in enumerate(data):
        if idx < failfrom:
          # Opcodes before the first failure have succeeded
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
          self.assertEqual(opresult, op.input.result)
        elif idx <= failto:
          assert op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
        else:
          # Opcodes after the failing range are in error state as well
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)

      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops[:failfrom]))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
      self.assertRaises(IndexError, queue.GetNextUpdate)
664

    
665
  def testCancelWhileInQueue(self):
    # Cancels a job before the processor has ever been called on it
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 17045
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELED
                               for op in job.ops))

    # Serialize to check for differences
    before_proc = job.Serialize()

    # Simulate processor called in workerpool
    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

    # Must not have changed or written
    self.assertEqual(before_proc, job.Serialize())
    self.assertRaises(IndexError, queue.GetNextUpdate)
709

    
710
  def testCancelWhileWaitlockInQueue(self):
    # Cancels a job whose first opcode was put into "waitlock" status by
    # hand, without the processor having run
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 8645
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    job.ops[0].status = constants.OP_STATUS_WAITLOCK

    assert len(job.ops) == 5

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    # Mark as cancelling
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                               for op in job.ops))

    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
750

    
751
  def testCancelWhileWaitlock(self):
    # Cancels the job from inside the executor's before-start hook, i.e.
    # while the job is in "waitlock" status
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 11009
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))
      self.assertRaises(IndexError, queue.GetNextUpdate)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
800

    
801
  def testCancelWhileWaitlockWithTimeout(self):
    # Like testCancelWhileWaitlock, except that the before-start hook ends
    # by raising mcpu.LockAcquireTimeout instead of returning
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 24314
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))

      # Fake an acquire attempt timing out
      raise mcpu.LockAcquireTimeout()

    def _AfterStart(op, cbs):
      self.fail("Should not reach this")

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
844

    
845
  def testCancelWhileRunning(self):
    # Tests canceling a job with finished opcodes and more, unprocessed ones
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 28492
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # No before/after-start hooks are needed for this test
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Run one opcode
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    # Try processing another opcode (this will actually cancel the job)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result: the finished opcode keeps its result, the remaining ones
    # are reported as canceled
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])
888

    
889
  def testPartiallyRun(self):
    # Tests calling the processor on a job that's been partially run before the
    # program was restarted
    queue = _FakeQueueForProc()

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Each tuple is (job ID, number of opcodes to run before the "restart")
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(10)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      # One processor call per opcode; the job must not be finished yet
      for _ in range(successcount):
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [[constants.OP_STATUS_SUCCESS
                         for _ in range(successcount)] +
                        [constants.OP_STATUS_QUEUED
                         for _ in range(len(ops) - successcount)]])

      self.assertTrue(job.ops_iter)

      # Serialize and restore (simulates program restart)
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
      # The restored job is expected to come back without "ops_iter" set
      self.assertFalse(newjob.ops_iter)
      self._TestPartial(newjob, successcount)
921

    
922
  def _TestPartial(self, job, successcount):
    """Runs the remaining opcodes of a partially processed job.

    @param job: Job object whose first C{successcount} opcodes have already
      been processed
    @param successcount: Number of opcodes already processed

    """
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    # Use a fresh fake queue so only updates caused by this helper are seen
    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    for remaining in reversed(range(len(job.ops) - successcount)):
      result = jqueue._JobProcessor(queue, opexec, job)()
      # Exactly three replicated job updates are expected per processor call
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertTrue(result)
        break

      self.assertFalse(result)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # ... also after being restored
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
965

    
966
  def testProcessorOnRunningJob(self):
    # Tests that the processor refuses to work on a job whose only opcode is
    # already marked as running
    ops = [opcodes.OpTestDummy(result="result", fail=False)]

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Create job
    job = self._CreateJob(queue, 9571, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Force the opcode into "running" state without going through the processor
    job.ops[0].status = constants.OP_STATUS_RUNNING

    # Precondition of this test; checked via unittest so that it is not
    # stripped when running with "python -O"
    self.assertEqual(len(job.ops), 1)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Calling on running job must fail
    self.assertRaises(errors.ProgrammerError,
                      jqueue._JobProcessor(queue, opexec, job))
986

    
987
  def testLogMessages(self):
    # Tests the "Feedback" callback function
    queue = _FakeQueueForProc()

    # Log messages to emit, keyed by opcode index; each entry is a
    # (log type, message) tuple where a log type of None means that Feedback
    # is called with the message only
    messages = {
      1: [
        (None, "Hello"),
        (None, "World"),
        (constants.ELOG_MESSAGE, "there"),
        ],
      4: [
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
        ],
      }
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
                               messages=messages.get(i, []))
           for i in range(5)]

    # Create job
    job = self._CreateJob(queue, 29386, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

      # Feedback must reject calls with more than two arguments
      self.assertRaises(AssertionError, cbs.Feedback,
                        "too", "many", "arguments")

      for (log_type, msg) in op.messages:
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if log_type:
          cbs.Feedback(log_type, msg)
        else:
          cbs.Feedback(msg)
        # Check for job update without replication
        self.assertEqual(queue.GetNextUpdate(), (job, False))
        self.assertRaises(IndexError, queue.GetNextUpdate)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    for remaining in reversed(range(len(job.ops))):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertTrue(result)
        break

      self.assertFalse(result)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])

    # Total number of log messages across all opcodes
    logmsgcount = sum(len(m) for m in messages.values())

    self._CheckLogMessages(job, logmsgcount)

    # Serialize and restore (simulates program restart)
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
    self._CheckLogMessages(newjob, logmsgcount)

    # Check each message
    prevserial = -1
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
      for (serial, timestamp, log_type, msg) in oplog:
        # Expected messages are consumed in the order they were emitted
        (exptype, expmsg) = messages.get(idx).pop(0)
        if exptype:
          self.assertEqual(log_type, exptype)
        else:
          # Messages sent without an explicit type are logged as ELOG_MESSAGE
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
        self.assertEqual(expmsg, msg)
        # Serial numbers must be strictly increasing across all opcodes
        self.assertTrue(serial > prevserial)
        prevserial = serial
1077

    
1078
  def _CheckLogMessages(self, job, count):
    """Verifies the log entries reported by a job.

    @param job: Job object to check
    @param count: Expected number of log messages (must be larger than 3)

    """
    # Check serial
    self.assertEqual(job.log_serial, count)

    # No filter
    self.assertEqual(job.GetLogEntries(None),
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
                      for entry in entries])

    # Filter with serial; this is only meaningful with more than three
    # messages, which is checked via unittest so that it also holds with
    # "python -O"
    self.assertTrue(count > 3)
    self.assertTrue(job.GetLogEntries(3))
    self.assertEqual(job.GetLogEntries(3),
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
                      for entry in entries][3:])

    # No log message after highest serial
    self.assertFalse(job.GetLogEntries(count))
    self.assertFalse(job.GetLogEntries(count + 3))
1097

    
1098
  def testSubmitManyJobs(self):
    # Tests opcodes which submit further jobs while being executed
    queue = _FakeQueueForProc()

    job_id = 15656
    # Three opcodes submitting zero, one and three jobs, respectively
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          submit_jobs=[]),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
                            ]),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    for idx in range(len(ops)):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      if idx == len(ops) - 1:
        # Last opcode
        self.assertTrue(result)
      else:
        self.assertFalse(result)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # The fake queue must have recorded the submitted jobs in submission
    # order, with IDs counting up from 1000
    for idx, submitted_ops in enumerate(job_ops
                                        for op in ops
                                        for job_ops in op.submit_jobs):
      self.assertEqual(queue.GetNextSubmittedJob(),
                       (1000 + idx, submitted_ops))
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    # Each opcode's result is the list of IDs of the jobs it submitted
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[[], [1000], [1001, 1002, 1003]]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1181

    
1182

    
1183
class _FakeTimeoutStrategy:
1184
  def __init__(self, timeouts):
1185
    self.timeouts = timeouts
1186
    self.attempts = 0
1187
    self.last_timeout = None
1188

    
1189
  def NextAttempt(self):
1190
    self.attempts += 1
1191
    if self.timeouts:
1192
      timeout = self.timeouts.pop(0)
1193
    else:
1194
      timeout = None
1195
    self.last_timeout = timeout
1196
    return timeout
1197

    
1198

    
1199
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests the job processor's handling of lock acquire timeouts.

  The test case keeps track of the opcode being processed, the number of
  faked lock timeouts still to be raised and the timeout strategy in use via
  instance attributes shared between the hooks below and L{testTimeout}.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    self.job = None
    # Index of the opcode currently being processed
    self.curop = None
    self.opcounter = None
    self.timeout_strategy = None
    # Number of lock acquire attempts which are still to time out
    self.retries = 0
    self.prev_tsop = None
    self.prev_prio = None
    self.prev_status = None
    self.lock_acq_prio = None
    self.gave_lock = None
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Hook called before an opcode is started; fakes lock acquisition.

    @param timeout: Timeout passed for the lock acquire attempt
    @param priority: Priority passed for the lock acquire attempt
    @raise mcpu.LockAcquireTimeout: While faked timeouts are left

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    ts = self.timeout_strategy

    self.assertTrue(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    # Assume the lock is given; reset further down if a timeout is faked
    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      # A faked timeout is only possible for non-blocking attempts
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    """Hook called after an opcode has been started.

    @param op: Opcode being executed (unused)
    @param cbs: Processor callbacks (unused)

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode and records its priority and status.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Creates the fake timeout strategy for the current opcode.

    Also sets the number of lock acquire attempts which are to time out.

    @return: L{_FakeTimeoutStrategy} instance

    """
    job = self.job

    # All retries of the previous strategy must have been used up
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    # Runs a job with ten opcodes through the processor while the hooks of
    # this class fake lock acquire timeouts for some of them
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      # Remember the status before processing for use in _BeforeStart
      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assertTrue(job.ops[self.curop].exec_timestamp)

      if result:
        self.assertFalse(job.cur_opctx)
        break

      self.assertFalse(result)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # All opcodes must have been processed exactly once
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1413

    
1414

    
1415
# Run the test program only when this file is executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()