Revision c2a8e8ba

b/lib/workerpool.py
180 180

  
181 181
  # TODO: Implement dynamic resizing?
182 182

  
183
  def _WaitWhileQuiescingUnlocked(self):
184
    """Wait until the worker pool has finished quiescing.
185

  
186
    """
187
    while self._quiescing:
188
      self._pool_to_pool.wait()
189

  
183 190
  def AddTask(self, *args):
184 191
    """Adds a task to the queue.
185 192

  
......
188 195
    """
189 196
    self._lock.acquire()
190 197
    try:
191
      # Don't add new tasks while we're quiescing
192
      while self._quiescing:
193
        self._pool_to_pool.wait()
198
      self._WaitWhileQuiescingUnlocked()
194 199

  
195
      # Add task to internal queue
196 200
      self._tasks.append(args)
197 201

  
198 202
      # Wake one idling worker up
......
200 204
    finally:
201 205
      self._lock.release()
202 206

  
207
  def AddManyTasks(self, tasks):
208
    """Add a list of tasks to the queue.
209

  
210
    @type tasks: list of tuples
211
    @param tasks: list of args passed to L{BaseWorker.RunTask}
212

  
213
    """
214
    self._lock.acquire()
215
    try:
216
      self._WaitWhileQuiescingUnlocked()
217

  
218
      self._tasks.extend(tasks)
219

  
220
      for _ in tasks:
221
        self._pool_to_worker.notify()
222
    finally:
223
      self._lock.release()
224

  
203 225
  def _ShouldWorkerTerminateUnlocked(self, worker):
204 226
    """Returns whether a worker should terminate.
205 227

  
b/test/ganeti.workerpool_unittest.py
121 121
      wp.TerminateWorkers()
122 122
      self._CheckWorkerCount(wp, 0)
123 123

  
124
  def testAddManyTasks(self):
125
    wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
126
    try:
127
      self._CheckWorkerCount(wp, 3)
128

  
129
      wp.AddManyTasks(["Hello world %s" % i for i in range(10)])
130
      wp.AddTask("A separate hello")
131
      wp.AddTask("Once more, hi!")
132
      wp.AddManyTasks([("Hello world %s" % i, ) for i in range(10)])
133

  
134
      wp.Quiesce()
135

  
136
      self._CheckNoTasks(wp)
137
    finally:
138
      wp.TerminateWorkers()
139
      self._CheckWorkerCount(wp, 0)
140

  
124 141
  def _CheckNoTasks(self, wp):
125 142
    wp._lock.acquire()
126 143
    try:

Also available in: Unified diff