Revision 52c47e4e
b/doc/design-2.3.rst | ||
---|---|---|
20 | 20 |
Core changes |
21 | 21 |
------------ |
22 | 22 |
|
23 |
Job priorities |
|
24 |
~~~~~~~~~~~~~~ |
|
25 |
|
|
26 |
Current state and shortcomings |
|
27 |
++++++++++++++++++++++++++++++ |
|
28 |
|
|
29 |
.. TODO: Describe current situation |
|
30 |
|
|
31 |
Proposed changes |
|
32 |
++++++++++++++++ |
|
33 |
|
|
34 |
.. TODO: Describe changes to job queue and potentially client programs |
|
35 |
|
|
36 |
Worker pool |
|
37 |
^^^^^^^^^^^ |
|
38 |
|
|
39 |
To support job priorities in the job queue, the worker pool underlying |
|
40 |
the job queue must be enhanced to support task priorities. Currently |
|
41 |
tasks are processed in the order they are added to the queue (but, due |
|
42 |
to their nature, they don't necessarily finish in that order). All tasks |
|
43 |
are equal. To support tasks with higher or lower priority, a few changes |
|
44 |
have to be made to the queue inside a worker pool. |
|
45 |
|
|
46 |
Each task is assigned a priority when added to the queue. This priority |
|
47 |
cannot be changed until the task is executed (this is fine as in all |
|
48 |
current use-cases, tasks are added to a pool and then forgotten about |
|
49 |
until they're done). |
|
50 |
|
|
51 |
A task's priority can be compared to Unix process priorities. The lower |
|
52 |
the priority number, the closer to the queue's front it is. A task with |
|
53 |
priority 0 is going to be run before one with priority 10. Tasks with |
|
54 |
the same priority are executed in the order in which they were added. |
|
55 |
|
|
56 |
While a task is running it can query its own priority. If it's not yet |
|
57 |
ready to finish, it can raise an exception to defer itself, optionally |
|
58 |
changing its own priority. This is useful for the following cases: |
|
59 |
|
|
60 |
- A task is trying to acquire locks, but those locks are still held by |
|
61 |
other tasks. By deferring itself, the task gives others a chance to |
|
62 |
run. This is especially useful when all workers are busy. |
|
63 |
- If a task decides it hasn't gotten its locks in a long time, it can |
|
64 |
start to increase its own priority. |
|
65 |
- Tasks waiting for long-running asynchronous operations could |
|
66 |
defer themselves until the operation they wait for has finished. |
|
67 |
|
|
68 |
With these changes, the job queue will be able to implement per-job |
|
69 |
priorities. |
|
70 |
|
|
23 | 71 |
|
24 | 72 |
Feature changes |
25 | 73 |
--------------- |
b/lib/workerpool.py | ||
---|---|---|
23 | 23 |
|
24 | 24 |
""" |
25 | 25 |
|
26 |
import collections |
|
27 | 26 |
import logging |
28 | 27 |
import threading |
28 |
import heapq |
|
29 | 29 |
|
30 | 30 |
from ganeti import compat |
31 |
from ganeti import errors |
|
31 | 32 |
|
32 | 33 |
|
33 | 34 |
_TERMINATE = object() |
35 |
_DEFAULT_PRIORITY = 0 |
|
36 |
|
|
37 |
|
|
38 |
class DeferTask(Exception): |
|
39 |
"""Special exception class to defer a task. |
|
40 |
|
|
41 |
This class can be raised by L{BaseWorker.RunTask} to defer the execution of a |
|
42 |
task. Optionally, the priority of the task can be changed. |
|
43 |
|
|
44 |
""" |
|
45 |
def __init__(self, priority=None): |
|
46 |
"""Initializes this class. |
|
47 |
|
|
48 |
@type priority: number |
|
49 |
@param priority: New task priority (None means no change) |
|
50 |
|
|
51 |
""" |
|
52 |
Exception.__init__(self) |
|
53 |
self.priority = priority |
|
34 | 54 |
|
35 | 55 |
|
36 | 56 |
class BaseWorker(threading.Thread, object): |
... | ... | |
64 | 84 |
finally: |
65 | 85 |
self.pool._lock.release() |
66 | 86 |
|
87 |
def GetCurrentPriority(self): |
|
88 |
"""Returns the priority of the current task. |
|
89 |
|
|
90 |
Should only be called from within L{RunTask}. |
|
91 |
|
|
92 |
""" |
|
93 |
self.pool._lock.acquire() |
|
94 |
try: |
|
95 |
assert self._HasRunningTaskUnlocked() |
|
96 |
|
|
97 |
(priority, _, _) = self._current_task |
|
98 |
|
|
99 |
return priority |
|
100 |
finally: |
|
101 |
self.pool._lock.release() |
|
102 |
|
|
67 | 103 |
def _HasRunningTaskUnlocked(self): |
68 | 104 |
"""Returns whether this worker is currently running a task. |
69 | 105 |
|
... | ... | |
80 | 116 |
|
81 | 117 |
while True: |
82 | 118 |
assert self._current_task is None |
119 |
|
|
120 |
defer = None |
|
83 | 121 |
try: |
84 | 122 |
# Wait on lock to be told either to terminate or to do a task |
85 | 123 |
pool._lock.acquire() |
... | ... | |
104 | 142 |
finally: |
105 | 143 |
pool._lock.release() |
106 | 144 |
|
107 |
# Run the actual task
|
|
145 |
(priority, _, args) = self._current_task
|
|
108 | 146 |
try: |
109 |
logging.debug("Starting task %r", self._current_task) |
|
110 |
self.RunTask(*self._current_task) |
|
111 |
logging.debug("Done with task %r", self._current_task) |
|
147 |
# Run the actual task |
|
148 |
assert defer is None |
|
149 |
logging.debug("Starting task %r, priority %s", args, priority) |
|
150 |
self.RunTask(*args) # pylint: disable-msg=W0142 |
|
151 |
logging.debug("Done with task %r, priority %s", args, priority) |
|
152 |
except DeferTask, err: |
|
153 |
defer = err |
|
154 |
|
|
155 |
if defer.priority is None: |
|
156 |
# Use same priority |
|
157 |
defer.priority = priority |
|
158 |
|
|
159 |
logging.debug("Deferring task %r, new priority %s", defer.priority) |
|
160 |
|
|
161 |
assert self._HasRunningTaskUnlocked() |
|
162 |
|
|
112 | 163 |
except: # pylint: disable-msg=W0702 |
113 | 164 |
logging.exception("Caught unhandled exception") |
114 | 165 |
|
... | ... | |
117 | 168 |
# Notify pool |
118 | 169 |
pool._lock.acquire() |
119 | 170 |
try: |
171 |
if defer: |
|
172 |
assert self._current_task |
|
173 |
# Schedule again for later run |
|
174 |
(_, _, args) = self._current_task |
|
175 |
pool._AddTaskUnlocked(args, defer.priority) |
|
176 |
|
|
120 | 177 |
if self._current_task: |
121 | 178 |
self._current_task = None |
122 | 179 |
pool._worker_to_pool.notifyAll() |
... | ... | |
170 | 227 |
self._termworkers = [] |
171 | 228 |
|
172 | 229 |
# Queued tasks |
173 |
self._tasks = collections.deque() |
|
230 |
self._counter = 0 |
|
231 |
self._tasks = [] |
|
174 | 232 |
|
175 | 233 |
# Start workers |
176 | 234 |
self.Resize(num_workers) |
... | ... | |
184 | 242 |
while self._quiescing: |
185 | 243 |
self._pool_to_pool.wait() |
186 | 244 |
|
187 |
def _AddTaskUnlocked(self, args): |
|
245 |
def _AddTaskUnlocked(self, args, priority): |
|
246 |
"""Adds a task to the internal queue. |
|
247 |
|
|
248 |
@type args: sequence |
|
249 |
@param args: Arguments passed to L{BaseWorker.RunTask} |
|
250 |
@type priority: number |
|
251 |
@param priority: Task priority |
|
252 |
|
|
253 |
""" |
|
188 | 254 |
assert isinstance(args, (tuple, list)), "Arguments must be a sequence" |
255 |
assert isinstance(priority, (int, long)), "Priority must be numeric" |
|
189 | 256 |
|
190 |
self._tasks.append(args) |
|
257 |
# This counter is used to ensure elements are processed in their |
|
258 |
# incoming order. For processing they're sorted by priority and then |
|
259 |
# counter. |
|
260 |
self._counter += 1 |
|
261 |
|
|
262 |
heapq.heappush(self._tasks, (priority, self._counter, args)) |
|
191 | 263 |
|
192 | 264 |
# Notify a waiting worker |
193 | 265 |
self._pool_to_worker.notify() |
194 | 266 |
|
195 |
def AddTask(self, args): |
|
267 |
def AddTask(self, args, priority=_DEFAULT_PRIORITY):
|
|
196 | 268 |
"""Adds a task to the queue. |
197 | 269 |
|
198 | 270 |
@type args: sequence |
199 | 271 |
@param args: arguments passed to L{BaseWorker.RunTask} |
272 |
@type priority: number |
|
273 |
@param priority: Task priority |
|
200 | 274 |
|
201 | 275 |
""" |
202 | 276 |
self._lock.acquire() |
203 | 277 |
try: |
204 | 278 |
self._WaitWhileQuiescingUnlocked() |
205 |
self._AddTaskUnlocked(args) |
|
279 |
self._AddTaskUnlocked(args, priority)
|
|
206 | 280 |
finally: |
207 | 281 |
self._lock.release() |
208 | 282 |
|
209 |
def AddManyTasks(self, tasks): |
|
283 |
def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
|
|
210 | 284 |
"""Add a list of tasks to the queue. |
211 | 285 |
|
212 | 286 |
@type tasks: list of tuples |
213 | 287 |
@param tasks: list of args passed to L{BaseWorker.RunTask} |
288 |
@type priority: number or list of numbers |
|
289 |
@param priority: Priority for all added tasks or a list with the priority |
|
290 |
for each task |
|
214 | 291 |
|
215 | 292 |
""" |
216 | 293 |
assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \ |
217 | 294 |
"Each task must be a sequence" |
218 | 295 |
|
296 |
assert (isinstance(priority, (int, long)) or |
|
297 |
compat.all(isinstance(prio, (int, long)) for prio in priority)), \ |
|
298 |
"Priority must be numeric or be a list of numeric values" |
|
299 |
|
|
300 |
if isinstance(priority, (int, long)): |
|
301 |
priority = [priority] * len(tasks) |
|
302 |
elif len(priority) != len(tasks): |
|
303 |
raise errors.ProgrammerError("Number of priorities (%s) doesn't match" |
|
304 |
" number of tasks (%s)" % |
|
305 |
(len(priority), len(tasks))) |
|
306 |
|
|
219 | 307 |
self._lock.acquire() |
220 | 308 |
try: |
221 | 309 |
self._WaitWhileQuiescingUnlocked() |
222 | 310 |
|
223 |
for args in tasks: |
|
224 |
self._AddTaskUnlocked(args) |
|
311 |
assert compat.all(isinstance(prio, (int, long)) for prio in priority) |
|
312 |
assert len(tasks) == len(priority) |
|
313 |
|
|
314 |
for args, priority in zip(tasks, priority): |
|
315 |
self._AddTaskUnlocked(args, priority) |
|
225 | 316 |
finally: |
226 | 317 |
self._lock.release() |
227 | 318 |
|
... | ... | |
254 | 345 |
|
255 | 346 |
# Get task from queue and tell pool about it |
256 | 347 |
try: |
257 |
return self._tasks.popleft()
|
|
348 |
return heapq.heappop(self._tasks)
|
|
258 | 349 |
finally: |
259 | 350 |
self._worker_to_pool.notifyAll() |
260 | 351 |
|
b/test/ganeti.workerpool_unittest.py | ||
---|---|---|
1 | 1 |
#!/usr/bin/python |
2 | 2 |
# |
3 | 3 |
|
4 |
# Copyright (C) 2008 Google Inc. |
|
4 |
# Copyright (C) 2008, 2009, 2010 Google Inc.
|
|
5 | 5 |
# |
6 | 6 |
# This program is free software; you can redistribute it and/or modify |
7 | 7 |
# it under the terms of the GNU General Public License as published by |
... | ... | |
26 | 26 |
import time |
27 | 27 |
import sys |
28 | 28 |
import zlib |
29 |
import random |
|
29 | 30 |
|
30 | 31 |
from ganeti import workerpool |
32 |
from ganeti import errors |
|
31 | 33 |
|
32 | 34 |
import testutils |
33 | 35 |
|
34 |
class CountingContext(object): |
|
35 | 36 |
|
37 |
class CountingContext(object): |
|
36 | 38 |
def __init__(self): |
37 | 39 |
self._lock = threading.Condition(threading.Lock()) |
38 | 40 |
self.done = 0 |
... | ... | |
57 | 59 |
|
58 | 60 |
|
59 | 61 |
class CountingBaseWorker(workerpool.BaseWorker): |
60 |
|
|
61 | 62 |
def RunTask(self, ctx, text): |
62 | 63 |
ctx.DoneTask() |
63 | 64 |
|
... | ... | |
83 | 84 |
ctx.lock.release() |
84 | 85 |
|
85 | 86 |
|
87 |
class ListBuilderContext: |
|
88 |
def __init__(self): |
|
89 |
self.lock = threading.Lock() |
|
90 |
self.result = [] |
|
91 |
self.prioresult = {} |
|
92 |
|
|
93 |
|
|
94 |
class ListBuilderWorker(workerpool.BaseWorker): |
|
95 |
def RunTask(self, ctx, data): |
|
96 |
ctx.lock.acquire() |
|
97 |
try: |
|
98 |
ctx.result.append((self.GetCurrentPriority(), data)) |
|
99 |
ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data) |
|
100 |
finally: |
|
101 |
ctx.lock.release() |
|
102 |
|
|
103 |
|
|
104 |
class DeferringTaskContext: |
|
105 |
def __init__(self): |
|
106 |
self.lock = threading.Lock() |
|
107 |
self.prioresult = {} |
|
108 |
self.samepriodefer = {} |
|
109 |
|
|
110 |
|
|
111 |
class DeferringWorker(workerpool.BaseWorker): |
|
112 |
def RunTask(self, ctx, num, targetprio): |
|
113 |
ctx.lock.acquire() |
|
114 |
try: |
|
115 |
if num in ctx.samepriodefer: |
|
116 |
del ctx.samepriodefer[num] |
|
117 |
raise workerpool.DeferTask() |
|
118 |
|
|
119 |
if self.GetCurrentPriority() > targetprio: |
|
120 |
raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1) |
|
121 |
|
|
122 |
ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num) |
|
123 |
finally: |
|
124 |
ctx.lock.release() |
|
125 |
|
|
126 |
|
|
86 | 127 |
class TestWorkerpool(unittest.TestCase): |
87 | 128 |
"""Workerpool tests""" |
88 | 129 |
|
... | ... | |
206 | 247 |
finally: |
207 | 248 |
wp._lock.release() |
208 | 249 |
|
250 |
def testPriorityChecksum(self): |
|
251 |
# Tests whether all tasks are run and, since we're only using a single |
|
252 |
# thread, whether everything is started in order and respects the priority |
|
253 |
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker) |
|
254 |
try: |
|
255 |
self._CheckWorkerCount(wp, 1) |
|
256 |
|
|
257 |
ctx = ChecksumContext() |
|
258 |
|
|
259 |
data = {} |
|
260 |
tasks = [] |
|
261 |
priorities = [] |
|
262 |
for i in range(1, 333): |
|
263 |
prio = i % 7 |
|
264 |
tasks.append((ctx, i)) |
|
265 |
priorities.append(prio) |
|
266 |
data.setdefault(prio, []).append(i) |
|
267 |
|
|
268 |
wp.AddManyTasks(tasks, priority=priorities) |
|
269 |
|
|
270 |
wp.Quiesce() |
|
271 |
|
|
272 |
self._CheckNoTasks(wp) |
|
273 |
|
|
274 |
# Check sum |
|
275 |
ctx.lock.acquire() |
|
276 |
try: |
|
277 |
checksum = ChecksumContext.CHECKSUM_START |
|
278 |
for priority in sorted(data.keys()): |
|
279 |
for i in data[priority]: |
|
280 |
checksum = ChecksumContext.UpdateChecksum(checksum, i) |
|
281 |
|
|
282 |
self.assertEqual(checksum, ctx.checksum) |
|
283 |
finally: |
|
284 |
ctx.lock.release() |
|
285 |
|
|
286 |
self._CheckWorkerCount(wp, 1) |
|
287 |
finally: |
|
288 |
wp.TerminateWorkers() |
|
289 |
self._CheckWorkerCount(wp, 0) |
|
290 |
|
|
291 |
def testPriorityListManyTasks(self): |
|
292 |
# Tests whether all tasks are run and, since we're only using a single |
|
293 |
# thread, whether everything is started in order and respects the priority |
|
294 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
|
295 |
try: |
|
296 |
self._CheckWorkerCount(wp, 1) |
|
297 |
|
|
298 |
ctx = ListBuilderContext() |
|
299 |
|
|
300 |
# Use static seed for this test |
|
301 |
rnd = random.Random(0) |
|
302 |
|
|
303 |
data = {} |
|
304 |
tasks = [] |
|
305 |
priorities = [] |
|
306 |
for i in range(1, 333): |
|
307 |
prio = int(rnd.random() * 10) |
|
308 |
tasks.append((ctx, i)) |
|
309 |
priorities.append(prio) |
|
310 |
data.setdefault(prio, []).append((prio, i)) |
|
311 |
|
|
312 |
wp.AddManyTasks(tasks, priority=priorities) |
|
313 |
|
|
314 |
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks, |
|
315 |
[("x", ), ("y", )], priority=[1] * 5) |
|
316 |
|
|
317 |
wp.Quiesce() |
|
318 |
|
|
319 |
self._CheckNoTasks(wp) |
|
320 |
|
|
321 |
# Check result |
|
322 |
ctx.lock.acquire() |
|
323 |
try: |
|
324 |
expresult = [] |
|
325 |
for priority in sorted(data.keys()): |
|
326 |
expresult.extend(data[priority]) |
|
327 |
|
|
328 |
self.assertEqual(expresult, ctx.result) |
|
329 |
finally: |
|
330 |
ctx.lock.release() |
|
331 |
|
|
332 |
self._CheckWorkerCount(wp, 1) |
|
333 |
finally: |
|
334 |
wp.TerminateWorkers() |
|
335 |
self._CheckWorkerCount(wp, 0) |
|
336 |
|
|
337 |
def testPriorityListSingleTasks(self): |
|
338 |
# Tests whether all tasks are run and, since we're only using a single |
|
339 |
# thread, whether everything is started in order and respects the priority |
|
340 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
|
341 |
try: |
|
342 |
self._CheckWorkerCount(wp, 1) |
|
343 |
|
|
344 |
ctx = ListBuilderContext() |
|
345 |
|
|
346 |
# Use static seed for this test |
|
347 |
rnd = random.Random(26279) |
|
348 |
|
|
349 |
data = {} |
|
350 |
for i in range(1, 333): |
|
351 |
prio = int(rnd.random() * 30) |
|
352 |
wp.AddTask((ctx, i), priority=prio) |
|
353 |
data.setdefault(prio, []).append(i) |
|
354 |
|
|
355 |
# Cause some distortion |
|
356 |
if i % 11 == 0: |
|
357 |
time.sleep(.001) |
|
358 |
if i % 41 == 0: |
|
359 |
wp.Quiesce() |
|
360 |
|
|
361 |
wp.Quiesce() |
|
362 |
|
|
363 |
self._CheckNoTasks(wp) |
|
364 |
|
|
365 |
# Check result |
|
366 |
ctx.lock.acquire() |
|
367 |
try: |
|
368 |
self.assertEqual(data, ctx.prioresult) |
|
369 |
finally: |
|
370 |
ctx.lock.release() |
|
371 |
|
|
372 |
self._CheckWorkerCount(wp, 1) |
|
373 |
finally: |
|
374 |
wp.TerminateWorkers() |
|
375 |
self._CheckWorkerCount(wp, 0) |
|
376 |
|
|
377 |
def testPriorityListSingleTasks(self): |
|
378 |
# Tests whether all tasks are run and, since we're only using a single |
|
379 |
# thread, whether everything is started in order and respects the priority |
|
380 |
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker) |
|
381 |
try: |
|
382 |
self._CheckWorkerCount(wp, 1) |
|
383 |
|
|
384 |
ctx = ListBuilderContext() |
|
385 |
|
|
386 |
# Use static seed for this test |
|
387 |
rnd = random.Random(26279) |
|
388 |
|
|
389 |
data = {} |
|
390 |
for i in range(1, 333): |
|
391 |
prio = int(rnd.random() * 30) |
|
392 |
wp.AddTask((ctx, i), priority=prio) |
|
393 |
data.setdefault(prio, []).append(i) |
|
394 |
|
|
395 |
# Cause some distortion |
|
396 |
if i % 11 == 0: |
|
397 |
time.sleep(.001) |
|
398 |
if i % 41 == 0: |
|
399 |
wp.Quiesce() |
|
400 |
|
|
401 |
wp.Quiesce() |
|
402 |
|
|
403 |
self._CheckNoTasks(wp) |
|
404 |
|
|
405 |
# Check result |
|
406 |
ctx.lock.acquire() |
|
407 |
try: |
|
408 |
self.assertEqual(data, ctx.prioresult) |
|
409 |
finally: |
|
410 |
ctx.lock.release() |
|
411 |
|
|
412 |
self._CheckWorkerCount(wp, 1) |
|
413 |
finally: |
|
414 |
wp.TerminateWorkers() |
|
415 |
self._CheckWorkerCount(wp, 0) |
|
416 |
|
|
417 |
def testDeferTask(self): |
|
418 |
# Tests whether all tasks are run and, since we're only using a single |
|
419 |
# thread, whether everything is started in order and respects the priority |
|
420 |
wp = workerpool.WorkerPool("Test", 1, DeferringWorker) |
|
421 |
try: |
|
422 |
self._CheckWorkerCount(wp, 1) |
|
423 |
|
|
424 |
ctx = DeferringTaskContext() |
|
425 |
|
|
426 |
# Use static seed for this test |
|
427 |
rnd = random.Random(14921) |
|
428 |
|
|
429 |
data = {} |
|
430 |
for i in range(1, 333): |
|
431 |
ctx.lock.acquire() |
|
432 |
try: |
|
433 |
if i % 5 == 0: |
|
434 |
ctx.samepriodefer[i] = True |
|
435 |
finally: |
|
436 |
ctx.lock.release() |
|
437 |
|
|
438 |
prio = int(rnd.random() * 30) |
|
439 |
wp.AddTask((ctx, i, prio), priority=50) |
|
440 |
data.setdefault(prio, set()).add(i) |
|
441 |
|
|
442 |
# Cause some distortion |
|
443 |
if i % 24 == 0: |
|
444 |
time.sleep(.001) |
|
445 |
if i % 31 == 0: |
|
446 |
wp.Quiesce() |
|
447 |
|
|
448 |
wp.Quiesce() |
|
449 |
|
|
450 |
self._CheckNoTasks(wp) |
|
451 |
|
|
452 |
# Check result |
|
453 |
ctx.lock.acquire() |
|
454 |
try: |
|
455 |
self.assertEqual(data, ctx.prioresult) |
|
456 |
finally: |
|
457 |
ctx.lock.release() |
|
458 |
|
|
459 |
self._CheckWorkerCount(wp, 1) |
|
460 |
finally: |
|
461 |
wp.TerminateWorkers() |
|
462 |
self._CheckWorkerCount(wp, 0) |
|
463 |
|
|
209 | 464 |
|
210 | 465 |
if __name__ == '__main__': |
211 | 466 |
testutils.GanetiTestProgram() |
Also available in: Unified diff