Revision 76094e37

b/Makefile.am
75 75
	lib/serializer.py \
76 76
	lib/ssconf.py \
77 77
	lib/ssh.py \
78
	lib/utils.py
78
	lib/utils.py \
79
	lib/workerpool.py
79 80

  
80 81
hypervisor_PYTHON = \
81 82
	lib/hypervisor/__init__.py \
......
172 173
	test/ganeti.ssh_unittest.py \
173 174
	test/ganeti.locking_unittest.py \
174 175
	test/ganeti.serializer_unittest.py \
176
	test/ganeti.workerpool_unittest.py \
175 177
	test/ganeti.constants_unittest.py
176 178

  
177 179
nodist_TESTS =
b/lib/workerpool.py
1
#
2
#
3

  
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Base classes for worker pools.
23

  
24
"""
25

  
26
import collections
27
import logging
28
import threading
29

  
30
from ganeti import errors
31
from ganeti import utils
32

  
33

  
34
class BaseWorker(threading.Thread, object):
35
  """Base worker class for worker pools.
36

  
37
  Users of a worker pool must override RunTask in a subclass.
38

  
39
  """
40
  def __init__(self, pool, worker_id):
41
    """Constructor for BaseWorker thread.
42

  
43
    Args:
44
    - pool: Parent worker pool
45
    - worker_id: Identifier for this worker
46

  
47
    """
48
    super(BaseWorker, self).__init__()
49
    self.pool = pool
50
    self.worker_id = worker_id
51

  
52
    # Also used by WorkerPool
53
    self._current_task = None
54

  
55
  def ShouldTerminate(self):
56
    """Returns whether a worker should terminate.
57

  
58
    """
59
    return self.pool.ShouldWorkerTerminate(self)
60

  
61
  def run(self):
62
    """Main thread function.
63

  
64
    Waits for new tasks to show up in the queue.
65

  
66
    """
67
    pool = self.pool
68

  
69
    assert self._current_task is None
70

  
71
    while True:
72
      try:
73
        # We wait on lock to be told either terminate or do a task.
74
        pool._lock.acquire()
75
        try:
76
          if pool._ShouldWorkerTerminateUnlocked(self):
77
            break
78

  
79
          # We only wait if there's no task for us.
80
          if not pool._tasks:
81
            # wait() releases the lock and sleeps until notified
82
            pool._lock.wait()
83

  
84
            # Were we woken up in order to terminate?
85
            if pool._ShouldWorkerTerminateUnlocked(self):
86
              break
87

  
88
            if not pool._tasks:
89
              # Spurious notification, ignore
90
              continue
91

  
92
          # Get task from queue and tell pool about it
93
          try:
94
            self._current_task = pool._tasks.popleft()
95
          finally:
96
            pool._lock.notifyAll()
97
        finally:
98
          pool._lock.release()
99

  
100
        # Run the actual task
101
        try:
102
          self.RunTask(*self._current_task)
103
        except:
104
          logging.error("Worker %s: Caught unhandled exception",
105
                        self.worker_id, exc_info=True)
106
      finally:
107
        self._current_task = None
108

  
109
        # Notify pool
110
        pool._lock.acquire()
111
        try:
112
          pool._lock.notifyAll()
113
        finally:
114
          pool._lock.release()
115

  
116
  def RunTask(self, *args):
117
    """Function called to start a task.
118

  
119
    """
120
    raise NotImplementedError()
121

  
122

  
123
class WorkerPool(object):
124
  """Worker pool with a queue.
125

  
126
  This class is thread-safe.
127

  
128
  Tasks are guaranteed to be started in the order in which they're added to the
129
  pool. Due to the nature of threading, they're not guaranteed to finish in the
130
  same order.
131

  
132
  """
133
  def __init__(self, num_workers, worker_class):
134
    """Constructor for worker pool.
135

  
136
    Args:
137
    - num_workers: Number of workers to be started (dynamic resizing is not
138
                   yet implemented)
139
    - worker_class: Class to be instantiated for workers; should derive from
140
                    BaseWorker
