Revision 989a8bee

b/Makefile.am
377 377
	test/ganeti.hooks_unittest.py \
378 378
	test/ganeti.http_unittest.py \
379 379
	test/ganeti.impexpd_unittest.py \
380
	test/ganeti.jqueue_unittest.py \
380 381
	test/ganeti.locking_unittest.py \
381 382
	test/ganeti.luxi_unittest.py \
382 383
	test/ganeti.masterd.instance_unittest.py \
b/lib/constants.py
785 785
JOB_STATUS_CANCELED = "canceled"
786 786
JOB_STATUS_SUCCESS = "success"
787 787
JOB_STATUS_ERROR = "error"
788
JOBS_FINALIZED = frozenset([
789
  JOB_STATUS_CANCELED,
790
  JOB_STATUS_SUCCESS,
791
  JOB_STATUS_ERROR,
792
  ])
788 793

  
789 794
# OpCode status
790 795
# not yet finalized
b/lib/jqueue.py
54 54
from ganeti import jstore
55 55
from ganeti import rpc
56 56
from ganeti import netutils
57
from ganeti import compat
57 58

  
58 59

  
59 60
JOBQUEUE_THREADS = 25
......
481 482
    self._job.lock_status = msg
482 483

  
483 484

  
484
class _WaitForJobChangesHelper(object):
485
  """Helper class using initofy to wait for changes in a job file.
486

  
487
  This class takes a previous job status and serial, and alerts the client when
488
  the current job status has changed.
485
class _JobChangesChecker(object):
486
  def __init__(self, fields, prev_job_info, prev_log_serial):
487
    """Initializes this class.
489 488

  
490
  @type job_id: string
491
  @ivar job_id: id of the job we're watching
492
  @type prev_job_info: string
493
  @ivar prev_job_info: previous job info, as passed by the luxi client
494
  @type prev_log_serial: string
495
  @ivar prev_log_serial: previous job serial, as passed by the luxi client
496
  @type queue: L{JobQueue}
497
  @ivar queue: job queue (used for a few utility functions)
498
  @type job_path: string
499
  @ivar job_path: absolute path of the job file
500
  @type wm: pyinotify.WatchManager (or None)
501
  @ivar wm: inotify watch manager to watch for changes
502
  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
503
  @ivar inotify_handler: single file event handler, used for watching
504
  @type notifier: pyinotify.Notifier
505
  @ivar notifier: inotify single-threaded notifier, used for watching
489
    @type fields: list of strings
490
    @param fields: Fields requested by LUXI client
491
    @type prev_job_info: list or None
492
    @param prev_job_info: previous job info, as passed by the LUXI client
493
    @type prev_log_serial: int or None
494
    @param prev_log_serial: previous job serial, as passed by the LUXI client
506 495

  
507
  """
508
  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
509
    self.job_id = job_id
510
    self.fields = fields
511
    self.prev_job_info = prev_job_info
512
    self.prev_log_serial = prev_log_serial
513
    self.queue = queue
514
    # pylint: disable-msg=W0212
515
    self.job_path = self.queue._GetJobPath(self.job_id)
516
    self.wm = None
517
    self.inotify_handler = None
518
    self.notifier = None
496
    """
497
    self._fields = fields
498
    self._prev_job_info = prev_job_info
499
    self._prev_log_serial = prev_log_serial
519 500

  
520
  def _SetupInotify(self):
521
    """Create the inotify
501
  def __call__(self, job):
502
    """Checks whether job has changed.
522 503

  
523
    @raises errors.InotifyError: if the notifier cannot be setup
504
    @type job: L{_QueuedJob}
505
    @param job: Job object
