Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 7578ab0a

History | View | Annotate | Download (46.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import jqueue
36
from ganeti import opcodes
37
from ganeti import compat
38
from ganeti import mcpu
39

    
40
import testutils
41

    
42

    
43
class _FakeJob:
44
  def __init__(self, job_id, status):
45
    self.id = job_id
46
    self._status = status
47
    self._log = []
48

    
49
  def SetStatus(self, status):
50
    self._status = status
51

    
52
  def AddLogEntry(self, msg):
53
    self._log.append((len(self._log), msg))
54

    
55
  def CalcStatus(self):
56
    return self._status
57

    
58
  def GetInfo(self, fields):
59
    result = []
60

    
61
    for name in fields:
62
      if name == "status":
63
        result.append(self._status)
64
      else:
65
        raise Exception("Unknown field")
66

    
67
    return result
68

    
69
  def GetLogEntries(self, newer_than):
70
    assert newer_than is None or newer_than >= 0
71

    
72
    if newer_than is None:
73
      return self._log
74

    
75
    return self._log[newer_than:]
76

    
77

    
78
class TestJobChangesChecker(unittest.TestCase):
79
  def testStatus(self):
80
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81
    checker = jqueue._JobChangesChecker(["status"], None, None)
82
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
83

    
84
    job.SetStatus(constants.JOB_STATUS_RUNNING)
85
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
86

    
87
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
88
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
89

    
90
    # job.id is used by checker
91
    self.assertEqual(job.id, 9094)
92

    
93
  def testStatusWithPrev(self):
94
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
95
    checker = jqueue._JobChangesChecker(["status"],
96
                                        [constants.JOB_STATUS_QUEUED], None)
97
    self.assert_(checker(job) is None)
98

    
99
    job.SetStatus(constants.JOB_STATUS_RUNNING)
100
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
101

    
102
  def testFinalStatus(self):
103
    for status in constants.JOBS_FINALIZED:
104
      job = _FakeJob(2178711, status)
105
      checker = jqueue._JobChangesChecker(["status"], [status], None)
106
      # There won't be any changes in this status, hence it should signal
107
      # a change immediately
108
      self.assertEqual(checker(job), ([status], []))
109

    
110
  def testLog(self):
111
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112
    checker = jqueue._JobChangesChecker(["status"], None, None)
113
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
114

    
115
    job.AddLogEntry("Hello World")
116
    (job_info, log_entries) = checker(job)
117
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118
    self.assertEqual(log_entries, [[0, "Hello World"]])
119

    
120
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121
    self.assert_(checker2(job) is None)
122

    
123
    job.AddLogEntry("Foo Bar")
124
    job.SetStatus(constants.JOB_STATUS_ERROR)
125

    
126
    (job_info, log_entries) = checker2(job)
127
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
129

    
130
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
131
    (job_info, log_entries) = checker3(job)
132
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134

    
135

    
136
class TestJobChangesWaiter(unittest.TestCase):
137
  def setUp(self):
138
    self.tmpdir = tempfile.mkdtemp()
139
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
140
    utils.WriteFile(self.filename, data="")
141

    
142
  def tearDown(self):
143
    shutil.rmtree(self.tmpdir)
144

    
145
  def _EnsureNotifierClosed(self, notifier):
146
    try:
147
      os.fstat(notifier._fd)
148
    except EnvironmentError, err:
149
      self.assertEqual(err.errno, errno.EBADF)
150
    else:
151
      self.fail("File descriptor wasn't closed")
152

    
153
  def testClose(self):
154
    for wait in [False, True]:
155
      waiter = jqueue._JobFileChangesWaiter(self.filename)
156
      try:
157
        if wait:
158
          waiter.Wait(0.001)
159
      finally:
160
        waiter.Close()
161

    
162
      # Ensure file descriptor was closed
163
      self._EnsureNotifierClosed(waiter._notifier)
164

    
165
  def testChangingFile(self):
166
    waiter = jqueue._JobFileChangesWaiter(self.filename)
167
    try:
168
      self.assertFalse(waiter.Wait(0.1))
169
      utils.WriteFile(self.filename, data="changed")
170
      self.assert_(waiter.Wait(60))
171
    finally:
172
      waiter.Close()
173

    
174
    self._EnsureNotifierClosed(waiter._notifier)
175

    
176
  def testChangingFile2(self):
177
    waiter = jqueue._JobChangesWaiter(self.filename)
178
    try:
179
      self.assertFalse(waiter._filewaiter)
180
      self.assert_(waiter.Wait(0.1))
181
      self.assert_(waiter._filewaiter)
182

    
183
      # File waiter is now used, but there have been no changes
184
      self.assertFalse(waiter.Wait(0.1))
185
      utils.WriteFile(self.filename, data="changed")
186
      self.assert_(waiter.Wait(60))
187
    finally:
188
      waiter.Close()
189

    
190
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191

    
192

    
193
class TestWaitForJobChangesHelper(unittest.TestCase):
194
  def setUp(self):
195
    self.tmpdir = tempfile.mkdtemp()
196
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197
    utils.WriteFile(self.filename, data="")
198

    
199
  def tearDown(self):
200
    shutil.rmtree(self.tmpdir)
201

    
202
  def _LoadWaitingJob(self):
203
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
204

    
205
  def _LoadLostJob(self):
206
    return None
207

    
208
  def testNoChanges(self):
209
    wfjc = jqueue._WaitForJobChangesHelper()
210

    
211
    # No change
212
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
213
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
214
                     constants.JOB_NOTCHANGED)
