Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ db5bce34

History | View | Annotate | Download (30 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30

    
31
from ganeti import constants
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import jqueue
35
from ganeti import opcodes
36
from ganeti import compat
37

    
38
import testutils
39

    
40

    
41
class _FakeJob:
42
  def __init__(self, job_id, status):
43
    self.id = job_id
44
    self._status = status
45
    self._log = []
46

    
47
  def SetStatus(self, status):
48
    self._status = status
49

    
50
  def AddLogEntry(self, msg):
51
    self._log.append((len(self._log), msg))
52

    
53
  def CalcStatus(self):
54
    return self._status
55

    
56
  def GetInfo(self, fields):
57
    result = []
58

    
59
    for name in fields:
60
      if name == "status":
61
        result.append(self._status)
62
      else:
63
        raise Exception("Unknown field")
64

    
65
    return result
66

    
67
  def GetLogEntries(self, newer_than):
68
    assert newer_than is None or newer_than >= 0
69

    
70
    if newer_than is None:
71
      return self._log
72

    
73
    return self._log[newer_than:]
74

    
75

    
76
class TestJobChangesChecker(unittest.TestCase):
77
  def testStatus(self):
78
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
79
    checker = jqueue._JobChangesChecker(["status"], None, None)
80
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
81

    
82
    job.SetStatus(constants.JOB_STATUS_RUNNING)
83
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
84

    
85
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
86
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
87

    
88
    # job.id is used by checker
89
    self.assertEqual(job.id, 9094)
90

    
91
  def testStatusWithPrev(self):
92
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
93
    checker = jqueue._JobChangesChecker(["status"],
94
                                        [constants.JOB_STATUS_QUEUED], None)
95
    self.assert_(checker(job) is None)
96

    
97
    job.SetStatus(constants.JOB_STATUS_RUNNING)
98
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
99

    
100
  def testFinalStatus(self):
101
    for status in constants.JOBS_FINALIZED:
102
      job = _FakeJob(2178711, status)
103
      checker = jqueue._JobChangesChecker(["status"], [status], None)
104
      # There won't be any changes in this status, hence it should signal
105
      # a change immediately
106
      self.assertEqual(checker(job), ([status], []))
107

    
108
  def testLog(self):
109
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
110
    checker = jqueue._JobChangesChecker(["status"], None, None)
111
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
112

    
113
    job.AddLogEntry("Hello World")
114
    (job_info, log_entries) = checker(job)
115
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
116
    self.assertEqual(log_entries, [[0, "Hello World"]])
117

    
118
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
119
    self.assert_(checker2(job) is None)
120

    
121
    job.AddLogEntry("Foo Bar")
122
    job.SetStatus(constants.JOB_STATUS_ERROR)
123

    
124
    (job_info, log_entries) = checker2(job)
125
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
126
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
127

    
128
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
129
    (job_info, log_entries) = checker3(job)
130
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
131
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
132

    
133

    
134
class TestJobChangesWaiter(unittest.TestCase):
135
  def setUp(self):
136
    self.tmpdir = tempfile.mkdtemp()
137
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
138
    utils.WriteFile(self.filename, data="")
139

    
140
  def tearDown(self):
141
    shutil.rmtree(self.tmpdir)
142

    
143
  def _EnsureNotifierClosed(self, notifier):
144
    try:
145
      os.fstat(notifier._fd)
146
    except EnvironmentError, err:
147
      self.assertEqual(err.errno, errno.EBADF)
148
    else:
149
      self.fail("File descriptor wasn't closed")
150

    
151
  def testClose(self):
152
    for wait in [False, True]:
153
      waiter = jqueue._JobFileChangesWaiter(self.filename)
154
      try:
155
        if wait:
156
          waiter.Wait(0.001)
157
      finally:
158
        waiter.Close()
159

    
160
      # Ensure file descriptor was closed
161
      self._EnsureNotifierClosed(waiter._notifier)
162

    
163
  def testChangingFile(self):
164
    waiter = jqueue._JobFileChangesWaiter(self.filename)
165
    try:
166
      self.assertFalse(waiter.Wait(0.1))
167
      utils.WriteFile(self.filename, data="changed")
168
      self.assert_(waiter.Wait(60))
169
    finally:
170
      waiter.Close()
171

    
172
    self._EnsureNotifierClosed(waiter._notifier)
173

    
174
  def testChangingFile2(self):
175
    waiter = jqueue._JobChangesWaiter(self.filename)
176
    try:
177
      self.assertFalse(waiter._filewaiter)
178
      self.assert_(waiter.Wait(0.1))
179
      self.assert_(waiter._filewaiter)
180

    
181
      # File waiter is now used, but there have been no changes
182
      self.assertFalse(waiter.Wait(0.1))
183
      utils.WriteFile(self.filename, data="changed")
184
      self.assert_(waiter.Wait(60))
185
    finally:
186
      waiter.Close()
187

    
188
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
189

    
190

    
191
class TestWaitForJobChangesHelper(unittest.TestCase):
192
  def setUp(self):
193
    self.tmpdir = tempfile.mkdtemp()
194
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
195
    utils.WriteFile(self.filename, data="")
196

    
197
  def tearDown(self):
198
    shutil.rmtree(self.tmpdir)
199

    
200
  def _LoadWaitingJob(self):
201
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
202

    
203
  def _LoadLostJob(self):
204
    return None
205

    
206
  def testNoChanges(self):
207
    wfjc = jqueue._WaitForJobChangesHelper()
208

    
209
    # No change
210
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
211
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
212
                     constants.JOB_NOTCHANGED)
