Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ b95479a5

History | View | Annotate | Download (69.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import jqueue
36
from ganeti import opcodes
37
from ganeti import compat
38
from ganeti import mcpu
39

    
40
import testutils
41

    
42

    
43
class _FakeJob:
44
  def __init__(self, job_id, status):
45
    self.id = job_id
46
    self._status = status
47
    self._log = []
48

    
49
  def SetStatus(self, status):
50
    self._status = status
51

    
52
  def AddLogEntry(self, msg):
53
    self._log.append((len(self._log), msg))
54

    
55
  def CalcStatus(self):
56
    return self._status
57

    
58
  def GetInfo(self, fields):
59
    result = []
60

    
61
    for name in fields:
62
      if name == "status":
63
        result.append(self._status)
64
      else:
65
        raise Exception("Unknown field")
66

    
67
    return result
68

    
69
  def GetLogEntries(self, newer_than):
70
    assert newer_than is None or newer_than >= 0
71

    
72
    if newer_than is None:
73
      return self._log
74

    
75
    return self._log[newer_than:]
76

    
77

    
78
class TestJobChangesChecker(unittest.TestCase):
79
  def testStatus(self):
80
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81
    checker = jqueue._JobChangesChecker(["status"], None, None)
82
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
83

    
84
    job.SetStatus(constants.JOB_STATUS_RUNNING)
85
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
86

    
87
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
88
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
89

    
90
    # job.id is used by checker
91
    self.assertEqual(job.id, 9094)
92

    
93
  def testStatusWithPrev(self):
94
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
95
    checker = jqueue._JobChangesChecker(["status"],
96
                                        [constants.JOB_STATUS_QUEUED], None)
97
    self.assert_(checker(job) is None)
98

    
99
    job.SetStatus(constants.JOB_STATUS_RUNNING)
100
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
101

    
102
  def testFinalStatus(self):
103
    for status in constants.JOBS_FINALIZED:
104
      job = _FakeJob(2178711, status)
105
      checker = jqueue._JobChangesChecker(["status"], [status], None)
106
      # There won't be any changes in this status, hence it should signal
107
      # a change immediately
108
      self.assertEqual(checker(job), ([status], []))
109

    
110
  def testLog(self):
111
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112
    checker = jqueue._JobChangesChecker(["status"], None, None)
113
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
114

    
115
    job.AddLogEntry("Hello World")
116
    (job_info, log_entries) = checker(job)
117
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118
    self.assertEqual(log_entries, [[0, "Hello World"]])
119

    
120
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121
    self.assert_(checker2(job) is None)
122

    
123
    job.AddLogEntry("Foo Bar")
124
    job.SetStatus(constants.JOB_STATUS_ERROR)
125

    
126
    (job_info, log_entries) = checker2(job)
127
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
129

    
130
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
131
    (job_info, log_entries) = checker3(job)
132
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134

    
135

    
136
class TestJobChangesWaiter(unittest.TestCase):
137
  def setUp(self):
138
    self.tmpdir = tempfile.mkdtemp()
139
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
140
    utils.WriteFile(self.filename, data="")
141

    
142
  def tearDown(self):
143
    shutil.rmtree(self.tmpdir)
144

    
145
  def _EnsureNotifierClosed(self, notifier):
146
    try:
147
      os.fstat(notifier._fd)
148
    except EnvironmentError, err:
149
      self.assertEqual(err.errno, errno.EBADF)
150
    else:
151
      self.fail("File descriptor wasn't closed")
152

    
153
  def testClose(self):
154
    for wait in [False, True]:
155
      waiter = jqueue._JobFileChangesWaiter(self.filename)
156
      try:
157
        if wait:
158
          waiter.Wait(0.001)
159
      finally:
160
        waiter.Close()
161

    
162
      # Ensure file descriptor was closed
163
      self._EnsureNotifierClosed(waiter._notifier)
164

    
165
  def testChangingFile(self):
166
    waiter = jqueue._JobFileChangesWaiter(self.filename)
167
    try:
168
      self.assertFalse(waiter.Wait(0.1))
169
      utils.WriteFile(self.filename, data="changed")
170
      self.assert_(waiter.Wait(60))
171
    finally:
172
      waiter.Close()
173

    
174
    self._EnsureNotifierClosed(waiter._notifier)
175

    
176
  def testChangingFile2(self):
177
    waiter = jqueue._JobChangesWaiter(self.filename)
178
    try:
179
      self.assertFalse(waiter._filewaiter)
180
      self.assert_(waiter.Wait(0.1))
181
      self.assert_(waiter._filewaiter)
182

    
183
      # File waiter is now used, but there have been no changes
184
      self.assertFalse(waiter.Wait(0.1))
185
      utils.WriteFile(self.filename, data="changed")
186
      self.assert_(waiter.Wait(60))
187
    finally:
188
      waiter.Close()
189

    
190
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191

    
192

    
193
class TestWaitForJobChangesHelper(unittest.TestCase):
194
  def setUp(self):
195
    self.tmpdir = tempfile.mkdtemp()
196
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197
    utils.WriteFile(self.filename, data="")
198

    
199
  def tearDown(self):
200
    shutil.rmtree(self.tmpdir)
201

    
202
  def _LoadWaitingJob(self):
203
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
204

    
205
  def _LoadLostJob(self):
206
    return None
207

    
208
  def testNoChanges(self):
209
    wfjc = jqueue._WaitForJobChangesHelper()
210

    
211
    # No change
212
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
213
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
214
                     constants.JOB_NOTCHANGED)
215

    
216
    # No previous information
217
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
218
                          ["status"], None, None, 1.0),
219
                     ([constants.JOB_STATUS_WAITLOCK], []))
220

    
221
  def testLostJob(self):
222
    wfjc = jqueue._WaitForJobChangesHelper()
223
    self.assert_(wfjc(self.filename, self._LoadLostJob,
224
                      ["status"], None, None, 1.0) is None)
225

    
226

    
227
class TestEncodeOpError(unittest.TestCase):
228
  def test(self):
229
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230
    self.assert_(isinstance(encerr, tuple))
231
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
232

    
233
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234
    self.assert_(isinstance(encerr, tuple))
235
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
236

    
237
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238
    self.assert_(isinstance(encerr, tuple))
239
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
240

    
241
    encerr = jqueue._EncodeOpError("Hello World")
242
    self.assert_(isinstance(encerr, tuple))
243
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244

    
245

    
246
class TestQueuedOpCode(unittest.TestCase):
247
  def testDefaults(self):
248
    def _Check(op):
249
      self.assertFalse(hasattr(op.input, "dry_run"))
250
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251
      self.assertFalse(op.log)
252
      self.assert_(op.start_timestamp is None)
253
      self.assert_(op.exec_timestamp is None)
254
      self.assert_(op.end_timestamp is None)
255
      self.assert_(op.result is None)
256
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
257

    
258
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
259
    _Check(op1)
260
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
261
    _Check(op2)
262
    self.assertEqual(op1.Serialize(), op2.Serialize())
263

    
264
  def testPriority(self):
265
    def _Check(op):
266
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267
             "Default priority equals high priority; test can't work"
268
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
270

    
271
    inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
272
    op1 = jqueue._QueuedOpCode(inpop)
273
    _Check(op1)
274
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
275
    _Check(op2)
276
    self.assertEqual(op1.Serialize(), op2.Serialize())
277

    
278

    
279
class TestQueuedJob(unittest.TestCase):
280
  def test(self):
281
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
282
                      None, 1, [])
283

    
284
  def testDefaults(self):
285
    job_id = 4260
286
    ops = [
287
      opcodes.OpTagsGet(),
288
      opcodes.OpTestDelay(),
289
      ]
290

    
291
    def _Check(job):
292
      self.assertEqual(job.id, job_id)
293
      self.assertEqual(job.log_serial, 0)
294
      self.assert_(job.received_timestamp)
295
      self.assert_(job.start_timestamp is None)
296
      self.assert_(job.end_timestamp is None)
297
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299
      self.assert_(repr(job).startswith("<"))
300
      self.assertEqual(len(job.ops), len(ops))
301
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302
                              for (inp, op) in zip(ops, job.ops)))
303
      self.assertRaises(errors.OpExecError, job.GetInfo,
304
                        ["unknown-field"])
305
      self.assertEqual(job.GetInfo(["summary"]),
306
                       [[op.input.Summary() for op in job.ops]])
307

    
308
    job1 = jqueue._QueuedJob(None, job_id, ops)
309
    _Check(job1)
310
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
311
    _Check(job2)
