Add unittest for cli.FormatResultError
[ganeti-local] / test / ganeti.workerpool_unittest.py
index 549edba..89b3b1a 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2008 Google Inc.
+# Copyright (C) 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -26,13 +26,15 @@ import threading
 import time
 import sys
 import zlib
+import random
 
 from ganeti import workerpool
+from ganeti import errors
 
 import testutils
 
-class CountingContext(object):
 
+class CountingContext(object):
   def __init__(self):
     self._lock = threading.Condition(threading.Lock())
     self.done = 0
@@ -57,7 +59,6 @@ class CountingContext(object):
 
 
 class CountingBaseWorker(workerpool.BaseWorker):
-
   def RunTask(self, ctx, text):
     ctx.DoneTask()
 
@@ -90,6 +91,46 @@ class ChecksumBaseWorker(workerpool.BaseWorker):
       ctx.lock.release()
 
 
+class ListBuilderContext:
+  def __init__(self):
+    self.lock = threading.Lock()
+    self.result = []
+    self.prioresult = {}
+
+
+class ListBuilderWorker(workerpool.BaseWorker):  # Logs each task it runs in ctx
+  def RunTask(self, ctx, data):
+    ctx.lock.acquire()
+    try:
+      ctx.result.append((self.GetCurrentPriority(), data))  # flat run-order log
+      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
+    finally:
+      ctx.lock.release()  # released even if recording fails
+
+
+class DeferringTaskContext:
+  def __init__(self):
+    self.lock = threading.Lock()
+    self.prioresult = {}
+    self.samepriodefer = {}
+
+
+class DeferringWorker(workerpool.BaseWorker):  # Defers until targetprio is hit
+  def RunTask(self, ctx, num, targetprio):
+    ctx.lock.acquire()
+    try:
+      if num in ctx.samepriodefer:  # flagged: defer once, no explicit priority
+        del ctx.samepriodefer[num]
+        raise workerpool.DeferTask()
+
+      if self.GetCurrentPriority() > targetprio:  # not there yet: step down by 1
+        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
+
+      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
+    finally:
+      ctx.lock.release()  # also runs when DeferTask is raised above
+
+
 class TestWorkerpool(unittest.TestCase):
   """Workerpool tests"""
 
@@ -213,6 +254,220 @@ class TestWorkerpool(unittest.TestCase):
     finally:
       wp._lock.release()
 
+  def testPriorityChecksum(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ChecksumContext()
+
+      data = {}  # priority -> task numbers, in submission order
+      tasks = []
+      priorities = []  # appended in lockstep with tasks
+      for i in range(1, 333):
+        prio = i % 7  # cycles through priorities 0..6
+        tasks.append((ctx, i))
+        priorities.append(prio)
+        data.setdefault(prio, []).append(i)
+
+      wp.AddManyTasks(tasks, priority=priorities)
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check sum, replaying the task numbers by ascending priority value
+      ctx.lock.acquire()
+      try:
+        checksum = ChecksumContext.CHECKSUM_START
+        for priority in sorted(data.keys()):
+          for i in data[priority]:
+            checksum = ChecksumContext.UpdateChecksum(checksum, i)
+
+        self.assertEqual(checksum, ctx.checksum)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()  # runs even if an assertion above failed
+      self._CheckWorkerCount(wp, 0)
+
+  def testPriorityListManyTasks(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ListBuilderContext()
+
+      # Use static seed for this test
+      rnd = random.Random(0)
+
+      data = {}  # priority -> (priority, task number) tuples as submitted
+      tasks = []
+      priorities = []  # appended in lockstep with tasks
+      for i in range(1, 333):
+        prio = int(rnd.random() * 10)  # pseudo-random priority in 0..9
+        tasks.append((ctx, i))
+        priorities.append(prio)
+        data.setdefault(prio, []).append((prio, i))
+
+      wp.AddManyTasks(tasks, priority=priorities)
+
+      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
+                        [("x", ), ("y", )], priority=[1] * 5)  # 2 tasks, 5 prios
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result, expected in ascending priority and then submission order
+      ctx.lock.acquire()
+      try:
+        expresult = []
+        for priority in sorted(data.keys()):
+          expresult.extend(data[priority])
+
+        self.assertEqual(expresult, ctx.result)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()  # runs even if an assertion above failed
+      self._CheckWorkerCount(wp, 0)
+
+  def testPriorityListSingleTasks(self):
+    # Tests whether tasks added one by one are all run at the priority they
+    # were added with and, within a priority, in the order they were added
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ListBuilderContext()
+
+      # Use static seed for this test
+      rnd = random.Random(26279)
+
+      data = {}  # priority -> task numbers, in the order they were added
+      for i in range(1, 333):
+        prio = int(rnd.random() * 30)  # pseudo-random priority in 0..29
+        wp.AddTask((ctx, i), priority=prio)
+        data.setdefault(prio, []).append(i)
+
+        # Cause some distortion
+        if i % 11 == 0:
+          time.sleep(.001)
+        if i % 41 == 0:
+          wp.Quiesce()
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result (per-priority lists only, no ordering across priorities)
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()  # runs even if an assertion above failed
+      self._CheckWorkerCount(wp, 0)
+
+  def testPriorityListSingleTasks(self):
+    # NOTE(review): repeats the code of the preceding definition of the same
+    # name; being defined later, this is the one the class ends up with
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = ListBuilderContext()
+
+      # Use static seed for this test
+      rnd = random.Random(26279)
+
+      data = {}  # priority -> task numbers, in the order they were added
+      for i in range(1, 333):
+        prio = int(rnd.random() * 30)  # pseudo-random priority in 0..29
+        wp.AddTask((ctx, i), priority=prio)
+        data.setdefault(prio, []).append(i)
+
+        # Cause some distortion
+        if i % 11 == 0:
+          time.sleep(.001)
+        if i % 41 == 0:
+          wp.Quiesce()
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result: each task ran at its priority, in per-priority add order
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()  # runs even if an assertion above failed
+      self._CheckWorkerCount(wp, 0)
+
+  def testDeferTask(self):
+    # Tests whether deferred tasks are run again: every task is added with
+    # priority 50 and must end up being recorded at its own target priority
+    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+
+      ctx = DeferringTaskContext()
+
+      # Use static seed for this test
+      rnd = random.Random(14921)
+
+      data = {}  # target priority -> set of task numbers expected there
+      for i in range(1, 333):
+        ctx.lock.acquire()
+        try:
+          if i % 5 == 0:  # every fifth task gets one extra plain deferral
+            ctx.samepriodefer[i] = True
+        finally:
+          ctx.lock.release()
+
+        prio = int(rnd.random() * 30)  # target priority in 0..29
+        wp.AddTask((ctx, i, prio), priority=50)  # starts above every target
+        data.setdefault(prio, set()).add(i)
+
+        # Cause some distortion
+        if i % 24 == 0:
+          time.sleep(.001)
+        if i % 31 == 0:
+          wp.Quiesce()
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+
+      # Check result: each task number must be filed under its target priority
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()  # runs even if an assertion above failed
+      self._CheckWorkerCount(wp, 0)
+
 
 if __name__ == '__main__':
   testutils.GanetiTestProgram()