213

    
214
    # No previous information
215
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
216
                          ["status"], None, None, 1.0),
217
                     ([constants.JOB_STATUS_WAITLOCK], []))
218

    
219
  def testLostJob(self):
220
    wfjc = jqueue._WaitForJobChangesHelper()
221
    self.assert_(wfjc(self.filename, self._LoadLostJob,
222
                      ["status"], None, None, 1.0) is None)
223

    
224

    
225
class TestEncodeOpError(unittest.TestCase):
226
  def test(self):
227
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
228
    self.assert_(isinstance(encerr, tuple))
229
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
230

    
231
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
232
    self.assert_(isinstance(encerr, tuple))
233
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
234

    
235
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
236
    self.assert_(isinstance(encerr, tuple))
237
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
238

    
239
    encerr = jqueue._EncodeOpError("Hello World")
240
    self.assert_(isinstance(encerr, tuple))
241
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
242

    
243

    
244
class TestQueuedOpCode(unittest.TestCase):
245
  def testDefaults(self):
246
    def _Check(op):
247
      self.assertFalse(hasattr(op.input, "dry_run"))
248
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
249
      self.assertFalse(op.log)
250
      self.assert_(op.start_timestamp is None)
251
      self.assert_(op.exec_timestamp is None)
252
      self.assert_(op.end_timestamp is None)
253
      self.assert_(op.result is None)
254
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
255

    
256
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
257
    _Check(op1)
258
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
259
    _Check(op2)
260
    self.assertEqual(op1.Serialize(), op2.Serialize())
261

    
262
  def testPriority(self):
263
    def _Check(op):
264
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
265
             "Default priority equals high priority; test can't work"
266
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
267
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
268

    
269
    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
270
    op1 = jqueue._QueuedOpCode(inpop)
271
    _Check(op1)
272
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
273
    _Check(op2)
274
    self.assertEqual(op1.Serialize(), op2.Serialize())
275

    
276

    
277
class TestQueuedJob(unittest.TestCase):
278
  def test(self):
279
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
280
                      None, 1, [])
281

    
282
  def testDefaults(self):
283
    job_id = 4260
284
    ops = [
285
      opcodes.OpGetTags(),
286
      opcodes.OpTestDelay(),
287
      ]
288

    
289
    def _Check(job):
290
      self.assertEqual(job.id, job_id)
