Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ fcb21ad7

History | View | Annotate | Download (74.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31
import random
32

    
33
from ganeti import constants
34
from ganeti import utils
35
from ganeti import errors
36
from ganeti import jqueue
37
from ganeti import opcodes
38
from ganeti import compat
39
from ganeti import mcpu
40
from ganeti import query
41

    
42
import testutils
43

    
44

    
45
class _FakeJob:
46
  def __init__(self, job_id, status):
47
    self.id = job_id
48
    self.writable = False
49
    self._status = status
50
    self._log = []
51

    
52
  def SetStatus(self, status):
53
    self._status = status
54

    
55
  def AddLogEntry(self, msg):
56
    self._log.append((len(self._log), msg))
57

    
58
  def CalcStatus(self):
59
    return self._status
60

    
61
  def GetInfo(self, fields):
62
    result = []
63

    
64
    for name in fields:
65
      if name == "status":
66
        result.append(self._status)
67
      else:
68
        raise Exception("Unknown field")
69

    
70
    return result
71

    
72
  def GetLogEntries(self, newer_than):
73
    assert newer_than is None or newer_than >= 0
74

    
75
    if newer_than is None:
76
      return self._log
77

    
78
    return self._log[newer_than:]
79

    
80

    
81
class TestJobChangesChecker(unittest.TestCase):
  """Tests for C{jqueue._JobChangesChecker}, driven through L{_FakeJob}.

  """
  def testStatus(self):
    # No previous information is given, so each call is expected to return
    # the job's current status together with an empty log
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))

    for status in [constants.JOB_STATUS_RUNNING,
                   constants.JOB_STATUS_SUCCESS]:
      job.SetStatus(status)
      self.assertEqual(checker(job), ([status], []))

    # job.id is used by checker
    self.assertEqual(job.id, 9094)

  def testStatusWithPrev(self):
    # The previous status equals the current one, hence nothing is reported
    # until the status is modified
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
    checker = jqueue._JobChangesChecker(["status"],
                                        [constants.JOB_STATUS_QUEUED], None)
    self.assertTrue(checker(job) is None)

    job.SetStatus(constants.JOB_STATUS_RUNNING)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

  def testFinalStatus(self):
    for status in constants.JOBS_FINALIZED:
      job = _FakeJob(2178711, status)
      checker = jqueue._JobChangesChecker(["status"], [status], None)
      # There won't be any changes in this status, hence it should signal
      # a change immediately
      self.assertEqual(checker(job), ([status], []))

  def testLog(self):
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
    checker = jqueue._JobChangesChecker(["status"], None, None)
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))

    job.AddLogEntry("Hello World")
    (job_info, log_entries) = checker(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
    self.assertEqual(log_entries, [[0, "Hello World"]])

    # A checker primed with what was just returned must see no difference
    checker2 = jqueue._JobChangesChecker(["status"], job_info,
                                         len(log_entries))
    self.assertTrue(checker2(job) is None)

    job.AddLogEntry("Foo Bar")
    job.SetStatus(constants.JOB_STATUS_ERROR)

    # Only the entry added after priming is expected back
    (job_info, log_entries) = checker2(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[1, "Foo Bar"]])

    # A fresh checker without previous information gets the whole log
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
    (job_info, log_entries) = checker3(job)
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
137

    
138

    
139
class TestJobChangesWaiter(unittest.TestCase):
140
  def setUp(self):
141
    self.tmpdir = tempfile.mkdtemp()
142
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
143
    utils.WriteFile(self.filename, data="")
144

    
145
  def tearDown(self):
146
    shutil.rmtree(self.tmpdir)
147

    
148
  def _EnsureNotifierClosed(self, notifier):
149
    try:
150
      os.fstat(notifier._fd)
151
    except EnvironmentError, err:
152
      self.assertEqual(err.errno, errno.EBADF)
153
    else:
154
      self.fail("File descriptor wasn't closed")
155

    
156
  def testClose(self):
157
    for wait in [False, True]:
158
      waiter = jqueue._JobFileChangesWaiter(self.filename)
159
      try:
160
        if wait:
161
          waiter.Wait(0.001)
162
      finally:
163
        waiter.Close()
164

    
165
      # Ensure file descriptor was closed
166
      self._EnsureNotifierClosed(waiter._notifier)
167

    
168
  def testChangingFile(self):
169
    waiter = jqueue._JobFileChangesWaiter(self.filename)
170
    try:
171
      self.assertFalse(waiter.Wait(0.1))
172
      utils.WriteFile(self.filename, data="changed")
173
      self.assert_(waiter.Wait(60))
174
    finally:
175
      waiter.Close()
176

    
177
    self._EnsureNotifierClosed(waiter._notifier)
178

    
179
  def testChangingFile2(self):
180
    waiter = jqueue._JobChangesWaiter(self.filename)
181
    try:
182
      self.assertFalse(waiter._filewaiter)
183
      self.assert_(waiter.Wait(0.1))
184
      self.assert_(waiter._filewaiter)
185

    
186
      # File waiter is now used, but there have been no changes
187
      self.assertFalse(waiter.Wait(0.1))
188
      utils.WriteFile(self.filename, data="changed")
189
      self.assert_(waiter.Wait(60))
190
    finally:
191
      waiter.Close()
