Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 26d3fd2f

History | View | Annotate | Download (35.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30
import itertools
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import jqueue
36
from ganeti import opcodes
37
from ganeti import compat
38
from ganeti import mcpu
39

    
40
import testutils
41

    
42

    
43
class _FakeJob:
44
  def __init__(self, job_id, status):
45
    self.id = job_id
46
    self._status = status
47
    self._log = []
48

    
49
  def SetStatus(self, status):
50
    self._status = status
51

    
52
  def AddLogEntry(self, msg):
53
    self._log.append((len(self._log), msg))
54

    
55
  def CalcStatus(self):
56
    return self._status
57

    
58
  def GetInfo(self, fields):
59
    result = []
60

    
61
    for name in fields:
62
      if name == "status":
63
        result.append(self._status)
64
      else:
65
        raise Exception("Unknown field")
66

    
67
    return result
68

    
69
  def GetLogEntries(self, newer_than):
70
    assert newer_than is None or newer_than >= 0
71

    
72
    if newer_than is None:
73
      return self._log
74

    
75
    return self._log[newer_than:]
76

    
77

    
78
class TestJobChangesChecker(unittest.TestCase):
79
  def testStatus(self):
80
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
81
    checker = jqueue._JobChangesChecker(["status"], None, None)
82
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
83

    
84
    job.SetStatus(constants.JOB_STATUS_RUNNING)
85
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
86

    
87
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
88
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
89

    
90
    # job.id is used by checker
91
    self.assertEqual(job.id, 9094)
92

    
93
  def testStatusWithPrev(self):
94
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
95
    checker = jqueue._JobChangesChecker(["status"],
96
                                        [constants.JOB_STATUS_QUEUED], None)
97
    self.assert_(checker(job) is None)
98

    
99
    job.SetStatus(constants.JOB_STATUS_RUNNING)
100
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
101

    
102
  def testFinalStatus(self):
103
    for status in constants.JOBS_FINALIZED:
104
      job = _FakeJob(2178711, status)
105
      checker = jqueue._JobChangesChecker(["status"], [status], None)
106
      # There won't be any changes in this status, hence it should signal
107
      # a change immediately
108
      self.assertEqual(checker(job), ([status], []))
109

    
110
  def testLog(self):
111
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
112
    checker = jqueue._JobChangesChecker(["status"], None, None)
113
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
114

    
115
    job.AddLogEntry("Hello World")
116
    (job_info, log_entries) = checker(job)
117
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
118
    self.assertEqual(log_entries, [[0, "Hello World"]])
119

    
120
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
121
    self.assert_(checker2(job) is None)
122

    
123
    job.AddLogEntry("Foo Bar")
124
    job.SetStatus(constants.JOB_STATUS_ERROR)
125

    
126
    (job_info, log_entries) = checker2(job)
127
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
128
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
129

    
130
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
131
    (job_info, log_entries) = checker3(job)
132
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
133
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
134

    
135

    
136
class TestJobChangesWaiter(unittest.TestCase):
137
  def setUp(self):
138
    self.tmpdir = tempfile.mkdtemp()
139
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
140
    utils.WriteFile(self.filename, data="")
141

    
142
  def tearDown(self):
143
    shutil.rmtree(self.tmpdir)
144

    
145
  def _EnsureNotifierClosed(self, notifier):
146
    try:
147
      os.fstat(notifier._fd)
148
    except EnvironmentError, err:
149
      self.assertEqual(err.errno, errno.EBADF)
150
    else:
151
      self.fail("File descriptor wasn't closed")
152

    
153
  def testClose(self):
154
    for wait in [False, True]:
155
      waiter = jqueue._JobFileChangesWaiter(self.filename)
156
      try:
157
        if wait:
158
          waiter.Wait(0.001)
159
      finally:
160
        waiter.Close()
161

    
162
      # Ensure file descriptor was closed
163
      self._EnsureNotifierClosed(waiter._notifier)
164

    
165
  def testChangingFile(self):
166
    waiter = jqueue._JobFileChangesWaiter(self.filename)
167
    try:
168
      self.assertFalse(waiter.Wait(0.1))
169
      utils.WriteFile(self.filename, data="changed")
170
      self.assert_(waiter.Wait(60))
171
    finally:
172
      waiter.Close()
173

    
174
    self._EnsureNotifierClosed(waiter._notifier)
175

    
176
  def testChangingFile2(self):
177
    waiter = jqueue._JobChangesWaiter(self.filename)
178
    try:
179
      self.assertFalse(waiter._filewaiter)
180
      self.assert_(waiter.Wait(0.1))
181
      self.assert_(waiter._filewaiter)
182

    
183
      # File waiter is now used, but there have been no changes
184
      self.assertFalse(waiter.Wait(0.1))
185
      utils.WriteFile(self.filename, data="changed")
186
      self.assert_(waiter.Wait(60))
187
    finally:
188
      waiter.Close()
189

    
190
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
191

    
192

    
193
class TestWaitForJobChangesHelper(unittest.TestCase):
194
  def setUp(self):
195
    self.tmpdir = tempfile.mkdtemp()
196
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
197
    utils.WriteFile(self.filename, data="")
198

    
199
  def tearDown(self):
200
    shutil.rmtree(self.tmpdir)
201

    
202
  def _LoadWaitingJob(self):
203
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
204

    
205
  def _LoadLostJob(self):
206
    return None
207

    
208
  def testNoChanges(self):
209
    wfjc = jqueue._WaitForJobChangesHelper()
210

    
211
    # No change
212
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
213
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
214
                     constants.JOB_NOTCHANGED)