291
      self.assertEqual(job.log_serial, 0)
292
      self.assert_(job.received_timestamp)
293
      self.assert_(job.start_timestamp is None)
294
      self.assert_(job.end_timestamp is None)
295
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
296
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
297
      self.assert_(repr(job).startswith("<"))
298
      self.assertEqual(len(job.ops), len(ops))
299
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
300
                              for (inp, op) in zip(ops, job.ops)))
301
      self.assertRaises(errors.OpExecError, job.GetInfo,
302
                        ["unknown-field"])
303
      self.assertEqual(job.GetInfo(["summary"]),
304
                       [[op.input.Summary() for op in job.ops]])
305

    
306
    job1 = jqueue._QueuedJob(None, job_id, ops)
307
    _Check(job1)
308
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
309
    _Check(job2)
310
    self.assertEqual(job1.Serialize(), job2.Serialize())
311

    
312
  def testPriority(self):
313
    job_id = 4283
314
    ops = [
315
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
316
      opcodes.OpTestDelay(),
317
      ]
318

    
319
    def _Check(job):
320
      self.assertEqual(job.id, job_id)
321
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
322
      self.assert_(repr(job).startswith("<"))
323

    
324
    job = jqueue._QueuedJob(None, job_id, ops)
325
    _Check(job)
326
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
327
                            for op in job.ops))
328
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
329

    
330
    # Increase first
331
    job.ops[0].priority -= 1
332
    _Check(job)
333
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
334

    
335
    # Mark opcode as finished
336
    job.ops[0].status = constants.OP_STATUS_SUCCESS
337
    _Check(job)
338
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
339

    
340
    # Increase second
341
    job.ops[1].priority -= 10
342
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
343

    
344
    # Test increasing first
345
    job.ops[0].status = constants.OP_STATUS_RUNNING
346
    job.ops[0].priority -= 19
347
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
348

    
349
  def testCalcStatus(self):
350
    def _Queued(ops):
351
      # The default status is "queued"
352
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
353
                              for op in ops))
354

    
355
    def _Waitlock1(ops):
356
      ops[0].status = constants.OP_STATUS_WAITLOCK
357

    
358
    def _Waitlock2(ops):
359
      ops[0].status = constants.OP_STATUS_SUCCESS
360
      ops[1].status = constants.OP_STATUS_SUCCESS
361
      ops[2].status = constants.OP_STATUS_WAITLOCK
362

    
363
    def _Running(ops):
364
      ops[0].status = constants.OP_STATUS_SUCCESS
365
      ops[1].status = constants.OP_STATUS_RUNNING
366
      for op in ops[2:]:
367
        op.status = constants.OP_STATUS_QUEUED
368

    
369
    def _Canceling1(ops):
370
      ops[0].status = constants.OP_STATUS_SUCCESS
371
      ops[1].status = constants.OP_STATUS_SUCCESS
372
      for op in ops[2:]:
373
        op.status = constants.OP_STATUS_CANCELING
374

    
375
    def _Canceling2(ops):
376
      for op in ops:
377
        op.status = constants.OP_STATUS_CANCELING
378

    
379
    def _Canceled(ops):
380
      for op in ops:
381
        op.status = constants.OP_STATUS_CANCELED
382

    
383
    def _Error1(ops):
384
      for idx, op in enumerate(ops):
385
        if idx > 3:
386
          op.status = constants.OP_STATUS_ERROR
387
        else:
388
          op.status = constants.OP_STATUS_SUCCESS
389

    
390
    def _Error2(ops):
391
      for op in ops:
392
        op.status = constants.OP_STATUS_ERROR
393

    
394
    def _Success(ops):
395
      for op in ops:
396
        op.status = constants.OP_STATUS_SUCCESS
