Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ b3558df1

History | View | Annotate | Download (8.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Base classes for worker pools.
23

24
"""
25

    
26
import collections
27
import logging
28
import threading
29

    
30
from ganeti import errors
31
from ganeti import utils
32

    
33

    
34
class BaseWorker(threading.Thread, object):
35
  """Base worker class for worker pools.
36

37
  Users of a worker pool must override RunTask in a subclass.
38

39
  """
40
  def __init__(self, pool, worker_id):
41
    """Constructor for BaseWorker thread.
42

43
    Args:
44
    - pool: Parent worker pool
45
    - worker_id: Identifier for this worker
46

47
    """
48
    super(BaseWorker, self).__init__()
49
    self.pool = pool
50
    self.worker_id = worker_id
51
    self._current_task = None
52

    
53
  def ShouldTerminate(self):
54
    """Returns whether a worker should terminate.
55

56
    """
57
    return self.pool.ShouldWorkerTerminate(self)
58

    
59
  def _HasRunningTaskUnlocked(self):
60
    """Returns whether this worker is currently running a task.
61

62
    """
63
    return (self._current_task is not None)
64

    
65
  def HasRunningTask(self):
66
    """Returns whether this worker is currently running a task.
67

68
    """
69
    self.pool._lock.acquire()
70
    try:
71
      return self._HasRunningTaskUnlocked()
72
    finally:
73
      self.pool._lock.release()
74

    
75
  def run(self):
76
    """Main thread function.
77

78
    Waits for new tasks to show up in the queue.
79

80
    """
81
    pool = self.pool
82

    
83
    assert not self.HasRunningTask()
84

    
85
    while True:
86
      try:
87
        # We wait on lock to be told either terminate or do a task.
88
        pool._lock.acquire()
89
        try:
90
          if pool._ShouldWorkerTerminateUnlocked(self):
91
            break
92

    
93
          # We only wait if there's no task for us.
94
          if not pool._tasks:
95
            logging.debug("Worker %s: waiting for tasks", self.worker_id)
96

    
97
            # wait() releases the lock and sleeps until notified
98
            pool._lock.wait()
99

    
100
            logging.debug("Worker %s: notified while waiting", self.worker_id)
101

    
102
            # Were we woken up in order to terminate?
103
            if pool._ShouldWorkerTerminateUnlocked(self):
104
              break
105

    
106
            if not pool._tasks:
107
              # Spurious notification, ignore
108
              continue
109

    
110
          # Get task from queue and tell pool about it
111
          try:
112
            self._current_task = pool._tasks.popleft()
113
          finally:
114
            pool._lock.notifyAll()
115
        finally:
116
          pool._lock.release()
117

    
118
        # Run the actual task
119
        try:
120
          logging.debug("Worker %s: starting task %r",
121
                        self.worker_id, self._current_task)
122
          self.RunTask(*self._current_task)
123
          logging.debug("Worker %s: done with task %r",
124
                        self.worker_id, self._current_task)
125
        except:
126
          logging.error("Worker %s: Caught unhandled exception",
127
                        self.worker_id, exc_info=True)
128
      finally:
129
        # Notify pool
130
        pool._lock.acquire()
131
        try:
132
          if self._current_task:
133
            self._current_task = None
134
            pool._lock.notifyAll()
135
        finally:
136
          pool._lock.release()
137

    
138
    logging.debug("Worker %s: terminates", self.worker_id)
139

    
140
  def RunTask(self, *args):
141
    """Function called to start a task.
142

143
    """
144
    raise NotImplementedError()
145

    
146

    
147
class WorkerPool(object):
148
  """Worker pool with a queue.
149

150
  This class is thread-safe.
151

152
  Tasks are guaranteed to be started in the order in which they're added to the
153
  pool. Due to the nature of threading, they're not guaranteed to finish in the
154
  same order.
155

156
  """
157
  def __init__(self, num_workers, worker_class):
158
    """Constructor for worker pool.
159

160
    Args:
161
    - num_workers: Number of workers to be started (dynamic resizing is not
162
                   yet implemented)
163
    - worker_class: Class to be instantiated for workers; should derive from
164
                    BaseWorker
165

166
    """
167
    # Some of these variables are accessed by BaseWorker
168
    self._lock = threading.Condition(threading.Lock())
169
    self._worker_class = worker_class
170
    self._last_worker_id = 0
171
    self._workers = []
172
    self._quiescing = False
173

    
174
    # Terminating workers
175
    self._termworkers = []
176

    
177
    # Queued tasks
178
    self._tasks = collections.deque()
179

    
180
    # Start workers
181
    self.Resize(num_workers)
182

    
183
  # TODO: Implement dynamic resizing?
184

    
185
  def AddTask(self, *args):
186
    """Adds a task to the queue.
187

188
    Args:
189
    - *args: Arguments passed to BaseWorker.RunTask
190

191
    """
192
    self._lock.acquire()
193
    try:
194
      # Don't add new tasks while we're quiescing
195
      while self._quiescing:
196
        self._lock.wait()
197

    
198
      # Add task to internal queue
199
      self._tasks.append(args)
200
      self._lock.notify()
201
    finally:
202
      self._lock.release()
203

    
204
  def _ShouldWorkerTerminateUnlocked(self, worker):
205
    """Returns whether a worker should terminate.
206

207
    """
208
    return (worker in self._termworkers)
209

    
210
  def ShouldWorkerTerminate(self, worker):
211
    """Returns whether a worker should terminate.
212

213
    """
214
    self._lock.acquire()
215
    try:
216
      return self._ShouldWorkerTerminateUnlocked(self)
217
    finally:
218
      self._lock.release()
219

    
220
  def _HasRunningTasksUnlocked(self):
221
    """Checks whether there's a task running in a worker.
222

223
    """
224
    for worker in self._workers + self._termworkers:
225
      if worker._HasRunningTaskUnlocked():
226
        return True
227
    return False
228

    
229
  def Quiesce(self):
230
    """Waits until the task queue is empty.
231

232
    """
233
    self._lock.acquire()
234
    try:
235
      self._quiescing = True
236

    
237
      # Wait while there are tasks pending or running
238
      while self._tasks or self._HasRunningTasksUnlocked():
239
        self._lock.wait()
240

    
241
    finally:
242
      self._quiescing = False
243

    
244
      # Make sure AddTasks continues in case it was waiting
245
      self._lock.notifyAll()
246

    
247
      self._lock.release()
248

    
249
  def _NewWorkerIdUnlocked(self):
250
    self._last_worker_id += 1
251
    return self._last_worker_id
252

    
253
  def _ResizeUnlocked(self, num_workers):
254
    """Changes the number of workers.
255

256
    """
257
    assert num_workers >= 0, "num_workers must be >= 0"
258

    
259
    logging.debug("Resizing to %s workers", num_workers)
260

    
261
    current_count = len(self._workers)
262

    
263
    if current_count == num_workers:
264
      # Nothing to do
265
      pass
266

    
267
    elif current_count > num_workers:
268
      if num_workers == 0:
269
        # Create copy of list to iterate over while lock isn't held.
270
        termworkers = self._workers[:]
271
        del self._workers[:]
272
      else:
273
        # TODO: Implement partial downsizing
274
        raise NotImplementedError()
275
        #termworkers = ...
276

    
277
      self._termworkers += termworkers
278

    
279
      # Notify workers that something has changed
280
      self._lock.notifyAll()
281

    
282
      # Join all terminating workers
283
      self._lock.release()
284
      try:
285
        for worker in termworkers:
286
          worker.join()
287
      finally:
288
        self._lock.acquire()
289

    
290
      # Remove terminated threads. This could be done in a more efficient way
291
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
292
      # don't leave zombie threads around.
293
      for worker in termworkers:
294
        assert worker in self._termworkers, ("Worker not in list of"
295
                                             " terminating workers")
296
        if not worker.isAlive():
297
          self._termworkers.remove(worker)
298

    
299
      assert not self._termworkers, "Zombie worker detected"
300

    
301
    elif current_count < num_workers:
302
      # Create (num_workers - current_count) new workers
303
      for i in xrange(num_workers - current_count):
304
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
305
        self._workers.append(worker)
306
        worker.start()
307

    
308
  def Resize(self, num_workers):
309
    """Changes the number of workers in the pool.
310

311
    Args:
312
    - num_workers: New number of workers
313

314
    """
315
    self._lock.acquire()
316
    try:
317
      return self._ResizeUnlocked(num_workers)
318
    finally:
319
      self._lock.release()
320

    
321
  def TerminateWorkers(self):
322
    """Terminate all worker threads.
323

324
    Unstarted tasks will be ignored.
325

326
    """
327
    logging.debug("Terminating all workers")
328

    
329
    self._lock.acquire()
330
    try:
331
      self._ResizeUnlocked(0)
332

    
333
      if self._tasks:
334
        logging.debug("There are %s tasks left", len(self._tasks))
335
    finally:
336
      self._lock.release()
337

    
338
    logging.debug("All workers terminated")