Revision 989a8bee lib/jqueue.py

b/lib/jqueue.py
54 54
from ganeti import jstore
55 55
from ganeti import rpc
56 56
from ganeti import netutils
57
from ganeti import compat
57 58

  
58 59

  
59 60
JOBQUEUE_THREADS = 25
......
481 482
    self._job.lock_status = msg
482 483

  
483 484

  
484
class _WaitForJobChangesHelper(object):
485
  """Helper class using initofy to wait for changes in a job file.
486

  
487
  This class takes a previous job status and serial, and alerts the client when
488
  the current job status has changed.
485
class _JobChangesChecker(object):
486
  def __init__(self, fields, prev_job_info, prev_log_serial):
487
    """Initializes this class.
489 488

  
490
  @type job_id: string
491
  @ivar job_id: id of the job we're watching
492
  @type prev_job_info: string
493
  @ivar prev_job_info: previous job info, as passed by the luxi client
494
  @type prev_log_serial: string
495
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
496
  @type queue: L{JobQueue}
497
  @ivar queue: job queue (used for a few utility functions)
498
  @type job_path: string
499
  @ivar job_path: absolute path of the job file
500
  @type wm: pyinotify.WatchManager (or None)
501
  @ivar wm: inotify watch manager to watch for changes
502
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
503
  @ivar inotify_handler: single file event handler, used for watching
504
  @type notifier: pyinotify.Notifier
505
  @ivar notifier: inotify single-threaded notifier, used for watching
489
    @type fields: list of strings
490
    @param fields: Fields requested by LUXI client
491
    @type prev_job_info: string
492
    @param prev_job_info: previous job info, as passed by the LUXI client
493
    @type prev_log_serial: string
494
    @param prev_log_serial: previous job serial, as passed by the LUXI client
506 495

  
507
  """
508
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
509
    self.job_id = job_id
510
    self.fields = fields
511
    self.prev_job_info = prev_job_info
512
    self.prev_log_serial = prev_log_serial
513
    self.queue = queue
514
    # pylint: disable-msg=W0212
515
    self.job_path = self.queue._GetJobPath(self.job_id)
516
    self.wm = None
517
    self.inotify_handler = None
518
    self.notifier = None
496
    """
497
    self._fields = fields
498
    self._prev_job_info = prev_job_info
499
    self._prev_log_serial = prev_log_serial
519 500

  
520
  def _SetupInotify(self):
521
    """Create the inotify
501
  def __call__(self, job):
502
    """Checks whether job has changed.
522 503

  
523
    @raises errors.InotifyError: if the notifier cannot be setup
504
    @type job: L{_QueuedJob}
505
    @param job: Job object
524 506

  
525 507
    """
526
    if self.wm:
527
      return
528
    self.wm = pyinotify.WatchManager()
529
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
530
                                                                self.OnInotify,
531
                                                                self.job_path)
532
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
533
    self.inotify_handler.enable()
534

  
535
  def _LoadDiskStatus(self):
536
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
537
    if not job:
538
      raise errors.JobLost()
539
    self.job_status = job.CalcStatus()
508
    status = job.CalcStatus()
509
    job_info = job.GetInfo(self._fields)
510
    log_entries = job.GetLogEntries(self._prev_log_serial)
540 511

  
541
    job_info = job.GetInfo(self.fields)
542
    log_entries = job.GetLogEntries(self.prev_log_serial)
543 512
    # Serializing and deserializing data can cause type changes (e.g. from
544 513
    # tuple to list) or precision loss. We're doing it here so that we get
545 514
    # the same modifications as the data received from the client. Without
......
547 516
    # significantly different.
548 517
    # TODO: we just deserialized from disk, investigate how to make sure that
549 518
    # the job info and log entries are compatible to avoid this further step.
550
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
551
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
519
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
520
    # efficient, though floats will be tricky
521
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
522
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
552 523

  
553
  def _CheckForChanges(self):
554
    self._LoadDiskStatus()
555 524
    # Don't even try to wait if the job is no longer running, there will be
556 525
    # no changes.
557
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
558
                                constants.JOB_STATUS_RUNNING,
559
                                constants.JOB_STATUS_WAITLOCK) or
560
        self.prev_job_info != self.job_info or
561
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
562
      logging.debug("Job %s changed", self.job_id)
563
      return (self.job_info, self.log_entries)
526
    if (status not in (constants.JOB_STATUS_QUEUED,
527
                       constants.JOB_STATUS_RUNNING,
528
                       constants.JOB_STATUS_WAITLOCK) or
529
        job_info != self._prev_job_info or
530
        (log_entries and self._prev_log_serial != log_entries[0][0])):
531
      logging.debug("Job %s changed", job.id)
532
      return (job_info, log_entries)
564 533

  
565
    raise utils.RetryAgain()
534
    return None
535

  
536

  
537
class _JobFileChangesWaiter(object):
538
  def __init__(self, filename):
539
    """Initializes this class.
540

  
541
    @type filename: string
542
    @param filename: Path to job file
543
    @raises errors.InotifyError: if the notifier cannot be setup
566 544

  
567
  def OnInotify(self, notifier_enabled):
545
    """
546
    self._wm = pyinotify.WatchManager()
547
    self._inotify_handler = \
548
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
549
    self._notifier = \
550
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
551
    try:
552
      self._inotify_handler.enable()
553
    except Exception:
554
      # pyinotify doesn't close file descriptors automatically
555
      self._notifier.stop()
556
      raise
557

  
558
  def _OnInotify(self, notifier_enabled):
559
    """Callback for inotify.
