Revision 27caa993

b/lib/workerpool.py
246 246
    self._last_worker_id = 0
247 247
    self._workers = []
248 248
    self._quiescing = False
249
    self._active = True
249 250

  
250 251
    # Terminating workers
251 252
    self._termworkers = []
......
340 341
    finally:
341 342
      self._lock.release()
342 343

  
344
  def SetActive(self, active):
345
    """Enable/disable processing of tasks.
346

  
347
    This is different from L{Quiesce} in the sense that this function just
348
    changes an internal flag and doesn't wait for the queue to be empty. Tasks
349
    already being processed continue normally, but no new tasks will be
350
    started. New tasks can still be added.
351

  
352
    @type active: bool
353
    @param active: Whether tasks should be processed
354

  
355
    """
356
    self._lock.acquire()
357
    try:
358
      self._active = active
359

  
360
      if active:
361
        # Tell all workers to continue processing
362
        self._pool_to_worker.notifyAll()
363
    finally:
364
      self._lock.release()
365

  
343 366
  def _WaitForTaskUnlocked(self, worker):
344 367
    """Waits for a task for a worker.
345 368

  
......
351 374
      return _TERMINATE
352 375

  
353 376
    # We only wait if there's no task for us.
354
    if not self._tasks:
377
    if not (self._active and self._tasks):
355 378
      logging.debug("Waiting for tasks")
356 379

  
357 380
      while True:
......
364 387
        if self._ShouldWorkerTerminateUnlocked(worker):
365 388
          return _TERMINATE
366 389

  
367
        if self._tasks:
390
        # Just loop if pool is not processing tasks at this time
391
        if self._active and self._tasks:
368 392
          break
369 393

  
370 394
    # Get task from queue and tell pool about it
b/test/ganeti.workerpool_unittest.py
170 170
      wp.TerminateWorkers()
171 171
      self._CheckWorkerCount(wp, 0)
172 172

  
173
  def testActive(self):
174
    ctx = CountingContext()
175
    wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
176
    try:
177
      self._CheckWorkerCount(wp, 5)
178
      self.assertTrue(wp._active)
179

  
180
      # Process some tasks
181
      for _ in range(10):
182
        wp.AddTask((ctx, None))
183

  
184
      wp.Quiesce()
185
      self._CheckNoTasks(wp)
186
      self.assertEquals(ctx.GetDoneTasks(), 10)
187

  
188
      # Repeat a few times
189
      for count in range(10):
190
        # Deactivate pool
191
        wp.SetActive(False)
192
        self._CheckNoTasks(wp)
193

  
194
        # Queue some more tasks
195
        for _ in range(10):
196
          wp.AddTask((ctx, None))
197

  
198
        for _ in range(5):
199
          # Short delays to give other threads a chance to cause breakage
200
          time.sleep(.01)
201
          wp.AddTask((ctx, "Hello world %s" % 999))
202
          self.assertFalse(wp._active)
203

  
204
        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
205

  
206
        # Start processing again
207
        wp.SetActive(True)
208
        self.assertTrue(wp._active)
209

  
210
        # Wait for tasks to finish
211
        wp.Quiesce()
212
        self._CheckNoTasks(wp)
213
        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
214

  
215
        self._CheckWorkerCount(wp, 5)
216
    finally:
217
      wp.TerminateWorkers()
218
      self._CheckWorkerCount(wp, 0)
219

  
173 220
  def testChecksum(self):
174 221
    # Tests whether all tasks are run and, since we're only using a single
175 222
    # thread, whether everything is started in order.

Also available in: Unified diff