Revision 52c47e4e lib/workerpool.py
b/lib/workerpool.py | ||
---|---|---|
23 | 23 |
|
24 | 24 |
""" |
25 | 25 |
|
26 |
import collections |
|
27 | 26 |
import logging |
28 | 27 |
import threading |
28 |
import heapq |
|
29 | 29 |
|
30 | 30 |
from ganeti import compat |
31 |
from ganeti import errors |
|
31 | 32 |
|
32 | 33 |
|
33 | 34 |
_TERMINATE = object() |
35 |
_DEFAULT_PRIORITY = 0 |
|
36 |
|
|
37 |
|
|
38 |
class DeferTask(Exception): |
|
39 |
"""Special exception class to defer a task. |
|
40 |
|
|
41 |
This class can be raised by L{BaseWorker.RunTask} to defer the execution of a |
|
42 |
task. Optionally, the priority of the task can be changed. |
|
43 |
|
|
44 |
""" |
|
45 |
def __init__(self, priority=None): |
|
46 |
"""Initializes this class. |
|
47 |
|
|
48 |
@type priority: number |
|
49 |
@param priority: New task priority (None means no change) |
|
50 |
|
|
51 |
""" |
|
52 |
Exception.__init__(self) |
|
53 |
self.priority = priority |
|
34 | 54 |
|
35 | 55 |
|
36 | 56 |
class BaseWorker(threading.Thread, object): |
... | ... | |
64 | 84 |
finally: |
65 | 85 |
self.pool._lock.release() |
66 | 86 |
|
87 |
def GetCurrentPriority(self): |
|
88 |
"""Returns the priority of the current task. |
|
89 |
|
|
90 |
Should only be called from within L{RunTask}. |
|
91 |
|
|
92 |
""" |
|
93 |
self.pool._lock.acquire() |
|
94 |
try: |
|
95 |
assert self._HasRunningTaskUnlocked() |
|
96 |
|
|
97 |
(priority, _, _) = self._current_task |
|
98 |
|
|
99 |
return priority |
|
100 |
finally: |
|
101 |
self.pool._lock.release() |
|
102 |
|
|
67 | 103 |
def _HasRunningTaskUnlocked(self): |
68 | 104 |
"""Returns whether this worker is currently running a task. |
69 | 105 |
|
... | ... | |
80 | 116 |
|
81 | 117 |
while True: |
82 | 118 |
assert self._current_task is None |
119 |
|
|
120 |
defer = None |
|
83 | 121 |
try: |
84 | 122 |
# Wait on lock to be told either to terminate or to do a task |
85 | 123 |
pool._lock.acquire() |
... | ... | |
104 | 142 |
finally: |
105 | 143 |
pool._lock.release() |
106 | 144 |
|
107 |
# Run the actual task
|
|
145 |
(priority, _, args) = self._current_task
|
|
108 | 146 |
try: |
109 |
logging.debug("Starting task %r", self._current_task) |
|
110 |
self.RunTask(*self._current_task) |
|
111 |
logging.debug("Done with task %r", self._current_task) |
|
147 |
# Run the actual task |
|
148 |
assert defer is None |
|
149 |
logging.debug("Starting task %r, priority %s", args, priority) |
|
150 |
self.RunTask(*args) # pylint: disable-msg=W0142 |
|
151 |
logging.debug("Done with task %r, priority %s", args, priority) |
|
152 |
except DeferTask, err: |
|
153 |
defer = err |
|
154 |
|
|
155 |
if defer.priority is None: |
|
156 |
# Use same priority |
|
157 |
defer.priority = priority |
|
158 |
|
|
159 |
logging.debug("Deferring task %r, new priority %s", defer.priority) |
|
160 |
|
|
161 |
assert self._HasRunningTaskUnlocked() |
|
162 |
|
|
112 | 163 |
except: # pylint: disable-msg=W0702 |
113 | 164 |
logging.exception("Caught unhandled exception") |
114 | 165 |
|
... | ... | |
117 | 168 |
# Notify pool |
118 | 169 |
pool._lock.acquire() |
119 | 170 |
try: |
171 |
if defer: |
|
172 |
assert self._current_task |
|
173 |
# Schedule again for later run |
|
174 |
(_, _, args) = self._current_task |
|
175 |
pool._AddTaskUnlocked(args, defer.priority) |
|
176 |
|
|
120 | 177 |
if self._current_task: |
121 | 178 |
self._current_task = None |
122 | 179 |
pool._worker_to_pool.notifyAll() |
... | ... | |
170 | 227 |
self._termworkers = [] |
171 | 228 |
|
172 | 229 |
# Queued tasks |
173 |
self._tasks = collections.deque() |
|
230 |
self._counter = 0 |
|
231 |
self._tasks = [] |
|
174 | 232 |
|
175 | 233 |
# Start workers |
176 | 234 |
self.Resize(num_workers) |
... | ... | |
184 | 242 |
while self._quiescing: |
185 | 243 |
self._pool_to_pool.wait() |
186 | 244 |
|
187 |
def _AddTaskUnlocked(self, args): |
|
245 |
def _AddTaskUnlocked(self, args, priority): |
|
246 |
"""Adds a task to the internal queue. |
|
247 |
|
|
248 |
@type args: sequence |
|
249 |
@param args: Arguments passed to L{BaseWorker.RunTask} |
|
250 |
@type priority: number |
|
251 |
@param priority: Task priority |
|
252 |
|
|
253 |
""" |
|
188 | 254 |
assert isinstance(args, (tuple, list)), "Arguments must be a sequence" |
255 |
assert isinstance(priority, (int, long)), "Priority must be numeric" |
|
189 | 256 |
|
190 |
self._tasks.append(args) |
|
257 |
# This counter is used to ensure elements are processed in their |
|
258 |
# incoming order. For processing they're sorted by priority and then |
|
259 |
# counter. |
|
260 |
self._counter += 1 |
|
261 |
|
|
262 |
heapq.heappush(self._tasks, (priority, self._counter, args)) |
|
191 | 263 |
|
192 | 264 |
# Notify a waiting worker |
193 | 265 |
self._pool_to_worker.notify() |
194 | 266 |
|
195 |
def AddTask(self, args): |
|
267 |
def AddTask(self, args, priority=_DEFAULT_PRIORITY):
|
|
196 | 268 |
"""Adds a task to the queue. |
197 | 269 |
|
198 | 270 |
@type args: sequence |
199 | 271 |
@param args: arguments passed to L{BaseWorker.RunTask} |
272 |
@type priority: number |
|
273 |
@param priority: Task priority |
|
200 | 274 |
|
201 | 275 |
""" |
202 | 276 |
self._lock.acquire() |
203 | 277 |
try: |
204 | 278 |
self._WaitWhileQuiescingUnlocked() |
205 |
self._AddTaskUnlocked(args) |
|
279 |
self._AddTaskUnlocked(args, priority)
|
|
206 | 280 |
finally: |
207 | 281 |
self._lock.release() |
208 | 282 |
|
209 |
def AddManyTasks(self, tasks): |
|
283 |
def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
|
|
210 | 284 |
"""Add a list of tasks to the queue. |
211 | 285 |
|
212 | 286 |
@type tasks: list of tuples |
213 | 287 |
@param tasks: list of args passed to L{BaseWorker.RunTask} |
288 |
@type priority: number or list of numbers |
|
289 |
@param priority: Priority for all added tasks or a list with the priority |
|
290 |
for each task |
|
214 | 291 |
|
215 | 292 |
""" |
216 | 293 |
assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \ |
217 | 294 |
"Each task must be a sequence" |
218 | 295 |
|
296 |
assert (isinstance(priority, (int, long)) or |
|
297 |
compat.all(isinstance(prio, (int, long)) for prio in priority)), \ |
|
298 |
"Priority must be numeric or be a list of numeric values" |
|
299 |
|
|
300 |
if isinstance(priority, (int, long)): |
|
301 |
priority = [priority] * len(tasks) |
|
302 |
elif len(priority) != len(tasks): |
|
303 |
raise errors.ProgrammerError("Number of priorities (%s) doesn't match" |
|
304 |
" number of tasks (%s)" % |
|
305 |
(len(priority), len(tasks))) |
|
306 |
|
|
219 | 307 |
self._lock.acquire() |
220 | 308 |
try: |
221 | 309 |
self._WaitWhileQuiescingUnlocked() |
222 | 310 |
|
223 |
for args in tasks: |
|
224 |
self._AddTaskUnlocked(args) |
|
311 |
assert compat.all(isinstance(prio, (int, long)) for prio in priority) |
|
312 |
assert len(tasks) == len(priority) |
|
313 |
|
|
314 |
for args, priority in zip(tasks, priority): |
|
315 |
self._AddTaskUnlocked(args, priority) |
|
225 | 316 |
finally: |
226 | 317 |
self._lock.release() |
227 | 318 |
|
... | ... | |
254 | 345 |
|
255 | 346 |
# Get task from queue and tell pool about it |
256 | 347 |
try: |
257 |
return self._tasks.popleft()
|
|
348 |
return heapq.heappop(self._tasks)
|
|
258 | 349 |
finally: |
259 | 350 |
self._worker_to_pool.notifyAll() |
260 | 351 |
|
Also available in: Unified diff