Revision 52c47e4e test/ganeti.workerpool_unittest.py

b/test/ganeti.workerpool_unittest.py
1 1
#!/usr/bin/python
2 2
#
3 3

  
4
# Copyright (C) 2008 Google Inc.
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5 5
#
6 6
# This program is free software; you can redistribute it and/or modify
7 7
# it under the terms of the GNU General Public License as published by
......
26 26
import time
27 27
import sys
28 28
import zlib
29
import random
29 30

  
30 31
from ganeti import workerpool
32
from ganeti import errors
31 33

  
32 34
import testutils
33 35

  
34
class CountingContext(object):
35 36

  
37
class CountingContext(object):
36 38
  def __init__(self):
37 39
    self._lock = threading.Condition(threading.Lock())
38 40
    self.done = 0
......
57 59

  
58 60

  
59 61
class CountingBaseWorker(workerpool.BaseWorker):
60

  
61 62
  def RunTask(self, ctx, text):
62 63
    ctx.DoneTask()
63 64

  
......
83 84
      ctx.lock.release()
84 85

  
85 86

  
87
class ListBuilderContext:
88
  def __init__(self):
89
    self.lock = threading.Lock()
90
    self.result = []
91
    self.prioresult = {}
92

  
93

  
94
class ListBuilderWorker(workerpool.BaseWorker):
95
  def RunTask(self, ctx, data):
96
    ctx.lock.acquire()
97
    try:
98
      ctx.result.append((self.GetCurrentPriority(), data))
99
      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
100
    finally:
101
      ctx.lock.release()
102

  
103

  
104
class DeferringTaskContext:
105
  def __init__(self):
106
    self.lock = threading.Lock()
107
    self.prioresult = {}
108
    self.samepriodefer = {}
109

  
110

  
111
class DeferringWorker(workerpool.BaseWorker):
112
  def RunTask(self, ctx, num, targetprio):
113
    ctx.lock.acquire()
114
    try:
115
      if num in ctx.samepriodefer:
116
        del ctx.samepriodefer[num]
117
        raise workerpool.DeferTask()
118

  
119
      if self.GetCurrentPriority() > targetprio:
120
        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
121

  
122
      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
123
    finally:
124
      ctx.lock.release()
125

  
126

  
86 127
class TestWorkerpool(unittest.TestCase):
87 128
  """Workerpool tests"""
88 129

  
......
206 247
    finally:
207 248
      wp._lock.release()
208 249

  
250
  def testPriorityChecksum(self):
251
    # Tests whether all tasks are run and, since we're only using a single
252
    # thread, whether everything is started in order and respects the priority
253
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
254
    try:
255
      self._CheckWorkerCount(wp, 1)
256

  
257
      ctx = ChecksumContext()
258

  
259
      data = {}
260
      tasks = []
261
      priorities = []
262
      for i in range(1, 333):
263
        prio = i % 7
264
        tasks.append((ctx, i))
265
        priorities.append(prio)
266
        data.setdefault(prio, []).append(i)
267

  
268
      wp.AddManyTasks(tasks, priority=priorities)
269

  
270
      wp.Quiesce()
271

  
272
      self._CheckNoTasks(wp)
273

  
274
      # Check sum
275
      ctx.lock.acquire()
276
      try:
277
        checksum = ChecksumContext.CHECKSUM_START
278
        for priority in sorted(data.keys()):
279
          for i in data[priority]:
280
            checksum = ChecksumContext.UpdateChecksum(checksum, i)
281

  
282
        self.assertEqual(checksum, ctx.checksum)
283
      finally:
284
        ctx.lock.release()
285

  
286
      self._CheckWorkerCount(wp, 1)
287
    finally:
288
      wp.TerminateWorkers()
289
      self._CheckWorkerCount(wp, 0)
290

  
291
  def testPriorityListManyTasks(self):
292
    # Tests whether all tasks are run and, since we're only using a single
293
    # thread, whether everything is started in order and respects the priority
294
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
295
    try:
296
      self._CheckWorkerCount(wp, 1)
297

  
298
      ctx = ListBuilderContext()
299

  
300
      # Use static seed for this test
301
      rnd = random.Random(0)
302

  
303
      data = {}
304
      tasks = []
305
      priorities = []
306
      for i in range(1, 333):
307
        prio = int(rnd.random() * 10)
308
        tasks.append((ctx, i))
309
        priorities.append(prio)
310
        data.setdefault(prio, []).append((prio, i))
311

  
312
      wp.AddManyTasks(tasks, priority=priorities)
313

  
314
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
315
                        [("x", ), ("y", )], priority=[1] * 5)
316

  
317
      wp.Quiesce()