312
    self.assertEqual(job1.Serialize(), job2.Serialize())
313

    
314
  def testPriority(self):
315
    job_id = 4283
316
    ops = [
317
      opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
318
      opcodes.OpTestDelay(),
319
      ]
320

    
321
    def _Check(job):
322
      self.assertEqual(job.id, job_id)
323
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324
      self.assert_(repr(job).startswith("<"))
325

    
326
    job = jqueue._QueuedJob(None, job_id, ops)
327
    _Check(job)
328
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
329
                            for op in job.ops))
330
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
331

    
332
    # Increase first
333
    job.ops[0].priority -= 1
334
    _Check(job)
335
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
336

    
337
    # Mark opcode as finished
338
    job.ops[0].status = constants.OP_STATUS_SUCCESS
339
    _Check(job)
340
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
341

    
342
    # Increase second
343
    job.ops[1].priority -= 10
344
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
345

    
346
    # Test increasing first
347
    job.ops[0].status = constants.OP_STATUS_RUNNING
348
    job.ops[0].priority -= 19
349
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
350

    
351
  def testCalcStatus(self):
352
    def _Queued(ops):
353
      # The default status is "queued"
354
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
355
                              for op in ops))
356

    
357
    def _Waitlock1(ops):
358
      ops[0].status = constants.OP_STATUS_WAITLOCK
359

    
360
    def _Waitlock2(ops):
361
      ops[0].status = constants.OP_STATUS_SUCCESS
362
      ops[1].status = constants.OP_STATUS_SUCCESS
363
      ops[2].status = constants.OP_STATUS_WAITLOCK
364

    
365
    def _Running(ops):
366
      ops[0].status = constants.OP_STATUS_SUCCESS
367
      ops[1].status = constants.OP_STATUS_RUNNING
368
      for op in ops[2:]:
369
        op.status = constants.OP_STATUS_QUEUED
370

    
371
    def _Canceling1(ops):
372
      ops[0].status = constants.OP_STATUS_SUCCESS
373
      ops[1].status = constants.OP_STATUS_SUCCESS
374
      for op in ops[2:]:
375
        op.status = constants.OP_STATUS_CANCELING
376

    
377
    def _Canceling2(ops):
378
      for op in ops:
379
        op.status = constants.OP_STATUS_CANCELING
380

    
381
    def _Canceled(ops):
382
      for op in ops:
383
        op.status = constants.OP_STATUS_CANCELED
384

    
385
    def _Error1(ops):
386
      for idx, op in enumerate(ops):
387
        if idx > 3:
388
          op.status = constants.OP_STATUS_ERROR
389
        else:
390
          op.status = constants.OP_STATUS_SUCCESS
391

    
392
    def _Error2(ops):
393
      for op in ops:
394
        op.status = constants.OP_STATUS_ERROR
395

    
396
    def _Success(ops):
397
      for op in ops:
398
        op.status = constants.OP_STATUS_SUCCESS
399

    
400
    tests = {
401
      constants.JOB_STATUS_QUEUED: [_Queued],
402
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403
      constants.JOB_STATUS_RUNNING: [_Running],
404
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405
      constants.JOB_STATUS_CANCELED: [_Canceled],
406
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407
      constants.JOB_STATUS_SUCCESS: [_Success],
408
      }
409

    
410
    def _NewJob():
411
      job = jqueue._QueuedJob(None, 1,
412
                              [opcodes.OpTestDelay() for _ in range(10)])
413
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
415
                              for op in job.ops))
416
      return job
417

    
418
    for status in constants.JOB_STATUS_ALL:
419
      sttests = tests[status]
420
      assert sttests
421
      for fn in sttests:
422
        job = _NewJob()
423
        fn(job.ops)
424
        self.assertEqual(job.CalcStatus(), status)
425

    
426

    
427
class _FakeDependencyManager:
428
  def __init__(self):
429
    self._checks = []
430
    self._notifications = []
431
    self._waiting = set()
432

    
433
  def AddCheckResult(self, job, dep_job_id, dep_status, result):
434
    self._checks.append((job, dep_job_id, dep_status, result))
435

    
436
  def CountPendingResults(self):
437
    return len(self._checks)
438

    
439
  def CountWaitingJobs(self):
440
    return len(self._waiting)
441

    
442
  def GetNextNotification(self):
443
    return self._notifications.pop(0)
444

    
445
  def JobWaiting(self, job):
446
    return job in self._waiting
447

    
448
  def CheckAndRegister(self, job, dep_job_id, dep_status):
449
    (exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
450

    
451
    assert exp_job == job
452
    assert exp_dep_job_id == dep_job_id
453
    assert exp_dep_status == dep_status
454

    
455
    (result_status, _) = result
456

    
457
    if result_status == jqueue._JobDependencyManager.WAIT:
458
      self._waiting.add(job)
459
    elif result_status == jqueue._JobDependencyManager.CONTINUE:
460
      self._waiting.remove(job)
461

    
462
    return result
463

    
464
  def NotifyWaiters(self, job_id):
465
    self._notifications.append(job_id)
466

    
467

    
468
class _DisabledFakeDependencyManager:
469
  def JobWaiting(self, _):
470
    return False
471

    
472
  def CheckAndRegister(self, *args):
473
    assert False, "Should not be called"
474

    
475
  def NotifyWaiters(self, _):
476
    pass
477

    
478

    
479
class _FakeQueueForProc:
480
  def __init__(self, depmgr=None):
481
    self._acquired = False
482
    self._updates = []
483
    self._submitted = []
484

    
485
    self._submit_count = itertools.count(1000)
486

    
487
    if depmgr:
488
      self.depmgr = depmgr
489
    else:
490
      self.depmgr = _DisabledFakeDependencyManager()
491

    
492
  def IsAcquired(self):
493
    return self._acquired
494

    
495
  def GetNextUpdate(self):
496
    return self._updates.pop(0)
497

    
498
  def GetNextSubmittedJob(self):
499
    return self._submitted.pop(0)
500

    
501
  def acquire(self, shared=0):
502
    assert shared == 1
503
    self._acquired = True
504

    
505
  def release(self):
506
    assert self._acquired
507
    self._acquired = False
508

    
509
  def UpdateJobUnlocked(self, job, replicate=True):
510
    assert self._acquired, "Lock not acquired while updating job"
511
    self._updates.append((job, bool(replicate)))
512

    
513
  def SubmitManyJobs(self, jobs):
514
    assert not self._acquired, "Lock acquired while submitting jobs"
515
    job_ids = [self._submit_count.next() for _ in jobs]
516
    self._submitted.extend(zip(job_ids, jobs))
517
    return job_ids
518

    
519

    
520
class _FakeExecOpCodeForProc:
521
  def __init__(self, queue, before_start, after_start):
522
    self._queue = queue
523
    self._before_start = before_start
524
    self._after_start = after_start
525

    
526
  def __call__(self, op, cbs, timeout=None, priority=None):
527
    assert isinstance(op, opcodes.OpTestDummy)
528
    assert not self._queue.IsAcquired(), \
529
           "Queue lock not released when executing opcode"
530

    
531
    if self._before_start:
532
      self._before_start(timeout, priority)
533

    
534
    cbs.NotifyStart()
535

    
536
    if self._after_start:
537
      self._after_start(op, cbs)
538

    
539
    # Check again after the callbacks
540
    assert not self._queue.IsAcquired()
541

    
542
    if op.fail:
543
      raise errors.OpExecError("Error requested (%s)" % op.result)
544

    
545
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
546
      return cbs.SubmitManyJobs(op.submit_jobs)
547

    
548
    return op.result
549

    
550

    
551
class _JobProcessorTestUtils:
552
  def _CreateJob(self, queue, job_id, ops):
553
    job = jqueue._QueuedJob(queue, job_id, ops)
554
    self.assertFalse(job.start_timestamp)
555
    self.assertFalse(job.end_timestamp)
556
    self.assertEqual(len(ops), len(job.ops))
557
    self.assert_(compat.all(op.input == inp
558
                            for (op, inp) in zip(job.ops, ops)))
559
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
560
    return job
561

    
562

    
563
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
564
  def _GenericCheckJob(self, job):
565
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
566
                      for op in job.ops)
567

    
568
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
569
                     [[op.start_timestamp for op in job.ops],
570
                      [op.exec_timestamp for op in job.ops],
571
                      [op.end_timestamp for op in job.ops]])
572
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
573
                     [job.received_timestamp,
574
                      job.start_timestamp,
575
                      job.end_timestamp])