524 506

  
525 507
    """
526
    if self.wm:
527
      return
528
    self.wm = pyinotify.WatchManager()
529
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
530
                                                                self.OnInotify,
531
                                                                self.job_path)
532
    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
533
    self.inotify_handler.enable()
534

  
535
  def _LoadDiskStatus(self):
536
    job = self.queue.SafeLoadJobFromDisk(self.job_id)
537
    if not job:
538
      raise errors.JobLost()
539
    self.job_status = job.CalcStatus()
508
    status = job.CalcStatus()
509
    job_info = job.GetInfo(self._fields)
510
    log_entries = job.GetLogEntries(self._prev_log_serial)
540 511

  
541
    job_info = job.GetInfo(self.fields)
542
    log_entries = job.GetLogEntries(self.prev_log_serial)
543 512
    # Serializing and deserializing data can cause type changes (e.g. from
544 513
    # tuple to list) or precision loss. We're doing it here so that we get
545 514
    # the same modifications as the data received from the client. Without
......
547 516
    # significantly different.
548 517
    # TODO: we just deserialized from disk, investigate how to make sure that
549 518
    # the job info and log entries are compatible to avoid this further step.
550
    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
551
    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
519
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
520
    # efficient, though floats will be tricky
521
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
522
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
552 523

  
553
  def _CheckForChanges(self):
554
    self._LoadDiskStatus()
555 524
    # Don't even try to wait if the job is no longer running, there will be
556 525
    # no changes.
557
    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
558
                                constants.JOB_STATUS_RUNNING,
559
                                constants.JOB_STATUS_WAITLOCK) or
560
        self.prev_job_info != self.job_info or
561
        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
562
      logging.debug("Job %s changed", self.job_id)
563
      return (self.job_info, self.log_entries)
526
    if (status not in (constants.JOB_STATUS_QUEUED,
527
                       constants.JOB_STATUS_RUNNING,
528
                       constants.JOB_STATUS_WAITLOCK) or
529
        job_info != self._prev_job_info or
530
        (log_entries and self._prev_log_serial != log_entries[0][0])):
531
      logging.debug("Job %s changed", job.id)
532
      return (job_info, log_entries)
564 533

  
565
    raise utils.RetryAgain()
534
    return None
535

  
536

  
537
class _JobFileChangesWaiter(object):
538
  def __init__(self, filename):
539
    """Initializes this class.
540

  
541
    @type filename: string
542
    @param filename: Path to job file
543
    @raises errors.InotifyError: if the notifier cannot be set up
566 544

  
567
  def OnInotify(self, notifier_enabled):
545
    """
546
    self._wm = pyinotify.WatchManager()
547
    self._inotify_handler = \
548
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
549
    self._notifier = \
550
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
551
    try:
552
      self._inotify_handler.enable()
553
    except Exception:
554
      # pyinotify doesn't close file descriptors automatically
555
      self._notifier.stop()
556
      raise
557

  
558
  def _OnInotify(self, notifier_enabled):
559
    """Callback for inotify.
560

  
561
    """
568 562
    if not notifier_enabled:
569
      self.inotify_handler.enable()
563
      self._inotify_handler.enable()
564

  
565
  def Wait(self, timeout):
566
    """Waits for the job file to change.
567

  
568
    @type timeout: float
569
    @param timeout: Timeout in seconds
570
    @return: Whether there have been events
571

  
572
    """
573
    assert timeout >= 0
574
    have_events = self._notifier.check_events(timeout * 1000)
575
    if have_events:
576
      self._notifier.read_events()
577
    self._notifier.process_events()
578
    return have_events
579

  
580
  def Close(self):
581
    """Closes underlying notifier and its file descriptor.
582

  
583
    """
584
    self._notifier.stop()
585

  
586

  
587
class _JobChangesWaiter(object):
588
  def __init__(self, filename):
589
    """Initializes this class.
590

  
591
    @type filename: string
592
    @param filename: Path to job file
593

  
594
    """
595
    self._filewaiter = None
596
    self._filename = filename
570 597

  
571
  def WaitFn(self, timeout):
572
    self._SetupInotify()
573
    if self.notifier.check_events(timeout*1000):
574
      self.notifier.read_events()
575
    self.notifier.process_events()
598
  def Wait(self, timeout):
599
    """Waits for a job to change.
576 600

  
577
  def WaitForChanges(self, timeout):
578
    self._SetupInotify()
601
    @type timeout: float
602
    @param timeout: Timeout in seconds
603
    @return: Whether there have been events
604

  
605
    """
606
    if self._filewaiter:
607
      return self._filewaiter.Wait(timeout)
608

  
609
    # Lazy setup: Avoid inotify setup cost when job file has already changed.
610
    # If this point is reached, return immediately and let caller check the job
611
    # file again in case there were changes since the last check. This avoids a
612
    # race condition.
613
    self._filewaiter = _JobFileChangesWaiter(self._filename)
614

  
615
    return True
616

  
617
  def Close(self):
618
    """Closes underlying waiter.
619

  
620
    """
621
    if self._filewaiter:
622
      self._filewaiter.Close()
623

  
624

  
625
class _WaitForJobChangesHelper(object):
626
  """Helper class using inotify to wait for changes in a job file.
627

  
628
  This class takes a previous job status and serial, and alerts the client when
629
  the current job status has changed.
630

  
631
  """
632
  @staticmethod
633
  def _CheckForChanges(job_load_fn, check_fn):
634
    job = job_load_fn()
635
    if not job:
636
      raise errors.JobLost()
637

  
638
    result = check_fn(job)
639
    if result is None:
640
      raise utils.RetryAgain()
641

  
642
    return result
643

  
644
  def __call__(self, filename, job_load_fn,
645
               fields, prev_job_info, prev_log_serial, timeout):
646
    """Waits for changes on a job.
