Breath life in to RAPI for trunk
[ganeti-local] / daemons / ganeti-masterd
index 11cbe34..00214ea 100755 (executable)
@@ -29,7 +29,6 @@ inheritance from parent classes requires it.
 
 import sys
 import SocketServer
-import threading
 import time
 import collections
 import Queue
@@ -52,12 +51,30 @@ from ganeti import utils
 from ganeti import errors
 from ganeti import ssconf
 from ganeti import logger
+from ganeti import workerpool
 
 
+CLIENT_REQUEST_WORKERS = 16
+
 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
+class ClientRequestWorker(workerpool.BaseWorker):
+  def RunTask(self, server, request, client_address):
+    """Process the request.
+
+    This is copied from the code in ThreadingMixIn.
+
+    """
+    try:
+      server.finish_request(request, client_address)
+      server.close_request(request)
+    except:
+      server.handle_error(request, client_address)
+      server.close_request(request)
+
+
 class IOServer(SocketServer.UnixStreamServer):
   """IO thread class.
 
@@ -81,35 +98,21 @@ class IOServer(SocketServer.UnixStreamServer):
 
     # We'll only start threads once we've forked.
     self.jobqueue = None
+    self.request_workers = None
 
     signal.signal(signal.SIGINT, self.handle_quit_signals)
     signal.signal(signal.SIGTERM, self.handle_quit_signals)
 
   def setup_queue(self):
     self.jobqueue = jqueue.JobQueue(self.context)
-
-  def process_request_thread(self, request, client_address):
-    """Process the request.
-
-    This is copied from the code in ThreadingMixIn.
-
-    """
-    try:
-      self.finish_request(request, client_address)
-      self.close_request(request)
-    except:
-      self.handle_error(request, client_address)
-      self.close_request(request)
+    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
+                                                 ClientRequestWorker)
 
   def process_request(self, request, client_address):
-    """Start a new thread to process the request.
-
-    This is copied from the coode in ThreadingMixIn.
+    """Add task to workerpool to process request.
 
     """
-    t = threading.Thread(target=self.process_request_thread,
-                         args=(request, client_address))
-    t.start()
+    self.request_workers.AddTask(self, request, client_address)
 
   def handle_quit_signals(self, signum, frame):
     print "received %s in %s" % (signum, frame)
@@ -132,6 +135,8 @@ class IOServer(SocketServer.UnixStreamServer):
       self.server_close()
       utils.RemoveFile(constants.MASTER_SOCKET)
     finally:
+      if self.request_workers:
+        self.request_workers.TerminateWorkers()
       if self.jobqueue:
         self.jobqueue.Shutdown()