215

    
216
    # No previous information
217
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
218
                          ["status"], None, None, 1.0),
219
                     ([constants.JOB_STATUS_WAITLOCK], []))
220

    
221
  def testLostJob(self):
222
    wfjc = jqueue._WaitForJobChangesHelper()
223
    self.assert_(wfjc(self.filename, self._LoadLostJob,
224
                      ["status"], None, None, 1.0) is None)
225

    
226

    
227
class TestEncodeOpError(unittest.TestCase):
228
  def test(self):
229
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
230
    self.assert_(isinstance(encerr, tuple))
231
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
232

    
233
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
234
    self.assert_(isinstance(encerr, tuple))
235
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
236

    
237
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
238
    self.assert_(isinstance(encerr, tuple))
239
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
240

    
241
    encerr = jqueue._EncodeOpError("Hello World")
242
    self.assert_(isinstance(encerr, tuple))
243
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
244

    
245

    
246
class TestQueuedOpCode(unittest.TestCase):
247
  def testDefaults(self):
248
    def _Check(op):
249
      self.assertFalse(hasattr(op.input, "dry_run"))
250
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251
      self.assertFalse(op.log)
252
      self.assert_(op.start_timestamp is None)
253
      self.assert_(op.exec_timestamp is None)
254
      self.assert_(op.end_timestamp is None)
255
      self.assert_(op.result is None)
256
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
257

    
258
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
259
    _Check(op1)
260
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
261
    _Check(op2)
262
    self.assertEqual(op1.Serialize(), op2.Serialize())
263

    
264
  def testPriority(self):
265
    def _Check(op):
266
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
267
             "Default priority equals high priority; test can't work"
268
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
269
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
270

    
271
    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
272
    op1 = jqueue._QueuedOpCode(inpop)
273
    _Check(op1)
274
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
275
    _Check(op2)
276
    self.assertEqual(op1.Serialize(), op2.Serialize())
277

    
278

    
279
class TestQueuedJob(unittest.TestCase):
280
  def test(self):
281
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
282
                      None, 1, [])
283

    
284
  def testDefaults(self):
285
    job_id = 4260
286
    ops = [
287
      opcodes.OpGetTags(),
288
      opcodes.OpTestDelay(),
289
      ]
290

    
291
    def _Check(job):
292
      self.assertEqual(job.id, job_id)
293
      self.assertEqual(job.log_serial, 0)
294
      self.assert_(job.received_timestamp)
295
      self.assert_(job.start_timestamp is None)
296
      self.assert_(job.end_timestamp is None)
297
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
298
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
299
      self.assert_(repr(job).startswith("<"))
300
      self.assertEqual(len(job.ops), len(ops))
301
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
302
                              for (inp, op) in zip(ops, job.ops)))
303
      self.assertRaises(errors.OpExecError, job.GetInfo,
304
                        ["unknown-field"])