560

  
561
    """
568 562
    if not notifier_enabled:
569
      self.inotify_handler.enable()
563
      self._inotify_handler.enable()
564

  
565
  def Wait(self, timeout):
566
    """Waits for the job file to change.
567

  
568
    @type timeout: float
569
    @param timeout: Timeout in seconds
570
    @return: Whether there have been events
571

  
572
    """
573
    assert timeout >= 0
574
    have_events = self._notifier.check_events(timeout * 1000)
575
    if have_events:
576
      self._notifier.read_events()
577
    self._notifier.process_events()
578
    return have_events
579

  
580
  def Close(self):
581
    """Closes underlying notifier and its file descriptor.
582

  
583
    """
584
    self._notifier.stop()
585

  
586

  
587
class _JobChangesWaiter(object):
588
  def __init__(self, filename):
589
    """Initializes this class.
590

  
591
    @type filename: string
592
    @param filename: Path to job file
593

  
594
    """
595
    self._filewaiter = None
596
    self._filename = filename
570 597

  
571
  def WaitFn(self, timeout):
572
    self._SetupInotify()
573
    if self.notifier.check_events(timeout*1000):
574
      self.notifier.read_events()
575
    self.notifier.process_events()
598
  def Wait(self, timeout):
599
    """Waits for a job to change.
576 600

  
577
  def WaitForChanges(self, timeout):
578
    self._SetupInotify()
601
    @type timeout: float
602
    @param timeout: Timeout in seconds
603
    @return: Whether there have been events
604

  
605
    """
606
    if self._filewaiter:
607
      return self._filewaiter.Wait(timeout)
608

  
609
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
610
    # If this point is reached, return immediately and let caller check the job
611
    # file again in case there were changes since the last check. This avoids a
612
    # race condition.
613
    self._filewaiter = _JobFileChangesWaiter(self._filename)
614

  
615
    return True
616

  
617
  def Close(self):
618
    """Closes underlying waiter.
619

  
620
    """
621
    if self._filewaiter:
622
      self._filewaiter.Close()
623

  
624

  
625
class _WaitForJobChangesHelper(object):
626
  """Helper class using inotify to wait for changes in a job file.
627

  
628
  This class takes a previous job status and serial, and alerts the client when
629
  the current job status has changed.
630

  
631
  """
632
  @staticmethod
633
  def _CheckForChanges(job_load_fn, check_fn):
634
    job = job_load_fn()
635
    if not job:
636
      raise errors.JobLost()
637

  
638
    result = check_fn(job)
639
    if result is None:
640
      raise utils.RetryAgain()
641

  
642
    return result
643

  
644
  def __call__(self, filename, job_load_fn,
645
               fields, prev_job_info, prev_log_serial, timeout):
646
    """Waits for changes on a job.
647

  
648
    @type filename: string
649
    @param filename: File on which to wait for changes
650
    @type job_load_fn: callable
651
    @param job_load_fn: Function to load job
652
    @type fields: list of strings
653
    @param fields: Which fields to check for changes
654
    @type prev_job_info: list or None
655
    @param prev_job_info: Last job information returned
656
    @type prev_log_serial: int
657
    @param prev_log_serial: Last job message serial number
658
    @type timeout: float
659
    @param timeout: maximum time to wait in seconds
660

  
661
    """
579 662
    try:
580
      return utils.Retry(self._CheckForChanges,
581
                         utils.RETRY_REMAINING_TIME,
582
                         timeout,
583
                         wait_fn=self.WaitFn)
663
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
664
      waiter = _JobChangesWaiter(filename)
665
      try:
666
        return utils.Retry(compat.partial(self._CheckForChanges,
667
                                          job_load_fn, check_fn),
668
                           utils.RETRY_REMAINING_TIME, timeout,
669
                           wait_fn=waiter.Wait)
670
      finally:
671
        waiter.Close()
584 672
    except (errors.InotifyError, errors.JobLost):
585 673
      return None
586 674
    except utils.RetryTimeout:
587 675
      return constants.JOB_NOTCHANGED
588 676

  
589
  def Close(self):
590
    if self.wm:
591
      self.notifier.stop()
592

  
593 677

  
594 678
class _JobQueueWorker(workerpool.BaseWorker):
595 679
  """The actual job workers.
......
1314 1398
    @type prev_log_serial: int
1315 1399
    @param prev_log_serial: Last job message serial number
1316 1400
    @type timeout: float
1317
    @param timeout: maximum time to wait
1401
    @param timeout: maximum time to wait in seconds
1318 1402
    @rtype: tuple (job info, log entries)
1319 1403
    @return: a tuple of the job information as required via
1320 1404
        the fields parameter, and the log entries as a list
......
1325 1409
        as such by the clients
1326 1410

  
1327 1411
    """
1328
    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
1329
                                      prev_log_serial, self)
1330
    try:
1331
      return helper.WaitForChanges(timeout)
1332
    finally:
1333
      helper.Close()
1412
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1413

  
1414
    helper = _WaitForJobChangesHelper()
1415

  
1416
    return helper(self._GetJobPath(job_id), load_fn,
1417
                  fields, prev_job_info, prev_log_serial, timeout)
1334 1418

  
1335 1419
  @locking.ssynchronized(_LOCK)
1336 1420
  @_RequireOpenQueue
......
1380 1464
    archive_jobs = []
1381 1465
    rename_files = []
1382 1466
    for job in jobs:
1383
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1384
                                  constants.JOB_STATUS_SUCCESS,
1385
                                  constants.JOB_STATUS_ERROR):
1467
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1386 1468
        logging.debug("Job %s is not yet done", job.id)
1387 1469
        continue
1388 1470

  

Also available in: Unified diff