Unify SetupDaemon/SetupLogging
[ganeti-local] / daemons / ganeti-masterd
old mode 100644 (file)
new mode 100755 (executable)
index d67b6fd..cdaff4e
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python -u
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
@@ -29,32 +29,53 @@ inheritance from parent classes requires it.
 
 import sys
 import SocketServer
 
 import sys
 import SocketServer
-import threading
 import time
 import collections
 import Queue
 import random
 import signal
 import simplejson
 import time
 import collections
 import Queue
 import random
 import signal
 import simplejson
-
+import logging
 
 from cStringIO import StringIO
 from optparse import OptionParser
 
 
 from cStringIO import StringIO
 from optparse import OptionParser
 
+from ganeti import config
 from ganeti import constants
 from ganeti import mcpu
 from ganeti import opcodes
 from ganeti import jqueue
 from ganeti import constants
 from ganeti import mcpu
 from ganeti import opcodes
 from ganeti import jqueue
+from ganeti import locking
 from ganeti import luxi
 from ganeti import utils
 from ganeti import errors
 from ganeti import ssconf
 from ganeti import luxi
 from ganeti import utils
 from ganeti import errors
 from ganeti import ssconf
+from ganeti import logger
+from ganeti import workerpool
+from ganeti import rpc
+
 
 
+CLIENT_REQUEST_WORKERS = 16
 
 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
 
 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
+class ClientRequestWorker(workerpool.BaseWorker):
+  def RunTask(self, server, request, client_address):
+    """Process the request.
+
+    This is copied from the code in ThreadingMixIn.
+
+    """
+    try:
+      server.finish_request(request, client_address)
+      server.close_request(request)
+    except:
+      server.handle_error(request, client_address)
+      server.close_request(request)
+
+
 class IOServer(SocketServer.UnixStreamServer):
   """IO thread class.
 
 class IOServer(SocketServer.UnixStreamServer):
   """IO thread class.
 
@@ -63,62 +84,41 @@ class IOServer(SocketServer.UnixStreamServer):
   cleanup at shutdown.
 
   """
   cleanup at shutdown.
 
   """
-  QUEUE_PROCESSOR_SIZE = 1
-
-  def __init__(self, address, rqhandler):
-    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
-    self.do_quit = False
-    self.queue = jqueue.QueueManager()
-    self.processors = []
-    signal.signal(signal.SIGINT, self.handle_quit_signals)
-    signal.signal(signal.SIGTERM, self.handle_quit_signals)
+  def __init__(self, address, rqhandler, context):
+    """IOServer constructor
 
 
-  def setup_processors(self):
-    """Spawn the processors threads.
-
-    This initializes the queue and the thread processors. It is done
-    separately from the constructor because we want the clone()
-    syscalls to happen after the daemonize part.
+    Args:
+      address: the address to bind this IOServer to
+      rqhandler: RequestHandler type object
+      context: Context Object common to all worker threads
 
     """
 
     """
-    for i in range(self.QUEUE_PROCESSOR_SIZE):
-      self.processors.append(threading.Thread(target=PoolWorker,
-                                              args=(i, self.queue.new_queue)))
-    for t in self.processors:
-      t.start()
+    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
+    self.context = context
 
 
-  def process_request_thread(self, request, client_address):
-    """Process the request.
+    # We'll only start threads once we've forked.
+    self.jobqueue = None
+    self.request_workers = None
 
 
-    This is copied from the code in ThreadingMixIn.
-
-    """
-    try:
-      self.finish_request(request, client_address)
-      self.close_request(request)
-    except:
-      self.handle_error(request, client_address)
-      self.close_request(request)
+  def setup_queue(self):
+    self.jobqueue = jqueue.JobQueue(self.context)
+    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
+                                                 ClientRequestWorker)
 
   def process_request(self, request, client_address):
 
   def process_request(self, request, client_address):
-    """Start a new thread to process the request.
-
-    This is copied from the coode in ThreadingMixIn.
+    """Add task to workerpool to process request.
 
     """
 
     """
-    t = threading.Thread(target=self.process_request_thread,
-                         args=(request, client_address))
-    t.start()
-
-  def handle_quit_signals(self, signum, frame):
-    print "received %s in %s" % (signum, frame)
-    self.do_quit = True
+    self.request_workers.AddTask(self, request, client_address)
 
   def serve_forever(self):
     """Handle one request at a time until told to quit."""
 
   def serve_forever(self):
     """Handle one request at a time until told to quit."""
-    while not self.do_quit:
-      self.handle_request()
-      print "served request, quit=%s" % (self.do_quit)
+    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
+    try:
+      while not sighandler.called:
+        self.handle_request()
+    finally:
+      sighandler.Reset()
 
   def server_cleanup(self):
     """Cleanup the server.
 
   def server_cleanup(self):
     """Cleanup the server.
