Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ be760ba8

History | View | Annotate | Download (27.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30

    
31
from ganeti import constants
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import jqueue
35
from ganeti import opcodes
36
from ganeti import compat
37

    
38
import testutils
39

    
40

    
41
class _FakeJob:
42
  def __init__(self, job_id, status):
43
    self.id = job_id
44
    self._status = status
45
    self._log = []
46

    
47
  def SetStatus(self, status):
48
    self._status = status
49

    
50
  def AddLogEntry(self, msg):
51
    self._log.append((len(self._log), msg))
52

    
53
  def CalcStatus(self):
54
    return self._status
55

    
56
  def GetInfo(self, fields):
57
    result = []
58

    
59
    for name in fields:
60
      if name == "status":
61
        result.append(self._status)
62
      else:
63
        raise Exception("Unknown field")
64

    
65
    return result
66

    
67
  def GetLogEntries(self, newer_than):
68
    assert newer_than is None or newer_than >= 0
69

    
70
    if newer_than is None:
71
      return self._log
72

    
73
    return self._log[newer_than:]
74

    
75

    
76
class TestJobChangesChecker(unittest.TestCase):
77
  def testStatus(self):
78
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
79
    checker = jqueue._JobChangesChecker(["status"], None, None)
80
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
81

    
82
    job.SetStatus(constants.JOB_STATUS_RUNNING)
83
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
84

    
85
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
86
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
87

    
88
    # job.id is used by checker
89
    self.assertEqual(job.id, 9094)
90

    
91
  def testStatusWithPrev(self):
92
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
93
    checker = jqueue._JobChangesChecker(["status"],
94
                                        [constants.JOB_STATUS_QUEUED], None)
95
    self.assert_(checker(job) is None)
96

    
97
    job.SetStatus(constants.JOB_STATUS_RUNNING)
98
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
99

    
100
  def testFinalStatus(self):
101
    for status in constants.JOBS_FINALIZED:
102
      job = _FakeJob(2178711, status)
103
      checker = jqueue._JobChangesChecker(["status"], [status], None)
104
      # There won't be any changes in this status, hence it should signal
105
      # a change immediately
106
      self.assertEqual(checker(job), ([status], []))
107

    
108
  def testLog(self):
109
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
110
    checker = jqueue._JobChangesChecker(["status"], None, None)
111
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
112

    
113
    job.AddLogEntry("Hello World")
114
    (job_info, log_entries) = checker(job)
115
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
116
    self.assertEqual(log_entries, [[0, "Hello World"]])
117

    
118
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
119
    self.assert_(checker2(job) is None)
120

    
121
    job.AddLogEntry("Foo Bar")
122
    job.SetStatus(constants.JOB_STATUS_ERROR)
123

    
124
    (job_info, log_entries) = checker2(job)
125
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
126
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
127

    
128
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
129
    (job_info, log_entries) = checker3(job)
130
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
131
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
132

    
133

    
134
class TestJobChangesWaiter(unittest.TestCase):
135
  def setUp(self):
136
    self.tmpdir = tempfile.mkdtemp()
137
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
138
    utils.WriteFile(self.filename, data="")
139

    
140
  def tearDown(self):
141
    shutil.rmtree(self.tmpdir)
142

    
143
  def _EnsureNotifierClosed(self, notifier):
144
    try:
145
      os.fstat(notifier._fd)
146
    except EnvironmentError, err:
147
      self.assertEqual(err.errno, errno.EBADF)
148
    else:
149
      self.fail("File descriptor wasn't closed")
150

    
151
  def testClose(self):
152
    for wait in [False, True]:
153
      waiter = jqueue._JobFileChangesWaiter(self.filename)
154
      try:
155
        if wait:
156
          waiter.Wait(0.001)
157
      finally:
158
        waiter.Close()
159

    
160
      # Ensure file descriptor was closed
161
      self._EnsureNotifierClosed(waiter._notifier)
162

    
163
  def testChangingFile(self):
164
    waiter = jqueue._JobFileChangesWaiter(self.filename)
165
    try:
166
      self.assertFalse(waiter.Wait(0.1))
167
      utils.WriteFile(self.filename, data="changed")
168
      self.assert_(waiter.Wait(60))
169
    finally:
170
      waiter.Close()
171

    
172
    self._EnsureNotifierClosed(waiter._notifier)
173

    
174
  def testChangingFile2(self):
175
    waiter = jqueue._JobChangesWaiter(self.filename)
176
    try:
177
      self.assertFalse(waiter._filewaiter)
178
      self.assert_(waiter.Wait(0.1))
179
      self.assert_(waiter._filewaiter)
180

    
181
      # File waiter is now used, but there have been no changes
182
      self.assertFalse(waiter.Wait(0.1))
183
      utils.WriteFile(self.filename, data="changed")
184
      self.assert_(waiter.Wait(60))
185
    finally:
186
      waiter.Close()
187

    
188
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
189

    
190

    
191
class TestWaitForJobChangesHelper(unittest.TestCase):
192
  def setUp(self):
193
    self.tmpdir = tempfile.mkdtemp()
194
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
195
    utils.WriteFile(self.filename, data="")
196

    
197
  def tearDown(self):
198
    shutil.rmtree(self.tmpdir)
199

    
200
  def _LoadWaitingJob(self):
201
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
202

    
203
  def _LoadLostJob(self):
204
    return None
205

    
206
  def testNoChanges(self):
207
    wfjc = jqueue._WaitForJobChangesHelper()
208

    
209
    # No change
210
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
211
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
212
                     constants.JOB_NOTCHANGED)
