Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ c0f6d0d8

History | View | Annotate | Download (69.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import jqueue
36
from ganeti import opcodes
37
from ganeti import compat
38
from ganeti import mcpu
39

    
40
import testutils
41

    
42

    
43
class _FakeJob:
  """Minimal in-memory stand-in for a queued job.

  Holds an ID, a settable status and an append-only log; nothing is read
  from or written to disk.

  """
  def __init__(self, job_id, status):
    """Creates a non-writable job with the given ID, status and empty log.

    """
    self.id = job_id
    self.writable = False
    self._status = status
    self._log = []

  def SetStatus(self, status):
    """Replaces the status returned by L{CalcStatus} and L{GetInfo}.

    """
    self._status = status

  def AddLogEntry(self, msg):
    """Appends C{(serial, msg)} to the log.

    The serial is the number of entries logged so far, hence it starts at
    zero and always equals the entry's position in the list.

    """
    self._log.append((len(self._log), msg))

  def CalcStatus(self):
    """Returns the status last set.

    """
    return self._status

  def GetInfo(self, fields):
    """Returns one value per requested field, in request order.

    Only the "status" field is known.

    @raise Exception: as soon as any other field name is encountered

    """
    result = []

    for name in fields:
      if name != "status":
        raise Exception("Unknown field")
      result.append(self._status)

    return result

  def GetLogEntries(self, newer_than):
    """Returns the log entries from list position C{newer_than} onwards.

    @param newer_than: non-negative number of leading entries to leave out,
      or None to get the whole log (the internal list itself, not a copy)

    """
    assert newer_than is None or newer_than >= 0

    if newer_than is None:
      return self._log

    return self._log[newer_than:]
77

    
78

    
79
class TestJobChangesChecker(unittest.TestCase):
80
  def testStatus(self):
81
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
82
    checker = jqueue._JobChangesChecker(["status"], None, None)
83
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
84

    
85
    job.SetStatus(constants.JOB_STATUS_RUNNING)
86
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
87

    
88
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
89
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
90

    
91
    # job.id is used by checker
92
    self.assertEqual(job.id, 9094)
93

    
94
  def testStatusWithPrev(self):
95
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
96
    checker = jqueue._JobChangesChecker(["status"],
97
                                        [constants.JOB_STATUS_QUEUED], None)
98
    self.assert_(checker(job) is None)
99

    
100
    job.SetStatus(constants.JOB_STATUS_RUNNING)
101
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
102

    
103
  def testFinalStatus(self):
104
    for status in constants.JOBS_FINALIZED:
105
      job = _FakeJob(2178711, status)
106
      checker = jqueue._JobChangesChecker(["status"], [status], None)
107
      # There won't be any changes in this status, hence it should signal
108
      # a change immediately
109
      self.assertEqual(checker(job), ([status], []))
110

    
111
  def testLog(self):
112
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
113
    checker = jqueue._JobChangesChecker(["status"], None, None)
114
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
115

    
116
    job.AddLogEntry("Hello World")
117
    (job_info, log_entries) = checker(job)
118
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
119
    self.assertEqual(log_entries, [[0, "Hello World"]])
120

    
121
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
122
    self.assert_(checker2(job) is None)
123

    
124
    job.AddLogEntry("Foo Bar")
125
    job.SetStatus(constants.JOB_STATUS_ERROR)
126

    
127
    (job_info, log_entries) = checker2(job)
128
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
129
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
130

    
131
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
132
    (job_info, log_entries) = checker3(job)
133
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
134
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
135

    
136

    
137
class TestJobChangesWaiter(unittest.TestCase):
138
  def setUp(self):
139
    self.tmpdir = tempfile.mkdtemp()
140
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
141
    utils.WriteFile(self.filename, data="")
142

    
143
  def tearDown(self):
144
    shutil.rmtree(self.tmpdir)
145

    
146
  def _EnsureNotifierClosed(self, notifier):
147
    try:
148
      os.fstat(notifier._fd)
149
    except EnvironmentError, err:
150
      self.assertEqual(err.errno, errno.EBADF)
151
    else:
152
      self.fail("File descriptor wasn't closed")
153

    
154
  def testClose(self):
155
    for wait in [False, True]:
156
      waiter = jqueue._JobFileChangesWaiter(self.filename)
157
      try:
158
        if wait:
159
          waiter.Wait(0.001)
160
      finally:
161
        waiter.Close()
162

    
163
      # Ensure file descriptor was closed
164
      self._EnsureNotifierClosed(waiter._notifier)
165

    
166
  def testChangingFile(self):
167
    waiter = jqueue._JobFileChangesWaiter(self.filename)
168
    try:
169
      self.assertFalse(waiter.Wait(0.1))
170
      utils.WriteFile(self.filename, data="changed")
171
      self.assert_(waiter.Wait(60))
172
    finally:
173
      waiter.Close()
174

    
175
    self._EnsureNotifierClosed(waiter._notifier)
176

    
177
  def testChangingFile2(self):
178
    waiter = jqueue._JobChangesWaiter(self.filename)
179
    try:
180
      self.assertFalse(waiter._filewaiter)
181
      self.assert_(waiter.Wait(0.1))
182
      self.assert_(waiter._filewaiter)
183

    
184
      # File waiter is now used, but there have been no changes
185
      self.assertFalse(waiter.Wait(0.1))
186
      utils.WriteFile(self.filename, data="changed")
187
      self.assert_(waiter.Wait(60))
188
    finally:
189
      waiter.Close()
190

    
191
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
192

    
193

    
194
class TestWaitForJobChangesHelper(unittest.TestCase):
195
  def setUp(self):
196
    self.tmpdir = tempfile.mkdtemp()
197
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
198
    utils.WriteFile(self.filename, data="")
199

    
200
  def tearDown(self):
201
    shutil.rmtree(self.tmpdir)
202

    
203
  def _LoadWaitingJob(self):
204
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
205

    
206
  def _LoadLostJob(self):
207
    return None
208

    
209
  def testNoChanges(self):
210
    wfjc = jqueue._WaitForJobChangesHelper()
211

    
212
    # No change
213
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
214
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
215
                     constants.JOB_NOTCHANGED)
