Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.workerpool_unittest.py @ 2e04d454

History | View | Annotate | Download (12 kB)

1 76094e37 Michael Hanselmann
#!/usr/bin/python
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 52c47e4e Michael Hanselmann
# Copyright (C) 2008, 2009, 2010 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Script for unittesting the workerpool module"""
23 76094e37 Michael Hanselmann
24 76094e37 Michael Hanselmann
import unittest
25 76094e37 Michael Hanselmann
import threading
26 76094e37 Michael Hanselmann
import time
27 76094e37 Michael Hanselmann
import sys
28 76094e37 Michael Hanselmann
import zlib
29 52c47e4e Michael Hanselmann
import random
30 76094e37 Michael Hanselmann
31 76094e37 Michael Hanselmann
from ganeti import workerpool
32 52c47e4e Michael Hanselmann
from ganeti import errors
33 76094e37 Michael Hanselmann
34 25231ec5 Michael Hanselmann
import testutils
35 25231ec5 Michael Hanselmann
36 7ed3248b Guido Trotter
37 52c47e4e Michael Hanselmann
class CountingContext(object):
38 7ed3248b Guido Trotter
  def __init__(self):
39 7ed3248b Guido Trotter
    self._lock = threading.Condition(threading.Lock())
40 7ed3248b Guido Trotter
    self.done = 0
41 7ed3248b Guido Trotter
42 7ed3248b Guido Trotter
  def DoneTask(self):
43 7ed3248b Guido Trotter
    self._lock.acquire()
44 7ed3248b Guido Trotter
    try:
45 7ed3248b Guido Trotter
      self.done += 1
46 7ed3248b Guido Trotter
    finally:
47 7ed3248b Guido Trotter
      self._lock.release()
48 7ed3248b Guido Trotter
49 7ed3248b Guido Trotter
  def GetDoneTasks(self):
50 7ed3248b Guido Trotter
    self._lock.acquire()
51 7ed3248b Guido Trotter
    try:
52 7ed3248b Guido Trotter
      return self.done
53 7ed3248b Guido Trotter
    finally:
54 7ed3248b Guido Trotter
      self._lock.release()
55 7ed3248b Guido Trotter
56 7ed3248b Guido Trotter
  @staticmethod
57 7ed3248b Guido Trotter
  def UpdateChecksum(current, value):
58 7ed3248b Guido Trotter
    return zlib.adler32(str(value), current)
59 76094e37 Michael Hanselmann
60 7ed3248b Guido Trotter
61 7ed3248b Guido Trotter
class CountingBaseWorker(workerpool.BaseWorker):
62 7ed3248b Guido Trotter
  def RunTask(self, ctx, text):
63 7ed3248b Guido Trotter
    ctx.DoneTask()
64 76094e37 Michael Hanselmann
65 76094e37 Michael Hanselmann
66 76094e37 Michael Hanselmann
class ChecksumContext:
67 76094e37 Michael Hanselmann
  CHECKSUM_START = zlib.adler32("")
68 76094e37 Michael Hanselmann
69 76094e37 Michael Hanselmann
  def __init__(self):
70 76094e37 Michael Hanselmann
    self.lock = threading.Condition(threading.Lock())
71 76094e37 Michael Hanselmann
    self.checksum = self.CHECKSUM_START
72 76094e37 Michael Hanselmann
73 76094e37 Michael Hanselmann
  @staticmethod
74 76094e37 Michael Hanselmann
  def UpdateChecksum(current, value):
75 76094e37 Michael Hanselmann
    return zlib.adler32(str(value), current)
76 76094e37 Michael Hanselmann
77 76094e37 Michael Hanselmann
78 76094e37 Michael Hanselmann
class ChecksumBaseWorker(workerpool.BaseWorker):
79 76094e37 Michael Hanselmann
  def RunTask(self, ctx, number):
80 daba67c7 Michael Hanselmann
    name = "number%s" % number
81 daba67c7 Michael Hanselmann
    self.SetTaskName(name)
82 daba67c7 Michael Hanselmann
83 daba67c7 Michael Hanselmann
    # This assertion needs to be checked before updating the checksum. A