213

    
214
    # No previous information
215
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
216
                          ["status"], None, None, 1.0),
217
                     ([constants.JOB_STATUS_WAITLOCK], []))
218

    
219
  def testLostJob(self):
220
    wfjc = jqueue._WaitForJobChangesHelper()
221
    self.assert_(wfjc(self.filename, self._LoadLostJob,
222
                      ["status"], None, None, 1.0) is None)
223

    
224

    
225
class TestEncodeOpError(unittest.TestCase):
226
  def test(self):
227
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
228
    self.assert_(isinstance(encerr, tuple))
229
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
230

    
231
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
232
    self.assert_(isinstance(encerr, tuple))
233
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
234

    
235
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
236
    self.assert_(isinstance(encerr, tuple))
237
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
238

    
239
    encerr = jqueue._EncodeOpError("Hello World")
240
    self.assert_(isinstance(encerr, tuple))
241
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
242

    
243

    
244
class TestQueuedOpCode(unittest.TestCase):
245
  def testDefaults(self):
246
    def _Check(op):
247
      self.assertFalse(hasattr(op.input, "dry_run"))
248
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
249
      self.assertFalse(op.log)
250
      self.assert_(op.start_timestamp is None)
251
      self.assert_(op.exec_timestamp is None)
252
      self.assert_(op.end_timestamp is None)
253
      self.assert_(op.result is None)
254
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
255

    
256
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
257
    _Check(op1)
258
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
259
    _Check(op2)
260
    self.assertEqual(op1.Serialize(), op2.Serialize())
261

    
262
  def testPriority(self):
263
    def _Check(op):
264
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
265
             "Default priority equals high priority; test can't work"
266
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
267
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
268

    
269
    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
270
    op1 = jqueue._QueuedOpCode(inpop)
271
    _Check(op1)
272
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
273
    _Check(op2)
274
    self.assertEqual(op1.Serialize(), op2.Serialize())
275

    
276

    
277
class TestQueuedJob(unittest.TestCase):
278
  def test(self):
279
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
280
                      None, 1, [])
281

    
282
  def testDefaults(self):
283
    job_id = 4260
284
    ops = [
285
      opcodes.OpGetTags(),
286
      opcodes.OpTestDelay(),
287
      ]
288

    
289
    def _Check(job):
290
      self.assertEqual(job.id, job_id)
291
      self.assertEqual(job.log_serial, 0)
292
      self.assert_(job.received_timestamp)
293
      self.assert_(job.start_timestamp is None)
294
      self.assert_(job.end_timestamp is None)
295
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
296
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
297
      self.assert_(repr(job).startswith("<"))
298
      self.assertEqual(len(job.ops), len(ops))
299
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
300
                              for (inp, op) in zip(ops, job.ops)))
301
      self.assertRaises(errors.OpExecError, job.GetInfo,
302
                        ["unknown-field"])
303
      self.assertEqual(job.GetInfo(["summary"]),
304
                       [[op.input.Summary() for op in job.ops]])
