Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.workerpool_unittest.py @ 89e2b4d2

History | View | Annotate | Download (3.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the workerpool module"""
23

    
24
import unittest
25
import threading
26
import time
27
import sys
28
import zlib
29

    
30
from ganeti import workerpool
31

    
32
import testutils
33

    
34

    
35
class DummyBaseWorker(workerpool.BaseWorker):
36
  def RunTask(self, text):
37
    pass
38

    
39

    
40
class ChecksumContext:
41
  CHECKSUM_START = zlib.adler32("")
42

    
43
  def __init__(self):
44
    self.lock = threading.Condition(threading.Lock())
45
    self.checksum = self.CHECKSUM_START
46

    
47
  @staticmethod
48
  def UpdateChecksum(current, value):
49
    return zlib.adler32(str(value), current)
50

    
51

    
52
class ChecksumBaseWorker(workerpool.BaseWorker):
53
  def RunTask(self, ctx, number):
54
    ctx.lock.acquire()
55
    try:
56
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
57
    finally:
58
      ctx.lock.release()
59

    
60

    
61
class TestWorkerpool(unittest.TestCase):
62
  """Workerpool tests"""
63

    
64
  def testDummy(self):
65
    wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
66
    try:
67
      self._CheckWorkerCount(wp, 3)
68

    
69
      for i in range(10):
70
        wp.AddTask("Hello world %s" % i)
71

    
72
      wp.Quiesce()
73
    finally:
74
      wp.TerminateWorkers()
75
      self._CheckWorkerCount(wp, 0)
76

    
77
  def testNoTasks(self):
78
    wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
79
    try:
80
      self._CheckWorkerCount(wp, 3)
81
      self._CheckNoTasks(wp)
82
    finally:
83
      wp.TerminateWorkers()
84
      self._CheckWorkerCount(wp, 0)
85

    
86
  def testNoTasksQuiesce(self):
87
    wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
88
    try:
89
      self._CheckWorkerCount(wp, 3)
90
      self._CheckNoTasks(wp)
91
      wp.Quiesce()
92
      self._CheckNoTasks(wp)
93
    finally:
94
      wp.TerminateWorkers()
95
      self._CheckWorkerCount(wp, 0)
96

    
97
  def testChecksum(self):
98
    # Tests whether all tasks are run and, since we're only using a single
99
    # thread, whether everything is started in order.
100
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
101
    try:
102
      self._CheckWorkerCount(wp, 1)
103

    
104
      ctx = ChecksumContext()
105
      checksum = ChecksumContext.CHECKSUM_START
106
      for i in range(1, 100):
107
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
108
        wp.AddTask(ctx, i)
109

    
110
      wp.Quiesce()
111

    
112
      self._CheckNoTasks(wp)
113

    
114
      # Check sum
115
      ctx.lock.acquire()
116
      try:
117
        self.assertEqual(checksum, ctx.checksum)
118
      finally:
119
        ctx.lock.release()
120
    finally:
121
      wp.TerminateWorkers()
122
      self._CheckWorkerCount(wp, 0)
123

    
124
  def _CheckNoTasks(self, wp):
125
    wp._lock.acquire()
126
    try:
127
      # The task queue must be empty now
128
      self.failUnless(not wp._tasks)
129
    finally:
130
      wp._lock.release()
131

    
132
  def _CheckWorkerCount(self, wp, num_workers):
133
    wp._lock.acquire()
134
    try:
135
      self.assertEqual(len(wp._workers), num_workers)
136
    finally:
137
      wp._lock.release()
138

    
139

    
140
if __name__ == '__main__':
141
  testutils.GanetiTestProgram()