X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/38206f3c3e44e87e0dc489e7c073c56f9cd1301d..a162cf5b6e67d48b048df0cd03bdaeb8e5bbdb70:/lib/workerpool.py diff --git a/lib/workerpool.py b/lib/workerpool.py index b895b2c..54b3fb7 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -37,9 +37,8 @@ class BaseWorker(threading.Thread, object): def __init__(self, pool, worker_id): """Constructor for BaseWorker thread. - Args: - - pool: Parent worker pool - - worker_id: Identifier for this worker + @param pool: the parent worker pool + @param worker_id: identifier for this worker """ super(BaseWorker, self).__init__() @@ -137,6 +136,8 @@ class BaseWorker(threading.Thread, object): def RunTask(self, *args): """Function called to start a task. + This needs to be implemented by child classes. + """ raise NotImplementedError() @@ -146,19 +147,18 @@ class WorkerPool(object): This class is thread-safe. - Tasks are guaranteed to be started in the order in which they're added to the - pool. Due to the nature of threading, they're not guaranteed to finish in the - same order. + Tasks are guaranteed to be started in the order in which they're + added to the pool. Due to the nature of threading, they're not + guaranteed to finish in the same order. """ def __init__(self, num_workers, worker_class): """Constructor for worker pool. - Args: - - num_workers: Number of workers to be started (dynamic resizing is not - yet implemented) - - worker_class: Class to be instantiated for workers; should derive from - BaseWorker + @param num_workers: number of workers to be started + (dynamic resizing is not yet implemented) + @param worker_class: the class to be instantiated for workers; + should derive from L{BaseWorker} """ # Some of these variables are accessed by BaseWorker @@ -185,8 +185,7 @@ class WorkerPool(object): def AddTask(self, *args): """Adds a task to the queue. - Args: - - *args: Arguments passed to BaseWorker.RunTask + @param args: arguments passed to L{BaseWorker.RunTask} """ self._lock.acquire() @@ -215,7 +214,7 @@ class WorkerPool(object): """ self._lock.acquire() try: - return self._ShouldWorkerTerminateUnlocked(self) + return self._ShouldWorkerTerminateUnlocked(worker) finally: self._lock.release() @@ -249,6 +248,9 @@ class WorkerPool(object): self._lock.release() def _NewWorkerIdUnlocked(self): + """Return an identifier for a new worker. + + """ self._last_worker_id += 1 return self._last_worker_id @@ -303,7 +305,7 @@ class WorkerPool(object): elif current_count < num_workers: # Create (num_workers - current_count) new workers - for i in xrange(num_workers - current_count): + for _ in xrange(num_workers - current_count): worker = self._worker_class(self, self._NewWorkerIdUnlocked()) self._workers.append(worker) worker.start() @@ -311,8 +313,7 @@ class WorkerPool(object): def Resize(self, num_workers): """Changes the number of workers in the pool. - Args: - - num_workers: New number of workers + @param num_workers: the new number of workers """ self._lock.acquire()