Statistics
| Branch: | Tag: | Revision:

root / lib / workerpool.py @ fb62843c

History | View | Annotate | Download (18.3 kB)

1 76094e37 Michael Hanselmann
#
2 76094e37 Michael Hanselmann
#
3 76094e37 Michael Hanselmann
4 189d2714 Michael Hanselmann
# Copyright (C) 2008, 2009, 2010 Google Inc.
5 76094e37 Michael Hanselmann
#
6 76094e37 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 76094e37 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 76094e37 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 76094e37 Michael Hanselmann
# (at your option) any later version.
10 76094e37 Michael Hanselmann
#
11 76094e37 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 76094e37 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 76094e37 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 76094e37 Michael Hanselmann
# General Public License for more details.
15 76094e37 Michael Hanselmann
#
16 76094e37 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 76094e37 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 76094e37 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 76094e37 Michael Hanselmann
# 02110-1301, USA.
20 76094e37 Michael Hanselmann
21 76094e37 Michael Hanselmann
22 76094e37 Michael Hanselmann
"""Base classes for worker pools.
23 76094e37 Michael Hanselmann

24 76094e37 Michael Hanselmann
"""
25 76094e37 Michael Hanselmann
26 76094e37 Michael Hanselmann
import logging
27 76094e37 Michael Hanselmann
import threading
28 52c47e4e Michael Hanselmann
import heapq
29 c258f110 Michael Hanselmann
import itertools
30 76094e37 Michael Hanselmann
31 25e557a5 Guido Trotter
from ganeti import compat
32 52c47e4e Michael Hanselmann
from ganeti import errors
33 25e557a5 Guido Trotter
34 76094e37 Michael Hanselmann
35 21c5ad52 Michael Hanselmann
# Unique sentinel returned by WorkerPool._WaitForTaskUnlocked to tell a worker
# that it must terminate; BaseWorker.run compares against it by identity.
_TERMINATE = object()
36 52c47e4e Michael Hanselmann
# Priority used by WorkerPool.AddTask and WorkerPool.AddManyTasks when the
# caller doesn't specify one.
_DEFAULT_PRIORITY = 0
37 52c47e4e Michael Hanselmann
38 52c47e4e Michael Hanselmann
39 52c47e4e Michael Hanselmann
class DeferTask(Exception):
  """Exception used by a task to request being run again later.

  L{BaseWorker.RunTask} implementations raise this class to have the current
  task put back into the queue. A new priority can be passed along; without
  one the task keeps the priority it already had.

  """
  def __init__(self, priority=None):
    """Stores the requested priority.

    @type priority: number
    @param priority: New task priority (None means no change)

    """
    # Exception.__init__ is called directly (not via super()) as in the rest
    # of this module's Python 2 code base
    self.priority = priority
    Exception.__init__(self)
55 21c5ad52 Michael Hanselmann
56 21c5ad52 Michael Hanselmann
57 9a2564e7 Michael Hanselmann
class NoSuchTask(Exception):
  """Signals that no task with the requested ID is known to the pool.

  Raised by L{WorkerPool.ChangeTaskPriority} when the task ID has no entry in
  the pool's task data.

  """
61 9a2564e7 Michael Hanselmann
62 9a2564e7 Michael Hanselmann
63 76094e37 Michael Hanselmann
class BaseWorker(threading.Thread, object):
64 76094e37 Michael Hanselmann
  """Base worker class for worker pools.
65 76094e37 Michael Hanselmann

66 76094e37 Michael Hanselmann
  Users of a worker pool must override RunTask in a subclass.
67 76094e37 Michael Hanselmann

68 76094e37 Michael Hanselmann
  """
69 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
70 76094e37 Michael Hanselmann
  def __init__(self, pool, worker_id):
71 76094e37 Michael Hanselmann
    """Constructor for BaseWorker thread.
72 76094e37 Michael Hanselmann

73 116db7c7 Iustin Pop
    @param pool: the parent worker pool
74 116db7c7 Iustin Pop
    @param worker_id: identifier for this worker
75 76094e37 Michael Hanselmann

76 76094e37 Michael Hanselmann
    """
77 d16e6fd9 Michael Hanselmann
    super(BaseWorker, self).__init__(name=worker_id)
78 76094e37 Michael Hanselmann
    self.pool = pool
79 daba67c7 Michael Hanselmann
    self._worker_id = worker_id
80 76094e37 Michael Hanselmann
    self._current_task = None
