Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.workerpool_unittest.py @ 76094e37

History | View | Annotate | Download (3.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the workerpool module"""
23

    
24
import unittest
25
import threading
26
import time
27
import sys
28
import zlib
29

    
30
from ganeti import workerpool
31

    
32

    
33
class DummyBaseWorker(workerpool.BaseWorker):
34
  def RunTask(self, text):
35
    pass
36

    
37

    
38
class ChecksumContext:
39
  CHECKSUM_START = zlib.adler32("")
40

    
41
  def __init__(self):
42
    self.lock = threading.Condition(threading.Lock())
43
    self.checksum = self.CHECKSUM_START
44

    
45
  @staticmethod
46
  def UpdateChecksum(current, value):
47
    return zlib.adler32(str(value), current)
48

    
49

    
50
class ChecksumBaseWorker(workerpool.BaseWorker):
51
  def RunTask(self, ctx, number):
52
    ctx.lock.acquire()
53
    try:
54
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
55
    finally:
56
      ctx.lock.release()
57

    
58

    
59
class TestWorkerpool(unittest.TestCase):
60
  """Workerpool tests"""
61

    
62
  def testDummy(self):
63
    wp = workerpool.WorkerPool(3, DummyBaseWorker)
64
    try:
65
      self._CheckWorkerCount(wp, 3)
66

    
67
      for i in xrange(10):
68
        wp.AddTask("Hello world %s" % i)
69

    
70
      wp.Quiesce()
71
    finally:
72
      wp.TerminateWorkers()
73
      self._CheckWorkerCount(wp, 0)
74

    
75
  def testNoTasks(self):
76
    wp = workerpool.WorkerPool(3, DummyBaseWorker)
77
    try:
78
      self._CheckWorkerCount(wp, 3)
79
      self._CheckNoTasks(wp)
80
    finally:
81
      wp.TerminateWorkers()
82
      self._CheckWorkerCount(wp, 0)
83

    
84
  def testNoTasksQuiesce(self):
85
    wp = workerpool.WorkerPool(3, DummyBaseWorker)
86
    try:
87
      self._CheckWorkerCount(wp, 3)
88
      self._CheckNoTasks(wp)
89
      wp.Quiesce()
90
      self._CheckNoTasks(wp)
91
    finally:
92
      wp.TerminateWorkers()
93
      self._CheckWorkerCount(wp, 0)
94

    
95
  def testChecksum(self):
96
    # Tests whether all tasks are run and, since we're only using a single
97
    # thread, whether everything is started in order.
98
    wp = workerpool.WorkerPool(1, ChecksumBaseWorker)
99
    try:
100
      self._CheckWorkerCount(wp, 1)
101

    
102
      ctx = ChecksumContext()
103
      checksum = ChecksumContext.CHECKSUM_START
104
      for i in xrange(1, 100):
105
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
106
        wp.AddTask(ctx, i)
107

    
108
      wp.Quiesce()
109

    
110
      self._CheckNoTasks(wp)
111

    
112
      # Check sum
113
      ctx.lock.acquire()
114
      try:
115
        self.assertEqual(checksum, ctx.checksum)
116
      finally:
117
        ctx.lock.release()
118
    finally:
119
      wp.TerminateWorkers()
120
      self._CheckWorkerCount(wp, 0)
121

    
122
  def _CheckNoTasks(self, wp):
123
    wp._lock.acquire()
124
    try:
125
      # The task queue must be empty now
126
      self.failUnless(not wp._tasks)
127
    finally:
128
      wp._lock.release()
129

    
130
  def _CheckWorkerCount(self, wp, num_workers):
131
    wp._lock.acquire()
132
    try:
133
      self.assertEqual(len(wp._workers), num_workers)
134
    finally:
135
      wp._lock.release()
136

    
137

    
138
if __name__ == '__main__':
139
  unittest.main()