Revision 6c2549d6 lib/jqueue.py

b/lib/jqueue.py
37 37
import time
38 38
import weakref
39 39

  
40
try:
41
  # pylint: disable-msg=E0611
42
  from pyinotify import pyinotify
43
except ImportError:
44
  import pyinotify
45

  
46
from ganeti import asyncnotifier
40 47
from ganeti import constants
41 48
from ganeti import serializer
42 49
from ganeti import workerpool
......
473 480
    self._job.lock_status = msg
474 481

  
475 482

  
483
class _WaitForJobChangesHelper(object):
484
  """Helper class using initofy to wait for changes in a job file.
485

  
486
  This class takes a previous job status and serial, and alerts the client when
487
  the current job status has changed.
488

  
489
  @type job_id: string
490
  @ivar job_id: id of the job we're watching
491
  @type prev_job_info: string
492
  @ivar prev_job_info: previous job info, as passed by the luxi client
493
  @type prev_log_serial: string
494
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
495
  @type queue: L{JobQueue}
496
  @ivar queue: job queue (used for a few utility functions)
497
  @type job_path: string
498
  @ivar job_path: absolute path of the job file
499
  @type wm: pyinotify.WatchManager (or None)
500
  @ivar wm: inotify watch manager to watch for changes
501
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
502
  @ivar inotify_handler: single file event handler, used for watching
503
  @type notifier: pyinotify.Notifier
504
  @ivar notifier: inotify single-threaded notifier, used for watching
505

  
506
  """
507

  
508
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
509
    self.job_id = job_id
510
    self.fields = fields
511
    self.prev_job_info = prev_job_info
512
    self.prev_log_serial = prev_log_serial
513
    self.queue = queue
514
    # pylint: disable-msg=W0212
515
    self.job_path = self.queue._GetJobPath(self.job_id)
516
    self.wm = None
517
    self.inotify_handler = None
518
    self.notifier = None
519

  
520
  def _SetupInotify(self):
521
    """Create the inotify
522

  
523
    @raises errors.InotifyError: if the notifier cannot be setup
524

  
525
    """
526
    if self.wm:
527
      return
528
    self.wm = pyinotify.WatchManager()
529
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
530
                                                                self.OnInotify,
531
                                                                self.job_path)
532
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
533
    self.inotify_handler.enable()
534

  
535
  def _LoadDiskStatus(self):
536
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
537
    if not job:
538
      raise errors.JobLost()
539
    self.job_status = job.CalcStatus()
540

  
541
    job_info = job.GetInfo(self.fields)
542
    log_entries = job.GetLogEntries(self.prev_log_serial)
543
    # Serializing and deserializing data can cause type changes (e.g. from
544
    # tuple to list) or precision loss. We're doing it here so that we get
545
    # the same modifications as the data received from the client. Without
546
    # this, the comparison afterwards might fail without the data being
547
    # significantly different.
548
    # TODO: we just deserialized from disk, investigate how to make sure that
549
    # the job info and log entries are compatible to avoid this further step.
550
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
551
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
552

  
553
  def _CheckForChanges(self):
554
    self._LoadDiskStatus()
555
    # Don't even try to wait if the job is no longer running, there will be
556
    # no changes.
557
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
558
                                constants.JOB_STATUS_RUNNING,
559
                                constants.JOB_STATUS_WAITLOCK) or
560
        self.prev_job_info != self.job_info or
561
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
562
      logging.debug("Job %s changed", self.job_id)
563
      return (self.job_info, self.log_entries)
564

  
565
    raise utils.RetryAgain()
566

  
567
  def OnInotify(self, notifier_enabled):
568
    if not notifier_enabled:
569
      self.inotify_handler.enable()
570

  
571
  def WaitFn(self, timeout):
572
    self._SetupInotify()
573
    if self.notifier.check_events(timeout*1000):
574
      self.notifier.read_events()
575
    self.notifier.process_events()
576

  
577
  def WaitForChanges(self, timeout):
578
    try:
579
      return utils.Retry(self._CheckForChanges,
580
                         utils.RETRY_REMAINING_TIME,
581
                         timeout,
582
                         wait_fn=self.WaitFn)
583
    except (errors.InotifyError, errors.JobLost):
584
      return None
585
    except utils.RetryTimeout:
586
      return constants.JOB_NOTCHANGED
587

  
588
  def Close(self):
589
    if self.wm:
590
      self.notifier.stop()
591

  
592

  
476 593
class _JobQueueWorker(workerpool.BaseWorker):
477 594
  """The actual job workers.
478 595

  
......
1183 1300
    # Notify waiters about potential changes
1184 1301
    job.change.notifyAll()
1185 1302

  
1186
  @utils.LockedMethod
1187
  @_RequireOpenQueue
1188 1303
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1189 1304
                        timeout):
1190 1305
    """Waits for changes in a job.
......
1209 1324
        as such by the clients
1210 1325

  
1211 1326
    """
1212
    job = self._LoadJobUnlocked(job_id)
1213
    if not job:
1214
      logging.debug("Job %s not found", job_id)
1215
      return None
1216

  
1217
    def _CheckForChanges():
1218
      logging.debug("Waiting for changes in job %s", job_id)
1219

  
1220
      status = job.CalcStatus()
1221
      job_info = job.GetInfo(fields)
1222
      log_entries = job.GetLogEntries(prev_log_serial)
1223

  
1224
      # Serializing and deserializing data can cause type changes (e.g. from
1225
      # tuple to list) or precision loss. We're doing it here so that we get
1226
      # the same modifications as the data received from the client. Without
1227
      # this, the comparison afterwards might fail without the data being
1228
      # significantly different.
1229
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1230
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1231

  
1232
      # Don't even try to wait if the job is no longer running, there will be
1233
      # no changes.
1234
      if (status not in (constants.JOB_STATUS_QUEUED,
1235
                         constants.JOB_STATUS_RUNNING,
1236
                         constants.JOB_STATUS_WAITLOCK) or
1237
          prev_job_info != job_info or
1238
          (log_entries and prev_log_serial != log_entries[0][0])):
1239
        logging.debug("Job %s changed", job_id)
1240
        return (job_info, log_entries)
1241

  
1242
      raise utils.RetryAgain()
1243

  
1327
    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
1328
                                      prev_log_serial, self)
1244 1329
    try:
1245
      # Setting wait function to release the queue lock while waiting
1246
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1247
                         wait_fn=job.change.wait)
1248
    except utils.RetryTimeout:
1249
      return constants.JOB_NOTCHANGED
1330
      return helper.WaitForChanges(timeout)
1331
    finally:
1332
      helper.Close()
1250 1333

  
1251 1334
  @utils.LockedMethod
1252 1335
  @_RequireOpenQueue

Also available in: Unified diff