215

    
216
    # No previous information
217
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
218
                          ["status"], None, None, 1.0),
219
                     ([constants.JOB_STATUS_WAITLOCK], []))
220

    
221
  def testLostJob(self):
222
    wfjc = jqueue._WaitForJobChangesHelper()
223
    self.assert_(wfjc(self.filename, self._LoadLostJob,
224
                      ["status"], None, None, 1.0) is None)
225

    
226

    
227
class TestEncodeOpError(unittest.TestCase):
228
  def test(self):
229
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230
    self.assert_(isinstance(encerr, tuple))
231
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
232

    
233
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234
    self.assert_(isinstance(encerr, tuple))
235
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
236

    
237
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238
    self.assert_(isinstance(encerr, tuple))
239
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
240

    
241
    encerr = jqueue._EncodeOpError("Hello World")
242
    self.assert_(isinstance(encerr, tuple))
243
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244

    
245

    
246
class TestQueuedOpCode(unittest.TestCase):
247
  def testDefaults(self):
248
    def _Check(op):
249
      self.assertFalse(hasattr(op.input, "dry_run"))
250
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251
      self.assertFalse(op.log)
252
      self.assert_(op.start_timestamp is None)
253
      self.assert_(op.exec_timestamp is None)
254
      self.assert_(op.end_timestamp is None)
255
      self.assert_(op.result is None)
256
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
257

    
258
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
259
    _Check(op1)
260
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
261
    _Check(op2)
262
    self.assertEqual(op1.Serialize(), op2.Serialize())
263

    
264
  def testPriority(self):
265
    def _Check(op):
266
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267
             "Default priority equals high priority; test can't work"
268
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
270

    
271
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
272
    op1 = jqueue._QueuedOpCode(inpop)
273
    _Check(op1)
274
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
275
    _Check(op2)
276
    self.assertEqual(op1.Serialize(), op2.Serialize())
277

    
278

    
279
class TestQueuedJob(unittest.TestCase):
280
  def test(self):
281
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
282
                      None, 1, [])
283

    
284
  def testDefaults(self):
285
    job_id = 4260
286
    ops = [
287
      opcodes.OpTagsGet(),
288
      opcodes.OpTestDelay(),
289
      ]
290

    
291
    def _Check(job):
292
      self.assertEqual(job.id, job_id)
293
      self.assertEqual(job.log_serial, 0)
294
      self.assert_(job.received_timestamp)
295
      self.assert_(job.start_timestamp is None)
296
      self.assert_(job.end_timestamp is None)
297
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299
      self.assert_(repr(job).startswith("<"))
300
      self.assertEqual(len(job.ops), len(ops))
301
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302
                              for (inp, op) in zip(ops, job.ops)))
303
      self.assertRaises(errors.OpExecError, job.GetInfo,
304
                        ["unknown-field"])
305
      self.assertEqual(job.GetInfo(["summary"]),
306
                       [[op.input.Summary() for op in job.ops]])
307

    
308
    job1 = jqueue._QueuedJob(None, job_id, ops)
309
    _Check(job1)
310
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
311
    _Check(job2)
312
    self.assertEqual(job1.Serialize(), job2.Serialize())
313

    
314
  def testPriority(self):
315
    job_id = 4283
316
    ops = [
317
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
318
      opcodes.OpTestDelay(),
319
      ]
320

    
321
    def _Check(job):
322
      self.assertEqual(job.id, job_id)
323
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324
      self.assert_(repr(job).startswith("<"))
325

    
326
    job = jqueue._QueuedJob(None, job_id, ops)
327
    _Check(job)
328
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
329
                            for op in job.ops))
330
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
331

    
332
    # Increase first
333
    job.ops[0].priority -= 1
334
    _Check(job)
335
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
336

    
337
    # Mark opcode as finished
338
    job.ops[0].status = constants.OP_STATUS_SUCCESS
339
    _Check(job)
340
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
341

    
342
    # Increase second
343
    job.ops[1].priority -= 10
344
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
345

    
346
    # Test increasing first
347
    job.ops[0].status = constants.OP_STATUS_RUNNING
348
    job.ops[0].priority -= 19
349
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
350

    
351
  def testCalcStatus(self):
352
    def _Queued(ops):
353
      # The default status is "queued"
354
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
355
                              for op in ops))
356

    
357
    def _Waitlock1(ops):
358
      ops[0].status = constants.OP_STATUS_WAITLOCK
359

    
360
    def _Waitlock2(ops):
361
      ops[0].status = constants.OP_STATUS_SUCCESS
362
      ops[1].status = constants.OP_STATUS_SUCCESS
363
      ops[2].status = constants.OP_STATUS_WAITLOCK
364

    
365
    def _Running(ops):
366
      ops[0].status = constants.OP_STATUS_SUCCESS
367
      ops[1].status = constants.OP_STATUS_RUNNING
368
      for op in ops[2:]:
369
        op.status = constants.OP_STATUS_QUEUED
370

    
371
    def _Canceling1(ops):
372
      ops[0].status = constants.OP_STATUS_SUCCESS
373
      ops[1].status = constants.OP_STATUS_SUCCESS
374
      for op in ops[2:]:
375
        op.status = constants.OP_STATUS_CANCELING
376

    
377
    def _Canceling2(ops):
378
      for op in ops:
379
        op.status = constants.OP_STATUS_CANCELING
380

    
381
    def _Canceled(ops):
382
      for op in ops:
383
        op.status = constants.OP_STATUS_CANCELED
384

    
385
    def _Error1(ops):
386
      for idx, op in enumerate(ops):
387
        if idx > 3:
388
          op.status = constants.OP_STATUS_ERROR
389
        else:
390
          op.status = constants.OP_STATUS_SUCCESS
391

    
392
    def _Error2(ops):
393
      for op in ops:
394
        op.status = constants.OP_STATUS_ERROR
395

    
396
    def _Success(ops):
397
      for op in ops:
398
        op.status = constants.OP_STATUS_SUCCESS
399

    
400
    tests = {
401
      constants.JOB_STATUS_QUEUED: [_Queued],
402
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403
      constants.JOB_STATUS_RUNNING: [_Running],
404
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405
      constants.JOB_STATUS_CANCELED: [_Canceled],
406
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407
      constants.JOB_STATUS_SUCCESS: [_Success],
408
      }
409

    
410
    def _NewJob():
411
      job = jqueue._QueuedJob(None, 1,
412
                              [opcodes.OpTestDelay() for _ in range(10)])
413
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
415
                              for op in job.ops))
416
      return job
417

    
418
    for status in constants.JOB_STATUS_ALL:
419
      sttests = tests[status]
420
      assert sttests
421
      for fn in sttests:
422
        job = _NewJob()
423
        fn(job.ops)
424
        self.assertEqual(job.CalcStatus(), status)
425

    
426

    
427
class _FakeQueueForProc:
428
  def __init__(self):
429
    self._acquired = False
430
    self._updates = []
431
    self._submitted = []
432

    
433
    self._submit_count = itertools.count(1000)
434

    
435
  def IsAcquired(self):
436
    return self._acquired
437

    
438
  def GetNextUpdate(self):
439
    return self._updates.pop(0)
440

    
441
  def GetNextSubmittedJob(self):
442
    return self._submitted.pop(0)
443

    
444
  def acquire(self, shared=0):
445
    assert shared == 1
446
    self._acquired = True
447

    
448
  def release(self):
449
    assert self._acquired
450
    self._acquired = False
451

    
452
  def UpdateJobUnlocked(self, job, replicate=True):
453
    assert self._acquired, "Lock not acquired while updating job"
454
    self._updates.append((job, bool(replicate)))
455

    
456
  def SubmitManyJobs(self, jobs):
457
    assert not self._acquired, "Lock acquired while submitting jobs"
458
    job_ids = [self._submit_count.next() for _ in jobs]
459
    self._submitted.extend(zip(job_ids, jobs))
460
    return job_ids
461

    
462

    
463
class _FakeExecOpCodeForProc:
464
  def __init__(self, queue, before_start, after_start):
465
    self._queue = queue
466
    self._before_start = before_start
467
    self._after_start = after_start
468

    
469
  def __call__(self, op, cbs, timeout=None, priority=None):
470
    assert isinstance(op, opcodes.OpTestDummy)
471
    assert not self._queue.IsAcquired(), \
472
           "Queue lock not released when executing opcode"
473

    
474
    if self._before_start:
475
      self._before_start(timeout, priority)
476

    
477
    cbs.NotifyStart()
478

    
479
    if self._after_start:
480
      self._after_start(op, cbs)
481

    
482
    # Check again after the callbacks
483
    assert not self._queue.IsAcquired()
484

    
485
    if op.fail:
486
      raise errors.OpExecError("Error requested (%s)" % op.result)
487

    
488
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
489
      return cbs.SubmitManyJobs(op.submit_jobs)
490

    
491
    return op.result
492

    
493

    
494
class _JobProcessorTestUtils:
495
  def _CreateJob(self, queue, job_id, ops):
496
    job = jqueue._QueuedJob(queue, job_id, ops)
497
    self.assertFalse(job.start_timestamp)
498
    self.assertFalse(job.end_timestamp)
499
    self.assertEqual(len(ops), len(job.ops))
500
    self.assert_(compat.all(op.input == inp
501
                            for (op, inp) in zip(job.ops, ops)))
502
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
503
    return job
504

    
505

    
506
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
507
  def _GenericCheckJob(self, job):
508
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
509
                      for op in job.ops)
510

    
511
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
512
                     [[op.start_timestamp for op in job.ops],
513
                      [op.exec_timestamp for op in job.ops],
514
                      [op.end_timestamp for op in job.ops]])
515
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
516
                     [job.received_timestamp,
517
                      job.start_timestamp,
518
                      job.end_timestamp])
519
    self.assert_(job.start_timestamp)
520
    self.assert_(job.end_timestamp)
521
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
522

    
523
  def testSuccess(self):
524
    queue = _FakeQueueForProc()
525

    
526
    for (job_id, opcount) in [(25351, 1), (6637, 3),
527
                              (24644, 10), (32207, 100)]:
528
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
529
             for i in range(opcount)]
530

    
531
      # Create job
532
      job = self._CreateJob(queue, job_id, ops)
533

    
534
      def _BeforeStart(timeout, priority):
535
        self.assertEqual(queue.GetNextUpdate(), (job, True))
536
        self.assertRaises(IndexError, queue.GetNextUpdate)
537
        self.assertFalse(queue.IsAcquired())
538
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
539
        self.assertFalse(job.cur_opctx)
540

    
541
      def _AfterStart(op, cbs):
542
        self.assertEqual(queue.GetNextUpdate(), (job, True))
543
        self.assertRaises(IndexError, queue.GetNextUpdate)
