Breathe life into RAPI for trunk
[ganeti-local] / daemons / ganeti-masterd
index 8e3701c..00214ea 100755 (executable)
@@ -29,7 +29,6 @@ inheritance from parent classes requires it.
 
 import sys
 import SocketServer
-import threading
 import time
 import collections
 import Queue
@@ -52,12 +51,30 @@ from ganeti import utils
 from ganeti import errors
 from ganeti import ssconf
 from ganeti import logger
+from ganeti import workerpool
 
 
+CLIENT_REQUEST_WORKERS = 16
+
 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
+class ClientRequestWorker(workerpool.BaseWorker):
+  def RunTask(self, server, request, client_address):
+    """Finish and close one client request on behalf of the server.
+
+    Copied from ThreadingMixIn; on any error, report it and still close.
+
+    """
+    try:
+      server.finish_request(request, client_address)
+      server.close_request(request)
+    except:
+      server.handle_error(request, client_address)
+      server.close_request(request)
+
+
 class IOServer(SocketServer.UnixStreamServer):
   """IO thread class.
 
@@ -81,35 +98,21 @@ class IOServer(SocketServer.UnixStreamServer):
 
     # We'll only start threads once we've forked.
     self.jobqueue = None
+    self.request_workers = None
 
     signal.signal(signal.SIGINT, self.handle_quit_signals)
     signal.signal(signal.SIGTERM, self.handle_quit_signals)
 
   def setup_queue(self):
     self.jobqueue = jqueue.JobQueue(self.context)
-
-  def process_request_thread(self, request, client_address):
-    """Process the request.
-
-    This is copied from the code in ThreadingMixIn.
-
-    """
-    try:
-      self.finish_request(request, client_address)
-      self.close_request(request)
-    except:
-      self.handle_error(request, client_address)
-      self.close_request(request)
+    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
+                                                 ClientRequestWorker)
 
   def process_request(self, request, client_address):
-    """Start a new thread to process the request.
-
-    This is copied from the coode in ThreadingMixIn.
+    """Add task to workerpool to process request.
 
     """
-    t = threading.Thread(target=self.process_request_thread,
-                         args=(request, client_address))
-    t.start()
+    self.request_workers.AddTask(self, request, client_address)
 
   def handle_quit_signals(self, signum, frame):
     print "received %s in %s" % (signum, frame)
@@ -132,6 +135,8 @@ class IOServer(SocketServer.UnixStreamServer):
       self.server_close()
       utils.RemoveFile(constants.MASTER_SOCKET)
     finally:
+      if self.request_workers:
+        self.request_workers.TerminateWorkers()
       if self.jobqueue:
         self.jobqueue.Shutdown()
 
@@ -226,72 +231,6 @@ class ClientOps:
       raise ValueError("Invalid operation")
 
 
-def JobRunner(proc, job, context):
-  """Job executor.
-
-  This functions processes a single job in the context of given
-  processor instance.
-
-  Args:
-    proc: Ganeti Processor to run the job on
-    job: The job to run (unserialized format)
-    context: Ganeti shared context
-
-  """
-  job.SetStatus(opcodes.Job.STATUS_RUNNING)
-  fail = False
-  for idx, op in enumerate(job.data.op_list):
-    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
-    try:
-      job.data.op_result[idx] = proc.ExecOpCode(op)
-      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
-    except (errors.OpPrereqError, errors.OpExecError), err:
-      fail = True
-      job.data.op_result[idx] = str(err)
-      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
-  if fail:
-    job.SetStatus(opcodes.Job.STATUS_FAIL)
-  else:
-    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
-
-
-def PoolWorker(worker_id, incoming_queue, context):
-  """A worker thread function.
-
-  This is the actual processor of a single thread of Job execution.
-
-  Args:
-    worker_id: the unique id for this worker
-    incoming_queue: a queue to get jobs from
-    context: the common server context, containing all shared data and
-             synchronization structures.
-
-  """
-  while True:
-    logging.debug("worker %s sleeping", worker_id)
-    item = incoming_queue.get(True)
-    if item is None:
-      break
-    logging.debug("worker %s processing job %s", worker_id, item.data.job_id)
-    proc = mcpu.Processor(context, feedback=lambda x: None)
-    try:
-      JobRunner(proc, item, context)
-    except errors.GenericError, err:
-      msg = "ganeti exception"
-      logging.error(msg, exc_info=err)
-      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
-    except Exception, err:
-      msg = "unhandled exception"
-      logging.error(msg, exc_info=err)
-      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
-    except:
-      msg = "unhandled unknown exception"
-      logging.error(msg, exc_info=True)
-      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
-    logging.debug("worker %s finish job %s", worker_id, item.data.job_id)
-  logging.debug("worker %s exiting", worker_id)
-
-
 class GanetiContext(object):
   """Context common to all ganeti threads.