Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.workerpool_unittest.py @ 52c47e4e

History | View | Annotate | Download (11.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the workerpool module"""
23

    
24
import unittest
25
import threading
26
import time
27
import sys
28
import zlib
29
import random
30

    
31
from ganeti import workerpool
32
from ganeti import errors
33

    
34
import testutils
35

    
36

    
37
class CountingContext(object):
38
  def __init__(self):
39
    self._lock = threading.Condition(threading.Lock())
40
    self.done = 0
41

    
42
  def DoneTask(self):
43
    self._lock.acquire()
44
    try:
45
      self.done += 1
46
    finally:
47
      self._lock.release()
48

    
49
  def GetDoneTasks(self):
50
    self._lock.acquire()
51
    try:
52
      return self.done
53
    finally:
54
      self._lock.release()
55

    
56
  @staticmethod
57
  def UpdateChecksum(current, value):
58
    return zlib.adler32(str(value), current)
59

    
60

    
61
class CountingBaseWorker(workerpool.BaseWorker):
62
  def RunTask(self, ctx, text):
63
    ctx.DoneTask()
64

    
65

    
66
class ChecksumContext:
67
  CHECKSUM_START = zlib.adler32("")
68

    
69
  def __init__(self):
70
    self.lock = threading.Condition(threading.Lock())
71
    self.checksum = self.CHECKSUM_START
72

    
73
  @staticmethod
74
  def UpdateChecksum(current, value):
75
    return zlib.adler32(str(value), current)
76

    
77

    
78
class ChecksumBaseWorker(workerpool.BaseWorker):
79
  def RunTask(self, ctx, number):
80
    ctx.lock.acquire()
81
    try:
82
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
83
    finally:
84
      ctx.lock.release()
85

    
86

    
87
class ListBuilderContext:
88
  def __init__(self):
89
    self.lock = threading.Lock()
90
    self.result = []
91
    self.prioresult = {}
92

    
93

    
94
class ListBuilderWorker(workerpool.BaseWorker):
95
  def RunTask(self, ctx, data):
96
    ctx.lock.acquire()
97
    try:
98
      ctx.result.append((self.GetCurrentPriority(), data))
99
      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
100
    finally:
101
      ctx.lock.release()
102

    
103

    
104
class DeferringTaskContext:
105
  def __init__(self):
106
    self.lock = threading.Lock()
107
    self.prioresult = {}
108
    self.samepriodefer = {}
109

    
110

    
111
class DeferringWorker(workerpool.BaseWorker):
112
  def RunTask(self, ctx, num, targetprio):
113
    ctx.lock.acquire()
114
    try:
115
      if num in ctx.samepriodefer:
116
        del ctx.samepriodefer[num]
117
        raise workerpool.DeferTask()
118

    
119
      if self.GetCurrentPriority() > targetprio:
120
        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
121

    
122
      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
123
    finally:
124
      ctx.lock.release()
125

    
126

    
127
class TestWorkerpool(unittest.TestCase):
128
  """Workerpool tests"""
129

    
130
  def testCounting(self):
131
    ctx = CountingContext()
132
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
133
    try:
134
      self._CheckWorkerCount(wp, 3)
135

    
136
      for i in range(10):
137
        wp.AddTask((ctx, "Hello world %s" % i))
138

    
139
      wp.Quiesce()
140
    finally:
141
      wp.TerminateWorkers()
142
      self._CheckWorkerCount(wp, 0)
143

    
144
    self.assertEquals(ctx.GetDoneTasks(), 10)
145

    
146
  def testNoTasks(self):
147
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
148
    try:
149
      self._CheckWorkerCount(wp, 3)
150
      self._CheckNoTasks(wp)
151
    finally:
152
      wp.TerminateWorkers()
153
      self._CheckWorkerCount(wp, 0)
154

    
155
  def testNoTasksQuiesce(self):
156
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
157
    try:
158
      self._CheckWorkerCount(wp, 3)
159
      self._CheckNoTasks(wp)
160
      wp.Quiesce()
161
      self._CheckNoTasks(wp)
162
    finally:
163
      wp.TerminateWorkers()
164
      self._CheckWorkerCount(wp, 0)
165

    
166
  def testChecksum(self):
167
    # Tests whether all tasks are run and, since we're only using a single
168
    # thread, whether everything is started in order.
169
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
170
    try:
171
      self._CheckWorkerCount(wp, 1)
172

    
173
      ctx = ChecksumContext()
174
      checksum = ChecksumContext.CHECKSUM_START
175
      for i in range(1, 100):
176
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
177
        wp.AddTask((ctx, i))
178

    
179
      wp.Quiesce()
180

    
181
      self._CheckNoTasks(wp)
182

    
183
      # Check sum
184
      ctx.lock.acquire()
185
      try:
186
        self.assertEqual(checksum, ctx.checksum)
187
      finally:
188
        ctx.lock.release()
189
    finally:
190
      wp.TerminateWorkers()
191
      self._CheckWorkerCount(wp, 0)
192

    
193
  def testAddManyTasks(self):
194
    ctx = CountingContext()
195
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
196
    try:
197
      self._CheckWorkerCount(wp, 3)
198

    
199
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
200
      wp.AddTask((ctx, "A separate hello"))
201
      wp.AddTask((ctx, "Once more, hi!"))
202
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
203

    
204
      wp.Quiesce()
205

    
206
      self._CheckNoTasks(wp)
207
    finally:
208
      wp.TerminateWorkers()
209
      self._CheckWorkerCount(wp, 0)
210

    
211
    self.assertEquals(ctx.GetDoneTasks(), 22)
212

    
213
  def testManyTasksSequence(self):
214
    ctx = CountingContext()
215
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
216
    try:
217
      self._CheckWorkerCount(wp, 3)
218
      self.assertRaises(AssertionError, wp.AddManyTasks,
219
                        ["Hello world %s" % i for i in range(10)])
220
      self.assertRaises(AssertionError, wp.AddManyTasks,
221
                        [i for i in range(10)])
222

    
223
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
224
      wp.AddTask((ctx, "A separate hello"))
225

    
226
      wp.Quiesce()
227

    
228
      self._CheckNoTasks(wp)
229
    finally:
230
      wp.TerminateWorkers()
231
      self._CheckWorkerCount(wp, 0)
232

    
233
    self.assertEquals(ctx.GetDoneTasks(), 11)
234

    
235
  def _CheckNoTasks(self, wp):
236
    wp._lock.acquire()
237
    try:
238
      # The task queue must be empty now
239
      self.failUnless(not wp._tasks)
240
    finally:
241
      wp._lock.release()
242

    
243
  def _CheckWorkerCount(self, wp, num_workers):
244
    wp._lock.acquire()
245
    try:
246
      self.assertEqual(len(wp._workers), num_workers)
247
    finally:
248
      wp._lock.release()
249

    
250
  def testPriorityChecksum(self):
251
    # Tests whether all tasks are run and, since we're only using a single
252
    # thread, whether everything is started in order and respects the priority
253
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
254
    try:
255
      self._CheckWorkerCount(wp, 1)
256

    
257
      ctx = ChecksumContext()
258

    
259
      data = {}
260
      tasks = []
261
      priorities = []
262
      for i in range(1, 333):
263
        prio = i % 7
264
        tasks.append((ctx, i))
265
        priorities.append(prio)
266
        data.setdefault(prio, []).append(i)
267

    
268
      wp.AddManyTasks(tasks, priority=priorities)
269

    
270
      wp.Quiesce()
271

    
272
      self._CheckNoTasks(wp)
273

    
274
      # Check sum
275
      ctx.lock.acquire()
276
      try:
277
        checksum = ChecksumContext.CHECKSUM_START
278
        for priority in sorted(data.keys()):
279
          for i in data[priority]:
280
            checksum = ChecksumContext.UpdateChecksum(checksum, i)
281

    
282
        self.assertEqual(checksum, ctx.checksum)
283
      finally:
284
        ctx.lock.release()
285

    
286
      self._CheckWorkerCount(wp, 1)
287
    finally:
288
      wp.TerminateWorkers()
289
      self._CheckWorkerCount(wp, 0)
290

    
291
  def testPriorityListManyTasks(self):
292
    # Tests whether all tasks are run and, since we're only using a single
293
    # thread, whether everything is started in order and respects the priority
294
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
295
    try:
296
      self._CheckWorkerCount(wp, 1)
297

    
298
      ctx = ListBuilderContext()
299

    
300
      # Use static seed for this test
301
      rnd = random.Random(0)
302

    
303
      data = {}
304
      tasks = []
305
      priorities = []
306
      for i in range(1, 333):
307
        prio = int(rnd.random() * 10)
308
        tasks.append((ctx, i))
309
        priorities.append(prio)
310
        data.setdefault(prio, []).append((prio, i))
311

    
312
      wp.AddManyTasks(tasks, priority=priorities)
313

    
314
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
315
                        [("x", ), ("y", )], priority=[1] * 5)
316

    
317
      wp.Quiesce()
318

    
319
      self._CheckNoTasks(wp)
320

    
321
      # Check result
322
      ctx.lock.acquire()
323
      try:
324
        expresult = []
325
        for priority in sorted(data.keys()):
326
          expresult.extend(data[priority])
327

    
328
        self.assertEqual(expresult, ctx.result)
329
      finally:
330
        ctx.lock.release()
331

    
332
      self._CheckWorkerCount(wp, 1)
333
    finally:
334
      wp.TerminateWorkers()
335
      self._CheckWorkerCount(wp, 0)
336

    
337
  def testPriorityListSingleTasks(self):
338
    # Tests whether all tasks are run and, since we're only using a single
339
    # thread, whether everything is started in order and respects the priority
340
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
341
    try:
342
      self._CheckWorkerCount(wp, 1)
343

    
344
      ctx = ListBuilderContext()
345

    
346
      # Use static seed for this test
347
      rnd = random.Random(26279)
348

    
349
      data = {}
350
      for i in range(1, 333):
351
        prio = int(rnd.random() * 30)
352
        wp.AddTask((ctx, i), priority=prio)
353
        data.setdefault(prio, []).append(i)
354

    
355
        # Cause some distortion
356
        if i % 11 == 0:
357
          time.sleep(.001)
358
        if i % 41 == 0:
359
          wp.Quiesce()
360

    
361
      wp.Quiesce()
362

    
363
      self._CheckNoTasks(wp)
364

    
365
      # Check result
366
      ctx.lock.acquire()
367
      try:
368
        self.assertEqual(data, ctx.prioresult)
369
      finally:
370
        ctx.lock.release()
371

    
372
      self._CheckWorkerCount(wp, 1)
373
    finally:
374
      wp.TerminateWorkers()
375
      self._CheckWorkerCount(wp, 0)
376

    
377
  def testPriorityListSingleTasks(self):
378
    # Tests whether all tasks are run and, since we're only using a single
379
    # thread, whether everything is started in order and respects the priority
380
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
381
    try:
382
      self._CheckWorkerCount(wp, 1)
383

    
384
      ctx = ListBuilderContext()
385

    
386
      # Use static seed for this test
387
      rnd = random.Random(26279)
388

    
389
      data = {}
390
      for i in range(1, 333):
391
        prio = int(rnd.random() * 30)
392
        wp.AddTask((ctx, i), priority=prio)
393
        data.setdefault(prio, []).append(i)
394

    
395
        # Cause some distortion
396
        if i % 11 == 0:
397
          time.sleep(.001)
398
        if i % 41 == 0:
399
          wp.Quiesce()
400

    
401
      wp.Quiesce()
402

    
403
      self._CheckNoTasks(wp)
404

    
405
      # Check result
406
      ctx.lock.acquire()
407
      try:
408
        self.assertEqual(data, ctx.prioresult)
409
      finally:
410
        ctx.lock.release()
411

    
412
      self._CheckWorkerCount(wp, 1)
413
    finally:
414
      wp.TerminateWorkers()
415
      self._CheckWorkerCount(wp, 0)
416

    
417
  def testDeferTask(self):
418
    # Tests whether all tasks are run and, since we're only using a single
419
    # thread, whether everything is started in order and respects the priority
420
    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
421
    try:
422
      self._CheckWorkerCount(wp, 1)
423

    
424
      ctx = DeferringTaskContext()
425

    
426
      # Use static seed for this test
427
      rnd = random.Random(14921)
428

    
429
      data = {}
430
      for i in range(1, 333):
431
        ctx.lock.acquire()
432
        try:
433
          if i % 5 == 0:
434
            ctx.samepriodefer[i] = True
435
        finally:
436
          ctx.lock.release()
437

    
438
        prio = int(rnd.random() * 30)
439
        wp.AddTask((ctx, i, prio), priority=50)
440
        data.setdefault(prio, set()).add(i)
441

    
442
        # Cause some distortion
443
        if i % 24 == 0:
444
          time.sleep(.001)
445
        if i % 31 == 0:
446
          wp.Quiesce()
447

    
448
      wp.Quiesce()
449

    
450
      self._CheckNoTasks(wp)
451

    
452
      # Check result
453
      ctx.lock.acquire()
454
      try:
455
        self.assertEqual(data, ctx.prioresult)
456
      finally:
457
        ctx.lock.release()
458

    
459
      self._CheckWorkerCount(wp, 1)
460
    finally:
461
      wp.TerminateWorkers()
462
      self._CheckWorkerCount(wp, 0)
463

    
464

    
465
if __name__ == '__main__':
466
  testutils.GanetiTestProgram()