576
    self.assert_(job.start_timestamp)
577
    self.assert_(job.end_timestamp)
578
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
579

    
580
  def testSuccess(self):
581
    queue = _FakeQueueForProc()
582

    
583
    for (job_id, opcount) in [(25351, 1), (6637, 3),
584
                              (24644, 10), (32207, 100)]:
585
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
586
             for i in range(opcount)]
587

    
588
      # Create job
589
      job = self._CreateJob(queue, job_id, ops)
590

    
591
      def _BeforeStart(timeout, priority):
592
        self.assertEqual(queue.GetNextUpdate(), (job, True))
593
        self.assertRaises(IndexError, queue.GetNextUpdate)
594
        self.assertFalse(queue.IsAcquired())
595
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
596
        self.assertFalse(job.cur_opctx)
597

    
598
      def _AfterStart(op, cbs):
599
        self.assertEqual(queue.GetNextUpdate(), (job, True))
600
        self.assertRaises(IndexError, queue.GetNextUpdate)
601

    
602
        self.assertFalse(queue.IsAcquired())
603
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
604
        self.assertFalse(job.cur_opctx)
605

    
606
        # Job is running, cancelling shouldn't be possible
607
        (success, _) = job.Cancel()
608
        self.assertFalse(success)
609

    
610
      opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
611

    
612
      for idx in range(len(ops)):
613
        self.assertRaises(IndexError, queue.GetNextUpdate)
614
        result = jqueue._JobProcessor(queue, opexec, job)()
615
        self.assertEqual(queue.GetNextUpdate(), (job, True))
616
        self.assertRaises(IndexError, queue.GetNextUpdate)
617
        if idx == len(ops) - 1:
618
          # Last opcode
619
          self.assert_(result)
620
        else:
621
          self.assertFalse(result)
622

    
623
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
624
          self.assert_(job.start_timestamp)
625
          self.assertFalse(job.end_timestamp)
626

    
627
      self.assertRaises(IndexError, queue.GetNextUpdate)
628

    
629
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
630
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
631
      self.assertEqual(job.GetInfo(["opresult"]),
632
                       [[op.input.result for op in job.ops]])
633
      self.assertEqual(job.GetInfo(["opstatus"]),
634
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
635
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
636
                              for op in job.ops))
637

    
638
      self._GenericCheckJob(job)
639

    
640
      # Calling the processor on a finished job should be a no-op
641
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
642
      self.assertRaises(IndexError, queue.GetNextUpdate)
643

    
644
  def testOpcodeError(self):
645
    queue = _FakeQueueForProc()
646

    
647
    testdata = [
648
      (17077, 1, 0, 0),
649
      (1782, 5, 2, 2),
650
      (18179, 10, 9, 9),
651
      (4744, 10, 3, 8),
652
      (23816, 100, 39, 45),
653
      ]
654

    
655
    for (job_id, opcount, failfrom, failto) in testdata:
656
      # Prepare opcodes
657
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
658
                                 fail=(failfrom <= i and
659
                                       i <= failto))
660
             for i in range(opcount)]
661

    
662
      # Create job
663
      job = self._CreateJob(queue, job_id, ops)
664

    
665
      opexec = _FakeExecOpCodeForProc(queue, None, None)
666

    
667
      for idx in range(len(ops)):
668
        self.assertRaises(IndexError, queue.GetNextUpdate)
669
        result = jqueue._JobProcessor(queue, opexec, job)()
670
        # queued to waitlock
671
        self.assertEqual(queue.GetNextUpdate(), (job, True))
672
        # waitlock to running
673
        self.assertEqual(queue.GetNextUpdate(), (job, True))
674
        # Opcode result
675
        self.assertEqual(queue.GetNextUpdate(), (job, True))
676
        self.assertRaises(IndexError, queue.GetNextUpdate)
677

    
678
        if idx in (failfrom, len(ops) - 1):
679
          # Last opcode
680
          self.assert_(result)
681
          break
682

    
683
        self.assertFalse(result)
684

    
685
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
686

    
687
      self.assertRaises(IndexError, queue.GetNextUpdate)
688

    
689
      # Check job status
690
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
691
      self.assertEqual(job.GetInfo(["id"]), [job_id])
692
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
693

    
694
      # Check opcode status
695
      data = zip(job.ops,
696
                 job.GetInfo(["opstatus"])[0],
697
                 job.GetInfo(["opresult"])[0])
698

    
699
      for idx, (op, opstatus, opresult) in enumerate(data):
700
        if idx < failfrom:
701
          assert not op.input.fail
702
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
703
          self.assertEqual(opresult, op.input.result)
704
        elif idx <= failto:
705
          assert op.input.fail
706
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
707
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
708
        else:
709
          assert not op.input.fail
710
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
711
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
712

    
713
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
714
                              for op in job.ops[:failfrom]))
715

    
716
      self._GenericCheckJob(job)
717

    
718
      # Calling the processor on a finished job should be a no-op
719
      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
720
      self.assertRaises(IndexError, queue.GetNextUpdate)
721

    
722
  def testCancelWhileInQueue(self):
723
    queue = _FakeQueueForProc()
724

    
725
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
726
           for i in range(5)]
727

    
728
    # Create job
729
    job_id = 17045
730
    job = self._CreateJob(queue, job_id, ops)
731

    
732
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
733

    
734
    # Mark as cancelled
735
    (success, _) = job.Cancel()
736
    self.assert_(success)
737

    
738
    self.assertRaises(IndexError, queue.GetNextUpdate)
739

    
740
    self.assertFalse(job.start_timestamp)
741
    self.assertTrue(job.end_timestamp)
742
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
743
                            for op in job.ops))
744

    
745
    # Serialize to check for differences
746
    before_proc = job.Serialize()
747

    
748
    # Simulate processor called in workerpool
749
    opexec = _FakeExecOpCodeForProc(queue, None, None)
750
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
751

    
752
    # Check result
753
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
754
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
755
    self.assertFalse(job.start_timestamp)
756
    self.assertTrue(job.end_timestamp)
757
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
758
                                for op in job.ops))
759
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
760
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
761
                      ["Job canceled by request" for _ in job.ops]])
762

    
763
    # Must not have changed or written
764
    self.assertEqual(before_proc, job.Serialize())
765
    self.assertRaises(IndexError, queue.GetNextUpdate)
766

    
767
  def testCancelWhileWaitlockInQueue(self):
768
    queue = _FakeQueueForProc()
769

    
770
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
771
           for i in range(5)]
772

    
773
    # Create job
774
    job_id = 8645
775
    job = self._CreateJob(queue, job_id, ops)
776

    
777
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
778

    
779
    job.ops[0].status = constants.OP_STATUS_WAITLOCK
780

    
781
    assert len(job.ops) == 5
782

    
783
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
784

    
785
    # Mark as cancelling
786
    (success, _) = job.Cancel()
787
    self.assert_(success)
788

    
789
    self.assertRaises(IndexError, queue.GetNextUpdate)
790

    
791
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
792
                            for op in job.ops))
793

    
794
    opexec = _FakeExecOpCodeForProc(queue, None, None)
795
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
796

    
797
    # Check result
798
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
799
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
800
    self.assertFalse(job.start_timestamp)
801
    self.assert_(job.end_timestamp)
802
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
803
                                for op in job.ops))
804
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
805
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
806
                      ["Job canceled by request" for _ in job.ops]])
807

    
808
  def testCancelWhileWaitlock(self):
809
    queue = _FakeQueueForProc()
810

    
811
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
812
           for i in range(5)]
813

    
814
    # Create job
815
    job_id = 11009
816
    job = self._CreateJob(queue, job_id, ops)
817

    
818
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
819

    
820
    def _BeforeStart(timeout, priority):
821
      self.assertEqual(queue.GetNextUpdate(), (job, True))
822
      self.assertRaises(IndexError, queue.GetNextUpdate)
823
      self.assertFalse(queue.IsAcquired())
824
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
825

    
826
      # Mark as cancelled
827
      (success, _) = job.Cancel()
828
      self.assert_(success)
829

    
830
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
831
                              for op in job.ops))
832
      self.assertRaises(IndexError, queue.GetNextUpdate)
833

    
834
    def _AfterStart(op, cbs):
835
      self.assertEqual(queue.GetNextUpdate(), (job, True))
836
      self.assertRaises(IndexError, queue.GetNextUpdate)
837
      self.assertFalse(queue.IsAcquired())
838
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
839

    
840
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
841

    
842
    self.assertRaises(IndexError, queue.GetNextUpdate)