81 76094e37 Michael Hanselmann
82 daba67c7 Michael Hanselmann
    assert self.getName() == worker_id
83 daba67c7 Michael Hanselmann
84 76094e37 Michael Hanselmann
  def ShouldTerminate(self):
85 2f4e1516 Michael Hanselmann
    """Returns whether this worker should terminate.
86 2f4e1516 Michael Hanselmann

87 2f4e1516 Michael Hanselmann
    Should only be called from within L{RunTask}.
88 76094e37 Michael Hanselmann

89 76094e37 Michael Hanselmann
    """
90 2f4e1516 Michael Hanselmann
    self.pool._lock.acquire()
91 2f4e1516 Michael Hanselmann
    try:
92 2f4e1516 Michael Hanselmann
      assert self._HasRunningTaskUnlocked()
93 2f4e1516 Michael Hanselmann
      return self.pool._ShouldWorkerTerminateUnlocked(self)
94 2f4e1516 Michael Hanselmann
    finally:
95 2f4e1516 Michael Hanselmann
      self.pool._lock.release()
96 76094e37 Michael Hanselmann
97 52c47e4e Michael Hanselmann
  def GetCurrentPriority(self):
98 52c47e4e Michael Hanselmann
    """Returns the priority of the current task.
99 52c47e4e Michael Hanselmann

100 52c47e4e Michael Hanselmann
    Should only be called from within L{RunTask}.
101 52c47e4e Michael Hanselmann

102 52c47e4e Michael Hanselmann
    """
103 52c47e4e Michael Hanselmann
    self.pool._lock.acquire()
104 52c47e4e Michael Hanselmann
    try:
105 52c47e4e Michael Hanselmann
      assert self._HasRunningTaskUnlocked()
106 52c47e4e Michael Hanselmann
107 125b74b2 Michael Hanselmann
      (priority, _, _, _) = self._current_task
108 52c47e4e Michael Hanselmann
109 52c47e4e Michael Hanselmann
      return priority
110 52c47e4e Michael Hanselmann
    finally:
111 52c47e4e Michael Hanselmann
      self.pool._lock.release()
112 52c47e4e Michael Hanselmann
113 daba67c7 Michael Hanselmann
  def SetTaskName(self, taskname):
114 daba67c7 Michael Hanselmann
    """Sets the name of the current task.
115 daba67c7 Michael Hanselmann

116 daba67c7 Michael Hanselmann
    Should only be called from within L{RunTask}.
117 daba67c7 Michael Hanselmann

118 daba67c7 Michael Hanselmann
    @type taskname: string
119 daba67c7 Michael Hanselmann
    @param taskname: Task's name
120 daba67c7 Michael Hanselmann

121 daba67c7 Michael Hanselmann
    """
122 daba67c7 Michael Hanselmann
    if taskname:
123 daba67c7 Michael Hanselmann
      name = "%s/%s" % (self._worker_id, taskname)
124 daba67c7 Michael Hanselmann
    else:
125 daba67c7 Michael Hanselmann
      name = self._worker_id
126 daba67c7 Michael Hanselmann
127 daba67c7 Michael Hanselmann
    # Set thread name
128 daba67c7 Michael Hanselmann
    self.setName(name)
129 daba67c7 Michael Hanselmann
130 b3558df1 Michael Hanselmann
  def _HasRunningTaskUnlocked(self):
131 b3558df1 Michael Hanselmann
    """Returns whether this worker is currently running a task.
132 b3558df1 Michael Hanselmann

133 b3558df1 Michael Hanselmann
    """
134 b3558df1 Michael Hanselmann
    return (self._current_task is not None)
135 b3558df1 Michael Hanselmann
136 bba69414 Michael Hanselmann
  def _GetCurrentOrderAndTaskId(self):
137 bba69414 Michael Hanselmann
    """Returns the order and task ID of the current task.
138 bba69414 Michael Hanselmann

139 bba69414 Michael Hanselmann
    Should only be called from within L{RunTask}.
140 bba69414 Michael Hanselmann

141 bba69414 Michael Hanselmann
    """
142 bba69414 Michael Hanselmann
    self.pool._lock.acquire()
143 bba69414 Michael Hanselmann
    try:
144 bba69414 Michael Hanselmann
      assert self._HasRunningTaskUnlocked()
145 bba69414 Michael Hanselmann
146 bba69414 Michael Hanselmann
      (_, order_id, task_id, _) = self._current_task
147 bba69414 Michael Hanselmann
148 bba69414 Michael Hanselmann
      return (order_id, task_id)
