Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 47099cd1

History | View | Annotate | Download (71.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import jqueue
36
from ganeti import opcodes
37
from ganeti import compat
38
from ganeti import mcpu
39

    
40
import testutils
41

    
42

    
43
class _FakeJob:
44
  def __init__(self, job_id, status):
45
    self.id = job_id
46
    self.writable = False
47
    self._status = status
48
    self._log = []
49

    
50
  def SetStatus(self, status):
51
    self._status = status
52

    
53
  def AddLogEntry(self, msg):
54
    self._log.append((len(self._log), msg))
55

    
56
  def CalcStatus(self):
57
    return self._status
58

    
59
  def GetInfo(self, fields):
60
    result = []
61

    
62
    for name in fields:
63
      if name == "status":
64
        result.append(self._status)
65
      else:
66
        raise Exception("Unknown field")
67

    
68
    return result
69

    
70
  def GetLogEntries(self, newer_than):
71
    assert newer_than is None or newer_than >= 0
72

    
73
    if newer_than is None:
74
      return self._log
75

    
76
    return self._log[newer_than:]
77

    
78

    
79
class TestJobChangesChecker(unittest.TestCase):
80
  def testStatus(self):
81
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
82
    checker = jqueue._JobChangesChecker(["status"], None, None)
83
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
84

    
85
    job.SetStatus(constants.JOB_STATUS_RUNNING)
86
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
87

    
88
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
89
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
90

    
91
    # job.id is used by checker
92
    self.assertEqual(job.id, 9094)
93

    
94
  def testStatusWithPrev(self):
95
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
96
    checker = jqueue._JobChangesChecker(["status"],
97
                                        [constants.JOB_STATUS_QUEUED], None)
98
    self.assert_(checker(job) is None)
99

    
100
    job.SetStatus(constants.JOB_STATUS_RUNNING)
101
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
102

    
103
  def testFinalStatus(self):
104
    for status in constants.JOBS_FINALIZED:
105
      job = _FakeJob(2178711, status)
106
      checker = jqueue._JobChangesChecker(["status"], [status], None)
107
      # There won't be any changes in this status, hence it should signal
108
      # a change immediately
109
      self.assertEqual(checker(job), ([status], []))
110

    
111
  def testLog(self):
112
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
113
    checker = jqueue._JobChangesChecker(["status"], None, None)
114
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
115

    
116
    job.AddLogEntry("Hello World")
117
    (job_info, log_entries) = checker(job)
118
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
119
    self.assertEqual(log_entries, [[0, "Hello World"]])
120

    
121
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
122
    self.assert_(checker2(job) is None)
123

    
124
    job.AddLogEntry("Foo Bar")
125
    job.SetStatus(constants.JOB_STATUS_ERROR)
126

    
127
    (job_info, log_entries) = checker2(job)
128
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
129
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
130

    
131
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
132
    (job_info, log_entries) = checker3(job)
133
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
134
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
135

    
136

    
137
class TestJobChangesWaiter(unittest.TestCase):
138
  def setUp(self):
139
    self.tmpdir = tempfile.mkdtemp()
140
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
141
    utils.WriteFile(self.filename, data="")
142

    
143
  def tearDown(self):
144
    shutil.rmtree(self.tmpdir)
145

    
146
  def _EnsureNotifierClosed(self, notifier):
147
    try:
148
      os.fstat(notifier._fd)
149
    except EnvironmentError, err:
150
      self.assertEqual(err.errno, errno.EBADF)
151
    else:
152
      self.fail("File descriptor wasn't closed")
153

    
154
  def testClose(self):
155
    for wait in [False, True]:
156
      waiter = jqueue._JobFileChangesWaiter(self.filename)
157
      try:
158
        if wait:
159
          waiter.Wait(0.001)
160
      finally:
161
        waiter.Close()
162

    
163
      # Ensure file descriptor was closed
164
      self._EnsureNotifierClosed(waiter._notifier)
165

    
166
  def testChangingFile(self):
167
    waiter = jqueue._JobFileChangesWaiter(self.filename)
168
    try:
169
      self.assertFalse(waiter.Wait(0.1))
170
      utils.WriteFile(self.filename, data="changed")
171
      self.assert_(waiter.Wait(60))
172
    finally:
173
      waiter.Close()
174

    
175
    self._EnsureNotifierClosed(waiter._notifier)
176

    
177
  def testChangingFile2(self):
178
    waiter = jqueue._JobChangesWaiter(self.filename)
179
    try:
180
      self.assertFalse(waiter._filewaiter)
181
      self.assert_(waiter.Wait(0.1))
182
      self.assert_(waiter._filewaiter)
183

    
184
      # File waiter is now used, but there have been no changes
185
      self.assertFalse(waiter.Wait(0.1))
186
      utils.WriteFile(self.filename, data="changed")
187
      self.assert_(waiter.Wait(60))
188
    finally:
189
      waiter.Close()
190

    
191
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
192

    
193

    
194
class TestWaitForJobChangesHelper(unittest.TestCase):
195
  def setUp(self):
196
    self.tmpdir = tempfile.mkdtemp()
197
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
198
    utils.WriteFile(self.filename, data="")
199

    
200
  def tearDown(self):
201
    shutil.rmtree(self.tmpdir)
202

    
203
  def _LoadWaitingJob(self):
204
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
205

    
206
  def _LoadLostJob(self):
207
    return None
208

    
209
  def testNoChanges(self):
210
    wfjc = jqueue._WaitForJobChangesHelper()
211

    
212
    # No change
213
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
214
                          [constants.JOB_STATUS_WAITING], None, 0.1),
215
                     constants.JOB_NOTCHANGED)
216

    
217
    # No previous information
218
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
219
                          ["status"], None, None, 1.0),
220
                     ([constants.JOB_STATUS_WAITING], []))
221

    
222
  def testLostJob(self):
223
    wfjc = jqueue._WaitForJobChangesHelper()
224
    self.assert_(wfjc(self.filename, self._LoadLostJob,
225
                      ["status"], None, None, 1.0) is None)
226

    
227

    
228
class TestEncodeOpError(unittest.TestCase):
229
  def test(self):
230
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
231
    self.assert_(isinstance(encerr, tuple))
232
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
233

    
234
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
235
    self.assert_(isinstance(encerr, tuple))
236
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
237

    
238
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
239
    self.assert_(isinstance(encerr, tuple))
240
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
241

    
242
    encerr = jqueue._EncodeOpError("Hello World")
243
    self.assert_(isinstance(encerr, tuple))
244
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
245

    
246

    
247
class TestQueuedOpCode(unittest.TestCase):
248
  def testDefaults(self):
249
    def _Check(op):
250
      self.assertFalse(hasattr(op.input, "dry_run"))
251
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
252
      self.assertFalse(op.log)
253
      self.assert_(op.start_timestamp is None)
254
      self.assert_(op.exec_timestamp is None)
255
      self.assert_(op.end_timestamp is None)
256
      self.assert_(op.result is None)
257
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
258

    
259
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
260
    _Check(op1)
261
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
262
    _Check(op2)
263
    self.assertEqual(op1.Serialize(), op2.Serialize())
264

    
265
  def testPriority(self):
266
    def _Check(op):
267
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
268
             "Default priority equals high priority; test can't work"
269
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
270
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
271

    
272
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
273
    op1 = jqueue._QueuedOpCode(inpop)
274
    _Check(op1)
275
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
276
    _Check(op2)
277
    self.assertEqual(op1.Serialize(), op2.Serialize())
278

    
279

    
280
class TestQueuedJob(unittest.TestCase):
281
  def test(self):
282
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
283
                      None, 1, [], False)
284

    
285
  def testDefaults(self):
286
    job_id = 4260
287
    ops = [
288
      opcodes.OpTagsGet(),
289
      opcodes.OpTestDelay(),
290
      ]
291

    
292
    def _Check(job):
293
      self.assertTrue(job.writable)
294
      self.assertEqual(job.id, job_id)
295
      self.assertEqual(job.log_serial, 0)
296
      self.assert_(job.received_timestamp)
297
      self.assert_(job.start_timestamp is None)
298
      self.assert_(job.end_timestamp is None)
299
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
300
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
301
      self.assert_(repr(job).startswith("<"))
302
      self.assertEqual(len(job.ops), len(ops))
303
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
304
                              for (inp, op) in zip(ops, job.ops)))
305
      self.assertRaises(errors.OpExecError, job.GetInfo,
306
                        ["unknown-field"])
