Revision 52c47e4e

b/doc/design-2.3.rst
20 20
Core changes
21 21
------------
22 22

  
23
Job priorities
24
~~~~~~~~~~~~~~
25

  
26
Current state and shortcomings
27
++++++++++++++++++++++++++++++
28

  
29
.. TODO: Describe current situation
30

  
31
Proposed changes
32
++++++++++++++++
33

  
34
.. TODO: Describe changes to job queue and potentially client programs
35

  
36
Worker pool
37
^^^^^^^^^^^
38

  
39
To support job priorities in the job queue, the worker pool underlying
40
the job queue must be enhanced to support task priorities. Currently
41
tasks are processed in the order they are added to the queue (but, due
42
to their nature, they don't necessarily finish in that order). All tasks
43
are equal. To support tasks with higher or lower priority, a few changes
44
have to be made to the queue inside a worker pool.
45

  
46
Each task is assigned a priority when added to the queue. This priority
47
cannot be changed until the task is executed (this is fine as in all
48
current use-cases, tasks are added to a pool and then forgotten about
49
until they're done).
50

  
51
A task's priority can be compared to Unix process priorities. The lower
52
the priority number, the closer to the queue's front it is. A task with
53
priority 0 is going to be run before one with priority 10. Tasks with
54
the same priority are executed in the order in which they were added.
55

  
56
While a task is running it can query its own priority. If it's not ready
57
to finish yet, it can raise an exception to defer itself, optionally
58
changing its own priority. This is useful for the following cases:
59

  
60
- A task is trying to acquire locks, but those locks are still held by
61
  other tasks. By deferring itself, the task gives others a chance to
62
  run. This is especially useful when all workers are busy.
63
- If a task decides it hasn't gotten its locks in a long time, it can
64
  start to increase its own priority (that is, lower its priority number).
65
- Tasks waiting for long-running operations running asynchronously could
66
  defer themselves instead of occupying a worker while they wait.
67

  
68
With these changes, the job queue will be able to implement per-job
69
priorities.
70

  
23 71

  
24 72
Feature changes
25 73
---------------
b/lib/workerpool.py
23 23

  
24 24
"""
25 25

  
26
import collections
27 26
import logging
28 27
import threading
28
import heapq
29 29

  
30 30
from ganeti import compat
31
from ganeti import errors
31 32

  
32 33

  
33 34
_TERMINATE = object()
35
_DEFAULT_PRIORITY = 0
36

  
37

  
38
class DeferTask(Exception):
39
  """Special exception class to defer a task.
40

  
41
  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
42
  task. Optionally, the priority of the task can be changed.
43

  
44
  """
45
  def __init__(self, priority=None):
46
    """Initializes this class.
47

  
48
    @type priority: number
49
    @param priority: New task priority (None means no change)
50

  
51
    """
52
    Exception.__init__(self)
53
    self.priority = priority
34 54

  
35 55

  
36 56
class BaseWorker(threading.Thread, object):
......
64 84
    finally:
65 85
      self.pool._lock.release()
66 86

  
87
  def GetCurrentPriority(self):
88
    """Returns the priority of the current task.
89

  
90
    Should only be called from within L{RunTask}.
91

  
92
    """
93
    self.pool._lock.acquire()
94
    try:
95
      assert self._HasRunningTaskUnlocked()
96

  
97
      (priority, _, _) = self._current_task
98

  
99
      return priority
100
    finally:
101
      self.pool._lock.release()
102

  
67 103
  def _HasRunningTaskUnlocked(self):
68 104
    """Returns whether this worker is currently running a task.
69 105

  
......
80 116

  
81 117
    while True:
82 118
      assert self._current_task is None
119

  
120
      defer = None
83 121
      try:
84 122
        # Wait on lock to be told either to terminate or to do a task
85 123
        pool._lock.acquire()
......
104 142
        finally:
105 143
          pool._lock.release()
106 144

  
107
        # Run the actual task
145
        (priority, _, args) = self._current_task
108 146
        try:
109
          logging.debug("Starting task %r", self._current_task)
110
          self.RunTask(*self._current_task)
111
          logging.debug("Done with task %r", self._current_task)
147
          # Run the actual task
148
          assert defer is None
149
          logging.debug("Starting task %r, priority %s", args, priority)
150
          self.RunTask(*args) # pylint: disable-msg=W0142
151
          logging.debug("Done with task %r, priority %s", args, priority)
152
        except DeferTask, err:
153
          defer = err
154

  
155
          if defer.priority is None:
156
            # Use same priority
157
            defer.priority = priority
158

  
159
          # Log both the task arguments and the priority the task will be
          # re-queued with; the format string has two placeholders and
          # therefore needs two arguments.
          logging.debug("Deferring task %r, new priority %s",
                        args, defer.priority)
160

  
161
          assert self._HasRunningTaskUnlocked()
162

  
112 163
        except: # pylint: disable-msg=W0702
113 164
          logging.exception("Caught unhandled exception")
114 165

  
......
117 168
        # Notify pool
118 169
        pool._lock.acquire()
119 170
        try:
171
          if defer:
172
            assert self._current_task
173
            # Schedule again for later run
174
            (_, _, args) = self._current_task
175
            pool._AddTaskUnlocked(args, defer.priority)
176

  
120 177
          if self._current_task:
121 178
            self._current_task = None
122 179
            pool._worker_to_pool.notifyAll()
......
170 227
    self._termworkers = []
171 228

  
172 229
    # Queued tasks
173
    self._tasks = collections.deque()
230
    self._counter = 0
231
    self._tasks = []
174 232

  
175 233
    # Start workers
176 234
    self.Resize(num_workers)
......
184 242
    while self._quiescing:
185 243
      self._pool_to_pool.wait()
186 244

  
187
  def _AddTaskUnlocked(self, args):
245
  def _AddTaskUnlocked(self, args, priority):
246
    """Adds a task to the internal queue.
247

  
248
    @type args: sequence
249
    @param args: Arguments passed to L{BaseWorker.RunTask}
250
    @type priority: number
251
    @param priority: Task priority
252

  
253
    """
188 254
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
255
    assert isinstance(priority, (int, long)), "Priority must be numeric"
189 256

  
190
    self._tasks.append(args)
257
    # This counter is used to ensure elements are processed in their
258
    # incoming order. For processing they're sorted by priority and then
259
    # counter.
260
    self._counter += 1
261

  
262
    heapq.heappush(self._tasks, (priority, self._counter, args))
191 263

  
192 264
    # Notify a waiting worker
193 265
    self._pool_to_worker.notify()
194 266

  
195
  def AddTask(self, args):
267
  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
196 268
    """Adds a task to the queue.
197 269

  
198 270
    @type args: sequence
199 271
    @param args: arguments passed to L{BaseWorker.RunTask}
272
    @type priority: number
273
    @param priority: Task priority
200 274

  
201 275
    """
202 276
    self._lock.acquire()
203 277
    try:
204 278
      self._WaitWhileQuiescingUnlocked()
205
      self._AddTaskUnlocked(args)
279
      self._AddTaskUnlocked(args, priority)
206 280
    finally:
207 281
      self._lock.release()
208 282

  
209
  def AddManyTasks(self, tasks):
283
  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
210 284
    """Add a list of tasks to the queue.
211 285

  
212 286
    @type tasks: list of tuples
213 287
    @param tasks: list of args passed to L{BaseWorker.RunTask}
288
    @type priority: number or list of numbers
289
    @param priority: Priority for all added tasks or a list with the priority
290
                     for each task
214 291

  
215 292
    """
216 293
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
217 294
      "Each task must be a sequence"
218 295

  
296
    assert (isinstance(priority, (int, long)) or
297
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
298
           "Priority must be numeric or be a list of numeric values"
299

  
300
    if isinstance(priority, (int, long)):
301
      priority = [priority] * len(tasks)
302
    elif len(priority) != len(tasks):
303
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
304
                                   " number of tasks (%s)" %
305
                                   (len(priority), len(tasks)))
306

  
219 307
    self._lock.acquire()
220 308
    try:
221 309
      self._WaitWhileQuiescingUnlocked()
222 310

  
223
      for args in tasks:
224
        self._AddTaskUnlocked(args)
311
      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
312
      assert len(tasks) == len(priority)
313

  
314
      for args, priority in zip(tasks, priority):
315
        self._AddTaskUnlocked(args, priority)
225 316
    finally:
226 317
      self._lock.release()
227 318

  
......
254 345

  
255 346
    # Get task from queue and tell pool about it
256 347
    try:
257
      return self._tasks.popleft()
348
      return heapq.heappop(self._tasks)
258 349
    finally:
259 350
      self._worker_to_pool.notifyAll()
260 351

  
b/test/ganeti.workerpool_unittest.py
1 1
#!/usr/bin/python
2 2
#
3 3

  
4
# Copyright (C) 2008 Google Inc.
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5 5
#
6 6
# This program is free software; you can redistribute it and/or modify
7 7
# it under the terms of the GNU General Public License as published by
......
26 26
import time
27 27
import sys
28 28
import zlib
29
import random
29 30

  
30 31
from ganeti import workerpool
32
from ganeti import errors
31 33

  
32 34
import testutils
33 35

  
34
class CountingContext(object):
35 36

  
37
class CountingContext(object):
36 38
  def __init__(self):
37 39
    self._lock = threading.Condition(threading.Lock())
38 40
    self.done = 0
......
57 59

  
58 60

  
59 61
class CountingBaseWorker(workerpool.BaseWorker):
60

  
61 62
  def RunTask(self, ctx, text):
62 63
    ctx.DoneTask()
63 64

  
......
83 84
      ctx.lock.release()
84 85

  
85 86

  
87
class ListBuilderContext:
88
  def __init__(self):
89
    self.lock = threading.Lock()
90
    self.result = []
91
    self.prioresult = {}
92

  
93

  
94
class ListBuilderWorker(workerpool.BaseWorker):
95
  def RunTask(self, ctx, data):
96
    ctx.lock.acquire()
97
    try:
98
      ctx.result.append((self.GetCurrentPriority(), data))
99
      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
100
    finally:
101
      ctx.lock.release()
102

  
103

  
104
class DeferringTaskContext:
105
  def __init__(self):
106
    self.lock = threading.Lock()
107
    self.prioresult = {}
108
    self.samepriodefer = {}
109

  
110

  
111
class DeferringWorker(workerpool.BaseWorker):
112
  def RunTask(self, ctx, num, targetprio):
113
    ctx.lock.acquire()
114
    try:
115
      if num in ctx.samepriodefer:
116
        del ctx.samepriodefer[num]
117
        raise workerpool.DeferTask()
118

  
119
      if self.GetCurrentPriority() > targetprio:
120
        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
121

  
122
      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
123
    finally:
124
      ctx.lock.release()
125

  
126

  
86 127
class TestWorkerpool(unittest.TestCase):
87 128
  """Workerpool tests"""
88 129

  
......
206 247
    finally:
207 248
      wp._lock.release()
208 249

  
250
  def testPriorityChecksum(self):
251
    # Tests whether all tasks are run and, since we're only using a single
252
    # thread, whether everything is started in order and respects the priority
253
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
254
    try:
255
      self._CheckWorkerCount(wp, 1)
256

  
257
      ctx = ChecksumContext()
258

  
259
      data = {}
260
      tasks = []
261
      priorities = []
262
      for i in range(1, 333):
263
        prio = i % 7
264
        tasks.append((ctx, i))
265
        priorities.append(prio)
266
        data.setdefault(prio, []).append(i)
267

  
268
      wp.AddManyTasks(tasks, priority=priorities)
269

  
270
      wp.Quiesce()
271

  
272
      self._CheckNoTasks(wp)
273

  
274
      # Check sum
275
      ctx.lock.acquire()
276
      try:
277
        checksum = ChecksumContext.CHECKSUM_START
278
        for priority in sorted(data.keys()):
279
          for i in data[priority]:
280
            checksum = ChecksumContext.UpdateChecksum(checksum, i)
281

  
282
        self.assertEqual(checksum, ctx.checksum)
283
      finally:
284
        ctx.lock.release()
285

  
286
      self._CheckWorkerCount(wp, 1)
287
    finally:
288
      wp.TerminateWorkers()
289
      self._CheckWorkerCount(wp, 0)
290

  
291
  def testPriorityListManyTasks(self):
292
    # Tests whether all tasks are run and, since we're only using a single
293
    # thread, whether everything is started in order and respects the priority
294
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
295
    try:
296
      self._CheckWorkerCount(wp, 1)
297

  
298
      ctx = ListBuilderContext()
299

  
300
      # Use static seed for this test
301
      rnd = random.Random(0)
302

  
303
      data = {}
304
      tasks = []
305
      priorities = []
306
      for i in range(1, 333):
307
        prio = int(rnd.random() * 10)
308
        tasks.append((ctx, i))
309
        priorities.append(prio)
310
        data.setdefault(prio, []).append((prio, i))
311

  
312
      wp.AddManyTasks(tasks, priority=priorities)
313

  
314
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
315
                        [("x", ), ("y", )], priority=[1] * 5)
316

  
317
      wp.Quiesce()
318

  
319
      self._CheckNoTasks(wp)
320

  
321
      # Check result
322
      ctx.lock.acquire()
323
      try:
324
        expresult = []
325
        for priority in sorted(data.keys()):
326
          expresult.extend(data[priority])
327

  
328
        self.assertEqual(expresult, ctx.result)
329
      finally:
330
        ctx.lock.release()
331

  
332
      self._CheckWorkerCount(wp, 1)
333
    finally:
334
      wp.TerminateWorkers()
335
      self._CheckWorkerCount(wp, 0)
336

  
337
  def testPriorityListSingleTasks(self):
338
    # Tests whether all tasks are run and, since we're only using a single
339
    # thread, whether everything is started in order and respects the priority
340
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
341
    try:
342
      self._CheckWorkerCount(wp, 1)
343

  
344
      ctx = ListBuilderContext()
345

  
346
      # Use static seed for this test
347
      rnd = random.Random(26279)
348

  
349
      data = {}
350
      for i in range(1, 333):
351
        prio = int(rnd.random() * 30)
352
        wp.AddTask((ctx, i), priority=prio)
353
        data.setdefault(prio, []).append(i)
354

  
355
        # Cause some distortion
356
        if i % 11 == 0:
357
          time.sleep(.001)
358
        if i % 41 == 0:
359
          wp.Quiesce()
360

  
361
      wp.Quiesce()
362

  
363
      self._CheckNoTasks(wp)
364

  
365
      # Check result
366
      ctx.lock.acquire()
367
      try:
368
        self.assertEqual(data, ctx.prioresult)
369
      finally:
370
        ctx.lock.release()
371

  
372
      self._CheckWorkerCount(wp, 1)
373
    finally:
374
      wp.TerminateWorkers()
375
      self._CheckWorkerCount(wp, 0)
376

  
377
  def testPriorityListSingleTasks(self):
378
    # Tests whether all tasks are run and, since we're only using a single
379
    # thread, whether everything is started in order and respects the priority
380
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
381
    try:
382
      self._CheckWorkerCount(wp, 1)
383

  
384
      ctx = ListBuilderContext()
385

  
386
      # Use static seed for this test
387
      rnd = random.Random(26279)
388

  
389
      data = {}
390
      for i in range(1, 333):
391
        prio = int(rnd.random() * 30)
392
        wp.AddTask((ctx, i), priority=prio)
393
        data.setdefault(prio, []).append(i)
394

  
395
        # Cause some distortion
396
        if i % 11 == 0:
397
          time.sleep(.001)
398
        if i % 41 == 0:
399
          wp.Quiesce()
400

  
401
      wp.Quiesce()
402

  
403
      self._CheckNoTasks(wp)
404

  
405
      # Check result
406
      ctx.lock.acquire()
407
      try:
408
        self.assertEqual(data, ctx.prioresult)
409
      finally:
410
        ctx.lock.release()
411

  
412
      self._CheckWorkerCount(wp, 1)
413
    finally:
414
      wp.TerminateWorkers()
415
      self._CheckWorkerCount(wp, 0)
416

  
417
  def testDeferTask(self):
418
    # Tests whether all tasks are run and, since we're only using a single
419
    # thread, whether everything is started in order and respects the priority
420
    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
421
    try:
422
      self._CheckWorkerCount(wp, 1)
423

  
424
      ctx = DeferringTaskContext()
425

  
426
      # Use static seed for this test
427
      rnd = random.Random(14921)
428

  
429
      data = {}
430
      for i in range(1, 333):
431
        ctx.lock.acquire()
432
        try:
433
          if i % 5 == 0:
434
            ctx.samepriodefer[i] = True
435
        finally:
436
          ctx.lock.release()
437

  
438
        prio = int(rnd.random() * 30)
439
        wp.AddTask((ctx, i, prio), priority=50)
440
        data.setdefault(prio, set()).add(i)
441

  
442
        # Cause some distortion
443
        if i % 24 == 0:
444
          time.sleep(.001)
445
        if i % 31 == 0:
446
          wp.Quiesce()
447

  
448
      wp.Quiesce()
449

  
450
      self._CheckNoTasks(wp)
451

  
452
      # Check result
453
      ctx.lock.acquire()
454
      try:
455
        self.assertEqual(data, ctx.prioresult)
456
      finally:
457
        ctx.lock.release()
458

  
459
      self._CheckWorkerCount(wp, 1)
460
    finally:
461
      wp.TerminateWorkers()
462
      self._CheckWorkerCount(wp, 0)
463

  
209 464

  
210 465
if __name__ == '__main__':
211 466
  testutils.GanetiTestProgram()

Also available in: Unified diff