544

    
545
        self.assertFalse(queue.IsAcquired())
546
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
547
        self.assertFalse(job.cur_opctx)
548

    
549
        # Job is running, cancelling shouldn't be possible
550
        (success, _) = job.Cancel()
551
        self.assertFalse(success)
552

    
553
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
554

    
555
      for idx in range(len(ops)):
556
        self.assertRaises(IndexError, queue.GetNextUpdate)
557
        result = jqueue._JobProcessor(queue, opexec, job)()
558
        self.assertEqual(queue.GetNextUpdate(), (job, True))
559
        self.assertRaises(IndexError, queue.GetNextUpdate)
560
        if idx == len(ops) - 1:
561
          # Last opcode
562
          self.assert_(result)
563
        else:
564
          self.assertFalse(result)
565

    
566
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
567
          self.assert_(job.start_timestamp)
568
          self.assertFalse(job.end_timestamp)
569

    
570
      self.assertRaises(IndexError, queue.GetNextUpdate)
571

    
572
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
573
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
574
      self.assertEqual(job.GetInfo(["opresult"]),
575
                       [[op.input.result for op in job.ops]])
576
      self.assertEqual(job.GetInfo(["opstatus"]),
577
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
578
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
579
                              for op in job.ops))
580

    
581
      self._GenericCheckJob(job)
582

    
583
      # Finished jobs can't be processed any further
584
      self.assertRaises(errors.ProgrammerError,
585
                        jqueue._JobProcessor(queue, opexec, job))
586

    
587
  def testOpcodeError(self):
588
    queue = _FakeQueueForProc()
589

    
590
    testdata = [
591
      (17077, 1, 0, 0),
592
      (1782, 5, 2, 2),
593
      (18179, 10, 9, 9),
594
      (4744, 10, 3, 8),
595
      (23816, 100, 39, 45),
596
      ]
597

    
598
    for (job_id, opcount, failfrom, failto) in testdata:
599
      # Prepare opcodes
600
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
601
                                 fail=(failfrom <= i and
602
                                       i <= failto))
603
             for i in range(opcount)]
604

    
605
      # Create job
606
      job = self._CreateJob(queue, job_id, ops)
607

    
608
      opexec = _FakeExecOpCodeForProc(queue, None, None)
609

    
610
      for idx in range(len(ops)):
611
        self.assertRaises(IndexError, queue.GetNextUpdate)
612
        result = jqueue._JobProcessor(queue, opexec, job)()
613
        # queued to waitlock
614
        self.assertEqual(queue.GetNextUpdate(), (job, True))
615
        # waitlock to running
616
        self.assertEqual(queue.GetNextUpdate(), (job, True))
617
        # Opcode result
618
        self.assertEqual(queue.GetNextUpdate(), (job, True))
619
        self.assertRaises(IndexError, queue.GetNextUpdate)
620

    
621
        if idx in (failfrom, len(ops) - 1):
622
          # Last opcode
623
          self.assert_(result)
624
          break
625

    
626
        self.assertFalse(result)
627

    
628
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
629

    
630
      self.assertRaises(IndexError, queue.GetNextUpdate)
631

    
632
      # Check job status
633
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
634
      self.assertEqual(job.GetInfo(["id"]), [job_id])
635
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
636

    
637
      # Check opcode status
638
      data = zip(job.ops,
639
                 job.GetInfo(["opstatus"])[0],
640
                 job.GetInfo(["opresult"])[0])
641

    
642
      for idx, (op, opstatus, opresult) in enumerate(data):
643
        if idx < failfrom:
644
          assert not op.input.fail
645
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
646
          self.assertEqual(opresult, op.input.result)
647
        elif idx <= failto:
648
          assert op.input.fail
649
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
650
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
651
        else:
652
          assert not op.input.fail
653
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
654
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
655

    
656
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
657
                              for op in job.ops[:failfrom]))
658

    
659
      self._GenericCheckJob(job)
660

    
661
      # Finished jobs can't be processed any further
662
      self.assertRaises(errors.ProgrammerError,
663
                        jqueue._JobProcessor(queue, opexec, job))
664

    
665
  def testCancelWhileInQueue(self):
666
    queue = _FakeQueueForProc()
667

    
668
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
669
           for i in range(5)]
670

    
671
    # Create job
672
    job_id = 17045
673
    job = self._CreateJob(queue, job_id, ops)
674

    
675
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
676

    
677
    # Mark as cancelled
678
    (success, _) = job.Cancel()
679
    self.assert_(success)
680

    
681
    self.assertRaises(IndexError, queue.GetNextUpdate)
682

    
683
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
684
                            for op in job.ops))
685

    
686
    opexec = _FakeExecOpCodeForProc(queue, None, None)
687
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
688

    
689
    # Check result
690
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
691
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
692
    self.assertFalse(job.start_timestamp)
693
    self.assert_(job.end_timestamp)
694
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
695
                                for op in job.ops))
696
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
697
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
698
                      ["Job canceled by request" for _ in job.ops]])
699

    
700
  def testCancelWhileWaitlockInQueue(self):
701
    queue = _FakeQueueForProc()
702

    
703
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
704
           for i in range(5)]
705

    
706
    # Create job
707
    job_id = 8645
708
    job = self._CreateJob(queue, job_id, ops)
709

    
710
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
711

    
712
    job.ops[0].status = constants.OP_STATUS_WAITLOCK
713

    
714
    assert len(job.ops) == 5
715

    
716
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
717

    
718
    # Mark as cancelling
