Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.workerpool_unittest.py @ 415feb2e

History | View | Annotate | Download (13.3 kB)

1 76094e37 Michael Hanselmann
#!/usr/bin/python
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 52c47e4e Michael Hanselmann
# Copyright (C) 2008, 2009, 2010 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Script for unittesting the workerpool module"""
23 76094e37 Michael Hanselmann
24 76094e37 Michael Hanselmann
import unittest
25 76094e37 Michael Hanselmann
import threading
26 76094e37 Michael Hanselmann
import time
27 76094e37 Michael Hanselmann
import sys
28 76094e37 Michael Hanselmann
import zlib
29 52c47e4e Michael Hanselmann
import random
30 76094e37 Michael Hanselmann
31 76094e37 Michael Hanselmann
from ganeti import workerpool
32 52c47e4e Michael Hanselmann
from ganeti import errors
33 76094e37 Michael Hanselmann
34 25231ec5 Michael Hanselmann
import testutils
35 25231ec5 Michael Hanselmann
36 7ed3248b Guido Trotter
37 52c47e4e Michael Hanselmann
class CountingContext(object):
38 7ed3248b Guido Trotter
  def __init__(self):
39 7ed3248b Guido Trotter
    self._lock = threading.Condition(threading.Lock())
40 7ed3248b Guido Trotter
    self.done = 0
41 7ed3248b Guido Trotter
42 7ed3248b Guido Trotter
  def DoneTask(self):
43 7ed3248b Guido Trotter
    self._lock.acquire()
44 7ed3248b Guido Trotter
    try:
45 7ed3248b Guido Trotter
      self.done += 1
46 7ed3248b Guido Trotter
    finally:
47 7ed3248b Guido Trotter
      self._lock.release()
48 7ed3248b Guido Trotter
49 7ed3248b Guido Trotter
  def GetDoneTasks(self):
50 7ed3248b Guido Trotter
    self._lock.acquire()
51 7ed3248b Guido Trotter
    try:
52 7ed3248b Guido Trotter
      return self.done
53 7ed3248b Guido Trotter
    finally:
54 7ed3248b Guido Trotter
      self._lock.release()
55 7ed3248b Guido Trotter
56 7ed3248b Guido Trotter
  @staticmethod
57 7ed3248b Guido Trotter
  def UpdateChecksum(current, value):
58 7ed3248b Guido Trotter
    return zlib.adler32(str(value), current)
59 76094e37 Michael Hanselmann
60 7ed3248b Guido Trotter
61 7ed3248b Guido Trotter
class CountingBaseWorker(workerpool.BaseWorker):
62 7ed3248b Guido Trotter
  def RunTask(self, ctx, text):
63 7ed3248b Guido Trotter
    ctx.DoneTask()
64 76094e37 Michael Hanselmann
65 76094e37 Michael Hanselmann
66 76094e37 Michael Hanselmann
class ChecksumContext:
67 76094e37 Michael Hanselmann
  CHECKSUM_START = zlib.adler32("")
68 76094e37 Michael Hanselmann
69 76094e37 Michael Hanselmann
  def __init__(self):
70 76094e37 Michael Hanselmann
    self.lock = threading.Condition(threading.Lock())
71 76094e37 Michael Hanselmann
    self.checksum = self.CHECKSUM_START
72 76094e37 Michael Hanselmann
73 76094e37 Michael Hanselmann
  @staticmethod
74 76094e37 Michael Hanselmann
  def UpdateChecksum(current, value):
75 76094e37 Michael Hanselmann
    return zlib.adler32(str(value), current)
76 76094e37 Michael Hanselmann
77 76094e37 Michael Hanselmann
78 76094e37 Michael Hanselmann
class ChecksumBaseWorker(workerpool.BaseWorker):
79 76094e37 Michael Hanselmann
  def RunTask(self, ctx, number):
80 daba67c7 Michael Hanselmann
    name = "number%s" % number
81 daba67c7 Michael Hanselmann
    self.SetTaskName(name)