84 daba67c7 Michael Hanselmann
    # failing assertion will then cause the result to be wrong.
85 daba67c7 Michael Hanselmann
    assert self.getName() == ("%s/%s" % (self._worker_id, name))
86 daba67c7 Michael Hanselmann
87 76094e37 Michael Hanselmann
    ctx.lock.acquire()
88 76094e37 Michael Hanselmann
    try:
89 76094e37 Michael Hanselmann
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
90 76094e37 Michael Hanselmann
    finally:
91 76094e37 Michael Hanselmann
      ctx.lock.release()
92 76094e37 Michael Hanselmann
93 76094e37 Michael Hanselmann
94 52c47e4e Michael Hanselmann
class ListBuilderContext:
95 52c47e4e Michael Hanselmann
  def __init__(self):
96 52c47e4e Michael Hanselmann
    self.lock = threading.Lock()
97 52c47e4e Michael Hanselmann
    self.result = []
98 52c47e4e Michael Hanselmann
    self.prioresult = {}
99 52c47e4e Michael Hanselmann
100 52c47e4e Michael Hanselmann
101 52c47e4e Michael Hanselmann
class ListBuilderWorker(workerpool.BaseWorker):
102 52c47e4e Michael Hanselmann
  def RunTask(self, ctx, data):
103 52c47e4e Michael Hanselmann
    ctx.lock.acquire()
104 52c47e4e Michael Hanselmann
    try:
105 52c47e4e Michael Hanselmann
      ctx.result.append((self.GetCurrentPriority(), data))
106 52c47e4e Michael Hanselmann
      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
107 52c47e4e Michael Hanselmann
    finally:
108 52c47e4e Michael Hanselmann
      ctx.lock.release()
109 52c47e4e Michael Hanselmann
110 52c47e4e Michael Hanselmann
111 52c47e4e Michael Hanselmann
class DeferringTaskContext:
112 52c47e4e Michael Hanselmann
  def __init__(self):
113 52c47e4e Michael Hanselmann
    self.lock = threading.Lock()
114 52c47e4e Michael Hanselmann
    self.prioresult = {}
115 52c47e4e Michael Hanselmann
    self.samepriodefer = {}
116 52c47e4e Michael Hanselmann
117 52c47e4e Michael Hanselmann
118 52c47e4e Michael Hanselmann
class DeferringWorker(workerpool.BaseWorker):
119 52c47e4e Michael Hanselmann
  def RunTask(self, ctx, num, targetprio):
120 52c47e4e Michael Hanselmann
    ctx.lock.acquire()
121 52c47e4e Michael Hanselmann
    try:
122 52c47e4e Michael Hanselmann
      if num in ctx.samepriodefer:
123 52c47e4e Michael Hanselmann
        del ctx.samepriodefer[num]
124 52c47e4e Michael Hanselmann
        raise workerpool.DeferTask()
125 52c47e4e Michael Hanselmann
126 52c47e4e Michael Hanselmann
      if self.GetCurrentPriority() > targetprio:
127 52c47e4e Michael Hanselmann
        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
128 52c47e4e Michael Hanselmann
129 52c47e4e Michael Hanselmann
      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
130 52c47e4e Michael Hanselmann
    finally:
131 52c47e4e Michael Hanselmann
      ctx.lock.release()
132 52c47e4e Michael Hanselmann
133 52c47e4e Michael Hanselmann
134 76094e37 Michael Hanselmann
class TestWorkerpool(unittest.TestCase):
135 76094e37 Michael Hanselmann
  """Workerpool tests"""
136 76094e37 Michael Hanselmann
137 7ed3248b Guido Trotter
  def testCounting(self):
138 7ed3248b Guido Trotter
    ctx = CountingContext()
139 7ed3248b Guido Trotter
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
140 76094e37 Michael Hanselmann
    try:
141 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 3)
142 76094e37 Michael Hanselmann
143 f1501b3f Michael Hanselmann
      for i in range(10):
144 b2e8a4d9 Michael Hanselmann
        wp.AddTask((ctx, "Hello world %s" % i))