647

  
648
    @type filename: string
649
    @param filename: File on which to wait for changes
650
    @type job_load_fn: callable
651
    @param job_load_fn: Function to load job
652
    @type fields: list of strings
653
    @param fields: Which fields to check for changes
654
    @type prev_job_info: list or None
655
    @param prev_job_info: Last job information returned
656
    @type prev_log_serial: int
657
    @param prev_log_serial: Last job message serial number
658
    @type timeout: float
659
    @param timeout: maximum time to wait in seconds
660

  
661
    """
579 662
    try:
580
      return utils.Retry(self._CheckForChanges,
581
                         utils.RETRY_REMAINING_TIME,
582
                         timeout,
583
                         wait_fn=self.WaitFn)
663
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
664
      waiter = _JobChangesWaiter(filename)
665
      try:
666
        return utils.Retry(compat.partial(self._CheckForChanges,
667
                                          job_load_fn, check_fn),
668
                           utils.RETRY_REMAINING_TIME, timeout,
669
                           wait_fn=waiter.Wait)
670
      finally:
671
        waiter.Close()
584 672
    except (errors.InotifyError, errors.JobLost):
585 673
      return None
586 674
    except utils.RetryTimeout:
587 675
      return constants.JOB_NOTCHANGED
588 676

  
589
  def Close(self):
590
    if self.wm:
591
      self.notifier.stop()
592

  
593 677

  
594 678
class _JobQueueWorker(workerpool.BaseWorker):
595 679
  """The actual job workers.
......
1314 1398
    @type prev_log_serial: int
1315 1399
    @param prev_log_serial: Last job message serial number
1316 1400
    @type timeout: float
1317
    @param timeout: maximum time to wait
1401
    @param timeout: maximum time to wait in seconds
1318 1402
    @rtype: tuple (job info, log entries)
1319 1403
    @return: a tuple of the job information as required via
1320 1404
        the fields parameter, and the log entries as a list
......
1325 1409
        as such by the clients
1326 1410

  
1327 1411
    """
1328
    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
1329
                                      prev_log_serial, self)
1330
    try:
1331
      return helper.WaitForChanges(timeout)
1332
    finally:
1333
      helper.Close()
1412
    load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1413

  
1414
    helper = _WaitForJobChangesHelper()
1415

  
1416
    return helper(self._GetJobPath(job_id), load_fn,
1417
                  fields, prev_job_info, prev_log_serial, timeout)
1334 1418

  
1335 1419
  @locking.ssynchronized(_LOCK)
1336 1420
  @_RequireOpenQueue
......
1380 1464
    archive_jobs = []
1381 1465
    rename_files = []
1382 1466
    for job in jobs:
1383
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1384
                                  constants.JOB_STATUS_SUCCESS,
1385
                                  constants.JOB_STATUS_ERROR):
1467
      if job.CalcStatus() not in constants.JOBS_FINALIZED:
1386 1468
        logging.debug("Job %s is not yet done", job.id)
1387 1469
        continue
1388 1470

  
b/test/ganeti.jqueue_unittest.py
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Script for testing ganeti.jqueue"""
23

  
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30

  
31
from ganeti import constants
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import jqueue
35

  
36
import testutils
37

  
38

  
39
class _FakeJob:
40
  def __init__(self, job_id, status):
41
    self.id = job_id
42
    self._status = status
43
    self._log = []
44

  
45
  def SetStatus(self, status):
46
    self._status = status
47

  
48
  def AddLogEntry(self, msg):
49
    self._log.append((len(self._log), msg))
50

  
51
  def CalcStatus(self):
52
    return self._status
53

  
54
  def GetInfo(self, fields):
55
    result = []
56

  
57
    for name in fields:
58
      if name == "status":
59
        result.append(self._status)
60
      else:
61
        raise Exception("Unknown field")
62

  
63
    return result
64

  
65
  def GetLogEntries(self, newer_than):
66
    assert newer_than is None or newer_than >= 0
67

  
68
    if newer_than is None:
69
      return self._log
70

  
71
    return self._log[newer_than:]
72

  
73

  
74
class TestJobChangesChecker(unittest.TestCase):
75
  def testStatus(self):
76
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
77
    checker = jqueue._JobChangesChecker(["status"], None, None)
78
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
79

  
80
    job.SetStatus(constants.JOB_STATUS_RUNNING)
81
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
82

  
83
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
84
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
85

  
86
    # job.id is used by checker
87
    self.assertEqual(job.id, 9094)