82 daba67c7 Michael Hanselmann
83 daba67c7 Michael Hanselmann
    # This assertion needs to be checked before updating the checksum. A
84 daba67c7 Michael Hanselmann
    # failing assertion will then cause the result to be wrong.
85 daba67c7 Michael Hanselmann
    assert self.getName() == ("%s/%s" % (self._worker_id, name))
86 daba67c7 Michael Hanselmann
87 76094e37 Michael Hanselmann
    ctx.lock.acquire()
88 76094e37 Michael Hanselmann
    try:
89 76094e37 Michael Hanselmann
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
90 76094e37 Michael Hanselmann
    finally:
91 76094e37 Michael Hanselmann
      ctx.lock.release()
92 76094e37 Michael Hanselmann
93 76094e37 Michael Hanselmann
94 52c47e4e Michael Hanselmann
class ListBuilderContext:
95 52c47e4e Michael Hanselmann
  def __init__(self):
96 52c47e4e Michael Hanselmann
    self.lock = threading.Lock()
97 52c47e4e Michael Hanselmann
    self.result = []
98 52c47e4e Michael Hanselmann
    self.prioresult = {}
99 52c47e4e Michael Hanselmann
100 52c47e4e Michael Hanselmann
101 52c47e4e Michael Hanselmann
class ListBuilderWorker(workerpool.BaseWorker):
102 52c47e4e Michael Hanselmann
  def RunTask(self, ctx, data):
103 52c47e4e Michael Hanselmann
    ctx.lock.acquire()
104 52c47e4e Michael Hanselmann
    try:
105 52c47e4e Michael Hanselmann
      ctx.result.append((self.GetCurrentPriority(), data))
106 52c47e4e Michael Hanselmann
      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
107 52c47e4e Michael Hanselmann
    finally:
108 52c47e4e Michael Hanselmann
      ctx.lock.release()
109 52c47e4e Michael Hanselmann
110 52c47e4e Michael Hanselmann
111 52c47e4e Michael Hanselmann
class DeferringTaskContext:
112 52c47e4e Michael Hanselmann
  def __init__(self):
113 52c47e4e Michael Hanselmann
    self.lock = threading.Lock()
114 52c47e4e Michael Hanselmann
    self.prioresult = {}
115 52c47e4e Michael Hanselmann
    self.samepriodefer = {}
116 52c47e4e Michael Hanselmann
117 52c47e4e Michael Hanselmann
118 52c47e4e Michael Hanselmann
class DeferringWorker(workerpool.BaseWorker):
119 52c47e4e Michael Hanselmann
  def RunTask(self, ctx, num, targetprio):
120 52c47e4e Michael Hanselmann
    ctx.lock.acquire()
121 52c47e4e Michael Hanselmann
    try:
122 52c47e4e Michael Hanselmann
      if num in ctx.samepriodefer:
123 52c47e4e Michael Hanselmann
        del ctx.samepriodefer[num]
124 52c47e4e Michael Hanselmann
        raise workerpool.DeferTask()
125 52c47e4e Michael Hanselmann
126 52c47e4e Michael Hanselmann
      if self.GetCurrentPriority() > targetprio:
127 52c47e4e Michael Hanselmann
        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
128 52c47e4e Michael Hanselmann
129 52c47e4e Michael Hanselmann
      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
130 52c47e4e Michael Hanselmann
    finally:
131 52c47e4e Michael Hanselmann
      ctx.lock.release()
132 52c47e4e Michael Hanselmann
133 52c47e4e Michael Hanselmann
134 76094e37 Michael Hanselmann
class TestWorkerpool(unittest.TestCase):
135 76094e37 Michael Hanselmann
  """Workerpool tests"""
136 76094e37 Michael Hanselmann
137 7ed3248b Guido Trotter
  def testCounting(self):
138 7ed3248b Guido Trotter
    ctx = CountingContext()
139 7ed3248b Guido Trotter
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
140 76094e37 Michael Hanselmann
    try:
141 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 3)
142 76094e37 Michael Hanselmann
143 f1501b3f Michael Hanselmann
      for i in range(10):