318

  
319
      self._CheckNoTasks(wp)
320

  
321
      # Check result
322
      ctx.lock.acquire()
323
      try:
324
        expresult = []
325
        for priority in sorted(data.keys()):
326
          expresult.extend(data[priority])
327

  
328
        self.assertEqual(expresult, ctx.result)
329
      finally:
330
        ctx.lock.release()
331

  
332
      self._CheckWorkerCount(wp, 1)
333
    finally:
334
      wp.TerminateWorkers()
335
      self._CheckWorkerCount(wp, 0)
336

  
337
  def testPriorityListSingleTasks(self):
338
    # Tests whether all tasks are run and, since we're only using a single
339
    # thread, whether everything is started in order and respects the priority
340
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
341
    try:
342
      self._CheckWorkerCount(wp, 1)
343

  
344
      ctx = ListBuilderContext()
345

  
346
      # Use static seed for this test
347
      rnd = random.Random(26279)
348

  
349
      data = {}
350
      for i in range(1, 333):
351
        prio = int(rnd.random() * 30)
352
        wp.AddTask((ctx, i), priority=prio)
353
        data.setdefault(prio, []).append(i)
354

  
355
        # Cause some distortion
356
        if i % 11 == 0:
357
          time.sleep(.001)
358
        if i % 41 == 0:
359
          wp.Quiesce()
360

  
361
      wp.Quiesce()
362

  
363
      self._CheckNoTasks(wp)
364

  
365
      # Check result
366
      ctx.lock.acquire()
367
      try:
368
        self.assertEqual(data, ctx.prioresult)
369
      finally:
370
        ctx.lock.release()
371

  
372
      self._CheckWorkerCount(wp, 1)
373
    finally:
374
      wp.TerminateWorkers()
375
      self._CheckWorkerCount(wp, 0)
376

  
377
  def testPriorityListSingleTasks(self):
378
    # Tests whether all tasks are run and, since we're only using a single
379
    # thread, whether everything is started in order and respects the priority
380
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
381
    try:
382
      self._CheckWorkerCount(wp, 1)
383

  
384
      ctx = ListBuilderContext()
385

  
386
      # Use static seed for this test
387
      rnd = random.Random(26279)
388

  
389
      data = {}
390
      for i in range(1, 333):
391
        prio = int(rnd.random() * 30)
392
        wp.AddTask((ctx, i), priority=prio)
393
        data.setdefault(prio, []).append(i)
394

  
395
        # Cause some distortion
396
        if i % 11 == 0:
397
          time.sleep(.001)
398
        if i % 41 == 0:
399
          wp.Quiesce()
400

  
401
      wp.Quiesce()
402

  
403
      self._CheckNoTasks(wp)
404

  
405
      # Check result
406
      ctx.lock.acquire()
407
      try:
408
        self.assertEqual(data, ctx.prioresult)
409
      finally:
410
        ctx.lock.release()
411

  
412
      self._CheckWorkerCount(wp, 1)
413
    finally:
414
      wp.TerminateWorkers()
415
      self._CheckWorkerCount(wp, 0)
416

  
417
  def testDeferTask(self):
418
    # Tests whether all tasks are run and, since we're only using a single
419
    # thread, whether everything is started in order and respects the priority
420
    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
421
    try:
422
      self._CheckWorkerCount(wp, 1)
423

  
424
      ctx = DeferringTaskContext()
425

  
426
      # Use static seed for this test
427
      rnd = random.Random(14921)
428

  
429
      data = {}
430
      for i in range(1, 333):
431
        ctx.lock.acquire()
432
        try:
433
          if i % 5 == 0:
434
            ctx.samepriodefer[i] = True
435
        finally:
436
          ctx.lock.release()
437

  
438
        prio = int(rnd.random() * 30)
439
        wp.AddTask((ctx, i, prio), priority=50)
440
        data.setdefault(prio, set()).add(i)
441

  
442
        # Cause some distortion
443
        if i % 24 == 0:
444
          time.sleep(.001)
445
        if i % 31 == 0:
446
          wp.Quiesce()
447

  
448
      wp.Quiesce()
449

  
450
      self._CheckNoTasks(wp)
451

  
452
      # Check result
453
      ctx.lock.acquire()
454
      try:
455
        self.assertEqual(data, ctx.prioresult)
456
      finally:
457
        ctx.lock.release()
458

  
459
      self._CheckWorkerCount(wp, 1)
460
    finally:
461
      wp.TerminateWorkers()
462
      self._CheckWorkerCount(wp, 0)
463

  
209 464

  
210 465
if __name__ == '__main__':
211 466
  testutils.GanetiTestProgram()

Also available in: Unified diff