192

    
193
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
194

    
195

    
196
class TestWaitForJobChangesHelper(unittest.TestCase):
  """Tests for C{jqueue._WaitForJobChangesHelper}.

  The helper is given an empty job file in a temporary directory and one of
  the loader callbacks defined on this class.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
    utils.WriteFile(self.filename, data="")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _LoadWaitingJob(self):
    """Job loader returning a fake job in "waiting" status.

    """
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)

  def _LoadLostJob(self):
    """Job loader simulating a job which can't be loaded.

    """
    return None

  def testNoChanges(self):
    wfjc = jqueue._WaitForJobChangesHelper()

    # No change
    unchanged = wfjc(self.filename, self._LoadWaitingJob, ["status"],
                     [constants.JOB_STATUS_WAITING], None, 0.1)
    self.assertEqual(unchanged, constants.JOB_NOTCHANGED)

    # No previous information
    initial = wfjc(self.filename, self._LoadWaitingJob,
                   ["status"], None, None, 1.0)
    self.assertEqual(initial, ([constants.JOB_STATUS_WAITING], []))

  def testLostJob(self):
    wfjc = jqueue._WaitForJobChangesHelper()
    result = wfjc(self.filename, self._LoadLostJob,
                  ["status"], None, None, 1.0)
    self.assertTrue(result is None)
228

    
229

    
230
class TestEncodeOpError(unittest.TestCase):
  """Tests for C{jqueue._EncodeOpError}.

  """
  def test(self):
    # Each entry pairs a value to encode with the exception class which
    # errors.MaybeRaise is expected to raise for the encoded form. Ganeti
    # errors keep their class; anything else surfaces as OpExecError.
    cases = [
      (errors.LockError("Test 1"), errors.LockError),
      (errors.GenericError("Test 2"), errors.GenericError),
      (NotImplementedError("Foo"), errors.OpExecError),
      ("Hello World", errors.OpExecError),
      ]

    for (value, expected_cls) in cases:
      encerr = jqueue._EncodeOpError(value)
      self.assertTrue(isinstance(encerr, tuple))
      self.assertRaises(expected_cls, errors.MaybeRaise, encerr)
247

    
248

    
249
class TestQueuedOpCode(unittest.TestCase):
  """Tests for C{jqueue._QueuedOpCode}, including a serialization round trip.

  """
  def testDefaults(self):
    def _Check(op):
      self.assertFalse(hasattr(op.input, "dry_run"))
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
      self.assertFalse(op.log)
      self.assertTrue(op.start_timestamp is None)
      self.assertTrue(op.exec_timestamp is None)
      self.assertTrue(op.end_timestamp is None)
      self.assertTrue(op.result is None)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
    _Check(op1)

    # A restored copy must look the same and serialize identically
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())

  def testPriority(self):
    def _Check(op):
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
             "Default priority equals high priority; test can't work"
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)

    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
    op1 = jqueue._QueuedOpCode(inpop)
    _Check(op1)

    # The priority taken from the input opcode must survive a round trip
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
    _Check(op2)
    self.assertEqual(op1.Serialize(), op2.Serialize())
280

    
281

    
282
class TestQueuedJob(unittest.TestCase):
  """Tests for C{jqueue._QueuedJob}.

  """
  def test(self):
    # Creating a job from an empty opcode list is expected to fail
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
                      None, 1, [], False)

  def testDefaults(self):
    job_id = 4260
    ops = [
      opcodes.OpTagsGet(),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertTrue(job.writable)
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.log_serial, 0)
      self.assertTrue(job.received_timestamp)
      self.assertTrue(job.start_timestamp is None)
      self.assertTrue(job.end_timestamp is None)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
      self.assertTrue(repr(job).startswith("<"))
      self.assertEqual(len(job.ops), len(ops))
      self.assertTrue(compat.all(inp.__getstate__() == op.input.__getstate__()
                                 for (inp, op) in zip(ops, job.ops)))
      self.assertRaises(errors.OpExecError, job.GetInfo,
                        ["unknown-field"])
      self.assertEqual(job.GetInfo(["summary"]),
                       [[op.input.Summary() for op in job.ops]])

    job1 = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job1)

    # A restored copy must look the same and serialize identically
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
    _Check(job2)
    self.assertEqual(job1.Serialize(), job2.Serialize())

  def testWritable(self):
    # The last constructor argument is reflected in the "writable" attribute
    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
    self.assertFalse(job.writable)

    job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
    self.assertTrue(job.writable)

  def testPriority(self):
    job_id = 4283
    ops = [
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
      opcodes.OpTestDelay(),
      ]

    def _Check(job):
      self.assertEqual(job.id, job_id)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(repr(job).startswith("<"))

    job = jqueue._QueuedJob(None, job_id, ops, True)
    _Check(job)
    self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
                               for op in job.ops))
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase first
    job.ops[0].priority -= 1
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)

    # Mark opcode as finished
    job.ops[0].status = constants.OP_STATUS_SUCCESS
    _Check(job)
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)

    # Increase second
    job.ops[1].priority -= 10
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)

    # Test increasing first
    job.ops[0].status = constants.OP_STATUS_RUNNING
    job.ops[0].priority -= 19
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)

  def testCalcStatus(self):
    def _SetAll(ops, status):
      # Helper shared by the scenario functions below
      for op in ops:
        op.status = status

    def _Queued(ops):
      # The default status is "queued"
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in ops))

    def _Waitlock1(ops):
      ops[0].status = constants.OP_STATUS_WAITING

    def _Waitlock2(ops):
      _SetAll(ops[:2], constants.OP_STATUS_SUCCESS)
      ops[2].status = constants.OP_STATUS_WAITING

    def _Running(ops):
      ops[0].status = constants.OP_STATUS_SUCCESS
      ops[1].status = constants.OP_STATUS_RUNNING
      _SetAll(ops[2:], constants.OP_STATUS_QUEUED)

    def _Canceling1(ops):
      _SetAll(ops[:2], constants.OP_STATUS_SUCCESS)
      _SetAll(ops[2:], constants.OP_STATUS_CANCELING)

    def _Canceling2(ops):
      _SetAll(ops, constants.OP_STATUS_CANCELING)

    def _Canceled(ops):
      _SetAll(ops, constants.OP_STATUS_CANCELED)

    def _Error1(ops):
      # First four opcodes succeeded, all later ones failed
      _SetAll(ops[:4], constants.OP_STATUS_SUCCESS)
      _SetAll(ops[4:], constants.OP_STATUS_ERROR)

    def _Error2(ops):
      _SetAll(ops, constants.OP_STATUS_ERROR)

    def _Success(ops):
      _SetAll(ops, constants.OP_STATUS_SUCCESS)

    # Expected job status mapped to the opcode scenarios producing it
    tests = {
      constants.JOB_STATUS_QUEUED: [_Queued],
      constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
      constants.JOB_STATUS_RUNNING: [_Running],
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
      constants.JOB_STATUS_CANCELED: [_Canceled],
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
      constants.JOB_STATUS_SUCCESS: [_Success],
      }

    def _NewJob():
      job = jqueue._QueuedJob(None, 1,
                              [opcodes.OpTestDelay() for _ in range(10)],
                              True)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
                                 for op in job.ops))
      return job

    # Looking up every known status makes a missing scenario fail loudly
    for status in constants.JOB_STATUS_ALL:
      sttests = tests[status]
      assert sttests
      for fn in sttests:
        job = _NewJob()
        fn(job.ops)
        self.assertEqual(job.CalcStatus(), status)
437

    
438

    
439
class _FakeDependencyManager:
440
  def __init__(self):
441
    self._checks = []
442
    self._notifications = []
443
    self._waiting = set()
444

    
445
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
446
    self._checks.append((job, dep_job_id, dep_status, result))
447

    
448
  def CountPendingResults(self):
449
    return len(self._checks)
450

    
451
  def CountWaitingJobs(self):
452
    return len(self._waiting)
453

    
454
  def GetNextNotification(self):
455
    return self._notifications.pop(0)
456

    
457
  def JobWaiting(self, job):
458
    return job in self._waiting
459

    
460
  def CheckAndRegister(self, job, dep_job_id, dep_status):
461
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
462

    
463
    assert exp_job == job
464
    assert exp_dep_job_id == dep_job_id
465
    assert exp_dep_status == dep_status
466

    
467
    (result_status, _) = result
468

    
469
    if result_status == jqueue._JobDependencyManager.WAIT:
470
      self._waiting.add(job)
471
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
472
      self._waiting.remove(job)
473

    
474
    return result
475

    
476
  def NotifyWaiters(self, job_id):
477
    self._notifications.append(job_id)
478

    
479

    
480
class _DisabledFakeDependencyManager:
481
  def JobWaiting(self, _):
482
    return False
483

    
484
  def CheckAndRegister(self, *args):
485
    assert False, "Should not be called"
486

    
487
  def NotifyWaiters(self, _):
488
    pass
489

    
490

    
491
class _FakeQueueForProc:
492
  def __init__(self, depmgr=None):
493
    self._acquired = False
494
    self._updates = []
495
    self._submitted = []
496

    
497
    self._submit_count = itertools.count(1000)
498

    
499
    if depmgr:
500
      self.depmgr = depmgr
501
    else:
502
      self.depmgr = _DisabledFakeDependencyManager()
503

    
504
  def IsAcquired(self):
505
    return self._acquired
506

    
507
  def GetNextUpdate(self):
508
    return self._updates.pop(0)
509

    
510
  def GetNextSubmittedJob(self):
511
    return self._submitted.pop(0)
512

    
513
  def acquire(self, shared=0):
514
    assert shared == 1
515
    self._acquired = True
516

    
517
  def release(self):
518
    assert self._acquired
519
    self._acquired = False
520

    
521
  def UpdateJobUnlocked(self, job, replicate=True):
522
    assert self._acquired, "Lock not acquired while updating job"
523
    self._updates.append((job, bool(replicate)))
524

    
525
  def SubmitManyJobs(self, jobs):
526
    assert not self._acquired, "Lock acquired while submitting jobs"
527
    job_ids = [self._submit_count.next() for _ in jobs]
528
    self._submitted.extend(zip(job_ids, jobs))
529
    return job_ids
530

    
531

    
532
class _FakeExecOpCodeForProc:
  """Callable standing in for the opcode executor given to the processor.

  Optional hooks run right before and right after the start notification,
  letting tests inspect the job while an opcode is being "executed".

  """
  def __init__(self, queue, before_start, after_start):
    self._after_start = after_start
    self._before_start = before_start
    self._queue = queue

  def __call__(self, op, cbs, timeout=None, priority=None):
    """Pretends to execute a test dummy opcode.

    Returns the IDs of submitted jobs if the opcode asks for jobs to be
    submitted, otherwise the opcode's preset result. Raises
    C{errors.OpExecError} if the opcode requests a failure.

    """
    assert isinstance(op, opcodes.OpTestDummy)
    assert not self._queue.IsAcquired(), \
           "Queue lock not released when executing opcode"

    pre_hook = self._before_start
    if pre_hook:
      pre_hook(timeout, priority)

    cbs.NotifyStart()

    post_hook = self._after_start
    if post_hook:
      post_hook(op, cbs)

    # Check again after the callbacks
    assert not self._queue.IsAcquired()

    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

    wants_jobs = hasattr(op, "submit_jobs") and op.submit_jobs is not None
    if not wants_jobs:
      return op.result

    return cbs.SubmitManyJobs(op.submit_jobs)
561

    
562

    
563
class _JobProcessorTestUtils:
  """Mixin with helpers for job processor tests.

  Relies on the assertion methods of C{unittest.TestCase} being available on
  the class it is mixed into.

  """
  def _CreateJob(self, queue, job_id, ops):
    """Creates a writable job and verifies its pristine state.

    """
    job = jqueue._QueuedJob(queue, job_id, ops, True)

    self.assertFalse(job.start_timestamp)
    self.assertFalse(job.end_timestamp)

    # Every queued opcode must wrap the corresponding input opcode
    self.assertEqual(len(ops), len(job.ops))
    self.assertTrue(compat.all(op.input == inp
                               for (op, inp) in zip(job.ops, ops)))
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])

    return job
573

    
574

    
575
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
576
  def _GenericCheckJob(self, job):
    """Runs checks which apply to any finished job of test dummy opcodes.

    """
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                      for op in job.ops)

    # Per-opcode timestamps as reported through the query interface
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
                     [[op.start_timestamp for op in job.ops],
                      [op.exec_timestamp for op in job.ops],
                      [op.end_timestamp for op in job.ops]])

    # Job-level timestamps as reported through the query interface
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
                     [job.received_timestamp,
                      job.start_timestamp,
                      job.end_timestamp])

    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
591

    
592
  def testSuccess(self):
    # Runs jobs of various lengths to completion, one opcode per processor
    # invocation, checking the recorded queue updates at every step
    queue = _FakeQueueForProc()

    for (job_id, opcount) in [(25351, 1), (6637, 3),
                              (24644, 10), (32207, 100)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      def _BeforeStart(timeout, priority):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertFalse(job.cur_opctx)

      def _AfterStart(op, cbs):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
        self.assertFalse(job.cur_opctx)

        # Job is running, cancelling shouldn't be possible
        (success, _) = job.Cancel()
        self.assertFalse(success)

      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

      last_idx = len(ops) - 1

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if idx == last_idx:
          # Last opcode
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        else:
          self.assertEqual(result, jqueue._JobProcessor.DEFER)

          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
          self.assertTrue(job.start_timestamp)
          self.assertFalse(job.end_timestamp)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(job.GetInfo(["opresult"]),
                       [[op.input.result for op in job.ops]])
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                       jqueue._JobProcessor.FINISHED)
      self.assertRaises(IndexError, queue.GetNextUpdate)
656

    
657
  def testOpcodeError(self):
    # Runs jobs in which a range of opcodes is set to fail and checks that
    # processing stops at the first failure
    queue = _FakeQueueForProc()

    # (job ID, number of opcodes, first failing index, last failing index)
    testdata = [
      (17077, 1, 0, 0),
      (1782, 5, 2, 2),
      (18179, 10, 9, 9),
      (4744, 10, 3, 8),
      (23816, 100, 39, 45),
      ]

    for (job_id, opcount, failfrom, failto) in testdata:
      # Prepare opcodes
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
                                 fail=(failfrom <= i <= failto))
             for i in range(opcount)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      opexec = _FakeExecOpCodeForProc(queue, None, None)

      for idx in range(len(ops)):
        self.assertRaises(IndexError, queue.GetNextUpdate)
        result = jqueue._JobProcessor(queue, opexec, job)()
        # queued to waitlock
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # waitlock to running
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        # Opcode result
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

        if idx in (failfrom, len(ops) - 1):
          # Last opcode
          self.assertEqual(result, jqueue._JobProcessor.FINISHED)
          break

        self.assertEqual(result, jqueue._JobProcessor.DEFER)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      self.assertRaises(IndexError, queue.GetNextUpdate)

      # Check job status
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
      self.assertEqual(job.GetInfo(["id"]), [job_id])
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])

      # Check opcode status
      data = zip(job.ops,
                 job.GetInfo(["opstatus"])[0],
                 job.GetInfo(["opresult"])[0])

      for idx, (op, opstatus, opresult) in enumerate(data):
        if idx < failfrom:
          # Ran successfully before the first failure
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
          self.assertEqual(opresult, op.input.result)
        elif idx <= failto:
          # Within the range of opcodes set to fail
          assert op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
        else:
          # Not set to fail, yet marked as failed along with the job
          assert not op.input.fail
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)

      self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                                 for op in job.ops[:failfrom]))

      self._GenericCheckJob(job)

      # Calling the processor on a finished job should be a no-op
      self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                       jqueue._JobProcessor.FINISHED)
      self.assertRaises(IndexError, queue.GetNextUpdate)
735

    
736
  def testCancelWhileInQueue(self):
    # Cancels a job which is still queued; the processor must then leave the
    # job untouched
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 17045
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    # Mark as cancelled
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELED
                               for op in job.ops))

    # Serialize to check for differences
    before_proc = job.Serialize()

    # Simulate processor called in workerpool
    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])

    # Must not have changed or written
    self.assertEqual(before_proc, job.Serialize())
    self.assertRaises(IndexError, queue.GetNextUpdate)
781

    
782
  def testCancelWhileWaitlockInQueue(self):
    # Cancels a job whose first opcode was put into "waiting" status by hand
    # before the processor got to run
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 8645
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    job.ops[0].status = constants.OP_STATUS_WAITING

    assert len(job.ops) == 5

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    # Mark as cancelling
    (success, _) = job.Cancel()
    self.assertTrue(success)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                               for op in job.ops))

    # The processor is expected to complete the cancellation
    opexec = _FakeExecOpCodeForProc(queue, None, None)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertFalse(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
823

    
824
  def testCancelWhileWaitlock(self):
    # Cancels a job from within the executor's "before start" hook, i.e.
    # while the processor has the first opcode in "waiting" status
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 11009
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Mark as cancelled
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))
      self.assertRaises(IndexError, queue.GetNextUpdate)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertEqual(queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
874

    
875
  def testCancelWhileWaitlockWithTimeout(self):
    # Tests cancelling a job from within the "before start" callback when the
    # callback subsequently raises a lock acquire timeout
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(5)]

    # Create job
    job_id = 24314
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Request cancellation; all opcodes must switch to "canceling"
      (success, _) = job.Cancel()
      self.assertTrue(success)

      self.assertTrue(compat.all(op.status == constants.OP_STATUS_CANCELING
                                 for op in job.ops))

      # Fake an acquire attempt timing out
      raise mcpu.LockAcquireTimeout()

    def _AfterStart(op, cbs):
      self.fail("Should not reach this")

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertTrue(job.start_timestamp)
    self.assertTrue(job.end_timestamp)
    # Not every opcode may carry both a start and an end timestamp
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
                                for op in job.ops))
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
                      ["Job canceled by request" for _ in job.ops]])
919

    
920
  def testCancelWhileRunning(self):
    # Tests canceling a job with finished opcodes and more, unprocessed ones
    queue = _FakeQueueForProc()

    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(3)]

    # Create job
    job_id = 28492
    job = self._CreateJob(queue, job_id, ops)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Run one opcode
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.DEFER)

    # Job goes back to queued
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_QUEUED,
                       constants.OP_STATUS_QUEUED],
                      ["Res0", None, None]])

    # Request cancellation
    (success, _) = job.Cancel()
    self.assertTrue(success)

    # Try processing another opcode (this will actually cancel the job)
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)

    # Check result; the opcode which already succeeded keeps its result
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["id"]), [job_id])
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])
965

    
966
  def testPartiallyRun(self):
    # Tests calling the processor on a job that's been partially run before the
    # program was restarted
    queue = _FakeQueueForProc()

    opexec = _FakeExecOpCodeForProc(queue, None, None)

    # Each tuple is (job ID, number of opcodes to run before the "restart")
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
             for i in range(10)]

      # Create job
      job = self._CreateJob(queue, job_id, ops)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

      # Process the first "successcount" opcodes, one per processor call
      for _ in range(successcount):
        self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                         jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertEqual(job.GetInfo(["opstatus"]),
                       [[constants.OP_STATUS_SUCCESS
                         for _ in range(successcount)] +
                        [constants.OP_STATUS_QUEUED
                         for _ in range(len(ops) - successcount)]])

      self.assertTrue(job.ops_iter)

      # Serialize and restore (simulates program restart)
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
      # The opcode iterator is not part of the restored job
      self.assertFalse(newjob.ops_iter)
      self._TestPartial(newjob, successcount)
999

    
1000
  def _TestPartial(self, job, successcount):
    # Runs the remaining opcodes of a partially processed job using a fresh
    # fake queue and verifies the final state
    #
    # job: Queued job whose first "successcount" opcodes were already run
    # successcount: Number of opcodes already processed successfully
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)

    queue = _FakeQueueForProc()
    opexec = _FakeExecOpCodeForProc(queue, None, None)

    for remaining in reversed(range(len(job.ops) - successcount)):
      result = jqueue._JobProcessor(queue, opexec, job)()
      # Three replicated job updates are expected per processed opcode
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)

    # ... also after being restored
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1046

    
1047
  def testProcessorOnRunningJob(self):
    # Invoking the processor on a job whose opcode is already marked as
    # running must raise a programmer error
    dummy_ops = [opcodes.OpTestDummy(result="result", fail=False)]

    fake_queue = _FakeQueueForProc()
    executor = _FakeExecOpCodeForProc(fake_queue, None, None)

    # Build a job consisting of the single dummy opcode
    job = self._CreateJob(fake_queue, 9571, dummy_ops)

    status_before = job.CalcStatus()
    self.assertEqual(status_before, constants.JOB_STATUS_QUEUED)

    # Pretend the first opcode is currently being executed
    first_op = job.ops[0]
    first_op.status = constants.OP_STATUS_RUNNING

    assert len(job.ops) == 1

    status_after = job.CalcStatus()
    self.assertEqual(status_after, constants.JOB_STATUS_RUNNING)

    # The processor instance is only called by assertRaises
    processor = jqueue._JobProcessor(fake_queue, executor, job)
    self.assertRaises(errors.ProgrammerError, processor)
1067

    
1068
  def testLogMessages(self):
    # Tests the "Feedback" callback function
    queue = _FakeQueueForProc()

    # Messages to log, keyed by opcode index. A log type of None means the
    # one-argument form of "Feedback" is used, for which the resulting entry
    # is expected to be of type ELOG_MESSAGE.
    messages = {
      1: [
        (None, "Hello"),
        (None, "World"),
        (constants.ELOG_MESSAGE, "there"),
        ],
      4: [
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
        ],
      }
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
                               messages=messages.get(i, []))
           for i in range(5)]

    # Create job
    job = self._CreateJob(queue, 29386, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

      # Feedback must reject calls with more than two arguments
      self.assertRaises(AssertionError, cbs.Feedback,
                        "too", "many", "arguments")

      for (log_type, msg) in op.messages:
        self.assertRaises(IndexError, queue.GetNextUpdate)
        if log_type:
          cbs.Feedback(log_type, msg)
        else:
          cbs.Feedback(msg)
        # Check for job update without replication
        self.assertEqual(queue.GetNextUpdate(), (job, False))
        self.assertRaises(IndexError, queue.GetNextUpdate)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    for remaining in reversed(range(len(job.ops))):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      if remaining == 0:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])

    logmsgcount = sum(len(m) for m in messages.values())

    self._CheckLogMessages(job, logmsgcount)

    # Serialize and restore (simulates program restart)
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
    self._CheckLogMessages(newjob, logmsgcount)

    # Check each message; serials must be strictly increasing across opcodes
    prevserial = -1
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
      for (serial, _, log_type, msg) in oplog:
        # Consumes the expected messages in the order they were logged
        (exptype, expmsg) = messages.get(idx).pop(0)
        if exptype:
          self.assertEqual(log_type, exptype)
        else:
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
        self.assertEqual(expmsg, msg)
        self.assertTrue(serial > prevserial)
        prevserial = serial
1158

    
1159
  def _CheckLogMessages(self, job, count):
    # Verifies a job's log serial and the serial-based filtering done by
    # "GetLogEntries" against the flattened "oplog" field
    #
    # job: Job to inspect
    # count: Expected number of log messages; must be greater than 3

    # Check serial
    self.assertEqual(job.log_serial, count)

    # All log entries in opcode order, skipping opcodes without entries
    all_entries = [entry
                   for entries in job.GetInfo(["oplog"])[0] if entries
                   for entry in entries]

    # No filter
    self.assertEqual(job.GetLogEntries(None), all_entries)

    # Filter with serial
    assert count > 3
    self.assertTrue(job.GetLogEntries(3))
    self.assertEqual(job.GetLogEntries(3), all_entries[3:])

    # No log message after highest serial
    self.assertFalse(job.GetLogEntries(count))
    self.assertFalse(job.GetLogEntries(count + 3))
1178

    
1179
  def testSubmitManyJobs(self):
    # Tests opcodes which submit further jobs; the opcode result is expected
    # to be the list of IDs of the jobs it submitted
    queue = _FakeQueueForProc()

    job_id = 15656
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          submit_jobs=[]),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False,
                          submit_jobs=[
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
                            ]),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    # One processor call per opcode
    for idx in range(len(ops)):
      self.assertRaises(IndexError, queue.GetNextUpdate)
      result = jqueue._JobProcessor(queue, opexec, job)()
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      if idx == len(ops) - 1:
        # Last opcode
        self.assertEqual(result, jqueue._JobProcessor.FINISHED)
      else:
        self.assertEqual(result, jqueue._JobProcessor.DEFER)

        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)

    self.assertRaises(IndexError, queue.GetNextUpdate)

    # Submitted jobs are expected in opcode order with IDs counting up from
    # 1000
    for idx, submitted_ops in enumerate(job_ops
                                        for op in ops
                                        for job_ops in op.submit_jobs):
      self.assertEqual(queue.GetNextSubmittedJob(),
                       (1000 + idx, submitted_ops))
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[[], [1000], [1001, 1002, 1003]]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])

    self._GenericCheckJob(job)

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1263

    
1264
  def testJobDependency(self):
    # Tests a job whose first opcode depends on two other jobs. The fake
    # dependency manager is fed one check result per expected query:
    #   attempts 0-1: first dependency (prev_job_id2) reports WAIT
    #   attempt 2:    first dependency reports CONTINUE, second dependency
    #                 (prev_job_id) reports WAIT
    #   attempts 3-4: second dependency reports WAIT
    #   attempt 5:    second dependency reports CONTINUE
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 22113
    prev_job_id2 = 28102
    job_id = 29929
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False,
                          depends=[
                            [prev_job_id2, None],
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res1", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Note: "attempt" used in the callback is read from the enclosing scope; it
    # is assigned at the top of each iteration of the loop further down
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt < 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
      elif attempt == 2:
        depmgr.AddCheckResult(job, prev_job_id2, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
        # The processor will ask for the next dependency immediately
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt < 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 5:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
      if attempt == 2:
        self.assertEqual(depmgr.CountPendingResults(), 2)
      elif attempt > 5:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt == 0 or attempt >= 5:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      # All prepared check results must have been consumed
      self.assertFalse(depmgr.CountPendingResults())

      if attempt < 5:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())
    self.assertFalse(depmgr.CountWaitingJobs())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1388

    
1389
  def testJobDependencyCancel(self):
    # Tests a job whose second opcode depends on another job which ends up
    # being cancelled. The fake dependency manager is fed these results:
    #   attempt 0:    none (first opcode has no dependency)
    #   attempts 1-3: dependency reports WAIT
    #   attempt 4:    dependency reports CANCEL
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 13623
    job_id = 30876
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Note: "attempt" used in the callback is read from the enclosing scope; it
    # is assigned at the top of each iteration of the loop further down
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job was cancelled
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      # All prepared check results must have been consumed
      self.assertFalse(depmgr.CountPendingResults())

      if attempt > 0 and attempt < 4:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # The opcode which already succeeded keeps its result, all others are
    # cancelled
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_CANCELED,
                       constants.OP_STATUS_CANCELED],
                      ["Res0", "Job canceled by request",
                       "Job canceled by request"]])

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1505

    
1506
  def testJobDependencyWrongstatus(self):
    # Tests a job whose second opcode depends on another job which finishes
    # with an unexpected status. The fake dependency manager is fed these
    # results:
    #   attempt 0:    none (first opcode has no dependency)
    #   attempts 1-3: dependency reports WAIT
    #   attempt 4:    dependency reports WRONGSTATUS
    depmgr = _FakeDependencyManager()
    queue = _FakeQueueForProc(depmgr=depmgr)

    self.assertEqual(queue.depmgr, depmgr)

    prev_job_id = 9741
    job_id = 11763
    ops = [
      opcodes.OpTestDummy(result="Res0", fail=False),
      opcodes.OpTestDummy(result="Res1", fail=False,
                          depends=[
                            [prev_job_id, None],
                            ]),
      opcodes.OpTestDummy(result="Res2", fail=False),
      ]

    # Create job
    job = self._CreateJob(queue, job_id, ops)

    # Note: "attempt" used in the callback is read from the enclosing scope; it
    # is assigned at the top of each iteration of the loop further down
    def _BeforeStart(timeout, priority):
      if attempt == 0 or attempt > 5:
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

      # Job is running, cancelling shouldn't be possible
      (success, _) = job.Cancel()
      self.assertFalse(success)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)

    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    counter = itertools.count()
    while True:
      attempt = counter.next()

      self.assertRaises(IndexError, queue.GetNextUpdate)
      self.assertRaises(IndexError, depmgr.GetNextNotification)

      if attempt == 0:
        # This will handle the first opcode
        pass
      elif attempt < 4:
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WAIT, "wait"))
      elif attempt == 4:
        # Other job failed
        depmgr.AddCheckResult(job, prev_job_id, None,
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))

      if attempt == 0:
        self.assertEqual(depmgr.CountPendingResults(), 0)
      else:
        self.assertEqual(depmgr.CountPendingResults(), 1)

      result = jqueue._JobProcessor(queue, opexec, job)()
      if attempt <= 1 or attempt >= 4:
        # Job should only be updated if there was an actual change
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
      # All prepared check results must have been consumed
      self.assertFalse(depmgr.CountPendingResults())

      if attempt > 0 and attempt < 4:
        # Simulate waiting for other job
        self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertRaises(IndexError, depmgr.GetNextNotification)
        self.assertTrue(job.start_timestamp)
        self.assertFalse(job.end_timestamp)
        continue

      if result == jqueue._JobProcessor.FINISHED:
        # Last opcode
        self.assertFalse(job.cur_opctx)
        break

      self.assertRaises(IndexError, depmgr.GetNextNotification)

      self.assertEqual(result, jqueue._JobProcessor.DEFER)
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # The opcode which already succeeded keeps its status, all others fail
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [[constants.OP_STATUS_SUCCESS,
                       constants.OP_STATUS_ERROR,
                       constants.OP_STATUS_ERROR]])

    # Results of the failed opcodes must be encoded errors
    (opresult, ) = job.GetInfo(["opresult"])
    self.assertEqual(len(opresult), len(ops))
    self.assertEqual(opresult[0], "Res0")
    self.assertTrue(errors.GetEncodedError(opresult[1]))
    self.assertTrue(errors.GetEncodedError(opresult[2]))

    self._GenericCheckJob(job)

    self.assertRaises(IndexError, queue.GetNextUpdate)
    self.assertRaises(IndexError, depmgr.GetNextNotification)
    self.assertFalse(depmgr.CountPendingResults())

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, queue.GetNextUpdate)
1626

    
1627

    
1628
class _FakeTimeoutStrategy:
1629
  def __init__(self, timeouts):
1630
    self.timeouts = timeouts
1631
    self.attempts = 0
1632
    self.last_timeout = None
1633

    
1634
  def NextAttempt(self):
1635
    self.attempts += 1
1636
    if self.timeouts:
1637
      timeout = self.timeouts.pop(0)
1638
    else:
1639
      timeout = None
1640
    self.last_timeout = timeout
1641
    return timeout
1642

    
1643

    
1644
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
  """Checks L{jqueue._JobProcessor} against simulated lock acquire timeouts.

  A ten-opcode job is processed while L{_BeforeStart} raises
  L{mcpu.LockAcquireTimeout} a scripted number of times per opcode. The
  instance attributes carry the script's state between the callbacks and the
  main loop in L{testTimeout}.

  """
  def setUp(self):
    self.queue = _FakeQueueForProc()
    # Job under test and index of the opcode currently being processed
    self.job = None
    self.curop = None
    # Iterator producing the opcode indices consumed by _NextOpcode
    self.opcounter = None
    # Most recent _FakeTimeoutStrategy and number of timeouts still to raise
    self.timeout_strategy = None
    self.retries = 0
    # Opcode index, priority and status remembered from earlier callbacks
    self.prev_tsop = None
    self.prev_prio = None
    self.prev_status = None
    # Priority seen at the last acquire attempt and whether it succeeded
    self.lock_acq_prio = None
    self.gave_lock = None
    # Set once opcode 3 was given its locks at OP_PRIO_HIGHEST + 3
    self.done_lock_before_blocking = False

  def _BeforeStart(self, timeout, priority):
    """First callback handed to the fake opcode executor.

    Verifies the queue and job state as well as the timeout and priority
    received, then either returns (the simulated lock acquisition succeeded)
    or raises L{mcpu.LockAcquireTimeout} while retries are left.

    @param timeout: timeout for this attempt; must match the value last
      returned by the current timeout strategy
    @param priority: priority used for this attempt; must match the current
      opcode's priority
    @raise mcpu.LockAcquireTimeout: if C{self.retries} is still positive

    """
    job = self.job

    # If status has changed, job must've been written
    if self.prev_status != self.job.ops[self.curop].status:
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    ts = self.timeout_strategy

    self.assertTrue(timeout is None or isinstance(timeout, (int, float)))
    self.assertEqual(timeout, ts.last_timeout)
    self.assertEqual(priority, job.ops[self.curop].priority)

    # Assume success; reset below if a timeout is simulated
    self.gave_lock = True
    self.lock_acq_prio = priority

    if (self.curop == 3 and
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
      # Give locks before running into blocking acquire
      assert self.retries == 7
      self.retries = 0
      self.done_lock_before_blocking = True
      return

    if self.retries > 0:
      # A retry is only allowed for non-blocking (timed) attempts
      self.assertTrue(timeout is not None)
      self.retries -= 1
      self.gave_lock = False
      raise mcpu.LockAcquireTimeout()

    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
      # At the highest priority the acquire must be blocking (no timeout)
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
      assert not ts.timeouts
      self.assertTrue(timeout is None)

  def _AfterStart(self, op, cbs):
    """Second callback handed to the fake opcode executor.

    Checks that the job was written once, is reported as running and can no
    longer be cancelled. Both parameters are unused.

    """
    job = self.job

    # Setting to "running" requires an update
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
    (success, _) = job.Cancel()
    self.assertFalse(success)

  def _NextOpcode(self):
    """Advances to the next opcode index and records its priority and status.

    Passed to the job processor as C{_nextop_fn}.

    """
    self.curop = self.opcounter.next()
    self.prev_prio = self.job.ops[self.curop].priority
    self.prev_status = self.job.ops[self.curop].status

  def _NewTimeoutStrategy(self):
    """Builds the L{_FakeTimeoutStrategy} for the current opcode.

    Passed to the job processor as C{_timeout_strategy_factory}. Depending on
    the opcode index it chooses the list of timeouts and how many of them are
    answered with a simulated lock acquire timeout (C{self.retries}).

    @return: the newly created timeout strategy

    """
    job = self.job

    # All retries of the previous strategy must have been used up
    self.assertEqual(self.retries, 0)

    if self.prev_tsop == self.curop:
      # Still on the same opcode, priority must've been increased
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)

    if self.curop == 1:
      # Normal retry
      timeouts = range(10, 31, 10)
      self.retries = len(timeouts) - 1

    elif self.curop == 2:
      # Let this run into a blocking acquire
      timeouts = range(11, 61, 12)
      self.retries = len(timeouts)

    elif self.curop == 3:
      # Wait for priority to increase, but give lock before blocking acquire
      timeouts = range(12, 100, 14)
      self.retries = len(timeouts)

      self.assertFalse(self.done_lock_before_blocking)

    elif self.curop == 4:
      self.assertTrue(self.done_lock_before_blocking)

      # Timeouts, but no need to retry
      timeouts = range(10, 31, 10)
      self.retries = 0

    elif self.curop == 5:
      # Normal retry
      timeouts = range(19, 100, 11)
      self.retries = len(timeouts)

    else:
      timeouts = []
      self.retries = 0

    assert len(job.ops) == 10
    assert self.retries <= len(timeouts)

    ts = _FakeTimeoutStrategy(timeouts)

    self.timeout_strategy = ts
    self.prev_tsop = self.curop
    self.prev_prio = job.ops[self.curop].priority

    return ts

  def testTimeout(self):
    # Runs the processor repeatedly until the job is finished, checking the
    # queue updates, job status and timestamps after every call
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
           for i in range(10)]

    # Create job
    job_id = 15801
    job = self._CreateJob(self.queue, job_id, ops)
    self.job = job

    self.opcounter = itertools.count(0)

    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
                                    self._AfterStart)
    tsf = self._NewTimeoutStrategy

    self.assertFalse(self.done_lock_before_blocking)

    while True:
      proc = jqueue._JobProcessor(self.queue, opexec, job,
                                  _timeout_strategy_factory=tsf)

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      if self.curop is not None:
        self.prev_status = self.job.ops[self.curop].status

      self.lock_acq_prio = None

      result = proc(_nextop_fn=self._NextOpcode)
      assert self.curop is not None

      if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
        # Got lock and/or job is done, result must've been written
        self.assertFalse(job.cur_opctx)
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
        self.assertTrue(job.ops[self.curop].exec_timestamp)

      if result == jqueue._JobProcessor.FINISHED:
        self.assertFalse(job.cur_opctx)
        break

      self.assertEqual(result, jqueue._JobProcessor.DEFER)

      if self.curop == 0:
        self.assertEqual(job.ops[self.curop].start_timestamp,
                         job.start_timestamp)

      if self.gave_lock:
        # Opcode finished, but job not yet done
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
      else:
        # Did not get locks
        self.assertTrue(job.cur_opctx)
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
                         self.timeout_strategy.NextAttempt)
        self.assertFalse(job.ops[self.curop].exec_timestamp)
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

        # If priority has changed since acquiring locks, the job must've been
        # updated
        if self.lock_acq_prio != job.ops[self.curop].priority:
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))

      self.assertRaises(IndexError, self.queue.GetNextUpdate)

      self.assertTrue(job.start_timestamp)
      self.assertFalse(job.end_timestamp)

    # All opcodes must have been visited exactly once
    self.assertEqual(self.curop, len(job.ops) - 1)
    self.assertEqual(self.job, job)
    self.assertEqual(self.opcounter.next(), len(job.ops))
    self.assertTrue(self.done_lock_before_blocking)

    self.assertRaises(IndexError, self.queue.GetNextUpdate)
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(job.GetInfo(["opresult"]),
                     [[op.input.result for op in job.ops]])
    self.assertEqual(job.GetInfo(["opstatus"]),
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
                               for op in job.ops))

    # Calling the processor on a finished job should be a no-op
    self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
                     jqueue._JobProcessor.FINISHED)
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1859

    
1860

    
1861
class TestJobDependencyManager(unittest.TestCase):
  """Tests for L{jqueue._JobDependencyManager}.

  The manager is created with L{_GetStatus} and L{_Enqueue} as its two
  callbacks. Tests script the status replies through C{self._status} and
  inspect C{self._queue} for jobs handed to the enqueue callback.

  """
  class _FakeJob:
    """Minimal job object exposing only a string C{id}.

    """
    def __init__(self, job_id):
      self.id = str(job_id)

  def setUp(self):
    # Expected status queries as (job ID, status to return), in call order
    self._status = []
    # Arguments received by _Enqueue, one entry per call
    self._queue = []
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)

  def _GetStatus(self, job_id):
    """Returns the next scripted status after verifying the queried job ID.

    """
    (exp_job_id, result) = self._status.pop(0)
    self.assertEqual(exp_job_id, job_id)
    return result

  def _Enqueue(self, jobs):
    """Records the jobs passed to the enqueue callback.

    """
    self._queue.append(jobs)

  def testNotFinalizedThenCancel(self):
    job = self._FakeJob(17697)
    job_id = str(28625)

    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/28625", None, None, [("job", [job.id])])
      ])

    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.CANCEL)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireCancel(self):
    job = self._FakeJob(5278)
    job_id = str(9610)
    dep_status = [constants.JOB_STATUS_CANCELED]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
      ("job/9610", None, None, [("job", [job.id])])
      ])

    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireError(self):
    job = self._FakeJob(21459)
    job_id = str(25519)
    dep_status = [constants.JOB_STATUS_ERROR]

    self._status.append((job_id, constants.JOB_STATUS_WAITING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testRequireMultiple(self):
    dep_status = list(constants.JOBS_FINALIZED)

    # Every finalized status is acceptable, so each one must lead to CONTINUE
    for end_status in dep_status:
      job = self._FakeJob(21343)
      job_id = str(14609)

      self._status.append((job_id, constants.JOB_STATUS_WAITING))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.WAIT)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertTrue(self.jdm.JobWaiting(job))
      self.assertEqual(self.jdm._waiters, {
        job_id: set([job]),
        })
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
        ("job/14609", None, None, [("job", [job.id])])
        ])

      self._status.append((job_id, end_status))
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
      self.assertEqual(result, self.jdm.CONTINUE)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertFalse(self.jdm.JobWaiting(job))
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

  def testNotify(self):
    job = self._FakeJob(8227)
    job_id = str(4113)

    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    # Notification must hand the waiting job to the enqueue callback
    self.jdm.NotifyWaiters(job_id)
    self.assertFalse(self._status)
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self.jdm.JobWaiting(job))
    self.assertEqual(self._queue, [set([job])])

  def testWrongStatus(self):
    job = self._FakeJob(10102)
    job_id = str(1271)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_ERROR))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WRONGSTATUS)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testCorrectStatus(self):
    job = self._FakeJob(24273)
    job_id = str(23885)

    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.WAIT)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertTrue(self.jdm.JobWaiting(job))
    self.assertEqual(self.jdm._waiters, {
      job_id: set([job]),
      })

    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))

  def testFinalizedRightAway(self):
    job = self._FakeJob(224)
    job_id = str(3081)

    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                            [constants.JOB_STATUS_SUCCESS])
    self.assertEqual(result, self.jdm.CONTINUE)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)
    self.assertFalse(self.jdm.JobWaiting(job))
    # An empty waiter set is left behind until the next cleanup
    self.assertEqual(self.jdm._waiters, {
      job_id: set(),
      })

    # Force cleanup
    self.jdm.NotifyWaiters("0")
    self.assertFalse(self.jdm._waiters)
    self.assertFalse(self._status)
    self.assertFalse(self._queue)

  def testMultipleWaiting(self):
    # Use a deterministic random generator
    rnd = random.Random(21402)

    job_ids = map(str, rnd.sample(range(1, 10000), 150))

    # Ten dependency job IDs, each with 1 to 20 distinct waiting jobs
    waiters = dict((job_ids.pop(),
                    set(map(self._FakeJob,
                            [job_ids.pop()
                             for _ in range(rnd.randint(1, 20))])))
                   for _ in range(10))

    # Ensure there are no duplicate job IDs
    assert not utils.FindDuplicates(waiters.keys() +
                                    [job.id
                                     for jobs in waiters.values()
                                     for job in jobs])

    # Register all jobs as waiters
    for job_id, job in [(job_id, job)
                        for (job_id, jobs) in waiters.items()
                        for job in jobs]:
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                              [constants.JOB_STATUS_SUCCESS])
      self.assertEqual(result, self.jdm.WAIT)
      self.assertFalse(self._status)
      self.assertFalse(self._queue)
      self.assertTrue(self.jdm.JobWaiting(job))

    self.assertEqual(self.jdm._waiters, waiters)

    def _MakeSet(lock_info):
      # Turn the pending job lists into sets so that the comparison does not
      # depend on their order. Unpacking happens here rather than in the
      # signature, as tuple parameters were removed from the language
      # (PEP 3113).
      (name, mode, owner_names, pending) = lock_info
      return (name, mode, owner_names,
              [(pendmode, set(pend)) for (pendmode, pend) in pending])

    def _CheckLockInfo():
      # Reported lock information must match the still-pending waiters
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
        ("job/%s" % job_id, None, None,
         [("job", set([job.id for job in jobs]))])
        for job_id, jobs in waiters.items()
        if jobs
        ]))

    _CheckLockInfo()

    # Notify in random order
    for job_id in rnd.sample(waiters, len(waiters)):
      # Remove from pending waiter list
      jobs = waiters.pop(job_id)
      for job in jobs:
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
                                                [constants.JOB_STATUS_SUCCESS])
        self.assertEqual(result, self.jdm.CONTINUE)
        self.assertFalse(self._status)
        self.assertFalse(self._queue)
        self.assertFalse(self.jdm.JobWaiting(job))

      _CheckLockInfo()

    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

    assert not waiters

  def testSelfDependency(self):
    job = self._FakeJob(18937)

    # A job depending on its own ID must be rejected
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
    self.assertEqual(result, self.jdm.ERROR)

  def testJobDisappears(self):
    job = self._FakeJob(30540)
    job_id = str(23769)

    def _FakeStatus(_):
      raise errors.JobLost("#msg#")

    # Uses its own manager whose status callback always fails
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, jdm.ERROR)
    self.assertFalse(jdm.JobWaiting(job))
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2153

    
2154

    
2155
if __name__ == "__main__":
  # Executed as a script: hand over to the project's test program wrapper
  testutils.GanetiTestProgram()