Add a function to validate and normalize hostnames
[ganeti-local] / lib / workerpool.py
index 54b3fb7..1ec89af 100644 (file)
@@ -34,6 +34,7 @@ class BaseWorker(threading.Thread, object):
   Users of a worker pool must override RunTask in a subclass.
 
   """
+  # pylint: disable-msg=W0212
   def __init__(self, pool, worker_id):
     """Constructor for BaseWorker thread.
 
@@ -41,9 +42,8 @@ class BaseWorker(threading.Thread, object):
     @param worker_id: identifier for this worker
 
     """
-    super(BaseWorker, self).__init__()
+    super(BaseWorker, self).__init__(name=worker_id)
     self.pool = pool
-    self.worker_id = worker_id
     self._current_task = None
 
   def ShouldTerminate(self):
@@ -88,12 +88,12 @@ class BaseWorker(threading.Thread, object):
 
           # We only wait if there's no task for us.
           if not pool._tasks:
-            logging.debug("Worker %s: waiting for tasks", self.worker_id)
+            logging.debug("Waiting for tasks")
 
             # wait() releases the lock and sleeps until notified
             pool._pool_to_worker.wait()
 
-            logging.debug("Worker %s: notified while waiting", self.worker_id)
+            logging.debug("Notified while waiting")
 
             # Were we woken up in order to terminate?
             if pool._ShouldWorkerTerminateUnlocked(self):
@@ -113,14 +113,11 @@ class BaseWorker(threading.Thread, object):
 
         # Run the actual task
         try:
-          logging.debug("Worker %s: starting task %r",
-                        self.worker_id, self._current_task)
+          logging.debug("Starting task %r", self._current_task)
           self.RunTask(*self._current_task)
-          logging.debug("Worker %s: done with task %r",
-                        self.worker_id, self._current_task)
-        except:
-          logging.error("Worker %s: Caught unhandled exception",
-                        self.worker_id, exc_info=True)
+          logging.debug("Done with task %r", self._current_task)
+        except: # pylint: disable-msg=W0702
+          logging.exception("Caught unhandled exception")
       finally:
         # Notify pool
         pool._lock.acquire()
@@ -131,7 +128,7 @@ class BaseWorker(threading.Thread, object):
         finally:
           pool._lock.release()
 
-    logging.debug("Worker %s: terminates", self.worker_id)
+    logging.debug("Terminates")
 
   def RunTask(self, *args):
     """Function called to start a task.
@@ -152,7 +149,7 @@ class WorkerPool(object):
   guaranteed to finish in the same order.
 
   """
-  def __init__(self, num_workers, worker_class):
+  def __init__(self, name, num_workers, worker_class):
     """Constructor for worker pool.
 
     @param num_workers: number of workers to be started
@@ -167,6 +164,7 @@ class WorkerPool(object):
     self._pool_to_worker = threading.Condition(self._lock)
     self._worker_to_pool = threading.Condition(self._lock)
     self._worker_class = worker_class
+    self._name = name
     self._last_worker_id = 0
     self._workers = []
     self._quiescing = False
@@ -223,7 +221,7 @@ class WorkerPool(object):
 
     """
     for worker in self._workers + self._termworkers:
-      if worker._HasRunningTaskUnlocked():
+      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
         return True
     return False
 
@@ -252,7 +250,8 @@ class WorkerPool(object):
 
     """
     self._last_worker_id += 1
-    return self._last_worker_id
+
+    return "%s%d" % (self._name, self._last_worker_id)
 
   def _ResizeUnlocked(self, num_workers):
     """Changes the number of workers.
@@ -305,7 +304,7 @@ class WorkerPool(object):
 
     elif current_count < num_workers:
       # Create (num_workers - current_count) new workers
-      for _ in xrange(num_workers - current_count):
+      for _ in range(num_workers - current_count):
         worker = self._worker_class(self, self._NewWorkerIdUnlocked())
         self._workers.append(worker)
         worker.start()