305

    
306
    job1 = jqueue._QueuedJob(None, job_id, ops)
307
    _Check(job1)
308
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
309
    _Check(job2)
310
    self.assertEqual(job1.Serialize(), job2.Serialize())
311

    
312
  def testPriority(self):
313
    job_id = 4283
314
    ops = [
315
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
316
      opcodes.OpTestDelay(),
317
      ]
318

    
319
    def _Check(job):
320
      self.assertEqual(job.id, job_id)
321
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
322
      self.assert_(repr(job).startswith("<"))
323

    
324
    job = jqueue._QueuedJob(None, job_id, ops)
325
    _Check(job)
326
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
327
                            for op in job.ops))
328
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
329

    
330
    # Increase first
331
    job.ops[0].priority -= 1
332
    _Check(job)
333
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
334

    
335
    # Mark opcode as finished
336
    job.ops[0].status = constants.OP_STATUS_SUCCESS
337
    _Check(job)
338
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
339

    
340
    # Increase second
341
    job.ops[1].priority -= 10
342
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
343

    
344
    # Test increasing first
345
    job.ops[0].status = constants.OP_STATUS_RUNNING
346
    job.ops[0].priority -= 19
347
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
348

    
349

    
350
class _FakeQueueForProc:
351
  def __init__(self):
352
    self._acquired = False
353

    
354
  def IsAcquired(self):
355
    return self._acquired
356

    
357
  def acquire(self, shared=0):
358
    assert shared == 1
359
    self._acquired = True
360

    
361
  def release(self):
362
    assert self._acquired
363
    self._acquired = False
364

    
365
  def UpdateJobUnlocked(self, job, replicate=None):
366
    # TODO: Ensure job is updated at the correct places
367
    pass
368

    
369

    
370
class _FakeExecOpCodeForProc:
371
  def __init__(self, before_start, after_start):
372
    self._before_start = before_start
373
    self._after_start = after_start
374

    
375
  def __call__(self, op, cbs):
376
    assert isinstance(op, opcodes.OpTestDummy)
377

    
378
    if self._before_start:
379
      self._before_start()
380

    
381
    cbs.NotifyStart()
382

    
383
    if self._after_start:
384
      self._after_start(op, cbs)
385

    
386
    if op.fail:
387
      raise errors.OpExecError("Error requested (%s)" % op.result)
388

    
389
    return op.result
390

    
391

    
392
class TestJobProcessor(unittest.TestCase):
393
  def _CreateJob(self, queue, job_id, ops):
394
    job = jqueue._QueuedJob(queue, job_id, ops)
395
    self.assertFalse(job.start_timestamp)
396
    self.assertFalse(job.end_timestamp)
397
    self.assertEqual(len(ops), len(job.ops))
398
    self.assert_(compat.all(op.input == inp
399
                            for (op, inp) in zip(job.ops, ops)))
400
    self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
401
    return job
402

    
403
  def _GenericCheckJob(self, job):
404
    assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
405
                      for op in job.ops)
406

    
407
    self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
408
                     [[op.start_timestamp for op in job.ops],
409
                      [op.exec_timestamp for op in job.ops],
410
                      [op.end_timestamp for op in job.ops]])
411
    self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
412
                     [job.received_timestamp,
413
                      job.start_timestamp,
414
                      job.end_timestamp])
415
    self.assert_(job.start_timestamp)
416
    self.assert_(job.end_timestamp)
417
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
418

    
419
  def testSuccess(self):
420
    queue = _FakeQueueForProc()
421

    
422
    for (job_id, opcount) in [(25351, 1), (6637, 3),
423
                              (24644, 10), (32207, 100)]:
424
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
425
             for i in range(opcount)]
426

    
427
      # Create job
428
      job = self._CreateJob(queue, job_id, ops)
429

    
430
      def _BeforeStart():
431
        self.assertFalse(queue.IsAcquired())
432
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
433

    
434
      def _AfterStart(op, cbs):
435
        self.assertFalse(queue.IsAcquired())
436
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
437

    
438
        # Job is running, cancelling shouldn't be possible
439
        (success, _) = job.Cancel()
440
        self.assertFalse(success)
441

    
442
      opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
443

    
444
      for idx in range(len(ops)):