307
      self.assertEqual(job.GetInfo(["summary"]),
308
                       [[op.input.Summary() for op in job.ops]])
309

    
310
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
311
    _Check(job1)
312
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
313
    _Check(job2)
314
    self.assertEqual(job1.Serialize(), job2.Serialize())
315

    
316
  def testWritable(self):
317
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
318
    self.assertFalse(job.writable)
319

    
320
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
321
    self.assertTrue(job.writable)
322

    
323
  def testPriority(self):
324
    job_id = 4283
325
    ops = [
326
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
327
      opcodes.OpTestDelay(),
328
      ]
329

    
330
    def _Check(job):
331
      self.assertEqual(job.id, job_id)
332
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
333
      self.assert_(repr(job).startswith("<"))
334

    
335
    job = jqueue._QueuedJob(None, job_id, ops, True)
336
    _Check(job)
337
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
338
                            for op in job.ops))
339
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
340

    
341
    # Increase first
342
    job.ops[0].priority -= 1
343
    _Check(job)
344
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
345

    
346
    # Mark opcode as finished
347
    job.ops[0].status = constants.OP_STATUS_SUCCESS
348
    _Check(job)
349
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
350

    
351
    # Increase second
352
    job.ops[1].priority -= 10
353
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
354

    
355
    # Test increasing first
356
    job.ops[0].status = constants.OP_STATUS_RUNNING
357
    job.ops[0].priority -= 19
358
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
359

    
360
  def testCalcStatus(self):
361
    def _Queued(ops):
362
      # The default status is "queued"
363
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
364
                              for op in ops))
365

    
366
    def _Waitlock1(ops):
367
      ops[0].status = constants.OP_STATUS_WAITING
368

    
369
    def _Waitlock2(ops):
370
      ops[0].status = constants.OP_STATUS_SUCCESS
371
      ops[1].status = constants.OP_STATUS_SUCCESS
372
      ops[2].status = constants.OP_STATUS_WAITING
373

    
374
    def _Running(ops):
375
      ops[0].status = constants.OP_STATUS_SUCCESS
376
      ops[1].status = constants.OP_STATUS_RUNNING
377
      for op in ops[2:]:
378
        op.status = constants.OP_STATUS_QUEUED
379

    
380
    def _Canceling1(ops):
381
      ops[0].status = constants.OP_STATUS_SUCCESS
382
      ops[1].status = constants.OP_STATUS_SUCCESS
383
      for op in ops[2:]:
384
        op.status = constants.OP_STATUS_CANCELING
385

    
386
    def _Canceling2(ops):
387
      for op in ops:
388
        op.status = constants.OP_STATUS_CANCELING
389

    
390
    def _Canceled(ops):
391
      for op in ops:
392
        op.status = constants.OP_STATUS_CANCELED
393

    
394
    def _Error1(ops):
395
      for idx, op in enumerate(ops):
396
        if idx > 3:
397
          op.status = constants.OP_STATUS_ERROR
398
        else:
399
          op.status = constants.OP_STATUS_SUCCESS
400

    
401
    def _Error2(ops):
402
      for op in ops:
403
        op.status = constants.OP_STATUS_ERROR
404

    
405
    def _Success(ops):
406
      for op in ops:
407
        op.status = constants.OP_STATUS_SUCCESS
408

    
409
    tests = {
410
      constants.JOB_STATUS_QUEUED: [_Queued],
411
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
412
      constants.JOB_STATUS_RUNNING: [_Running],
413
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
414
      constants.JOB_STATUS_CANCELED: [_Canceled],
415
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
416
      constants.JOB_STATUS_SUCCESS: [_Success],
417
      }
418

    
419
    def _NewJob():
420
      job = jqueue._QueuedJob(None, 1,
421
                              [opcodes.OpTestDelay() for _ in range(10)],
422
                              True)
423
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
424
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
425
                              for op in job.ops))
426
      return job
427

    
428
    for status in constants.JOB_STATUS_ALL:
429
      sttests = tests[status]
430
      assert sttests
431
      for fn in sttests:
432
        job = _NewJob()
433
        fn(job.ops)
434
        self.assertEqual(job.CalcStatus(), status)
435

    
436

    
437
class _FakeDependencyManager:
438
  def __init__(self):
439
    self._checks = []
440
    self._notifications = []
441
    self._waiting = set()
442

    
443
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
444
    self._checks.append((job, dep_job_id, dep_status, result))
445

    
446
  def CountPendingResults(self):
447
    return len(self._checks)
448

    
449
  def CountWaitingJobs(self):
450
    return len(self._waiting)
451

    
452
  def GetNextNotification(self):
453
    return self._notifications.pop(0)
454

    
455
  def JobWaiting(self, job):
456
    return job in self._waiting
457

    
458
  def CheckAndRegister(self, job, dep_job_id, dep_status):
459
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
460

    
461
    assert exp_job == job
462
    assert exp_dep_job_id == dep_job_id
463
    assert exp_dep_status == dep_status
464

    
465
    (result_status, _) = result
466

    
467
    if result_status == jqueue._JobDependencyManager.WAIT:
468
      self._waiting.add(job)
469
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
470
      self._waiting.remove(job)
471

    
472
    return result
473

    
474
  def NotifyWaiters(self, job_id):
475
    self._notifications.append(job_id)
476

    
477

    
478
class _DisabledFakeDependencyManager:
479
  def JobWaiting(self, _):
480
    return False
481

    
482
  def CheckAndRegister(self, *args):
483
    assert False, "Should not be called"
484

    
485
  def NotifyWaiters(self, _):
486
    pass
487

    
488

    
489
class _FakeQueueForProc:
490
  def __init__(self, depmgr=None):
491
    self._acquired = False
492
    self._updates = []
493
    self._submitted = []
494

    
495
    self._submit_count = itertools.count(1000)
496

    
497
    if depmgr:
498
      self.depmgr = depmgr
499
    else:
500
      self.depmgr = _DisabledFakeDependencyManager()
501

    
502
  def IsAcquired(self):
503
    return self._acquired
504

    
505
  def GetNextUpdate(self):
506
    return self._updates.pop(0)
507

    
508
  def GetNextSubmittedJob(self):
509
    return self._submitted.pop(0)
510

    
511
  def acquire(self, shared=0):
512
    assert shared == 1
513
    self._acquired = True
514

    
515
  def release(self):
516
    assert self._acquired
517
    self._acquired = False
518

    
519
  def UpdateJobUnlocked(self, job, replicate=True):
520
    assert self._acquired, "Lock not acquired while updating job"
521
    self._updates.append((job, bool(replicate)))
522

    
523
  def SubmitManyJobs(self, jobs):
524
    assert not self._acquired, "Lock acquired while submitting jobs"
525
    job_ids = [self._submit_count.next() for _ in jobs]
526
    self._submitted.extend(zip(job_ids, jobs))
527
    return job_ids
528

    
529

    
530
class _FakeExecOpCodeForProc:
531
  def __init__(self, queue, before_start, after_start):
532
    self._queue = queue
533
    self._before_start = before_start
534
    self._after_start = after_start
535

    
536
  def __call__(self, op, cbs, timeout=None, priority=None):
537
    assert isinstance(op, opcodes.OpTestDummy)
538
    assert not self._queue.IsAcquired(), \
539
           "Queue lock not released when executing opcode"
540

    
541
    if self._before_start:
542
      self._before_start(timeout, priority)
543

    
544
    cbs.NotifyStart()
545

    
546
    if self._after_start:
547
      self._after_start(op, cbs)
548

    
549
    # Check again after the callbacks
550
    assert not self._queue.IsAcquired()
551

    
552
    if op.fail:
553
      raise errors.OpExecError("Error requested (%s)" % op.result)
554

    
555
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
556
      return cbs.SubmitManyJobs(op.submit_jobs)
557

    
558
    return op.result
559

    
560

    
561
class _JobProcessorTestUtils:
562
  def _CreateJob(self, queue, job_id, ops):
563
    job = jqueue._QueuedJob(queue, job_id, ops, True)
564
    self.assertFalse(job.start_timestamp)
565
    self.assertFalse(job.end_timestamp)
566
    self.assertEqual(len(ops), len(job.ops))
567
    self.assert_(compat.all(op.input == inp
568
                            for (op, inp) in zip(job.ops, ops)))
569
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
570
    return job
571

    
572

    
573
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
574
  def _GenericCheckJob(self, job):
575
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
576
                      for op in job.ops)
