Revision 989a8bee
b/Makefile.am | ||
---|---|---|
377 | 377 |
test/ganeti.hooks_unittest.py \ |
378 | 378 |
test/ganeti.http_unittest.py \ |
379 | 379 |
test/ganeti.impexpd_unittest.py \ |
380 |
test/ganeti.jqueue_unittest.py \ |
|
380 | 381 |
test/ganeti.locking_unittest.py \ |
381 | 382 |
test/ganeti.luxi_unittest.py \ |
382 | 383 |
test/ganeti.masterd.instance_unittest.py \ |
b/lib/constants.py | ||
---|---|---|
785 | 785 |
JOB_STATUS_CANCELED = "canceled" |
786 | 786 |
JOB_STATUS_SUCCESS = "success" |
787 | 787 |
JOB_STATUS_ERROR = "error" |
788 |
JOBS_FINALIZED = frozenset([ |
|
789 |
JOB_STATUS_CANCELED, |
|
790 |
JOB_STATUS_SUCCESS, |
|
791 |
JOB_STATUS_ERROR, |
|
792 |
]) |
|
788 | 793 |
|
789 | 794 |
# OpCode status |
790 | 795 |
# not yet finalized |
b/lib/jqueue.py | ||
---|---|---|
54 | 54 |
from ganeti import jstore |
55 | 55 |
from ganeti import rpc |
56 | 56 |
from ganeti import netutils |
57 |
from ganeti import compat |
|
57 | 58 |
|
58 | 59 |
|
59 | 60 |
JOBQUEUE_THREADS = 25 |
... | ... | |
481 | 482 |
self._job.lock_status = msg |
482 | 483 |
|
483 | 484 |
|
484 |
class _WaitForJobChangesHelper(object): |
|
485 |
"""Helper class using initofy to wait for changes in a job file. |
|
486 |
|
|
487 |
This class takes a previous job status and serial, and alerts the client when |
|
488 |
the current job status has changed. |
|
485 |
class _JobChangesChecker(object): |
|
486 |
def __init__(self, fields, prev_job_info, prev_log_serial): |
|
487 |
"""Initializes this class. |
|
489 | 488 |
|
490 |
@type job_id: string |
|
491 |
@ivar job_id: id of the job we're watching |
|
492 |
@type prev_job_info: string |
|
493 |
@ivar prev_job_info: previous job info, as passed by the luxi client |
|
494 |
@type prev_log_serial: string |
|
495 |
@ivar prev_log_serial: previous job serial, as passed by the luxi client |
|
496 |
@type queue: L{JobQueue} |
|
497 |
@ivar queue: job queue (used for a few utility functions) |
|
498 |
@type job_path: string |
|
499 |
@ivar job_path: absolute path of the job file |
|
500 |
@type wm: pyinotify.WatchManager (or None) |
|
501 |
@ivar wm: inotify watch manager to watch for changes |
|
502 |
@type inotify_handler: L{asyncnotifier.SingleFileEventHandler} |
|
503 |
@ivar inotify_handler: single file event handler, used for watching |
|
504 |
@type notifier: pyinotify.Notifier |
|
505 |
@ivar notifier: inotify single-threaded notifier, used for watching |
|
489 |
@type fields: list of strings |
|
490 |
@param fields: Fields requested by LUXI client |
|
491 |
@type prev_job_info: string |
|
492 |
@param prev_job_info: previous job info, as passed by the LUXI client |
|
493 |
@type prev_log_serial: string |
|
494 |
@param prev_log_serial: previous job serial, as passed by the LUXI client |
|
506 | 495 |
|
507 |
""" |
|
508 |
def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue): |
|
509 |
self.job_id = job_id |
|
510 |
self.fields = fields |
|
511 |
self.prev_job_info = prev_job_info |
|
512 |
self.prev_log_serial = prev_log_serial |
|
513 |
self.queue = queue |
|
514 |
# pylint: disable-msg=W0212 |
|
515 |
self.job_path = self.queue._GetJobPath(self.job_id) |
|
516 |
self.wm = None |
|
517 |
self.inotify_handler = None |
|
518 |
self.notifier = None |
|
496 |
""" |
|
497 |
self._fields = fields |
|
498 |
self._prev_job_info = prev_job_info |
|
499 |
self._prev_log_serial = prev_log_serial |
|
519 | 500 |
|
520 |
def _SetupInotify(self):
|
|
521 |
"""Create the inotify
|
|
501 |
def __call__(self, job):
|
|
502 |
"""Checks whether job has changed.
|
|
522 | 503 |
|
523 |
@raises errors.InotifyError: if the notifier cannot be setup |
|
504 |
@type job: L{_QueuedJob} |
|
505 |
@param job: Job object |
|
524 | 506 |
|
525 | 507 |
""" |
526 |
if self.wm: |
|
527 |
return |
|
528 |
self.wm = pyinotify.WatchManager() |
|
529 |
self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm, |
|
530 |
self.OnInotify, |
|
531 |
self.job_path) |
|
532 |
self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler) |
|
533 |
self.inotify_handler.enable() |
|
534 |
|
|
535 |
def _LoadDiskStatus(self): |
|
536 |
job = self.queue.SafeLoadJobFromDisk(self.job_id) |
|
537 |
if not job: |
|
538 |
raise errors.JobLost() |
|
539 |
self.job_status = job.CalcStatus() |
|
508 |
status = job.CalcStatus() |
|
509 |
job_info = job.GetInfo(self._fields) |
|
510 |
log_entries = job.GetLogEntries(self._prev_log_serial) |
|
540 | 511 |
|
541 |
job_info = job.GetInfo(self.fields) |
|
542 |
log_entries = job.GetLogEntries(self.prev_log_serial) |
|
543 | 512 |
# Serializing and deserializing data can cause type changes (e.g. from |
544 | 513 |
# tuple to list) or precision loss. We're doing it here so that we get |
545 | 514 |
# the same modifications as the data received from the client. Without |
... | ... | |
547 | 516 |
# significantly different. |
548 | 517 |
# TODO: we just deserialized from disk, investigate how to make sure that |
549 | 518 |
# the job info and log entries are compatible to avoid this further step. |
550 |
self.job_info = serializer.LoadJson(serializer.DumpJson(job_info)) |
|
551 |
self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) |
|
519 |
# TODO: Doing something like in testutils.py:UnifyValueType might be more |
|
520 |
# efficient, though floats will be tricky |
|
521 |
job_info = serializer.LoadJson(serializer.DumpJson(job_info)) |
|
522 |
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) |
|
552 | 523 |
|
553 |
def _CheckForChanges(self): |
|
554 |
self._LoadDiskStatus() |
|
555 | 524 |
# Don't even try to wait if the job is no longer running, there will be |
556 | 525 |
# no changes. |
557 |
if (self.job_status not in (constants.JOB_STATUS_QUEUED,
|
|
558 |
constants.JOB_STATUS_RUNNING,
|
|
559 |
constants.JOB_STATUS_WAITLOCK) or
|
|
560 |
self.prev_job_info != self.job_info or
|
|
561 |
(self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
|
|
562 |
logging.debug("Job %s changed", self.job_id)
|
|
563 |
return (self.job_info, self.log_entries)
|
|
526 |
if (status not in (constants.JOB_STATUS_QUEUED, |
|
527 |
constants.JOB_STATUS_RUNNING, |
|
528 |
constants.JOB_STATUS_WAITLOCK) or |
|
529 |
job_info != self._prev_job_info or
|
|
530 |
(log_entries and self._prev_log_serial != log_entries[0][0])):
|
|
531 |
logging.debug("Job %s changed", job.id)
|
|
532 |
return (job_info, log_entries)
|
|
564 | 533 |
|
565 |
raise utils.RetryAgain() |
|
534 |
return None |
|
535 |
|
|
536 |
|
|
537 |
class _JobFileChangesWaiter(object): |
|
538 |
def __init__(self, filename): |
|
539 |
"""Initializes this class. |
|
540 |
|
|
541 |
@type filename: string |
|
542 |
@param filename: Path to job file |
|
543 |
@raises errors.InotifyError: if the notifier cannot be setup |
|
566 | 544 |
|
567 |
def OnInotify(self, notifier_enabled): |
|
545 |
""" |
|
546 |
self._wm = pyinotify.WatchManager() |
|
547 |
self._inotify_handler = \ |
|
548 |
asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) |
|
549 |
self._notifier = \ |
|
550 |
pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler) |
|
551 |
try: |
|
552 |
self._inotify_handler.enable() |
|
553 |
except Exception: |
|
554 |
# pyinotify doesn't close file descriptors automatically |
|
555 |
self._notifier.stop() |
|
556 |
raise |
|
557 |
|
|
558 |
def _OnInotify(self, notifier_enabled): |
|
559 |
"""Callback for inotify. |
|
560 |
|
|
561 |
""" |
|
568 | 562 |
if not notifier_enabled: |
569 |
self.inotify_handler.enable() |
|
563 |
self._inotify_handler.enable() |
|
564 |
|
|
565 |
def Wait(self, timeout): |
|
566 |
"""Waits for the job file to change. |
|
567 |
|
|
568 |
@type timeout: float |
|
569 |
@param timeout: Timeout in seconds |
|
570 |
@return: Whether there have been events |
|
571 |
|
|
572 |
""" |
|
573 |
assert timeout >= 0 |
|
574 |
have_events = self._notifier.check_events(timeout * 1000) |
|
575 |
if have_events: |
|
576 |
self._notifier.read_events() |
|
577 |
self._notifier.process_events() |
|
578 |
return have_events |
|
579 |
|
|
580 |
def Close(self): |
|
581 |
"""Closes underlying notifier and its file descriptor. |
|
582 |
|
|
583 |
""" |
|
584 |
self._notifier.stop() |
|
585 |
|
|
586 |
|
|
587 |
class _JobChangesWaiter(object): |
|
588 |
def __init__(self, filename): |
|
589 |
"""Initializes this class. |
|
590 |
|
|
591 |
@type filename: string |
|
592 |
@param filename: Path to job file |
|
593 |
|
|
594 |
""" |
|
595 |
self._filewaiter = None |
|
596 |
self._filename = filename |
|
570 | 597 |
|
571 |
def WaitFn(self, timeout): |
|
572 |
self._SetupInotify() |
|
573 |
if self.notifier.check_events(timeout*1000): |
|
574 |
self.notifier.read_events() |
|
575 |
self.notifier.process_events() |
|
598 |
def Wait(self, timeout): |
|
599 |
"""Waits for a job to change. |
|
576 | 600 |
|
577 |
def WaitForChanges(self, timeout): |
|
578 |
self._SetupInotify() |
|
601 |
@type timeout: float |
|
602 |
@param timeout: Timeout in seconds |
|
603 |
@return: Whether there have been events |
|
604 |
|
|
605 |
""" |
|
606 |
if self._filewaiter: |
|
607 |
return self._filewaiter.Wait(timeout) |
|
608 |
|
|
609 |
# Lazy setup: Avoid inotify setup cost when job file has already changed. |
|
610 |
# If this point is reached, return immediately and let caller check the job |
|
611 |
# file again in case there were changes since the last check. This avoids a |
|
612 |
# race condition. |
|
613 |
self._filewaiter = _JobFileChangesWaiter(self._filename) |
|
614 |
|
|
615 |
return True |
|
616 |
|
|
617 |
def Close(self): |
|
618 |
"""Closes underlying waiter. |
|
619 |
|
|
620 |
""" |
|
621 |
if self._filewaiter: |
|
622 |
self._filewaiter.Close() |
|
623 |
|
|
624 |
|
|
625 |
class _WaitForJobChangesHelper(object): |
|
626 |
"""Helper class using inotify to wait for changes in a job file. |
|
627 |
|
|
628 |
This class takes a previous job status and serial, and alerts the client when |
|
629 |
the current job status has changed. |
|
630 |
|
|
631 |
""" |
|
632 |
@staticmethod |
|
633 |
def _CheckForChanges(job_load_fn, check_fn): |
|
634 |
job = job_load_fn() |
|
635 |
if not job: |
|
636 |
raise errors.JobLost() |
|
637 |
|
|
638 |
result = check_fn(job) |
|
639 |
if result is None: |
|
640 |
raise utils.RetryAgain() |
|
641 |
|
|
642 |
return result |
|
643 |
|
|
644 |
def __call__(self, filename, job_load_fn, |
|
645 |
fields, prev_job_info, prev_log_serial, timeout): |
|
646 |
"""Waits for changes on a job. |
|
647 |
|
|
648 |
@type filename: string |
|
649 |
@param filename: File on which to wait for changes |
|
650 |
@type job_load_fn: callable |
|
651 |
@param job_load_fn: Function to load job |
|
652 |
@type fields: list of strings |
|
653 |
@param fields: Which fields to check for changes |
|
654 |
@type prev_job_info: list or None |
|
655 |
@param prev_job_info: Last job information returned |
|
656 |
@type prev_log_serial: int |
|
657 |
@param prev_log_serial: Last job message serial number |
|
658 |
@type timeout: float |
|
659 |
@param timeout: maximum time to wait in seconds |
|
660 |
|
|
661 |
""" |
|
579 | 662 |
try: |
580 |
return utils.Retry(self._CheckForChanges, |
|
581 |
utils.RETRY_REMAINING_TIME, |
|
582 |
timeout, |
|
583 |
wait_fn=self.WaitFn) |
|
663 |
check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) |
|
664 |
waiter = _JobChangesWaiter(filename) |
|
665 |
try: |
|
666 |
return utils.Retry(compat.partial(self._CheckForChanges, |
|
667 |
job_load_fn, check_fn), |
|
668 |
utils.RETRY_REMAINING_TIME, timeout, |
|
669 |
wait_fn=waiter.Wait) |
|
670 |
finally: |
|
671 |
waiter.Close() |
|
584 | 672 |
except (errors.InotifyError, errors.JobLost): |
585 | 673 |
return None |
586 | 674 |
except utils.RetryTimeout: |
587 | 675 |
return constants.JOB_NOTCHANGED |
588 | 676 |
|
589 |
def Close(self): |
|
590 |
if self.wm: |
|
591 |
self.notifier.stop() |
|
592 |
|
|
593 | 677 |
|
594 | 678 |
class _JobQueueWorker(workerpool.BaseWorker): |
595 | 679 |
"""The actual job workers. |
... | ... | |
1314 | 1398 |
@type prev_log_serial: int |
1315 | 1399 |
@param prev_log_serial: Last job message serial number |
1316 | 1400 |
@type timeout: float |
1317 |
@param timeout: maximum time to wait |
|
1401 |
@param timeout: maximum time to wait in seconds
|
|
1318 | 1402 |
@rtype: tuple (job info, log entries) |
1319 | 1403 |
@return: a tuple of the job information as required via |
1320 | 1404 |
the fields parameter, and the log entries as a list |
... | ... | |
1325 | 1409 |
as such by the clients |
1326 | 1410 |
|
1327 | 1411 |
""" |
1328 |
helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
|
|
1329 |
prev_log_serial, self) |
|
1330 |
try:
|
|
1331 |
return helper.WaitForChanges(timeout) |
|
1332 |
finally:
|
|
1333 |
helper.Close()
|
|
1412 |
load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
|
|
1413 |
|
|
1414 |
helper = _WaitForJobChangesHelper()
|
|
1415 |
|
|
1416 |
return helper(self._GetJobPath(job_id), load_fn,
|
|
1417 |
fields, prev_job_info, prev_log_serial, timeout)
|
|
1334 | 1418 |
|
1335 | 1419 |
@locking.ssynchronized(_LOCK) |
1336 | 1420 |
@_RequireOpenQueue |
... | ... | |
1380 | 1464 |
archive_jobs = [] |
1381 | 1465 |
rename_files = [] |
1382 | 1466 |
for job in jobs: |
1383 |
if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, |
|
1384 |
constants.JOB_STATUS_SUCCESS, |
|
1385 |
constants.JOB_STATUS_ERROR): |
|
1467 |
if job.CalcStatus() not in constants.JOBS_FINALIZED: |
|
1386 | 1468 |
logging.debug("Job %s is not yet done", job.id) |
1387 | 1469 |
continue |
1388 | 1470 |
|
b/test/ganeti.jqueue_unittest.py | ||
---|---|---|
1 |
#!/usr/bin/python |
|
2 |
# |
|
3 |
|
|
4 |
# Copyright (C) 2010 Google Inc. |
|
5 |
# |
|
6 |
# This program is free software; you can redistribute it and/or modify |
|
7 |
# it under the terms of the GNU General Public License as published by |
|
8 |
# the Free Software Foundation; either version 2 of the License, or |
|
9 |
# (at your option) any later version. |
|
10 |
# |
|
11 |
# This program is distributed in the hope that it will be useful, but |
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
14 |
# General Public License for more details. |
|
15 |
# |
|
16 |
# You should have received a copy of the GNU General Public License |
|
17 |
# along with this program; if not, write to the Free Software |
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
|
19 |
# 02110-1301, USA. |
|
20 |
|
|
21 |
|
|
22 |
"""Script for testing ganeti.jqueue""" |
|
23 |
|
|
24 |
import os |
|
25 |
import sys |
|
26 |
import unittest |
|
27 |
import tempfile |
|
28 |
import shutil |
|
29 |
import errno |
|
30 |
|
|
31 |
from ganeti import constants |
|
32 |
from ganeti import utils |
|
33 |
from ganeti import errors |
|
34 |
from ganeti import jqueue |
|
35 |
|
|
36 |
import testutils |
|
37 |
|
|
38 |
|
|
39 |
class _FakeJob: |
|
40 |
def __init__(self, job_id, status): |
|
41 |
self.id = job_id |
|
42 |
self._status = status |
|
43 |
self._log = [] |
|
44 |
|
|
45 |
def SetStatus(self, status): |
|
46 |
self._status = status |
|
47 |
|
|
48 |
def AddLogEntry(self, msg): |
|
49 |
self._log.append((len(self._log), msg)) |
|
50 |
|
|
51 |
def CalcStatus(self): |
|
52 |
return self._status |
|
53 |
|
|
54 |
def GetInfo(self, fields): |
|
55 |
result = [] |
|
56 |
|
|
57 |
for name in fields: |
|
58 |
if name == "status": |
|
59 |
result.append(self._status) |
|
60 |
else: |
|
61 |
raise Exception("Unknown field") |
|
62 |
|
|
63 |
return result |
|
64 |
|
|
65 |
def GetLogEntries(self, newer_than): |
|
66 |
assert newer_than is None or newer_than >= 0 |
|
67 |
|
|
68 |
if newer_than is None: |
|
69 |
return self._log |
|
70 |
|
|
71 |
return self._log[newer_than:] |
|
72 |
|
|
73 |
|
|
74 |
class TestJobChangesChecker(unittest.TestCase): |
|
75 |
def testStatus(self): |
|
76 |
job = _FakeJob(9094, constants.JOB_STATUS_QUEUED) |
|
77 |
checker = jqueue._JobChangesChecker(["status"], None, None) |
|
78 |
self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], [])) |
|
79 |
|
|
80 |
job.SetStatus(constants.JOB_STATUS_RUNNING) |
|
81 |
self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], [])) |
|
82 |
|
|
83 |
job.SetStatus(constants.JOB_STATUS_SUCCESS) |
|
84 |
self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], [])) |
|
85 |
|
|
86 |
# job.id is used by checker |
|
87 |
self.assertEqual(job.id, 9094) |
|
88 |
|
|
89 |
def testStatusWithPrev(self): |
|
90 |
job = _FakeJob(12807, constants.JOB_STATUS_QUEUED) |
|
91 |
checker = jqueue._JobChangesChecker(["status"], |
|
92 |
[constants.JOB_STATUS_QUEUED], None) |
|
93 |
self.assert_(checker(job) is None) |
|
94 |
|
|
95 |
job.SetStatus(constants.JOB_STATUS_RUNNING) |
|
96 |
self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], [])) |
|
97 |
|
|
98 |
def testFinalStatus(self): |
|
99 |
for status in constants.JOBS_FINALIZED: |
|
100 |
job = _FakeJob(2178711, status) |
|
101 |
checker = jqueue._JobChangesChecker(["status"], [status], None) |
|
102 |
# There won't be any changes in this status, hence it should signal |
|
103 |
# a change immediately |
|
104 |
self.assertEqual(checker(job), ([status], [])) |
|
105 |
|
|
106 |
def testLog(self): |
|
107 |
job = _FakeJob(9094, constants.JOB_STATUS_RUNNING) |
|
108 |
checker = jqueue._JobChangesChecker(["status"], None, None) |
|
109 |
self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], [])) |
|
110 |
|
|
111 |
job.AddLogEntry("Hello World") |
|
112 |
(job_info, log_entries) = checker(job) |
|
113 |
self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING]) |
|
114 |
self.assertEqual(log_entries, [[0, "Hello World"]]) |
|
115 |
|
|
116 |
checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries)) |
|
117 |
self.assert_(checker2(job) is None) |
|
118 |
|
|
119 |
job.AddLogEntry("Foo Bar") |
|
120 |
job.SetStatus(constants.JOB_STATUS_ERROR) |
|
121 |
|
|
122 |
(job_info, log_entries) = checker2(job) |
|
123 |
self.assertEqual(job_info, [constants.JOB_STATUS_ERROR]) |
|
124 |
self.assertEqual(log_entries, [[1, "Foo Bar"]]) |
|
125 |
|
|
126 |
checker3 = jqueue._JobChangesChecker(["status"], None, None) |
|
127 |
(job_info, log_entries) = checker3(job) |
|
128 |
self.assertEqual(job_info, [constants.JOB_STATUS_ERROR]) |
|
129 |
self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]]) |
|
130 |
|
|
131 |
|
|
132 |
class TestJobChangesWaiter(unittest.TestCase): |
|
133 |
def setUp(self): |
|
134 |
self.tmpdir = tempfile.mkdtemp() |
|
135 |
self.filename = utils.PathJoin(self.tmpdir, "job-1") |
|
136 |
utils.WriteFile(self.filename, data="") |
|
137 |
|
|
138 |
def tearDown(self): |
|
139 |
shutil.rmtree(self.tmpdir) |
|
140 |
|
|
141 |
def _EnsureNotifierClosed(self, notifier): |
|
142 |
try: |
|
143 |
os.fstat(notifier._fd) |
|
144 |
except EnvironmentError, err: |
|
145 |
self.assertEqual(err.errno, errno.EBADF) |
|
146 |
else: |
|
147 |
self.fail("File descriptor wasn't closed") |
|
148 |
|
|
149 |
def testClose(self): |
|
150 |
for wait in [False, True]: |
|
151 |
waiter = jqueue._JobFileChangesWaiter(self.filename) |
|
152 |
try: |
|
153 |
if wait: |
|
154 |
waiter.Wait(0.001) |
|
155 |
finally: |
|
156 |
waiter.Close() |
|
157 |
|
|
158 |
# Ensure file descriptor was closed |
|
159 |
self._EnsureNotifierClosed(waiter._notifier) |
|
160 |
|
|
161 |
def testChangingFile(self): |
|
162 |
waiter = jqueue._JobFileChangesWaiter(self.filename) |
|
163 |
try: |
|
164 |
self.assertFalse(waiter.Wait(0.1)) |
|
165 |
utils.WriteFile(self.filename, data="changed") |
|
166 |
self.assert_(waiter.Wait(60)) |
|
167 |
finally: |
|
168 |
waiter.Close() |
|
169 |
|
|
170 |
self._EnsureNotifierClosed(waiter._notifier) |
|
171 |
|
|
172 |
def testChangingFile2(self): |
|
173 |
waiter = jqueue._JobChangesWaiter(self.filename) |
|
174 |
try: |
|
175 |
self.assertFalse(waiter._filewaiter) |
|
176 |
self.assert_(waiter.Wait(0.1)) |
|
177 |
self.assert_(waiter._filewaiter) |
|
178 |
|
|
179 |
# File waiter is now used, but there have been no changes |
|
180 |
self.assertFalse(waiter.Wait(0.1)) |
|
181 |
utils.WriteFile(self.filename, data="changed") |
|
182 |
self.assert_(waiter.Wait(60)) |
|
183 |
finally: |
|
184 |
waiter.Close() |
|
185 |
|
|
186 |
self._EnsureNotifierClosed(waiter._filewaiter._notifier) |
|
187 |
|
|
188 |
|
|
189 |
class TestWaitForJobChangesHelper(unittest.TestCase): |
|
190 |
def setUp(self): |
|
191 |
self.tmpdir = tempfile.mkdtemp() |
|
192 |
self.filename = utils.PathJoin(self.tmpdir, "job-2614226563") |
|
193 |
utils.WriteFile(self.filename, data="") |
|
194 |
|
|
195 |
def tearDown(self): |
|
196 |
shutil.rmtree(self.tmpdir) |
|
197 |
|
|
198 |
def _LoadWaitingJob(self): |
|
199 |
return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK) |
|
200 |
|
|
201 |
def _LoadLostJob(self): |
|
202 |
return None |
|
203 |
|
|
204 |
def testNoChanges(self): |
|
205 |
wfjc = jqueue._WaitForJobChangesHelper() |
|
206 |
|
|
207 |
# No change |
|
208 |
self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"], |
|
209 |
[constants.JOB_STATUS_WAITLOCK], None, 0.1), |
|
210 |
constants.JOB_NOTCHANGED) |
|
211 |
|
|
212 |
# No previous information |
|
213 |
self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, |
|
214 |
["status"], None, None, 1.0), |
|
215 |
([constants.JOB_STATUS_WAITLOCK], [])) |
|
216 |
|
|
217 |
def testLostJob(self): |
|
218 |
wfjc = jqueue._WaitForJobChangesHelper() |
|
219 |
self.assert_(wfjc(self.filename, self._LoadLostJob, |
|
220 |
["status"], None, None, 1.0) is None) |
|
221 |
|
|
222 |
|
|
223 |
if __name__ == "__main__": |
|
224 |
testutils.GanetiTestProgram() |
Also available in: Unified diff