305
      self.assertEqual(job.GetInfo(["summary"]),
306
                       [[op.input.Summary() for op in job.ops]])
307

    
308
    job1 = jqueue._QueuedJob(None, job_id, ops)
309
    _Check(job1)
310
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
311
    _Check(job2)
312
    self.assertEqual(job1.Serialize(), job2.Serialize())
313

    
314
  def testPriority(self):
315
    job_id = 4283
316
    ops = [
317
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
318
      opcodes.OpTestDelay(),
319
      ]
320

    
321
    def _Check(job):
322
      self.assertEqual(job.id, job_id)
323
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
324
      self.assert_(repr(job).startswith("<"))
325

    
326
    job = jqueue._QueuedJob(None, job_id, ops)
327
    _Check(job)
328
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
329
                            for op in job.ops))
330
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
331

    
332
    # Increase first
333
    job.ops[0].priority -= 1
334
    _Check(job)
335
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
336

    
337
    # Mark opcode as finished
338
    job.ops[0].status = constants.OP_STATUS_SUCCESS
339
    _Check(job)
340
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
341

    
342
    # Increase second
343
    job.ops[1].priority -= 10
344
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
345

    
346
    # Test increasing first
347
    job.ops[0].status = constants.OP_STATUS_RUNNING
348
    job.ops[0].priority -= 19
349
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
350

    
351
  def testCalcStatus(self):
352
    def _Queued(ops):
353
      # The default status is "queued"
354
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
355
                              for op in ops))
356

    
357
    def _Waitlock1(ops):
358
      ops[0].status = constants.OP_STATUS_WAITLOCK
359

    
360
    def _Waitlock2(ops):
361
      ops[0].status = constants.OP_STATUS_SUCCESS
362
      ops[1].status = constants.OP_STATUS_SUCCESS
363
      ops[2].status = constants.OP_STATUS_WAITLOCK
364

    
365
    def _Running(ops):
366
      ops[0].status = constants.OP_STATUS_SUCCESS
367
      ops[1].status = constants.OP_STATUS_RUNNING
368
      for op in ops[2:]:
369
        op.status = constants.OP_STATUS_QUEUED
370

    
371
    def _Canceling1(ops):
372
      ops[0].status = constants.OP_STATUS_SUCCESS
373
      ops[1].status = constants.OP_STATUS_SUCCESS
374
      for op in ops[2:]:
375
        op.status = constants.OP_STATUS_CANCELING
376

    
377
    def _Canceling2(ops):
378
      for op in ops:
379
        op.status = constants.OP_STATUS_CANCELING
380

    
381
    def _Canceled(ops):
382
      for op in ops:
383
        op.status = constants.OP_STATUS_CANCELED
384

    
385
    def _Error1(ops):
386
      for idx, op in enumerate(ops):
387
        if idx > 3:
388
          op.status = constants.OP_STATUS_ERROR
389
        else:
390
          op.status = constants.OP_STATUS_SUCCESS
391

    
392
    def _Error2(ops):
393
      for op in ops:
394
        op.status = constants.OP_STATUS_ERROR
395

    
396
    def _Success(ops):
397
      for op in ops:
398
        op.status = constants.OP_STATUS_SUCCESS
399

    
400
    tests = {
401
      constants.JOB_STATUS_QUEUED: [_Queued],
402
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
403
      constants.JOB_STATUS_RUNNING: [_Running],
404
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
405
      constants.JOB_STATUS_CANCELED: [_Canceled],
406
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
407
      constants.JOB_STATUS_SUCCESS: [_Success],
408
      }
409

    
410
    def _NewJob():
411
      job = jqueue._QueuedJob(None, 1,
412
                              [opcodes.OpTestDelay() for _ in range(10)])
413
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
414
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
415
                              for op in job.ops))
416
      return job
417

    
418
    for status in constants.JOB_STATUS_ALL:
419
      sttests = tests[status]
420
      assert sttests
421
      for fn in sttests:
422
        job = _NewJob()
423
        fn(job.ops)
424
        self.assertEqual(job.CalcStatus(), status)
425

    
426

    
427
class _FakeQueueForProc:
428
  def __init__(self):
429
    self._acquired = False
430

    
431
  def IsAcquired(self):
