Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.workerpool_unittest.py @ 27caa993

History | View | Annotate | Download (13.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the workerpool module"""
23

    
24
import unittest
25
import threading
26
import time
27
import sys
28
import zlib
29
import random
30

    
31
from ganeti import workerpool
32
from ganeti import errors
33

    
34
import testutils
35

    
36

    
37
class CountingContext(object):
38
  def __init__(self):
39
    self._lock = threading.Condition(threading.Lock())
40
    self.done = 0
41

    
42
  def DoneTask(self):
43
    self._lock.acquire()
44
    try:
45
      self.done += 1
46
    finally:
47
      self._lock.release()
48

    
49
  def GetDoneTasks(self):
50
    self._lock.acquire()
51
    try:
52
      return self.done
53
    finally:
54
      self._lock.release()
55

    
56
  @staticmethod
57
  def UpdateChecksum(current, value):
58
    return zlib.adler32(str(value), current)
59

    
60

    
61
class CountingBaseWorker(workerpool.BaseWorker):
62
  def RunTask(self, ctx, text):
63
    ctx.DoneTask()
64

    
65

    
66
class ChecksumContext:
67
  CHECKSUM_START = zlib.adler32("")
68

    
69
  def __init__(self):
70
    self.lock = threading.Condition(threading.Lock())
71
    self.checksum = self.CHECKSUM_START
72

    
73
  @staticmethod
74
  def UpdateChecksum(current, value):
75
    return zlib.adler32(str(value), current)
76

    
77

    
78
class ChecksumBaseWorker(workerpool.BaseWorker):
79
  def RunTask(self, ctx, number):
80
    name = "number%s" % number
81
    self.SetTaskName(name)
82

    
83
    # This assertion needs to be checked before updating the checksum. A
84
    # failing assertion will then cause the result to be wrong.
85
    assert self.getName() == ("%s/%s" % (self._worker_id, name))
86

    
87
    ctx.lock.acquire()
88
    try:
89
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
90
    finally:
91
      ctx.lock.release()
92

    
93

    
94
class ListBuilderContext:
95
  def __init__(self):
96
    self.lock = threading.Lock()
97
    self.result = []
98
    self.prioresult = {}
99

    
100

    
101
class ListBuilderWorker(workerpool.BaseWorker):
102
  def RunTask(self, ctx, data):
103
    ctx.lock.acquire()
104
    try:
105
      ctx.result.append((self.GetCurrentPriority(), data))
106
      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
107
    finally:
108
      ctx.lock.release()
109

    
110

    
111
class DeferringTaskContext:
112
  def __init__(self):
113
    self.lock = threading.Lock()
114
    self.prioresult = {}
115
    self.samepriodefer = {}
116

    
117

    
118
class DeferringWorker(workerpool.BaseWorker):
119
  def RunTask(self, ctx, num, targetprio):
120
    ctx.lock.acquire()
121
    try:
122
      if num in ctx.samepriodefer:
123
        del ctx.samepriodefer[num]
124
        raise workerpool.DeferTask()
125

    
126
      if self.GetCurrentPriority() > targetprio:
127
        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
128

    
129
      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
130
    finally:
131
      ctx.lock.release()
132

    
133

    
134
class TestWorkerpool(unittest.TestCase):
135
  """Workerpool tests"""
136

    
137
  def testCounting(self):
138
    ctx = CountingContext()
139
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
140
    try:
141
      self._CheckWorkerCount(wp, 3)
142

    
143
      for i in range(10):
144
        wp.AddTask((ctx, "Hello world %s" % i))
145

    
146
      wp.Quiesce()
147
    finally:
148
      wp.TerminateWorkers()
149
      self._CheckWorkerCount(wp, 0)
150

    
151
    self.assertEquals(ctx.GetDoneTasks(), 10)
152

    
153
  def testNoTasks(self):
154
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
155
    try:
156
      self._CheckWorkerCount(wp, 3)
157
      self._CheckNoTasks(wp)
158
    finally:
159
      wp.TerminateWorkers()
160
      self._CheckWorkerCount(wp, 0)
161

    
162
  def testNoTasksQuiesce(self):
163
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
164
    try:
165
      self._CheckWorkerCount(wp, 3)
166
      self._CheckNoTasks(wp)
167
      wp.Quiesce()
168
      self._CheckNoTasks(wp)
169
    finally:
170
      wp.TerminateWorkers()
171
      self._CheckWorkerCount(wp, 0)
172

    
173
  def testActive(self):
174
    ctx = CountingContext()
175
    wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
176
    try:
177
      self._CheckWorkerCount(wp, 5)
178
      self.assertTrue(wp._active)
179

    
180
      # Process some tasks
181
      for _ in range(10):
182
        wp.AddTask((ctx, None))
183

    
184
      wp.Quiesce()
185
      self._CheckNoTasks(wp)
186
      self.assertEquals(ctx.GetDoneTasks(), 10)
187

    
188
      # Repeat a few times
189
      for count in range(10):
190
        # Deactivate pool
191
        wp.SetActive(False)
192
        self._CheckNoTasks(wp)
193

    
194
        # Queue some more tasks
195
        for _ in range(10):
196
          wp.AddTask((ctx, None))
197

    
198
        for _ in range(5):
199
          # Short delays to give other threads a chance to cause breakage
200
          time.sleep(.01)
201
          wp.AddTask((ctx, "Hello world %s" % 999))
202
          self.assertFalse(wp._active)
203

    
204
        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
205

    
206
        # Start processing again
207
        wp.SetActive(True)
208
        self.assertTrue(wp._active)
209

    
210
        # Wait for tasks to finish
211
        wp.Quiesce()
212
        self._CheckNoTasks(wp)
213
        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
214

    
215
        self._CheckWorkerCount(wp, 5)
216
    finally:
217
      wp.TerminateWorkers()
218
      self._CheckWorkerCount(wp, 0)
219

    
220
  def testChecksum(self):
221
    # Tests whether all tasks are run and, since we're only using a single
222
    # thread, whether everything is started in order.
223
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
224
    try:
225
      self._CheckWorkerCount(wp, 1)
226

    
227
      ctx = ChecksumContext()
228
      checksum = ChecksumContext.CHECKSUM_START
229
      for i in range(1, 100):
230
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
231
        wp.AddTask((ctx, i))
232

    
233
      wp.Quiesce()
234

    
235
      self._CheckNoTasks(wp)
236

    
237
      # Check sum
238
      ctx.lock.acquire()
239
      try:
240
        self.assertEqual(checksum, ctx.checksum)
241
      finally:
242
        ctx.lock.release()
243
    finally:
244
      wp.TerminateWorkers()
245
      self._CheckWorkerCount(wp, 0)
246

    
247
  def testAddManyTasks(self):
248
    ctx = CountingContext()
249
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
250
    try:
251
      self._CheckWorkerCount(wp, 3)
252

    
253
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
254
      wp.AddTask((ctx, "A separate hello"))
255
      wp.AddTask((ctx, "Once more, hi!"))
256
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
257

    
258
      wp.Quiesce()
259

    
260
      self._CheckNoTasks(wp)
261
    finally:
262
      wp.TerminateWorkers()
263
      self._CheckWorkerCount(wp, 0)
264

    
265
    self.assertEquals(ctx.GetDoneTasks(), 22)
266

    
267
  def testManyTasksSequence(self):
268
    ctx = CountingContext()
269
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
270
    try:
271
      self._CheckWorkerCount(wp, 3)
272
      self.assertRaises(AssertionError, wp.AddManyTasks,
273
                        ["Hello world %s" % i for i in range(10)])
274
      self.assertRaises(AssertionError, wp.AddManyTasks,
275
                        [i for i in range(10)])
276

    
277
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
278
      wp.AddTask((ctx, "A separate hello"))
279

    
280
      wp.Quiesce()
281

    
282
      self._CheckNoTasks(wp)
283
    finally:
284
      wp.TerminateWorkers()
285
      self._CheckWorkerCount(wp, 0)
286

    
287
    self.assertEquals(ctx.GetDoneTasks(), 11)
288

    
289
  def _CheckNoTasks(self, wp):
290
    wp._lock.acquire()
291
    try:
292
      # The task queue must be empty now
293
      self.failUnless(not wp._tasks)
294
    finally:
295
      wp._lock.release()
296

    
297
  def _CheckWorkerCount(self, wp, num_workers):
298
    wp._lock.acquire()
299
    try:
300
      self.assertEqual(len(wp._workers), num_workers)
301
    finally:
302
      wp._lock.release()
303

    
304
  def testPriorityChecksum(self):
305
    # Tests whether all tasks are run and, since we're only using a single
306
    # thread, whether everything is started in order and respects the priority
307
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
308
    try:
309
      self._CheckWorkerCount(wp, 1)
310

    
311
      ctx = ChecksumContext()
312

    
313
      data = {}
314
      tasks = []
315
      priorities = []
316
      for i in range(1, 333):
317
        prio = i % 7
318
        tasks.append((ctx, i))
319
        priorities.append(prio)
320
        data.setdefault(prio, []).append(i)
321

    
322
      wp.AddManyTasks(tasks, priority=priorities)
323

    
324
      wp.Quiesce()
325

    
326
      self._CheckNoTasks(wp)
327

    
328
      # Check sum
329
      ctx.lock.acquire()
330
      try:
331
        checksum = ChecksumContext.CHECKSUM_START
332
        for priority in sorted(data.keys()):
333
          for i in data[priority]:
334
            checksum = ChecksumContext.UpdateChecksum(checksum, i)
335

    
336
        self.assertEqual(checksum, ctx.checksum)
337
      finally:
338
        ctx.lock.release()
339

    
340
      self._CheckWorkerCount(wp, 1)
341
    finally:
342
      wp.TerminateWorkers()
343
      self._CheckWorkerCount(wp, 0)
344

    
345
  def testPriorityListManyTasks(self):
346
    # Tests whether all tasks are run and, since we're only using a single
347
    # thread, whether everything is started in order and respects the priority
348
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
349
    try:
350
      self._CheckWorkerCount(wp, 1)
351

    
352
      ctx = ListBuilderContext()
353

    
354
      # Use static seed for this test
355
      rnd = random.Random(0)
356

    
357
      data = {}
358
      tasks = []
359
      priorities = []
360
      for i in range(1, 333):
361
        prio = int(rnd.random() * 10)
362
        tasks.append((ctx, i))
363
        priorities.append(prio)
364
        data.setdefault(prio, []).append((prio, i))
365

    
366
      wp.AddManyTasks(tasks, priority=priorities)
367

    
368
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
369
                        [("x", ), ("y", )], priority=[1] * 5)
370

    
371
      wp.Quiesce()
372

    
373
      self._CheckNoTasks(wp)
374

    
375
      # Check result
376
      ctx.lock.acquire()
377
      try:
378
        expresult = []
379
        for priority in sorted(data.keys()):
380
          expresult.extend(data[priority])
381

    
382
        self.assertEqual(expresult, ctx.result)
383
      finally:
384
        ctx.lock.release()
385

    
386
      self._CheckWorkerCount(wp, 1)
387
    finally:
388
      wp.TerminateWorkers()
389
      self._CheckWorkerCount(wp, 0)
390

    
391
  def testPriorityListSingleTasks(self):
392
    # Tests whether all tasks are run and, since we're only using a single
393
    # thread, whether everything is started in order and respects the priority
394
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
395
    try:
396
      self._CheckWorkerCount(wp, 1)
397

    
398
      ctx = ListBuilderContext()
399

    
400
      # Use static seed for this test
401
      rnd = random.Random(26279)
402

    
403
      data = {}
404
      for i in range(1, 333):
405
        prio = int(rnd.random() * 30)
406
        wp.AddTask((ctx, i), priority=prio)
407
        data.setdefault(prio, []).append(i)
408

    
409
        # Cause some distortion
410
        if i % 11 == 0:
411
          time.sleep(.001)
412
        if i % 41 == 0:
413
          wp.Quiesce()
414

    
415
      wp.Quiesce()
416

    
417
      self._CheckNoTasks(wp)
418

    
419
      # Check result
420
      ctx.lock.acquire()
421
      try:
422
        self.assertEqual(data, ctx.prioresult)
423
      finally:
424
        ctx.lock.release()
425

    
426
      self._CheckWorkerCount(wp, 1)
427
    finally:
428
      wp.TerminateWorkers()
429
      self._CheckWorkerCount(wp, 0)
430

    
431
  def testPriorityListSingleTasks(self):
432
    # Tests whether all tasks are run and, since we're only using a single
433
    # thread, whether everything is started in order and respects the priority
434
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
435
    try:
436
      self._CheckWorkerCount(wp, 1)
437

    
438
      ctx = ListBuilderContext()
439

    
440
      # Use static seed for this test
441
      rnd = random.Random(26279)
442

    
443
      data = {}
444
      for i in range(1, 333):
445
        prio = int(rnd.random() * 30)
446
        wp.AddTask((ctx, i), priority=prio)
447
        data.setdefault(prio, []).append(i)
448

    
449
        # Cause some distortion
450
        if i % 11 == 0:
451
          time.sleep(.001)
452
        if i % 41 == 0:
453
          wp.Quiesce()
454

    
455
      wp.Quiesce()
456

    
457
      self._CheckNoTasks(wp)
458

    
459
      # Check result
460
      ctx.lock.acquire()
461
      try:
462
        self.assertEqual(data, ctx.prioresult)
463
      finally:
464
        ctx.lock.release()
465

    
466
      self._CheckWorkerCount(wp, 1)
467
    finally:
468
      wp.TerminateWorkers()
469
      self._CheckWorkerCount(wp, 0)
470

    
471
  def testDeferTask(self):
472
    # Tests whether all tasks are run and, since we're only using a single
473
    # thread, whether everything is started in order and respects the priority
474
    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
475
    try:
476
      self._CheckWorkerCount(wp, 1)
477

    
478
      ctx = DeferringTaskContext()
479

    
480
      # Use static seed for this test
481
      rnd = random.Random(14921)
482

    
483
      data = {}
484
      for i in range(1, 333):
485
        ctx.lock.acquire()
486
        try:
487
          if i % 5 == 0:
488
            ctx.samepriodefer[i] = True
489
        finally:
490
          ctx.lock.release()
491

    
492
        prio = int(rnd.random() * 30)
493
        wp.AddTask((ctx, i, prio), priority=50)
494
        data.setdefault(prio, set()).add(i)
495

    
496
        # Cause some distortion
497
        if i % 24 == 0:
498
          time.sleep(.001)
499
        if i % 31 == 0:
500
          wp.Quiesce()
501

    
502
      wp.Quiesce()
503

    
504
      self._CheckNoTasks(wp)
505

    
506
      # Check result
507
      ctx.lock.acquire()
508
      try:
509
        self.assertEqual(data, ctx.prioresult)
510
      finally:
511
        ctx.lock.release()
512

    
513
      self._CheckWorkerCount(wp, 1)
514
    finally:
515
      wp.TerminateWorkers()
516
      self._CheckWorkerCount(wp, 0)
517

    
518

    
519
if __name__ == '__main__':
520
  testutils.GanetiTestProgram()