719
    (success, _) = job.Cancel()
720
    self.assert_(success)
721

    
722
    self.assertRaises(IndexError, queue.GetNextUpdate)
723

    
724
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
725
                            for op in job.ops))
726

    
727
    opexec = _FakeExecOpCodeForProc(queue, None, None)
728
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
729

    
730
    # Check result
731
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
732
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
733
    self.assertFalse(job.start_timestamp)
734
    self.assert_(job.end_timestamp)
735
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
736
                                for op in job.ops))
737
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
738
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
739
                      ["Job canceled by request" for _ in job.ops]])
740

    
741
  def testCancelWhileWaitlock(self):
742
    queue = _FakeQueueForProc()
743

    
744
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
745
           for i in range(5)]
746

    
747
    # Create job
748
    job_id = 11009
749
    job = self._CreateJob(queue, job_id, ops)
750

    
751
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
752

    
753
    def _BeforeStart(timeout, priority):
754
      self.assertEqual(queue.GetNextUpdate(), (job, True))
755
      self.assertRaises(IndexError, queue.GetNextUpdate)
756
      self.assertFalse(queue.IsAcquired())
757
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
758

    
759
      # Mark as cancelled
760
      (success, _) = job.Cancel()
761
      self.assert_(success)
762

    
763
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
764
                              for op in job.ops))
765
      self.assertRaises(IndexError, queue.GetNextUpdate)
766

    
767
    def _AfterStart(op, cbs):
768
      self.assertEqual(queue.GetNextUpdate(), (job, True))
769
      self.assertRaises(IndexError, queue.GetNextUpdate)
770
      self.assertFalse(queue.IsAcquired())
771
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
772

    
773
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
774

    
775
    self.assertRaises(IndexError, queue.GetNextUpdate)
776
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
777
    self.assertEqual(queue.GetNextUpdate(), (job, True))
778
    self.assertRaises(IndexError, queue.GetNextUpdate)
779

    
780
    # Check result
781
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
782
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
783
    self.assert_(job.start_timestamp)
784
    self.assert_(job.end_timestamp)
785
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
786
                                for op in job.ops))
787
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
788
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
789
                      ["Job canceled by request" for _ in job.ops]])
790

    
791
  def testCancelWhileWaitlockWithTimeout(self):
792
    queue = _FakeQueueForProc()
793

    
794
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
795
           for i in range(5)]
796

    
797
    # Create job
798
    job_id = 24314
799
    job = self._CreateJob(queue, job_id, ops)
800

    
801
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
802

    
803
    def _BeforeStart(timeout, priority):
804
      self.assertFalse(queue.IsAcquired())
805
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
806

    
807
      # Mark as cancelled
808
      (success, _) = job.Cancel()
809
      self.assert_(success)
810

    
811
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
812
                              for op in job.ops))
813

    
814
      # Fake an acquire attempt timing out
815
      raise mcpu.LockAcquireTimeout()
816

    
817
    def _AfterStart(op, cbs):
818
      self.fail("Should not reach this")
819

    
820
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
821

    
822
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
823

    
824
    # Check result
825
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
826
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
827
    self.assert_(job.start_timestamp)
828
    self.assert_(job.end_timestamp)
829
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
830
                                for op in job.ops))
831
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
832
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
833
                      ["Job canceled by request" for _ in job.ops]])
834

    
835
  def testCancelWhileRunning(self):
836
    # Tests canceling a job with finished opcodes and more, unprocessed ones
837
    queue = _FakeQueueForProc()
838

    
839
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
840
           for i in range(3)]
841

    
842
    # Create job
843
    job_id = 28492
844
    job = self._CreateJob(queue, job_id, ops)
845

    
846
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
847

    
848
    opexec = _FakeExecOpCodeForProc(queue, None, None)
849

    
850
    # Run one opcode
851
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
852

    
853
    # Job goes back to queued
854
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
855
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
856
                     [[constants.OP_STATUS_SUCCESS,
857
                       constants.OP_STATUS_QUEUED,
858
                       constants.OP_STATUS_QUEUED],
859
                      ["Res0", None, None]])
860

    
861
    # Mark as cancelled
862
    (success, _) = job.Cancel()
863
    self.assert_(success)
864

    
865
    # Try processing another opcode (this will actually cancel the job)
866
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
867

    
868
    # Check result
869
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
870
    self.assertEqual(job.GetInfo(["id"]), [job_id])
871
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
872
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
873
                     [[constants.OP_STATUS_SUCCESS,
874
                       constants.OP_STATUS_CANCELED,
875
                       constants.OP_STATUS_CANCELED],
876
                      ["Res0", "Job canceled by request",
877
                       "Job canceled by request"]])
878

    
879
  def testPartiallyRun(self):
880
    # Tests calling the processor on a job that's been partially run before the
881
    # program was restarted
882
    queue = _FakeQueueForProc()
883

    
884
    opexec = _FakeExecOpCodeForProc(queue, None, None)
885

    
886
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
887
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
888
             for i in range(10)]
889

    
890
      # Create job
891
      job = self._CreateJob(queue, job_id, ops)
892

    
893
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
894

    
895
      for _ in range(successcount):
896
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
897

    
898
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
899
      self.assertEqual(job.GetInfo(["opstatus"]),
900
                       [[constants.OP_STATUS_SUCCESS
901
                         for _ in range(successcount)] +
902
                        [constants.OP_STATUS_QUEUED
903
                         for _ in range(len(ops) - successcount)]])