144 b2e8a4d9 Michael Hanselmann
        wp.AddTask((ctx, "Hello world %s" % i))
145 76094e37 Michael Hanselmann
146 76094e37 Michael Hanselmann
      wp.Quiesce()
147 76094e37 Michael Hanselmann
    finally:
148 76094e37 Michael Hanselmann
      wp.TerminateWorkers()
149 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
150 76094e37 Michael Hanselmann
151 7ed3248b Guido Trotter
    self.assertEquals(ctx.GetDoneTasks(), 10)
152 7ed3248b Guido Trotter
153 76094e37 Michael Hanselmann
  def testNoTasks(self):
154 7ed3248b Guido Trotter
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
155 76094e37 Michael Hanselmann
    try:
156 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 3)
157 76094e37 Michael Hanselmann
      self._CheckNoTasks(wp)
158 76094e37 Michael Hanselmann
    finally:
159 76094e37 Michael Hanselmann
      wp.TerminateWorkers()
160 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
161 76094e37 Michael Hanselmann
162 76094e37 Michael Hanselmann
  def testNoTasksQuiesce(self):
163 7ed3248b Guido Trotter
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
164 76094e37 Michael Hanselmann
    try:
165 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 3)
166 76094e37 Michael Hanselmann
      self._CheckNoTasks(wp)
167 76094e37 Michael Hanselmann
      wp.Quiesce()
168 76094e37 Michael Hanselmann
      self._CheckNoTasks(wp)
169 76094e37 Michael Hanselmann
    finally:
170 76094e37 Michael Hanselmann
      wp.TerminateWorkers()
171 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
172 76094e37 Michael Hanselmann
173 27caa993 Michael Hanselmann
  def testActive(self):
174 27caa993 Michael Hanselmann
    ctx = CountingContext()
175 27caa993 Michael Hanselmann
    wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
176 27caa993 Michael Hanselmann
    try:
177 27caa993 Michael Hanselmann
      self._CheckWorkerCount(wp, 5)
178 27caa993 Michael Hanselmann
      self.assertTrue(wp._active)
179 27caa993 Michael Hanselmann
180 27caa993 Michael Hanselmann
      # Process some tasks
181 27caa993 Michael Hanselmann
      for _ in range(10):
182 27caa993 Michael Hanselmann
        wp.AddTask((ctx, None))
183 27caa993 Michael Hanselmann
184 27caa993 Michael Hanselmann
      wp.Quiesce()
185 27caa993 Michael Hanselmann
      self._CheckNoTasks(wp)
186 27caa993 Michael Hanselmann
      self.assertEquals(ctx.GetDoneTasks(), 10)
187 27caa993 Michael Hanselmann
188 27caa993 Michael Hanselmann
      # Repeat a few times
189 27caa993 Michael Hanselmann
      for count in range(10):
190 27caa993 Michael Hanselmann
        # Deactivate pool
191 27caa993 Michael Hanselmann
        wp.SetActive(False)
192 27caa993 Michael Hanselmann
        self._CheckNoTasks(wp)
193 27caa993 Michael Hanselmann
194 27caa993 Michael Hanselmann
        # Queue some more tasks
195 27caa993 Michael Hanselmann
        for _ in range(10):
196 27caa993 Michael Hanselmann
          wp.AddTask((ctx, None))
197 27caa993 Michael Hanselmann
198 27caa993 Michael Hanselmann
        for _ in range(5):
199 27caa993 Michael Hanselmann
          # Short delays to give other threads a chance to cause breakage
200 27caa993 Michael Hanselmann
          time.sleep(.01)
201 27caa993 Michael Hanselmann
          wp.AddTask((ctx, "Hello world %s" % 999))
202 27caa993 Michael Hanselmann
          self.assertFalse(wp._active)
203 27caa993 Michael Hanselmann
204 27caa993 Michael Hanselmann
        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
205 27caa993 Michael Hanselmann
206 27caa993 Michael Hanselmann
        # Start processing again