445
        result = jqueue._JobProcessor(queue, opexec, job)()
446
        if idx == len(ops) - 1:
447
          # Last opcode
448
          self.assert_(result)
449
        else:
450
          self.assertFalse(result)
451

    
452
          self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
453
          self.assert_(job.start_timestamp)
454
          self.assertFalse(job.end_timestamp)
455

    
456
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
457
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
458
      self.assertEqual(job.GetInfo(["opresult"]),
459
                       [[op.input.result for op in job.ops]])
460
      self.assertEqual(job.GetInfo(["opstatus"]),
461
                       [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
462
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
463
                              for op in job.ops))
464

    
465
      self._GenericCheckJob(job)
466

    
467
      # Finished jobs can't be processed any further
468
      self.assertRaises(errors.ProgrammerError,
469
                        jqueue._JobProcessor(queue, opexec, job))
470

    
471
  def testOpcodeError(self):
472
    queue = _FakeQueueForProc()
473

    
474
    testdata = [
475
      (17077, 1, 0, 0),
476
      (1782, 5, 2, 2),
477
      (18179, 10, 9, 9),
478
      (4744, 10, 3, 8),
479
      (23816, 100, 39, 45),
480
      ]
481

    
482
    for (job_id, opcount, failfrom, failto) in testdata:
483
      # Prepare opcodes
484
      ops = [opcodes.OpTestDummy(result="Res%s" % i,
485
                                 fail=(failfrom <= i and
486
                                       i <= failto))
487
             for i in range(opcount)]
488

    
489
      # Create job
490
      job = self._CreateJob(queue, job_id, ops)
491

    
492
      opexec = _FakeExecOpCodeForProc(None, None)
493

    
494
      for idx in range(len(ops)):
495
        result = jqueue._JobProcessor(queue, opexec, job)()
496

    
497
        if idx in (failfrom, len(ops) - 1):
498
          # Last opcode
499
          self.assert_(result)
500
          break
501

    
502
        self.assertFalse(result)
503

    
504
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
505

    
506
      # Check job status
507
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
508
      self.assertEqual(job.GetInfo(["id"]), [job_id])
509
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
510

    
511
      # Check opcode status
512
      data = zip(job.ops,
513
                 job.GetInfo(["opstatus"])[0],
514
                 job.GetInfo(["opresult"])[0])
515

    
516
      for idx, (op, opstatus, opresult) in enumerate(data):
517
        if idx < failfrom:
518
          assert not op.input.fail
519
          self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
520
          self.assertEqual(opresult, op.input.result)
521
        elif idx <= failto:
522
          assert op.input.fail
523
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
524
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
525
        else:
526
          assert not op.input.fail
527
          self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
528
          self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
529

    
530
      self.assert_(compat.all(op.start_timestamp and op.end_timestamp
531
                              for op in job.ops[:failfrom]))
532

    
533
      self._GenericCheckJob(job)
534

    
535
      # Finished jobs can't be processed any further
536
      self.assertRaises(errors.ProgrammerError,
537
                        jqueue._JobProcessor(queue, opexec, job))
538

    
539
  def testCancelWhileInQueue(self):
540
    queue = _FakeQueueForProc()
541

    
542
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
543
           for i in range(5)]
544

    
545
    # Create job
546
    job_id = 17045
547
    job = self._CreateJob(queue, job_id, ops)
548

    
549
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
550

    
551
    # Mark as cancelled
552
    (success, _) = job.Cancel()
553
    self.assert_(success)
554

    
555
    self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
556
                            for op in job.ops))
557

    
558
    opexec = _FakeExecOpCodeForProc(None, None)
559
    jqueue._JobProcessor(queue, opexec, job)()
560

    
561
    # Check result
562
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
563
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
564
    self.assertFalse(job.start_timestamp)
565
    self.assert_(job.end_timestamp)
566
    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
567
                                for op in job.ops))
568
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
569
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
570
                      ["Job canceled by request" for _ in job.ops]])
571

    
572
  def testCancelWhileWaitlock(self):
573
    queue = _FakeQueueForProc()
574

    
575
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
576
           for i in range(5)]
577

    
578
    # Create job
579
    job_id = 11009
580
    job = self._CreateJob(queue, job_id, ops)
581

    
582
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
583

    
584
    def _BeforeStart():