577

    
578
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
579
                     [[op.start_timestamp for op in job.ops],
580
                      [op.exec_timestamp for op in job.ops],
581
                      [op.end_timestamp for op in job.ops]])
582
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
583
                     [job.received_timestamp,
584
                      job.start_timestamp,
585
                      job.end_timestamp])
586
    self.assert_(job.start_timestamp)
587
    self.assert_(job.end_timestamp)
588
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
589

    
590
  def testSuccess(self):
591
    queue = _FakeQueueForProc()
592

    
593
    for (job_id, opcount) in [(25351, 1), (6637, 3),
594
                              (24644, 10), (32207, 100)]:
595
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
596
             for i in range(opcount)]
597

    
598
      # Create job
599
      job = self._CreateJob(queue, job_id, ops)
600

    
601
      def _BeforeStart(timeout, priority):
602
        self.assertEqual(queue.GetNextUpdate(), (job, True))
603
        self.assertRaises(IndexError, queue.GetNextUpdate)
604
        self.assertFalse(queue.IsAcquired())
605
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
606
        self.assertFalse(job.cur_opctx)
607

    
608
      def _AfterStart(op, cbs):
609
        self.assertEqual(queue.GetNextUpdate(), (job, True))
610
        self.assertRaises(IndexError, queue.GetNextUpdate)
611

    
612
        self.assertFalse(queue.IsAcquired())
613
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
614
        self.assertFalse(job.cur_opctx)
615

    
616
        # Job is running, cancelling shouldn't be possible
617
        (success, _) = job.Cancel()
618
        self.assertFalse(success)
619

    
620
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
621

    
622
      for idx in range(len(ops)):
623
        self.assertRaises(IndexError, queue.GetNextUpdate)
624
        result = jqueue._JobProcessor(queue, opexec, job)()
625
        self.assertEqual(queue.GetNextUpdate(), (job, True))
626
        self.assertRaises(IndexError, queue.GetNextUpdate)
627
        if idx == len(ops) - 1:
628
          # Last opcode
629
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
630
        else:
631
          self.assertEqual(result, jqueue._JobProcessor.DEFER)
632

    
633
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
634
          self.assert_(job.start_timestamp)
635
          self.assertFalse(job.end_timestamp)
636

    
637
      self.assertRaises(IndexError, queue.GetNextUpdate)
638

    
639
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
640
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
641
      self.assertEqual(job.GetInfo(["opresult"]),
642
                       [[op.input.result for op in job.ops]])
643
      self.assertEqual(job.GetInfo(["opstatus"]),
644
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
645
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
646
                              for op in job.ops))
647

    
648
      self._GenericCheckJob(job)
649

    
650
      # Calling the processor on a finished job should be a no-op
651
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
652
                       jqueue._JobProcessor.FINISHED)
653
      self.assertRaises(IndexError, queue.GetNextUpdate)
654

    
655
  def testOpcodeError(self):
656
    queue = _FakeQueueForProc()
657

    
658
    testdata = [
659
      (17077, 1, 0, 0),
660
      (1782, 5, 2, 2),
661
      (18179, 10, 9, 9),
662
      (4744, 10, 3, 8),
663
      (23816, 100, 39, 45),
664
      ]
665

    
666
    for (job_id, opcount, failfrom, failto) in testdata:
667
      # Prepare opcodes
668
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
669
                                 fail=(failfrom <= i and
670
                                       i <= failto))
671
             for i in range(opcount)]
672

    
673
      # Create job
674
      job = self._CreateJob(queue, job_id, ops)
675

    
676
      opexec = _FakeExecOpCodeForProc(queue, None, None)
677

    
678
      for idx in range(len(ops)):
679
        self.assertRaises(IndexError, queue.GetNextUpdate)
680
        result = jqueue._JobProcessor(queue, opexec, job)()
681
        # queued to waitlock
682
        self.assertEqual(queue.GetNextUpdate(), (job, True))
683
        # waitlock to running
684
        self.assertEqual(queue.GetNextUpdate(), (job, True))
685
        # Opcode result
686
        self.assertEqual(queue.GetNextUpdate(), (job, True))
687
        self.assertRaises(IndexError, queue.GetNextUpdate)
688

    
689
        if idx in (failfrom, len(ops) - 1):
690
          # Last opcode
691
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
692
          break
693

    
694
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
695

    
696
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
697

    
698
      self.assertRaises(IndexError, queue.GetNextUpdate)
699

    
700
      # Check job status
701
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
702
      self.assertEqual(job.GetInfo(["id"]), [job_id])
703
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
704

    
705
      # Check opcode status
706
      data = zip(job.ops,
707
                 job.GetInfo(["opstatus"])[0],
708
                 job.GetInfo(["opresult"])[0])
709

    
710
      for idx, (op, opstatus, opresult) in enumerate(data):
711
        if idx < failfrom:
712
          assert not op.input.fail
713
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
714
          self.assertEqual(opresult, op.input.result)
715
        elif idx <= failto:
716
          assert op.input.fail
717
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
718
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
719
        else:
720
          assert not op.input.fail
721
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
722
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
723

    
724
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
725
                              for op in job.ops[:failfrom]))
726

    
727
      self._GenericCheckJob(job)
728

    
729
      # Calling the processor on a finished job should be a no-op
730
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
731
                       jqueue._JobProcessor.FINISHED)
732
      self.assertRaises(IndexError, queue.GetNextUpdate)
733

    
734
  def testCancelWhileInQueue(self):
735
    queue = _FakeQueueForProc()
736

    
737
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
738
           for i in range(5)]
739

    
740
    # Create job
741
    job_id = 17045
742
    job = self._CreateJob(queue, job_id, ops)
743

    
744
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
745

    
746
    # Mark as cancelled
747
    (success, _) = job.Cancel()
748
    self.assert_(success)
749

    
750
    self.assertRaises(IndexError, queue.GetNextUpdate)
751

    
752
    self.assertFalse(job.start_timestamp)
753
    self.assertTrue(job.end_timestamp)
754
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
755
                            for op in job.ops))
756

    
757
    # Serialize to check for differences
758
    before_proc = job.Serialize()
759

    
760
    # Simulate processor called in workerpool
761
    opexec = _FakeExecOpCodeForProc(queue, None, None)
762
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
763
                     jqueue._JobProcessor.FINISHED)
764

    
765
    # Check result
766
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
767
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
768
    self.assertFalse(job.start_timestamp)
769
    self.assertTrue(job.end_timestamp)
770
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
771
                                for op in job.ops))
772
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
773
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
774
                      ["Job canceled by request" for _ in job.ops]])
775

    
776
    # Must not have changed or written
777
    self.assertEqual(before_proc, job.Serialize())
778
    self.assertRaises(IndexError, queue.GetNextUpdate)
779

    
780
  def testCancelWhileWaitlockInQueue(self):
781
    queue = _FakeQueueForProc()
782

    
783
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
784
           for i in range(5)]
785

    
786
    # Create job
787
    job_id = 8645
788
    job = self._CreateJob(queue, job_id, ops)
789

    
790
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
791

    
792
    job.ops[0].status = constants.OP_STATUS_WAITING
793

    
794
    assert len(job.ops) == 5
795

    
796
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
797

    
798
    # Mark as cancelling
799
    (success, _) = job.Cancel()
800
    self.assert_(success)
801

    
802
    self.assertRaises(IndexError, queue.GetNextUpdate)
803

    
804
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
805
                            for op in job.ops))
806

    
807
    opexec = _FakeExecOpCodeForProc(queue, None, None)
808
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
809
                     jqueue._JobProcessor.FINISHED)
810

    
811
    # Check result
812
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
813
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
814
    self.assertFalse(job.start_timestamp)
815
    self.assert_(job.end_timestamp)
816
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
817
                                for op in job.ops))
818
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
819
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
820
                      ["Job canceled by request" for _ in job.ops]])
821

    
822
  def testCancelWhileWaitlock(self):
823
    queue = _FakeQueueForProc()
824

    
825
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
826
           for i in range(5)]
827

    
828
    # Create job
829
    job_id = 11009
830
    job = self._CreateJob(queue, job_id, ops)
831

    
832
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
833

    
834
    def _BeforeStart(timeout, priority):
835
      self.assertEqual(queue.GetNextUpdate(), (job, True))
836
      self.assertRaises(IndexError, queue.GetNextUpdate)
837
      self.assertFalse(queue.IsAcquired())