216

    
217
    # No previous information
218
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
219
                          ["status"], None, None, 1.0),
220
                     ([constants.JOB_STATUS_WAITLOCK], []))
221

    
222
  def testLostJob(self):
223
    wfjc = jqueue._WaitForJobChangesHelper()
224
    self.assert_(wfjc(self.filename, self._LoadLostJob,
225
                      ["status"], None, None, 1.0) is None)
226

    
227

    
228
class TestEncodeOpError(unittest.TestCase):
229
  def test(self):
230
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
231
    self.assert_(isinstance(encerr, tuple))
232
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
233

    
234
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
235
    self.assert_(isinstance(encerr, tuple))
236
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
237

    
238
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
239
    self.assert_(isinstance(encerr, tuple))
240
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
241

    
242
    encerr = jqueue._EncodeOpError("Hello World")
243
    self.assert_(isinstance(encerr, tuple))
244
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
245

    
246

    
247
class TestQueuedOpCode(unittest.TestCase):
248
  def testDefaults(self):
249
    def _Check(op):
250
      self.assertFalse(hasattr(op.input, "dry_run"))
251
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
252
      self.assertFalse(op.log)
253
      self.assert_(op.start_timestamp is None)
254
      self.assert_(op.exec_timestamp is None)
255
      self.assert_(op.end_timestamp is None)
256
      self.assert_(op.result is None)
257
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
258

    
259
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
260
    _Check(op1)
261
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
262
    _Check(op2)
263
    self.assertEqual(op1.Serialize(), op2.Serialize())
264

    
265
  def testPriority(self):
266
    def _Check(op):
267
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
268
             "Default priority equals high priority; test can't work"
269
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
270
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
271

    
272
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
273
    op1 = jqueue._QueuedOpCode(inpop)
274
    _Check(op1)
275
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
276
    _Check(op2)
277
    self.assertEqual(op1.Serialize(), op2.Serialize())
278

    
279

    
280
class TestQueuedJob(unittest.TestCase):
281
  def test(self):
282
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
283
                      None, 1, [], False)
284

    
285
  def testDefaults(self):
286
    job_id = 4260
287
    ops = [
288
      opcodes.OpTagsGet(),
289
      opcodes.OpTestDelay(),
290
      ]
291

    
292
    def _Check(job):
293
      self.assertTrue(job.writable)
294
      self.assertEqual(job.id, job_id)
295
      self.assertEqual(job.log_serial, 0)
296
      self.assert_(job.received_timestamp)
297
      self.assert_(job.start_timestamp is None)
298
      self.assert_(job.end_timestamp is None)
299
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
300
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
301
      self.assert_(repr(job).startswith("<"))
302
      self.assertEqual(len(job.ops), len(ops))
303
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
304
                              for (inp, op) in zip(ops, job.ops)))
305
      self.assertRaises(errors.OpExecError, job.GetInfo,
306
                        ["unknown-field"])
307
      self.assertEqual(job.GetInfo(["summary"]),
308
                       [[op.input.Summary() for op in job.ops]])
309

    
310
    job1 = jqueue._QueuedJob(None, job_id, ops, True)
311
    _Check(job1)
312
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
313
    _Check(job2)
314
    self.assertEqual(job1.Serialize(), job2.Serialize())
315

    
316
  def testWritable(self):
317
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
318
    self.assertFalse(job.writable)
319

    
320
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
321
    self.assertTrue(job.writable)
322

    
323
  def testPriority(self):
324
    job_id = 4283
325
    ops = [
326
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
327
      opcodes.OpTestDelay(),
328
      ]
329

    
330
    def _Check(job):
331
      self.assertEqual(job.id, job_id)
332
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
333
      self.assert_(repr(job).startswith("<"))
334

    
335
    job = jqueue._QueuedJob(None, job_id, ops, True)
336
    _Check(job)
337
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
338
                            for op in job.ops))
339
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
340

    
341
    # Increase first
342
    job.ops[0].priority -= 1
343
    _Check(job)
344
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
345

    
346
    # Mark opcode as finished
347
    job.ops[0].status = constants.OP_STATUS_SUCCESS
348
    _Check(job)
349
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
350

    
351
    # Increase second
352
    job.ops[1].priority -= 10
353
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
354

    
355
    # Test increasing first
356
    job.ops[0].status = constants.OP_STATUS_RUNNING
357
    job.ops[0].priority -= 19
358
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
359

    
360
  def testCalcStatus(self):
361
    def _Queued(ops):
362
      # The default status is "queued"
363
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
364
                              for op in ops))
365

    
366
    def _Waitlock1(ops):
367
      ops[0].status = constants.OP_STATUS_WAITLOCK
368

    
369
    def _Waitlock2(ops):
370
      ops[0].status = constants.OP_STATUS_SUCCESS
371
      ops[1].status = constants.OP_STATUS_SUCCESS
372
      ops[2].status = constants.OP_STATUS_WAITLOCK
373

    
374
    def _Running(ops):
375
      ops[0].status = constants.OP_STATUS_SUCCESS
376
      ops[1].status = constants.OP_STATUS_RUNNING
377
      for op in ops[2:]:
378
        op.status = constants.OP_STATUS_QUEUED
379

    
380
    def _Canceling1(ops):
381
      ops[0].status = constants.OP_STATUS_SUCCESS
382
      ops[1].status = constants.OP_STATUS_SUCCESS
383
      for op in ops[2:]:
384
        op.status = constants.OP_STATUS_CANCELING
385

    
386
    def _Canceling2(ops):
387
      for op in ops:
388
        op.status = constants.OP_STATUS_CANCELING
389

    
390
    def _Canceled(ops):
391
      for op in ops:
392
        op.status = constants.OP_STATUS_CANCELED
393

    
394
    def _Error1(ops):
395
      for idx, op in enumerate(ops):
396
        if idx > 3:
397
          op.status = constants.OP_STATUS_ERROR
398
        else:
399
          op.status = constants.OP_STATUS_SUCCESS
400

    
401
    def _Error2(ops):
402
      for op in ops:
403
        op.status = constants.OP_STATUS_ERROR