149 bba69414 Michael Hanselmann
    finally:
150 bba69414 Michael Hanselmann
      self.pool._lock.release()
151 bba69414 Michael Hanselmann
152 76094e37 Michael Hanselmann
  def run(self):
153 76094e37 Michael Hanselmann
    """Main thread function.
154 76094e37 Michael Hanselmann

155 76094e37 Michael Hanselmann
    Waits for new tasks to show up in the queue.
156 76094e37 Michael Hanselmann

157 76094e37 Michael Hanselmann
    """
158 76094e37 Michael Hanselmann
    pool = self.pool
159 76094e37 Michael Hanselmann
160 76094e37 Michael Hanselmann
    while True:
161 46d0a3d0 Michael Hanselmann
      assert self._current_task is None
162 52c47e4e Michael Hanselmann
163 52c47e4e Michael Hanselmann
      defer = None
164 76094e37 Michael Hanselmann
      try:
165 21c5ad52 Michael Hanselmann
        # Wait on lock to be told either to terminate or to do a task
166 76094e37 Michael Hanselmann
        pool._lock.acquire()
167 76094e37 Michael Hanselmann
        try:
168 21c5ad52 Michael Hanselmann
          task = pool._WaitForTaskUnlocked(self)
169 76094e37 Michael Hanselmann
170 21c5ad52 Michael Hanselmann
          if task is _TERMINATE:
171 21c5ad52 Michael Hanselmann
            # Told to terminate
172 21c5ad52 Michael Hanselmann
            break
173 b3558df1 Michael Hanselmann
174 21c5ad52 Michael Hanselmann
          if task is None:
175 21c5ad52 Michael Hanselmann
            # Spurious notification, ignore
176 21c5ad52 Michael Hanselmann
            continue
177 76094e37 Michael Hanselmann
178 21c5ad52 Michael Hanselmann
          self._current_task = task
179 76094e37 Michael Hanselmann
180 46d0a3d0 Michael Hanselmann
          # No longer needed, dispose of reference
181 46d0a3d0 Michael Hanselmann
          del task
182 46d0a3d0 Michael Hanselmann
183 21c5ad52 Michael Hanselmann
          assert self._HasRunningTaskUnlocked()
184 46d0a3d0 Michael Hanselmann
185 76094e37 Michael Hanselmann
        finally:
186 76094e37 Michael Hanselmann
          pool._lock.release()
187 76094e37 Michael Hanselmann
188 125b74b2 Michael Hanselmann
        (priority, _, _, args) = self._current_task
189 76094e37 Michael Hanselmann
        try:
190 52c47e4e Michael Hanselmann
          # Run the actual task
191 52c47e4e Michael Hanselmann
          assert defer is None
192 52c47e4e Michael Hanselmann
          logging.debug("Starting task %r, priority %s", args, priority)
193 daba67c7 Michael Hanselmann
          assert self.getName() == self._worker_id
194 daba67c7 Michael Hanselmann
          try:
195 b459a848 Andrea Spadaccini
            self.RunTask(*args) # pylint: disable=W0142
196 daba67c7 Michael Hanselmann
          finally:
197 daba67c7 Michael Hanselmann
            self.SetTaskName(None)
198 52c47e4e Michael Hanselmann
          logging.debug("Done with task %r, priority %s", args, priority)
199 52c47e4e Michael Hanselmann
        except DeferTask, err:
200 52c47e4e Michael Hanselmann
          defer = err
201 52c47e4e Michael Hanselmann
202 52c47e4e Michael Hanselmann
          if defer.priority is None:
203 52c47e4e Michael Hanselmann
            # Use same priority
204 52c47e4e Michael Hanselmann
            defer.priority = priority
205 52c47e4e Michael Hanselmann
206 e1ea54e9 Michael Hanselmann
          logging.debug("Deferring task %r, new priority %s",
207 e1ea54e9 Michael Hanselmann
                        args, defer.priority)
208 52c47e4e Michael Hanselmann
209 52c47e4e Michael Hanselmann
          assert self._HasRunningTaskUnlocked()
210 b459a848 Andrea Spadaccini
        except: # pylint: disable=W0702
211 02fc74da Michael Hanselmann
          logging.exception("Caught unhandled exception")
212 c1cf1fe5 Michael Hanselmann
213 c1cf1fe5 Michael Hanselmann
        assert self._HasRunningTaskUnlocked()
