Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ 53b1d12b

History | View | Annotate | Download (8.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30
from ganeti import errors
31
from ganeti import utils
32

    
33

    
34
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  A worker repeatedly takes the oldest queued task from its pool, runs it
  via RunTask and reports back to the pool, until the pool marks the worker
  as terminating.

  """
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    Args:
    - pool: Parent worker pool
    - worker_id: Identifier for this worker

    """
    super(BaseWorker, self).__init__()
    self.pool = pool
    self.worker_id = worker_id

    # Argument tuple of the task currently being processed, or None while the
    # worker is idle. Read and written with the pool's lock held.
    self._current_task = None

  def ShouldTerminate(self):
    """Returns whether a worker should terminate.

    The decision is delegated to the parent pool.

    """
    return self.pool.ShouldWorkerTerminate(self)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    The caller must hold the pool's lock.

    """
    return (self._current_task is not None)

  def HasRunningTask(self):
    """Returns whether this worker is currently running a task.

    Acquires the pool's lock for the duration of the check.

    """
    self.pool._lock.acquire()
    try:
      return self._HasRunningTaskUnlocked()
    finally:
      self.pool._lock.release()

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue, runs them one at a time and
    returns once the pool asks this worker to terminate. Exceptions raised by
    RunTask are logged and do not stop the worker.

    """
    pool = self.pool

    assert not self.HasRunningTask()

    while True:
      try:
        # We wait on lock to be told either terminate or do a task.
        pool._lock.acquire()
        try:
          if pool._ShouldWorkerTerminateUnlocked(self):
            break

          # We only wait if there's no task for us.
          if not pool._tasks:
            logging.debug("Worker %s: waiting for tasks", self.worker_id)

            # wait() releases the lock and sleeps until notified
            pool._pool_to_worker.wait()

            logging.debug("Worker %s: notified while waiting", self.worker_id)

            # Were we woken up in order to terminate?
            if pool._ShouldWorkerTerminateUnlocked(self):
              break

            if not pool._tasks:
              # Spurious notification, ignore
              continue

          # Get task from queue and tell pool about it
          try:
            self._current_task = pool._tasks.popleft()
          finally:
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

        # Run the actual task
        try:
          logging.debug("Worker %s: starting task %r",
                        self.worker_id, self._current_task)
          self.RunTask(*self._current_task)
          logging.debug("Worker %s: done with task %r",
                        self.worker_id, self._current_task)
        except:
          # Deliberately catch everything: a failing task must not kill the
          # worker thread, it is only logged.
          logging.error("Worker %s: Caught unhandled exception",
                        self.worker_id, exc_info=True)
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          # Compare against None instead of testing truthiness: a task added
          # without arguments is the empty tuple, which is falsy but still
          # counts as a running task in _HasRunningTaskUnlocked.
          if self._current_task is not None:
            self._current_task = None
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

    logging.debug("Worker %s: terminates", self.worker_id)

  def RunTask(self, *args):
    """Function called to start a task.

    Must be overridden by subclasses.

    Args:
    - *args: Arguments given to the pool's AddTask for this task

    Raises:
    - NotImplementedError: Always, in this base class

    """
    raise NotImplementedError()
145

    
146

    
147
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're added to the
  pool. Due to the nature of threading, they're not guaranteed to finish in the
  same order.

  """
  def __init__(self, num_workers, worker_class):
    """Constructor for worker pool.

    Args:
    - num_workers: Number of workers to be started (see Resize for the
                   supported ways of changing it later)
    - worker_class: Class to be instantiated for workers; should derive from
                    BaseWorker

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()

    # All conditions share self._lock:
    # - _pool_to_pool: Quiesce tells waiting AddTask callers to continue
    # - _pool_to_worker: pool tells workers about new tasks or termination
    # - _worker_to_pool: workers tell the pool a task was fetched or finished
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._tasks = collections.deque()

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def AddTask(self, *args):
    """Adds a task to the queue.

    Blocks while the pool is quiescing.

    Args:
    - *args: Arguments passed to BaseWorker.RunTask

    """
    self._lock.acquire()
    try:
      # Don't add new tasks while we're quiescing
      while self._quiescing:
        self._pool_to_pool.wait()

      # Add task to internal queue
      self._tasks.append(args)

      # Wake one idling worker up
      self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    The caller must hold the pool's lock.

    Args:
    - worker: Worker to check

    """
    return (worker in self._termworkers)

  def ShouldWorkerTerminate(self, worker):
    """Returns whether a worker should terminate.

    Args:
    - worker: Worker to check

    """
    self._lock.acquire()
    try:
      # Check the worker that was passed in, not the pool itself
      return self._ShouldWorkerTerminateUnlocked(worker)
    finally:
      self._lock.release()

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Both active and terminating workers are considered. The caller must hold
    the pool's lock.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked():
        return True
    return False

  def Quiesce(self):
    """Waits until the task queue is empty.

    Returns once no task is queued and no worker is running a task. Calls to
    AddTask block while this method is waiting and continue afterwards.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Returns a new worker identifier.

    Identifiers are consecutive integers starting at 1. The caller must hold
    the pool's lock.

    """
    self._last_worker_id += 1
    return self._last_worker_id

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    The caller must hold the pool's lock. The lock is released temporarily
    while waiting for terminating workers to exit and is held again when this
    method returns.

    Args:
    - num_workers: New number of workers; must be >= 0

    Raises:
    - NotImplementedError: When shrinking to a number other than zero

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers. The lock must not be held here, as the
      # workers need it to notice that they should terminate.
      self._lock.release()
      try:
        for worker in termworkers:
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
      # don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not worker.isAlive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for i in xrange(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    Growing the pool and shrinking it to zero workers are supported; shrinking
    to any other size raises NotImplementedError.

    Args:
    - num_workers: New number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored; they are left in the queue and only
    their number is logged.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")