207 27caa993 Michael Hanselmann
        wp.SetActive(True)
208 27caa993 Michael Hanselmann
        self.assertTrue(wp._active)
209 27caa993 Michael Hanselmann
210 27caa993 Michael Hanselmann
        # Wait for tasks to finish
211 27caa993 Michael Hanselmann
        wp.Quiesce()
212 27caa993 Michael Hanselmann
        self._CheckNoTasks(wp)
213 27caa993 Michael Hanselmann
        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
214 27caa993 Michael Hanselmann
215 27caa993 Michael Hanselmann
        self._CheckWorkerCount(wp, 5)
216 27caa993 Michael Hanselmann
    finally:
217 27caa993 Michael Hanselmann
      wp.TerminateWorkers()
218 27caa993 Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
219 27caa993 Michael Hanselmann
220 76094e37 Michael Hanselmann
  def testChecksum(self):
221 76094e37 Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
222 76094e37 Michael Hanselmann
    # thread, whether everything is started in order.
223 89e2b4d2 Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
224 76094e37 Michael Hanselmann
    try:
225 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
226 76094e37 Michael Hanselmann
227 76094e37 Michael Hanselmann
      ctx = ChecksumContext()
228 76094e37 Michael Hanselmann
      checksum = ChecksumContext.CHECKSUM_START
229 f1501b3f Michael Hanselmann
      for i in range(1, 100):
230 76094e37 Michael Hanselmann
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
231 b2e8a4d9 Michael Hanselmann
        wp.AddTask((ctx, i))
232 76094e37 Michael Hanselmann
233 76094e37 Michael Hanselmann
      wp.Quiesce()
234 76094e37 Michael Hanselmann
235 76094e37 Michael Hanselmann
      self._CheckNoTasks(wp)
236 76094e37 Michael Hanselmann
237 76094e37 Michael Hanselmann
      # Check sum
238 76094e37 Michael Hanselmann
      ctx.lock.acquire()
239 76094e37 Michael Hanselmann
      try:
240 76094e37 Michael Hanselmann
        self.assertEqual(checksum, ctx.checksum)
241 76094e37 Michael Hanselmann
      finally:
242 76094e37 Michael Hanselmann
        ctx.lock.release()
243 76094e37 Michael Hanselmann
    finally:
244 76094e37 Michael Hanselmann
      wp.TerminateWorkers()
245 76094e37 Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
246 76094e37 Michael Hanselmann
247 c2a8e8ba Guido Trotter
  def testAddManyTasks(self):
248 7ed3248b Guido Trotter
    ctx = CountingContext()
249 7ed3248b Guido Trotter
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
250 c2a8e8ba Guido Trotter
    try:
251 c2a8e8ba Guido Trotter
      self._CheckWorkerCount(wp, 3)
252 c2a8e8ba Guido Trotter
253 7ed3248b Guido Trotter
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
254 b2e8a4d9 Michael Hanselmann
      wp.AddTask((ctx, "A separate hello"))
255 b2e8a4d9 Michael Hanselmann
      wp.AddTask((ctx, "Once more, hi!"))
256 7ed3248b Guido Trotter
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
257 c2a8e8ba Guido Trotter
258 c2a8e8ba Guido Trotter
      wp.Quiesce()
259 c2a8e8ba Guido Trotter
260 c2a8e8ba Guido Trotter
      self._CheckNoTasks(wp)
261 c2a8e8ba Guido Trotter
    finally:
262 c2a8e8ba Guido Trotter
      wp.TerminateWorkers()
263 c2a8e8ba Guido Trotter
      self._CheckWorkerCount(wp, 0)
264 c2a8e8ba Guido Trotter
265 7ed3248b Guido Trotter
    self.assertEquals(ctx.GetDoneTasks(), 22)
266 7ed3248b Guido Trotter
267 25e557a5 Guido Trotter
  def testManyTasksSequence(self):
268 25e557a5 Guido Trotter
    ctx = CountingContext()
269 25e557a5 Guido Trotter
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
270 25e557a5 Guido Trotter
    try:
271 25e557a5 Guido Trotter
      self._CheckWorkerCount(wp, 3)
272 25e557a5 Guido Trotter
      self.assertRaises(AssertionError, wp.AddManyTasks,
273 25e557a5 Guido Trotter
                        ["Hello world %s" % i for i in range(10)])
274 25e557a5 Guido Trotter
      self.assertRaises(AssertionError, wp.AddManyTasks,
275 25e557a5 Guido Trotter
                        [i for i in range(10)])
276 25e557a5 Guido Trotter
277 25e557a5 Guido Trotter
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
278 b2e8a4d9 Michael Hanselmann
      wp.AddTask((ctx, "A separate hello"))
279 25e557a5 Guido Trotter
280 25e557a5 Guido Trotter
      wp.Quiesce()
281 25e557a5 Guido Trotter
282 25e557a5 Guido Trotter
      self._CheckNoTasks(wp)
283 25e557a5 Guido Trotter
    finally:
284 25e557a5 Guido Trotter
      wp.TerminateWorkers()
285 25e557a5 Guido Trotter
      self._CheckWorkerCount(wp, 0)
286 25e557a5 Guido Trotter
287 25e557a5 Guido Trotter
    self.assertEquals(ctx.GetDoneTasks(), 11)
288 25e557a5 Guido Trotter
289 76094e37 Michael Hanselmann
  def _CheckNoTasks(self, wp):
290 76094e37 Michael Hanselmann
    wp._lock.acquire()
291 76094e37 Michael Hanselmann
    try:
292 76094e37 Michael Hanselmann
      # The task queue must be empty now
293 76094e37 Michael Hanselmann
      self.failUnless(not wp._tasks)
294 76094e37 Michael Hanselmann
    finally:
295 76094e37 Michael Hanselmann
      wp._lock.release()
296 76094e37 Michael Hanselmann
297 76094e37 Michael Hanselmann
  def _CheckWorkerCount(self, wp, num_workers):
298 76094e37 Michael Hanselmann
    wp._lock.acquire()
299 76094e37 Michael Hanselmann
    try:
300 76094e37 Michael Hanselmann
      self.assertEqual(len(wp._workers), num_workers)
301 76094e37 Michael Hanselmann
    finally:
302 76094e37 Michael Hanselmann
      wp._lock.release()
303 76094e37 Michael Hanselmann
304 52c47e4e Michael Hanselmann
  def testPriorityChecksum(self):
305 52c47e4e Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
306 52c47e4e Michael Hanselmann
    # thread, whether everything is started in order and respects the priority
307 52c47e4e Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
308 52c47e4e Michael Hanselmann
    try:
309 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
310 52c47e4e Michael Hanselmann
311 52c47e4e Michael Hanselmann
      ctx = ChecksumContext()
312 52c47e4e Michael Hanselmann
313 52c47e4e Michael Hanselmann
      data = {}
314 52c47e4e Michael Hanselmann
      tasks = []
315 52c47e4e Michael Hanselmann
      priorities = []
316 52c47e4e Michael Hanselmann
      for i in range(1, 333):
317 52c47e4e Michael Hanselmann
        prio = i % 7
318 52c47e4e Michael Hanselmann
        tasks.append((ctx, i))
319 52c47e4e Michael Hanselmann
        priorities.append(prio)
320 52c47e4e Michael Hanselmann
        data.setdefault(prio, []).append(i)
321 52c47e4e Michael Hanselmann
322 52c47e4e Michael Hanselmann
      wp.AddManyTasks(tasks, priority=priorities)
323 52c47e4e Michael Hanselmann
324 52c47e4e Michael Hanselmann
      wp.Quiesce()
325 52c47e4e Michael Hanselmann
326 52c47e4e Michael Hanselmann
      self._CheckNoTasks(wp)
327 52c47e4e Michael Hanselmann
328 52c47e4e Michael Hanselmann
      # Check sum
329 52c47e4e Michael Hanselmann
      ctx.lock.acquire()
330 52c47e4e Michael Hanselmann
      try:
331 52c47e4e Michael Hanselmann
        checksum = ChecksumContext.CHECKSUM_START
332 52c47e4e Michael Hanselmann
        for priority in sorted(data.keys()):
333 52c47e4e Michael Hanselmann
          for i in data[priority]:
334 52c47e4e Michael Hanselmann
            checksum = ChecksumContext.UpdateChecksum(checksum, i)
335 52c47e4e Michael Hanselmann
336 52c47e4e Michael Hanselmann
        self.assertEqual(checksum, ctx.checksum)
337 52c47e4e Michael Hanselmann
      finally:
338 52c47e4e Michael Hanselmann
        ctx.lock.release()
339 52c47e4e Michael Hanselmann
340 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
341 52c47e4e Michael Hanselmann
    finally:
342 52c47e4e Michael Hanselmann
      wp.TerminateWorkers()
343 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
344 52c47e4e Michael Hanselmann
345 52c47e4e Michael Hanselmann
  def testPriorityListManyTasks(self):
346 52c47e4e Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
347 52c47e4e Michael Hanselmann
    # thread, whether everything is started in order and respects the priority
348 52c47e4e Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
349 52c47e4e Michael Hanselmann
    try:
350 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
351 52c47e4e Michael Hanselmann
352 52c47e4e Michael Hanselmann
      ctx = ListBuilderContext()
353 52c47e4e Michael Hanselmann
354 52c47e4e Michael Hanselmann
      # Use static seed for this test
355 52c47e4e Michael Hanselmann
      rnd = random.Random(0)
356 52c47e4e Michael Hanselmann
357 52c47e4e Michael Hanselmann
      data = {}
358 52c47e4e Michael Hanselmann
      tasks = []
359 52c47e4e Michael Hanselmann
      priorities = []
360 52c47e4e Michael Hanselmann
      for i in range(1, 333):
361 52c47e4e Michael Hanselmann
        prio = int(rnd.random() * 10)
362 52c47e4e Michael Hanselmann
        tasks.append((ctx, i))
363 52c47e4e Michael Hanselmann
        priorities.append(prio)
364 52c47e4e Michael Hanselmann
        data.setdefault(prio, []).append((prio, i))
365 52c47e4e Michael Hanselmann
366 52c47e4e Michael Hanselmann
      wp.AddManyTasks(tasks, priority=priorities)
367 52c47e4e Michael Hanselmann
368 52c47e4e Michael Hanselmann
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
369 52c47e4e Michael Hanselmann
                        [("x", ), ("y", )], priority=[1] * 5)
370 52c47e4e Michael Hanselmann
371 52c47e4e Michael Hanselmann
      wp.Quiesce()
372 52c47e4e Michael Hanselmann
373 52c47e4e Michael Hanselmann
      self._CheckNoTasks(wp)
374 52c47e4e Michael Hanselmann
375 52c47e4e Michael Hanselmann
      # Check result
376 52c47e4e Michael Hanselmann
      ctx.lock.acquire()
377 52c47e4e Michael Hanselmann
      try:
378 52c47e4e Michael Hanselmann
        expresult = []
379 52c47e4e Michael Hanselmann
        for priority in sorted(data.keys()):
380 52c47e4e Michael Hanselmann
          expresult.extend(data[priority])
381 52c47e4e Michael Hanselmann
382 52c47e4e Michael Hanselmann
        self.assertEqual(expresult, ctx.result)
383 52c47e4e Michael Hanselmann
      finally:
384 52c47e4e Michael Hanselmann
        ctx.lock.release()
385 52c47e4e Michael Hanselmann
386 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
387 52c47e4e Michael Hanselmann
    finally:
388 52c47e4e Michael Hanselmann
      wp.TerminateWorkers()
389 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
390 52c47e4e Michael Hanselmann
391 52c47e4e Michael Hanselmann
  def testPriorityListSingleTasks(self):
392 52c47e4e Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
393 52c47e4e Michael Hanselmann
    # thread, whether everything is started in order and respects the priority
