Revision b3558df1 lib/workerpool.py

b/lib/workerpool.py
48 48
    super(BaseWorker, self).__init__()
49 49
    self.pool = pool
50 50
    self.worker_id = worker_id
51

  
52
    # Also used by WorkerPool
53 51
    self._current_task = None
54 52

  
55 53
  def ShouldTerminate(self):
......
58 56
    """
59 57
    return self.pool.ShouldWorkerTerminate(self)
60 58

  
59
  def _HasRunningTaskUnlocked(self):
60
    """Returns whether this worker is currently running a task.
61

  
62
    """
63
    return (self._current_task is not None)
64

  
65
  def HasRunningTask(self):
66
    """Returns whether this worker is currently running a task.
67

  
68
    """
69
    self.pool._lock.acquire()
70
    try:
71
      return self._HasRunningTaskUnlocked()
72
    finally:
73
      self.pool._lock.release()
74

  
61 75
  def run(self):
62 76
    """Main thread function.
63 77

  
......
66 80
    """
67 81
    pool = self.pool
68 82

  
69
    assert self._current_task is None
83
    assert not self.HasRunningTask()
70 84

  
71 85
    while True:
72 86
      try:
......
78 92

  
79 93
          # We only wait if there's no task for us.
80 94
          if not pool._tasks:
95
            logging.debug("Worker %s: waiting for tasks", self.worker_id)
96

  
81 97
            # wait() releases the lock and sleeps until notified
82 98
            pool._lock.wait()
83 99

  
100
            logging.debug("Worker %s: notified while waiting", self.worker_id)
101

  
84 102
            # Were we woken up in order to terminate?
85 103
            if pool._ShouldWorkerTerminateUnlocked(self):
86 104
              break
......
99 117

  
100 118
        # Run the actual task
101 119
        try:
120
          logging.debug("Worker %s: starting task %r",
121
                        self.worker_id, self._current_task)
102 122
          self.RunTask(*self._current_task)
123
          logging.debug("Worker %s: done with task %r",
124
                        self.worker_id, self._current_task)
103 125
        except:
104 126
          logging.error("Worker %s: Caught unhandled exception",
105 127
                        self.worker_id, exc_info=True)
106 128
      finally:
107
        self._current_task = None
108

  
109 129
        # Notify pool
110 130
        pool._lock.acquire()
111 131
        try:
112
          pool._lock.notifyAll()
132
          if self._current_task:
133
            self._current_task = None
134
            pool._lock.notifyAll()
113 135
        finally:
114 136
          pool._lock.release()
115 137

  
138
    logging.debug("Worker %s: terminates", self.worker_id)
139

  
116 140
  def RunTask(self, *args):
117 141
    """Function called to start a task.
118 142

  
......
198 222

  
199 223
    """
200 224
    for worker in self._workers + self._termworkers:
201
      if worker._current_task is not None:
225
      if worker._HasRunningTaskUnlocked():
202 226
        return True
203 227
    return False
204 228

  

Also available in: Unified diff