214 76094e37 Michael Hanselmann
      finally:
215 76094e37 Michael Hanselmann
        # Notify pool
216 76094e37 Michael Hanselmann
        pool._lock.acquire()
217 76094e37 Michael Hanselmann
        try:
218 52c47e4e Michael Hanselmann
          if defer:
219 52c47e4e Michael Hanselmann
            assert self._current_task
220 52c47e4e Michael Hanselmann
            # Schedule again for later run
221 bba69414 Michael Hanselmann
            (_, _, task_id, args) = self._current_task
222 bba69414 Michael Hanselmann
            pool._AddTaskUnlocked(args, defer.priority, task_id)
223 52c47e4e Michael Hanselmann
224 b3558df1 Michael Hanselmann
          if self._current_task:
225 b3558df1 Michael Hanselmann
            self._current_task = None
226 53b1d12b Michael Hanselmann
            pool._worker_to_pool.notifyAll()
227 76094e37 Michael Hanselmann
        finally:
228 76094e37 Michael Hanselmann
          pool._lock.release()
229 76094e37 Michael Hanselmann
230 c1cf1fe5 Michael Hanselmann
      assert not self._HasRunningTaskUnlocked()
231 c1cf1fe5 Michael Hanselmann
232 02fc74da Michael Hanselmann
    logging.debug("Terminates")
233 b3558df1 Michael Hanselmann
234 76094e37 Michael Hanselmann
  def RunTask(self, *args):
235 76094e37 Michael Hanselmann
    """Function called to start a task.
236 76094e37 Michael Hanselmann

237 116db7c7 Iustin Pop
    This needs to be implemented by child classes.
238 116db7c7 Iustin Pop

239 76094e37 Michael Hanselmann
    """
240 76094e37 Michael Hanselmann
    raise NotImplementedError()
241 76094e37 Michael Hanselmann
242 76094e37 Michael Hanselmann
243 76094e37 Michael Hanselmann
class WorkerPool(object):
244 76094e37 Michael Hanselmann
  """Worker pool with a queue.
245 76094e37 Michael Hanselmann

246 76094e37 Michael Hanselmann
  This class is thread-safe.
247 76094e37 Michael Hanselmann

248 116db7c7 Iustin Pop
  Tasks are guaranteed to be started in the order in which they're
249 116db7c7 Iustin Pop
  added to the pool. Due to the nature of threading, they're not
250 116db7c7 Iustin Pop
  guaranteed to finish in the same order.
251 76094e37 Michael Hanselmann

252 125b74b2 Michael Hanselmann
  @type _tasks: list of tuples
253 125b74b2 Michael Hanselmann
  @ivar _tasks: Each tuple has the format (priority, order ID, task ID,
254 125b74b2 Michael Hanselmann
    arguments). Priority and order ID are numeric and essentially control the
255 125b74b2 Michael Hanselmann
    sort order. The order ID is an increasing number denoting the order in
256 125b74b2 Michael Hanselmann
    which tasks are added to the queue. The task ID is controlled by user of
257 125b74b2 Michael Hanselmann
    workerpool, see L{AddTask} for details. The task arguments are C{None} for
258 125b74b2 Michael Hanselmann
    abandoned tasks, otherwise a sequence of arguments to be passed to
259 125b74b2 Michael Hanselmann
    L{BaseWorker.RunTask}). The list must fulfill the heap property (for use by
260 125b74b2 Michael Hanselmann
    the C{heapq} module).
261 125b74b2 Michael Hanselmann
  @type _taskdata: dict; (task IDs as keys, tuples as values)
262 125b74b2 Michael Hanselmann
  @ivar _taskdata: Mapping from task IDs to entries in L{_tasks}
263 125b74b2 Michael Hanselmann

264 76094e37 Michael Hanselmann
  """
265 89e2b4d2 Michael Hanselmann
  def __init__(self, name, num_workers, worker_class):
    """Sets up the pool and starts its workers.

    @param name: prefix used when generating worker identifiers
    @param num_workers: number of workers to be started
        (dynamic resizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    self._name = name
    self._worker_class = worker_class

    # A single lock is shared by all condition variables. Some of these
    # attributes are accessed by BaseWorker.
    lock = threading.Lock()
    self._lock = lock
    self._pool_to_pool = threading.Condition(lock)
    self._pool_to_worker = threading.Condition(lock)
    self._worker_to_pool = threading.Condition(lock)

    # Pool state
    self._active = True
    self._quiescing = False

    # Active workers and workers which are terminating
    self._last_worker_id = 0
    self._workers = []
    self._termworkers = []

    # Queued tasks
    self._counter = itertools.count()
    self._tasks = []
    self._taskdata = {}

    # Start workers
    self.Resize(num_workers)
296 76094e37 Michael Hanselmann
297 76094e37 Michael Hanselmann
  # TODO: Implement dynamic resizing?
298 76094e37 Michael Hanselmann
299 c2a8e8ba Guido Trotter
  def _WaitWhileQuiescingUnlocked(self):
300 c2a8e8ba Guido Trotter
    """Wait until the worker pool has finished quiescing.
