Breath life in to RAPI for trunk
[ganeti-local] / daemons / ganeti-masterd
old mode 100644 (file)
new mode 100755 (executable)
index 24fec0e..00214ea
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python -u
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
@@ -29,32 +29,52 @@ inheritance from parent classes requires it.
 
 import sys
 import SocketServer
 
 import sys
 import SocketServer
-import threading
 import time
 import collections
 import Queue
 import random
 import signal
 import simplejson
 import time
 import collections
 import Queue
 import random
 import signal
 import simplejson
-
+import logging
 
 from cStringIO import StringIO
 from optparse import OptionParser
 
 
 from cStringIO import StringIO
 from optparse import OptionParser
 
+from ganeti import config
 from ganeti import constants
 from ganeti import mcpu
 from ganeti import opcodes
 from ganeti import jqueue
 from ganeti import constants
 from ganeti import mcpu
 from ganeti import opcodes
 from ganeti import jqueue
+from ganeti import locking
 from ganeti import luxi
 from ganeti import utils
 from ganeti import errors
 from ganeti import ssconf
 from ganeti import luxi
 from ganeti import utils
 from ganeti import errors
 from ganeti import ssconf
+from ganeti import logger
+from ganeti import workerpool
+
 
 
+CLIENT_REQUEST_WORKERS = 16
 
 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
 
 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
+class ClientRequestWorker(workerpool.BaseWorker):
+  def RunTask(self, server, request, client_address):
+    """Process the request.
+
+    This is copied from the code in ThreadingMixIn.
+
+    """
+    try:
+      server.finish_request(request, client_address)
+      server.close_request(request)
+    except:
+      server.handle_error(request, client_address)
+      server.close_request(request)
+
+
 class IOServer(SocketServer.UnixStreamServer):
   """IO thread class.
 
 class IOServer(SocketServer.UnixStreamServer):
   """IO thread class.
 
@@ -63,52 +83,36 @@ class IOServer(SocketServer.UnixStreamServer):
   cleanup at shutdown.
 
   """
   cleanup at shutdown.
 
   """
-  QUEUE_PROCESSOR_SIZE = 1
-
-  def __init__(self, address, rqhandler):
-    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
-    self.do_quit = False
-    self.queue = jqueue.QueueManager()
-    self.processors = []
-    signal.signal(signal.SIGINT, self.handle_quit_signals)
-    signal.signal(signal.SIGTERM, self.handle_quit_signals)
-
-  def setup_processors(self):
-    """Spawn the processors threads.
+  def __init__(self, address, rqhandler, context):
+    """IOServer constructor
 
 
-    This initializes the queue and the thread processors. It is done
-    separately from the constructor because we want the clone()
-    syscalls to happen after the daemonize part.
+    Args:
+      address: the address to bind this IOServer to
+      rqhandler: RequestHandler type object
+      context: Context Object common to all worker threads
 
     """
 
     """
-    for i in range(self.QUEUE_PROCESSOR_SIZE):
-      self.processors.append(threading.Thread(target=PoolWorker,
-                                              args=(i, self.queue.new_queue)))
-    for t in self.processors:
-      t.start()
+    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
+    self.do_quit = False
+    self.context = context
 
 
-  def process_request_thread(self, request, client_address):
-    """Process the request.
+    # We'll only start threads once we've forked.
+    self.jobqueue = None
+    self.request_workers = None
 
 
-    This is copied from the code in ThreadingMixIn.
+    signal.signal(signal.SIGINT, self.handle_quit_signals)
+    signal.signal(signal.SIGTERM, self.handle_quit_signals)
 
 
-    """
-    try:
-      self.finish_request(request, client_address)
-      self.close_request(request)
-    except:
-      self.handle_error(request, client_address)
-      self.close_request(request)
+  def setup_queue(self):
+    self.jobqueue = jqueue.JobQueue(self.context)
+    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
+                                                 ClientRequestWorker)
 
   def process_request(self, request, client_address):
 
   def process_request(self, request, client_address):
-    """Start a new thread to process the request.
-
-    This is copied from the coode in ThreadingMixIn.
+    """Add task to workerpool to process request.
 
     """
 
     """
