Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.workerpool_unittest.py @ b2e8a4d9

History | View | Annotate | Download (5.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the workerpool module"""
23

    
24
import unittest
25
import threading
26
import time
27
import sys
28
import zlib
29

    
30
from ganeti import workerpool
31

    
32
import testutils
33

    
34
class CountingContext(object):
35

    
36
  def __init__(self):
37
    self._lock = threading.Condition(threading.Lock())
38
    self.done = 0
39

    
40
  def DoneTask(self):
41
    self._lock.acquire()
42
    try:
43
      self.done += 1
44
    finally:
45
      self._lock.release()
46

    
47
  def GetDoneTasks(self):
48
    self._lock.acquire()
49
    try:
50
      return self.done
51
    finally:
52
      self._lock.release()
53

    
54
  @staticmethod
55
  def UpdateChecksum(current, value):
56
    return zlib.adler32(str(value), current)
57

    
58

    
59
class CountingBaseWorker(workerpool.BaseWorker):
60

    
61
  def RunTask(self, ctx, text):
62
    ctx.DoneTask()
63

    
64

    
65
class ChecksumContext:
66
  CHECKSUM_START = zlib.adler32("")
67

    
68
  def __init__(self):
69
    self.lock = threading.Condition(threading.Lock())
70
    self.checksum = self.CHECKSUM_START
71

    
72
  @staticmethod
73
  def UpdateChecksum(current, value):
74
    return zlib.adler32(str(value), current)
75

    
76

    
77
class ChecksumBaseWorker(workerpool.BaseWorker):
78
  def RunTask(self, ctx, number):
79
    ctx.lock.acquire()
80
    try:
81
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
82
    finally:
83
      ctx.lock.release()
84

    
85

    
86
class TestWorkerpool(unittest.TestCase):
87
  """Workerpool tests"""
88

    
89
  def testCounting(self):
90
    ctx = CountingContext()
91
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
92
    try:
93
      self._CheckWorkerCount(wp, 3)
94

    
95
      for i in range(10):
96
        wp.AddTask((ctx, "Hello world %s" % i))
97

    
98
      wp.Quiesce()
99
    finally:
100
      wp.TerminateWorkers()
101
      self._CheckWorkerCount(wp, 0)
102

    
103
    self.assertEquals(ctx.GetDoneTasks(), 10)
104

    
105
  def testNoTasks(self):
106
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
107
    try:
108
      self._CheckWorkerCount(wp, 3)
109
      self._CheckNoTasks(wp)
110
    finally:
111
      wp.TerminateWorkers()
112
      self._CheckWorkerCount(wp, 0)
113

    
114
  def testNoTasksQuiesce(self):
115
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
116
    try:
117
      self._CheckWorkerCount(wp, 3)
118
      self._CheckNoTasks(wp)
119
      wp.Quiesce()
120
      self._CheckNoTasks(wp)
121
    finally:
122
      wp.TerminateWorkers()
123
      self._CheckWorkerCount(wp, 0)
124

    
125
  def testChecksum(self):
126
    # Tests whether all tasks are run and, since we're only using a single
127
    # thread, whether everything is started in order.
128
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
129
    try:
130
      self._CheckWorkerCount(wp, 1)
131

    
132
      ctx = ChecksumContext()
133
      checksum = ChecksumContext.CHECKSUM_START
134
      for i in range(1, 100):
135
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
136
        wp.AddTask((ctx, i))
137

    
138
      wp.Quiesce()
139

    
140
      self._CheckNoTasks(wp)
141

    
142
      # Check sum
143
      ctx.lock.acquire()
144
      try:
145
        self.assertEqual(checksum, ctx.checksum)
146
      finally:
147
        ctx.lock.release()
148
    finally:
149
      wp.TerminateWorkers()
150
      self._CheckWorkerCount(wp, 0)
151

    
152
  def testAddManyTasks(self):
153
    ctx = CountingContext()
154
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
155
    try:
156
      self._CheckWorkerCount(wp, 3)
157

    
158
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
159
      wp.AddTask((ctx, "A separate hello"))
160
      wp.AddTask((ctx, "Once more, hi!"))
161
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
162

    
163
      wp.Quiesce()
164

    
165
      self._CheckNoTasks(wp)
166
    finally:
167
      wp.TerminateWorkers()
168
      self._CheckWorkerCount(wp, 0)
169

    
170
    self.assertEquals(ctx.GetDoneTasks(), 22)
171

    
172
  def testManyTasksSequence(self):
173
    ctx = CountingContext()
174
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
175
    try:
176
      self._CheckWorkerCount(wp, 3)
177
      self.assertRaises(AssertionError, wp.AddManyTasks,
178
                        ["Hello world %s" % i for i in range(10)])
179
      self.assertRaises(AssertionError, wp.AddManyTasks,
180
                        [i for i in range(10)])
181

    
182
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
183
      wp.AddTask((ctx, "A separate hello"))
184

    
185
      wp.Quiesce()
186

    
187
      self._CheckNoTasks(wp)
188
    finally:
189
      wp.TerminateWorkers()
190
      self._CheckWorkerCount(wp, 0)
191

    
192
    self.assertEquals(ctx.GetDoneTasks(), 11)
193

    
194
  def _CheckNoTasks(self, wp):
195
    wp._lock.acquire()
196
    try:
197
      # The task queue must be empty now
198
      self.failUnless(not wp._tasks)
199
    finally:
200
      wp._lock.release()
201

    
202
  def _CheckWorkerCount(self, wp, num_workers):
203
    wp._lock.acquire()
204
    try:
205
      self.assertEqual(len(wp._workers), num_workers)
206
    finally:
207
      wp._lock.release()
208

    
209

    
210
if __name__ == '__main__':
211
  testutils.GanetiTestProgram()