145 76094e37 Michael Hanselmann
146 76094e37 Michael Hanselmann
      wp.Quiesce()
147 76094e37 Michael Hanselmann
    finally:
148 76094e37 Michael Hanselmann
      wp.TerminateWorkers()
149 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
150 76094e37 Michael Hanselmann
151 7ed3248b Guido Trotter
    self.assertEquals(ctx.GetDoneTasks(), 10)
152 7ed3248b Guido Trotter
153 76094e37 Michael Hanselmann
  def testNoTasks(self):
154 7ed3248b Guido Trotter
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
155 76094e37 Michael Hanselmann
    try:
156 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 3)
157 76094e37 Michael Hanselmann
      self._CheckNoTasks(wp)
158 76094e37 Michael Hanselmann
    finally:
159 76094e37 Michael Hanselmann
      wp.TerminateWorkers()
160 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
161 76094e37 Michael Hanselmann
162 76094e37 Michael Hanselmann
  def testNoTasksQuiesce(self):
163 7ed3248b Guido Trotter
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
164 76094e37 Michael Hanselmann
    try:
165 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 3)
166 76094e37 Michael Hanselmann
      self._CheckNoTasks(wp)
167 76094e37 Michael Hanselmann
      wp.Quiesce()
168 76094e37 Michael Hanselmann
      self._CheckNoTasks(wp)
169 76094e37 Michael Hanselmann
    finally:
170 76094e37 Michael Hanselmann
      wp.TerminateWorkers()
171 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
172 76094e37 Michael Hanselmann
173 76094e37 Michael Hanselmann
  def testChecksum(self):
174 76094e37 Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
175 76094e37 Michael Hanselmann
    # thread, whether everything is started in order.
176 89e2b4d2 Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
177 76094e37 Michael Hanselmann
    try:
178 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
179 76094e37 Michael Hanselmann
180 76094e37 Michael Hanselmann
      ctx = ChecksumContext()
181 76094e37 Michael Hanselmann
      checksum = ChecksumContext.CHECKSUM_START
182 f1501b3f Michael Hanselmann
      for i in range(1, 100):
183 76094e37 Michael Hanselmann
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
184 b2e8a4d9 Michael Hanselmann
        wp.AddTask((ctx, i))
185 76094e37 Michael Hanselmann
186 76094e37 Michael Hanselmann
      wp.Quiesce()
187 76094e37 Michael Hanselmann
188 76094e37 Michael Hanselmann
      self._CheckNoTasks(wp)
189 76094e37 Michael Hanselmann
190 76094e37 Michael Hanselmann
      # Check sum
191 76094e37 Michael Hanselmann
      ctx.lock.acquire()
192 76094e37 Michael Hanselmann
      try:
193 76094e37 Michael Hanselmann
        self.assertEqual(checksum, ctx.checksum)
194 76094e37 Michael Hanselmann
      finally:
195 76094e37 Michael Hanselmann
        ctx.lock.release()
196 76094e37 Michael Hanselmann
    finally:
197 76094e37 Michael Hanselmann
      wp.TerminateWorkers()
198 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
199 76094e37 Michael Hanselmann
200 c2a8e8ba Guido Trotter
  def testAddManyTasks(self):
201 7ed3248b Guido Trotter
    ctx = CountingContext()
202 7ed3248b Guido Trotter
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
203 c2a8e8ba Guido Trotter
    try:
204 c2a8e8ba Guido Trotter
      self._CheckWorkerCount(wp, 3)
205 c2a8e8ba Guido Trotter
206 7ed3248b Guido Trotter
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
207 b2e8a4d9 Michael Hanselmann
      wp.AddTask((ctx, "A separate hello"))
208 b2e8a4d9 Michael Hanselmann
      wp.AddTask((ctx, "Once more, hi!"))
209 7ed3248b Guido Trotter
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
210 c2a8e8ba Guido Trotter
211 c2a8e8ba Guido Trotter
      wp.Quiesce()
212 c2a8e8ba Guido Trotter
213 c2a8e8ba Guido Trotter
      self._CheckNoTasks(wp)
214 c2a8e8ba Guido Trotter
    finally:
215 c2a8e8ba Guido Trotter
      wp.TerminateWorkers()
216 c2a8e8ba Guido Trotter
      self._CheckWorkerCount(wp, 0)
217 c2a8e8ba Guido Trotter
218 7ed3248b Guido Trotter
    self.assertEquals(ctx.GetDoneTasks(), 22)
219 7ed3248b Guido Trotter
220 25e557a5 Guido Trotter
  def testManyTasksSequence(self):
221 25e557a5 Guido Trotter
    ctx = CountingContext()
222 25e557a5 Guido Trotter
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
223 25e557a5 Guido Trotter
    try:
224 25e557a5 Guido Trotter
      self._CheckWorkerCount(wp, 3)
225 25e557a5 Guido Trotter
      self.assertRaises(AssertionError, wp.AddManyTasks,
226 25e557a5 Guido Trotter
                        ["Hello world %s" % i for i in range(10)])
227 25e557a5 Guido Trotter
      self.assertRaises(AssertionError, wp.AddManyTasks,
228 25e557a5 Guido Trotter
                        [i for i in range(10)])
229 25e557a5 Guido Trotter
230 25e557a5 Guido Trotter
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
231 b2e8a4d9 Michael Hanselmann
      wp.AddTask((ctx, "A separate hello"))
232 25e557a5 Guido Trotter
233 25e557a5 Guido Trotter
      wp.Quiesce()
234 25e557a5 Guido Trotter
235 25e557a5 Guido Trotter
      self._CheckNoTasks(wp)
236 25e557a5 Guido Trotter
    finally:
237 25e557a5 Guido Trotter
      wp.TerminateWorkers()
238 25e557a5 Guido Trotter
      self._CheckWorkerCount(wp, 0)
239 25e557a5 Guido Trotter
240 25e557a5 Guido Trotter
    self.assertEquals(ctx.GetDoneTasks(), 11)
241 25e557a5 Guido Trotter
242 76094e37 Michael Hanselmann
  def _CheckNoTasks(self, wp):
243 76094e37 Michael Hanselmann
    wp._lock.acquire()
244 76094e37 Michael Hanselmann
    try:
245 76094e37 Michael Hanselmann
      # The task queue must be empty now
246 76094e37 Michael Hanselmann
      self.failUnless(not wp._tasks)
247 76094e37 Michael Hanselmann
    finally:
248 76094e37 Michael Hanselmann
      wp._lock.release()
249 76094e37 Michael Hanselmann
250 76094e37 Michael Hanselmann
  def _CheckWorkerCount(self, wp, num_workers):
251 76094e37 Michael Hanselmann
    wp._lock.acquire()
252 76094e37 Michael Hanselmann
    try:
253 76094e37 Michael Hanselmann
      self.assertEqual(len(wp._workers), num_workers)
254 76094e37 Michael Hanselmann
    finally:
255 76094e37 Michael Hanselmann
      wp._lock.release()
256 76094e37 Michael Hanselmann
257 52c47e4e Michael Hanselmann
  def testPriorityChecksum(self):
258 52c47e4e Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
259 52c47e4e Michael Hanselmann
    # thread, whether everything is started in order and respects the priority
260 52c47e4e Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
261 52c47e4e Michael Hanselmann
    try:
262 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
263 52c47e4e Michael Hanselmann
264 52c47e4e Michael Hanselmann
      ctx = ChecksumContext()
265 52c47e4e Michael Hanselmann
266 52c47e4e Michael Hanselmann
      data = {}
267 52c47e4e Michael Hanselmann
      tasks = []
268 52c47e4e Michael Hanselmann
      priorities = []
269 52c47e4e Michael Hanselmann
      for i in range(1, 333):
270 52c47e4e Michael Hanselmann
        prio = i % 7
271 52c47e4e Michael Hanselmann
        tasks.append((ctx, i))
272 52c47e4e Michael Hanselmann
        priorities.append(prio)
273 52c47e4e Michael Hanselmann
        data.setdefault(prio, []).append(i)
274 52c47e4e Michael Hanselmann
275 52c47e4e Michael Hanselmann
      wp.AddManyTasks(tasks, priority=priorities)