-    t = threading.Thread(target=self.process_request_thread,
-                         args=(request, client_address))
-    t.start()
+    self.request_workers.AddTask(self, request, client_address)
 
   def handle_quit_signals(self, signum, frame):
     print "received %s in %s" % (signum, frame)
 
   def handle_quit_signals(self, signum, frame):
     print "received %s in %s" % (signum, frame)
@@ -127,14 +131,14 @@ class IOServer(SocketServer.UnixStreamServer):
     socket.
 
     """
     socket.
 
     """
-    self.server_close()
-    utils.RemoveFile(constants.MASTER_SOCKET)
-    for i in range(self.QUEUE_PROCESSOR_SIZE):
-      self.queue.new_queue.put(None)
-    for idx, t in enumerate(self.processors):
-      print "waiting for processor thread %s..." % idx
-      t.join()
-    print "done threads"
+    try:
+      self.server_close()
+      utils.RemoveFile(constants.MASTER_SOCKET)
+    finally:
+      if self.request_workers:
+        self.request_workers.TerminateWorkers()
+      if self.jobqueue:
+        self.jobqueue.Shutdown()
 
 
 class ClientRqHandler(SocketServer.BaseRequestHandler):
 
 
 class ClientRqHandler(SocketServer.BaseRequestHandler):
@@ -151,21 +155,36 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
     while True:
       msg = self.read_message()
       if msg is None:
     while True:
       msg = self.read_message()
       if msg is None:
-        print "client closed connection"
+        logging.info("client closed connection")
         break
         break
+
       request = simplejson.loads(msg)
       request = simplejson.loads(msg)
+      logging.debug("request: %s", request)
       if not isinstance(request, dict):
       if not isinstance(request, dict):
-        print "wrong request received: %s" % msg
+        logging.error("wrong request received: %s", msg)
         break
         break
-      method = request.get('request', None)
-      data = request.get('data', None)
-      if method is None or data is None:
-        print "no method or data in request"
+
+      method = request.get(luxi.KEY_METHOD, None)
+      args = request.get(luxi.KEY_ARGS, None)
+      if method is None or args is None:
+        logging.error("no method or args in request")
         break
         break
-      print "request:", method, data
-      result = self._ops.handle_request(method, data)
-      print "result:", result
-      self.send_message(simplejson.dumps({'success': True, 'result': result}))
+
+      success = False
+      try:
+        result = self._ops.handle_request(method, args)
+        success = True
+      except:
+        logging.error("Unexpected exception", exc_info=True)
+        err = sys.exc_info()
+        result = "Caught exception: %s" % str(err[1])
+
+      response = {
+        luxi.KEY_SUCCESS: success,
+        luxi.KEY_RESULT: result,
+        }
+      logging.debug("response: %s", response)
+      self.send_message(simplejson.dumps(response))
 
   def read_message(self):
     while not self._msgs:
 
   def read_message(self):
     while not self._msgs:
@@ -186,85 +205,65 @@ class ClientOps:
   """Class holding high-level client operations."""
   def __init__(self, server):
     self.server = server
   """Class holding high-level client operations."""
   def __init__(self, server):
     self.server = server
-    self._cpu = None
-
-  def _getcpu(self):
-    if self._cpu is None:
-      self._cpu = mcpu.Processor(lambda x: None)
-    return self._cpu
-
-  def handle_request(self, operation, args):
-    print operation, args
-    if operation == "submit":
-      return self.put(args)
-    elif operation == "query":
-      return self.query(args)
+
+  def handle_request(self, method, args):
+    queue = self.server.jobqueue
+
+    # TODO: Parameter validation
+
+    if method == luxi.REQ_SUBMIT_JOB:
+      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
+      return queue.SubmitJob(ops)
+
+    elif method == luxi.REQ_CANCEL_JOB:
+      (job_id, ) = args
+      return queue.CancelJob(job_id)
+
+    elif method == luxi.REQ_ARCHIVE_JOB:
+      (job_id, ) = args
+      return queue.ArchiveJob(job_id)
+
+    elif method == luxi.REQ_QUERY_JOBS:
+      (job_ids, fields) = args
+      return queue.QueryJobs(job_ids, fields)
+
     else:
       raise ValueError("Invalid operation")
 
     else:
       raise ValueError("Invalid operation")
 