394 52c47e4e Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
395 52c47e4e Michael Hanselmann
    try:
396 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
397 52c47e4e Michael Hanselmann
398 52c47e4e Michael Hanselmann
      ctx = ListBuilderContext()
399 52c47e4e Michael Hanselmann
400 52c47e4e Michael Hanselmann
      # Use static seed for this test
401 52c47e4e Michael Hanselmann
      rnd = random.Random(26279)
402 52c47e4e Michael Hanselmann
403 52c47e4e Michael Hanselmann
      data = {}
404 52c47e4e Michael Hanselmann
      for i in range(1, 333):
405 52c47e4e Michael Hanselmann
        prio = int(rnd.random() * 30)
406 52c47e4e Michael Hanselmann
        wp.AddTask((ctx, i), priority=prio)
407 52c47e4e Michael Hanselmann
        data.setdefault(prio, []).append(i)
408 52c47e4e Michael Hanselmann
409 52c47e4e Michael Hanselmann
        # Cause some distortion
410 52c47e4e Michael Hanselmann
        if i % 11 == 0:
411 52c47e4e Michael Hanselmann
          time.sleep(.001)
412 52c47e4e Michael Hanselmann
        if i % 41 == 0:
413 52c47e4e Michael Hanselmann
          wp.Quiesce()
414 52c47e4e Michael Hanselmann
415 52c47e4e Michael Hanselmann
      wp.Quiesce()
416 52c47e4e Michael Hanselmann
417 52c47e4e Michael Hanselmann
      self._CheckNoTasks(wp)
418 52c47e4e Michael Hanselmann
419 52c47e4e Michael Hanselmann
      # Check result
420 52c47e4e Michael Hanselmann
      ctx.lock.acquire()
421 52c47e4e Michael Hanselmann
      try:
422 52c47e4e Michael Hanselmann
        self.assertEqual(data, ctx.prioresult)
423 52c47e4e Michael Hanselmann
      finally:
424 52c47e4e Michael Hanselmann
        ctx.lock.release()
425 52c47e4e Michael Hanselmann
426 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
427 52c47e4e Michael Hanselmann
    finally:
428 52c47e4e Michael Hanselmann
      wp.TerminateWorkers()
429 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
430 52c47e4e Michael Hanselmann
431 52c47e4e Michael Hanselmann
  def testPriorityListSingleTasks(self):
432 52c47e4e Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
433 52c47e4e Michael Hanselmann
    # thread, whether everything is started in order and respects the priority
434 52c47e4e Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
435 52c47e4e Michael Hanselmann
    try:
436 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
437 52c47e4e Michael Hanselmann
438 52c47e4e Michael Hanselmann
      ctx = ListBuilderContext()
439 52c47e4e Michael Hanselmann
440 52c47e4e Michael Hanselmann
      # Use static seed for this test
441 52c47e4e Michael Hanselmann
      rnd = random.Random(26279)
442 52c47e4e Michael Hanselmann
443 52c47e4e Michael Hanselmann
      data = {}
444 52c47e4e Michael Hanselmann
      for i in range(1, 333):
445 52c47e4e Michael Hanselmann
        prio = int(rnd.random() * 30)
446 52c47e4e Michael Hanselmann
        wp.AddTask((ctx, i), priority=prio)
447 52c47e4e Michael Hanselmann
        data.setdefault(prio, []).append(i)
448 52c47e4e Michael Hanselmann
449 52c47e4e Michael Hanselmann
        # Cause some distortion
450 52c47e4e Michael Hanselmann
        if i % 11 == 0:
451 52c47e4e Michael Hanselmann
          time.sleep(.001)
452 52c47e4e Michael Hanselmann
        if i % 41 == 0:
453 52c47e4e Michael Hanselmann
          wp.Quiesce()
454 52c47e4e Michael Hanselmann
455 52c47e4e Michael Hanselmann
      wp.Quiesce()
456 52c47e4e Michael Hanselmann
457 52c47e4e Michael Hanselmann
      self._CheckNoTasks(wp)