301 c2a8e8ba Guido Trotter

302 c2a8e8ba Guido Trotter
    """
303 c2a8e8ba Guido Trotter
    while self._quiescing:
304 c2a8e8ba Guido Trotter
      self._pool_to_pool.wait()
305 c2a8e8ba Guido Trotter
306 125b74b2 Michael Hanselmann
  def _AddTaskUnlocked(self, args, priority, task_id):
    """Queues a single task and wakes up one waiting worker.

    @type args: sequence
    @param args: Arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID (integer or C{None})

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
    assert isinstance(priority, (int, long)), "Priority must be numeric"
    assert task_id is None or isinstance(task_id, (int, long)), \
      "Task ID must be numeric or None"

    # The counter value records the order in which tasks were added; entries
    # are sorted by priority first and by this value second.
    order_id = self._counter.next()
    entry = [priority, order_id, task_id, args]

    if not (task_id is None):
      assert task_id not in self._taskdata
      # Remember the entry so its priority can be changed later on
      self._taskdata[task_id] = entry

    heapq.heappush(self._tasks, entry)

    # Wake up one worker waiting for work
    self._pool_to_worker.notify()
334 189d2714 Michael Hanselmann
335 125b74b2 Michael Hanselmann
  def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None):
    """Queues a single task.

    Blocks while the pool is quiescing.

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID
    @note: The task ID is used as a dictionary key by the pool; note that
      L{_AddTaskUnlocked} additionally asserts it to be an integer or
      C{None}. Callers must ensure a task ID is unique while a task is in the
      pool or while it might return to the pool due to deferring using
      L{DeferTask}.

    """
    lock = self._lock

    lock.acquire()
    try:
      # Don't add anything while Quiesce is waiting for the queue to drain
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args, priority, task_id)
    finally:
      lock.release()
355 76094e37 Michael Hanselmann
356 125b74b2 Michael Hanselmann
  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None):
    """Queues several tasks while holding the pool lock only once.

    Blocks while the pool is quiescing.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}
    @type priority: number or list of numbers
    @param priority: Priority for all added tasks or a list with the priority
                     for each task
    @type task_id: list
    @param task_id: List with the ID for each task
    @raise errors.ProgrammerError: When the number of priorities or task IDs
      differs from the number of tasks
    @note: See L{AddTask} for a note on task IDs.

    """
    assert compat.all(isinstance(item, (tuple, list)) for item in tasks), \
           "Each task must be a sequence"
    assert (isinstance(priority, (int, long)) or
            compat.all(isinstance(value, (int, long)) for value in priority)), \
           "Priority must be numeric or be a list of numeric values"
    assert task_id is None or isinstance(task_id, (tuple, list)), \
           "Task IDs must be in a sequence"

    if not isinstance(priority, (int, long)):
      # One priority per task was given
      if len(priority) != len(tasks):
        raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
                                     " number of tasks (%s)" %
                                     (len(priority), len(tasks)))
    else:
      # Same priority for every task
      priority = [priority] * len(tasks)

    if task_id is not None:
      if len(task_id) != len(tasks):
        raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
                                     " number of tasks (%s)" %
                                     (len(task_id), len(tasks)))
    else:
      # No IDs given, none of the tasks gets one
      task_id = [None] * len(tasks)

    lock = self._lock

    lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      assert compat.all(isinstance(value, (int, long)) for value in priority)
      assert len(tasks) == len(priority)
      assert len(tasks) == len(task_id)

      for (task_args, task_prio, task_ident) in zip(tasks, priority, task_id):
        self._AddTaskUnlocked(task_args, task_prio, task_ident)
    finally:
      lock.release()
