Modify gnt-node add to call external script
[ganeti-local] / test / ganeti.workerpool_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the workerpool module"""
23
24 import unittest
25 import threading
26 import time
27 import sys
28 import zlib
29
30 from ganeti import workerpool
31
32 import testutils
33
34 class CountingContext(object):
35
36   def __init__(self):
37     self._lock = threading.Condition(threading.Lock())
38     self.done = 0
39
40   def DoneTask(self):
41     self._lock.acquire()
42     try:
43       self.done += 1
44     finally:
45       self._lock.release()
46
47   def GetDoneTasks(self):
48     self._lock.acquire()
49     try:
50       return self.done
51     finally:
52       self._lock.release()
53
54   @staticmethod
55   def UpdateChecksum(current, value):
56     return zlib.adler32(str(value), current)
57
58
59 class CountingBaseWorker(workerpool.BaseWorker):
60
61   def RunTask(self, ctx, text):
62     ctx.DoneTask()
63
64
65 class ChecksumContext:
66   CHECKSUM_START = zlib.adler32("")
67
68   def __init__(self):
69     self.lock = threading.Condition(threading.Lock())
70     self.checksum = self.CHECKSUM_START
71
72   @staticmethod
73   def UpdateChecksum(current, value):
74     return zlib.adler32(str(value), current)
75
76
77 class ChecksumBaseWorker(workerpool.BaseWorker):
78   def RunTask(self, ctx, number):
79     ctx.lock.acquire()
80     try:
81       ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
82     finally:
83       ctx.lock.release()
84
85
86 class TestWorkerpool(unittest.TestCase):
87   """Workerpool tests"""
88
89   def testCounting(self):
90     ctx = CountingContext()
91     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
92     try:
93       self._CheckWorkerCount(wp, 3)
94
95       for i in range(10):
96         wp.AddTask((ctx, "Hello world %s" % i))
97
98       wp.Quiesce()
99     finally:
100       wp.TerminateWorkers()
101       self._CheckWorkerCount(wp, 0)
102
103     self.assertEquals(ctx.GetDoneTasks(), 10)
104
105   def testNoTasks(self):
106     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
107     try:
108       self._CheckWorkerCount(wp, 3)
109       self._CheckNoTasks(wp)
110     finally:
111       wp.TerminateWorkers()
112       self._CheckWorkerCount(wp, 0)
113
114   def testNoTasksQuiesce(self):
115     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
116     try:
117       self._CheckWorkerCount(wp, 3)
118       self._CheckNoTasks(wp)
119       wp.Quiesce()
120       self._CheckNoTasks(wp)
121     finally:
122       wp.TerminateWorkers()
123       self._CheckWorkerCount(wp, 0)
124
125   def testChecksum(self):
126     # Tests whether all tasks are run and, since we're only using a single
127     # thread, whether everything is started in order.
128     wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
129     try:
130       self._CheckWorkerCount(wp, 1)
131
132       ctx = ChecksumContext()
133       checksum = ChecksumContext.CHECKSUM_START
134       for i in range(1, 100):
135         checksum = ChecksumContext.UpdateChecksum(checksum, i)
136         wp.AddTask((ctx, i))
137
138       wp.Quiesce()
139
140       self._CheckNoTasks(wp)
141
142       # Check sum
143       ctx.lock.acquire()
144       try:
145         self.assertEqual(checksum, ctx.checksum)
146       finally:
147         ctx.lock.release()
148     finally:
149       wp.TerminateWorkers()
150       self._CheckWorkerCount(wp, 0)
151
152   def testAddManyTasks(self):
153     ctx = CountingContext()
154     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
155     try:
156       self._CheckWorkerCount(wp, 3)
157
158       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
159       wp.AddTask((ctx, "A separate hello"))
160       wp.AddTask((ctx, "Once more, hi!"))
161       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
162
163       wp.Quiesce()
164
165       self._CheckNoTasks(wp)
166     finally:
167       wp.TerminateWorkers()
168       self._CheckWorkerCount(wp, 0)
169
170     self.assertEquals(ctx.GetDoneTasks(), 22)
171
172   def testManyTasksSequence(self):
173     ctx = CountingContext()
174     wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
175     try:
176       self._CheckWorkerCount(wp, 3)
177       self.assertRaises(AssertionError, wp.AddManyTasks,
178                         ["Hello world %s" % i for i in range(10)])
179       self.assertRaises(AssertionError, wp.AddManyTasks,
180                         [i for i in range(10)])
181
182       wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
183       wp.AddTask((ctx, "A separate hello"))
184
185       wp.Quiesce()
186
187       self._CheckNoTasks(wp)
188     finally:
189       wp.TerminateWorkers()
190       self._CheckWorkerCount(wp, 0)
191
192     self.assertEquals(ctx.GetDoneTasks(), 11)
193
194   def _CheckNoTasks(self, wp):
195     wp._lock.acquire()
196     try:
197       # The task queue must be empty now
198       self.failUnless(not wp._tasks)
199     finally:
200       wp._lock.release()
201
202   def _CheckWorkerCount(self, wp, num_workers):
203     wp._lock.acquire()
204     try:
205       self.assertEqual(len(wp._workers), num_workers)
206     finally:
207       wp._lock.release()
208
209
210 if __name__ == '__main__':
211   testutils.GanetiTestProgram()