Revision fcb21ad7

b/lib/jqueue.py
1340 1340
   CONTINUE,
1341 1341
   WRONGSTATUS) = range(1, 6)
1342 1342

  
1343
  # TODO: Export waiter information to lock monitor
1344

  
1345 1343
  def __init__(self, getstatus_fn, enqueue_fn):
1346 1344
    """Initializes this class.
1347 1345

  
......
1353 1351
    self._lock = locking.SharedLock("JobDepMgr")
1354 1352

  
1355 1353
  @locking.ssynchronized(_LOCK, shared=1)
1354
  def GetLockInfo(self, requested): # pylint: disable-msg=W0613
1355
    """Retrieves information about waiting jobs.
1356

  
1357
    @type requested: set
1358
    @param requested: Requested information, see C{query.LQ_*}
1359

  
1360
    """
1361
    # No need to sort here, that's being done by the lock manager and query
1362
    # library. There are no priorities for notifying jobs, hence all show up as
1363
    # one item under "pending".
1364
    return [("job/%s" % job_id, None, None,
1365
             [("job", [job.id for job in waiters])])
1366
            for job_id, waiters in self._waiters.items()
1367
            if waiters]
1368

  
1369
  @locking.ssynchronized(_LOCK, shared=1)
1356 1370
  def JobWaiting(self, job):
1357 1371
    """Checks if a job is waiting.
1358 1372

  
......
1527 1541
    # Job dependencies
1528 1542
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1529 1543
                                        self._EnqueueJobs)
1544
    self.context.glm.AddToLockMonitor(self.depmgr)
1530 1545

  
1531 1546
    # Setup worker pool
1532 1547
    self._wpool = _JobQueueWorkerPool(self)
b/test/ganeti.jqueue_unittest.py
28 28
import shutil
29 29
import errno
30 30
import itertools
31
import random
31 32

  
32 33
from ganeti import constants
33 34
from ganeti import utils
......
36 37
from ganeti import opcodes
37 38
from ganeti import compat
38 39
from ganeti import mcpu
40
from ganeti import query
39 41

  
40 42
import testutils
41 43

  
......
1887 1889
    self.assertEqual(self.jdm._waiters, {
1888 1890
      job_id: set([job]),
1889 1891
      })
1892
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1893
      ("job/28625", None, None, [("job", [job.id])])
1894
      ])
1890 1895

  
1891 1896
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1892 1897
    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
......
1894 1899
    self.assertFalse(self._status)
1895 1900
    self.assertFalse(self._queue)
1896 1901
    self.assertFalse(self.jdm.JobWaiting(job))
1902
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1897 1903

  
1898 1904
  def testRequireCancel(self):
1899 1905
    job = self._FakeJob(5278)
......
1909 1915
    self.assertEqual(self.jdm._waiters, {
1910 1916
      job_id: set([job]),
1911 1917
      })
1918
    self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1919
      ("job/9610", None, None, [("job", [job.id])])
1920
      ])
1912 1921

  
1913 1922
    self._status.append((job_id, constants.JOB_STATUS_CANCELED))
1914 1923
    (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
......
1916 1925
    self.assertFalse(self._status)
1917 1926
    self.assertFalse(self._queue)
1918 1927
    self.assertFalse(self.jdm.JobWaiting(job))
1928
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1919 1929

  
1920 1930
  def testRequireError(self):
1921 1931
    job = self._FakeJob(21459)
......
1938 1948
    self.assertFalse(self._status)
1939 1949
    self.assertFalse(self._queue)
1940 1950
    self.assertFalse(self.jdm.JobWaiting(job))
1951
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1941 1952

  
1942 1953
  def testRequireMultiple(self):
1943 1954
    dep_status = list(constants.JOBS_FINALIZED)
......
1955 1966
      self.assertEqual(self.jdm._waiters, {
1956 1967
        job_id: set([job]),
1957 1968
        })
1969
      self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
1970
        ("job/14609", None, None, [("job", [job.id])])
1971
        ])
1958 1972

  
1959 1973
      self._status.append((job_id, end_status))
1960 1974
      (result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
......
1962 1976
      self.assertFalse(self._status)
1963 1977
      self.assertFalse(self._queue)
1964 1978
      self.assertFalse(self.jdm.JobWaiting(job))
1979
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
1965 1980

  
1966 1981
  def testNotify(self):
1967 1982
    job = self._FakeJob(8227)
......
2050 2065
    self.assertFalse(self._status)
2051 2066
    self.assertFalse(self._queue)
2052 2067

  
2068
  def testMultipleWaiting(self):
2069
    # Use a deterministic random generator
2070
    rnd = random.Random(21402)
2071

  
2072
    job_ids = map(str, rnd.sample(range(1, 10000), 150))
2073

  
2074
    waiters = dict((job_ids.pop(),
2075
                    set(map(self._FakeJob,
2076
                            [job_ids.pop()
2077
                             for _ in range(rnd.randint(1, 20))])))
2078
                   for _ in range(10))
2079

  
2080
    # Ensure there are no duplicate job IDs
2081
    assert not utils.FindDuplicates(waiters.keys() +
2082
                                    [job.id
2083
                                     for jobs in waiters.values()
2084
                                     for job in jobs])
2085

  
2086
    # Register all jobs as waiters
2087
    for job_id, job in [(job_id, job)
2088
                        for (job_id, jobs) in waiters.items()
2089
                        for job in jobs]:
2090
      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
2091
      (result, _) = self.jdm.CheckAndRegister(job, job_id,
2092
                                              [constants.JOB_STATUS_SUCCESS])
2093
      self.assertEqual(result, self.jdm.WAIT)
2094
      self.assertFalse(self._status)
2095
      self.assertFalse(self._queue)
2096
      self.assertTrue(self.jdm.JobWaiting(job))
2097

  
2098
    self.assertEqual(self.jdm._waiters, waiters)
2099

  
2100
    def _MakeSet((name, mode, owner_names, pending)):
2101
      return (name, mode, owner_names,
2102
              [(pendmode, set(pend)) for (pendmode, pend) in pending])
2103

  
2104
    def _CheckLockInfo():
2105
      info = self.jdm.GetLockInfo([query.LQ_PENDING])
2106
      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
2107
        ("job/%s" % job_id, None, None,
2108
         [("job", set([job.id for job in jobs]))])
2109
        for job_id, jobs in waiters.items()
2110
        if jobs
2111
        ]))
2112

  
2113
    _CheckLockInfo()
2114

  
2115
    # Notify in random order
2116
    for job_id in rnd.sample(waiters, len(waiters)):
2117
      # Remove from pending waiter list
2118
      jobs = waiters.pop(job_id)
2119
      for job in jobs:
2120
        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
2121
        (result, _) = self.jdm.CheckAndRegister(job, job_id,
2122
                                                [constants.JOB_STATUS_SUCCESS])
2123
        self.assertEqual(result, self.jdm.CONTINUE)
2124
        self.assertFalse(self._status)
2125
        self.assertFalse(self._queue)
2126
        self.assertFalse(self.jdm.JobWaiting(job))
2127

  
2128
      _CheckLockInfo()
2129

  
2130
    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
2131

  
2132
    assert not waiters
2133

  
2053 2134
  def testSelfDependency(self):
2054 2135
    job = self._FakeJob(18937)
2055 2136

  
......
2068 2149
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
2069 2150
    self.assertEqual(result, self.jdm.ERROR)
2070 2151
    self.assertFalse(jdm.JobWaiting(job))
2152
    self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
2071 2153

  
2072 2154

  
2073 2155
if __name__ == "__main__":

Also available in: Unified diff