843
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
844
    self.assertEqual(queue.GetNextUpdate(), (job, True))
845
    self.assertRaises(IndexError, queue.GetNextUpdate)
846

    
847
    # Check result
848
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
849
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
850
    self.assert_(job.start_timestamp)
851
    self.assert_(job.end_timestamp)
852
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
853
                                for op in job.ops))
854
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
855
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
856
                      ["Job canceled by request" for _ in job.ops]])
857

    
858
  def testCancelWhileWaitlockWithTimeout(self):
859
    queue = _FakeQueueForProc()
860

    
861
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
862
           for i in range(5)]
863

    
864
    # Create job
865
    job_id = 24314
866
    job = self._CreateJob(queue, job_id, ops)
867

    
868
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
869

    
870
    def _BeforeStart(timeout, priority):
871
      self.assertFalse(queue.IsAcquired())
872
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
873

    
874
      # Mark as cancelled
875
      (success, _) = job.Cancel()
876
      self.assert_(success)
877

    
878
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
879
                              for op in job.ops))
880

    
881
      # Fake an acquire attempt timing out
882
      raise mcpu.LockAcquireTimeout()
883

    
884
    def _AfterStart(op, cbs):
885
      self.fail("Should not reach this")
886

    
887
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
888

    
889
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
890

    
891
    # Check result
892
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
893
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
894
    self.assert_(job.start_timestamp)
895
    self.assert_(job.end_timestamp)
896
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
897
                                for op in job.ops))
898
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
899
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
900
                      ["Job canceled by request" for _ in job.ops]])
901

    
902
  def testCancelWhileRunning(self):
903
    # Tests canceling a job with finished opcodes and more, unprocessed ones
904
    queue = _FakeQueueForProc()
905

    
906
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
907
           for i in range(3)]
908

    
909
    # Create job
910
    job_id = 28492
911
    job = self._CreateJob(queue, job_id, ops)
912

    
913
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
914

    
915
    opexec = _FakeExecOpCodeForProc(queue, None, None)
916

    
917
    # Run one opcode
918
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
919

    
920
    # Job goes back to queued
921
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
922
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
923
                     [[constants.OP_STATUS_SUCCESS,
924
                       constants.OP_STATUS_QUEUED,
925
                       constants.OP_STATUS_QUEUED],
926
                      ["Res0", None, None]])
927

    
928
    # Mark as cancelled
929
    (success, _) = job.Cancel()
930
    self.assert_(success)
931

    
932
    # Try processing another opcode (this will actually cancel the job)
933
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
934

    
935
    # Check result
936
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
937
    self.assertEqual(job.GetInfo(["id"]), [job_id])
938
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
939
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
940
                     [[constants.OP_STATUS_SUCCESS,
941
                       constants.OP_STATUS_CANCELED,
942
                       constants.OP_STATUS_CANCELED],
943
                      ["Res0", "Job canceled by request",
944
                       "Job canceled by request"]])
945

    
946
  def testPartiallyRun(self):
947
    # Tests calling the processor on a job that's been partially run before the
948
    # program was restarted
949
    queue = _FakeQueueForProc()
950

    
951
    opexec = _FakeExecOpCodeForProc(queue, None, None)
952

    
953
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
954
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
955
             for i in range(10)]
956

    
957
      # Create job
958
      job = self._CreateJob(queue, job_id, ops)
959

    
960
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
961

    
962
      for _ in range(successcount):
963
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
964

    
965
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
966
      self.assertEqual(job.GetInfo(["opstatus"]),
967
                       [[constants.OP_STATUS_SUCCESS
968
                         for _ in range(successcount)] +
969
                        [constants.OP_STATUS_QUEUED
970
                         for _ in range(len(ops) - successcount)]])
971

    
972
      self.assert_(job.ops_iter)
973

    
974
      # Serialize and restore (simulates program restart)
975
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
976
      self.assertFalse(newjob.ops_iter)
977
      self._TestPartial(newjob, successcount)
978

    
979
  def _TestPartial(self, job, successcount):
980
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
981
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
982

    
983
    queue = _FakeQueueForProc()
984
    opexec = _FakeExecOpCodeForProc(queue, None, None)
985

    
986
    for remaining in reversed(range(len(job.ops) - successcount)):
987
      result = jqueue._JobProcessor(queue, opexec, job)()
988
      self.assertEqual(queue.GetNextUpdate(), (job, True))
989
      self.assertEqual(queue.GetNextUpdate(), (job, True))
990
      self.assertEqual(queue.GetNextUpdate(), (job, True))
991
      self.assertRaises(IndexError, queue.GetNextUpdate)
992

    
993
      if remaining == 0:
994
        # Last opcode
995
        self.assert_(result)
996
        break
997

    
998
      self.assertFalse(result)
999

    
1000
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1001

    
1002
    self.assertRaises(IndexError, queue.GetNextUpdate)
1003
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1004
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1005
    self.assertEqual(job.GetInfo(["opresult"]),
1006
                     [[op.input.result for op in job.ops]])
1007
    self.assertEqual(job.GetInfo(["opstatus"]),
1008
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
1009
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1010
                            for op in job.ops))
1011

    
1012
    self._GenericCheckJob(job)
1013

    
1014
    # Calling the processor on a finished job should be a no-op
1015
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1016
    self.assertRaises(IndexError, queue.GetNextUpdate)
1017

    
1018
    # ... also after being restored
1019
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
1020
    # Calling the processor on a finished job should be a no-op
1021
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
1022
    self.assertRaises(IndexError, queue.GetNextUpdate)
1023

    
1024
  def testProcessorOnRunningJob(self):
1025
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
1026

    
1027
    queue = _FakeQueueForProc()
1028
    opexec = _FakeExecOpCodeForProc(queue, None, None)
1029

    
1030
    # Create job
1031
    job = self._CreateJob(queue, 9571, ops)
1032

    
1033
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1034

    
1035
    job.ops[0].status = constants.OP_STATUS_RUNNING
1036

    
1037
    assert len(job.ops) == 1
1038

    
1039
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1040

    
1041
    # Calling on running job must fail
1042
    self.assertRaises(errors.ProgrammerError,
1043
                      jqueue._JobProcessor(queue, opexec, job))
1044

    
1045
  def testLogMessages(self):
1046
    # Tests the "Feedback" callback function
1047
    queue = _FakeQueueForProc()
1048

    
1049
    messages = {
1050
      1: [
1051
        (None, "Hello"),
1052
        (None, "World"),
1053
        (constants.ELOG_MESSAGE, "there"),
1054
        ],
1055
      4: [
1056
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
1057
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
1058
        ],
1059
      }
1060
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
1061
                               messages=messages.get(i, []))
1062
           for i in range(5)]
1063

    
1064
    # Create job
1065
    job = self._CreateJob(queue, 29386, ops)
1066

    
1067
    def _BeforeStart(timeout, priority):
1068
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1069
      self.assertRaises(IndexError, queue.GetNextUpdate)
1070
      self.assertFalse(queue.IsAcquired())
1071
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1072

    
1073
    def _AfterStart(op, cbs):
1074
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1075
      self.assertRaises(IndexError, queue.GetNextUpdate)
1076
      self.assertFalse(queue.IsAcquired())
1077
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1078

    
1079
      self.assertRaises(AssertionError, cbs.Feedback,
1080
                        "too", "many", "arguments")
1081

    
1082
      for (log_type, msg) in op.messages:
1083
        self.assertRaises(IndexError, queue.GetNextUpdate)
1084
        if log_type:
1085
          cbs.Feedback(log_type, msg)
1086
        else:
1087
          cbs.Feedback(msg)
1088
        # Check for job update without replication
1089
        self.assertEqual(queue.GetNextUpdate(), (job, False))
1090
        self.assertRaises(IndexError, queue.GetNextUpdate)
1091

    
1092
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1093

    
1094
    for remaining in reversed(range(len(job.ops))):
1095
      self.assertRaises(IndexError, queue.GetNextUpdate)
1096
      result = jqueue._JobProcessor(queue, opexec, job)()
1097
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1098
      self.assertRaises(IndexError, queue.GetNextUpdate)
1099

    
1100
      if remaining == 0:
1101
        # Last opcode
1102
        self.assert_(result)
1103
        break
1104

    
1105
      self.assertFalse(result)
1106

    
1107
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1108

    
1109
    self.assertRaises(IndexError, queue.GetNextUpdate)
1110

    
1111
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1112
    self.assertEqual(job.GetInfo(["opresult"]),
1113
                     [[op.input.result for op in job.ops]])