432
    return self._acquired
433

    
434
  def acquire(self, shared=0):
435
    assert shared == 1
436
    self._acquired = True
437

    
438
  def release(self):
439
    assert self._acquired
440
    self._acquired = False
441

    
442
  def UpdateJobUnlocked(self, job, replicate=None):
443
    # TODO: Ensure job is updated at the correct places
444
    pass
445

    
446

    
447
class _FakeExecOpCodeForProc:
448
  def __init__(self, before_start, after_start):
449
    self._before_start = before_start
450
    self._after_start = after_start
451

    
452
  def __call__(self, op, cbs, timeout=None):
453
    assert isinstance(op, opcodes.OpTestDummy)
454

    
455
    if self._before_start:
456
      self._before_start(timeout)
457

    
458
    cbs.NotifyStart()
459

    
460
    if self._after_start:
461
      self._after_start(op, cbs)
462

    
463
    if op.fail:
464
      raise errors.OpExecError("Error requested (%s)" % op.result)
465

    
466
    return op.result
467

    
468

    
469
class _JobProcessorTestUtils:
470
  def _CreateJob(self, queue, job_id, ops):
471
    job = jqueue._QueuedJob(queue, job_id, ops)
472
    self.assertFalse(job.start_timestamp)
473
    self.assertFalse(job.end_timestamp)
474
    self.assertEqual(len(ops), len(job.ops))
475
    self.assert_(compat.all(op.input == inp
476
                            for (op, inp) in zip(job.ops, ops)))
477
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
478
    return job
479

    
480

    
481
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
482
  def _GenericCheckJob(self, job):
483
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
484
                      for op in job.ops)
485

    
486
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
487
                     [[op.start_timestamp for op in job.ops],
488
                      [op.exec_timestamp for op in job.ops],
489
                      [op.end_timestamp for op in job.ops]])
490
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
491
                     [job.received_timestamp,
492
                      job.start_timestamp,
493
                      job.end_timestamp])
494
    self.assert_(job.start_timestamp)
495
    self.assert_(job.end_timestamp)
496
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
497

    
498
  def testSuccess(self):
499
    queue = _FakeQueueForProc()
500

    
501
    for (job_id, opcount) in [(25351, 1), (6637, 3),
502
                              (24644, 10), (32207, 100)]:
503
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
504
             for i in range(opcount)]
505

    
506
      # Create job
507
      job = self._CreateJob(queue, job_id, ops)
508

    
509
      def _BeforeStart(_):
510
        self.assertFalse(queue.IsAcquired())
511
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
512

    
513
      def _AfterStart(op, cbs):
514
        self.assertFalse(queue.IsAcquired())
515
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
516

    
517
        # Job is running, cancelling shouldn't be possible
518
        (success, _) = job.Cancel()
519
        self.assertFalse(success)
520

    
521
      opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
522

    
523
      for idx in range(len(ops)):
524
        result = jqueue._JobProcessor(queue, opexec, job)()
525
        if idx == len(ops) - 1:
526
          # Last opcode
527
          self.assert_(result)
528
        else:
529
          self.assertFalse(result)
530

    
531
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
532
          self.assert_(job.start_timestamp)
533
          self.assertFalse(job.end_timestamp)
534

    
535
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
536
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
537
      self.assertEqual(job.GetInfo(["opresult"]),
538
                       [[op.input.result for op in job.ops]])
539
      self.assertEqual(job.GetInfo(["opstatus"]),
540
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
541
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
542
                              for op in job.ops))
543

    
544
      self._GenericCheckJob(job)
545

    
546
      # Finished jobs can't be processed any further
547
      self.assertRaises(errors.ProgrammerError,
548
                        jqueue._JobProcessor(queue, opexec, job))
549

    
550
  def testOpcodeError(self):
551
    queue = _FakeQueueForProc()
552

    
553
    testdata = [
554
      (17077, 1, 0, 0),
555
      (1782, 5, 2, 2),
556
      (18179, 10, 9, 9),
557
      (4744, 10, 3, 8),
558
      (23816, 100, 39, 45),
559
      ]
560

    
561
    for (job_id, opcount, failfrom, failto) in testdata:
562
      # Prepare opcodes