88

  
89
  def testStatusWithPrev(self):
90
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
91
    checker = jqueue._JobChangesChecker(["status"],
92
                                        [constants.JOB_STATUS_QUEUED], None)
93
    self.assert_(checker(job) is None)
94

  
95
    job.SetStatus(constants.JOB_STATUS_RUNNING)
96
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
97

  
98
  def testFinalStatus(self):
99
    for status in constants.JOBS_FINALIZED:
100
      job = _FakeJob(2178711, status)
101
      checker = jqueue._JobChangesChecker(["status"], [status], None)
102
      # There won't be any changes in this status, hence it should signal
103
      # a change immediately
104
      self.assertEqual(checker(job), ([status], []))
105

  
106
  def testLog(self):
107
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
108
    checker = jqueue._JobChangesChecker(["status"], None, None)
109
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
110

  
111
    job.AddLogEntry("Hello World")
112
    (job_info, log_entries) = checker(job)
113
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
114
    self.assertEqual(log_entries, [[0, "Hello World"]])
115

  
116
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
117
    self.assert_(checker2(job) is None)
118

  
119
    job.AddLogEntry("Foo Bar")
120
    job.SetStatus(constants.JOB_STATUS_ERROR)
121

  
122
    (job_info, log_entries) = checker2(job)
123
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
124
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
125

  
126
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
127
    (job_info, log_entries) = checker3(job)
128
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
129
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
130

  
131

  
132
class TestJobChangesWaiter(unittest.TestCase):
133
  def setUp(self):
134
    self.tmpdir = tempfile.mkdtemp()
135
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
136
    utils.WriteFile(self.filename, data="")
137

  
138
  def tearDown(self):
139
    shutil.rmtree(self.tmpdir)
140

  
141
  def _EnsureNotifierClosed(self, notifier):
142
    try:
143
      os.fstat(notifier._fd)
144
    except EnvironmentError, err:
145
      self.assertEqual(err.errno, errno.EBADF)
146
    else:
147
      self.fail("File descriptor wasn't closed")
148

  
149
  def testClose(self):
150
    for wait in [False, True]:
151
      waiter = jqueue._JobFileChangesWaiter(self.filename)
152
      try:
153
        if wait:
154
          waiter.Wait(0.001)
155
      finally:
156
        waiter.Close()
157

  
158
      # Ensure file descriptor was closed
159
      self._EnsureNotifierClosed(waiter._notifier)
160

  
161
  def testChangingFile(self):
162
    waiter = jqueue._JobFileChangesWaiter(self.filename)
163
    try:
164
      self.assertFalse(waiter.Wait(0.1))
165
      utils.WriteFile(self.filename, data="changed")
166
      self.assert_(waiter.Wait(60))
167
    finally:
168
      waiter.Close()
169

  
170
    self._EnsureNotifierClosed(waiter._notifier)
171

  
172
  def testChangingFile2(self):
173
    waiter = jqueue._JobChangesWaiter(self.filename)
174
    try:
175
      self.assertFalse(waiter._filewaiter)
176
      self.assert_(waiter.Wait(0.1))
177
      self.assert_(waiter._filewaiter)
178

  
179
      # File waiter is now used, but there have been no changes
180
      self.assertFalse(waiter.Wait(0.1))
181
      utils.WriteFile(self.filename, data="changed")
182
      self.assert_(waiter.Wait(60))
183
    finally:
184
      waiter.Close()
185

  
186
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
187

  
188

  
189
class TestWaitForJobChangesHelper(unittest.TestCase):
190
  def setUp(self):
191
    self.tmpdir = tempfile.mkdtemp()
192
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
193
    utils.WriteFile(self.filename, data="")
194

  
195
  def tearDown(self):
196
    shutil.rmtree(self.tmpdir)
197

  
198
  def _LoadWaitingJob(self):
199
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
200

  
201
  def _LoadLostJob(self):
202
    return None
203

  
204
  def testNoChanges(self):
205
    wfjc = jqueue._WaitForJobChangesHelper()
206

  
207
    # No change
208
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
209
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
210
                     constants.JOB_NOTCHANGED)
211

  
212
    # No previous information
213
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
214
                          ["status"], None, None, 1.0),
215
                     ([constants.JOB_STATUS_WAITLOCK], []))
216

  
217
  def testLostJob(self):
218
    wfjc = jqueue._WaitForJobChangesHelper()
219
    self.assert_(wfjc(self.filename, self._LoadLostJob,
220
                      ["status"], None, None, 1.0) is None)
221

  
222

  
223
if __name__ == "__main__":
224
  testutils.GanetiTestProgram()

Also available in: Unified diff