404

    
405
    def _Success(ops):
406
      for op in ops:
407
        op.status = constants.OP_STATUS_SUCCESS
408

    
409
    tests = {
410
      constants.JOB_STATUS_QUEUED: [_Queued],
411
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
412
      constants.JOB_STATUS_RUNNING: [_Running],
413
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
414
      constants.JOB_STATUS_CANCELED: [_Canceled],
415
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
416
      constants.JOB_STATUS_SUCCESS: [_Success],
417
      }
418

    
419
    def _NewJob():
420
      job = jqueue._QueuedJob(None, 1,
421
                              [opcodes.OpTestDelay() for _ in range(10)],
422
                              True)
423
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
424
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
425
                              for op in job.ops))
426
      return job
427

    
428
    for status in constants.JOB_STATUS_ALL:
429
      sttests = tests[status]
430
      assert sttests
431
      for fn in sttests:
432
        job = _NewJob()
433
        fn(job.ops)
434
        self.assertEqual(job.CalcStatus(), status)
435

    
436

    
437
class _FakeDependencyManager:
  """Scripted replacement for a job dependency manager.

  Results for upcoming L{CheckAndRegister} calls are queued in advance with
  L{AddCheckResult}; notifications sent through L{NotifyWaiters} are recorded
  and can be read back with L{GetNextNotification}.

  """
  def __init__(self):
    self._checks = []
    self._notifications = []
    self._waiting = set()

  def AddCheckResult(self, job, dep_job_id, dep_status, result):
    """Queues the expected arguments and the result of one future check.

    @param result: pair whose first item is the result status; returned
      as is by the matching L{CheckAndRegister} call

    """
    self._checks.append((job, dep_job_id, dep_status, result))

  def CountPendingResults(self):
    """Returns the number of queued check results not yet consumed.

    """
    return len(self._checks)

  def CountWaitingJobs(self):
    """Returns the number of jobs currently marked as waiting.

    """
    return len(self._waiting)

  def GetNextNotification(self):
    """Removes and returns the oldest recorded notification.

    @raise IndexError: if no notification is left

    """
    return self._notifications.pop(0)

  def JobWaiting(self, job):
    """Tells whether the job is currently marked as waiting.

    """
    return job in self._waiting

  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Consumes the oldest queued check and returns its result.

    The arguments must equal those given to L{AddCheckResult}. A WAIT
    result marks the job as waiting, a CONTINUE result unmarks it (the job
    must have been waiting); other results leave the set alone.

    """
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)

    assert exp_job == job
    assert exp_dep_job_id == dep_job_id
    assert exp_dep_status == dep_status

    (result_status, _) = result

    if result_status == jqueue._JobDependencyManager.WAIT:
      self._waiting.add(job)
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
      self._waiting.remove(job)

    return result

  def NotifyWaiters(self, job_id):
    """Records a notification for the given job ID.

    """
    self._notifications.append(job_id)
476

    
477

    
478
class _DisabledFakeDependencyManager:
  """Dependency manager for tests in which dependencies play no role.

  No job is ever waiting, notifications are dropped and any attempt to
  check a dependency fails an assertion.

  """
  def JobWaiting(self, _):
    return False

  def CheckAndRegister(self, *args):
    assert False, "Should not be called"

  def NotifyWaiters(self, _):
    pass
487

    
488

    
489
class _FakeQueueForProc:
  """Fake job queue recording what is done to it.

  Tracks whether its lock is held, remembers every job update and every
  submitted job in order, and hands out job IDs counting up from 1000.

  """
  def __init__(self, depmgr=None):
    """Initializes an unlocked queue without recorded updates or jobs.

    @param depmgr: dependency manager to expose as C{self.depmgr}; if
      omitted (or false) a L{_DisabledFakeDependencyManager} is used

    """
    self._acquired = False
    self._updates = []
    self._submitted = []

    self._submit_count = itertools.count(1000)

    if depmgr:
      self.depmgr = depmgr
    else:
      self.depmgr = _DisabledFakeDependencyManager()

  def IsAcquired(self):
    """Tells whether the queue lock is currently held.

    """
    return self._acquired

  def GetNextUpdate(self):
    """Removes and returns the oldest recorded C{(job, replicate)} update.

    @raise IndexError: if no update is left

    """
    return self._updates.pop(0)

  def GetNextSubmittedJob(self):
    """Removes and returns the oldest recorded C{(job_id, ops)} submission.

    @raise IndexError: if no submission is left

    """
    return self._submitted.pop(0)

  def acquire(self, shared=0):
    """Marks the lock as held; only shared acquisition is accepted.

    """
    assert shared == 1
    self._acquired = True

  def release(self):
    """Marks the lock as released; it must be held.

    """
    assert self._acquired
    self._acquired = False

  def UpdateJobUnlocked(self, job, replicate=True):
    """Records a job update; the lock must be held.

    """
    assert self._acquired, "Lock not acquired while updating job"
    self._updates.append((job, bool(replicate)))

  def SubmitManyJobs(self, jobs):
    """Records the given jobs and returns one new job ID for each.

    The lock must not be held.

    """
    assert not self._acquired, "Lock acquired while submitting jobs"
    # zip() replaces the iterator's ".next()" method, which exists on
    # Python 2 only; "jobs" has to come first so that zip() stops before
    # drawing a surplus ID from the endless counter
    job_ids = [job_id for (_, job_id) in zip(jobs, self._submit_count)]
    self._submitted.extend(zip(job_ids, jobs))
    return job_ids
528

    
529

    
530
class _FakeExecOpCodeForProc:
  """Fake opcode executor for L{opcodes.OpTestDummy} opcodes.

  Optional hooks are called right before and right after the start
  notification so that tests can inspect or change state at those points.

  """
  def __init__(self, queue, before_start, after_start):
    """Stores the queue and the two optional hooks.

    @param queue: queue whose lock must be free while an opcode executes
    @param before_start: None or callable taking C{(timeout, priority)}
    @param after_start: None or callable taking C{(op, cbs)}

    """
    self._queue = queue
    self._before_start = before_start
    self._after_start = after_start

  def __call__(self, op, cbs, timeout=None, priority=None):
    """Pretends to execute the opcode.

    @return: the IDs returned by C{cbs.SubmitManyJobs} if the opcode has a
      C{submit_jobs} attribute which is not None, otherwise C{op.result}
    @raise errors.OpExecError: if C{op.fail} is set

    """
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    if self._before_start:
      self._before_start(timeout, priority)

    cbs.NotifyStart()

    if self._after_start:
      self._after_start(op, cbs)

    # Check again after the callbacks
    assert not self._queue.IsAcquired()

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
      return cbs.SubmitManyJobs(op.submit_jobs)

    return op.result
559

    
560

    
561
class _JobProcessorTestUtils:
  """Mix-in with helpers for job processor test cases.

  Relies on the assertion methods of C{unittest.TestCase}, which the class
  it is combined with has to provide.

  """
  def _CreateJob(self, queue, job_id, ops):
    """Creates a writable job and verifies its initial state.

    @return: the new L{jqueue._QueuedJob}

    """
    job = jqueue._QueuedJob(queue, job_id, ops, True)
    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
    return job
571

    
572

    
573
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
574
  def _GenericCheckJob(self, job):
    """Runs checks that apply to any job after it has been processed.

    The timestamp fields returned by C{GetInfo} must match the attributes
    of the job and its opcodes, and the job must have start and end
    timestamps, the former equal to the first opcode's.

    """
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                      for op in job.ops)

    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
                     [[op.start_timestamp for op in job.ops],
                      [op.exec_timestamp for op in job.ops],
                      [op.end_timestamp for op in job.ops]])
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
                     [job.received_timestamp,
                      job.start_timestamp,
                      job.end_timestamp])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
589

    
590
  def testSuccess(self):
    # Jobs made only of succeeding opcodes: the processor is called once
    # per opcode and must return a true value on the last call only
    queue = _FakeQueueForProc()

    for (job_id, opcount) in [(25351, 1), (6637, 3),
                              (24644, 10), (32207, 100)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      def _BeforeStart(timeout, priority):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertFalse(job.cur_opctx)

      def _AfterStart(op, cbs):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
        self.assertFalse(job.cur_opctx)

        # Job is running, cancelling shouldn't be possible
        (success, _) = job.Cancel()
        self.assertFalse(success)

      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if idx == len(ops) - 1:
          # Last opcode
          self.assertTrue(result)
        else:
          self.assertFalse(result)

          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
          self.assertTrue(job.start_timestamp)
          self.assertFalse(job.end_timestamp)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(job.GetInfo(["opresult"]),
                       [[op.input.result for op in job.ops]])
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
      self.assertRaises(IndexError, queue.GetNextUpdate)
653

    
654
  def testOpcodeError(self):
    # Jobs in which the opcodes with index failfrom to failto (inclusive)
    # are set to fail; processing must end at the first failing opcode
    queue = _FakeQueueForProc()

    # (job_id, opcount, failfrom, failto)
    testdata = [
      (17077, 1, 0, 0),
      (1782, 5, 2, 2),
      (18179, 10, 9, 9),
      (4744, 10, 3, 8),
      (23816, 100, 39, 45),
      ]

    for (job_id, opcount, failfrom, failto) in testdata:
      # Prepare opcodes
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
                                 fail=(failfrom <= i and
                                       i <= failto))
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      opexec = _FakeExecOpCodeForProc(queue, None, None)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        # queued to waitlock
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # waitlock to running
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # Opcode result
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        if idx in (failfrom, len(ops) - 1):
          # First failing opcode or last opcode; either way the job is
          # finished
          self.assertTrue(result)
          break

        self.assertFalse(result)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      # Check job status
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
      self.assertEqual(job.GetInfo(["id"]), [job_id])
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])

      # Check opcode status
      data = zip(job.ops,
                 job.GetInfo(["opstatus"])[0],
                 job.GetInfo(["opresult"])[0])

      for idx, (op, opstatus, opresult) in enumerate(data):
        if idx < failfrom:
          # Processed before the failure
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
          self.assertEqual(opresult, op.input.result)
        elif idx <= failto:
          assert op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
        else:
          # Not set to fail, yet expected to carry an error as well
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)

      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops[:failfrom]))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
      self.assertRaises(IndexError, queue.GetNextUpdate)
731

    
732
  def testCancelWhileInQueue(self):
    # A job cancelled while still queued must be left untouched by the
    # processor
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 17045
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELED
                               for op in job.ops))

    # Serialize to check for differences
    before_proc = job.Serialize()

    # Simulate processor called in workerpool
    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

    # Must not have changed or written
    self.assertEqual(before_proc, job.Serialize())
    self.assertRaises(IndexError, queue.GetNextUpdate)
776

    
777
  def testCancelWhileWaitlockInQueue(self):
    # A job whose first opcode is already in "waitlock" status is cancelled
    # before the processor gets to run
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 8645
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    job.ops[0].status = constants.OP_STATUS_WAITLOCK

    assert len(job.ops) == 5

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    # Mark as cancelling
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                               for op in job.ops))

    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
817

    
818
  def testCancelWhileWaitlock(self):
    # The job is cancelled from the hook that runs right before the first
    # opcode's start notification, i.e. while it is in "waitlock" status
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 11009
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))
      self.assertRaises(IndexError, queue.GetNextUpdate)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
867

    
868
  def testCancelWhileWaitlockWithTimeout(self):
    # Tests canceling a job from the "before start" hook when that hook then
    # reports a lock acquire timeout
    queue = _FakeQueueForProc()

    # Create job
    job_id = 24314
    opcode_list = [opcodes.OpTestDummy(result="Res%s" % idx, fail=False)
                   for idx in range(5)]
    job = self._CreateJob(queue, job_id, opcode_list)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

      # Mark as cancelled
      (cancelled, _) = job.Cancel()
      self.assertTrue(cancelled)

      self.assertTrue(compat.all(qop.status == constants.OP_STATUS_CANCELING
                                 for qop in job.ops))

      # Fake an acquire attempt timing out
      raise mcpu.LockAcquireTimeout()

    def _AfterStart(op, cbs):
      self.fail("Should not reach this")

    executor = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    processor = jqueue._JobProcessor(queue, executor, job)
    self.assertTrue(processor())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(qop.start_timestamp and qop.end_timestamp
                                for qop in job.ops))
    opcount = len(job.ops)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [opcount * [constants.OP_STATUS_CANCELED],
                      opcount * ["Job canceled by request"]])
911

    
912
  def testCancelWhileRunning(self):
    # Tests canceling a job with finished opcodes and more, unprocessed ones
    queue = _FakeQueueForProc()

    # Create job
    job_id = 28492
    opcode_list = [opcodes.OpTestDummy(result="Res%s" % idx, fail=False)
                   for idx in range(3)]
    job = self._CreateJob(queue, job_id, opcode_list)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    executor = _FakeExecOpCodeForProc(queue, None, None)

    # Run one opcode
    first_run = jqueue._JobProcessor(queue, executor, job)
    self.assertFalse(first_run())

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    expected_status = [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_QUEUED,
      constants.OP_STATUS_QUEUED,
      ]
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [expected_status, ["Res0", None, None]])

    # Mark as cancelled
    (cancelled, _) = job.Cancel()
    self.assertTrue(cancelled)

    # Try processing another opcode (this will actually cancel the job)
    second_run = jqueue._JobProcessor(queue, executor, job)
    self.assertTrue(second_run())

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    expected_status = [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_CANCELED,
      constants.OP_STATUS_CANCELED,
      ]
    expected_result = [
      "Res0",
      "Job canceled by request",
      "Job canceled by request",
      ]
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [expected_status, expected_result])
955

    
956
  def testPartiallyRun(self):
    # Tests calling the processor on a job that's been partially run before the
    # program was restarted
    queue = _FakeQueueForProc()

    executor = _FakeExecOpCodeForProc(queue, None, None)

    # (job ID, number of opcodes processed before the simulated restart)
    scenarios = [(30697, 1), (2552, 4), (12489, 9)]

    for job_id, successcount in scenarios:
      opcode_list = [opcodes.OpTestDummy(result="Res%s" % idx, fail=False)
                     for idx in range(10)]

      # Create job
      job = self._CreateJob(queue, job_id, opcode_list)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      # Process the first opcodes; the job is not finished after any of them
      for _ in range(successcount):
        processor = jqueue._JobProcessor(queue, executor, job)
        self.assertFalse(processor())

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      pending = len(opcode_list) - successcount
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [successcount * [constants.OP_STATUS_SUCCESS] +
                        pending * [constants.OP_STATUS_QUEUED]])

      self.assertTrue(job.ops_iter)

      # Serialize and restore (simulates program restart)
      restored = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
      self.assertFalse(restored.ops_iter)
      self._TestPartial(restored, successcount)
988

    
989
  def _TestPartial(self, job, successcount):
    """Processes the remaining opcodes of a partially run job and checks it.

    job is expected to be queued with successcount opcodes already done; a
    new fake queue and opcode executor are created for the remaining ones.

    """
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    queue = _FakeQueueForProc()
    executor = _FakeExecOpCodeForProc(queue, None, None)

    # Count down so that zero marks the last opcode
    for remaining in range(len(job.ops) - successcount - 1, -1, -1):
      result = jqueue._JobProcessor(queue, executor, job)()

      # Every processor run must have produced exactly three job updates
      for _ in range(3):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining:
        self.assertFalse(result)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Last opcode
        self.assertTrue(result)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[qop.input.result for qop in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(qop.start_timestamp and qop.end_timestamp
                               for qop in job.ops))

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, executor, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # ... also after being restored
    restored = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, executor, restored)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1033

    
1034
  def testProcessorOnRunningJob(self):
    # Tests that the processor refuses a job whose only opcode is already
    # marked as running
    queue = _FakeQueueForProc()
    executor = _FakeExecOpCodeForProc(queue, None, None)

    # Create job
    job = self._CreateJob(queue, 9571,
                          [opcodes.OpTestDummy(result="result", fail=False)])

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Force the opcode status by hand
    job.ops[0].status = constants.OP_STATUS_RUNNING

    assert len(job.ops) == 1

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Calling on running job must fail
    processor = jqueue._JobProcessor(queue, executor, job)
    self.assertRaises(errors.ProgrammerError, processor)
1054

    
1055
  def testLogMessages(self):
    # Tests the "Feedback" callback function
    queue = _FakeQueueForProc()

    # Log entries to emit, keyed by opcode index; a log type of None means
    # Feedback is called with the message only
    messages = {
      1: [
        (None, "Hello"),
        (None, "World"),
        (constants.ELOG_MESSAGE, "there"),
        ],
      4: [
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
        ],
      }
    opcode_list = [opcodes.OpTestDummy(result="Logtest%s" % idx, fail=False,
                                       messages=messages.get(idx, []))
                   for idx in range(5)]

    # Create job
    job = self._CreateJob(queue, 29386, opcode_list)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

      self.assertRaises(AssertionError, cbs.Feedback,
                        "too", "many", "arguments")

      for (log_type, msg) in op.messages:
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if not log_type:
          cbs.Feedback(msg)
        else:
          cbs.Feedback(log_type, msg)
        # Check for job update without replication
        self.assertEqual(queue.GetNextUpdate(), (job, False))
        self.assertRaises(IndexError, queue.GetNextUpdate)

    executor = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    # Count down so that zero marks the last opcode
    for remaining in range(len(job.ops) - 1, -1, -1):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, executor, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining:
        self.assertFalse(result)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Last opcode
        self.assertTrue(result)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[qop.input.result for qop in job.ops]])

    # Must be computed before the per-message check below empties the lists
    logmsgcount = sum(len(entries) for entries in messages.values())

    self._CheckLogMessages(job, logmsgcount)

    # Serialize and restore (simulates program restart)
    restored = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
    self._CheckLogMessages(restored, logmsgcount)

    # Check each message
    last_serial = -1
    for opidx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
      for (serial, _, log_type, msg) in oplog:
        (exptype, expmsg) = messages.get(opidx).pop(0)
        # Messages sent without a type must come back as ELOG_MESSAGE
        self.assertEqual(log_type, exptype or constants.ELOG_MESSAGE)
        self.assertEqual(expmsg, msg)
        self.assertTrue(serial > last_serial)
        last_serial = serial
1145

    
1146
  def _CheckLogMessages(self, job, count):
    """Checks a job's log serial and the entries returned per serial filter.

    count is the expected value of job.log_serial and must be larger than 3.

    """
    def _AllEntries():
      # Flatten the per-opcode logs, skipping opcodes without entries
      flat = []
      for entries in job.GetInfo(["oplog"])[0]:
        if entries:
          flat.extend(entries)
      return flat

    # Check serial
    self.assertEqual(job.log_serial, count)

    # No filter
    self.assertEqual(job.GetLogEntries(None), _AllEntries())

    # Filter with serial
    assert count > 3
    self.assertTrue(job.GetLogEntries(3))
    self.assertEqual(job.GetLogEntries(3), _AllEntries()[3:])

    # No log message after highest serial
    for serial in (count, count + 3):
      self.assertFalse(job.GetLogEntries(serial))
1165

    
1166
  def testSubmitManyJobs(self):
    # Tests opcodes which submit further jobs while being executed
    queue = _FakeQueueForProc()

    job_id = 15656

    def _Dummy(result, **kwargs):
      # Shorthand for a non-failing test opcode
      return opcodes.OpTestDummy(result=result, fail=False, **kwargs)

    ops = [
      _Dummy("Res0", submit_jobs=[]),
      _Dummy("Res1", submit_jobs=[
        [_Dummy("r1j0")],
        ]),
      _Dummy("Res2", submit_jobs=[
        [_Dummy("r2j0o0"), _Dummy("r2j0o1"),
         _Dummy("r2j0o2"), _Dummy("r2j0o3")],
        [_Dummy("r2j1")],
        [_Dummy("r2j3o0"), _Dummy("r2j3o1")],
        ]),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (cancelled, _) = job.Cancel()
      self.assertFalse(cancelled)

    executor = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    last_idx = len(ops) - 1
    for idx in range(len(ops)):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, executor, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      if idx != last_idx:
        self.assertFalse(result)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
      else:
        # Last opcode
        self.assertTrue(result)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Submitted jobs are expected in submission order with IDs from 1000 on
    expected_id = 1000
    for parent_op in ops:
      for submitted_ops in parent_op.submit_jobs:
        self.assertEqual(queue.GetNextSubmittedJob(),
                         (expected_id, submitted_ops))
        expected_id += 1
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[[], [1000], [1001, 1002, 1003]]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, executor, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1249

    
1250
  def testJobDependency(self):
    # Tests a job whose first opcode depends on two other jobs; the fake
    # dependency manager makes it wait several times before continuing
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 22113
    prev_job_id2 = 28102
    job_id = 29929
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          depends=[
                            [prev_job_id2, None],
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res1", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    def _BeforeStart(timeout, priority):
      # "attempt" is the loop variable of the processing loop below
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (cancelled, _) = job.Cancel()
      self.assertFalse(cancelled)

    executor = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    depstatus = jqueue._JobDependencyManager

    for attempt in itertools.count():
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      # Prepare the answers of the dependency manager for this attempt
      if attempt < 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (depstatus.WAIT, "wait2"))
      elif attempt == 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (depstatus.CONTINUE, "cont"))
        # The processor will ask for the next dependency immediately
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (depstatus.WAIT, "wait"))
      elif attempt < 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (depstatus.WAIT, "wait"))
      elif attempt == 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (depstatus.CONTINUE, "cont"))

      if attempt == 2:
        expected_pending = 2
      elif attempt > 5:
        expected_pending = 0
      else:
        expected_pending = 1
      self.assertEqual(depmgr.CountPendingResults(), expected_pending)

      result = jqueue._JobProcessor(queue, executor, job)()
      if attempt == 0 or attempt >= 5:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt < 5:
        # Simulate waiting for other job
        self.assertTrue(result)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertFalse(result)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[qop.input.result for qop in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(qop.start_timestamp and qop.end_timestamp
                               for qop in job.ops))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())
    self.assertFalse(depmgr.CountWaitingJobs())

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, executor, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1374

    
1375
  def testJobDependencyCancel(self):
    # Tests a job whose second opcode depends on another job for which the
    # fake dependency manager eventually answers with CANCEL
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 13623
    job_id = 30876
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    def _BeforeStart(timeout, priority):
      # "attempt" is the loop variable of the processing loop below
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (cancelled, _) = job.Cancel()
      self.assertFalse(cancelled)

    executor = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    depstatus = jqueue._JobDependencyManager

    for attempt in itertools.count():
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt == 0:
        # This will handle the first opcode
        expected_pending = 0
      else:
        if attempt < 4:
          depmgr.AddCheckResult(job, prev_job_id, None,
                                (depstatus.WAIT, "wait"))
        elif attempt == 4:
          # Other job was cancelled
          depmgr.AddCheckResult(job, prev_job_id, None,
                                (depstatus.CANCEL, "cancel"))
        expected_pending = 1

      self.assertEqual(depmgr.CountPendingResults(), expected_pending)

      result = jqueue._JobProcessor(queue, executor, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if 0 < attempt < 4:
        # Simulate waiting for other job
        self.assertTrue(result)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertFalse(result)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    expected_status = [
      constants.OP_STATUS_SUCCESS,
      constants.OP_STATUS_CANCELED,
      constants.OP_STATUS_CANCELED,
      ]
    expected_result = [
      "Res0",
      "Job canceled by request",
      "Job canceled by request",
      ]
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [expected_status, expected_result])

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, executor, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1491

    
1492
  def testJobDependencyWrongstatus(self):
    # Tests a job whose second opcode depends on another job for which the
    # fake dependency manager eventually answers with WRONGSTATUS; the
    # remaining opcodes must end up with an encoded error as result
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 9741
    job_id = 11763
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    def _BeforeStart(timeout, priority):
      # "attempt" is the loop variable of the processing loop below
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Iterating the counter directly avoids the Python 2-only ".next()"
    # method; the loop is left via "break" once the job has finished
    for attempt in itertools.count():
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job failed
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(depmgr.CountPendingResults())

      if attempt > 0 and attempt < 4:
        # Simulate waiting for other job
        self.assertTrue(result)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertFalse(result)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
    # A stray trailing comma used to turn this statement into a discarded
    # one-element tuple
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_ERROR,
                       constants.OP_STATUS_ERROR]])

    (opresult, ) = job.GetInfo(["opresult"])
    self.assertEqual(len(opresult), len(ops))
    self.assertEqual(opresult[0], "Res0")
    self.assertTrue(errors.GetEncodedError(opresult[1]))
    self.assertTrue(errors.GetEncodedError(opresult[2]))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
    self.assertRaises(IndexError, queue.GetNextUpdate)
1612

    
1613

    
1614
class _FakeTimeoutStrategy:
1615
  def __init__(self, timeouts):
1616
    self.timeouts = timeouts
1617
    self.attempts = 0
1618
    self.last_timeout = None
1619

    
1620
  def NextAttempt(self):
1621
    self.attempts += 1
1622
    if self.timeouts:
1623
      timeout = self.timeouts.pop(0)
1624
    else:
1625
      timeout = None
1626
    self.last_timeout = timeout
1627
    return timeout
1628

    
1629

    
1630
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Tests the job processor when lock acquisitions time out.

  The opcode executor callbacks defined here simulate lock acquire timeouts
  by raising L{mcpu.LockAcquireTimeout} for a configurable number of retries
  per opcode, while checking the job state seen at each step.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    self.job = None
    # Index of the opcode currently being processed
    self.curop = None
    # Iterator producing opcode indices for _NextOpcode
    self.opcounter = None
    # Most recently created _FakeTimeoutStrategy
    self.timeout_strategy = None
    # Number of simulated lock acquire failures still to be raised
    self.retries = 0
    # Opcode index for which the last timeout strategy was created
    self.prev_tsop = None
    self.prev_prio = None
    self.prev_status = None
    # Priority passed to the last _BeforeStart call
    self.lock_acq_prio = None
    # Whether the last _BeforeStart call let the opcode proceed
    self.gave_lock = None
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """Callback simulating the lock acquisition for the current opcode.

    Verifies queue and job state, then either returns (lock "given") or
    raises L{mcpu.LockAcquireTimeout} while simulated retries are left.

    @param timeout: Timeout passed by the processor; must match the last
      value handed out by the current timeout strategy
    @param priority: Priority passed by the processor; must match the
      current opcode's priority
    @raise mcpu.LockAcquireTimeout: While C{self.retries} is positive

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

    ts = self.timeout_strategy

    self.assertTrue(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    # Assume success; reset below if a timeout is simulated
    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      # (7 is the number of timeouts configured for opcode 3 in
      # _NewTimeoutStrategy, none of which has been used up at this point)
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      # A simulated failure is only valid for a non-blocking acquire
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      # At highest priority the acquire must be blocking (no timeout)
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    """Callback verifying the job state once the opcode counts as running.

    @param op: Unused
    @param cbs: Unused

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode and records its priority and status.

    Passed to the processor as C{_nextop_fn}.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Creates the timeout strategy for the current opcode.

    Passed to the processor as C{_timeout_strategy_factory}. Depending on
    the opcode index, a different list of timeouts and number of simulated
    failures (C{self.retries}) is configured.

    @return: New L{_FakeTimeoutStrategy} instance

    """
    job = self.job

    # All failures planned for the previous strategy must have been used up
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      # One failure less than there are timeouts
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      # (yields seven timeouts, matching the check in _BeforeStart)
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    # Runs a job with ten dummy opcodes through the processor until it is
    # finished, checking job state and queue updates after every processor
    # invocation
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      # Set again by _BeforeStart during the processor call below
      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assertTrue(job.ops[self.curop].exec_timestamp)

      if result:
        self.assertFalse(job.cur_opctx)
        break

      self.assertFalse(result)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # All opcodes must have been processed exactly once
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1844

    
1845

    
1846
class TestJobDependencyManager(unittest.TestCase):
1847
  class _FakeJob:
1848
    def __init__(self, job_id):
1849
      self.id = str(job_id)
1850

    
1851
  def setUp(self):
1852
    self._status = []
1853
    self._queue = []
1854
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1855

    
1856
  def _GetStatus(self, job_id):
1857
    (exp_job_id, result) = self._status.pop(0)
1858
    self.assertEqual(exp_job_id, job_id)
1859
    return result
1860

    
1861
  def _Enqueue(self, jobs):
1862
    self._queue.append(jobs)
1863

    
1864
  def testNotFinalizedThenCancel(self):
1865
    job = self._FakeJob(17697)
1866
    job_id = str(28625)
1867

    
1868
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1869
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1870
    self.assertEqual(result, self.jdm.WAIT)
1871
    self.assertFalse(self._status)
1872
    self.assertFalse(self._queue)
1873
    self.assertTrue(self.jdm.JobWaiting(job))
1874
    self.assertEqual(self.jdm._waiters, {
1875
      job_id: set([job]),
1876
      })
1877

    
1878
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1879
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1880
    self.assertEqual(result, self.jdm.CANCEL)
1881
    self.assertFalse(self._status)
1882
    self.assertFalse(self._queue)
1883
    self.assertFalse(self.jdm.JobWaiting(job))
1884

    
1885
  def testRequireCancel(self):
1886
    job = self._FakeJob(5278)
1887
    job_id = str(9610)
1888
    dep_status = [constants.JOB_STATUS_CANCELED]
1889

    
1890
    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1891
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1892
    self.assertEqual(result, self.jdm.WAIT)
1893
    self.assertFalse(self._status)
1894
    self.assertFalse(self._queue)
1895
    self.assertTrue(self.jdm.JobWaiting(job))
1896
    self.assertEqual(self.jdm._waiters, {
1897
      job_id: set([job]),
1898
      })
1899

    
1900
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1901
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1902
    self.assertEqual(result, self.jdm.CONTINUE)
1903
    self.assertFalse(self._status)
1904
    self.assertFalse(self._queue)
1905
    self.assertFalse(self.jdm.JobWaiting(job))
1906

    
1907
  def testRequireError(self):
1908
    job = self._FakeJob(21459)
1909
    job_id = str(25519)
1910
    dep_status = [constants.JOB_STATUS_ERROR]
1911

    
1912
    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1913
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1914
    self.assertEqual(result, self.jdm.WAIT)
1915
    self.assertFalse(self._status)
1916
    self.assertFalse(self._queue)
1917
    self.assertTrue(self.jdm.JobWaiting(job))
1918
    self.assertEqual(self.jdm._waiters, {
1919
      job_id: set([job]),
1920
      })
1921

    
1922
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1923
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1924
    self.assertEqual(result, self.jdm.CONTINUE)
1925
    self.assertFalse(self._status)
1926
    self.assertFalse(self._queue)
1927
    self.assertFalse(self.jdm.JobWaiting(job))
1928

    
1929
  def testRequireMultiple(self):
1930
    dep_status = list(constants.JOBS_FINALIZED)
1931

    
1932
    for end_status in dep_status:
1933
      job = self._FakeJob(21343)
1934
      job_id = str(14609)
1935

    
1936
      self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1937
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1938
      self.assertEqual(result, self.jdm.WAIT)
1939
      self.assertFalse(self._status)
1940
      self.assertFalse(self._queue)
1941
      self.assertTrue(self.jdm.JobWaiting(job))
1942
      self.assertEqual(self.jdm._waiters, {
1943
        job_id: set([job]),
1944
        })
1945

    
1946
      self._status.append((job_id, end_status))
1947
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1948
      self.assertEqual(result, self.jdm.CONTINUE)
1949
      self.assertFalse(self._status)
1950
      self.assertFalse(self._queue)
1951
      self.assertFalse(self.jdm.JobWaiting(job))
1952

    
1953
  def testNotify(self):
1954
    job = self._FakeJob(8227)
1955
    job_id = str(4113)
1956

    
1957
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1958
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1959
    self.assertEqual(result, self.jdm.WAIT)
1960
    self.assertFalse(self._status)
1961
    self.assertFalse(self._queue)
1962
    self.assertTrue(self.jdm.JobWaiting(job))
1963
    self.assertEqual(self.jdm._waiters, {
1964
      job_id: set([job]),
1965
      })
1966

    
1967
    self.jdm.NotifyWaiters(job_id)
1968
    self.assertFalse(self._status)
1969
    self.assertFalse(self.jdm._waiters)
1970
    self.assertFalse(self.jdm.JobWaiting(job))
1971
    self.assertEqual(self._queue, [set([job])])
1972

    
1973
  def testWrongStatus(self):
1974
    job = self._FakeJob(10102)
1975
    job_id = str(1271)
1976

    
1977
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
1978
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1979
                                            [constants.JOB_STATUS_SUCCESS])