403 c2a8e8ba Guido Trotter
404 9a2564e7 Michael Hanselmann
  def ChangeTaskPriority(self, task_id, priority):
    """Re-queues a pending task with a different priority.

    @param task_id: Task ID
    @type priority: number
    @param priority: New task priority
    @raise NoSuchTask: When the task referred by C{task_id} can not be found
      (it may never have existed, may have already been processed, or is
      currently running)

    """
    assert isinstance(priority, (int, long)), "Priority must be numeric"

    lock = self._lock

    lock.acquire()
    try:
      logging.debug("About to change priority of task %s to %s",
                    task_id, priority)

      # Look up the entry currently queued for this task
      current = self._taskdata.get(task_id)
      if current is None:
        msg = "Task '%s' was not found" % task_id
        logging.debug(msg)
        raise NoSuchTask(msg)

      # Copy of the entry, differing only in its priority
      replacement = list(current)
      replacement[0] = priority

      # The old entry stays in the heap but is marked as abandoned by clearing
      # its arguments. This leaves its sort position untouched and therefore
      # keeps the heap property of L{self._tasks} intact. See also
      # <http://docs.python.org/library/heapq.html#priority-queue-
      # implementation-notes>.
      current[-1] = None

      # From now on the task ID refers to the new entry
      assert task_id is not None
      self._taskdata[task_id] = replacement

      # Queue the new entry (same order ID and arguments as the old one)
      heapq.heappush(self._tasks, replacement)

      # Wake up one worker waiting for work
      self._pool_to_worker.notify()
    finally:
      lock.release()
449 9a2564e7 Michael Hanselmann
450 27caa993 Michael Hanselmann
  def SetActive(self, active):
451 27caa993 Michael Hanselmann
    """Enable/disable processing of tasks.
452 27caa993 Michael Hanselmann

453 27caa993 Michael Hanselmann
    This is different from L{Quiesce} in the sense that this function just
454 27caa993 Michael Hanselmann
    changes an internal flag and doesn't wait for the queue to be empty. Tasks
455 27caa993 Michael Hanselmann
    already being processed continue normally, but no new tasks will be
456 27caa993 Michael Hanselmann
    started. New tasks can still be added.
457 27caa993 Michael Hanselmann

458 27caa993 Michael Hanselmann
    @type active: bool
459 27caa993 Michael Hanselmann
    @param active: Whether tasks should be processed
460 27caa993 Michael Hanselmann

461 27caa993 Michael Hanselmann
    """
462 27caa993 Michael Hanselmann
    self._lock.acquire()
463 27caa993 Michael Hanselmann
    try:
464 27caa993 Michael Hanselmann
      self._active = active
465 27caa993 Michael Hanselmann
466 27caa993 Michael Hanselmann
      if active:
467 27caa993 Michael Hanselmann
        # Tell all workers to continue processing
468 27caa993 Michael Hanselmann
        self._pool_to_worker.notifyAll()
469 27caa993 Michael Hanselmann
    finally:
470 27caa993 Michael Hanselmann
      self._lock.release()
471 27caa993 Michael Hanselmann
472 21c5ad52 Michael Hanselmann
  def _WaitForTaskUnlocked(self, worker):
473 21c5ad52 Michael Hanselmann
    """Waits for a task for a worker.
474 21c5ad52 Michael Hanselmann

475 21c5ad52 Michael Hanselmann
    @type worker: L{BaseWorker}
476 21c5ad52 Michael Hanselmann
    @param worker: Worker thread
477 21c5ad52 Michael Hanselmann

478 21c5ad52 Michael Hanselmann
    """
479 c69c45a7 Michael Hanselmann
    while True:
480 c69c45a7 Michael Hanselmann
      if self._ShouldWorkerTerminateUnlocked(worker):
481 c69c45a7 Michael Hanselmann
        return _TERMINATE
482 21c5ad52 Michael Hanselmann
483 c69c45a7 Michael Hanselmann
      # If there's a pending task, return it immediately
484 c69c45a7 Michael Hanselmann
      if self._active and self._tasks:
485 c69c45a7 Michael Hanselmann
        # Get task from queue and tell pool about it
486 c69c45a7 Michael Hanselmann
        try:
487 c69c45a7 Michael Hanselmann
          task = heapq.heappop(self._tasks)
488 c69c45a7 Michael Hanselmann
        finally:
489 c69c45a7 Michael Hanselmann
          self._worker_to_pool.notifyAll()
490 21c5ad52 Michael Hanselmann
491 9a2564e7 Michael Hanselmann
        (_, _, task_id, args) = task
492 9a2564e7 Michael Hanselmann
493 9a2564e7 Michael Hanselmann
        # If the priority was changed, "args" is None