563
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
564
                                 fail=(failfrom <= i and
565
                                       i <= failto))
566
             for i in range(opcount)]
567

    
568
      # Create job
569
      job = self._CreateJob(queue, job_id, ops)
570

    
571
      opexec = _FakeExecOpCodeForProc(None, None)
572

    
573
      for idx in range(len(ops)):
574
        result = jqueue._JobProcessor(queue, opexec, job)()
575

    
576
        if idx in (failfrom, len(ops) - 1):
577
          # Last opcode
578
          self.assert_(result)
579
          break
580

    
581
        self.assertFalse(result)
582

    
583
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
584

    
585
      # Check job status
586
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
587
      self.assertEqual(job.GetInfo(["id"]), [job_id])
588
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
589

    
590
      # Check opcode status
591
      data = zip(job.ops,
592
                 job.GetInfo(["opstatus"])[0],
593
                 job.GetInfo(["opresult"])[0])
594

    
595
      for idx, (op, opstatus, opresult) in enumerate(data):
596
        if idx < failfrom:
597
          assert not op.input.fail
598
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
599
          self.assertEqual(opresult, op.input.result)
600
        elif idx <= failto:
601
          assert op.input.fail
602
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
603
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
604
        else:
605
          assert not op.input.fail
606
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
607
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
608

    
609
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
610
                              for op in job.ops[:failfrom]))
611

    
612
      self._GenericCheckJob(job)
613

    
614
      # Finished jobs can't be processed any further
615
      self.assertRaises(errors.ProgrammerError,
616
                        jqueue._JobProcessor(queue, opexec, job))
617

    
618
  def testCancelWhileInQueue(self):
619
    queue = _FakeQueueForProc()
620

    
621
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
622
           for i in range(5)]
623

    
624
    # Create job
625
    job_id = 17045
626
    job = self._CreateJob(queue, job_id, ops)
627

    
628
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
629

    
630
    # Mark as cancelled
631
    (success, _) = job.Cancel()
632
    self.assert_(success)
633

    
634
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
635
                            for op in job.ops))
636

    
637
    opexec = _FakeExecOpCodeForProc(None, None)
638
    jqueue._JobProcessor(queue, opexec, job)()
639

    
640
    # Check result
641
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
642
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
643
    self.assertFalse(job.start_timestamp)
644
    self.assert_(job.end_timestamp)
645
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
646
                                for op in job.ops))
647
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
648
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
649
                      ["Job canceled by request" for _ in job.ops]])
650

    
651
  def testCancelWhileWaitlock(self):
652
    queue = _FakeQueueForProc()
653

    
654
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
655
           for i in range(5)]
656

    
657
    # Create job
658
    job_id = 11009
659
    job = self._CreateJob(queue, job_id, ops)
660

    
661
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
662

    
663
    def _BeforeStart(_):
664
      self.assertFalse(queue.IsAcquired())
665
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
666

    
667
      # Mark as cancelled
668
      (success, _) = job.Cancel()
669
      self.assert_(success)
670

    
671
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
672
                              for op in job.ops))
673

    
674
    def _AfterStart(op, cbs):
675
      self.assertFalse(queue.IsAcquired())
676
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
677

    
678
    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
679

    
680
    jqueue._JobProcessor(queue, opexec, job)()
681

    
682
    # Check result
683
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
684
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
685
    self.assert_(job.start_timestamp)
686
    self.assert_(job.end_timestamp)
687
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
688
                                for op in job.ops))
689
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
690
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
691
                      ["Job canceled by request" for _ in job.ops]])
692

    
693
  def testCancelWhileRunning(self):
694
    # Tests canceling a job with finished opcodes and more, unprocessed ones
695
    queue = _FakeQueueForProc()
696

    
697
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
698
           for i in range(3)]
699

    
700
    # Create job
701
    job_id = 28492
702
    job = self._CreateJob(queue, job_id, ops)
703

    
704
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
705

    
706
    opexec = _FakeExecOpCodeForProc(None, None)
707

    
708
    # Run one opcode
709
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
710

    
711
    # Job goes back to queued
712
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
713
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
714
                     [[constants.OP_STATUS_SUCCESS,
715
                       constants.OP_STATUS_QUEUED,
716
                       constants.OP_STATUS_QUEUED],
717
                      ["Res0", None, None]])