1980
    self.assertEqual(result, self.jdm.WAIT)
1981
    self.assertFalse(self._status)
1982
    self.assertFalse(self._queue)
1983
    self.assertTrue(self.jdm.JobWaiting(job))
1984
    self.assertEqual(self.jdm._waiters, {
1985
      job_id: set([job]),
1986
      })
1987

    
1988
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1989
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1990
                                            [constants.JOB_STATUS_SUCCESS])
1991
    self.assertEqual(result, self.jdm.WRONGSTATUS)
1992
    self.assertFalse(self._status)
1993
    self.assertFalse(self._queue)
1994
    self.assertFalse(self.jdm.JobWaiting(job))
1995

    
1996
  def testCorrectStatus(self):
1997
    job = self._FakeJob(24273)
1998
    job_id = str(23885)
1999

    
2000
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2001
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2002
                                            [constants.JOB_STATUS_SUCCESS])
2003
    self.assertEqual(result, self.jdm.WAIT)
2004
    self.assertFalse(self._status)
2005
    self.assertFalse(self._queue)
2006
    self.assertTrue(self.jdm.JobWaiting(job))
2007
    self.assertEqual(self.jdm._waiters, {
2008
      job_id: set([job]),
2009
      })
2010

    
2011
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2012
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2013
                                            [constants.JOB_STATUS_SUCCESS])