@@ -127,14 +127,14 @@ class IOServer(SocketServer.UnixStreamServer):
     socket.
 
     """
     socket.
 
     """
-    self.server_close()
-    utils.RemoveFile(constants.MASTER_SOCKET)
-    for i in range(self.QUEUE_PROCESSOR_SIZE):
-      self.queue.new_queue.put(None)
-    for idx, t in enumerate(self.processors):
-      print "waiting for processor thread %s..." % idx
-      t.join()
-    print "done threads"
+    try:
+      self.server_close()
+      utils.RemoveFile(constants.MASTER_SOCKET)
+    finally:
+      if self.request_workers:
+        self.request_workers.TerminateWorkers()
+      if self.jobqueue:
+        self.jobqueue.Shutdown()
 
 
 class ClientRqHandler(SocketServer.BaseRequestHandler):
 
 
 class ClientRqHandler(SocketServer.BaseRequestHandler):
@@ -151,21 +151,36 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
     while True:
       msg = self.read_message()
       if msg is None:
     while True:
       msg = self.read_message()
       if msg is None:
-        print "client closed connection"
+        logging.info("client closed connection")
         break
         break
+
       request = simplejson.loads(msg)
       request = simplejson.loads(msg)
+      logging.debug("request: %s", request)
       if not isinstance(request, dict):
       if not isinstance(request, dict):
-        print "wrong request received: %s" % msg
+        logging.error("wrong request received: %s", msg)
         break
         break
-      method = request.get('request', None)
-      data = request.get('data', None)
-      if method is None or data is None:
-        print "no method or data in request"
+
+      method = request.get(luxi.KEY_METHOD, None)
+      args = request.get(luxi.KEY_ARGS, None)
+      if method is None or args is None:
+        logging.error("no method or args in request")
         break
         break
-      print "request:", method, data
-      result = self._ops.handle_request(method, data)
-      print "result:", result
-      self.send_message(simplejson.dumps({'success': True, 'result': result}))
+
+      success = False
+      try:
+        result = self._ops.handle_request(method, args)
+        success = True
+      except:
+        logging.error("Unexpected exception", exc_info=True)
+        err = sys.exc_info()
+        result = "Caught exception: %s" % str(err[1])
+
+      response = {
+        luxi.KEY_SUCCESS: success,
+        luxi.KEY_RESULT: result,
+        }
+      logging.debug("response: %s", response)
+      self.send_message(simplejson.dumps(response))
 
   def read_message(self):
     while not self._msgs:
 
   def read_message(self):
     while not self._msgs:
@@ -186,122 +201,69 @@ class ClientOps:
   """Class holding high-level client operations."""
   def __init__(self, server):
     self.server = server
   """Class holding high-level client operations."""
   def __init__(self, server):
     self.server = server
-    self._cpu = None
-
-  def _getcpu(self):
-    if self._cpu is None:
-      self._cpu = mcpu.Processor(lambda x: None)
-    return self._cpu
-
-  def handle_request(self, operation, args):
-    print operation, args
-    if operation == "submit":
-      return self.put(args)
-    elif operation == "query":
-      return self.query(args)
-    else:
-      raise ValueError("Invalid operation")
 
 
-  def put(self, args):
-    job = luxi.UnserializeJob(args)
-    rid = self.server.queue.put(job)
-    return rid
-
-  def query(self, args):
-    path = args["object"]
-    fields = args["fields"]
-    names = args["names"]
-    if path == "instances":
-      opclass = opcodes.OpQueryInstances
-    elif path == "jobs":
-      # early exit because job query-ing is special (not via opcodes)
-      return self.query_jobs(fields, names)
-    else:
-      raise ValueError("Invalid object %s" % path)
+  def handle_request(self, method, args):
+    queue = self.server.jobqueue
 
 
-    op = opclass(output_fields = fields, names=names)
-    cpu = self._getcpu()
-    result = cpu.ExecOpCode(op)
-    return result
+    # TODO: Parameter validation
 
 
-  def query_jobs(self, fields, names):
-    return self.server.queue.query_jobs(fields, names)
+    if method == luxi.REQ_SUBMIT_JOB:
+      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
+      # we need to compute the node list here, since from now on all
+      # operations require locks on the queue or the storage, and we
+      # shouldn't get another lock
+      node_list = self.server.context.cfg.GetNodeList()
+      return queue.SubmitJob(ops, node_list)
 
 
+    elif method == luxi.REQ_CANCEL_JOB:
+      job_id = args
+      return queue.CancelJob(job_id)
 
 
-def JobRunner(proc, job):
-  """Job executor.
+    elif method == luxi.REQ_ARCHIVE_JOB:
+      job_id = args
+      return queue.ArchiveJob(job_id)
 
 
-  This functions processes a single job in the context of given
-  processor instance.
+    elif method == luxi.REQ_QUERY_JOBS:
+      (job_ids, fields) = args
+      return queue.QueryJobs(job_ids, fields)
 
 
-  """
-  job.SetStatus(opcodes.Job.STATUS_RUNNING)
-  fail = False
-  for idx, op in enumerate(job.data.op_list):
-    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
-    try:
-      job.data.op_result[idx] = proc.ExecOpCode(op)
-      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
-    except (errors.OpPrereqError, errors.OpExecError), err:
-      fail = True
-      job.data.op_result[idx] = str(err)
-      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
-  if fail:
-    job.SetStatus(opcodes.Job.STATUS_FAIL)
-  else:
-    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
+    else:
+      raise ValueError("Invalid operation")
 
 
 
 
-def PoolWorker(worker_id, incoming_queue):
-  """A worker thread function.
+class GanetiContext(object):
+  """Context common to all ganeti threads.
 
 
-  This is the actual processor of a single thread of Job execution.
+  This class creates and holds common objects shared by all threads.
 
   """
 
   """