141

  
142
    """
143
    # Some of these variables are accessed by BaseWorker
144
    self._lock = threading.Condition(threading.Lock())
145
    self._worker_class = worker_class
146
    self._last_worker_id = 0
147
    self._workers = []
148
    self._quiescing = False
149

  
150
    # Terminating workers
151
    self._termworkers = []
152

  
153
    # Queued tasks
154
    self._tasks = collections.deque()
155

  
156
    # Start workers
157
    self.Resize(num_workers)
158

  
159
  # TODO: Implement dynamic resizing?
160

  
161
  def AddTask(self, *args):
162
    """Adds a task to the queue.
163

  
164
    Args:
165
    - *args: Arguments passed to BaseWorker.RunTask
166

  
167
    """
168
    self._lock.acquire()
169
    try:
170
      # Don't add new tasks while we're quiescing
171
      while self._quiescing:
172
        self._lock.wait()
173

  
174
      # Add task to internal queue
175
      self._tasks.append(args)
176
      self._lock.notify()
177
    finally:
178
      self._lock.release()
179

  
180
  def _ShouldWorkerTerminateUnlocked(self, worker):
181
    """Returns whether a worker should terminate.
182

  
183
    """
184
    return (worker in self._termworkers)
185

  
186
  def ShouldWorkerTerminate(self, worker):
187
    """Returns whether a worker should terminate.
188

  
189
    """
190
    self._lock.acquire()
191
    try:
192
      return self._ShouldWorkerTerminateUnlocked(self)
193
    finally:
194
      self._lock.release()
195

  
196
  def _HasRunningTasksUnlocked(self):
197
    """Checks whether there's a task running in a worker.
198

  
199
    """
200
    for worker in self._workers + self._termworkers:
201
      if worker._current_task is not None:
202
        return True
203
    return False
204

  
205
  def Quiesce(self):
206
    """Waits until the task queue is empty.
207

  
208
    """
209
    self._lock.acquire()
210
    try:
211
      self._quiescing = True
212

  
213
      # Wait while there are tasks pending or running
214
      while self._tasks or self._HasRunningTasksUnlocked():
215
        self._lock.wait()
216

  
217
    finally:
218
      self._quiescing = False
219

  
220
      # Make sure AddTasks continues in case it was waiting
221
      self._lock.notifyAll()
222

  
223
      self._lock.release()
224

  
225
  def _NewWorkerIdUnlocked(self):
226
    self._last_worker_id += 1
227
    return self._last_worker_id
228

  
229
  def _ResizeUnlocked(self, num_workers):
230
    """Changes the number of workers.
231

  
232
    """
233
    assert num_workers >= 0, "num_workers must be >= 0"
234

  
235
    logging.debug("Resizing to %s workers", num_workers)
236

  
237
    current_count = len(self._workers)
238

  
239
    if current_count == num_workers:
240
      # Nothing to do
241
      pass
242

  
243
    elif current_count > num_workers:
244
      if num_workers == 0:
245
        # Create copy of list to iterate over while lock isn't held.
246
        termworkers = self._workers[:]
247
        del self._workers[:]
248
      else:
249
        # TODO: Implement partial downsizing
250
        raise NotImplementedError()
251
        #termworkers = ...
252

  
253
      self._termworkers += termworkers
254

  
255
      # Notify workers that something has changed
256
      self._lock.notifyAll()
257

  
258
      # Join all terminating workers
259
      self._lock.release()
260
      try:
261
        for worker in termworkers:
262
          worker.join()
263
      finally:
264
        self._lock.acquire()
265

  
266
      # Remove terminated threads. This could be done in a more efficient way
267
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
268
      # don't leave zombie threads around.
269
      for worker in termworkers:
270
        assert worker in self._termworkers, ("Worker not in list of"
271
                                             " terminating workers")
272
        if not worker.isAlive():
273
          self._termworkers.remove(worker)
274

  
275
      assert not self._termworkers, "Zombie worker detected"
276

  
277
    elif current_count < num_workers:
278
      # Create (num_workers - current_count) new workers
279
      for i in xrange(num_workers - current_count):
280
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
281
        self._workers.append(worker)
282
        worker.start()
283

  
284
  def Resize(self, num_workers):
285
    """Changes the number of workers in the pool.
286

  
287
    Args:
288
    - num_workers: New number of workers
289

  
290
    """
291
    self._lock.acquire()
292
    try:
293
      return self._ResizeUnlocked(num_workers)
294
    finally:
295
      self._lock.release()
296

  
297
  def TerminateWorkers(self):
298
    """Terminate all worker threads.