2014
    self.assertEqual(result, self.jdm.CONTINUE)
2015
    self.assertFalse(self._status)
2016
    self.assertFalse(self._queue)
2017
    self.assertFalse(self.jdm.JobWaiting(job))
2018

    
2019
  def testFinalizedRightAway(self):
2020
    job = self._FakeJob(224)
2021
    job_id = str(3081)
2022

    
2023
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2024
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2025
                                            [constants.JOB_STATUS_SUCCESS])
2026
    self.assertEqual(result, self.jdm.CONTINUE)
2027
    self.assertFalse(self._status)
2028
    self.assertFalse(self._queue)
2029
    self.assertFalse(self.jdm.JobWaiting(job))
2030
    self.assertEqual(self.jdm._waiters, {
2031
      job_id: set(),
2032
      })
2033

    
2034
    # Force cleanup
2035
    self.jdm.NotifyWaiters("0")
2036
    self.assertFalse(self.jdm._waiters)
2037
    self.assertFalse(self._status)
2038
    self.assertFalse(self._queue)
2039

    
2040
  def testSelfDependency(self):
2041
    job = self._FakeJob(18937)
2042

    
2043
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2044
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2045
    self.assertEqual(result, self.jdm.ERROR)
2046

    
2047
  def testJobDisappears(self):
2048
    job = self._FakeJob(30540)
2049
    job_id = str(23769)
2050

    
2051
    def _FakeStatus(_):
2052
      raise errors.JobLost("#msg#")
2053

    
2054
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2055
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2056
    self.assertEqual(result, self.jdm.ERROR)
2057
    self.assertFalse(jdm.JobWaiting(job))
2058

    
2059

    
2060
# Script entry point: when this file is executed directly, hand control to
# the shared test program from testutils (presumably runs the test cases
# defined in this module -- confirm against testutils).
if __name__ == "__main__":
2061
  testutils.GanetiTestProgram()