-  while True:
-    print "worker %s sleeping" % worker_id
-    item = incoming_queue.get(True)
-    if item is None:
-      break
-    print "worker %s processing job %s" % (worker_id, item.data.job_id)
-    utils.Lock('cmd')
-    try:
-      proc = mcpu.Processor(feedback=lambda x: None)
-      try:
-        JobRunner(proc, item)
-      except errors.GenericError, err:
-        print "ganeti exception %s" % err
-    finally:
-      utils.Unlock('cmd')
-      utils.LockCleanup()
-    print "worker %s finish job %s" % (worker_id, item.data.job_id)
-  print "worker %s exiting" % worker_id
+  _instance = None
 
 
+  def __init__(self):
+    """Constructs a new GanetiContext object.
 
 
-def CheckMaster(debug):
-  """Checks the node setup.
+    There should be only a GanetiContext object at any time, so this
+    function raises an error if this is not the case.
 
 
-  If this is the master, the function will return. Otherwise it will
-  exit with an exit code based on the node status.
+    """
+    assert self.__class__._instance is None, "double GanetiContext instance"
 
 
-  """
-  try:
-    ss = ssconf.SimpleStore()
-    master_name = ss.GetMasterNode()
-  except errors.ConfigurationError, err:
-    print "Cluster configuration incomplete: '%s'" % str(err)
-    sys.exit(EXIT_NODESETUP_ERROR)
+    # Create a ConfigWriter...
+    self.cfg = config.ConfigWriter()
+    # And a GanetiLockingManager...
+    self.glm = locking.GanetiLockManager(
+                self.cfg.GetNodeList(),
+                self.cfg.GetInstanceList())
 
 
-  try:
-    myself = utils.HostInfo()
-  except errors.ResolverError, err:
-    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
-    sys.exit(EXIT_NODESETUP_ERROR)
+    # setting this also locks the class against attribute modifications
+    self.__class__._instance = self
 
 
-  if myself.name != master_name:
-    if debug:
-      sys.stderr.write("Not master, exiting.\n")
-    sys.exit(EXIT_NOTMASTER)
+  def __setattr__(self, name, value):
+    """Setting GanetiContext attributes is forbidden after initialization.
+
+    """
+    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
+    object.__setattr__(self, name, value)
 
 
 def ParseOptions():
 
 
 def ParseOptions():
@@ -331,21 +293,36 @@ def main():
 
   options, args = ParseOptions()
   utils.debug = options.debug
 
   options, args = ParseOptions()
   utils.debug = options.debug
+  utils.no_fork = True
 
 
-  CheckMaster(options.debug)
+  ssconf.CheckMaster(options.debug)
 
 
-  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
+  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
 
   # become a daemon
   if options.fork:
     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                     noclose_fds=[master.fileno()])
 
 
   # become a daemon
   if options.fork:
     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                     noclose_fds=[master.fileno()])
 
-  master.setup_processors()
+  utils.WritePidFile(constants.MASTERD_PID)
+
+  logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
+                      stderr_logging=not options.fork)
+
+  logging.info("ganeti master daemon startup")
+
+  # activate ip
+  master_node = ssconf.SimpleStore().GetMasterNode()
+  if not rpc.call_node_start_master(master_node, False):
+    logging.error("Can't activate master IP address")
+
+  master.setup_queue()
   try:
     master.serve_forever()
   finally:
     master.server_cleanup()
   try:
     master.serve_forever()
   finally:
     master.server_cleanup()
+    utils.RemovePidFile(constants.MASTERD_PID)
+
 
 if __name__ == "__main__":
   main()
 
 if __name__ == "__main__":
   main()