1114

    
1115
    logmsgcount = sum(len(m) for m in messages.values())
1116

    
1117
    self._CheckLogMessages(job, logmsgcount)
1118

    
1119
    # Serialize and restore (simulates program restart)
1120
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
1121
    self._CheckLogMessages(newjob, logmsgcount)
1122

    
1123
    # Check each message
1124
    prevserial = -1
1125
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
1126
      for (serial, timestamp, log_type, msg) in oplog:
1127
        (exptype, expmsg) = messages.get(idx).pop(0)
1128
        if exptype:
1129
          self.assertEqual(log_type, exptype)
1130
        else:
1131
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
1132
        self.assertEqual(expmsg, msg)
1133
        self.assert_(serial > prevserial)
1134
        prevserial = serial
1135

    
1136
  def _CheckLogMessages(self, job, count):
1137
    # Check serial
1138
    self.assertEqual(job.log_serial, count)
1139

    
1140
    # No filter
1141
    self.assertEqual(job.GetLogEntries(None),
1142
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1143
                      for entry in entries])
1144

    
1145
    # Filter with serial
1146
    assert count > 3
1147
    self.assert_(job.GetLogEntries(3))
1148
    self.assertEqual(job.GetLogEntries(3),
1149
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
1150
                      for entry in entries][3:])
1151

    
1152
    # No log message after highest serial
1153
    self.assertFalse(job.GetLogEntries(count))
1154
    self.assertFalse(job.GetLogEntries(count + 3))
1155

    
1156
  def testSubmitManyJobs(self):
1157
    queue = _FakeQueueForProc()
1158

    
1159
    job_id = 15656
1160
    ops = [
1161
      opcodes.OpTestDummy(result="Res0", fail=False,
1162
                          submit_jobs=[]),
1163
      opcodes.OpTestDummy(result="Res1", fail=False,
1164
                          submit_jobs=[
1165
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1166
                            ]),
1167
      opcodes.OpTestDummy(result="Res2", fail=False,
1168
                          submit_jobs=[
1169
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1170
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1171
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1172
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1173
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1174
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1175
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1176
                            ]),
1177
      ]
1178

    
1179
    # Create job
1180
    job = self._CreateJob(queue, job_id, ops)
1181

    
1182
    def _BeforeStart(timeout, priority):
1183
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1184
      self.assertRaises(IndexError, queue.GetNextUpdate)
1185
      self.assertFalse(queue.IsAcquired())
1186
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1187
      self.assertFalse(job.cur_opctx)
1188

    
1189
    def _AfterStart(op, cbs):
1190
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1191
      self.assertRaises(IndexError, queue.GetNextUpdate)
1192

    
1193
      self.assertFalse(queue.IsAcquired())
1194
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1195
      self.assertFalse(job.cur_opctx)
1196

    
1197
      # Job is running, cancelling shouldn't be possible
1198
      (success, _) = job.Cancel()
1199
      self.assertFalse(success)
1200

    
1201
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1202

    
1203
    for idx in range(len(ops)):
1204
      self.assertRaises(IndexError, queue.GetNextUpdate)
1205
      result = jqueue._JobProcessor(queue, opexec, job)()
1206
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1207
      self.assertRaises(IndexError, queue.GetNextUpdate)
1208
      if idx == len(ops) - 1:
1209
        # Last opcode
1210
        self.assert_(result)
1211
      else:
1212
        self.assertFalse(result)
1213

    
1214
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1215
        self.assert_(job.start_timestamp)
1216
        self.assertFalse(job.end_timestamp)
1217

    
1218
    self.assertRaises(IndexError, queue.GetNextUpdate)
1219

    
1220
    for idx, submitted_ops in enumerate(job_ops
1221
                                        for op in ops
1222
                                        for job_ops in op.submit_jobs):
1223
      self.assertEqual(queue.GetNextSubmittedJob(),
1224
                       (1000 + idx, submitted_ops))
1225
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1226

    
1227
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1228
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1229
    self.assertEqual(job.GetInfo(["opresult"]),
1230
                     [[[], [1000], [1001, 1002, 1003]]])