904

    
905
      self.assert_(job.ops_iter)
906

    
907
      # Serialize and restore (simulates program restart)
908
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
909
      self.assertFalse(newjob.ops_iter)
910
      self._TestPartial(newjob, successcount)
911

    
912
  def _TestPartial(self, job, successcount):
913
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
914
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
915

    
916
    queue = _FakeQueueForProc()
917
    opexec = _FakeExecOpCodeForProc(queue, None, None)
918

    
919
    for remaining in reversed(range(len(job.ops) - successcount)):
920
      result = jqueue._JobProcessor(queue, opexec, job)()
921

    
922
      if remaining == 0:
923
        # Last opcode
924
        self.assert_(result)
925
        break
926

    
927
      self.assertFalse(result)
928

    
929
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
930

    
931
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
932
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
933
    self.assertEqual(job.GetInfo(["opresult"]),
934
                     [[op.input.result for op in job.ops]])
935
    self.assertEqual(job.GetInfo(["opstatus"]),
936
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
937
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
938
                            for op in job.ops))
939

    
940
    self._GenericCheckJob(job)
941

    
942
    # Finished jobs can't be processed any further
943
    self.assertRaises(errors.ProgrammerError,
944
                      jqueue._JobProcessor(queue, opexec, job))
945

    
946
    # ... also after being restored
947
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
948
    self.assertRaises(errors.ProgrammerError,
949
                      jqueue._JobProcessor(queue, opexec, job2))
950

    
951
  def testProcessorOnRunningJob(self):
952
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
953

    
954
    queue = _FakeQueueForProc()
955
    opexec = _FakeExecOpCodeForProc(queue, None, None)
956

    
957
    # Create job
958
    job = self._CreateJob(queue, 9571, ops)
959

    
960
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
961

    
962
    job.ops[0].status = constants.OP_STATUS_RUNNING
963

    
964
    assert len(job.ops) == 1
965

    
966
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
967

    
968
    # Calling on running job must fail
969
    self.assertRaises(errors.ProgrammerError,
970
                      jqueue._JobProcessor(queue, opexec, job))
971

    
972
  def testLogMessages(self):
973
    # Tests the "Feedback" callback function
974
    queue = _FakeQueueForProc()
975

    
976
    messages = {
977
      1: [
978
        (None, "Hello"),
979
        (None, "World"),
980
        (constants.ELOG_MESSAGE, "there"),
981
        ],
982
      4: [
983
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
984
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
985
        ],
986
      }
987
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
988
                               messages=messages.get(i, []))
989
           for i in range(5)]
990

    
991
    # Create job
992
    job = self._CreateJob(queue, 29386, ops)
993

    
994
    def _BeforeStart(timeout, priority):
995
      self.assertEqual(queue.GetNextUpdate(), (job, True))
996
      self.assertRaises(IndexError, queue.GetNextUpdate)
997
      self.assertFalse(queue.IsAcquired())
998
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
999

    
1000
    def _AfterStart(op, cbs):
1001
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1002
      self.assertRaises(IndexError, queue.GetNextUpdate)
1003
      self.assertFalse(queue.IsAcquired())
1004
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1005

    
1006
      self.assertRaises(AssertionError, cbs.Feedback,
1007
                        "too", "many", "arguments")
1008

    
1009
      for (log_type, msg) in op.messages:
1010
        self.assertRaises(IndexError, queue.GetNextUpdate)
1011
        if log_type:
1012
          cbs.Feedback(log_type, msg)
1013
        else:
1014
          cbs.Feedback(msg)
1015
        # Check for job update without replication
1016
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1017
        self.assertRaises(IndexError, queue.GetNextUpdate)
1018

    
1019
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1020

    
1021
    for remaining in reversed(range(len(job.ops))):
1022
      self.assertRaises(IndexError, queue.GetNextUpdate)
1023
      result = jqueue._JobProcessor(queue, opexec, job)()
1024
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1025
      self.assertRaises(IndexError, queue.GetNextUpdate)
1026

    
1027
      if remaining == 0:
1028
        # Last opcode
1029
        self.assert_(result)
1030
        break
1031

    
1032
      self.assertFalse(result)
1033

    
1034
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1035

    
1036
    self.assertRaises(IndexError, queue.GetNextUpdate)
1037

    
1038
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1039
    self.assertEqual(job.GetInfo(["opresult"]),
1040
                     [[op.input.result for op in job.ops]])
1041

    
1042
    logmsgcount = sum(len(m) for m in messages.values())
1043

    
1044
    self._CheckLogMessages(job, logmsgcount)
1045

    
1046
    # Serialize and restore (simulates program restart)
1047
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1048
    self._CheckLogMessages(newjob, logmsgcount)
1049

    
1050
    # Check each message
1051
    prevserial = -1
1052
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1053
      for (serial, timestamp, log_type, msg) in oplog:
1054
        (exptype, expmsg) = messages.get(idx).pop(0)
1055
        if exptype:
1056
          self.assertEqual(log_type, exptype)
1057
        else:
1058
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1059
        self.assertEqual(expmsg, msg)
1060
        self.assert_(serial > prevserial)
1061
        prevserial = serial
1062

    
1063
  def _CheckLogMessages(self, job, count):
1064
    # Check serial
1065
    self.assertEqual(job.log_serial, count)
1066

    
1067
    # No filter
1068
    self.assertEqual(job.GetLogEntries(None),
1069
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1070
                      for entry in entries])
1071

    
1072
    # Filter with serial