838
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
839

    
840
      # Mark as cancelled
841
      (success, _) = job.Cancel()
842
      self.assert_(success)
843

    
844
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
845
                              for op in job.ops))
846
      self.assertRaises(IndexError, queue.GetNextUpdate)
847

    
848
    def _AfterStart(op, cbs):
849
      self.assertEqual(queue.GetNextUpdate(), (job, True))
850
      self.assertRaises(IndexError, queue.GetNextUpdate)
851
      self.assertFalse(queue.IsAcquired())
852
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
853

    
854
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
855

    
856
    self.assertRaises(IndexError, queue.GetNextUpdate)
857
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
858
                     jqueue._JobProcessor.FINISHED)
859
    self.assertEqual(queue.GetNextUpdate(), (job, True))
860
    self.assertRaises(IndexError, queue.GetNextUpdate)
861

    
862
    # Check result
863
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
864
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
865
    self.assert_(job.start_timestamp)
866
    self.assert_(job.end_timestamp)
867
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
868
                                for op in job.ops))
869
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
870
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
871
                      ["Job canceled by request" for _ in job.ops]])
872

    
873
  def testCancelWhileWaitlockWithTimeout(self):
874
    queue = _FakeQueueForProc()
875

    
876
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
877
           for i in range(5)]
878

    
879
    # Create job
880
    job_id = 24314
881
    job = self._CreateJob(queue, job_id, ops)
882

    
883
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
884

    
885
    def _BeforeStart(timeout, priority):
886
      self.assertFalse(queue.IsAcquired())
887
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
888

    
889
      # Mark as cancelled
890
      (success, _) = job.Cancel()
891
      self.assert_(success)
892

    
893
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
894
                              for op in job.ops))
895

    
896
      # Fake an acquire attempt timing out
897
      raise mcpu.LockAcquireTimeout()
898

    
899
    def _AfterStart(op, cbs):
900
      self.fail("Should not reach this")
901

    
902
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
903

    
904
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
905
                     jqueue._JobProcessor.FINISHED)
906

    
907
    # Check result
908
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
909
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
910
    self.assert_(job.start_timestamp)
911
    self.assert_(job.end_timestamp)
912
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
913
                                for op in job.ops))
914
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
915
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
916
                      ["Job canceled by request" for _ in job.ops]])
917

    
918
  def testCancelWhileRunning(self):
919
    # Tests canceling a job with finished opcodes and more, unprocessed ones
920
    queue = _FakeQueueForProc()
921

    
922
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
923
           for i in range(3)]
924

    
925
    # Create job
926
    job_id = 28492
927
    job = self._CreateJob(queue, job_id, ops)
928

    
929
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
930

    
931
    opexec = _FakeExecOpCodeForProc(queue, None, None)
932

    
933
    # Run one opcode
934
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
935
                     jqueue._JobProcessor.DEFER)
936

    
937
    # Job goes back to queued
938
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
939
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
940
                     [[constants.OP_STATUS_SUCCESS,
941
                       constants.OP_STATUS_QUEUED,
942
                       constants.OP_STATUS_QUEUED],
943
                      ["Res0", None, None]])
944

    
945
    # Mark as cancelled
946
    (success, _) = job.Cancel()
947
    self.assert_(success)
948

    
949
    # Try processing another opcode (this will actually cancel the job)
950
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
951
                     jqueue._JobProcessor.FINISHED)
952

    
953
    # Check result
954
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
955
    self.assertEqual(job.GetInfo(["id"]), [job_id])
956
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
957
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
958
                     [[constants.OP_STATUS_SUCCESS,
959
                       constants.OP_STATUS_CANCELED,
960
                       constants.OP_STATUS_CANCELED],
961
                      ["Res0", "Job canceled by request",
962
                       "Job canceled by request"]])
963

    
964
  def testPartiallyRun(self):
965
    # Tests calling the processor on a job that's been partially run before the
966
    # program was restarted
967
    queue = _FakeQueueForProc()
968

    
969
    opexec = _FakeExecOpCodeForProc(queue, None, None)
970

    
971
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
972
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
973
             for i in range(10)]
974

    
975
      # Create job
976
      job = self._CreateJob(queue, job_id, ops)
977

    
978
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
979

    
980
      for _ in range(successcount):
981
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
982
                         jqueue._JobProcessor.DEFER)
983

    
984
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
985
      self.assertEqual(job.GetInfo(["opstatus"]),
986
                       [[constants.OP_STATUS_SUCCESS
987
                         for _ in range(successcount)] +
988
                        [constants.OP_STATUS_QUEUED
989
                         for _ in range(len(ops) - successcount)]])
990

    
991
      self.assert_(job.ops_iter)
992

    
993
      # Serialize and restore (simulates program restart)
994
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
995
      self.assertFalse(newjob.ops_iter)
996
      self._TestPartial(newjob, successcount)
997

    
998
  def _TestPartial(self, job, successcount):
999
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1000
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
1001

    
1002
    queue = _FakeQueueForProc()
1003
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1004

    
1005
    for remaining in reversed(range(len(job.ops) - successcount)):
1006
      result = jqueue._JobProcessor(queue, opexec, job)()
1007
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1008
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1009
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1010
      self.assertRaises(IndexError, queue.GetNextUpdate)
1011

    
1012
      if remaining == 0:
1013
        # Last opcode
1014
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1015
        break
1016

    
1017
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1018

    
1019
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1020

    
1021
    self.assertRaises(IndexError, queue.GetNextUpdate)
1022
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1023
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1024
    self.assertEqual(job.GetInfo(["opresult"]),
1025
                     [[op.input.result for op in job.ops]])
1026
    self.assertEqual(job.GetInfo(["opstatus"]),
1027
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1028
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1029
                            for op in job.ops))
1030

    
1031
    self._GenericCheckJob(job)
1032

    
1033
    # Calling the processor on a finished job should be a no-op
1034
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1035
                     jqueue._JobProcessor.FINISHED)
1036
    self.assertRaises(IndexError, queue.GetNextUpdate)
1037

    
1038
    # ... also after being restored
1039
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1040
    # Calling the processor on a finished job should be a no-op
1041
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
1042
                     jqueue._JobProcessor.FINISHED)
1043
    self.assertRaises(IndexError, queue.GetNextUpdate)
1044

    
1045
  def testProcessorOnRunningJob(self):
1046
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1047

    
1048
    queue = _FakeQueueForProc()
1049
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1050

    
1051
    # Create job
1052
    job = self._CreateJob(queue, 9571, ops)
1053

    
1054
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1055

    
1056
    job.ops[0].status = constants.OP_STATUS_RUNNING
1057

    
1058
    assert len(job.ops) == 1
1059

    
1060
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1061

    
1062
    # Calling on running job must fail
1063
    self.assertRaises(errors.ProgrammerError,
1064
                      jqueue._JobProcessor(queue, opexec, job))
1065

    
1066
  def testLogMessages(self):
1067
    # Tests the "Feedback" callback function
1068
    queue = _FakeQueueForProc()
1069

    
1070
    messages = {
1071
      1: [
1072
        (None, "Hello"),
1073
        (None, "World"),
1074
        (constants.ELOG_MESSAGE, "there"),
1075
        ],
1076
      4: [
1077
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1078
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1079
        ],
1080
      }
1081
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1082
                               messages=messages.get(i, []))
1083
           for i in range(5)]
1084

    
1085
    # Create job
1086
    job = self._CreateJob(queue, 29386, ops)
1087

    
1088
    def _BeforeStart(timeout, priority):
1089
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1090
      self.assertRaises(IndexError, queue.GetNextUpdate)
1091
      self.assertFalse(queue.IsAcquired())
1092
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1093

    
1094
    def _AfterStart(op, cbs):
1095
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1096
      self.assertRaises(IndexError, queue.GetNextUpdate)
1097
      self.assertFalse(queue.IsAcquired())
1098
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1099

    
1100
      self.assertRaises(AssertionError, cbs.Feedback,
1101
                        "too", "many", "arguments")
1102

    
1103
      for (log_type, msg) in op.messages:
1104
        self.assertRaises(IndexError, queue.GetNextUpdate)
1105
        if log_type:
1106
          cbs.Feedback(log_type, msg)
1107
        else:
1108
          cbs.Feedback(msg)
1109
        # Check for job update without replication
1110
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1111
        self.assertRaises(IndexError, queue.GetNextUpdate)
1112

    
1113
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1114

    
1115
    for remaining in reversed(range(len(job.ops))):