585
      self.assertFalse(queue.IsAcquired())
586
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
587

    
588
      # Mark as cancelled
589
      (success, _) = job.Cancel()
590
      self.assert_(success)
591

    
592
      self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
593
                              for op in job.ops))
594

    
595
    def _AfterStart(op, cbs):
596
      self.assertFalse(queue.IsAcquired())
597
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
598

    
599
    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
600

    
601
    jqueue._JobProcessor(queue, opexec, job)()
602

    
603
    # Check result
604
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
605
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
606
    self.assert_(job.start_timestamp)
607
    self.assert_(job.end_timestamp)
608
    self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
609
                                for op in job.ops))
610
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
611
                     [[constants.OP_STATUS_CANCELED for _ in job.ops],
612
                      ["Job canceled by request" for _ in job.ops]])
613

    
614
  def testCancelWhileRunning(self):
615
    # Tests canceling a job with finished opcodes and more, unprocessed ones
616
    queue = _FakeQueueForProc()
617

    
618
    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
619
           for i in range(3)]
620

    
621
    # Create job
622
    job_id = 28492
623
    job = self._CreateJob(queue, job_id, ops)
624

    
625
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
626

    
627
    opexec = _FakeExecOpCodeForProc(None, None)
628

    
629
    # Run one opcode
630
    self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
631

    
632
    # Job goes back to queued
633
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
634
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
635
                     [[constants.OP_STATUS_SUCCESS,
636
                       constants.OP_STATUS_QUEUED,
637
                       constants.OP_STATUS_QUEUED],
638
                      ["Res0", None, None]])
639

    
640
    # Mark as cancelled
641
    (success, _) = job.Cancel()
642
    self.assert_(success)
643

    
644
    # Try processing another opcode (this will actually cancel the job)
645
    self.assert_(jqueue._JobProcessor(queue, opexec, job)())
646

    
647
    # Check result
648
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
649
    self.assertEqual(job.GetInfo(["id"]), [job_id])
650
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
651
    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
652
                     [[constants.OP_STATUS_SUCCESS,
653
                       constants.OP_STATUS_CANCELED,
654
                       constants.OP_STATUS_CANCELED],
655
                      ["Res0", "Job canceled by request",
656
                       "Job canceled by request"]])
657

    
658
  def testPartiallyRun(self):
659
    # Tests calling the processor on a job that's been partially run before the
660
    # program was restarted
661
    queue = _FakeQueueForProc()
662

    
663
    opexec = _FakeExecOpCodeForProc(None, None)
664

    
665
    for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
666
      ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
667
             for i in range(10)]
668

    
669
      # Create job
670
      job = self._CreateJob(queue, job_id, ops)
671

    
672
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
673

    
674
      for _ in range(successcount):
675
        self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
676

    
677
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
678
      self.assertEqual(job.GetInfo(["opstatus"]),
679
                       [[constants.OP_STATUS_SUCCESS
680
                         for _ in range(successcount)] +
681
                        [constants.OP_STATUS_QUEUED
682
                         for _ in range(len(ops) - successcount)]])
683

    
684
      self.assert_(job.current_op)
685

    
686
      # Serialize and restore (simulates program restart)
687
      newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
688
      self.assertFalse(newjob.current_op)
689
      self._TestPartial(newjob, successcount)
690

    
691
  def _TestPartial(self, job, successcount):
692
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
693
    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
694

    
695
    queue = _FakeQueueForProc()
696
    opexec = _FakeExecOpCodeForProc(None, None)
697

    
698
    for remaining in reversed(range(len(job.ops) - successcount)):
699
      result = jqueue._JobProcessor(queue, opexec, job)()
700

    
701
      if remaining == 0:
702
        # Last opcode
703
        self.assert_(result)
704
        break
705

    
706
      self.assertFalse(result)
707

    
708
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
709

    
710
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
711
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
712
    self.assertEqual(job.GetInfo(["opresult"]),
713
                     [[op.input.result for op in job.ops]])
714
    self.assertEqual(job.GetInfo(["opstatus"]),
715
                     [[constants.OP_STATUS_SUCCESS for _ in job.ops]])
716
    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
717
                            for op in job.ops))
718

    
719
    self._GenericCheckJob(job)
720

    
721
    # Finished jobs can't be processed any further