1073
    assert count > 3
1074
    self.assert_(job.GetLogEntries(3))
1075
    self.assertEqual(job.GetLogEntries(3),
1076
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1077
                      for entry in entries][3:])
1078

    
1079
    # No log message after highest serial
1080
    self.assertFalse(job.GetLogEntries(count))
1081
    self.assertFalse(job.GetLogEntries(count + 3))
1082

    
1083
  def testSubmitManyJobs(self):
1084
    queue = _FakeQueueForProc()
1085

    
1086
    job_id = 15656
1087
    ops = [
1088
      opcodes.OpTestDummy(result="Res0", fail=False,
1089
                          submit_jobs=[]),
1090
      opcodes.OpTestDummy(result="Res1", fail=False,
1091
                          submit_jobs=[
1092
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1093
                            ]),
1094
      opcodes.OpTestDummy(result="Res2", fail=False,
1095
                          submit_jobs=[
1096
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1097
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1098
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1099
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1100
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1101
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1102
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1103
                            ]),
1104
      ]
1105

    
1106
    # Create job
1107
    job = self._CreateJob(queue, job_id, ops)
1108

    
1109
    def _BeforeStart(timeout, priority):
1110
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1111
      self.assertRaises(IndexError, queue.GetNextUpdate)
1112
      self.assertFalse(queue.IsAcquired())
1113
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1114
      self.assertFalse(job.cur_opctx)
1115

    
1116
    def _AfterStart(op, cbs):
1117
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1118
      self.assertRaises(IndexError, queue.GetNextUpdate)
1119

    
1120
      self.assertFalse(queue.IsAcquired())
1121
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1122
      self.assertFalse(job.cur_opctx)
1123

    
1124
      # Job is running, cancelling shouldn't be possible
1125
      (success, _) = job.Cancel()
1126
      self.assertFalse(success)
1127

    
1128
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1129

    
1130
    for idx in range(len(ops)):
1131
      self.assertRaises(IndexError, queue.GetNextUpdate)
1132
      result = jqueue._JobProcessor(queue, opexec, job)()
1133
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1134
      self.assertRaises(IndexError, queue.GetNextUpdate)
1135
      if idx == len(ops) - 1:
1136
        # Last opcode
1137
        self.assert_(result)
1138
      else:
1139
        self.assertFalse(result)
1140

    
1141
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1142
        self.assert_(job.start_timestamp)
1143
        self.assertFalse(job.end_timestamp)
1144

    
1145
    self.assertRaises(IndexError, queue.GetNextUpdate)
1146

    
1147
    for idx, submitted_ops in enumerate(job_ops
1148
                                        for op in ops
1149
                                        for job_ops in op.submit_jobs):
1150
      self.assertEqual(queue.GetNextSubmittedJob(),
1151
                       (1000 + idx, submitted_ops))
1152
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1153

    
1154
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1155
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1156
    self.assertEqual(job.GetInfo(["opresult"]),
1157
                     [[[], [1000], [1001, 1002, 1003]]])