1116
      self.assertRaises(IndexError, queue.GetNextUpdate)
1117
      result = jqueue._JobProcessor(queue, opexec, job)()
1118
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1119
      self.assertRaises(IndexError, queue.GetNextUpdate)
1120

    
1121
      if remaining == 0:
1122
        # Last opcode
1123
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1124
        break
1125

    
1126
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1127

    
1128
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1129

    
1130
    self.assertRaises(IndexError, queue.GetNextUpdate)
1131

    
1132
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1133
    self.assertEqual(job.GetInfo(["opresult"]),
1134
                     [[op.input.result for op in job.ops]])
1135

    
1136
    logmsgcount = sum(len(m) for m in messages.values())
1137

    
1138
    self._CheckLogMessages(job, logmsgcount)
1139

    
1140
    # Serialize and restore (simulates program restart)
1141
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
1142
    self._CheckLogMessages(newjob, logmsgcount)
1143

    
1144
    # Check each message
1145
    prevserial = -1
1146
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1147
      for (serial, timestamp, log_type, msg) in oplog:
1148
        (exptype, expmsg) = messages.get(idx).pop(0)
1149
        if exptype:
1150
          self.assertEqual(log_type, exptype)
1151
        else:
1152
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1153
        self.assertEqual(expmsg, msg)
1154
        self.assert_(serial > prevserial)
1155
        prevserial = serial
1156

    
1157
  def _CheckLogMessages(self, job, count):
1158
    # Check serial
1159
    self.assertEqual(job.log_serial, count)
1160

    
1161
    # No filter
1162
    self.assertEqual(job.GetLogEntries(None),
1163
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1164
                      for entry in entries])
1165

    
1166
    # Filter with serial
1167
    assert count > 3
1168
    self.assert_(job.GetLogEntries(3))
1169
    self.assertEqual(job.GetLogEntries(3),
1170
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1171
                      for entry in entries][3:])
1172

    
1173
    # No log message after highest serial
1174
    self.assertFalse(job.GetLogEntries(count))
1175
    self.assertFalse(job.GetLogEntries(count + 3))
1176

    
1177
  def testSubmitManyJobs(self):
1178
    queue = _FakeQueueForProc()
1179

    
1180
    job_id = 15656
1181
    ops = [
1182
      opcodes.OpTestDummy(result="Res0", fail=False,
1183
                          submit_jobs=[]),
1184
      opcodes.OpTestDummy(result="Res1", fail=False,
1185
                          submit_jobs=[
1186
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1187
                            ]),
1188
      opcodes.OpTestDummy(result="Res2", fail=False,
1189
                          submit_jobs=[
1190
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1191
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1192
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1193
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1194
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1195
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1196
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1197
                            ]),
1198
      ]
1199

    
1200
    # Create job
1201
    job = self._CreateJob(queue, job_id, ops)
1202

    
1203
    def _BeforeStart(timeout, priority):
1204
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1205
      self.assertRaises(IndexError, queue.GetNextUpdate)
1206
      self.assertFalse(queue.IsAcquired())
1207
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1208
      self.assertFalse(job.cur_opctx)
1209

    
1210
    def _AfterStart(op, cbs):
1211
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1212
      self.assertRaises(IndexError, queue.GetNextUpdate)
1213

    
1214
      self.assertFalse(queue.IsAcquired())
1215
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1216
      self.assertFalse(job.cur_opctx)
1217

    
1218
      # Job is running, cancelling shouldn't be possible
1219
      (success, _) = job.Cancel()
1220
      self.assertFalse(success)
1221

    
1222
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1223

    
1224
    for idx in range(len(ops)):
1225
      self.assertRaises(IndexError, queue.GetNextUpdate)
1226
      result = jqueue._JobProcessor(queue, opexec, job)()
1227
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1228
      self.assertRaises(IndexError, queue.GetNextUpdate)
1229
      if idx == len(ops) - 1:
1230
        # Last opcode
1231
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
1232
      else:
1233
        self.assertEqual(result, jqueue._JobProcessor.DEFER)
1234

    
1235
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1236
        self.assert_(job.start_timestamp)
1237
        self.assertFalse(job.end_timestamp)
1238

    
1239
    self.assertRaises(IndexError, queue.GetNextUpdate)
1240

    
1241
    for idx, submitted_ops in enumerate(job_ops
1242
                                        for op in ops
1243
                                        for job_ops in op.submit_jobs):
1244
      self.assertEqual(queue.GetNextSubmittedJob(),
1245
                       (1000 + idx, submitted_ops))
1246
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1247

    
1248
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1249
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1250
    self.assertEqual(job.GetInfo(["opresult"]),
1251
                     [[[], [1000], [1001, 1002, 1003]]])
1252
    self.assertEqual(job.GetInfo(["opstatus"]),
1253
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1254

    
1255
    self._GenericCheckJob(job)
1256

    
1257
    # Calling the processor on a finished job should be a no-op
1258
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1259
                     jqueue._JobProcessor.FINISHED)
1260
    self.assertRaises(IndexError, queue.GetNextUpdate)
1261

    
1262
  def testJobDependency(self):
1263
    depmgr = _FakeDependencyManager()
1264
    queue = _FakeQueueForProc(depmgr=depmgr)
1265

    
1266
    self.assertEqual(queue.depmgr, depmgr)
1267

    
1268
    prev_job_id = 22113
1269
    prev_job_id2 = 28102
1270
    job_id = 29929
1271
    ops = [
1272
      opcodes.OpTestDummy(result="Res0", fail=False,
1273
                          depends=[
1274
                            [prev_job_id2, None],
1275
                            [prev_job_id, None],
1276
                            ]),
1277
      opcodes.OpTestDummy(result="Res1", fail=False),
1278
      ]
1279

    
1280
    # Create job
1281
    job = self._CreateJob(queue, job_id, ops)
1282

    
1283
    def _BeforeStart(timeout, priority):
1284
      if attempt == 0 or attempt > 5:
1285
        # Job should only be updated when it wasn't waiting for another job
1286
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1287
      self.assertRaises(IndexError, queue.GetNextUpdate)
1288
      self.assertFalse(queue.IsAcquired())
1289
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1290
      self.assertFalse(job.cur_opctx)
1291

    
1292
    def _AfterStart(op, cbs):
1293
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1294
      self.assertRaises(IndexError, queue.GetNextUpdate)
1295

    
1296
      self.assertFalse(queue.IsAcquired())
1297
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1298
      self.assertFalse(job.cur_opctx)
1299

    
1300
      # Job is running, cancelling shouldn't be possible
1301
      (success, _) = job.Cancel()
1302
      self.assertFalse(success)
1303

    
1304
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1305

    
1306
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1307

    
1308
    counter = itertools.count()
1309
    while True:
1310
      attempt = counter.next()
1311

    
1312
      self.assertRaises(IndexError, queue.GetNextUpdate)
1313
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1314

    
1315
      if attempt < 2:
1316
        depmgr.AddCheckResult(job, prev_job_id2, None,
1317
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1318
      elif attempt == 2:
1319
        depmgr.AddCheckResult(job, prev_job_id2, None,
1320
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1321
        # The processor will ask for the next dependency immediately
1322
        depmgr.AddCheckResult(job, prev_job_id, None,
1323
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1324
      elif attempt < 5:
1325
        depmgr.AddCheckResult(job, prev_job_id, None,
1326
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1327
      elif attempt == 5:
1328
        depmgr.AddCheckResult(job, prev_job_id, None,
1329
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1330
      if attempt == 2:
1331
        self.assertEqual(depmgr.CountPendingResults(), 2)
1332
      elif attempt > 5:
1333
        self.assertEqual(depmgr.CountPendingResults(), 0)
1334
      else:
1335
        self.assertEqual(depmgr.CountPendingResults(), 1)
1336

    
1337
      result = jqueue._JobProcessor(queue, opexec, job)()
1338
      if attempt == 0 or attempt >= 5:
1339
        # Job should only be updated if there was an actual change
1340
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1341
      self.assertRaises(IndexError, queue.GetNextUpdate)
1342
      self.assertFalse(depmgr.CountPendingResults())
1343

    
1344
      if attempt < 5:
1345
        # Simulate waiting for other job
1346
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1347
        self.assertTrue(job.cur_opctx)
1348
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1349
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1350
        self.assert_(job.start_timestamp)
1351
        self.assertFalse(job.end_timestamp)
1352
        continue
1353

    
1354
      if result == jqueue._JobProcessor.FINISHED:
1355
        # Last opcode
1356
        self.assertFalse(job.cur_opctx)
1357
        break
1358

    
1359
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1360

    
1361
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1362
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1363
      self.assert_(job.start_timestamp)
1364
      self.assertFalse(job.end_timestamp)
1365

    
1366
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1367
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1368
    self.assertEqual(job.GetInfo(["opresult"]),
1369
                     [[op.input.result for op in job.ops]])
1370
    self.assertEqual(job.GetInfo(["opstatus"]),
1371
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1372
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1373
                               for op in job.ops))
1374

    
1375
    self._GenericCheckJob(job)
1376

    
1377
    self.assertRaises(IndexError, queue.GetNextUpdate)
1378
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1379
    self.assertFalse(depmgr.CountPendingResults())
1380
    self.assertFalse(depmgr.CountWaitingJobs())
1381

    
1382
    # Calling the processor on a finished job should be a no-op
1383
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1384
                     jqueue._JobProcessor.FINISHED)
1385
    self.assertRaises(IndexError, queue.GetNextUpdate)
1386

    
1387
  def testJobDependencyCancel(self):
1388
    depmgr = _FakeDependencyManager()
1389
    queue = _FakeQueueForProc(depmgr=depmgr)
1390

    
1391
    self.assertEqual(queue.depmgr, depmgr)
1392

    
1393
    prev_job_id = 13623
1394
    job_id = 30876
1395
    ops = [
1396
      opcodes.OpTestDummy(result="Res0", fail=False),
1397
      opcodes.OpTestDummy(result="Res1", fail=False,
1398
                          depends=[
1399
                            [prev_job_id, None],
1400
                            ]),
1401
      opcodes.OpTestDummy(result="Res2", fail=False),
1402
      ]
1403

    
1404
    # Create job
1405
    job = self._CreateJob(queue, job_id, ops)
1406

    
1407
    def _BeforeStart(timeout, priority):
1408
      if attempt == 0 or attempt > 5:
1409
        # Job should only be updated when it wasn't waiting for another job
1410
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1411
      self.assertRaises(IndexError, queue.GetNextUpdate)
1412
      self.assertFalse(queue.IsAcquired())
1413
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1414
      self.assertFalse(job.cur_opctx)
1415

    
1416
    def _AfterStart(op, cbs):
1417
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1418
      self.assertRaises(IndexError, queue.GetNextUpdate)
1419

    
1420
      self.assertFalse(queue.IsAcquired())
1421
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1422
      self.assertFalse(job.cur_opctx)
1423

    
1424
      # Job is running, cancelling shouldn't be possible
1425
      (success, _) = job.Cancel()
1426
      self.assertFalse(success)
1427

    
1428
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1429

    
1430
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1431

    
1432
    counter = itertools.count()
1433
    while True:
1434
      attempt = counter.next()
1435

    
1436
      self.assertRaises(IndexError, queue.GetNextUpdate)
1437
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1438

    
1439
      if attempt == 0:
1440
        # This will handle the first opcode
1441
        pass
1442
      elif attempt < 4:
1443
        depmgr.AddCheckResult(job, prev_job_id, None,
1444
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1445
      elif attempt == 4:
1446
        # Other job was cancelled
1447
        depmgr.AddCheckResult(job, prev_job_id, None,
1448
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1449

    
1450
      if attempt == 0:
1451
        self.assertEqual(depmgr.CountPendingResults(), 0)
1452
      else:
1453
        self.assertEqual(depmgr.CountPendingResults(), 1)
1454

    
1455
      result = jqueue._JobProcessor(queue, opexec, job)()
1456
      if attempt <= 1 or attempt >= 4:
1457
        # Job should only be updated if there was an actual change
1458
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1459
      self.assertRaises(IndexError, queue.GetNextUpdate)
1460
      self.assertFalse(depmgr.CountPendingResults())
1461

    
1462
      if attempt > 0 and attempt < 4:
1463
        # Simulate waiting for other job
1464
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1465
        self.assertTrue(job.cur_opctx)
1466
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1467
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1468
        self.assert_(job.start_timestamp)
1469
        self.assertFalse(job.end_timestamp)
1470
        continue
1471

    
1472
      if result == jqueue._JobProcessor.FINISHED:
1473
        # Last opcode
1474
        self.assertFalse(job.cur_opctx)
1475
        break
1476

    
1477
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1478

    
1479
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1480
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1481
      self.assert_(job.start_timestamp)
1482
      self.assertFalse(job.end_timestamp)
1483

    
1484
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1485
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1486
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1487
                     [[constants.OP_STATUS_SUCCESS,
1488
                       constants.OP_STATUS_CANCELED,
1489
                       constants.OP_STATUS_CANCELED],
1490
                      ["Res0", "Job canceled by request",
1491
                       "Job canceled by request"]])
1492

    
1493
    self._GenericCheckJob(job)
1494

    
1495
    self.assertRaises(IndexError, queue.GetNextUpdate)
1496
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1497
    self.assertFalse(depmgr.CountPendingResults())
1498

    
1499
    # Calling the processor on a finished job should be a no-op
1500
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1501
                     jqueue._JobProcessor.FINISHED)
1502
    self.assertRaises(IndexError, queue.GetNextUpdate)
1503

    
1504
  def testJobDependencyWrongstatus(self):
1505
    depmgr = _FakeDependencyManager()
1506
    queue = _FakeQueueForProc(depmgr=depmgr)
1507

    
1508
    self.assertEqual(queue.depmgr, depmgr)
1509

    
1510
    prev_job_id = 9741
1511
    job_id = 11763
1512
    ops = [
1513
      opcodes.OpTestDummy(result="Res0", fail=False),
1514
      opcodes.OpTestDummy(result="Res1", fail=False,
1515
                          depends=[
1516
                            [prev_job_id, None],
1517
                            ]),
1518
      opcodes.OpTestDummy(result="Res2", fail=False),
1519
      ]
1520

    
1521
    # Create job
1522
    job = self._CreateJob(queue, job_id, ops)
1523

    
1524
    def _BeforeStart(timeout, priority):
1525
      if attempt == 0 or attempt > 5:
1526
        # Job should only be updated when it wasn't waiting for another job
1527
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1528
      self.assertRaises(IndexError, queue.GetNextUpdate)
1529
      self.assertFalse(queue.IsAcquired())
1530
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1531
      self.assertFalse(job.cur_opctx)
1532

    
1533
    def _AfterStart(op, cbs):
1534
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1535
      self.assertRaises(IndexError, queue.GetNextUpdate)
1536

    
1537
      self.assertFalse(queue.IsAcquired())
1538
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1539
      self.assertFalse(job.cur_opctx)
1540

    
1541
      # Job is running, cancelling shouldn't be possible
1542
      (success, _) = job.Cancel()
1543
      self.assertFalse(success)
1544

    
1545
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1546

    
1547
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1548

    
1549
    counter = itertools.count()
1550
    while True:
1551
      attempt = counter.next()
1552

    
1553
      self.assertRaises(IndexError, queue.GetNextUpdate)
1554
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1555

    
1556
      if attempt == 0:
1557
        # This will handle the first opcode
1558
        pass
1559
      elif attempt < 4:
1560
        depmgr.AddCheckResult(job, prev_job_id, None,
1561
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1562
      elif attempt == 4:
1563
        # Other job failed
1564
        depmgr.AddCheckResult(job, prev_job_id, None,
1565
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1566

    
1567
      if attempt == 0:
1568
        self.assertEqual(depmgr.CountPendingResults(), 0)
1569
      else:
1570
        self.assertEqual(depmgr.CountPendingResults(), 1)
1571

    
1572
      result = jqueue._JobProcessor(queue, opexec, job)()
1573
      if attempt <= 1 or attempt >= 4:
1574
        # Job should only be updated if there was an actual change
1575
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1576
      self.assertRaises(IndexError, queue.GetNextUpdate)
1577
      self.assertFalse(depmgr.CountPendingResults())
1578

    
1579
      if attempt > 0 and attempt < 4:
1580
        # Simulate waiting for other job
1581
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
1582
        self.assertTrue(job.cur_opctx)
1583
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1584
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1585
        self.assert_(job.start_timestamp)
1586
        self.assertFalse(job.end_timestamp)
1587
        continue
1588

    
1589
      if result == jqueue._JobProcessor.FINISHED:
1590
        # Last opcode
1591
        self.assertFalse(job.cur_opctx)
1592
        break
1593

    
1594
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1595

    
1596
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1597
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1598
      self.assert_(job.start_timestamp)
1599
      self.assertFalse(job.end_timestamp)
1600

    
1601
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1602
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1603
    self.assertEqual(job.GetInfo(["opstatus"]),
1604
                     [[constants.OP_STATUS_SUCCESS,
1605
                       constants.OP_STATUS_ERROR,
1606
                       constants.OP_STATUS_ERROR]]),
1607

    
1608
    (opresult, ) = job.GetInfo(["opresult"])
1609
    self.assertEqual(len(opresult), len(ops))
1610
    self.assertEqual(opresult[0], "Res0")
1611
    self.assertTrue(errors.GetEncodedError(opresult[1]))
1612
    self.assertTrue(errors.GetEncodedError(opresult[2]))
1613

    
1614
    self._GenericCheckJob(job)
1615

    
1616
    self.assertRaises(IndexError, queue.GetNextUpdate)
1617
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1618
    self.assertFalse(depmgr.CountPendingResults())
1619

    
1620
    # Calling the processor on a finished job should be a no-op
1621
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
1622
                     jqueue._JobProcessor.FINISHED)
1623
    self.assertRaises(IndexError, queue.GetNextUpdate)
1624

    
1625

    
1626
class _FakeTimeoutStrategy:
1627
  def __init__(self, timeouts):
1628
    self.timeouts = timeouts
1629
    self.attempts = 0
1630
    self.last_timeout = None
1631

    
1632
  def NextAttempt(self):
1633
    self.attempts += 1
1634
    if self.timeouts:
1635
      timeout = self.timeouts.pop(0)
1636
    else:
1637
      timeout = None
1638
    self.last_timeout = timeout
1639
    return timeout
1640

    
1641

    
1642
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1643
  def setUp(self):
1644
    self.queue = _FakeQueueForProc()
1645
    self.job = None
1646
    self.curop = None
1647
    self.opcounter = None
1648
    self.timeout_strategy = None
1649
    self.retries = 0
1650
    self.prev_tsop = None
1651
    self.prev_prio = None
1652
    self.prev_status = None
1653
    self.lock_acq_prio = None
1654
    self.gave_lock = None
1655
    self.done_lock_before_blocking = False
1656

    
1657
  def _BeforeStart(self, timeout, priority):
1658
    job = self.job
1659

    
1660
    # If status has changed, job must've been written
1661
    if self.prev_status != self.job.ops[self.curop].status:
1662
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1663
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1664

    
1665
    self.assertFalse(self.queue.IsAcquired())
1666
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1667

    
1668
    ts = self.timeout_strategy
1669

    
1670
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1671
    self.assertEqual(timeout, ts.last_timeout)
1672
    self.assertEqual(priority, job.ops[self.curop].priority)
1673

    
1674
    self.gave_lock = True
1675
    self.lock_acq_prio = priority
1676

    
1677
    if (self.curop == 3 and
1678
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1679
      # Give locks before running into blocking acquire
1680
      assert self.retries == 7
1681
      self.retries = 0
1682
      self.done_lock_before_blocking = True
1683
      return
1684

    
1685
    if self.retries > 0:
1686
      self.assert_(timeout is not None)
1687
      self.retries -= 1
1688
      self.gave_lock = False
1689
      raise mcpu.LockAcquireTimeout()
1690

    
1691
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1692
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1693
      assert not ts.timeouts
1694
      self.assert_(timeout is None)
1695

    
1696
  def _AfterStart(self, op, cbs):
1697
    job = self.job
1698

    
1699
    # Setting to "running" requires an update
1700
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1701
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1702

    
1703
    self.assertFalse(self.queue.IsAcquired())
1704
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1705

    
1706
    # Job is running, cancelling shouldn't be possible
1707
    (success, _) = job.Cancel()
1708
    self.assertFalse(success)
1709

    
1710
  def _NextOpcode(self):
1711
    self.curop = self.opcounter.next()
1712
    self.prev_prio = self.job.ops[self.curop].priority
1713
    self.prev_status = self.job.ops[self.curop].status
1714

    
1715
  def _NewTimeoutStrategy(self):
1716
    job = self.job
1717

    
1718
    self.assertEqual(self.retries, 0)
1719

    
1720
    if self.prev_tsop == self.curop:
1721
      # Still on the same opcode, priority must've been increased
1722
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1723

    
1724
    if self.curop == 1:
1725
      # Normal retry
1726
      timeouts = range(10, 31, 10)
1727
      self.retries = len(timeouts) - 1
1728

    
1729
    elif self.curop == 2:
1730
      # Let this run into a blocking acquire
1731
      timeouts = range(11, 61, 12)
1732
      self.retries = len(timeouts)
1733

    
1734
    elif self.curop == 3:
1735
      # Wait for priority to increase, but give lock before blocking acquire
1736
      timeouts = range(12, 100, 14)
1737
      self.retries = len(timeouts)
1738

    
1739
      self.assertFalse(self.done_lock_before_blocking)
1740

    
1741
    elif self.curop == 4:
1742
      self.assert_(self.done_lock_before_blocking)
1743

    
1744
      # Timeouts, but no need to retry
1745
      timeouts = range(10, 31, 10)
1746
      self.retries = 0
1747

    
1748
    elif self.curop == 5:
1749
      # Normal retry
1750
      timeouts = range(19, 100, 11)
1751
      self.retries = len(timeouts)
1752

    
1753
    else:
1754
      timeouts = []
1755
      self.retries = 0
1756

    
1757
    assert len(job.ops) == 10
1758
    assert self.retries <= len(timeouts)
1759

    
1760
    ts = _FakeTimeoutStrategy(timeouts)
1761

    
1762
    self.timeout_strategy = ts
1763
    self.prev_tsop = self.curop
1764
    self.prev_prio = job.ops[self.curop].priority
1765

    
1766
    return ts
1767

    
1768
  def testTimeout(self):
1769
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1770
           for i in range(10)]
1771

    
1772
    # Create job
1773
    job_id = 15801
1774
    job = self._CreateJob(self.queue, job_id, ops)
1775
    self.job = job
1776

    
1777
    self.opcounter = itertools.count(0)
1778

    
1779
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1780
                                    self._AfterStart)
1781
    tsf = self._NewTimeoutStrategy
1782

    
1783
    self.assertFalse(self.done_lock_before_blocking)
1784

    
1785
    while True:
1786
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1787
                                  _timeout_strategy_factory=tsf)
1788

    
1789
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1790

    
1791
      if self.curop is not None:
1792
        self.prev_status = self.job.ops[self.curop].status
1793

    
1794
      self.lock_acq_prio = None
1795

    
1796
      result = proc(_nextop_fn=self._NextOpcode)
1797
      assert self.curop is not None
1798

    
1799
      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
1800
        # Got lock and/or job is done, result must've been written
1801
        self.assertFalse(job.cur_opctx)
1802
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1803
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1804
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1805
        self.assert_(job.ops[self.curop].exec_timestamp)
1806

    
1807
      if result == jqueue._JobProcessor.FINISHED:
1808
        self.assertFalse(job.cur_opctx)
1809
        break
1810

    
1811
      self.assertEqual(result, jqueue._JobProcessor.DEFER)
1812

    
1813
      if self.curop == 0:
1814
        self.assertEqual(job.ops[self.curop].start_timestamp,
1815
                         job.start_timestamp)
1816

    
1817
      if self.gave_lock:
1818
        # Opcode finished, but job not yet done
1819
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1820
      else:
1821
        # Did not get locks
1822
        self.assert_(job.cur_opctx)
1823
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1824
                         self.timeout_strategy.NextAttempt)
1825
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1826
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
1827

    
1828
        # If priority has changed since acquiring locks, the job must've been
1829
        # updated
1830
        if self.lock_acq_prio != job.ops[self.curop].priority:
1831
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1832

    
1833
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1834

    
1835
      self.assert_(job.start_timestamp)
1836
      self.assertFalse(job.end_timestamp)
1837

    
1838
    self.assertEqual(self.curop, len(job.ops) - 1)
1839
    self.assertEqual(self.job, job)
1840
    self.assertEqual(self.opcounter.next(), len(job.ops))
1841
    self.assert_(self.done_lock_before_blocking)
1842

    
1843
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1844
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1845
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1846
    self.assertEqual(job.GetInfo(["opresult"]),
1847
                     [[op.input.result for op in job.ops]])
1848
    self.assertEqual(job.GetInfo(["opstatus"]),
1849
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1850
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1851
                            for op in job.ops))