397

    
398
    tests = {
399
      constants.JOB_STATUS_QUEUED: [_Queued],
400
      constants.JOB_STATUS_WAITLOCK: [_Waitlock1, _Waitlock2],
401
      constants.JOB_STATUS_RUNNING: [_Running],
402
      constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
403
      constants.JOB_STATUS_CANCELED: [_Canceled],
404
      constants.JOB_STATUS_ERROR: [_Error1, _Error2],
405
      constants.JOB_STATUS_SUCCESS: [_Success],
406
      }
407

    
408
    def _NewJob():
409
      job = jqueue._QueuedJob(None, 1,
410
                              [opcodes.OpTestDelay() for _ in range(10)])
411
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
412
      self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
413
                              for op in job.ops))
414
      return job
415

    
416
    for status in constants.JOB_STATUS_ALL:
417
      sttests = tests[status]
418
      assert sttests
419
      for fn in sttests:
420
        job = _NewJob()
421
        fn(job.ops)
422
        self.assertEqual(job.CalcStatus(), status)
423

    
424

    
425
class _FakeQueueForProc:
426
  def __init__(self):
427
    self._acquired = False
428

    
429
  def IsAcquired(self):
430
    return self._acquired
431

    
432
  def acquire(self, shared=0):
433
    assert shared == 1
434
    self._acquired = True
435

    
436
  def release(self):
437
    assert self._acquired
438
    self._acquired = False
439

    
440
  def UpdateJobUnlocked(self, job, replicate=None):
441
    # TODO: Ensure job is updated at the correct places
442
    pass
443

    
444

    
445
class _FakeExecOpCodeForProc:
446
  def __init__(self, before_start, after_start):
447
    self._before_start = before_start
448
    self._after_start = after_start
449

    
450
  def __call__(self, op, cbs):
451
    assert isinstance(op, opcodes.OpTestDummy)
452

    
453
    if self._before_start:
454
      self._before_start()
455

    
456
    cbs.NotifyStart()
457

    
458
    if self._after_start:
459
      self._after_start(op, cbs)
460

    
461
    if op.fail:
462
      raise errors.OpExecError("Error requested (%s)" % op.result)
463

    
464
    return op.result
465

    
466

    
467
class TestJobProcessor(unittest.TestCase):
468
  def _CreateJob(self, queue, job_id, ops):
469
    job = jqueue._QueuedJob(queue, job_id, ops)
470
    self.assertFalse(job.start_timestamp)
471
    self.assertFalse(job.end_timestamp)
472
    self.assertEqual(len(ops), len(job.ops))
473
    self.assert_(compat.all(op.input == inp
474
                            for (op, inp) in zip(job.ops, ops)))
475
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
476
    return job
477

    
478
  def _GenericCheckJob(self, job):
479
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
480
                      for op in job.ops)
481

    
482
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
483
                     [[op.start_timestamp for op in job.ops],
484
                      [op.exec_timestamp for op in job.ops],
485
                      [op.end_timestamp for op in job.ops]])
486
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
487
                     [job.received_timestamp,
488
                      job.start_timestamp,
489
                      job.end_timestamp])
490
    self.assert_(job.start_timestamp)
491
    self.assert_(job.end_timestamp)
492
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
493

    
494
  def testSuccess(self):
495
    queue = _FakeQueueForProc()
496

    
497
    for (job_id, opcount) in [(25351, 1), (6637, 3),
498
                              (24644, 10), (32207, 100)]:
499
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
500
             for i in range(opcount)]
501

    
502
      # Create job
503
      job = self._CreateJob(queue, job_id, ops)
504

    
505
      def _BeforeStart():
506
        self.assertFalse(queue.IsAcquired())
507
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
508

    
509
      def _AfterStart(op, cbs):
510
        self.assertFalse(queue.IsAcquired())
511
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
512

    
513
        # Job is running, cancelling shouldn't be possible
514
        (success, _) = job.Cancel()
515
        self.assertFalse(success)
516

    
517
      opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
518

    
519
      for idx in range(len(ops)):
520
        result = jqueue._JobProcessor(queue, opexec, job)()
521
        if idx == len(ops) - 1:
522
          # Last opcode
523
          self.assert_(result)
524
        else:
525
          self.assertFalse(result)
526

    
527
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
528
          self.assert_(job.start_timestamp)
529
          self.assertFalse(job.end_timestamp)
530

    
531
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
532
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
533
      self.assertEqual(job.GetInfo(["opresult"]),
534
                       [[op.input.result for op in job.ops]])
535
      self.assertEqual(job.GetInfo(["opstatus"]),
536
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
537
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
538
                              for op in job.ops))
539

    
540
      self._GenericCheckJob(job)
541

    
542
      # Finished jobs can't be processed any further
543
      self.assertRaises(errors.ProgrammerError,
544
                        jqueue._JobProcessor(queue, opexec, job))
545

    
546
  def testOpcodeError(self):
547
    queue = _FakeQueueForProc()
548

    
549
    testdata = [
550
      (17077, 1, 0, 0),
551
      (1782, 5, 2, 2),
552
      (18179, 10, 9, 9),
553
      (4744, 10, 3, 8),
554
      (23816, 100, 39, 45),
555
      ]
556

    
557
    for (job_id, opcount, failfrom, failto) in testdata:
558
      # Prepare opcodes
559
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
560
                                 fail=(failfrom <= i and
561
                                       i <= failto))
562
             for i in range(opcount)]
563

    
564
      # Create job
565
      job = self._CreateJob(queue, job_id, ops)
566

    
567
      opexec = _FakeExecOpCodeForProc(None, None)
568

    
569
      for idx in range(len(ops)):
570
        result = jqueue._JobProcessor(queue, opexec, job)()
571

    
572
        if idx in (failfrom, len(ops) - 1):
573
          # Last opcode
574
          self.assert_(result)
575
          break
576

    
577
        self.assertFalse(result)
578

    
579
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
580

    
581
      # Check job status
582
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
583
      self.assertEqual(job.GetInfo(["id"]), [job_id])
584
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
585

    
586
      # Check opcode status
587
      data = zip(job.ops,
588
                 job.GetInfo(["opstatus"])[0],
589
                 job.GetInfo(["opresult"])[0])
590

    
591
      for idx, (op, opstatus, opresult) in enumerate(data):
592
        if idx < failfrom:
593
          assert not op.input.fail
594
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
595
          self.assertEqual(opresult, op.input.result)
596
        elif idx <= failto:
597
          assert op.input.fail
598
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
599
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
600
        else:
601
          assert not op.input.fail
602
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
603
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
604

    
605
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
606
                              for op in job.ops[:failfrom]))
607

    
608
      self._GenericCheckJob(job)
609

    
610
      # Finished jobs can't be processed any further
611
      self.assertRaises(errors.ProgrammerError,
612
                        jqueue._JobProcessor(queue, opexec, job))
613

    
614
  def testCancelWhileInQueue(self):
615
    queue = _FakeQueueForProc()
616

    
617
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
618
           for i in range(5)]
619

    
620
    # Create job
621
    job_id = 17045
622
    job = self._CreateJob(queue, job_id, ops)
623

    
624
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
625

    
626
    # Mark as cancelled
627
    (success, _) = job.Cancel()
628
    self.assert_(success)
629

    
630
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
631
                            for op in job.ops))
632

    
633
    opexec = _FakeExecOpCodeForProc(None, None)
634
    jqueue._JobProcessor(queue, opexec, job)()
635

    
636
    # Check result
637
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
638
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
639
    self.assertFalse(job.start_timestamp)
640
    self.assert_(job.end_timestamp)
641
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
642
                                for op in job.ops))
643
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
644
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
645
                      ["Job canceled by request" for _ in job.ops]])
646

    
647
  def testCancelWhileWaitlock(self):
648
    queue = _FakeQueueForProc()
649

    
650
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
651
           for i in range(5)]
652

    
653
    # Create job
654
    job_id = 11009
655
    job = self._CreateJob(queue, job_id, ops)
656

    
657
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
658

    
659
    def _BeforeStart():
660
      self.assertFalse(queue.IsAcquired())