718

    
719
    # Mark as cancelled
720
    (success, _) = job.Cancel()
721
    self.assert_(success)
722

    
723
    # Try processing another opcode (this will actually cancel the job)
724
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
725

    
726
    # Check result
727
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
728
    self.assertEqual(job.GetInfo(["id"]), [job_id])
729
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
730
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
731
                     [[constants.OP_STATUS_SUCCESS,
732
                       constants.OP_STATUS_CANCELED,
733
                       constants.OP_STATUS_CANCELED],
734
                      ["Res0", "Job canceled by request",
735
                       "Job canceled by request"]])
736

    
737
  def testPartiallyRun(self):
738
    # Tests calling the processor on a job that's been partially run before the
739
    # program was restarted
740
    queue = _FakeQueueForProc()
741

    
742
    opexec = _FakeExecOpCodeForProc(None, None)
743

    
744
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
745
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
746
             for i in range(10)]
747

    
748
      # Create job
749
      job = self._CreateJob(queue, job_id, ops)
750

    
751
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
752

    
753
      for _ in range(successcount):
754
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
755

    
756
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
757
      self.assertEqual(job.GetInfo(["opstatus"]),
758
                       [[constants.OP_STATUS_SUCCESS
759
                         for _ in range(successcount)] +
760
                        [constants.OP_STATUS_QUEUED
761
                         for _ in range(len(ops) - successcount)]])
762

    
763
      self.assert_(job.ops_iter)
764

    
765
      # Serialize and restore (simulates program restart)
766
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
767
      self.assertFalse(newjob.ops_iter)
768
      self._TestPartial(newjob, successcount)
769

    
770
  def _TestPartial(self, job, successcount):
771
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
772
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
773

    
774
    queue = _FakeQueueForProc()
775
    opexec = _FakeExecOpCodeForProc(None, None)
776

    
777
    for remaining in reversed(range(len(job.ops) - successcount)):
778
      result = jqueue._JobProcessor(queue, opexec, job)()
779

    
780
      if remaining == 0:
781
        # Last opcode
782
        self.assert_(result)
783
        break
784

    
785
      self.assertFalse(result)
786

    
787
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
788

    
789
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
790
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
791
    self.assertEqual(job.GetInfo(["opresult"]),
792
                     [[op.input.result for op in job.ops]])
793
    self.assertEqual(job.GetInfo(["opstatus"]),
794
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
795
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
796
                            for op in job.ops))
797

    
798
    self._GenericCheckJob(job)
799

    
800
    # Finished jobs can't be processed any further
801
    self.assertRaises(errors.ProgrammerError,
802
                      jqueue._JobProcessor(queue, opexec, job))
803

    
804
    # ... also after being restored
805
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
806
    self.assertRaises(errors.ProgrammerError,
807
                      jqueue._JobProcessor(queue, opexec, job2))
808

    
809
  def testProcessorOnRunningJob(self):
810
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
811

    
812
    queue = _FakeQueueForProc()
813
    opexec = _FakeExecOpCodeForProc(None, None)
814

    
815
    # Create job
816
    job = self._CreateJob(queue, 9571, ops)
817

    
818
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
819

    
820
    job.ops[0].status = constants.OP_STATUS_RUNNING
821

    
822
    assert len(job.ops) == 1
823

    
824
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
825

    
826
    # Calling on running job must fail
827
    self.assertRaises(errors.ProgrammerError,
828
                      jqueue._JobProcessor(queue, opexec, job))
829

    
830
  def testLogMessages(self):
831
    # Tests the "Feedback" callback function
832
    queue = _FakeQueueForProc()
833

    
834
    messages = {
835
      1: [
836
        (None, "Hello"),
837
        (None, "World"),
838
        (constants.ELOG_MESSAGE, "there"),
839
        ],
840
      4: [
841
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
842
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
843
        ],
844
      }
845
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
846
                               messages=messages.get(i, []))
847
           for i in range(5)]
848

    
849
    # Create job
850
    job = self._CreateJob(queue, 29386, ops)
851

    
852
    def _BeforeStart(_):
853
      self.assertFalse(queue.IsAcquired())
854
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
855

    
856
    def _AfterStart(op, cbs):