276 52c47e4e Michael Hanselmann
277 52c47e4e Michael Hanselmann
      wp.Quiesce()
278 52c47e4e Michael Hanselmann
279 52c47e4e Michael Hanselmann
      self._CheckNoTasks(wp)
280 52c47e4e Michael Hanselmann
281 52c47e4e Michael Hanselmann
      # Check sum
282 52c47e4e Michael Hanselmann
      ctx.lock.acquire()
283 52c47e4e Michael Hanselmann
      try:
284 52c47e4e Michael Hanselmann
        checksum = ChecksumContext.CHECKSUM_START
285 52c47e4e Michael Hanselmann
        for priority in sorted(data.keys()):
286 52c47e4e Michael Hanselmann
          for i in data[priority]:
287 52c47e4e Michael Hanselmann
            checksum = ChecksumContext.UpdateChecksum(checksum, i)
288 52c47e4e Michael Hanselmann
289 52c47e4e Michael Hanselmann
        self.assertEqual(checksum, ctx.checksum)
290 52c47e4e Michael Hanselmann
      finally:
291 52c47e4e Michael Hanselmann
        ctx.lock.release()
292 52c47e4e Michael Hanselmann
293 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
294 52c47e4e Michael Hanselmann
    finally:
295 52c47e4e Michael Hanselmann
      wp.TerminateWorkers()
296 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
297 52c47e4e Michael Hanselmann
298 52c47e4e Michael Hanselmann
  def testPriorityListManyTasks(self):
299 52c47e4e Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
300 52c47e4e Michael Hanselmann
    # thread, whether everything is started in order and respects the priority
301 52c47e4e Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
302 52c47e4e Michael Hanselmann
    try:
303 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
304 52c47e4e Michael Hanselmann
305 52c47e4e Michael Hanselmann
      ctx = ListBuilderContext()
306 52c47e4e Michael Hanselmann
307 52c47e4e Michael Hanselmann
      # Use static seed for this test
308 52c47e4e Michael Hanselmann
      rnd = random.Random(0)
309 52c47e4e Michael Hanselmann
310 52c47e4e Michael Hanselmann
      data = {}
311 52c47e4e Michael Hanselmann
      tasks = []
312 52c47e4e Michael Hanselmann
      priorities = []
313 52c47e4e Michael Hanselmann
      for i in range(1, 333):
314 52c47e4e Michael Hanselmann
        prio = int(rnd.random() * 10)
315 52c47e4e Michael Hanselmann
        tasks.append((ctx, i))
316 52c47e4e Michael Hanselmann
        priorities.append(prio)
317 52c47e4e Michael Hanselmann
        data.setdefault(prio, []).append((prio, i))
318 52c47e4e Michael Hanselmann
319 52c47e4e Michael Hanselmann
      wp.AddManyTasks(tasks, priority=priorities)
320 52c47e4e Michael Hanselmann
321 52c47e4e Michael Hanselmann
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
322 52c47e4e Michael Hanselmann
                        [("x", ), ("y", )], priority=[1] * 5)
323 52c47e4e Michael Hanselmann
324 52c47e4e Michael Hanselmann
      wp.Quiesce()
325 52c47e4e Michael Hanselmann
326 52c47e4e Michael Hanselmann
      self._CheckNoTasks(wp)
327 52c47e4e Michael Hanselmann
328 52c47e4e Michael Hanselmann
      # Check result
329 52c47e4e Michael Hanselmann
      ctx.lock.acquire()
330 52c47e4e Michael Hanselmann
      try:
331 52c47e4e Michael Hanselmann
        expresult = []
332 52c47e4e Michael Hanselmann
        for priority in sorted(data.keys()):
333 52c47e4e Michael Hanselmann
          expresult.extend(data[priority])
334 52c47e4e Michael Hanselmann
335 52c47e4e Michael Hanselmann
        self.assertEqual(expresult, ctx.result)
336 52c47e4e Michael Hanselmann
      finally:
337 52c47e4e Michael Hanselmann
        ctx.lock.release()
338 52c47e4e Michael Hanselmann
339 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
340 52c47e4e Michael Hanselmann
    finally:
