Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.workerpool_unittest.py @ 26d3fd2f

History | View | Annotate | Download (12 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the workerpool module"""
23

    
24
import unittest
25
import threading
26
import time
27
import sys
28
import zlib
29
import random
30

    
31
from ganeti import workerpool
32
from ganeti import errors
33

    
34
import testutils
35

    
36

    
37
class CountingContext(object):
38
  def __init__(self):
39
    self._lock = threading.Condition(threading.Lock())
40
    self.done = 0
41

    
42
  def DoneTask(self):
43
    self._lock.acquire()
44
    try:
45
      self.done += 1
46
    finally:
47
      self._lock.release()
48

    
49
  def GetDoneTasks(self):
50
    self._lock.acquire()
51
    try:
52
      return self.done
53
    finally:
54
      self._lock.release()
55

    
56
  @staticmethod
57
  def UpdateChecksum(current, value):
58
    return zlib.adler32(str(value), current)
59

    
60

    
61
class CountingBaseWorker(workerpool.BaseWorker):
62
  def RunTask(self, ctx, text):
63
    ctx.DoneTask()
64

    
65

    
66
class ChecksumContext:
67
  CHECKSUM_START = zlib.adler32("")
68

    
69
  def __init__(self):
70
    self.lock = threading.Condition(threading.Lock())
71
    self.checksum = self.CHECKSUM_START
72

    
73
  @staticmethod
74
  def UpdateChecksum(current, value):
75
    return zlib.adler32(str(value), current)
76

    
77

    
78
class ChecksumBaseWorker(workerpool.BaseWorker):
79
  def RunTask(self, ctx, number):
80
    name = "number%s" % number
81
    self.SetTaskName(name)
82

    
83
    # This assertion needs to be checked before updating the checksum. A
84
    # failing assertion will then cause the result to be wrong.
85
    assert self.getName() == ("%s/%s" % (self._worker_id, name))
86

    
87
    ctx.lock.acquire()
88
    try:
89
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
90
    finally:
91
      ctx.lock.release()
92

    
93

    
94
class ListBuilderContext:
95
  def __init__(self):
96
    self.lock = threading.Lock()
97
    self.result = []
98
    self.prioresult = {}
99

    
100

    
101
class ListBuilderWorker(workerpool.BaseWorker):
102
  def RunTask(self, ctx, data):
103
    ctx.lock.acquire()
104
    try:
105
      ctx.result.append((self.GetCurrentPriority(), data))
106
      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
107
    finally:
108
      ctx.lock.release()
109

    
110

    
111
class DeferringTaskContext:
112
  def __init__(self):
113
    self.lock = threading.Lock()
114
    self.prioresult = {}
115
    self.samepriodefer = {}
116

    
117

    
118
class DeferringWorker(workerpool.BaseWorker):
119
  def RunTask(self, ctx, num, targetprio):
120
    ctx.lock.acquire()
121
    try:
122
      if num in ctx.samepriodefer:
123
        del ctx.samepriodefer[num]
124
        raise workerpool.DeferTask()
125

    
126
      if self.GetCurrentPriority() > targetprio:
127
        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
128

    
129
      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
130
    finally:
131
      ctx.lock.release()
132

    
133

    
134
class TestWorkerpool(unittest.TestCase):
135
  """Workerpool tests"""
136

    
137
  def testCounting(self):
138
    ctx = CountingContext()
139
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
140
    try:
141
      self._CheckWorkerCount(wp, 3)
142

    
143
      for i in range(10):
144
        wp.AddTask((ctx, "Hello world %s" % i))
145

    
146
      wp.Quiesce()
147
    finally:
148
      wp.TerminateWorkers()
149
      self._CheckWorkerCount(wp, 0)
150

    
151
    self.assertEquals(ctx.GetDoneTasks(), 10)
152

    
153
  def testNoTasks(self):
154
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
155
    try:
156
      self._CheckWorkerCount(wp, 3)
157
      self._CheckNoTasks(wp)
158
    finally:
159
      wp.TerminateWorkers()
160
      self._CheckWorkerCount(wp, 0)
161

    
162
  def testNoTasksQuiesce(self):
163
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
164
    try:
165
      self._CheckWorkerCount(wp, 3)
166
      self._CheckNoTasks(wp)
167
      wp.Quiesce()
168
      self._CheckNoTasks(wp)
169
    finally:
170
      wp.TerminateWorkers()
171
      self._CheckWorkerCount(wp, 0)
172

    
173
  def testChecksum(self):
174
    # Tests whether all tasks are run and, since we're only using a single
175
    # thread, whether everything is started in order.
176
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
177
    try:
178
      self._CheckWorkerCount(wp, 1)
179

    
180
      ctx = ChecksumContext()
181
      checksum = ChecksumContext.CHECKSUM_START
182
      for i in range(1, 100):
183
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
184
        wp.AddTask((ctx, i))
185

    
186
      wp.Quiesce()
187

    
188
      self._CheckNoTasks(wp)
189

    
190
      # Check sum
191
      ctx.lock.acquire()
192
      try:
193
        self.assertEqual(checksum, ctx.checksum)
194
      finally:
195
        ctx.lock.release()
196
    finally:
197
      wp.TerminateWorkers()
198
      self._CheckWorkerCount(wp, 0)
199

    
200
  def testAddManyTasks(self):
201
    ctx = CountingContext()
202
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
203
    try:
204
      self._CheckWorkerCount(wp, 3)
205

    
206
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
207
      wp.AddTask((ctx, "A separate hello"))
208
      wp.AddTask((ctx, "Once more, hi!"))
209
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
210

    
211
      wp.Quiesce()
212

    
213
      self._CheckNoTasks(wp)
214
    finally:
215
      wp.TerminateWorkers()
216
      self._CheckWorkerCount(wp, 0)
217

    
218
    self.assertEquals(ctx.GetDoneTasks(), 22)
219

    
220
  def testManyTasksSequence(self):
221
    ctx = CountingContext()
222
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
223
    try:
224
      self._CheckWorkerCount(wp, 3)
225
      self.assertRaises(AssertionError, wp.AddManyTasks,
226
                        ["Hello world %s" % i for i in range(10)])
227
      self.assertRaises(AssertionError, wp.AddManyTasks,
228
                        [i for i in range(10)])
229

    
230
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
231
      wp.AddTask((ctx, "A separate hello"))
232

    
233
      wp.Quiesce()
234

    
235
      self._CheckNoTasks(wp)
236
    finally:
237
      wp.TerminateWorkers()
238
      self._CheckWorkerCount(wp, 0)
239

    
240
    self.assertEquals(ctx.GetDoneTasks(), 11)
241

    
242
  def _CheckNoTasks(self, wp):
243
    wp._lock.acquire()
244
    try:
245
      # The task queue must be empty now
246
      self.failUnless(not wp._tasks)
247
    finally:
248
      wp._lock.release()
249

    
250
  def _CheckWorkerCount(self, wp, num_workers):
251
    wp._lock.acquire()
252
    try:
253
      self.assertEqual(len(wp._workers), num_workers)
254
    finally:
255
      wp._lock.release()
256

    
257
  def testPriorityChecksum(self):
258
    # Tests whether all tasks are run and, since we're only using a single
259
    # thread, whether everything is started in order and respects the priority
260
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
261
    try:
262
      self._CheckWorkerCount(wp, 1)
263

    
264
      ctx = ChecksumContext()
265

    
266
      data = {}
267
      tasks = []
268
      priorities = []
269
      for i in range(1, 333):
270
        prio = i % 7
271
        tasks.append((ctx, i))
272
        priorities.append(prio)
273
        data.setdefault(prio, []).append(i)
274

    
275
      wp.AddManyTasks(tasks, priority=priorities)
276

    
277
      wp.Quiesce()
278

    
279
      self._CheckNoTasks(wp)
280

    
281
      # Check sum
282
      ctx.lock.acquire()
283
      try:
284
        checksum = ChecksumContext.CHECKSUM_START
285
        for priority in sorted(data.keys()):
286
          for i in data[priority]:
287
            checksum = ChecksumContext.UpdateChecksum(checksum, i)
288

    
289
        self.assertEqual(checksum, ctx.checksum)
290
      finally:
291
        ctx.lock.release()
292

    
293
      self._CheckWorkerCount(wp, 1)
294
    finally:
295
      wp.TerminateWorkers()
296
      self._CheckWorkerCount(wp, 0)
297

    
298
  def testPriorityListManyTasks(self):
299
    # Tests whether all tasks are run and, since we're only using a single
300
    # thread, whether everything is started in order and respects the priority
301
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
302
    try:
303
      self._CheckWorkerCount(wp, 1)
304

    
305
      ctx = ListBuilderContext()
306

    
307
      # Use static seed for this test
308
      rnd = random.Random(0)
309

    
310
      data = {}
311
      tasks = []
312
      priorities = []
313
      for i in range(1, 333):
314
        prio = int(rnd.random() * 10)
315
        tasks.append((ctx, i))
316
        priorities.append(prio)
317
        data.setdefault(prio, []).append((prio, i))
318

    
319
      wp.AddManyTasks(tasks, priority=priorities)
320

    
321
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
322
                        [("x", ), ("y", )], priority=[1] * 5)
323

    
324
      wp.Quiesce()
325

    
326
      self._CheckNoTasks(wp)
327

    
328
      # Check result
329
      ctx.lock.acquire()
330
      try:
331
        expresult = []
332
        for priority in sorted(data.keys()):
333
          expresult.extend(data[priority])
334

    
335
        self.assertEqual(expresult, ctx.result)
336
      finally:
337
        ctx.lock.release()
338

    
339
      self._CheckWorkerCount(wp, 1)
340
    finally:
341
      wp.TerminateWorkers()
342
      self._CheckWorkerCount(wp, 0)
343

    
344
  def testPriorityListSingleTasks(self):
345
    # Tests whether all tasks are run and, since we're only using a single
346
    # thread, whether everything is started in order and respects the priority
347
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
348
    try:
349
      self._CheckWorkerCount(wp, 1)
350

    
351
      ctx = ListBuilderContext()
352

    
353
      # Use static seed for this test
354
      rnd = random.Random(26279)
355

    
356
      data = {}
357
      for i in range(1, 333):
358
        prio = int(rnd.random() * 30)
359
        wp.AddTask((ctx, i), priority=prio)
360
        data.setdefault(prio, []).append(i)
361

    
362
        # Cause some distortion
363
        if i % 11 == 0:
364
          time.sleep(.001)
365
        if i % 41 == 0:
366
          wp.Quiesce()
367

    
368
      wp.Quiesce()
369

    
370
      self._CheckNoTasks(wp)
371

    
372
      # Check result
373
      ctx.lock.acquire()
374
      try:
375
        self.assertEqual(data, ctx.prioresult)
376
      finally:
377
        ctx.lock.release()
378

    
379
      self._CheckWorkerCount(wp, 1)
380
    finally:
381
      wp.TerminateWorkers()
382
      self._CheckWorkerCount(wp, 0)
383

    
384
  def testPriorityListSingleTasks(self):
385
    # Tests whether all tasks are run and, since we're only using a single
386
    # thread, whether everything is started in order and respects the priority
387
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
388
    try:
389
      self._CheckWorkerCount(wp, 1)
390

    
391
      ctx = ListBuilderContext()
392

    
393
      # Use static seed for this test
394
      rnd = random.Random(26279)
395

    
396
      data = {}
397
      for i in range(1, 333):
398
        prio = int(rnd.random() * 30)
399
        wp.AddTask((ctx, i), priority=prio)
400
        data.setdefault(prio, []).append(i)
401

    
402
        # Cause some distortion
403
        if i % 11 == 0:
404
          time.sleep(.001)
405
        if i % 41 == 0:
406
          wp.Quiesce()
407

    
408
      wp.Quiesce()
409

    
410
      self._CheckNoTasks(wp)
411

    
412
      # Check result
413
      ctx.lock.acquire()
414
      try:
415
        self.assertEqual(data, ctx.prioresult)
416
      finally:
417
        ctx.lock.release()
418

    
419
      self._CheckWorkerCount(wp, 1)
420
    finally:
421
      wp.TerminateWorkers()
422
      self._CheckWorkerCount(wp, 0)
423

    
424
  def testDeferTask(self):
425
    # Tests whether all tasks are run and, since we're only using a single
426
    # thread, whether everything is started in order and respects the priority
427
    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
428
    try:
429
      self._CheckWorkerCount(wp, 1)
430

    
431
      ctx = DeferringTaskContext()
432

    
433
      # Use static seed for this test
434
      rnd = random.Random(14921)
435

    
436
      data = {}
437
      for i in range(1, 333):
438
        ctx.lock.acquire()
439
        try:
440
          if i % 5 == 0:
441
            ctx.samepriodefer[i] = True
442
        finally:
443
          ctx.lock.release()
444

    
445
        prio = int(rnd.random() * 30)
446
        wp.AddTask((ctx, i, prio), priority=50)
447
        data.setdefault(prio, set()).add(i)
448

    
449
        # Cause some distortion
450
        if i % 24 == 0:
451
          time.sleep(.001)
452
        if i % 31 == 0:
453
          wp.Quiesce()
454

    
455
      wp.Quiesce()
456

    
457
      self._CheckNoTasks(wp)
458

    
459
      # Check result
460
      ctx.lock.acquire()
461
      try:
462
        self.assertEqual(data, ctx.prioresult)
463
      finally:
464
        ctx.lock.release()
465

    
466
      self._CheckWorkerCount(wp, 1)
467
    finally:
468
      wp.TerminateWorkers()
469
      self._CheckWorkerCount(wp, 0)
470

    
471

    
472
if __name__ == '__main__':
473
  testutils.GanetiTestProgram()