458 52c47e4e Michael Hanselmann
459 52c47e4e Michael Hanselmann
      # Check result
460 52c47e4e Michael Hanselmann
      ctx.lock.acquire()
461 52c47e4e Michael Hanselmann
      try:
462 52c47e4e Michael Hanselmann
        self.assertEqual(data, ctx.prioresult)
463 52c47e4e Michael Hanselmann
      finally:
464 52c47e4e Michael Hanselmann
        ctx.lock.release()
465 52c47e4e Michael Hanselmann
466 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
467 52c47e4e Michael Hanselmann
    finally:
468 52c47e4e Michael Hanselmann
      wp.TerminateWorkers()
469 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
470 52c47e4e Michael Hanselmann
471 52c47e4e Michael Hanselmann
  def testDeferTask(self):
472 52c47e4e Michael Hanselmann
    # Tests whether all tasks are run and, since we're only using a single
473 52c47e4e Michael Hanselmann
    # thread, whether everything is started in order and respects the priority
474 52c47e4e Michael Hanselmann
    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
475 52c47e4e Michael Hanselmann
    try:
476 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
477 52c47e4e Michael Hanselmann
478 52c47e4e Michael Hanselmann
      ctx = DeferringTaskContext()
479 52c47e4e Michael Hanselmann
480 52c47e4e Michael Hanselmann
      # Use static seed for this test
481 52c47e4e Michael Hanselmann
      rnd = random.Random(14921)
482 52c47e4e Michael Hanselmann
483 52c47e4e Michael Hanselmann
      data = {}
484 52c47e4e Michael Hanselmann
      for i in range(1, 333):
485 52c47e4e Michael Hanselmann
        ctx.lock.acquire()
486 52c47e4e Michael Hanselmann
        try:
487 52c47e4e Michael Hanselmann
          if i % 5 == 0:
488 52c47e4e Michael Hanselmann
            ctx.samepriodefer[i] = True
489 52c47e4e Michael Hanselmann
        finally:
490 52c47e4e Michael Hanselmann
          ctx.lock.release()
491 52c47e4e Michael Hanselmann
492 52c47e4e Michael Hanselmann
        prio = int(rnd.random() * 30)
493 52c47e4e Michael Hanselmann
        wp.AddTask((ctx, i, prio), priority=50)
494 52c47e4e Michael Hanselmann
        data.setdefault(prio, set()).add(i)
495 52c47e4e Michael Hanselmann
496 52c47e4e Michael Hanselmann
        # Cause some distortion
497 52c47e4e Michael Hanselmann
        if i % 24 == 0:
498 52c47e4e Michael Hanselmann
          time.sleep(.001)
499 52c47e4e Michael Hanselmann
        if i % 31 == 0:
500 52c47e4e Michael Hanselmann
          wp.Quiesce()
501 52c47e4e Michael Hanselmann
502 52c47e4e Michael Hanselmann
      wp.Quiesce()
503 52c47e4e Michael Hanselmann
504 52c47e4e Michael Hanselmann
      self._CheckNoTasks(wp)
505 52c47e4e Michael Hanselmann
506 52c47e4e Michael Hanselmann
      # Check result
507 52c47e4e Michael Hanselmann
      ctx.lock.acquire()
508 52c47e4e Michael Hanselmann
      try:
509 52c47e4e Michael Hanselmann
        self.assertEqual(data, ctx.prioresult)
510 52c47e4e Michael Hanselmann
      finally:
511 52c47e4e Michael Hanselmann
        ctx.lock.release()
512 52c47e4e Michael Hanselmann
513 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 1)
514 52c47e4e Michael Hanselmann
    finally:
515 52c47e4e Michael Hanselmann
      wp.TerminateWorkers()
516 52c47e4e Michael Hanselmann
      self._CheckWorkerCount(wp, 0)
517 52c47e4e Michael Hanselmann
518 76094e37 Michael Hanselmann
519 76094e37 Michael Hanselmann
if __name__ == '__main__':
520 25231ec5 Michael Hanselmann
  testutils.GanetiTestProgram()