341 52c47e4e Michael Hanselmann
      wp.TerminateWorkers()
342 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
343 52c47e4e Michael Hanselmann
344 52c47e4e Michael Hanselmann
  def testPriorityListSingleTasks(self):
345 52c47e4e Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
346 52c47e4e Michael Hanselmann
    # thread, whether everything is started in order and respects the priority
347 52c47e4e Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
348 52c47e4e Michael Hanselmann
    try:
349 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
350 52c47e4e Michael Hanselmann
351 52c47e4e Michael Hanselmann
      ctx = ListBuilderContext()
352 52c47e4e Michael Hanselmann
353 52c47e4e Michael Hanselmann
      # Use static seed for this test
354 52c47e4e Michael Hanselmann
      rnd = random.Random(26279)
355 52c47e4e Michael Hanselmann
356 52c47e4e Michael Hanselmann
      data = {}
357 52c47e4e Michael Hanselmann
      for i in range(1, 333):
358 52c47e4e Michael Hanselmann
        prio = int(rnd.random() * 30)
359 52c47e4e Michael Hanselmann
        wp.AddTask((ctx, i), priority=prio)
360 52c47e4e Michael Hanselmann
        data.setdefault(prio, []).append(i)
361 52c47e4e Michael Hanselmann
362 52c47e4e Michael Hanselmann
        # Cause some distortion
363 52c47e4e Michael Hanselmann
        if i % 11 == 0:
364 52c47e4e Michael Hanselmann
          time.sleep(.001)
365 52c47e4e Michael Hanselmann
        if i % 41 == 0:
366 52c47e4e Michael Hanselmann
          wp.Quiesce()
367 52c47e4e Michael Hanselmann
368 52c47e4e Michael Hanselmann
      wp.Quiesce()
369 52c47e4e Michael Hanselmann
370 52c47e4e Michael Hanselmann
      self._CheckNoTasks(wp)
371 52c47e4e Michael Hanselmann
372 52c47e4e Michael Hanselmann
      # Check result
373 52c47e4e Michael Hanselmann
      ctx.lock.acquire()
374 52c47e4e Michael Hanselmann
      try:
375 52c47e4e Michael Hanselmann
        self.assertEqual(data, ctx.prioresult)
376 52c47e4e Michael Hanselmann
      finally:
377 52c47e4e Michael Hanselmann
        ctx.lock.release()
378 52c47e4e Michael Hanselmann
379 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
380 52c47e4e Michael Hanselmann
    finally:
381 52c47e4e Michael Hanselmann
      wp.TerminateWorkers()
382 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
383 52c47e4e Michael Hanselmann
384 52c47e4e Michael Hanselmann
  def testPriorityListSingleTasks(self):
385 52c47e4e Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
386 52c47e4e Michael Hanselmann
    # thread, whether everything is started in order and respects the priority
387 52c47e4e Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
388 52c47e4e Michael Hanselmann
    try:
389 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
390 52c47e4e Michael Hanselmann
391 52c47e4e Michael Hanselmann
      ctx = ListBuilderContext()
392 52c47e4e Michael Hanselmann
393 52c47e4e Michael Hanselmann
      # Use static seed for this test
394 52c47e4e Michael Hanselmann
      rnd = random.Random(26279)
395 52c47e4e Michael Hanselmann
396 52c47e4e Michael Hanselmann
      data = {}
397 52c47e4e Michael Hanselmann
      for i in range(1, 333):
398 52c47e4e Michael Hanselmann
        prio = int(rnd.random() * 30)
399 52c47e4e Michael Hanselmann
        wp.AddTask((ctx, i), priority=prio)
400 52c47e4e Michael Hanselmann
        data.setdefault(prio, []).append(i)
401 52c47e4e Michael Hanselmann
402 52c47e4e Michael Hanselmann
        # Cause some distortion
403 52c47e4e Michael Hanselmann
        if i % 11 == 0:
404 52c47e4e Michael Hanselmann
          time.sleep(.001)
405 52c47e4e Michael Hanselmann
        if i % 41 == 0:
406 52c47e4e Michael Hanselmann
          wp.Quiesce()