1231
    self.assertEqual(job.GetInfo(["opstatus"]),
1232
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1233

    
1234
    self._GenericCheckJob(job)
1235

    
1236
    # Calling the processor on a finished job should be a no-op
1237
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1238
    self.assertRaises(IndexError, queue.GetNextUpdate)
1239

    
1240
  def testJobDependency(self):
1241
    depmgr = _FakeDependencyManager()
1242
    queue = _FakeQueueForProc(depmgr=depmgr)
1243

    
1244
    self.assertEqual(queue.depmgr, depmgr)
1245

    
1246
    prev_job_id = 22113
1247
    prev_job_id2 = 28102
1248
    job_id = 29929
1249
    ops = [
1250
      opcodes.OpTestDummy(result="Res0", fail=False,
1251
                          depends=[
1252
                            [prev_job_id2, None],
1253
                            [prev_job_id, None],
1254
                            ]),
1255
      opcodes.OpTestDummy(result="Res1", fail=False),
1256
      ]
1257

    
1258
    # Create job
1259
    job = self._CreateJob(queue, job_id, ops)
1260

    
1261
    def _BeforeStart(timeout, priority):
1262
      if attempt == 0 or attempt > 5:
1263
        # Job should only be updated when it wasn't waiting for another job
1264
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1265
      self.assertRaises(IndexError, queue.GetNextUpdate)
1266
      self.assertFalse(queue.IsAcquired())
1267
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1268
      self.assertFalse(job.cur_opctx)
1269

    
1270
    def _AfterStart(op, cbs):
1271
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1272
      self.assertRaises(IndexError, queue.GetNextUpdate)
1273

    
1274
      self.assertFalse(queue.IsAcquired())
1275
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1276
      self.assertFalse(job.cur_opctx)
1277

    
1278
      # Job is running, cancelling shouldn't be possible
1279
      (success, _) = job.Cancel()
1280
      self.assertFalse(success)
1281

    
1282
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1283

    
1284
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1285

    
1286
    counter = itertools.count()
1287
    while True:
1288
      attempt = counter.next()
1289

    
1290
      self.assertRaises(IndexError, queue.GetNextUpdate)
1291
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1292

    
1293
      if attempt < 2:
1294
        depmgr.AddCheckResult(job, prev_job_id2, None,
1295
                              (jqueue._JobDependencyManager.WAIT, "wait2"))
1296
      elif attempt == 2:
1297
        depmgr.AddCheckResult(job, prev_job_id2, None,
1298
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1299
        # The processor will ask for the next dependency immediately
1300
        depmgr.AddCheckResult(job, prev_job_id, None,
1301
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1302
      elif attempt < 5:
1303
        depmgr.AddCheckResult(job, prev_job_id, None,
1304
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1305
      elif attempt == 5:
1306
        depmgr.AddCheckResult(job, prev_job_id, None,
1307
                              (jqueue._JobDependencyManager.CONTINUE, "cont"))
1308
      if attempt == 2:
1309
        self.assertEqual(depmgr.CountPendingResults(), 2)
1310
      elif attempt > 5:
1311
        self.assertEqual(depmgr.CountPendingResults(), 0)
1312
      else:
1313
        self.assertEqual(depmgr.CountPendingResults(), 1)
1314

    
1315
      result = jqueue._JobProcessor(queue, opexec, job)()
1316
      if attempt == 0 or attempt >= 5:
1317
        # Job should only be updated if there was an actual change
1318
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1319
      self.assertRaises(IndexError, queue.GetNextUpdate)
1320
      self.assertFalse(depmgr.CountPendingResults())
1321

    
1322
      if attempt < 5:
1323
        # Simulate waiting for other job
1324
        self.assertTrue(result)
1325
        self.assertTrue(job.cur_opctx)
1326
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1327
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1328
        self.assert_(job.start_timestamp)
1329
        self.assertFalse(job.end_timestamp)
1330
        continue
1331

    
1332
      if result:
1333
        # Last opcode
1334
        self.assertFalse(job.cur_opctx)
1335
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1336
        break
1337

    
1338
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1339

    
1340
      self.assertFalse(result)
1341
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1342
      self.assert_(job.start_timestamp)
1343
      self.assertFalse(job.end_timestamp)
1344

    
1345
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1346
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1347
    self.assertEqual(job.GetInfo(["opresult"]),
1348
                     [[op.input.result for op in job.ops]])
1349
    self.assertEqual(job.GetInfo(["opstatus"]),
1350
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1351
    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
1352
                               for op in job.ops))
1353

    
1354
    self._GenericCheckJob(job)
1355

    
1356
    self.assertRaises(IndexError, queue.GetNextUpdate)
1357
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1358
    self.assertFalse(depmgr.CountPendingResults())
1359
    self.assertFalse(depmgr.CountWaitingJobs())
1360

    
1361
    # Calling the processor on a finished job should be a no-op
1362
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1363
    self.assertRaises(IndexError, queue.GetNextUpdate)
1364

    
1365
  def testJobDependencyCancel(self):
1366
    depmgr = _FakeDependencyManager()
1367
    queue = _FakeQueueForProc(depmgr=depmgr)
1368

    
1369
    self.assertEqual(queue.depmgr, depmgr)
1370

    
1371
    prev_job_id = 13623
1372
    job_id = 30876
1373
    ops = [
1374
      opcodes.OpTestDummy(result="Res0", fail=False),
1375
      opcodes.OpTestDummy(result="Res1", fail=False,
1376
                          depends=[
1377
                            [prev_job_id, None],
1378
                            ]),
1379
      opcodes.OpTestDummy(result="Res2", fail=False),
1380
      ]
1381

    
1382
    # Create job
1383
    job = self._CreateJob(queue, job_id, ops)
1384

    
1385
    def _BeforeStart(timeout, priority):
1386
      if attempt == 0 or attempt > 5:
1387
        # Job should only be updated when it wasn't waiting for another job
1388
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1389
      self.assertRaises(IndexError, queue.GetNextUpdate)
1390
      self.assertFalse(queue.IsAcquired())
1391
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1392
      self.assertFalse(job.cur_opctx)
1393

    
1394
    def _AfterStart(op, cbs):
1395
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1396
      self.assertRaises(IndexError, queue.GetNextUpdate)
1397

    
1398
      self.assertFalse(queue.IsAcquired())
1399
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1400
      self.assertFalse(job.cur_opctx)
1401

    
1402
      # Job is running, cancelling shouldn't be possible
1403
      (success, _) = job.Cancel()
1404
      self.assertFalse(success)
1405

    
1406
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1407

    
1408
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1409

    
1410
    counter = itertools.count()
1411
    while True:
1412
      attempt = counter.next()
1413

    
1414
      self.assertRaises(IndexError, queue.GetNextUpdate)
1415
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1416

    
1417
      if attempt == 0:
1418
        # This will handle the first opcode
1419
        pass
1420
      elif attempt < 4:
1421
        depmgr.AddCheckResult(job, prev_job_id, None,
1422
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1423
      elif attempt == 4:
1424
        # Other job was cancelled
1425
        depmgr.AddCheckResult(job, prev_job_id, None,
1426
                              (jqueue._JobDependencyManager.CANCEL, "cancel"))
1427

    
1428
      if attempt == 0:
1429
        self.assertEqual(depmgr.CountPendingResults(), 0)
1430
      else:
1431
        self.assertEqual(depmgr.CountPendingResults(), 1)
1432

    
1433
      result = jqueue._JobProcessor(queue, opexec, job)()
1434
      if attempt <= 1 or attempt >= 4:
1435
        # Job should only be updated if there was an actual change
1436
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1437
      self.assertRaises(IndexError, queue.GetNextUpdate)
1438
      self.assertFalse(depmgr.CountPendingResults())
1439

    
1440
      if attempt > 0 and attempt < 4:
1441
        # Simulate waiting for other job
1442
        self.assertTrue(result)
1443
        self.assertTrue(job.cur_opctx)
1444
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1445
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1446
        self.assert_(job.start_timestamp)
1447
        self.assertFalse(job.end_timestamp)
1448
        continue
1449

    
1450
      if result:
1451
        # Last opcode
1452
        self.assertFalse(job.cur_opctx)
1453
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1454
        break
1455

    
1456
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1457

    
1458
      self.assertFalse(result)
1459
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1460
      self.assert_(job.start_timestamp)
1461
      self.assertFalse(job.end_timestamp)
1462

    
1463
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
1464
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
1465
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
1466
                     [[constants.OP_STATUS_SUCCESS,
1467
                       constants.OP_STATUS_CANCELED,
1468
                       constants.OP_STATUS_CANCELED],
1469
                      ["Res0", "Job canceled by request",
1470
                       "Job canceled by request"]])
1471

    
1472
    self._GenericCheckJob(job)
1473

    
1474
    self.assertRaises(IndexError, queue.GetNextUpdate)
1475
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1476
    self.assertFalse(depmgr.CountPendingResults())
1477

    
1478
    # Calling the processor on a finished job should be a no-op
1479
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1480
    self.assertRaises(IndexError, queue.GetNextUpdate)
1481

    
1482
  def testJobDependencyWrongstatus(self):
1483
    depmgr = _FakeDependencyManager()
1484
    queue = _FakeQueueForProc(depmgr=depmgr)
1485

    
1486
    self.assertEqual(queue.depmgr, depmgr)
1487

    
1488
    prev_job_id = 9741
1489
    job_id = 11763
1490
    ops = [
1491
      opcodes.OpTestDummy(result="Res0", fail=False),
1492
      opcodes.OpTestDummy(result="Res1", fail=False,
1493
                          depends=[
1494
                            [prev_job_id, None],
1495
                            ]),
1496
      opcodes.OpTestDummy(result="Res2", fail=False),
1497
      ]
1498

    
1499
    # Create job
1500
    job = self._CreateJob(queue, job_id, ops)
1501

    
1502
    def _BeforeStart(timeout, priority):
1503
      if attempt == 0 or attempt > 5:
1504
        # Job should only be updated when it wasn't waiting for another job
1505
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1506
      self.assertRaises(IndexError, queue.GetNextUpdate)
1507
      self.assertFalse(queue.IsAcquired())
1508
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1509
      self.assertFalse(job.cur_opctx)
1510

    
1511
    def _AfterStart(op, cbs):
1512
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1513
      self.assertRaises(IndexError, queue.GetNextUpdate)
1514

    
1515
      self.assertFalse(queue.IsAcquired())
1516
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1517
      self.assertFalse(job.cur_opctx)
1518

    
1519
      # Job is running, cancelling shouldn't be possible
1520
      (success, _) = job.Cancel()
1521
      self.assertFalse(success)
1522

    
1523
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1524

    
1525
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1526

    
1527
    counter = itertools.count()
1528
    while True:
1529
      attempt = counter.next()
1530

    
1531
      self.assertRaises(IndexError, queue.GetNextUpdate)
1532
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1533

    
1534
      if attempt == 0:
1535
        # This will handle the first opcode
1536
        pass
1537
      elif attempt < 4:
1538
        depmgr.AddCheckResult(job, prev_job_id, None,
1539
                              (jqueue._JobDependencyManager.WAIT, "wait"))
1540
      elif attempt == 4:
1541
        # Other job failed
1542
        depmgr.AddCheckResult(job, prev_job_id, None,
1543
                              (jqueue._JobDependencyManager.WRONGSTATUS, "w"))
1544

    
1545
      if attempt == 0:
1546
        self.assertEqual(depmgr.CountPendingResults(), 0)
1547
      else:
1548
        self.assertEqual(depmgr.CountPendingResults(), 1)
1549

    
1550
      result = jqueue._JobProcessor(queue, opexec, job)()
1551
      if attempt <= 1 or attempt >= 4:
1552
        # Job should only be updated if there was an actual change
1553
        self.assertEqual(queue.GetNextUpdate(), (job, True))
1554
      self.assertRaises(IndexError, queue.GetNextUpdate)
1555
      self.assertFalse(depmgr.CountPendingResults())
1556

    
1557
      if attempt > 0 and attempt < 4:
1558
        # Simulate waiting for other job
1559
        self.assertTrue(result)
1560
        self.assertTrue(job.cur_opctx)
1561
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1562
        self.assertRaises(IndexError, depmgr.GetNextNotification)
1563
        self.assert_(job.start_timestamp)
1564
        self.assertFalse(job.end_timestamp)
1565
        continue
1566

    
1567
      if result:
1568
        # Last opcode
1569
        self.assertFalse(job.cur_opctx)
1570
        self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
1571
        break
1572

    
1573
      self.assertRaises(IndexError, depmgr.GetNextNotification)
1574

    
1575
      self.assertFalse(result)
1576
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1577
      self.assert_(job.start_timestamp)
1578
      self.assertFalse(job.end_timestamp)
1579

    
1580
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
1581
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
1582
    self.assertEqual(job.GetInfo(["opstatus"]),
1583
                     [[constants.OP_STATUS_SUCCESS,
1584
                       constants.OP_STATUS_ERROR,
1585
                       constants.OP_STATUS_ERROR]]),
1586

    
1587
    (opresult, ) = job.GetInfo(["opresult"])
1588
    self.assertEqual(len(opresult), len(ops))
1589
    self.assertEqual(opresult[0], "Res0")
1590
    self.assertTrue(errors.GetEncodedError(opresult[1]))
1591
    self.assertTrue(errors.GetEncodedError(opresult[2]))
1592

    
1593
    self._GenericCheckJob(job)
1594

    
1595
    self.assertRaises(IndexError, queue.GetNextUpdate)
1596
    self.assertRaises(IndexError, depmgr.GetNextNotification)
1597
    self.assertFalse(depmgr.CountPendingResults())
1598

    
1599
    # Calling the processor on a finished job should be a no-op
1600
    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
1601
    self.assertRaises(IndexError, queue.GetNextUpdate)
1602

    
1603

    
1604
class _FakeTimeoutStrategy:
1605
  def __init__(self, timeouts):
1606
    self.timeouts = timeouts
1607
    self.attempts = 0
1608
    self.last_timeout = None
1609

    
1610
  def NextAttempt(self):
1611
    self.attempts += 1
1612
    if self.timeouts:
1613
      timeout = self.timeouts.pop(0)
1614
    else:
1615
      timeout = None
1616
    self.last_timeout = timeout
1617
    return timeout
1618

    
1619

    
1620
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
1621
  def setUp(self):
1622
    self.queue = _FakeQueueForProc()
1623
    self.job = None
1624
    self.curop = None
1625
    self.opcounter = None
1626
    self.timeout_strategy = None
1627
    self.retries = 0
1628
    self.prev_tsop = None
1629
    self.prev_prio = None
1630
    self.prev_status = None
1631
    self.lock_acq_prio = None
1632
    self.gave_lock = None
1633
    self.done_lock_before_blocking = False
1634

    
1635
  def _BeforeStart(self, timeout, priority):
1636
    job = self.job
1637

    
1638
    # If status has changed, job must've been written
1639
    if self.prev_status != self.job.ops[self.curop].status:
1640
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1641
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1642

    
1643
    self.assertFalse(self.queue.IsAcquired())
1644
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1645

    
1646
    ts = self.timeout_strategy
1647

    
1648
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
1649
    self.assertEqual(timeout, ts.last_timeout)
1650
    self.assertEqual(priority, job.ops[self.curop].priority)
1651

    
1652
    self.gave_lock = True
1653
    self.lock_acq_prio = priority
1654

    
1655
    if (self.curop == 3 and
1656
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
1657
      # Give locks before running into blocking acquire
1658
      assert self.retries == 7
1659
      self.retries = 0
1660
      self.done_lock_before_blocking = True
1661
      return
1662

    
1663
    if self.retries > 0:
1664
      self.assert_(timeout is not None)
1665
      self.retries -= 1
1666
      self.gave_lock = False
1667
      raise mcpu.LockAcquireTimeout()
1668

    
1669
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
1670
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
1671
      assert not ts.timeouts
1672
      self.assert_(timeout is None)
1673

    
1674
  def _AfterStart(self, op, cbs):
1675
    job = self.job
1676

    
1677
    # Setting to "running" requires an update
1678
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1679
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1680

    
1681
    self.assertFalse(self.queue.IsAcquired())
1682
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1683

    
1684
    # Job is running, cancelling shouldn't be possible
1685
    (success, _) = job.Cancel()
1686
    self.assertFalse(success)
1687

    
1688
  def _NextOpcode(self):
1689
    self.curop = self.opcounter.next()
1690
    self.prev_prio = self.job.ops[self.curop].priority
1691
    self.prev_status = self.job.ops[self.curop].status
1692

    
1693
  def _NewTimeoutStrategy(self):
1694
    job = self.job
1695

    
1696
    self.assertEqual(self.retries, 0)
1697

    
1698
    if self.prev_tsop == self.curop:
1699
      # Still on the same opcode, priority must've been increased
1700
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1701

    
1702
    if self.curop == 1:
1703
      # Normal retry
1704
      timeouts = range(10, 31, 10)
1705
      self.retries = len(timeouts) - 1
1706

    
1707
    elif self.curop == 2:
1708
      # Let this run into a blocking acquire
1709
      timeouts = range(11, 61, 12)
1710
      self.retries = len(timeouts)
1711

    
1712
    elif self.curop == 3:
1713
      # Wait for priority to increase, but give lock before blocking acquire
1714
      timeouts = range(12, 100, 14)
1715
      self.retries = len(timeouts)
1716

    
1717
      self.assertFalse(self.done_lock_before_blocking)
1718

    
1719
    elif self.curop == 4:
1720
      self.assert_(self.done_lock_before_blocking)
1721

    
1722
      # Timeouts, but no need to retry
1723
      timeouts = range(10, 31, 10)
1724
      self.retries = 0
1725

    
1726
    elif self.curop == 5:
1727
      # Normal retry
1728
      timeouts = range(19, 100, 11)
1729
      self.retries = len(timeouts)
1730

    
1731
    else:
1732
      timeouts = []
1733
      self.retries = 0
1734

    
1735
    assert len(job.ops) == 10
1736
    assert self.retries <= len(timeouts)
1737

    
1738
    ts = _FakeTimeoutStrategy(timeouts)
1739

    
1740
    self.timeout_strategy = ts
1741
    self.prev_tsop = self.curop
1742
    self.prev_prio = job.ops[self.curop].priority
1743

    
1744
    return ts
1745

    
1746
  def testTimeout(self):
1747
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1748
           for i in range(10)]
1749

    
1750
    # Create job
1751
    job_id = 15801
1752
    job = self._CreateJob(self.queue, job_id, ops)
1753
    self.job = job
1754

    
1755
    self.opcounter = itertools.count(0)
1756

    
1757
    opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
1758
                                    self._AfterStart)
1759
    tsf = self._NewTimeoutStrategy
1760

    
1761
    self.assertFalse(self.done_lock_before_blocking)
1762

    
1763
    while True:
1764
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1765
                                  _timeout_strategy_factory=tsf)
1766

    
1767
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1768

    
1769
      if self.curop is not None:
1770
        self.prev_status = self.job.ops[self.curop].status
1771

    
1772
      self.lock_acq_prio = None
1773

    
1774
      result = proc(_nextop_fn=self._NextOpcode)
1775
      assert self.curop is not None
1776

    
1777
      if result or self.gave_lock:
1778
        # Got lock and/or job is done, result must've been written
1779
        self.assertFalse(job.cur_opctx)
1780
        self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1781
        self.assertRaises(IndexError, self.queue.GetNextUpdate)
1782
        self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
1783
        self.assert_(job.ops[self.curop].exec_timestamp)
1784

    
1785
      if result:
1786
        self.assertFalse(job.cur_opctx)
1787
        break
1788

    
1789
      self.assertFalse(result)
1790

    
1791
      if self.curop == 0:
1792
        self.assertEqual(job.ops[self.curop].start_timestamp,
1793
                         job.start_timestamp)
1794

    
1795
      if self.gave_lock:
1796
        # Opcode finished, but job not yet done
1797
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1798
      else:
1799
        # Did not get locks
1800
        self.assert_(job.cur_opctx)
1801
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1802
                         self.timeout_strategy.NextAttempt)