494 9a2564e7 Michael Hanselmann
        if args is None:
495 9a2564e7 Michael Hanselmann
          # Try again
496 9a2564e7 Michael Hanselmann
          logging.debug("Found abandoned task (%r)", task)
497 9a2564e7 Michael Hanselmann
          continue
498 9a2564e7 Michael Hanselmann
499 125b74b2 Michael Hanselmann
        # Delete reference
500 125b74b2 Michael Hanselmann
        if task_id is not None:
501 125b74b2 Michael Hanselmann
          del self._taskdata[task_id]
502 125b74b2 Michael Hanselmann
503 c69c45a7 Michael Hanselmann
        return task
504 21c5ad52 Michael Hanselmann
505 c69c45a7 Michael Hanselmann
      logging.debug("Waiting for tasks")
506 21c5ad52 Michael Hanselmann
507 c69c45a7 Michael Hanselmann
      # wait() releases the lock and sleeps until notified
508 c69c45a7 Michael Hanselmann
      self._pool_to_worker.wait()
509 21c5ad52 Michael Hanselmann
510 c69c45a7 Michael Hanselmann
      logging.debug("Notified while waiting")
511 21c5ad52 Michael Hanselmann
512 76094e37 Michael Hanselmann
  def _ShouldWorkerTerminateUnlocked(self, worker):
513 76094e37 Michael Hanselmann
    """Returns whether a worker should terminate.
514 76094e37 Michael Hanselmann

515 76094e37 Michael Hanselmann
    """
516 76094e37 Michael Hanselmann
    return (worker in self._termworkers)
517 76094e37 Michael Hanselmann
518 76094e37 Michael Hanselmann
  def _HasRunningTasksUnlocked(self):
519 76094e37 Michael Hanselmann
    """Checks whether there's a task running in a worker.
520 76094e37 Michael Hanselmann

521 76094e37 Michael Hanselmann
    """
522 76094e37 Michael Hanselmann
    for worker in self._workers + self._termworkers:
523 b459a848 Andrea Spadaccini
      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
524 76094e37 Michael Hanselmann
        return True
525 76094e37 Michael Hanselmann
    return False
526 76094e37 Michael Hanselmann
527 ef52306a Michael Hanselmann
  def HasRunningTasks(self):
    """Tells whether at least one task is being processed right now.

    The pool lock is acquired for the duration of the check and released
    again afterwards, even if the check raises.

    @return: the result of L{_HasRunningTasksUnlocked}

    """
    lock = self._lock
    lock.acquire()
    try:
      busy = self._HasRunningTasksUnlocked()
    finally:
      lock.release()

    return busy
536 ef52306a Michael Hanselmann
537 76094e37 Michael Hanselmann
  def Quiesce(self):
    """Blocks until no task is pending or running anymore.

    The C{_quiescing} flag is set while waiting. On the way out (also when
    an exception interrupts the wait) the flag is cleared, everybody waiting
    on C{_pool_to_pool} is notified and the pool lock is released.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Keep sleeping for as long as work is queued or being processed; the
      # workers are presumably the ones signalling "_worker_to_pool"
      while True:
        if not (self._tasks or self._HasRunningTasksUnlocked()):
          break

        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Wake up AddTasks in case it was blocked while we were quiescing
      self._pool_to_pool.notifyAll()

      self._lock.release()
556 76094e37 Michael Hanselmann
557 76094e37 Michael Hanselmann
  def _NewWorkerIdUnlocked(self):
558 116db7c7 Iustin Pop
    """Return an identifier for a new worker.
559 116db7c7 Iustin Pop

560 116db7c7 Iustin Pop
    """
561 76094e37 Michael Hanselmann
    self._last_worker_id += 1
562 89e2b4d2 Michael Hanselmann
563 89e2b4d2 Michael Hanselmann
    return "%s%d" % (self._name, self._last_worker_id)
564 76094e37 Michael Hanselmann
565 76094e37 Michael Hanselmann
  def _ResizeUnlocked(self, num_workers):
566 76094e37 Michael Hanselmann
    """Changes the number of workers.
567 76094e37 Michael Hanselmann