661
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
662

    
663
      # Mark as cancelled
664
      (success, _) = job.Cancel()
665
      self.assert_(success)
666

    
667
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
668
                              for op in job.ops))
669

    
670
    def _AfterStart(op, cbs):
671
      self.assertFalse(queue.IsAcquired())
672
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
673

    
674
    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
675

    
676
    jqueue._JobProcessor(queue, opexec, job)()
677

    
678
    # Check result
679
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
680
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
681
    self.assert_(job.start_timestamp)
682
    self.assert_(job.end_timestamp)
683
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
684
                                for op in job.ops))
685
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
686
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
687
                      ["Job canceled by request" for _ in job.ops]])
688

    
689
  def testCancelWhileRunning(self):
690
    # Tests canceling a job with finished opcodes and more, unprocessed ones
691
    queue = _FakeQueueForProc()
692

    
693
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
694
           for i in range(3)]
695

    
696
    # Create job
697
    job_id = 28492
698
    job = self._CreateJob(queue, job_id, ops)
699

    
700
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
701

    
702
    opexec = _FakeExecOpCodeForProc(None, None)
703

    
704
    # Run one opcode
705
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
706

    
707
    # Job goes back to queued
708
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
709
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
710
                     [[constants.OP_STATUS_SUCCESS,
711
                       constants.OP_STATUS_QUEUED,
712
                       constants.OP_STATUS_QUEUED],
713
                      ["Res0", None, None]])
714

    
715
    # Mark as cancelled
716
    (success, _) = job.Cancel()
717
    self.assert_(success)
718

    
719
    # Try processing another opcode (this will actually cancel the job)
720
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
721

    
722
    # Check result
723
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
724
    self.assertEqual(job.GetInfo(["id"]), [job_id])
725
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
726
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
727
                     [[constants.OP_STATUS_SUCCESS,
728
                       constants.OP_STATUS_CANCELED,
729
                       constants.OP_STATUS_CANCELED],
730
                      ["Res0", "Job canceled by request",
731
                       "Job canceled by request"]])
732

    
733
  def testPartiallyRun(self):
734
    # Tests calling the processor on a job that's been partially run before the
735
    # program was restarted
736
    queue = _FakeQueueForProc()
737

    
738
    opexec = _FakeExecOpCodeForProc(None, None)
739

    
740
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
741
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
742
             for i in range(10)]
743

    
744
      # Create job
745
      job = self._CreateJob(queue, job_id, ops)
746

    
747
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
748

    
749
      for _ in range(successcount):
750
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
751

    
752
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
753
      self.assertEqual(job.GetInfo(["opstatus"]),
754
                       [[constants.OP_STATUS_SUCCESS
755
                         for _ in range(successcount)] +
756
                        [constants.OP_STATUS_QUEUED
757
                         for _ in range(len(ops) - successcount)]])
758

    
759
      self.assert_(job.current_op)
760

    
761
      # Serialize and restore (simulates program restart)
762
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
763
      self.assertFalse(newjob.current_op)
764
      self._TestPartial(newjob, successcount)
765

    
766
  def _TestPartial(self, job, successcount):
767
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
768
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
769

    
770
    queue = _FakeQueueForProc()
771
    opexec = _FakeExecOpCodeForProc(None, None)
772

    
773
    for remaining in reversed(range(len(job.ops) - successcount)):
774
      result = jqueue._JobProcessor(queue, opexec, job)()
775

    
776
      if remaining == 0:
777
        # Last opcode
778
        self.assert_(result)
779
        break
780

    
781
      self.assertFalse(result)
782

    
783
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
784

    
785
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
786
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
787
    self.assertEqual(job.GetInfo(["opresult"]),
788
                     [[op.input.result for op in job.ops]])
789
    self.assertEqual(job.GetInfo(["opstatus"]),
790
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
791
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
792
                            for op in job.ops))
793

    
794
    self._GenericCheckJob(job)
795

    
796
    # Finished jobs can't be processed any further
