Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 76094e37

History | View | Annotate | Download (7.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30
from ganeti import errors
31
from ganeti import utils
32

    
33

    
34
class BaseWorker(threading.Thread, object):
35
  """Base worker class for worker pools.
36

37
  Users of a worker pool must override RunTask in a subclass.
38

39
  """
40
  def __init__(self, pool, worker_id):
41
    """Constructor for BaseWorker thread.
42

43
    Args:
44
    - pool: Parent worker pool
45
    - worker_id: Identifier for this worker
46

47
    """
48
    super(BaseWorker, self).__init__()
49
    self.pool = pool
50
    self.worker_id = worker_id
51

    
52
    # Also used by WorkerPool
53
    self._current_task = None
54

    
55
  def ShouldTerminate(self):
56
    """Returns whether a worker should terminate.
57

58
    """
59
    return self.pool.ShouldWorkerTerminate(self)
60

    
61
  def run(self):
62
    """Main thread function.
63

64
    Waits for new tasks to show up in the queue.
65

66
    """
67
    pool = self.pool
68

    
69
    assert self._current_task is None
70

    
71
    while True:
72
      try:
73
        # We wait on lock to be told either terminate or do a task.
74
        pool._lock.acquire()
75
        try:
76
          if pool._ShouldWorkerTerminateUnlocked(self):
77
            break
78

    
79
          # We only wait if there's no task for us.
80
          if not pool._tasks:
81
            # wait() releases the lock and sleeps until notified
82
            pool._lock.wait()
83

    
84
            # Were we woken up in order to terminate?
85
            if pool._ShouldWorkerTerminateUnlocked(self):
86
              break
87

    
88
            if not pool._tasks:
89
              # Spurious notification, ignore
90
              continue
91

    
92
          # Get task from queue and tell pool about it
93
          try:
94
            self._current_task = pool._tasks.popleft()
95
          finally:
96
            pool._lock.notifyAll()
97
        finally:
98
          pool._lock.release()
99

    
100
        # Run the actual task
101
        try:
102
          self.RunTask(*self._current_task)
103
        except:
104
          logging.error("Worker %s: Caught unhandled exception",
105
                        self.worker_id, exc_info=True)
106
      finally:
107
        self._current_task = None
108

    
109
        # Notify pool
110
        pool._lock.acquire()
111
        try:
112
          pool._lock.notifyAll()
113
        finally:
114
          pool._lock.release()
115

    
116
  def RunTask(self, *args):
117
    """Function called to start a task.
118

119
    """
120
    raise NotImplementedError()
121

    
122

    
123
class WorkerPool(object):
124
  """Worker pool with a queue.
125

126
  This class is thread-safe.
127

128
  Tasks are guaranteed to be started in the order in which they're added to the
129
  pool. Due to the nature of threading, they're not guaranteed to finish in the
130
  same order.
131

132
  """
133
  def __init__(self, num_workers, worker_class):
134
    """Constructor for worker pool.
135

136
    Args:
137
    - num_workers: Number of workers to be started (dynamic resizing is not
138
                   yet implemented)
139
    - worker_class: Class to be instantiated for workers; should derive from
140
                    BaseWorker
141

142
    """
143
    # Some of these variables are accessed by BaseWorker
144
    self._lock = threading.Condition(threading.Lock())
145
    self._worker_class = worker_class
146
    self._last_worker_id = 0
147
    self._workers = []
148
    self._quiescing = False
149

    
150
    # Terminating workers
151
    self._termworkers = []
152

    
153
    # Queued tasks
154
    self._tasks = collections.deque()
155

    
156
    # Start workers
157
    self.Resize(num_workers)
158

    
159
  # TODO: Implement dynamic resizing?
160

    
161
  def AddTask(self, *args):
162
    """Adds a task to the queue.
163

164
    Args:
165
    - *args: Arguments passed to BaseWorker.RunTask
166

167
    """
168
    self._lock.acquire()
169
    try:
170
      # Don't add new tasks while we're quiescing
171
      while self._quiescing:
172
        self._lock.wait()
173

    
174
      # Add task to internal queue
175
      self._tasks.append(args)
176
      self._lock.notify()
177
    finally:
178
      self._lock.release()
179

    
180
  def _ShouldWorkerTerminateUnlocked(self, worker):
181
    """Returns whether a worker should terminate.
182

183
    """
184
    return (worker in self._termworkers)
185

    
186
  def ShouldWorkerTerminate(self, worker):
187
    """Returns whether a worker should terminate.
188

189
    """
190
    self._lock.acquire()
191
    try:
192
      return self._ShouldWorkerTerminateUnlocked(self)
193
    finally:
194
      self._lock.release()
195

    
196
  def _HasRunningTasksUnlocked(self):
197
    """Checks whether there's a task running in a worker.
198

199
    """
200
    for worker in self._workers + self._termworkers:
201
      if worker._current_task is not None:
202
        return True
203
    return False
204

    
205
  def Quiesce(self):
206
    """Waits until the task queue is empty.
207

208
    """
209
    self._lock.acquire()
210
    try:
211
      self._quiescing = True
212

    
213
      # Wait while there are tasks pending or running
214
      while self._tasks or self._HasRunningTasksUnlocked():
215
        self._lock.wait()
216

    
217
    finally:
218
      self._quiescing = False
219

    
220
      # Make sure AddTasks continues in case it was waiting
221
      self._lock.notifyAll()
222

    
223
      self._lock.release()
224

    
225
  def _NewWorkerIdUnlocked(self):
226
    self._last_worker_id += 1
227
    return self._last_worker_id
228

    
229
  def _ResizeUnlocked(self, num_workers):
230
    """Changes the number of workers.
231

232
    """
233
    assert num_workers >= 0, "num_workers must be >= 0"
234

    
235
    logging.debug("Resizing to %s workers", num_workers)
236

    
237
    current_count = len(self._workers)
238

    
239
    if current_count == num_workers:
240
      # Nothing to do
241
      pass
242

    
243
    elif current_count > num_workers:
244
      if num_workers == 0:
245
        # Create copy of list to iterate over while lock isn't held.
246
        termworkers = self._workers[:]
247
        del self._workers[:]
248
      else:
249
        # TODO: Implement partial downsizing
250
        raise NotImplementedError()
251
        #termworkers = ...
252

    
253
      self._termworkers += termworkers
254

    
255
      # Notify workers that something has changed
256
      self._lock.notifyAll()
257

    
258
      # Join all terminating workers
259
      self._lock.release()
260
      try:
261
        for worker in termworkers:
262
          worker.join()
263
      finally:
264
        self._lock.acquire()
265

    
266
      # Remove terminated threads. This could be done in a more efficient way
267
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
268
      # don't leave zombie threads around.
269
      for worker in termworkers:
270
        assert worker in self._termworkers, ("Worker not in list of"
271
                                             " terminating workers")
272
        if not worker.isAlive():
273
          self._termworkers.remove(worker)
274

    
275
      assert not self._termworkers, "Zombie worker detected"
276

    
277
    elif current_count < num_workers:
278
      # Create (num_workers - current_count) new workers
279
      for i in xrange(num_workers - current_count):
280
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
281
        self._workers.append(worker)
282
        worker.start()
283

    
284
  def Resize(self, num_workers):
285
    """Changes the number of workers in the pool.
286

287
    Args:
288
    - num_workers: New number of workers
289

290
    """
291
    self._lock.acquire()
292
    try:
293
      return self._ResizeUnlocked(num_workers)
294
    finally:
295
      self._lock.release()
296

    
297
  def TerminateWorkers(self):
298
    """Terminate all worker threads.
299

300
    Unstarted tasks will be ignored.
301

302
    """
303
    logging.debug("Terminating all workers")
304

    
305
    self._lock.acquire()
306
    try:
307
      self._ResizeUnlocked(0)
308

    
309
      if self._tasks:
310
        logging.debug("There are %s tasks left", len(self._tasks))
311
    finally:
312
      self._lock.release()
313

    
314
    logging.debug("All workers terminated")