407 52c47e4e Michael Hanselmann
408 52c47e4e Michael Hanselmann
      wp.Quiesce()
409 52c47e4e Michael Hanselmann
410 52c47e4e Michael Hanselmann
      self._CheckNoTasks(wp)
411 52c47e4e Michael Hanselmann
412 52c47e4e Michael Hanselmann
      # Check result
413 52c47e4e Michael Hanselmann
      ctx.lock.acquire()
414 52c47e4e Michael Hanselmann
      try:
415 52c47e4e Michael Hanselmann
        self.assertEqual(data, ctx.prioresult)
416 52c47e4e Michael Hanselmann
      finally:
417 52c47e4e Michael Hanselmann
        ctx.lock.release()
418 52c47e4e Michael Hanselmann
419 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
420 52c47e4e Michael Hanselmann
    finally:
421 52c47e4e Michael Hanselmann
      wp.TerminateWorkers()
422 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
423 52c47e4e Michael Hanselmann
424 52c47e4e Michael Hanselmann
  def testDeferTask(self):
425 52c47e4e Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
426 52c47e4e Michael Hanselmann
    # thread, whether everything is started in order and respects the priority
427 52c47e4e Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
428 52c47e4e Michael Hanselmann
    try:
429 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
430 52c47e4e Michael Hanselmann
431 52c47e4e Michael Hanselmann
      ctx = DeferringTaskContext()
432 52c47e4e Michael Hanselmann
433 52c47e4e Michael Hanselmann
      # Use static seed for this test
434 52c47e4e Michael Hanselmann
      rnd = random.Random(14921)
435 52c47e4e Michael Hanselmann
436 52c47e4e Michael Hanselmann
      data = {}
437 52c47e4e Michael Hanselmann
      for i in range(1, 333):
438 52c47e4e Michael Hanselmann
        ctx.lock.acquire()
439 52c47e4e Michael Hanselmann
        try:
440 52c47e4e Michael Hanselmann
          if i % 5 == 0:
441 52c47e4e Michael Hanselmann
            ctx.samepriodefer[i] = True
442 52c47e4e Michael Hanselmann
        finally:
443 52c47e4e Michael Hanselmann
          ctx.lock.release()
444 52c47e4e Michael Hanselmann
445 52c47e4e Michael Hanselmann
        prio = int(rnd.random() * 30)
446 52c47e4e Michael Hanselmann
        wp.AddTask((ctx, i, prio), priority=50)
447 52c47e4e Michael Hanselmann
        data.setdefault(prio, set()).add(i)
448 52c47e4e Michael Hanselmann
449 52c47e4e Michael Hanselmann
        # Cause some distortion
450 52c47e4e Michael Hanselmann
        if i % 24 == 0:
451 52c47e4e Michael Hanselmann
          time.sleep(.001)
452 52c47e4e Michael Hanselmann
        if i % 31 == 0:
453 52c47e4e Michael Hanselmann
          wp.Quiesce()
454 52c47e4e Michael Hanselmann
455 52c47e4e Michael Hanselmann
      wp.Quiesce()
456 52c47e4e Michael Hanselmann
457 52c47e4e Michael Hanselmann
      self._CheckNoTasks(wp)
458 52c47e4e Michael Hanselmann
459 52c47e4e Michael Hanselmann
      # Check result
460 52c47e4e Michael Hanselmann
      ctx.lock.acquire()
461 52c47e4e Michael Hanselmann
      try:
462 52c47e4e Michael Hanselmann
        self.assertEqual(data, ctx.prioresult)
463 52c47e4e Michael Hanselmann
      finally:
464 52c47e4e Michael Hanselmann
        ctx.lock.release()
465 52c47e4e Michael Hanselmann
466 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
467 52c47e4e Michael Hanselmann
    finally:
468 52c47e4e Michael Hanselmann
      wp.TerminateWorkers()
469 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
470 52c47e4e Michael Hanselmann
471 76094e37 Michael Hanselmann
472 76094e37 Michael Hanselmann
if __name__ == '__main__':
473 25231ec5 Michael Hanselmann
  testutils.GanetiTestProgram()