568 76094e37 Michael Hanselmann
    """
569 76094e37 Michael Hanselmann
    assert num_workers >= 0, "num_workers must be >= 0"
570 76094e37 Michael Hanselmann
571 76094e37 Michael Hanselmann
    logging.debug("Resizing to %s workers", num_workers)
572 76094e37 Michael Hanselmann
573 76094e37 Michael Hanselmann
    current_count = len(self._workers)
574 76094e37 Michael Hanselmann
575 76094e37 Michael Hanselmann
    if current_count == num_workers:
576 76094e37 Michael Hanselmann
      # Nothing to do
577 76094e37 Michael Hanselmann
      pass
578 76094e37 Michael Hanselmann
579 76094e37 Michael Hanselmann
    elif current_count > num_workers:
580 76094e37 Michael Hanselmann
      if num_workers == 0:
581 76094e37 Michael Hanselmann
        # Create copy of list to iterate over while lock isn't held.
582 76094e37 Michael Hanselmann
        termworkers = self._workers[:]
583 76094e37 Michael Hanselmann
        del self._workers[:]
584 76094e37 Michael Hanselmann
      else:
585 76094e37 Michael Hanselmann
        # TODO: Implement partial downsizing
586 76094e37 Michael Hanselmann
        raise NotImplementedError()
587 76094e37 Michael Hanselmann
        #termworkers = ...
588 76094e37 Michael Hanselmann
589 76094e37 Michael Hanselmann
      self._termworkers += termworkers
590 76094e37 Michael Hanselmann
591 76094e37 Michael Hanselmann
      # Notify workers that something has changed
592 53b1d12b Michael Hanselmann
      self._pool_to_worker.notifyAll()
593 76094e37 Michael Hanselmann
594 76094e37 Michael Hanselmann
      # Join all terminating workers
595 76094e37 Michael Hanselmann
      self._lock.release()
596 76094e37 Michael Hanselmann
      try:
597 76094e37 Michael Hanselmann
        for worker in termworkers:
598 c0a8eb9e Michael Hanselmann
          logging.debug("Waiting for thread %s", worker.getName())
599 76094e37 Michael Hanselmann
          worker.join()
600 76094e37 Michael Hanselmann
      finally:
601 76094e37 Michael Hanselmann
        self._lock.acquire()
602 76094e37 Michael Hanselmann
603 76094e37 Michael Hanselmann
      # Remove terminated threads. This could be done in a more efficient way
604 76094e37 Michael Hanselmann
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
605 76094e37 Michael Hanselmann
      # don't leave zombie threads around.
606 76094e37 Michael Hanselmann
      for worker in termworkers:
607 76094e37 Michael Hanselmann
        assert worker in self._termworkers, ("Worker not in list of"
608 76094e37 Michael Hanselmann
                                             " terminating workers")
609 76094e37 Michael Hanselmann
        if not worker.isAlive():
610 76094e37 Michael Hanselmann
          self._termworkers.remove(worker)
611 76094e37 Michael Hanselmann
612 76094e37 Michael Hanselmann
      assert not self._termworkers, "Zombie worker detected"
613 76094e37 Michael Hanselmann
614 76094e37 Michael Hanselmann
    elif current_count < num_workers:
615 76094e37 Michael Hanselmann
      # Create (num_workers - current_count) new workers
616 f1501b3f Michael Hanselmann
      for _ in range(num_workers - current_count):
617 76094e37 Michael Hanselmann
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
618 76094e37 Michael Hanselmann
        self._workers.append(worker)
619 76094e37 Michael Hanselmann
        worker.start()
620 76094e37 Michael Hanselmann
621 76094e37 Michael Hanselmann
  def Resize(self, num_workers):
    """Sets the number of workers in the pool.

    Acquires the pool lock and delegates to L{_ResizeUnlocked}; the lock is
    released again afterwards, even if resizing raises.

    @param num_workers: the new number of workers
    @return: whatever L{_ResizeUnlocked} returns

    """
    lock = self._lock
    lock.acquire()
    try:
      outcome = self._ResizeUnlocked(num_workers)
    finally:
      lock.release()

    return outcome
632 76094e37 Michael Hanselmann
633 76094e37 Michael Hanselmann
  def TerminateWorkers(self):
    """Shuts down all worker threads of the pool.

    The pool is resized to zero workers while holding the pool lock. Tasks
    still sitting in the queue afterwards are not run; their number is only
    written to the debug log.

    """
    logging.debug("Terminating all workers")

    lock = self._lock
    lock.acquire()
    try:
      self._ResizeUnlocked(0)

      leftover = self._tasks
      if leftover:
        logging.debug("There are %s tasks left", len(leftover))
    finally:
      lock.release()

    logging.debug("All workers terminated")