1852

    
1853
    # Calling the processor on a finished job should be a no-op
1854
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
1855
                     jqueue._JobProcessor.FINISHED)
1856
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1857

    
1858

    
1859
class TestJobDependencyManager(unittest.TestCase):
1860
  class _FakeJob:
1861
    def __init__(self, job_id):
1862
      self.id = str(job_id)
1863

    
1864
  def setUp(self):
1865
    self._status = []
1866
    self._queue = []
1867
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1868

    
1869
  def _GetStatus(self, job_id):
1870
    (exp_job_id, result) = self._status.pop(0)
1871
    self.assertEqual(exp_job_id, job_id)
1872
    return result
1873

    
1874
  def _Enqueue(self, jobs):
1875
    self._queue.append(jobs)
1876

    
1877
  def testNotFinalizedThenCancel(self):
1878
    job = self._FakeJob(17697)
1879
    job_id = str(28625)
1880

    
1881
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1882
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1883
    self.assertEqual(result, self.jdm.WAIT)
1884
    self.assertFalse(self._status)
1885
    self.assertFalse(self._queue)
1886
    self.assertTrue(self.jdm.JobWaiting(job))
1887
    self.assertEqual(self.jdm._waiters, {
1888
      job_id: set([job]),
1889
      })
1890

    
1891
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1892
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1893
    self.assertEqual(result, self.jdm.CANCEL)