299

  
300
    Unstarted tasks will be ignored.
301

  
302
    """
303
    logging.debug("Terminating all workers")
304

  
305
    self._lock.acquire()
306
    try:
307
      self._ResizeUnlocked(0)
308

  
309
      if self._tasks:
310
        logging.debug("There are %s tasks left", len(self._tasks))
311
    finally:
312
      self._lock.release()
313

  
314
    logging.debug("All workers terminated")
b/test/ganeti.workerpool_unittest.py
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Script for unittesting the workerpool module"""
23

  
24
import unittest
25
import threading
26
import time
27
import sys
28
import zlib
29

  
30
from ganeti import workerpool
31

  
32

  
33
class DummyBaseWorker(workerpool.BaseWorker):
34
  def RunTask(self, text):
35
    pass
36

  
37

  
38
class ChecksumContext:
39
  CHECKSUM_START = zlib.adler32("")
40

  
41
  def __init__(self):
42
    self.lock = threading.Condition(threading.Lock())
43
    self.checksum = self.CHECKSUM_START
44

  
45
  @staticmethod
46
  def UpdateChecksum(current, value):
47
    return zlib.adler32(str(value), current)
48

  
49

  
50
class ChecksumBaseWorker(workerpool.BaseWorker):
51
  def RunTask(self, ctx, number):
52
    ctx.lock.acquire()
53
    try:
54
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
55
    finally:
56
      ctx.lock.release()
57

  
58

  
59
class TestWorkerpool(unittest.TestCase):
60
  """Workerpool tests"""
61

  
62
  def testDummy(self):
63
    wp = workerpool.WorkerPool(3, DummyBaseWorker)
64
    try:
65
      self._CheckWorkerCount(wp, 3)
66

  
67
      for i in xrange(10):
68
        wp.AddTask("Hello world %s" % i)
69

  
70
      wp.Quiesce()
71
    finally:
72
      wp.TerminateWorkers()
73
      self._CheckWorkerCount(wp, 0)
74

  
75
  def testNoTasks(self):
76
    wp = workerpool.WorkerPool(3, DummyBaseWorker)
77
    try:
78
      self._CheckWorkerCount(wp, 3)
79
      self._CheckNoTasks(wp)
80
    finally:
81
      wp.TerminateWorkers()
82
      self._CheckWorkerCount(wp, 0)
83

  
84
  def testNoTasksQuiesce(self):
85
    wp = workerpool.WorkerPool(3, DummyBaseWorker)
86
    try:
87
      self._CheckWorkerCount(wp, 3)
88
      self._CheckNoTasks(wp)
89
      wp.Quiesce()
90
      self._CheckNoTasks(wp)
91
    finally:
92
      wp.TerminateWorkers()
93
      self._CheckWorkerCount(wp, 0)
94

  
95
  def testChecksum(self):
96
    # Tests whether all tasks are run and, since we're only using a single
97
    # thread, whether everything is started in order.
98
    wp = workerpool.WorkerPool(1, ChecksumBaseWorker)
99
    try:
100
      self._CheckWorkerCount(wp, 1)
101

  
102
      ctx = ChecksumContext()
103
      checksum = ChecksumContext.CHECKSUM_START
104
      for i in xrange(1, 100):
105
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
106
        wp.AddTask(ctx, i)
107

  
108
      wp.Quiesce()
109

  
110
      self._CheckNoTasks(wp)
111

  
112
      # Check sum
113
      ctx.lock.acquire()
114
      try:
115
        self.assertEqual(checksum, ctx.checksum)
116
      finally:
117
        ctx.lock.release()
118
    finally:
119
      wp.TerminateWorkers()
120
      self._CheckWorkerCount(wp, 0)
121

  
122
  def _CheckNoTasks(self, wp):
123
    wp._lock.acquire()
124
    try:
125
      # The task queue must be empty now
126
      self.failUnless(not wp._tasks)
127
    finally:
128
      wp._lock.release()
129

  
130
  def _CheckWorkerCount(self, wp, num_workers):
131
    wp._lock.acquire()
132
    try:
133
      self.assertEqual(len(wp._workers), num_workers)
134
    finally:
135
      wp._lock.release()
136

  
137

  
138
if __name__ == '__main__':
139
  unittest.main()

Also available in: Unified diff