Revision 989a8bee lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
54 | 54 |
from ganeti import jstore |
55 | 55 |
from ganeti import rpc |
56 | 56 |
from ganeti import netutils |
57 |
from ganeti import compat |
|
57 | 58 |
|
58 | 59 |
|
59 | 60 |
JOBQUEUE_THREADS = 25 |
... | ... | |
481 | 482 |
self._job.lock_status = msg |
482 | 483 |
|
483 | 484 |
|
484 |
class _WaitForJobChangesHelper(object): |
|
485 |
"""Helper class using initofy to wait for changes in a job file. |
|
486 |
|
|
487 |
This class takes a previous job status and serial, and alerts the client when |
|
488 |
the current job status has changed. |
|
485 |
class _JobChangesChecker(object): |
|
486 |
def __init__(self, fields, prev_job_info, prev_log_serial): |
|
487 |
"""Initializes this class. |
|
489 | 488 |
|
490 |
@type job_id: string |
|
491 |
@ivar job_id: id of the job we're watching |
|
492 |
@type prev_job_info: string |
|
493 |
@ivar prev_job_info: previous job info, as passed by the luxi client |
|
494 |
@type prev_log_serial: string |
|
495 |
@ivar prev_log_serial: previous job serial, as passed by the luxi client |
|
496 |
@type queue: L{JobQueue} |
|
497 |
@ivar queue: job queue (used for a few utility functions) |
|
498 |
@type job_path: string |
|
499 |
@ivar job_path: absolute path of the job file |
|
500 |
@type wm: pyinotify.WatchManager (or None) |
|
501 |
@ivar wm: inotify watch manager to watch for changes |
|
502 |
@type inotify_handler: L{asyncnotifier.SingleFileEventHandler} |
|
503 |
@ivar inotify_handler: single file event handler, used for watching |
|
504 |
@type notifier: pyinotify.Notifier |
|
505 |
@ivar notifier: inotify single-threaded notifier, used for watching |
|
489 |
@type fields: list of strings |
|
490 |
@param fields: Fields requested by LUXI client |
|
491 |
@type prev_job_info: string |
|
492 |
@param prev_job_info: previous job info, as passed by the LUXI client |
|
493 |
@type prev_log_serial: string |
|
494 |
@param prev_log_serial: previous job serial, as passed by the LUXI client |
|
506 | 495 |
|
507 |
""" |
|
508 |
def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue): |
|
509 |
self.job_id = job_id |
|
510 |
self.fields = fields |
|
511 |
self.prev_job_info = prev_job_info |
|
512 |
self.prev_log_serial = prev_log_serial |
|
513 |
self.queue = queue |
|
514 |
# pylint: disable-msg=W0212 |
|
515 |
self.job_path = self.queue._GetJobPath(self.job_id) |
|
516 |
self.wm = None |
|
517 |
self.inotify_handler = None |
|
518 |
self.notifier = None |
|
496 |
""" |
|
497 |
self._fields = fields |
|
498 |
self._prev_job_info = prev_job_info |
|
499 |
self._prev_log_serial = prev_log_serial |
|
519 | 500 |
|
520 |
def _SetupInotify(self):
|
|
521 |
"""Create the inotify
|
|
501 |
def __call__(self, job):
|
|
502 |
"""Checks whether job has changed.
|
|
522 | 503 |
|
523 |
@raises errors.InotifyError: if the notifier cannot be setup |
|
504 |
@type job: L{_QueuedJob} |
|
505 |
@param job: Job object |
|
524 | 506 |
|
525 | 507 |
""" |
526 |
if self.wm: |
|
527 |
return |
|
528 |
self.wm = pyinotify.WatchManager() |
|
529 |
self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm, |
|
530 |
self.OnInotify, |
|
531 |
self.job_path) |
|
532 |
self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler) |
|
533 |
self.inotify_handler.enable() |
|
534 |
|
|
535 |
def _LoadDiskStatus(self): |
|
536 |
job = self.queue.SafeLoadJobFromDisk(self.job_id) |
|
537 |
if not job: |
|
538 |
raise errors.JobLost() |
|
539 |
self.job_status = job.CalcStatus() |
|
508 |
status = job.CalcStatus() |
|
509 |
job_info = job.GetInfo(self._fields) |
|
510 |
log_entries = job.GetLogEntries(self._prev_log_serial) |
|
540 | 511 |
|
541 |
job_info = job.GetInfo(self.fields) |
|
542 |
log_entries = job.GetLogEntries(self.prev_log_serial) |
|
543 | 512 |
# Serializing and deserializing data can cause type changes (e.g. from |
544 | 513 |
# tuple to list) or precision loss. We're doing it here so that we get |
545 | 514 |
# the same modifications as the data received from the client. Without |
... | ... | |
547 | 516 |
# significantly different. |
548 | 517 |
# TODO: we just deserialized from disk, investigate how to make sure that |
549 | 518 |
# the job info and log entries are compatible to avoid this further step. |
550 |
self.job_info = serializer.LoadJson(serializer.DumpJson(job_info)) |
|
551 |
self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) |
|
519 |
# TODO: Doing something like in testutils.py:UnifyValueType might be more |
|
520 |
# efficient, though floats will be tricky |
|
521 |
job_info = serializer.LoadJson(serializer.DumpJson(job_info)) |
|
522 |
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) |
|
552 | 523 |
|
553 |
def _CheckForChanges(self): |
|
554 |
self._LoadDiskStatus() |
|
555 | 524 |
# Don't even try to wait if the job is no longer running, there will be |
556 | 525 |
# no changes. |
557 |
if (self.job_status not in (constants.JOB_STATUS_QUEUED,
|
|
558 |
constants.JOB_STATUS_RUNNING,
|
|
559 |
constants.JOB_STATUS_WAITLOCK) or
|
|
560 |
self.prev_job_info != self.job_info or
|
|
561 |
(self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
|
|
562 |
logging.debug("Job %s changed", self.job_id)
|
|
563 |
return (self.job_info, self.log_entries)
|
|
526 |
if (status not in (constants.JOB_STATUS_QUEUED, |
|
527 |
constants.JOB_STATUS_RUNNING, |
|
528 |
constants.JOB_STATUS_WAITLOCK) or |
|
529 |
job_info != self._prev_job_info or
|
|
530 |
(log_entries and self._prev_log_serial != log_entries[0][0])):
|
|
531 |
logging.debug("Job %s changed", job.id)
|
|
532 |
return (job_info, log_entries)
|
|
564 | 533 |
|
565 |
raise utils.RetryAgain() |
|
534 |
return None |
|
535 |
|
|
536 |
|
|
537 |
class _JobFileChangesWaiter(object): |
|
538 |
def __init__(self, filename): |
|
539 |
"""Initializes this class. |
|
540 |
|
|
541 |
@type filename: string |
|
542 |
@param filename: Path to job file |
|
543 |
@raises errors.InotifyError: if the notifier cannot be setup |
|
566 | 544 |
|
567 |
def OnInotify(self, notifier_enabled): |
|
545 |
""" |
|
546 |
self._wm = pyinotify.WatchManager() |
|
547 |
self._inotify_handler = \ |
|
548 |
asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) |
|
549 |
self._notifier = \ |
|
550 |
pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler) |
|
551 |
try: |
|
552 |
self._inotify_handler.enable() |
|
553 |
except Exception: |
|
554 |
# pyinotify doesn't close file descriptors automatically |
|
555 |
self._notifier.stop() |
|
556 |
raise |
|
557 |
|
|
558 |
def _OnInotify(self, notifier_enabled): |
|
559 |
"""Callback for inotify. |
|
560 |
|
|
561 |
""" |
|
568 | 562 |
if not notifier_enabled: |
569 |
self.inotify_handler.enable() |
|
563 |
self._inotify_handler.enable() |
|
564 |
|
|
565 |
def Wait(self, timeout): |
|
566 |
"""Waits for the job file to change. |
|
567 |
|
|
568 |
@type timeout: float |
|
569 |
@param timeout: Timeout in seconds |
|
570 |
@return: Whether there have been events |
|
571 |
|
|
572 |
""" |
|
573 |
assert timeout >= 0 |
|
574 |
have_events = self._notifier.check_events(timeout * 1000) |
|
575 |
if have_events: |
|
576 |
self._notifier.read_events() |
|
577 |
self._notifier.process_events() |
|
578 |
return have_events |
|
579 |
|
|
580 |
def Close(self): |
|
581 |
"""Closes underlying notifier and its file descriptor. |
|
582 |
|
|
583 |
""" |
|
584 |
self._notifier.stop() |
|
585 |
|
|
586 |
|
|
587 |
class _JobChangesWaiter(object): |
|
588 |
def __init__(self, filename): |
|
589 |
"""Initializes this class. |
|
590 |
|
|
591 |
@type filename: string |
|
592 |
@param filename: Path to job file |
|
593 |
|
|
594 |
""" |
|
595 |
self._filewaiter = None |
|
596 |
self._filename = filename |
|
570 | 597 |
|
571 |
def WaitFn(self, timeout): |
|
572 |
self._SetupInotify() |
|
573 |
if self.notifier.check_events(timeout*1000): |
|
574 |
self.notifier.read_events() |
|
575 |
self.notifier.process_events() |
|
598 |
def Wait(self, timeout): |
|
599 |
"""Waits for a job to change. |
|
576 | 600 |
|
577 |
def WaitForChanges(self, timeout): |
|
578 |
self._SetupInotify() |
|
601 |
@type timeout: float |
|
602 |
@param timeout: Timeout in seconds |
|
603 |
@return: Whether there have been events |
|
604 |
|
|
605 |
""" |
|
606 |
if self._filewaiter: |
|
607 |
return self._filewaiter.Wait(timeout) |
|
608 |
|
|
609 |
# Lazy setup: Avoid inotify setup cost when job file has already changed. |
|
610 |
# If this point is reached, return immediately and let caller check the job |
|
611 |
# file again in case there were changes since the last check. This avoids a |
|
612 |
# race condition. |
|
613 |
self._filewaiter = _JobFileChangesWaiter(self._filename) |
|
614 |
|
|
615 |
return True |
|
616 |
|
|
617 |
def Close(self): |
|
618 |
"""Closes underlying waiter. |
|
619 |
|
|
620 |
""" |
|
621 |
if self._filewaiter: |
|
622 |
self._filewaiter.Close() |
|
623 |
|
|
624 |
|
|
625 |
class _WaitForJobChangesHelper(object): |
|
626 |
"""Helper class using inotify to wait for changes in a job file. |
|
627 |
|
|
628 |
This class takes a previous job status and serial, and alerts the client when |
|
629 |
the current job status has changed. |
|
630 |
|
|
631 |
""" |
|
632 |
@staticmethod |
|
633 |
def _CheckForChanges(job_load_fn, check_fn): |
|
634 |
job = job_load_fn() |
|
635 |
if not job: |
|
636 |
raise errors.JobLost() |
|
637 |
|
|
638 |
result = check_fn(job) |
|
639 |
if result is None: |
|
640 |
raise utils.RetryAgain() |
|
641 |
|
|
642 |
return result |
|
643 |
|
|
644 |
def __call__(self, filename, job_load_fn, |
|
645 |
fields, prev_job_info, prev_log_serial, timeout): |
|
646 |
"""Waits for changes on a job. |
|
647 |
|
|
648 |
@type filename: string |
|
649 |
@param filename: File on which to wait for changes |
|
650 |
@type job_load_fn: callable |
|
651 |
@param job_load_fn: Function to load job |
|
652 |
@type fields: list of strings |
|
653 |
@param fields: Which fields to check for changes |
|
654 |
@type prev_job_info: list or None |
|
655 |
@param prev_job_info: Last job information returned |
|
656 |
@type prev_log_serial: int |
|
657 |
@param prev_log_serial: Last job message serial number |
|
658 |
@type timeout: float |
|
659 |
@param timeout: maximum time to wait in seconds |
|
660 |
|
|
661 |
""" |
|
579 | 662 |
try: |
580 |
return utils.Retry(self._CheckForChanges, |
|
581 |
utils.RETRY_REMAINING_TIME, |
|
582 |
timeout, |
|
583 |
wait_fn=self.WaitFn) |
|
663 |
check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) |
|
664 |
waiter = _JobChangesWaiter(filename) |
|
665 |
try: |
|
666 |
return utils.Retry(compat.partial(self._CheckForChanges, |
|
667 |
job_load_fn, check_fn), |
|
668 |
utils.RETRY_REMAINING_TIME, timeout, |
|
669 |
wait_fn=waiter.Wait) |
|
670 |
finally: |
|
671 |
waiter.Close() |
|
584 | 672 |
except (errors.InotifyError, errors.JobLost): |
585 | 673 |
return None |
586 | 674 |
except utils.RetryTimeout: |
587 | 675 |
return constants.JOB_NOTCHANGED |
588 | 676 |
|
589 |
def Close(self): |
|
590 |
if self.wm: |
|
591 |
self.notifier.stop() |
|
592 |
|
|
593 | 677 |
|
594 | 678 |
class _JobQueueWorker(workerpool.BaseWorker): |
595 | 679 |
"""The actual job workers. |
... | ... | |
1314 | 1398 |
@type prev_log_serial: int |
1315 | 1399 |
@param prev_log_serial: Last job message serial number |
1316 | 1400 |
@type timeout: float |
1317 |
@param timeout: maximum time to wait |
|
1401 |
@param timeout: maximum time to wait in seconds
|
|
1318 | 1402 |
@rtype: tuple (job info, log entries) |
1319 | 1403 |
@return: a tuple of the job information as required via |
1320 | 1404 |
the fields parameter, and the log entries as a list |
... | ... | |
1325 | 1409 |
as such by the clients |
1326 | 1410 |
|
1327 | 1411 |
""" |
1328 |
helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
|
|
1329 |
prev_log_serial, self) |
|
1330 |
try:
|
|
1331 |
return helper.WaitForChanges(timeout) |
|
1332 |
finally:
|
|
1333 |
helper.Close()
|
|
1412 |
load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
|
|
1413 |
|
|
1414 |
helper = _WaitForJobChangesHelper()
|
|
1415 |
|
|
1416 |
return helper(self._GetJobPath(job_id), load_fn,
|
|
1417 |
fields, prev_job_info, prev_log_serial, timeout)
|
|
1334 | 1418 |
|
1335 | 1419 |
@locking.ssynchronized(_LOCK) |
1336 | 1420 |
@_RequireOpenQueue |
... | ... | |
1380 | 1464 |
archive_jobs = [] |
1381 | 1465 |
rename_files = [] |
1382 | 1466 |
for job in jobs: |
1383 |
if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED, |
|
1384 |
constants.JOB_STATUS_SUCCESS, |
|
1385 |
constants.JOB_STATUS_ERROR): |
|
1467 |
if job.CalcStatus() not in constants.JOBS_FINALIZED: |
|
1386 | 1468 |
logging.debug("Job %s is not yet done", job.id) |
1387 | 1469 |
continue |
1388 | 1470 |
|
Also available in: Unified diff