1894
    self.assertFalse(self._status)
1895
    self.assertFalse(self._queue)
1896
    self.assertFalse(self.jdm.JobWaiting(job))
1897

    
1898
  def testRequireCancel(self):
1899
    job = self._FakeJob(5278)
1900
    job_id = str(9610)
1901
    dep_status = [constants.JOB_STATUS_CANCELED]
1902

    
1903
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
1904
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1905
    self.assertEqual(result, self.jdm.WAIT)
1906
    self.assertFalse(self._status)
1907
    self.assertFalse(self._queue)
1908
    self.assertTrue(self.jdm.JobWaiting(job))
1909
    self.assertEqual(self.jdm._waiters, {
1910
      job_id: set([job]),
1911
      })
1912

    
1913
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1914
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1915
    self.assertEqual(result, self.jdm.CONTINUE)
1916
    self.assertFalse(self._status)
1917
    self.assertFalse(self._queue)
1918
    self.assertFalse(self.jdm.JobWaiting(job))
1919

    
1920
  def testRequireError(self):
1921
    job = self._FakeJob(21459)
1922
    job_id = str(25519)
1923
    dep_status = [constants.JOB_STATUS_ERROR]
1924

    
1925
    self._status.append((job_id, constants.JOB_STATUS_WAITING))
1926
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1927
    self.assertEqual(result, self.jdm.WAIT)
1928
    self.assertFalse(self._status)
1929
    self.assertFalse(self._queue)
1930
    self.assertTrue(self.jdm.JobWaiting(job))
1931
    self.assertEqual(self.jdm._waiters, {
1932
      job_id: set([job]),
1933
      })
1934

    
1935
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1936
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1937
    self.assertEqual(result, self.jdm.CONTINUE)
1938
    self.assertFalse(self._status)
1939
    self.assertFalse(self._queue)
1940
    self.assertFalse(self.jdm.JobWaiting(job))
1941

    
1942
  def testRequireMultiple(self):
1943
    dep_status = list(constants.JOBS_FINALIZED)
1944

    
1945
    for end_status in dep_status:
1946
      job = self._FakeJob(21343)
1947
      job_id = str(14609)
1948

    
1949
      self._status.append((job_id, constants.JOB_STATUS_WAITING))
1950
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1951
      self.assertEqual(result, self.jdm.WAIT)
1952
      self.assertFalse(self._status)
1953
      self.assertFalse(self._queue)
1954
      self.assertTrue(self.jdm.JobWaiting(job))
1955
      self.assertEqual(self.jdm._waiters, {
1956
        job_id: set([job]),
1957
        })
1958

    
1959
      self._status.append((job_id, end_status))
1960
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1961
      self.assertEqual(result, self.jdm.CONTINUE)
1962
      self.assertFalse(self._status)
1963
      self.assertFalse(self._queue)
1964
      self.assertFalse(self.jdm.JobWaiting(job))
1965

    
1966
  def testNotify(self):
1967
    job = self._FakeJob(8227)
1968
    job_id = str(4113)
1969

    
1970
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1971
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1972
    self.assertEqual(result, self.jdm.WAIT)
1973
    self.assertFalse(self._status)
1974
    self.assertFalse(self._queue)
1975
    self.assertTrue(self.jdm.JobWaiting(job))
1976
    self.assertEqual(self.jdm._waiters, {
1977
      job_id: set([job]),
1978
      })
1979

    
1980
    self.jdm.NotifyWaiters(job_id)
1981
    self.assertFalse(self._status)
1982
    self.assertFalse(self.jdm._waiters)
1983
    self.assertFalse(self.jdm.JobWaiting(job))
1984
    self.assertEqual(self._queue, [set([job])])
1985

    
1986
  def testWrongStatus(self):
1987
    job = self._FakeJob(10102)
1988
    job_id = str(1271)
1989

    
1990
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
1991
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1992
                                            [constants.JOB_STATUS_SUCCESS])
1993
    self.assertEqual(result, self.jdm.WAIT)
1994
    self.assertFalse(self._status)
1995
    self.assertFalse(self._queue)
1996
    self.assertTrue(self.jdm.JobWaiting(job))
1997
    self.assertEqual(self.jdm._waiters, {
1998
      job_id: set([job]),
1999
      })
2000

    
2001
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
2002
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2003
                                            [constants.JOB_STATUS_SUCCESS])
2004
    self.assertEqual(result, self.jdm.WRONGSTATUS)
2005
    self.assertFalse(self._status)
2006
    self.assertFalse(self._queue)
2007
    self.assertFalse(self.jdm.JobWaiting(job))
2008

    
2009
  def testCorrectStatus(self):
2010
    job = self._FakeJob(24273)
2011
    job_id = str(23885)
2012

    
2013
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2014
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2015
                                            [constants.JOB_STATUS_SUCCESS])
2016
    self.assertEqual(result, self.jdm.WAIT)
2017
    self.assertFalse(self._status)
2018
    self.assertFalse(self._queue)
2019
    self.assertTrue(self.jdm.JobWaiting(job))
2020
    self.assertEqual(self.jdm._waiters, {
2021
      job_id: set([job]),
2022
      })
2023

    
2024
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2025
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2026
                                            [constants.JOB_STATUS_SUCCESS])
2027
    self.assertEqual(result, self.jdm.CONTINUE)
2028
    self.assertFalse(self._status)
2029
    self.assertFalse(self._queue)
2030
    self.assertFalse(self.jdm.JobWaiting(job))
2031

    
2032
  def testFinalizedRightAway(self):
2033
    job = self._FakeJob(224)
2034
    job_id = str(3081)
2035

    
2036
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2037
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2038
                                            [constants.JOB_STATUS_SUCCESS])
2039
    self.assertEqual(result, self.jdm.CONTINUE)
2040
    self.assertFalse(self._status)
2041
    self.assertFalse(self._queue)
2042
    self.assertFalse(self.jdm.JobWaiting(job))
2043
    self.assertEqual(self.jdm._waiters, {
2044
      job_id: set(),
2045
      })
2046

    
2047
    # Force cleanup
2048
    self.jdm.NotifyWaiters("0")
2049
    self.assertFalse(self.jdm._waiters)
2050
    self.assertFalse(self._status)
2051
    self.assertFalse(self._queue)
2052

    
2053
  def testSelfDependency(self):
2054
    job = self._FakeJob(18937)
2055

    
2056
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2057
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2058
    self.assertEqual(result, self.jdm.ERROR)
2059

    
2060
  def testJobDisappears(self):
2061
    job = self._FakeJob(30540)
2062
    job_id = str(23769)
2063

    
2064
    def _FakeStatus(_):
2065
      raise errors.JobLost("#msg#")
2066

    
2067
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2068
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2069
    self.assertEqual(result, self.jdm.ERROR)
2070
    self.assertFalse(jdm.JobWaiting(job))
2071

    
2072

    
2073
if __name__ == "__main__":
2074
  testutils.GanetiTestProgram()