Revision 52c47e4e lib/workerpool.py

b/lib/workerpool.py
23 23

  
24 24
"""
25 25

  
26
import collections
27 26
import logging
28 27
import threading
28
import heapq
29 29

  
30 30
from ganeti import compat
31
from ganeti import errors
31 32

  
32 33

  
33 34
_TERMINATE = object()
35
_DEFAULT_PRIORITY = 0
36

  
37

  
38
class DeferTask(Exception):
39
  """Special exception class to defer a task.
40

  
41
  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
42
  task. Optionally, the priority of the task can be changed.
43

  
44
  """
45
  def __init__(self, priority=None):
46
    """Initializes this class.
47

  
48
    @type priority: number
49
    @param priority: New task priority (None means no change)
50

  
51
    """
52
    Exception.__init__(self)
53
    self.priority = priority
34 54

  
35 55

  
36 56
class BaseWorker(threading.Thread, object):
......
64 84
    finally:
65 85
      self.pool._lock.release()
66 86

  
87
  def GetCurrentPriority(self):
88
    """Returns the priority of the current task.
89

  
90
    Should only be called from within L{RunTask}.
91

  
92
    """
93
    self.pool._lock.acquire()
94
    try:
95
      assert self._HasRunningTaskUnlocked()
96

  
97
      (priority, _, _) = self._current_task
98

  
99
      return priority
100
    finally:
101
      self.pool._lock.release()
102

  
67 103
  def _HasRunningTaskUnlocked(self):
68 104
    """Returns whether this worker is currently running a task.
69 105

  
......
80 116

  
81 117
    while True:
82 118
      assert self._current_task is None
119

  
120
      defer = None
83 121
      try:
84 122
        # Wait on lock to be told either to terminate or to do a task
85 123
        pool._lock.acquire()
......
104 142
        finally:
105 143
          pool._lock.release()
106 144

  
107
        # Run the actual task
145
        (priority, _, args) = self._current_task
108 146
        try:
109
          logging.debug("Starting task %r", self._current_task)
110
          self.RunTask(*self._current_task)
111
          logging.debug("Done with task %r", self._current_task)
147
          # Run the actual task
148
          assert defer is None
149
          logging.debug("Starting task %r, priority %s", args, priority)
150
          self.RunTask(*args) # pylint: disable-msg=W0142
151
          logging.debug("Done with task %r, priority %s", args, priority)
152
        except DeferTask, err:
153
          defer = err
154

  
155
          if defer.priority is None:
156
            # Use same priority
157
            defer.priority = priority
158

  
159
          logging.debug("Deferring task %r, new priority %s", defer.priority)
160

  
161
          assert self._HasRunningTaskUnlocked()
162

  
112 163
        except: # pylint: disable-msg=W0702
113 164
          logging.exception("Caught unhandled exception")
114 165

  
......
117 168
        # Notify pool
118 169
        pool._lock.acquire()
119 170
        try:
171
          if defer:
172
            assert self._current_task
173
            # Schedule again for later run
174
            (_, _, args) = self._current_task
175
            pool._AddTaskUnlocked(args, defer.priority)
176

  
120 177
          if self._current_task:
121 178
            self._current_task = None
122 179
            pool._worker_to_pool.notifyAll()
......
170 227
    self._termworkers = []
171 228

  
172 229
    # Queued tasks
173
    self._tasks = collections.deque()
230
    self._counter = 0
231
    self._tasks = []
174 232

  
175 233
    # Start workers
176 234
    self.Resize(num_workers)
......
184 242
    while self._quiescing:
185 243
      self._pool_to_pool.wait()
186 244

  
187
  def _AddTaskUnlocked(self, args):
245
  def _AddTaskUnlocked(self, args, priority):
246
    """Adds a task to the internal queue.
247

  
248
    @type args: sequence
249
    @param args: Arguments passed to L{BaseWorker.RunTask}
250
    @type priority: number
251
    @param priority: Task priority
252

  
253
    """
188 254
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
255
    assert isinstance(priority, (int, long)), "Priority must be numeric"
189 256

  
190
    self._tasks.append(args)
257
    # This counter is used to ensure elements are processed in their
258
    # incoming order. For processing they're sorted by priority and then
259
    # counter.
260
    self._counter += 1
261

  
262
    heapq.heappush(self._tasks, (priority, self._counter, args))
191 263

  
192 264
    # Notify a waiting worker
193 265
    self._pool_to_worker.notify()
194 266

  
195
  def AddTask(self, args):
267
  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
196 268
    """Adds a task to the queue.
197 269

  
198 270
    @type args: sequence
199 271
    @param args: arguments passed to L{BaseWorker.RunTask}
272
    @type priority: number
273
    @param priority: Task priority
200 274

  
201 275
    """
202 276
    self._lock.acquire()
203 277
    try:
204 278
      self._WaitWhileQuiescingUnlocked()
205
      self._AddTaskUnlocked(args)
279
      self._AddTaskUnlocked(args, priority)
206 280
    finally:
207 281
      self._lock.release()
208 282

  
209
  def AddManyTasks(self, tasks):
283
  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
210 284
    """Add a list of tasks to the queue.
211 285

  
212 286
    @type tasks: list of tuples
213 287
    @param tasks: list of args passed to L{BaseWorker.RunTask}
288
    @type priority: number or list of numbers
289
    @param priority: Priority for all added tasks or a list with the priority
290
                     for each task
214 291

  
215 292
    """
216 293
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
217 294
      "Each task must be a sequence"
218 295

  
296
    assert (isinstance(priority, (int, long)) or
297
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
298
           "Priority must be numeric or be a list of numeric values"
299

  
300
    if isinstance(priority, (int, long)):
301
      priority = [priority] * len(tasks)
302
    elif len(priority) != len(tasks):
303
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
304
                                   " number of tasks (%s)" %
305
                                   (len(priority), len(tasks)))
306

  
219 307
    self._lock.acquire()
220 308
    try:
221 309
      self._WaitWhileQuiescingUnlocked()
222 310

  
223
      for args in tasks:
224
        self._AddTaskUnlocked(args)
311
      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
312
      assert len(tasks) == len(priority)
313

  
314
      for args, priority in zip(tasks, priority):
315
        self._AddTaskUnlocked(args, priority)
225 316
    finally:
226 317
      self._lock.release()
227 318

  
......
254 345

  
255 346
    # Get task from queue and tell pool about it
256 347
    try:
257
      return self._tasks.popleft()
348
      return heapq.heappop(self._tasks)
258 349
    finally:
259 350
      self._worker_to_pool.notifyAll()
260 351

  

Also available in: Unified diff