1803
        self.assertFalse(job.ops[self.curop].exec_timestamp)
1804
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1805

    
1806
        # If priority has changed since acquiring locks, the job must've been
1807
        # updated
1808
        if self.lock_acq_prio != job.ops[self.curop].priority:
1809
          self.assertEqual(self.queue.GetNextUpdate(), (job, True))
1810

    
1811
      self.assertRaises(IndexError, self.queue.GetNextUpdate)
1812

    
1813
      self.assert_(job.start_timestamp)
1814
      self.assertFalse(job.end_timestamp)
1815

    
1816
    self.assertEqual(self.curop, len(job.ops) - 1)
1817
    self.assertEqual(self.job, job)
1818
    self.assertEqual(self.opcounter.next(), len(job.ops))
1819
    self.assert_(self.done_lock_before_blocking)
1820

    
1821
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1822
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1823
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1824
    self.assertEqual(job.GetInfo(["opresult"]),
1825
                     [[op.input.result for op in job.ops]])
1826
    self.assertEqual(job.GetInfo(["opstatus"]),
1827
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1828
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1829
                            for op in job.ops))
1830

    
1831
    # Calling the processor on a finished job should be a no-op
1832
    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
1833
    self.assertRaises(IndexError, self.queue.GetNextUpdate)
1834

    
1835

    
1836
class TestJobDependencyManager(unittest.TestCase):
1837
  class _FakeJob:
1838
    def __init__(self, job_id):
1839
      self.id = str(job_id)
1840

    
1841
  def setUp(self):
1842
    self._status = []
1843
    self._queue = []
1844
    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
1845

    
1846
  def _GetStatus(self, job_id):
1847
    (exp_job_id, result) = self._status.pop(0)
1848
    self.assertEqual(exp_job_id, job_id)
1849
    return result
1850

    
1851
  def _Enqueue(self, jobs):
1852
    self._queue.append(jobs)
1853

    
1854
  def testNotFinalizedThenCancel(self):
1855
    job = self._FakeJob(17697)
1856
    job_id = str(28625)
1857

    
1858
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1859
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1860
    self.assertEqual(result, self.jdm.WAIT)
1861
    self.assertFalse(self._status)
1862
    self.assertFalse(self._queue)
1863
    self.assertTrue(self.jdm.JobWaiting(job))
1864
    self.assertEqual(self.jdm._waiters, {
1865
      job_id: set([job]),
1866
      })
1867

    
1868
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1869
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1870
    self.assertEqual(result, self.jdm.CANCEL)
1871
    self.assertFalse(self._status)
1872
    self.assertFalse(self._queue)
1873
    self.assertFalse(self.jdm.JobWaiting(job))
1874

    
1875
  def testRequireCancel(self):
1876
    job = self._FakeJob(5278)
1877
    job_id = str(9610)
1878
    dep_status = [constants.JOB_STATUS_CANCELED]
1879

    
1880
    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1881
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1882
    self.assertEqual(result, self.jdm.WAIT)
1883
    self.assertFalse(self._status)
1884
    self.assertFalse(self._queue)
1885
    self.assertTrue(self.jdm.JobWaiting(job))
1886
    self.assertEqual(self.jdm._waiters, {
1887
      job_id: set([job]),
1888
      })
1889

    
1890
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1891
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1892
    self.assertEqual(result, self.jdm.CONTINUE)
1893
    self.assertFalse(self._status)
1894
    self.assertFalse(self._queue)
1895
    self.assertFalse(self.jdm.JobWaiting(job))
1896

    
1897
  def testRequireError(self):
1898
    job = self._FakeJob(21459)
1899
    job_id = str(25519)
1900
    dep_status = [constants.JOB_STATUS_ERROR]
1901

    
1902
    self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1903
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1904
    self.assertEqual(result, self.jdm.WAIT)
1905
    self.assertFalse(self._status)
1906
    self.assertFalse(self._queue)
1907
    self.assertTrue(self.jdm.JobWaiting(job))
1908
    self.assertEqual(self.jdm._waiters, {
1909
      job_id: set([job]),
1910
      })
1911

    
1912
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1913
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1914
    self.assertEqual(result, self.jdm.CONTINUE)
1915
    self.assertFalse(self._status)
1916
    self.assertFalse(self._queue)
1917
    self.assertFalse(self.jdm.JobWaiting(job))
1918

    
1919
  def testRequireMultiple(self):
1920
    dep_status = list(constants.JOBS_FINALIZED)
1921

    
1922
    for end_status in dep_status:
1923
      job = self._FakeJob(21343)
1924
      job_id = str(14609)
1925

    
1926
      self._status.append((job_id, constants.JOB_STATUS_WAITLOCK))
1927
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1928
      self.assertEqual(result, self.jdm.WAIT)
1929
      self.assertFalse(self._status)
1930
      self.assertFalse(self._queue)
1931
      self.assertTrue(self.jdm.JobWaiting(job))
1932
      self.assertEqual(self.jdm._waiters, {
1933
        job_id: set([job]),
1934
        })
1935

    
1936
      self._status.append((job_id, end_status))
1937
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
1938
      self.assertEqual(result, self.jdm.CONTINUE)
1939
      self.assertFalse(self._status)
1940
      self.assertFalse(self._queue)
1941
      self.assertFalse(self.jdm.JobWaiting(job))
1942

    
1943
  def testNotify(self):
1944
    job = self._FakeJob(8227)
1945
    job_id = str(4113)
1946

    
1947
    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
1948
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
1949
    self.assertEqual(result, self.jdm.WAIT)
1950
    self.assertFalse(self._status)
1951
    self.assertFalse(self._queue)
1952
    self.assertTrue(self.jdm.JobWaiting(job))
1953
    self.assertEqual(self.jdm._waiters, {
1954
      job_id: set([job]),
1955
      })
1956

    
1957
    self.jdm.NotifyWaiters(job_id)
1958
    self.assertFalse(self._status)
1959
    self.assertFalse(self.jdm._waiters)
1960
    self.assertFalse(self.jdm.JobWaiting(job))
1961
    self.assertEqual(self._queue, [set([job])])
1962

    
1963
  def testWrongStatus(self):
1964
    job = self._FakeJob(10102)
1965
    job_id = str(1271)
1966

    
1967
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
1968
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1969
                                            [constants.JOB_STATUS_SUCCESS])
1970
    self.assertEqual(result, self.jdm.WAIT)
1971
    self.assertFalse(self._status)
1972
    self.assertFalse(self._queue)
1973
    self.assertTrue(self.jdm.JobWaiting(job))
1974
    self.assertEqual(self.jdm._waiters, {
1975
      job_id: set([job]),
1976
      })
1977

    
1978
    self._status.append((job_id, constants.JOB_STATUS_ERROR))
1979
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1980
                                            [constants.JOB_STATUS_SUCCESS])
1981
    self.assertEqual(result, self.jdm.WRONGSTATUS)
1982
    self.assertFalse(self._status)
1983
    self.assertFalse(self._queue)
1984
    self.assertFalse(self.jdm.JobWaiting(job))
1985

    
1986
  def testCorrectStatus(self):
1987
    job = self._FakeJob(24273)
1988
    job_id = str(23885)
1989

    
1990
    self._status.append((job_id, constants.JOB_STATUS_QUEUED))
1991
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
1992
                                            [constants.JOB_STATUS_SUCCESS])
1993
    self.assertEqual(result, self.jdm.WAIT)
1994
    self.assertFalse(self._status)
1995
    self.assertFalse(self._queue)
1996
    self.assertTrue(self.jdm.JobWaiting(job))
1997
    self.assertEqual(self.jdm._waiters, {
1998
      job_id: set([job]),
1999
      })
2000

    
2001
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2002
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2003
                                            [constants.JOB_STATUS_SUCCESS])
2004
    self.assertEqual(result, self.jdm.CONTINUE)
2005
    self.assertFalse(self._status)
2006
    self.assertFalse(self._queue)
2007
    self.assertFalse(self.jdm.JobWaiting(job))
2008

    
2009
  def testFinalizedRightAway(self):
2010
    job = self._FakeJob(224)
2011
    job_id = str(3081)
2012

    
2013
    self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2014
    (result, _) = self.jdm.CheckAndRegister(job, job_id,
2015
                                            [constants.JOB_STATUS_SUCCESS])
2016
    self.assertEqual(result, self.jdm.CONTINUE)
2017
    self.assertFalse(self._status)
2018
    self.assertFalse(self._queue)
2019
    self.assertFalse(self.jdm.JobWaiting(job))
2020
    self.assertEqual(self.jdm._waiters, {
2021
      job_id: set(),
2022
      })
2023

    
2024
    # Force cleanup
2025
    self.jdm.NotifyWaiters("0")
2026
    self.assertFalse(self.jdm._waiters)
2027
    self.assertFalse(self._status)
2028
    self.assertFalse(self._queue)
2029

    
2030
  def testSelfDependency(self):
2031
    job = self._FakeJob(18937)
2032

    
2033
    self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
2034
    (result, _) = self.jdm.CheckAndRegister(job, job.id, [])
2035
    self.assertEqual(result, self.jdm.ERROR)
2036

    
2037
  def testJobDisappears(self):
2038
    job = self._FakeJob(30540)
2039
    job_id = str(23769)
2040

    
2041
    def _FakeStatus(_):
2042
      raise errors.JobLost("#msg#")
2043

    
2044
    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
2045
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2046
    self.assertEqual(result, self.jdm.ERROR)
2047
    self.assertFalse(jdm.JobWaiting(job))
2048

    
2049

    
2050
if __name__ == "__main__":
2051
  testutils.GanetiTestProgram()