Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.workerpool_unittest.py @ 7ed3248b

History | View | Annotate | Download (4.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the workerpool module"""
23

    
24
import unittest
25
import threading
26
import time
27
import sys
28
import zlib
29

    
30
from ganeti import workerpool
31

    
32
import testutils
33

    
34
class CountingContext(object):
35

    
36
  def __init__(self):
37
    self._lock = threading.Condition(threading.Lock())
38
    self.done = 0
39

    
40
  def DoneTask(self):
41
    self._lock.acquire()
42
    try:
43
      self.done += 1
44
    finally:
45
      self._lock.release()
46

    
47
  def GetDoneTasks(self):
48
    self._lock.acquire()
49
    try:
50
      return self.done
51
    finally:
52
      self._lock.release()
53

    
54
  @staticmethod
55
  def UpdateChecksum(current, value):
56
    return zlib.adler32(str(value), current)
57

    
58

    
59
class CountingBaseWorker(workerpool.BaseWorker):
60

    
61
  def RunTask(self, ctx, text):
62
    ctx.DoneTask()
63

    
64

    
65
class ChecksumContext:
66
  CHECKSUM_START = zlib.adler32("")
67

    
68
  def __init__(self):
69
    self.lock = threading.Condition(threading.Lock())
70
    self.checksum = self.CHECKSUM_START
71

    
72
  @staticmethod
73
  def UpdateChecksum(current, value):
74
    return zlib.adler32(str(value), current)
75

    
76

    
77
class ChecksumBaseWorker(workerpool.BaseWorker):
78
  def RunTask(self, ctx, number):
79
    ctx.lock.acquire()
80
    try:
81
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
82
    finally:
83
      ctx.lock.release()
84

    
85

    
86
class TestWorkerpool(unittest.TestCase):
87
  """Workerpool tests"""
88

    
89
  def testCounting(self):
90
    ctx = CountingContext()
91
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
92
    try:
93
      self._CheckWorkerCount(wp, 3)
94

    
95
      for i in range(10):
96
        wp.AddTask(ctx, "Hello world %s" % i)
97

    
98
      wp.Quiesce()
99
    finally:
100
      wp.TerminateWorkers()
101
      self._CheckWorkerCount(wp, 0)
102

    
103
    self.assertEquals(ctx.GetDoneTasks(), 10)
104

    
105
  def testNoTasks(self):
106
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
107
    try:
108
      self._CheckWorkerCount(wp, 3)
109
      self._CheckNoTasks(wp)
110
    finally:
111
      wp.TerminateWorkers()
112
      self._CheckWorkerCount(wp, 0)
113

    
114
  def testNoTasksQuiesce(self):
115
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
116
    try:
117
      self._CheckWorkerCount(wp, 3)
118
      self._CheckNoTasks(wp)
119
      wp.Quiesce()
120
      self._CheckNoTasks(wp)
121
    finally:
122
      wp.TerminateWorkers()
123
      self._CheckWorkerCount(wp, 0)
124

    
125
  def testChecksum(self):
126
    # Tests whether all tasks are run and, since we're only using a single
127
    # thread, whether everything is started in order.
128
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
129
    try:
130
      self._CheckWorkerCount(wp, 1)
131

    
132
      ctx = ChecksumContext()
133
      checksum = ChecksumContext.CHECKSUM_START
134
      for i in range(1, 100):
135
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
136
        wp.AddTask(ctx, i)
137

    
138
      wp.Quiesce()
139

    
140
      self._CheckNoTasks(wp)
141

    
142
      # Check sum
143
      ctx.lock.acquire()
144
      try:
145
        self.assertEqual(checksum, ctx.checksum)
146
      finally:
147
        ctx.lock.release()
148
    finally:
149
      wp.TerminateWorkers()
150
      self._CheckWorkerCount(wp, 0)
151

    
152
  def testAddManyTasks(self):
153
    ctx = CountingContext()
154
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
155
    try:
156
      self._CheckWorkerCount(wp, 3)
157

    
158
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
159
      wp.AddTask(ctx, "A separate hello")
160
      wp.AddTask(ctx, "Once more, hi!")
161
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
162

    
163
      wp.Quiesce()
164

    
165
      self._CheckNoTasks(wp)
166
    finally:
167
      wp.TerminateWorkers()
168
      self._CheckWorkerCount(wp, 0)
169

    
170
    self.assertEquals(ctx.GetDoneTasks(), 22)
171

    
172
  def _CheckNoTasks(self, wp):
173
    wp._lock.acquire()
174
    try:
175
      # The task queue must be empty now
176
      self.failUnless(not wp._tasks)
177
    finally:
178
      wp._lock.release()
179

    
180
  def _CheckWorkerCount(self, wp, num_workers):
181
    wp._lock.acquire()
182
    try:
183
      self.assertEqual(len(wp._workers), num_workers)
184
    finally:
185
      wp._lock.release()
186

    
187

    
188
if __name__ == '__main__':
189
  testutils.GanetiTestProgram()