Revision bba69414

b/lib/workerpool.py
133 133
    """
134 134
    return (self._current_task is not None)
135 135

  
136
  def _GetCurrentOrderAndTaskId(self):
137
    """Returns the order and task ID of the current task.
138

  
139
    Should only be called from within L{RunTask}.
140

  
141
    """
142
    self.pool._lock.acquire()
143
    try:
144
      assert self._HasRunningTaskUnlocked()
145

  
146
      (_, order_id, task_id, _) = self._current_task
147

  
148
      return (order_id, task_id)
149
    finally:
150
      self.pool._lock.release()
151

  
136 152
  def run(self):
137 153
    """Main thread function.
138 154

  
......
202 218
          if defer:
203 219
            assert self._current_task
204 220
            # Schedule again for later run
205
            (_, _, _, args) = self._current_task
206
            pool._AddTaskUnlocked(args, defer.priority, None)
221
            (_, _, task_id, args) = self._current_task
222
            pool._AddTaskUnlocked(args, defer.priority, task_id)
207 223

  
208 224
          if self._current_task:
209 225
            self._current_task = None
......
299 315
    """
300 316
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
301 317
    assert isinstance(priority, (int, long)), "Priority must be numeric"
318
    assert task_id is None or isinstance(task_id, (int, long)), \
319
      "Task ID must be numeric or None"
302 320

  
303 321
    task = [priority, self._counter.next(), task_id, args]
304 322

  
b/test/ganeti.workerpool_unittest.py
31 31
from ganeti import workerpool
32 32
from ganeti import errors
33 33
from ganeti import utils
34
from ganeti import compat
34 35

  
35 36
import testutils
36 37

  
......
114 115
    self.lock = threading.Lock()
115 116
    self.prioresult = {}
116 117
    self.samepriodefer = {}
118
    self.num2ordertaskid = {}
117 119

  
118 120

  
119 121
class DeferringWorker(workerpool.BaseWorker):
120 122
  def RunTask(self, ctx, num, targetprio):
121 123
    ctx.lock.acquire()
122 124
    try:
125
      otilst = ctx.num2ordertaskid.setdefault(num, [])
126
      otilst.append(self._GetCurrentOrderAndTaskId())
127

  
123 128
      if num in ctx.samepriodefer:
124 129
        del ctx.samepriodefer[num]
125 130
        raise workerpool.DeferTask()
......
466 471
      rnd = random.Random(14921)
467 472

  
468 473
      data = {}
474
      num2taskid = {}
469 475
      for i in range(1, 333):
470 476
        ctx.lock.acquire()
471 477
        try:
......
475 481
          ctx.lock.release()
476 482

  
477 483
        prio = int(rnd.random() * 30)
478
        wp.AddTask((ctx, i, prio), priority=50)
484
        num2taskid[i] = 1000 * i
485
        wp.AddTask((ctx, i, prio), priority=50,
486
                   task_id=num2taskid[i])
479 487
        data.setdefault(prio, set()).add(i)
480 488

  
481 489
        # Cause some distortion
......
492 500
      ctx.lock.acquire()
493 501
      try:
494 502
        self.assertEqual(data, ctx.prioresult)
503

  
504
        all_order_ids = []
505

  
506
        for (num, numordertaskid) in ctx.num2ordertaskid.items():
507
          order_ids = map(compat.fst, numordertaskid)
508
          self.assertFalse(utils.FindDuplicates(order_ids),
509
                           msg="Order ID has been reused")
510
          all_order_ids.extend(order_ids)
511

  
512
          for task_id in map(compat.snd, numordertaskid):
513
            self.assertEqual(task_id, num2taskid[num],
514
                             msg=("Task %s used different task IDs" % num))
515

  
516
        self.assertFalse(utils.FindDuplicates(all_order_ids),
517
                         msg="Order ID has been reused")
495 518
      finally:
496 519
        ctx.lock.release()
497 520

  

Also available in: Unified diff