797
    self.assertRaises(errors.ProgrammerError,
798
                      jqueue._JobProcessor(queue, opexec, job))
799

    
800
    # ... also after being restored
801
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
802
    self.assertRaises(errors.ProgrammerError,
803
                      jqueue._JobProcessor(queue, opexec, job2))
804

    
805
  def testProcessorOnRunningJob(self):
806
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
807

    
808
    queue = _FakeQueueForProc()
809
    opexec = _FakeExecOpCodeForProc(None, None)
810

    
811
    # Create job
812
    job = self._CreateJob(queue, 9571, ops)
813

    
814
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
815

    
816
    job.ops[0].status = constants.OP_STATUS_RUNNING
817

    
818
    assert len(job.ops) == 1
819

    
820
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
821

    
822
    # Calling on running job must fail
823
    self.assertRaises(errors.ProgrammerError,
824
                      jqueue._JobProcessor(queue, opexec, job))
825

    
826
  def testLogMessages(self):
827
    # Tests the "Feedback" callback function
828
    queue = _FakeQueueForProc()
829

    
830
    messages = {
831
      1: [
832
        (None, "Hello"),
833
        (None, "World"),
834
        (constants.ELOG_MESSAGE, "there"),
835
        ],
836
      4: [
837
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
838
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
839
        ],
840
      }
841
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
842
                               messages=messages.get(i, []))
843
           for i in range(5)]
844

    
845
    # Create job
846
    job = self._CreateJob(queue, 29386, ops)
847

    
848
    def _BeforeStart():
849
      self.assertFalse(queue.IsAcquired())
850
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
851

    
852
    def _AfterStart(op, cbs):
853
      self.assertFalse(queue.IsAcquired())
854
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
855

    
856
      self.assertRaises(AssertionError, cbs.Feedback,
857
                        "too", "many", "arguments")
858

    
859
      for (log_type, msg) in op.messages:
860
        if log_type:
861
          cbs.Feedback(log_type, msg)
862
        else:
863
          cbs.Feedback(msg)
864

    
865
    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
866

    
867
    for remaining in reversed(range(len(job.ops))):
868
      result = jqueue._JobProcessor(queue, opexec, job)()
869

    
870
      if remaining == 0:
871
        # Last opcode
872
        self.assert_(result)
873
        break
874

    
875
      self.assertFalse(result)
876

    
877
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
878

    
879
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
880
    self.assertEqual(job.GetInfo(["opresult"]),
881
                     [[op.input.result for op in job.ops]])
882

    
883
    logmsgcount = sum(len(m) for m in messages.values())
884

    
885
    self._CheckLogMessages(job, logmsgcount)
886

    
887
    # Serialize and restore (simulates program restart)
888
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
889
    self._CheckLogMessages(newjob, logmsgcount)
890

    
891
    # Check each message
892
    prevserial = -1
893
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
894
      for (serial, timestamp, log_type, msg) in oplog:
895
        (exptype, expmsg) = messages.get(idx).pop(0)
896
        if exptype:
897
          self.assertEqual(log_type, exptype)
898
        else:
899
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
900
        self.assertEqual(expmsg, msg)
901
        self.assert_(serial > prevserial)
902
        prevserial = serial
903

    
904
  def _CheckLogMessages(self, job, count):
905
    # Check serial
906
    self.assertEqual(job.log_serial, count)
907

    
908
    # No filter
909
    self.assertEqual(job.GetLogEntries(None),
910
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
911
                      for entry in entries])
912

    
913
    # Filter with serial
914
    assert count > 3
915
    self.assert_(job.GetLogEntries(3))
916
    self.assertEqual(job.GetLogEntries(3),
917
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
918
                      for entry in entries][3:])
919

    
920
    # No log message after highest serial
921
    self.assertFalse(job.GetLogEntries(count))
922
    self.assertFalse(job.GetLogEntries(count + 3))
923

    
924

    
925
if __name__ == "__main__":
926
  testutils.GanetiTestProgram()