1158
    self.assertEqual(job.GetInfo(["opstatus"]),
1159
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1160

    
1161
    self._GenericCheckJob(job)
1162

    
1163
    # Finished jobs can't be processed any further
1164
    self.assertRaises(errors.ProgrammerError,
1165
                      jqueue._JobProcessor(queue, opexec, job))
1166

    
1167

    
1168
class _FakeTimeoutStrategy:
1169
  def __init__(self, timeouts):
1170
    self.timeouts = timeouts
1171
    self.attempts = 0
1172
    self.last_timeout = None
1173

    
1174
  def NextAttempt(self):
1175
    self.attempts += 1
1176
    if self.timeouts:
1177
      timeout = self.timeouts.pop(0)
1178
    else:
1179
      timeout = None
1180
    self.last_timeout = timeout
1181
    return timeout
1182

    
1183

    
1184
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1185
  def setUp(self):
1186
    self.queue = _FakeQueueForProc()
1187
    self.job = None
1188
    self.curop = None
1189
    self.opcounter = None
1190
    self.timeout_strategy = None
1191
    self.retries = 0
1192
    self.prev_tsop = None
1193
    self.prev_prio = None
1194
    self.prev_status = None
1195
    self.lock_acq_prio = None
1196
    self.gave_lock = None
1197
    self.done_lock_before_blocking = False
1198

    
1199
  def _BeforeStart(self, timeout, priority):
1200
    job = self.job
1201

    
1202
    # If status has changed, job must've been written
1203
    if self.prev_status != self.job.ops[self.curop].status:
1204
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1205
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1206

    
1207
    self.assertFalse(self.queue.IsAcquired())
1208
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1209

    
1210
    ts = self.timeout_strategy
1211

    
1212
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1213
    self.assertEqual(timeout, ts.last_timeout)
1214
    self.assertEqual(priority, job.ops[self.curop].priority)
1215

    
1216
    self.gave_lock = True
1217
    self.lock_acq_prio = priority
1218

    
1219
    if (self.curop == 3 and
1220
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1221
      # Give locks before running into blocking acquire
1222
      assert self.retries == 7
1223
      self.retries = 0
1224
      self.done_lock_before_blocking = True
1225
      return
1226

    
1227
    if self.retries > 0:
1228
      self.assert_(timeout is not None)
1229
      self.retries -= 1
1230
      self.gave_lock = False
1231
      raise mcpu.LockAcquireTimeout()
1232

    
1233
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1234
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1235
      assert not ts.timeouts
1236
      self.assert_(timeout is None)
1237

    
1238
  def _AfterStart(self, op, cbs):
1239
    job = self.job
1240

    
1241
    # Setting to "running" requires an update
1242
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1243
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1244

    
1245
    self.assertFalse(self.queue.IsAcquired())
1246
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1247

    
1248
    # Job is running, cancelling shouldn't be possible
1249
    (success, _) = job.Cancel()
1250
    self.assertFalse(success)
1251

    
1252
  def _NextOpcode(self):
1253
    self.curop = self.opcounter.next()
1254
    self.prev_prio = self.job.ops[self.curop].priority
1255
    self.prev_status = self.job.ops[self.curop].status
1256

    
1257
  def _NewTimeoutStrategy(self):
1258
    job = self.job
1259

    
1260
    self.assertEqual(self.retries, 0)
1261

    
1262
    if self.prev_tsop == self.curop:
1263
      # Still on the same opcode, priority must've been increased
1264
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1265

    
1266
    if self.curop == 1:
1267
      # Normal retry
1268
      timeouts = range(10, 31, 10)
1269
      self.retries = len(timeouts) - 1
1270

    
1271
    elif self.curop == 2:
1272
      # Let this run into a blocking acquire
1273
      timeouts = range(11, 61, 12)
1274
      self.retries = len(timeouts)
1275

    
1276
    elif self.curop == 3:
1277
      # Wait for priority to increase, but give lock before blocking acquire
1278
      timeouts = range(12, 100, 14)
1279
      self.retries = len(timeouts)
1280

    
1281
      self.assertFalse(self.done_lock_before_blocking)
1282

    
1283
    elif self.curop == 4:
1284
      self.assert_(self.done_lock_before_blocking)
1285

    
1286
      # Timeouts, but no need to retry
1287
      timeouts = range(10, 31, 10)
1288
      self.retries = 0
1289

    
1290
    elif self.curop == 5:
1291
      # Normal retry
1292
      timeouts = range(19, 100, 11)
1293
      self.retries = len(timeouts)
1294

    
1295
    else:
1296
      timeouts = []
1297
      self.retries = 0
1298

    
1299
    assert len(job.ops) == 10
1300
    assert self.retries <= len(timeouts)
1301

    
1302
    ts = _FakeTimeoutStrategy(timeouts)
1303

    
1304
    self.timeout_strategy = ts
1305
    self.prev_tsop = self.curop
1306
    self.prev_prio = job.ops[self.curop].priority
1307

    
1308
    return ts
1309

    
1310
  def testTimeout(self):
1311
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1312
           for i in range(10)]
1313

    
1314
    # Create job
1315
    job_id = 15801
1316
    job = self._CreateJob(self.queue, job_id, ops)
1317
    self.job = job
1318

    
1319
    self.opcounter = itertools.count(0)
1320

    
1321
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1322
                                    self._AfterStart)
1323
    tsf = self._NewTimeoutStrategy
1324

    
1325
    self.assertFalse(self.done_lock_before_blocking)
1326

    
1327
    while True:
1328
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1329
                                  _timeout_strategy_factory=tsf)
1330

    
1331
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1332

    
1333
      if self.curop is not None:
1334
        self.prev_status = self.job.ops[self.curop].status
1335

    
1336
      self.lock_acq_prio = None
1337

    
1338
      result = proc(_nextop_fn=self._NextOpcode)
1339
      assert self.curop is not None
1340

    
1341
      if result or self.gave_lock:
1342
        # Got lock and/or job is done, result must've been written
1343
        self.assertFalse(job.cur_opctx)
1344
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1345
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1346
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1347
        self.assert_(job.ops[self.curop].exec_timestamp)
1348

    
1349
      if result:
1350
        self.assertFalse(job.cur_opctx)
1351
        break
1352

    
1353
      self.assertFalse(result)
1354

    
1355
      if self.curop == 0:
1356
        self.assertEqual(job.ops[self.curop].start_timestamp,
1357
                         job.start_timestamp)
1358

    
1359
      if self.gave_lock:
1360
        # Opcode finished, but job not yet done
1361
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1362
      else:
1363
        # Did not get locks
1364
        self.assert_(job.cur_opctx)
1365
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1366
                         self.timeout_strategy.NextAttempt)
1367
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1368
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1369

    
1370
        # If priority has changed since acquiring locks, the job must've been
1371
        # updated
1372
        if self.lock_acq_prio != job.ops[self.curop].priority:
1373
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1374

    
1375
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1376

    
1377
      self.assert_(job.start_timestamp)
1378
      self.assertFalse(job.end_timestamp)
1379

    
1380
    self.assertEqual(self.curop, len(job.ops) - 1)
1381
    self.assertEqual(self.job, job)
1382
    self.assertEqual(self.opcounter.next(), len(job.ops))
1383
    self.assert_(self.done_lock_before_blocking)
1384

    
1385
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1386
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1387
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1388
    self.assertEqual(job.GetInfo(["opresult"]),
1389
                     [[op.input.result for op in job.ops]])
1390
    self.assertEqual(job.GetInfo(["opstatus"]),
1391
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1392
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1393
                            for op in job.ops))
1394

    
1395
    # Finished jobs can't be processed any further
1396
    self.assertRaises(errors.ProgrammerError,
1397
                      jqueue._JobProcessor(self.queue, opexec, job))
1398

    
1399

    
1400
if __name__ == "__main__":
1401
  testutils.GanetiTestProgram()