-  def put(self, args):
-    job = luxi.UnserializeJob(args)
-    rid = self.server.queue.put(job)
-    return rid
-
-  def query(self, args):
-    path = args["object"]
-    fields = args["fields"]
-    names = args["names"]
-    if path == "instances":
-      opclass = opcodes.OpQueryInstances
-    elif path == "jobs":
-      # early exit because job query-ing is special (not via opcodes)
-      return self.query_jobs(fields, names)
-    else:
-      raise ValueError("Invalid object %s" % path)
 
 
-    op = opclass(output_fields = fields, names=names)
-    cpu = self._getcpu()
-    result = cpu.ExecOpCode(op)
-    return result
+class GanetiContext(object):
+  """Context common to all ganeti threads.
 
 
-  def query_jobs(self, fields, names):
-    return self.server.queue.query_jobs(fields, names)
+  This class creates and holds common objects shared by all threads.
 
 
+  """
+  _instance = None
 
 
-def JobRunner(proc, job):
-  """Job executor.
+  def __init__(self):
+    """Constructs a new GanetiContext object.
 
 
-  This functions processes a single job in the context of given
-  processor instance.
+    There should be only a GanetiContext object at any time, so this
+    function raises an error if this is not the case.
 
 
-  """
-  job.SetStatus(opcodes.Job.STATUS_RUNNING)
-  for op in job.data.op_list:
-    proc.ExecOpCode(op)
-  job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK)
+    """
+    assert self.__class__._instance is None, "double GanetiContext instance"
 
 
+    # Create a ConfigWriter...
+    self.cfg = config.ConfigWriter()
+    # And a GanetiLockingManager...
+    self.glm = locking.GanetiLockManager(
+                self.cfg.GetNodeList(),
+                self.cfg.GetInstanceList())
 
 
-def PoolWorker(worker_id, incoming_queue):
-  """A worker thread function.
+    # setting this also locks the class against attribute modifications
+    self.__class__._instance = self
 
 
-  This is the actual processor of a single thread of Job execution.
+  def __setattr__(self, name, value):
+    """Setting GanetiContext attributes is forbidden after initialization.
 
 
-  """
-  while True:
-    print "worker %s sleeping" % worker_id
-    item = incoming_queue.get(True)
-    if item is None:
-      break
-    print "worker %s processing job %s" % (worker_id, item.data.job_id)
-    utils.Lock('cmd')
-    try:
-      proc = mcpu.Processor(feedback=lambda x: None)
-      try:
-        JobRunner(proc, item)
-      except errors.GenericError, err:
-        print "ganeti exception %s" % err
-    finally:
-      utils.Unlock('cmd')
-      utils.LockCleanup()
-    print "worker %s finish job %s" % (worker_id, item.data.job_id)
-  print "worker %s exiting" % worker_id
+    """
+    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
+    object.__setattr__(self, name, value)
 
 
 def CheckMaster(debug):
 
 
 def CheckMaster(debug):
@@ -320,21 +319,28 @@ def main():
 
   options, args = ParseOptions()
   utils.debug = options.debug
 
   options, args = ParseOptions()
   utils.debug = options.debug
+  utils.no_fork = True
 
   CheckMaster(options.debug)
 
 
   CheckMaster(options.debug)
 
-  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
+  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
 
   # become a daemon
   if options.fork:
     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                     noclose_fds=[master.fileno()])
 
 
   # become a daemon
   if options.fork:
     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                     noclose_fds=[master.fileno()])
 
-  master.setup_processors()
+  logger.SetupDaemon(constants.LOG_MASTERDAEMON, debug=options.debug,
+                     stderr_logging=not options.fork)
+
+  logging.info("ganeti master daemon startup")
+
+  master.setup_queue()
   try:
     master.serve_forever()
   finally:
     master.server_cleanup()
 
   try:
     master.serve_forever()
   finally:
     master.server_cleanup()
 
+
 if __name__ == "__main__":
   main()
 if __name__ == "__main__":
   main()