Revision 6c2549d6 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
37 | 37 |
import time |
38 | 38 |
import weakref |
39 | 39 |
|
40 |
try: |
|
41 |
# pylint: disable-msg=E0611 |
|
42 |
from pyinotify import pyinotify |
|
43 |
except ImportError: |
|
44 |
import pyinotify |
|
45 |
|
|
46 |
from ganeti import asyncnotifier |
|
40 | 47 |
from ganeti import constants |
41 | 48 |
from ganeti import serializer |
42 | 49 |
from ganeti import workerpool |
... | ... | |
473 | 480 |
self._job.lock_status = msg |
474 | 481 |
|
475 | 482 |
|
483 |
class _WaitForJobChangesHelper(object): |
|
484 |
"""Helper class using initofy to wait for changes in a job file. |
|
485 |
|
|
486 |
This class takes a previous job status and serial, and alerts the client when |
|
487 |
the current job status has changed. |
|
488 |
|
|
489 |
@type job_id: string |
|
490 |
@ivar job_id: id of the job we're watching |
|
491 |
@type prev_job_info: string |
|
492 |
@ivar prev_job_info: previous job info, as passed by the luxi client |
|
493 |
@type prev_log_serial: string |
|
494 |
@ivar prev_log_serial: previous job serial, as passed by the luxi client |
|
495 |
@type queue: L{JobQueue} |
|
496 |
@ivar queue: job queue (used for a few utility functions) |
|
497 |
@type job_path: string |
|
498 |
@ivar job_path: absolute path of the job file |
|
499 |
@type wm: pyinotify.WatchManager (or None) |
|
500 |
@ivar wm: inotify watch manager to watch for changes |
|
501 |
@type inotify_handler: L{asyncnotifier.SingleFileEventHandler} |
|
502 |
@ivar inotify_handler: single file event handler, used for watching |
|
503 |
@type notifier: pyinotify.Notifier |
|
504 |
@ivar notifier: inotify single-threaded notifier, used for watching |
|
505 |
|
|
506 |
""" |
|
507 |
|
|
508 |
def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue): |
|
509 |
self.job_id = job_id |
|
510 |
self.fields = fields |
|
511 |
self.prev_job_info = prev_job_info |
|
512 |
self.prev_log_serial = prev_log_serial |
|
513 |
self.queue = queue |
|
514 |
# pylint: disable-msg=W0212 |
|
515 |
self.job_path = self.queue._GetJobPath(self.job_id) |
|
516 |
self.wm = None |
|
517 |
self.inotify_handler = None |
|
518 |
self.notifier = None |
|
519 |
|
|
520 |
def _SetupInotify(self): |
|
521 |
"""Create the inotify |
|
522 |
|
|
523 |
@raises errors.InotifyError: if the notifier cannot be setup |
|
524 |
|
|
525 |
""" |
|
526 |
if self.wm: |
|
527 |
return |
|
528 |
self.wm = pyinotify.WatchManager() |
|
529 |
self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm, |
|
530 |
self.OnInotify, |
|
531 |
self.job_path) |
|
532 |
self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler) |
|
533 |
self.inotify_handler.enable() |
|
534 |
|
|
535 |
def _LoadDiskStatus(self): |
|
536 |
job = self.queue.SafeLoadJobFromDisk(self.job_id) |
|
537 |
if not job: |
|
538 |
raise errors.JobLost() |
|
539 |
self.job_status = job.CalcStatus() |
|
540 |
|
|
541 |
job_info = job.GetInfo(self.fields) |
|
542 |
log_entries = job.GetLogEntries(self.prev_log_serial) |
|
543 |
# Serializing and deserializing data can cause type changes (e.g. from |
|
544 |
# tuple to list) or precision loss. We're doing it here so that we get |
|
545 |
# the same modifications as the data received from the client. Without |
|
546 |
# this, the comparison afterwards might fail without the data being |
|
547 |
# significantly different. |
|
548 |
# TODO: we just deserialized from disk, investigate how to make sure that |
|
549 |
# the job info and log entries are compatible to avoid this further step. |
|
550 |
self.job_info = serializer.LoadJson(serializer.DumpJson(job_info)) |
|
551 |
self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) |
|
552 |
|
|
553 |
def _CheckForChanges(self): |
|
554 |
self._LoadDiskStatus() |
|
555 |
# Don't even try to wait if the job is no longer running, there will be |
|
556 |
# no changes. |
|
557 |
if (self.job_status not in (constants.JOB_STATUS_QUEUED, |
|
558 |
constants.JOB_STATUS_RUNNING, |
|
559 |
constants.JOB_STATUS_WAITLOCK) or |
|
560 |
self.prev_job_info != self.job_info or |
|
561 |
(self.log_entries and self.prev_log_serial != self.log_entries[0][0])): |
|
562 |
logging.debug("Job %s changed", self.job_id) |
|
563 |
return (self.job_info, self.log_entries) |
|
564 |
|
|
565 |
raise utils.RetryAgain() |
|
566 |
|
|
567 |
def OnInotify(self, notifier_enabled): |
|
568 |
if not notifier_enabled: |
|
569 |
self.inotify_handler.enable() |
|
570 |
|
|
571 |
def WaitFn(self, timeout): |
|
572 |
self._SetupInotify() |
|
573 |
if self.notifier.check_events(timeout*1000): |
|
574 |
self.notifier.read_events() |
|
575 |
self.notifier.process_events() |
|
576 |
|
|
577 |
def WaitForChanges(self, timeout): |
|
578 |
try: |
|
579 |
return utils.Retry(self._CheckForChanges, |
|
580 |
utils.RETRY_REMAINING_TIME, |
|
581 |
timeout, |
|
582 |
wait_fn=self.WaitFn) |
|
583 |
except (errors.InotifyError, errors.JobLost): |
|
584 |
return None |
|
585 |
except utils.RetryTimeout: |
|
586 |
return constants.JOB_NOTCHANGED |
|
587 |
|
|
588 |
def Close(self): |
|
589 |
if self.wm: |
|
590 |
self.notifier.stop() |
|
591 |
|
|
592 |
|
|
476 | 593 |
class _JobQueueWorker(workerpool.BaseWorker): |
477 | 594 |
"""The actual job workers. |
478 | 595 |
|
... | ... | |
1183 | 1300 |
# Notify waiters about potential changes |
1184 | 1301 |
job.change.notifyAll() |
1185 | 1302 |
|
1186 |
@utils.LockedMethod |
|
1187 |
@_RequireOpenQueue |
|
1188 | 1303 |
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, |
1189 | 1304 |
timeout): |
1190 | 1305 |
"""Waits for changes in a job. |
... | ... | |
1209 | 1324 |
as such by the clients |
1210 | 1325 |
|
1211 | 1326 |
""" |
1212 |
job = self._LoadJobUnlocked(job_id) |
|
1213 |
if not job: |
|
1214 |
logging.debug("Job %s not found", job_id) |
|
1215 |
return None |
|
1216 |
|
|
1217 |
def _CheckForChanges(): |
|
1218 |
logging.debug("Waiting for changes in job %s", job_id) |
|
1219 |
|
|
1220 |
status = job.CalcStatus() |
|
1221 |
job_info = job.GetInfo(fields) |
|
1222 |
log_entries = job.GetLogEntries(prev_log_serial) |
|
1223 |
|
|
1224 |
# Serializing and deserializing data can cause type changes (e.g. from |
|
1225 |
# tuple to list) or precision loss. We're doing it here so that we get |
|
1226 |
# the same modifications as the data received from the client. Without |
|
1227 |
# this, the comparison afterwards might fail without the data being |
|
1228 |
# significantly different. |
|
1229 |
job_info = serializer.LoadJson(serializer.DumpJson(job_info)) |
|
1230 |
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) |
|
1231 |
|
|
1232 |
# Don't even try to wait if the job is no longer running, there will be |
|
1233 |
# no changes. |
|
1234 |
if (status not in (constants.JOB_STATUS_QUEUED, |
|
1235 |
constants.JOB_STATUS_RUNNING, |
|
1236 |
constants.JOB_STATUS_WAITLOCK) or |
|
1237 |
prev_job_info != job_info or |
|
1238 |
(log_entries and prev_log_serial != log_entries[0][0])): |
|
1239 |
logging.debug("Job %s changed", job_id) |
|
1240 |
return (job_info, log_entries) |
|
1241 |
|
|
1242 |
raise utils.RetryAgain() |
|
1243 |
|
|
1327 |
helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info, |
|
1328 |
prev_log_serial, self) |
|
1244 | 1329 |
try: |
1245 |
# Setting wait function to release the queue lock while waiting |
|
1246 |
return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout, |
|
1247 |
wait_fn=job.change.wait) |
|
1248 |
except utils.RetryTimeout: |
|
1249 |
return constants.JOB_NOTCHANGED |
|
1330 |
return helper.WaitForChanges(timeout) |
|
1331 |
finally: |
|
1332 |
helper.Close() |
|
1250 | 1333 |
|
1251 | 1334 |
@utils.LockedMethod |
1252 | 1335 |
@_RequireOpenQueue |
Also available in: Unified diff