722
    self.assertRaises(errors.ProgrammerError,
723
                      jqueue._JobProcessor(queue, opexec, job))
724

    
725
    # ... also after being restored
726
    job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
727
    self.assertRaises(errors.ProgrammerError,
728
                      jqueue._JobProcessor(queue, opexec, job2))
729

    
730
  def testProcessorOnRunningJob(self):
731
    ops = [opcodes.OpTestDummy(result="result", fail=False)]
732

    
733
    queue = _FakeQueueForProc()
734
    opexec = _FakeExecOpCodeForProc(None, None)
735

    
736
    # Create job
737
    job = self._CreateJob(queue, 9571, ops)
738

    
739
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
740

    
741
    job.ops[0].status = constants.OP_STATUS_RUNNING
742

    
743
    assert len(job.ops) == 1
744

    
745
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
746

    
747
    # Calling on running job must fail
748
    self.assertRaises(errors.ProgrammerError,
749
                      jqueue._JobProcessor(queue, opexec, job))
750

    
751
  def testLogMessages(self):
752
    # Tests the "Feedback" callback function
753
    queue = _FakeQueueForProc()
754

    
755
    messages = {
756
      1: [
757
        (None, "Hello"),
758
        (None, "World"),
759
        (constants.ELOG_MESSAGE, "there"),
760
        ],
761
      4: [
762
        (constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
763
        (constants.ELOG_JQUEUE_TEST, ("other", "type")),
764
        ],
765
      }
766
    ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
767
                               messages=messages.get(i, []))
768
           for i in range(5)]
769

    
770
    # Create job
771
    job = self._CreateJob(queue, 29386, ops)
772

    
773
    def _BeforeStart():
774
      self.assertFalse(queue.IsAcquired())
775
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
776

    
777
    def _AfterStart(op, cbs):
778
      self.assertFalse(queue.IsAcquired())
779
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
780

    
781
      self.assertRaises(AssertionError, cbs.Feedback,
782
                        "too", "many", "arguments")
783

    
784
      for (log_type, msg) in op.messages:
785
        if log_type:
786
          cbs.Feedback(log_type, msg)
787
        else:
788
          cbs.Feedback(msg)
789

    
790
    opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
791

    
792
    for remaining in reversed(range(len(job.ops))):
793
      result = jqueue._JobProcessor(queue, opexec, job)()
794

    
795
      if remaining == 0:
796
        # Last opcode
797
        self.assert_(result)
798
        break
799

    
800
      self.assertFalse(result)
801

    
802
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
803

    
804
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
805
    self.assertEqual(job.GetInfo(["opresult"]),
806
                     [[op.input.result for op in job.ops]])
807

    
808
    logmsgcount = sum(len(m) for m in messages.values())
809

    
810
    self._CheckLogMessages(job, logmsgcount)
811

    
812
    # Serialize and restore (simulates program restart)
813
    newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
814
    self._CheckLogMessages(newjob, logmsgcount)
815

    
816
    # Check each message
817
    prevserial = -1
818
    for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
819
      for (serial, timestamp, log_type, msg) in oplog:
820
        (exptype, expmsg) = messages.get(idx).pop(0)
821
        if exptype:
822
          self.assertEqual(log_type, exptype)
823
        else:
824
          self.assertEqual(log_type, constants.ELOG_MESSAGE)
825
        self.assertEqual(expmsg, msg)
826
        self.assert_(serial > prevserial)
827
        prevserial = serial
828

    
829
  def _CheckLogMessages(self, job, count):
830
    # Check serial
831
    self.assertEqual(job.log_serial, count)
832

    
833
    # No filter
834
    self.assertEqual(job.GetLogEntries(None),
835
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
836
                      for entry in entries])
837

    
838
    # Filter with serial
839
    assert count > 3
840
    self.assert_(job.GetLogEntries(3))
841
    self.assertEqual(job.GetLogEntries(3),
842
                     [entry for entries in job.GetInfo(["oplog"])[0] if entries
843
                      for entry in entries][3:])
844

    
845
    # No log message after highest serial
846
    self.assertFalse(job.GetLogEntries(count))
847
    self.assertFalse(job.GetLogEntries(count + 3))
848

    
849

    
850
if __name__ == "__main__":
851
  testutils.GanetiTestProgram()