Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.workerpool_unittest.py @ 90066780

History | View | Annotate | Download (19.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the workerpool module"""
23

    
24
import unittest
25
import threading
26
import time
27
import sys
28
import zlib
29
import random
30

    
31
from ganeti import workerpool
32
from ganeti import errors
33
from ganeti import utils
34
from ganeti import compat
35

    
36
import testutils
37

    
38

    
39
class CountingContext(object):
40
  def __init__(self):
41
    self._lock = threading.Condition(threading.Lock())
42
    self.done = 0
43

    
44
  def DoneTask(self):
45
    self._lock.acquire()
46
    try:
47
      self.done += 1
48
    finally:
49
      self._lock.release()
50

    
51
  def GetDoneTasks(self):
52
    self._lock.acquire()
53
    try:
54
      return self.done
55
    finally:
56
      self._lock.release()
57

    
58
  @staticmethod
59
  def UpdateChecksum(current, value):
60
    return zlib.adler32(str(value), current)
61

    
62

    
63
class CountingBaseWorker(workerpool.BaseWorker):
64
  def RunTask(self, ctx, text):
65
    ctx.DoneTask()
66

    
67

    
68
class ChecksumContext:
69
  CHECKSUM_START = zlib.adler32("")
70

    
71
  def __init__(self):
72
    self.lock = threading.Condition(threading.Lock())
73
    self.checksum = self.CHECKSUM_START
74

    
75
  @staticmethod
76
  def UpdateChecksum(current, value):
77
    return zlib.adler32(str(value), current)
78

    
79

    
80
class ChecksumBaseWorker(workerpool.BaseWorker):
81
  def RunTask(self, ctx, number):
82
    name = "number%s" % number
83
    self.SetTaskName(name)
84

    
85
    # This assertion needs to be checked before updating the checksum. A
86
    # failing assertion will then cause the result to be wrong.
87
    assert self.getName() == ("%s/%s" % (self._worker_id, name))
88

    
89
    ctx.lock.acquire()
90
    try:
91
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
92
    finally:
93
      ctx.lock.release()
94

    
95

    
96
class ListBuilderContext:
97
  def __init__(self):
98
    self.lock = threading.Lock()
99
    self.result = []
100
    self.prioresult = {}
101

    
102

    
103
class ListBuilderWorker(workerpool.BaseWorker):
104
  def RunTask(self, ctx, data):
105
    ctx.lock.acquire()
106
    try:
107
      ctx.result.append((self.GetCurrentPriority(), data))
108
      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
109
    finally:
110
      ctx.lock.release()
111

    
112

    
113
class DeferringTaskContext:
114
  def __init__(self):
115
    self.lock = threading.Lock()
116
    self.prioresult = {}
117
    self.samepriodefer = {}
118
    self.num2ordertaskid = {}
119

    
120

    
121
class DeferringWorker(workerpool.BaseWorker):
122
  def RunTask(self, ctx, num, targetprio):
123
    ctx.lock.acquire()
124
    try:
125
      otilst = ctx.num2ordertaskid.setdefault(num, [])
126
      otilst.append(self._GetCurrentOrderAndTaskId())
127

    
128
      if num in ctx.samepriodefer:
129
        del ctx.samepriodefer[num]
130
        raise workerpool.DeferTask()
131

    
132
      if self.GetCurrentPriority() > targetprio:
133
        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
134

    
135
      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
136
    finally:
137
      ctx.lock.release()
138

    
139

    
140
class PriorityContext:
141
  def __init__(self):
142
    self.lock = threading.Lock()
143
    self.result = []
144

    
145

    
146
class PriorityWorker(workerpool.BaseWorker):
147
  def RunTask(self, ctx, data):
148
    ctx.lock.acquire()
149
    try:
150
      ctx.result.append((self.GetCurrentPriority(), data))
151
    finally:
152
      ctx.lock.release()
153

    
154

    
155
class NotImplementedWorker(workerpool.BaseWorker):
156
  def RunTask(self):
157
    raise NotImplementedError
158

    
159

    
160
class TestWorkerpool(unittest.TestCase):
161
  """Workerpool tests"""
162

    
163
  def testCounting(self):
164
    ctx = CountingContext()
165
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
166
    try:
167
      self._CheckWorkerCount(wp, 3)
168

    
169
      for i in range(10):
170
        wp.AddTask((ctx, "Hello world %s" % i))
171

    
172
      wp.Quiesce()
173
    finally:
174
      wp.TerminateWorkers()
175
      self._CheckWorkerCount(wp, 0)
176

    
177
    self.assertEquals(ctx.GetDoneTasks(), 10)
178

    
179
  def testNoTasks(self):
180
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
181
    try:
182
      self._CheckWorkerCount(wp, 3)
183
      self._CheckNoTasks(wp)
184
    finally:
185
      wp.TerminateWorkers()
186
      self._CheckWorkerCount(wp, 0)
187

    
188
  def testNoTasksQuiesce(self):
189
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
190
    try:
191
      self._CheckWorkerCount(wp, 3)
192
      self._CheckNoTasks(wp)
193
      wp.Quiesce()
194
      self._CheckNoTasks(wp)
195
    finally:
196
      wp.TerminateWorkers()
197
      self._CheckWorkerCount(wp, 0)
198

    
199
  def testActive(self):
200
    ctx = CountingContext()
201
    wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
202
    try:
203
      self._CheckWorkerCount(wp, 5)
204
      self.assertTrue(wp._active)
205

    
206
      # Process some tasks
207
      for _ in range(10):
208
        wp.AddTask((ctx, None))
209

    
210
      wp.Quiesce()
211
      self._CheckNoTasks(wp)
212
      self.assertEquals(ctx.GetDoneTasks(), 10)
213

    
214
      # Repeat a few times
215
      for count in range(10):
216
        # Deactivate pool
217
        wp.SetActive(False)
218
        self._CheckNoTasks(wp)
219

    
220
        # Queue some more tasks
221
        for _ in range(10):
222
          wp.AddTask((ctx, None))
223

    
224
        for _ in range(5):
225
          # Short delays to give other threads a chance to cause breakage
226
          time.sleep(.01)
227
          wp.AddTask((ctx, "Hello world %s" % 999))
228
          self.assertFalse(wp._active)
229

    
230
        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
231

    
232
        # Start processing again
233
        wp.SetActive(True)
234
        self.assertTrue(wp._active)
235

    
236
        # Wait for tasks to finish
237
        wp.Quiesce()
238
        self._CheckNoTasks(wp)
239
        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
240

    
241
        self._CheckWorkerCount(wp, 5)
242
    finally:
243
      wp.TerminateWorkers()
244
      self._CheckWorkerCount(wp, 0)
245

    
246
  def testChecksum(self):
247
    # Tests whether all tasks are run and, since we're only using a single
248
    # thread, whether everything is started in order.
249
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
250
    try:
251
      self._CheckWorkerCount(wp, 1)
252

    
253
      ctx = ChecksumContext()
254
      checksum = ChecksumContext.CHECKSUM_START
255
      for i in range(1, 100):
256
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
257
        wp.AddTask((ctx, i))
258

    
259
      wp.Quiesce()
260

    
261
      self._CheckNoTasks(wp)
262

    
263
      # Check sum
264
      ctx.lock.acquire()
265
      try:
266
        self.assertEqual(checksum, ctx.checksum)
267
      finally:
268
        ctx.lock.release()
269
    finally:
270
      wp.TerminateWorkers()
271
      self._CheckWorkerCount(wp, 0)
272

    
273
  def testAddManyTasks(self):
274
    ctx = CountingContext()
275
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
276
    try:
277
      self._CheckWorkerCount(wp, 3)
278

    
279
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
280
      wp.AddTask((ctx, "A separate hello"))
281
      wp.AddTask((ctx, "Once more, hi!"))
282
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
283

    
284
      wp.Quiesce()
285

    
286
      self._CheckNoTasks(wp)
287
    finally:
288
      wp.TerminateWorkers()
289
      self._CheckWorkerCount(wp, 0)
290

    
291
    self.assertEquals(ctx.GetDoneTasks(), 22)
292

    
293
  def testManyTasksSequence(self):
294
    ctx = CountingContext()
295
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
296
    try:
297
      self._CheckWorkerCount(wp, 3)
298
      self.assertRaises(AssertionError, wp.AddManyTasks,
299
                        ["Hello world %s" % i for i in range(10)])
300
      self.assertRaises(AssertionError, wp.AddManyTasks,
301
                        [i for i in range(10)])
302
      self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0)
303

    
304
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
305
      wp.AddTask((ctx, "A separate hello"))
306

    
307
      wp.Quiesce()
308

    
309
      self._CheckNoTasks(wp)
310
    finally:
311
      wp.TerminateWorkers()
312
      self._CheckWorkerCount(wp, 0)
313

    
314
    self.assertEquals(ctx.GetDoneTasks(), 11)
315

    
316
  def _CheckNoTasks(self, wp):
317
    wp._lock.acquire()
318
    try:
319
      # The task queue must be empty now
320
      self.assertFalse(wp._tasks)
321
      self.assertFalse(wp._taskdata)
322
    finally:
323
      wp._lock.release()
324

    
325
  def _CheckWorkerCount(self, wp, num_workers):
326
    wp._lock.acquire()
327
    try:
328
      self.assertEqual(len(wp._workers), num_workers)
329
    finally:
330
      wp._lock.release()
331

    
332
  def testPriorityChecksum(self):
333
    # Tests whether all tasks are run and, since we're only using a single
334
    # thread, whether everything is started in order and respects the priority
335
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
336
    try:
337
      self._CheckWorkerCount(wp, 1)
338

    
339
      ctx = ChecksumContext()
340

    
341
      data = {}
342
      tasks = []
343
      priorities = []
344
      for i in range(1, 333):
345
        prio = i % 7
346
        tasks.append((ctx, i))
347
        priorities.append(prio)
348
        data.setdefault(prio, []).append(i)
349

    
350
      wp.AddManyTasks(tasks, priority=priorities)
351

    
352
      wp.Quiesce()
353

    
354
      self._CheckNoTasks(wp)
355

    
356
      # Check sum
357
      ctx.lock.acquire()
358
      try:
359
        checksum = ChecksumContext.CHECKSUM_START
360
        for priority in sorted(data.keys()):
361
          for i in data[priority]:
362
            checksum = ChecksumContext.UpdateChecksum(checksum, i)
363

    
364
        self.assertEqual(checksum, ctx.checksum)
365
      finally:
366
        ctx.lock.release()
367

    
368
      self._CheckWorkerCount(wp, 1)
369
    finally:
370
      wp.TerminateWorkers()
371
      self._CheckWorkerCount(wp, 0)
372

    
373
  def testPriorityListManyTasks(self):
374
    # Tests whether all tasks are run and, since we're only using a single
375
    # thread, whether everything is started in order and respects the priority
376
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
377
    try:
378
      self._CheckWorkerCount(wp, 1)
379

    
380
      ctx = ListBuilderContext()
381

    
382
      # Use static seed for this test
383
      rnd = random.Random(0)
384

    
385
      data = {}
386
      tasks = []
387
      priorities = []
388
      for i in range(1, 333):
389
        prio = int(rnd.random() * 10)
390
        tasks.append((ctx, i))
391
        priorities.append(prio)
392
        data.setdefault(prio, []).append((prio, i))
393

    
394
      wp.AddManyTasks(tasks, priority=priorities)
395

    
396
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
397
                        [("x", ), ("y", )], priority=[1] * 5)
398
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
399
                        [("x", ), ("y", )], task_id=[1] * 5)
400

    
401
      wp.Quiesce()
402

    
403
      self._CheckNoTasks(wp)
404

    
405
      # Check result
406
      ctx.lock.acquire()
407
      try:
408
        expresult = []
409
        for priority in sorted(data.keys()):
410
          expresult.extend(data[priority])
411

    
412
        self.assertEqual(expresult, ctx.result)
413
      finally:
414
        ctx.lock.release()
415

    
416
      self._CheckWorkerCount(wp, 1)
417
    finally:
418
      wp.TerminateWorkers()
419
      self._CheckWorkerCount(wp, 0)
420

    
421
  def testPriorityListSingleTasks(self):
422
    # Tests whether all tasks are run and, since we're only using a single
423
    # thread, whether everything is started in order and respects the priority
424
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
425
    try:
426
      self._CheckWorkerCount(wp, 1)
427

    
428
      ctx = ListBuilderContext()
429

    
430
      # Use static seed for this test
431
      rnd = random.Random(26279)
432

    
433
      data = {}
434
      for i in range(1, 333):
435
        prio = int(rnd.random() * 30)
436
        wp.AddTask((ctx, i), priority=prio)
437
        data.setdefault(prio, []).append(i)
438

    
439
        # Cause some distortion
440
        if i % 11 == 0:
441
          time.sleep(.001)
442
        if i % 41 == 0:
443
          wp.Quiesce()
444

    
445
      wp.Quiesce()
446

    
447
      self._CheckNoTasks(wp)
448

    
449
      # Check result
450
      ctx.lock.acquire()
451
      try:
452
        self.assertEqual(data, ctx.prioresult)
453
      finally:
454
        ctx.lock.release()
455

    
456
      self._CheckWorkerCount(wp, 1)
457
    finally:
458
      wp.TerminateWorkers()
459
      self._CheckWorkerCount(wp, 0)
460

    
461
  def testDeferTask(self):
462
    # Tests whether all tasks are run and, since we're only using a single
463
    # thread, whether everything is started in order and respects the priority
464
    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
465
    try:
466
      self._CheckWorkerCount(wp, 1)
467

    
468
      ctx = DeferringTaskContext()
469

    
470
      # Use static seed for this test
471
      rnd = random.Random(14921)
472

    
473
      data = {}
474
      num2taskid = {}
475
      for i in range(1, 333):
476
        ctx.lock.acquire()
477
        try:
478
          if i % 5 == 0:
479
            ctx.samepriodefer[i] = True
480
        finally:
481
          ctx.lock.release()
482

    
483
        prio = int(rnd.random() * 30)
484
        num2taskid[i] = 1000 * i
485
        wp.AddTask((ctx, i, prio), priority=50,
486
                   task_id=num2taskid[i])
487
        data.setdefault(prio, set()).add(i)
488

    
489
        # Cause some distortion
490
        if i % 24 == 0:
491
          time.sleep(.001)
492
        if i % 31 == 0:
493
          wp.Quiesce()
494

    
495
      wp.Quiesce()
496

    
497
      self._CheckNoTasks(wp)
498

    
499
      # Check result
500
      ctx.lock.acquire()
501
      try:
502
        self.assertEqual(data, ctx.prioresult)
503

    
504
        all_order_ids = []
505

    
506
        for (num, numordertaskid) in ctx.num2ordertaskid.items():
507
          order_ids = map(compat.fst, numordertaskid)
508
          self.assertFalse(utils.FindDuplicates(order_ids),
509
                           msg="Order ID has been reused")
510
          all_order_ids.extend(order_ids)
511

    
512
          for task_id in map(compat.snd, numordertaskid):
513
            self.assertEqual(task_id, num2taskid[num],
514
                             msg=("Task %s used different task IDs" % num))
515

    
516
        self.assertFalse(utils.FindDuplicates(all_order_ids),
517
                         msg="Order ID has been reused")
518
      finally:
519
        ctx.lock.release()
520

    
521
      self._CheckWorkerCount(wp, 1)
522
    finally:
523
      wp.TerminateWorkers()
524
      self._CheckWorkerCount(wp, 0)
525

    
526
  def testChangeTaskPriority(self):
527
    wp = workerpool.WorkerPool("Test", 1, PriorityWorker)
528
    try:
529
      self._CheckWorkerCount(wp, 1)
530

    
531
      ctx = PriorityContext()
532

    
533
      # Use static seed for this test
534
      rnd = random.Random(4727)
535

    
536
      # Disable processing of tasks
537
      wp.SetActive(False)
538

    
539
      # No task ID
540
      self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
541
                        None, 0)
542

    
543
      # Pre-generate task IDs and priorities
544
      count = 100
545
      task_ids = range(0, count)
546
      priorities = range(200, 200 + count) * 2
547

    
548
      rnd.shuffle(task_ids)
549
      rnd.shuffle(priorities)
550

    
551
      # Make sure there are some duplicate priorities, but not all
552
      priorities[count * 2 - 10:count * 2 - 1] = \
553
        priorities[count - 10: count - 1]
554

    
555
      assert len(priorities) == 2 * count
556
      assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)]
557

    
558
      # Add some tasks; this loop consumes the first half of all previously
559
      # generated priorities
560
      for (idx, task_id) in enumerate(task_ids):
561
        wp.AddTask((ctx, idx),
562
                   priority=priorities.pop(),
563
                   task_id=task_id)
564

    
565
      self.assertEqual(len(wp._tasks), len(task_ids))
566
      self.assertEqual(len(wp._taskdata), len(task_ids))
567

    
568
      # Tasks have been added, so half of the priorities should have been
569
      # consumed
570
      assert len(priorities) == len(task_ids)
571

    
572
      # Change task priority
573
      expected = []
574
      for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities):
575
        wp.ChangeTaskPriority(task_id, prio)
576
        expected.append((prio, idx))
577

    
578
      self.assertEqual(len(wp._taskdata), len(task_ids))
579

    
580
      # Half the entries are now abandoned tasks
581
      self.assertEqual(len(wp._tasks), len(task_ids) * 2)
582

    
583
      assert len(priorities) == count
584
      assert len(task_ids) == count
585

    
586
      # Start processing
587
      wp.SetActive(True)
588

    
589
      # Wait for tasks to finish
590
      wp.Quiesce()
591

    
592
      self._CheckNoTasks(wp)
593

    
594
      for task_id in task_ids:
595
        # All tasks are done
596
        self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
597
                          task_id, 0)
598

    
599
      # Check result
600
      ctx.lock.acquire()
601
      try:
602
        self.assertEqual(ctx.result, sorted(expected))
603
      finally:
604
        ctx.lock.release()
605

    
606
      self._CheckWorkerCount(wp, 1)
607
    finally:
608
      wp.TerminateWorkers()
609
      self._CheckWorkerCount(wp, 0)
610

    
611
  def testChangeTaskPriorityInteralStructures(self):
612
    wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker)
613
    try:
614
      self._CheckWorkerCount(wp, 1)
615

    
616
      # Use static seed for this test
617
      rnd = random.Random(643)
618

    
619
      (num1, num2) = rnd.sample(range(1000), 2)
620

    
621
      # Disable processing of tasks
622
      wp.SetActive(False)
623

    
624
      self.assertFalse(wp._tasks)
625
      self.assertFalse(wp._taskdata)
626

    
627
      # No priority or task ID
628
      wp.AddTask(())
629
      self.assertEqual(wp._tasks, [
630
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
631
        ])
632
      self.assertFalse(wp._taskdata)
633

    
634
      # No task ID
635
      wp.AddTask((), priority=7413)
636
      self.assertEqual(wp._tasks, [
637
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
638
        [7413, 1, None, ()],
639
        ])
640
      self.assertFalse(wp._taskdata)
641

    
642
      # Start adding real tasks
643
      wp.AddTask((), priority=10267659, task_id=num1)
644
      self.assertEqual(wp._tasks, [
645
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
646
        [7413, 1, None, ()],
647
        [10267659, 2, num1, ()],
648
        ])
649
      self.assertEqual(wp._taskdata, {
650
        num1: [10267659, 2, num1, ()],
651
        })
652

    
653
      wp.AddTask((), priority=123, task_id=num2)
654
      self.assertEqual(sorted(wp._tasks), [
655
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
656
        [123, 3, num2, ()],
657
        [7413, 1, None, ()],
658
        [10267659, 2, num1, ()],
659
        ])
660
      self.assertEqual(wp._taskdata, {
661
        num1: [10267659, 2, num1, ()],
662
        num2: [123, 3, num2, ()],
663
        })
664

    
665
      wp.ChangeTaskPriority(num1, 100)
666
      self.assertEqual(sorted(wp._tasks), [
667
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
668
        [100, 2, num1, ()],
669
        [123, 3, num2, ()],
670
        [7413, 1, None, ()],
671
        [10267659, 2, num1, None],
672
        ])
673
      self.assertEqual(wp._taskdata, {
674
        num1: [100, 2, num1, ()],
675
        num2: [123, 3, num2, ()],
676
        })
677

    
678
      wp.ChangeTaskPriority(num2, 91337)
679
      self.assertEqual(sorted(wp._tasks), [
680
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
681
        [100, 2, num1, ()],
682
        [123, 3, num2, None],
683
        [7413, 1, None, ()],
684
        [91337, 3, num2, ()],
685
        [10267659, 2, num1, None],
686
        ])
687
      self.assertEqual(wp._taskdata, {
688
        num1: [100, 2, num1, ()],
689
        num2: [91337, 3, num2, ()],
690
        })
691

    
692
      wp.ChangeTaskPriority(num1, 10139)
693
      self.assertEqual(sorted(wp._tasks), [
694
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
695
        [100, 2, num1, None],
696
        [123, 3, num2, None],
697
        [7413, 1, None, ()],
698
        [10139, 2, num1, ()],
699
        [91337, 3, num2, ()],
700
        [10267659, 2, num1, None],
701
        ])
702
      self.assertEqual(wp._taskdata, {
703
        num1: [10139, 2, num1, ()],
704
        num2: [91337, 3, num2, ()],
705
        })
706

    
707
      # Change to the same priority once again
708
      wp.ChangeTaskPriority(num1, 10139)
709
      self.assertEqual(sorted(wp._tasks), [
710
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
711
        [100, 2, num1, None],
712
        [123, 3, num2, None],
713
        [7413, 1, None, ()],
714
        [10139, 2, num1, None],
715
        [10139, 2, num1, ()],
716
        [91337, 3, num2, ()],
717
        [10267659, 2, num1, None],
718
        ])
719
      self.assertEqual(wp._taskdata, {
720
        num1: [10139, 2, num1, ()],
721
        num2: [91337, 3, num2, ()],
722
        })
723

    
724
      self._CheckWorkerCount(wp, 1)
725
    finally:
726
      wp.TerminateWorkers()
727
      self._CheckWorkerCount(wp, 0)
728

    
729

    
730
if __name__ == "__main__":
731
  testutils.GanetiTestProgram()