857
      self.assertFalse(queue.IsAcquired())
858
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
859

    
860
      self.assertRaises(AssertionError, cbs.Feedback,
861
                        "too", "many", "arguments")
862

    
863
      for (log_type, msg) in op.messages:
864
        if log_type:
865
          cbs.Feedback(log_type, msg)
866
        else:
867
          cbs.Feedback(msg)
868

    
869
    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
870

    
871
    for remaining in reversed(range(len(job.ops))):
872
      result = jqueue._JobProcessor(queue, opexec, job)()
873

    
874
      if remaining == 0:
875
        # Last opcode
876
        self.assert_(result)
877
        break
878

    
879
      self.assertFalse(result)
880

    
881
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
882

    
883
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
884
    self.assertEqual(job.GetInfo(["opresult"]),
885
                     [[op.input.result for op in job.ops]])
886

    
887
    logmsgcount = sum(len(m) for m in messages.values())
888

    
889
    self._CheckLogMessages(job, logmsgcount)
890

    
891
    # Serialize and restore (simulates program restart)
892
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
893
    self._CheckLogMessages(newjob, logmsgcount)
894

    
895
    # Check each message
896
    prevserial = -1
897
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
898
      for (serial, timestamp, log_type, msg) in oplog:
899
        (exptype, expmsg) = messages.get(idx).pop(0)
900
        if exptype:
901
          self.assertEqual(log_type, exptype)
902
        else:
903
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
904
        self.assertEqual(expmsg, msg)
905
        self.assert_(serial > prevserial)
906
        prevserial = serial
907

    
908
  def _CheckLogMessages(self, job, count):
909
    # Check serial
910
    self.assertEqual(job.log_serial, count)
911

    
912
    # No filter
913
    self.assertEqual(job.GetLogEntries(None),
914
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
915
                      for entry in entries])
916

    
917
    # Filter with serial
918
    assert count > 3
919
    self.assert_(job.GetLogEntries(3))
920
    self.assertEqual(job.GetLogEntries(3),
921
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
922
                      for entry in entries][3:])
923

    
924
    # No log message after highest serial
925
    self.assertFalse(job.GetLogEntries(count))
926
    self.assertFalse(job.GetLogEntries(count + 3))
927

    
928

    
929
class _FakeTimeoutStrategy:
930
  def __init__(self, timeouts):
931
    self.timeouts = timeouts
932
    self.attempts = 0
933
    self.last_timeout = None
934

    
935
  def NextAttempt(self):
936
    self.attempts += 1
937
    if self.timeouts:
938
      timeout = self.timeouts.pop(0)
939
    else:
940
      timeout = None
941
    self.last_timeout = timeout
942
    return timeout
943

    
944

    
945
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
946
  def setUp(self):
947
    self.queue = _FakeQueueForProc()
948
    self.job = None
949
    self.curop = None
950
    self.opcounter = None
951
    self.timeout_strategy = None
952
    self.retries = 0
953
    self.prev_tsop = None
954
    self.prev_prio = None
955
    self.gave_lock = None
956
    self.done_lock_before_blocking = False
957

    
958
  def _BeforeStart(self, timeout):
959
    job = self.job
960

    
961
    self.assertFalse(self.queue.IsAcquired())
962
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
963

    
964
    ts = self.timeout_strategy
965

    
966
    self.assert_(timeout is None or isinstance(timeout, (int, float)))
967
    self.assertEqual(timeout, ts.last_timeout)
968

    
969
    self.gave_lock = True
970

    
971
    if (self.curop == 3 and
972
        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
973
      # Give locks before running into blocking acquire
974
      assert self.retries == 7
975
      self.retries = 0
976
      self.done_lock_before_blocking = True
977
      return
978

    
979
    if self.retries > 0:
980
      self.assert_(timeout is not None)
981
      self.retries -= 1
982
      self.gave_lock = False
983
      raise mcpu.LockAcquireTimeout()
984

    
985
    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
986
      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
987
      assert not ts.timeouts
988
      self.assert_(timeout is None)
989

    
990
  def _AfterStart(self, op, cbs):
991
    job = self.job
992

    
993
    self.assertFalse(self.queue.IsAcquired())
994
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
995

    
996
    # Job is running, cancelling shouldn't be possible
997
    (success, _) = job.Cancel()
998
    self.assertFalse(success)
999

    
1000
  def _NextOpcode(self):
1001
    self.curop = self.opcounter.next()
1002
    self.prev_prio = self.job.ops[self.curop].priority
1003

    
1004
  def _NewTimeoutStrategy(self):
1005
    job = self.job
1006

    
1007
    self.assertEqual(self.retries, 0)
1008

    
1009
    if self.prev_tsop == self.curop:
1010
      # Still on the same opcode, priority must've been increased
1011
      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
1012

    
1013
    if self.curop == 1:
1014
      # Normal retry
1015
      timeouts = range(10, 31, 10)
1016
      self.retries = len(timeouts) - 1
1017

    
1018
    elif self.curop == 2:
1019
      # Let this run into a blocking acquire
1020
      timeouts = range(11, 61, 12)
1021
      self.retries = len(timeouts)
1022

    
1023
    elif self.curop == 3:
1024
      # Wait for priority to increase, but give lock before blocking acquire
1025
      timeouts = range(12, 100, 14)
1026
      self.retries = len(timeouts)
1027

    
1028
      self.assertFalse(self.done_lock_before_blocking)
1029

    
1030
    elif self.curop == 4:
1031
      self.assert_(self.done_lock_before_blocking)
1032

    
1033
      # Timeouts, but no need to retry
1034
      timeouts = range(10, 31, 10)
1035
      self.retries = 0
1036

    
1037
    elif self.curop == 5:
1038
      # Normal retry
1039
      timeouts = range(19, 100, 11)
1040
      self.retries = len(timeouts)
1041

    
1042
    else:
1043
      timeouts = []
1044
      self.retries = 0
1045

    
1046
    assert len(job.ops) == 10
1047
    assert self.retries <= len(timeouts)
1048

    
1049
    ts = _FakeTimeoutStrategy(timeouts)
1050

    
1051
    self.timeout_strategy = ts
1052
    self.prev_tsop = self.curop
1053
    self.prev_prio = job.ops[self.curop].priority
1054

    
1055
    return ts
1056

    
1057
  def testTimeout(self):
1058
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
1059
           for i in range(10)]
1060

    
1061
    # Create job
1062
    job_id = 15801
1063
    job = self._CreateJob(self.queue, job_id, ops)
1064
    self.job = job
1065

    
1066
    self.opcounter = itertools.count(0)
1067

    
1068
    opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
1069
    tsf = self._NewTimeoutStrategy
1070

    
1071
    self.assertFalse(self.done_lock_before_blocking)
1072

    
1073
    for i in itertools.count(0):
1074
      proc = jqueue._JobProcessor(self.queue, opexec, job,
1075
                                  _timeout_strategy_factory=tsf)
1076

    
1077
      result = proc(_nextop_fn=self._NextOpcode)
1078
      if result:
1079
        self.assertFalse(job.cur_opctx)
1080
        break
1081

    
1082
      self.assertFalse(result)
1083

    
1084
      if self.gave_lock:
1085
        self.assertFalse(job.cur_opctx)
1086
      else:
1087
        self.assert_(job.cur_opctx)
1088
        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
1089
                         self.timeout_strategy.NextAttempt)
1090

    
1091
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1092
      self.assert_(job.start_timestamp)
1093
      self.assertFalse(job.end_timestamp)
1094

    
1095
    self.assertEqual(self.curop, len(job.ops) - 1)
1096
    self.assertEqual(self.job, job)
1097
    self.assertEqual(self.opcounter.next(), len(job.ops))
1098
    self.assert_(self.done_lock_before_blocking)
1099

    
1100
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1101
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1102
    self.assertEqual(job.GetInfo(["opresult"]),
1103
                     [[op.input.result for op in job.ops]])
1104
    self.assertEqual(job.GetInfo(["opstatus"]),
1105
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1106
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
1107
                            for op in job.ops))
1108

    
1109
    # Finished jobs can't be processed any further
1110
    self.assertRaises(errors.ProgrammerError,
1111
                      jqueue._JobProcessor(self.queue, opexec, job))
1112

    
1113

    
1114
if __name__ == "__main__":
1115
  testutils.GanetiTestProgram()