Cluster: add nicparams, and update them on upgrade
[ganeti-local] / daemons / ganeti-masterd
index 72c0fd5..2305fc6 100755 (executable)
@@ -27,34 +27,57 @@ inheritance from parent classes requires it.
 """
 
 
 """
 
 
+import os
+import errno
 import sys
 import SocketServer
 import sys
 import SocketServer
-import threading
 import time
 import collections
 import Queue
 import random
 import signal
 import time
 import collections
 import Queue
 import random
 import signal
-import simplejson
-
+import logging
 
 from cStringIO import StringIO
 from optparse import OptionParser
 
 
 from cStringIO import StringIO
 from optparse import OptionParser
 
+from ganeti import config
 from ganeti import constants
 from ganeti import mcpu
 from ganeti import opcodes
 from ganeti import jqueue
 from ganeti import constants
 from ganeti import mcpu
 from ganeti import opcodes
 from ganeti import jqueue
+from ganeti import locking
 from ganeti import luxi
 from ganeti import utils
 from ganeti import errors
 from ganeti import ssconf
 from ganeti import luxi
 from ganeti import utils
 from ganeti import errors
 from ganeti import ssconf
+from ganeti import workerpool
+from ganeti import rpc
+from ganeti import bootstrap
+from ganeti import serializer
+
 
 
+CLIENT_REQUEST_WORKERS = 16
 
 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
 
 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
+class ClientRequestWorker(workerpool.BaseWorker):
+  def RunTask(self, server, request, client_address):
+    """Process the request.
+
+    This is copied from the code in ThreadingMixIn.
+
+    """
+    try:
+      server.finish_request(request, client_address)
+      server.close_request(request)
+    except:
+      server.handle_error(request, client_address)
+      server.close_request(request)
+
+
 class IOServer(SocketServer.UnixStreamServer):
   """IO thread class.
 
 class IOServer(SocketServer.UnixStreamServer):
   """IO thread class.
 
@@ -63,69 +86,38 @@ class IOServer(SocketServer.UnixStreamServer):
   cleanup at shutdown.
 
   """
   cleanup at shutdown.
 
   """
-  QUEUE_PROCESSOR_SIZE = 1
-
   def __init__(self, address, rqhandler):
     """IOServer constructor
 
   def __init__(self, address, rqhandler):
     """IOServer constructor
 
-    Args:
-      address: the address to bind this IOServer to
-      rqhandler: RequestHandler type object
+    @param address: the address to bind this IOServer to
+    @param rqhandler: RequestHandler type object
 
     """
     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
 
     """
     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
-    self.do_quit = False
-    self.queue = jqueue.QueueManager()
-    self.processors = []
-    signal.signal(signal.SIGINT, self.handle_quit_signals)
-    signal.signal(signal.SIGTERM, self.handle_quit_signals)
-
-  def setup_processors(self):
-    """Spawn the processors threads.
-
-    This initializes the queue and the thread processors. It is done
-    separately from the constructor because we want the clone()
-    syscalls to happen after the daemonize part.
-
-    """
-    for i in range(self.QUEUE_PROCESSOR_SIZE):
-      self.processors.append(threading.Thread(target=PoolWorker,
-                                              args=(i, self.queue.new_queue)))
-    for t in self.processors:
-      t.start()
 
 
-  def process_request_thread(self, request, client_address):
-    """Process the request.
-
-    This is copied from the code in ThreadingMixIn.
+    # We'll only start threads once we've forked.
+    self.context = None
+    self.request_workers = None
 
 
-    """
-    try:
-      self.finish_request(request, client_address)
-      self.close_request(request)
-    except:
-      self.handle_error(request, client_address)
-      self.close_request(request)
+  def setup_queue(self):
+    self.context = GanetiContext()
+    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
+                                                 ClientRequestWorker)
 
   def process_request(self, request, client_address):
 
   def process_request(self, request, client_address):
-    """Start a new thread to process the request.
-
-    This is copied from the coode in ThreadingMixIn.
+    """Add task to workerpool to process request.
 
     """
 
     """
-    t = threading.Thread(target=self.process_request_thread,
-                         args=(request, client_address))
-    t.start()
-
-  def handle_quit_signals(self, signum, frame):
-    print "received %s in %s" % (signum, frame)
-    self.do_quit = True
+    self.request_workers.AddTask(self, request, client_address)
 
   def serve_forever(self):
     """Handle one request at a time until told to quit."""
 
   def serve_forever(self):
     """Handle one request at a time until told to quit."""
-    while not self.do_quit:
-      self.handle_request()
-      print "served request, quit=%s" % (self.do_quit)
+    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
+    try:
+      while not sighandler.called:
+        self.handle_request()
+    finally:
+      sighandler.Reset()
 
   def server_cleanup(self):
     """Cleanup the server.
 
   def server_cleanup(self):
     """Cleanup the server.
@@ -134,14 +126,13 @@ class IOServer(SocketServer.UnixStreamServer):
     socket.
 
     """
     socket.
 
     """
-    self.server_close()
-    utils.RemoveFile(constants.MASTER_SOCKET)
-    for i in range(self.QUEUE_PROCESSOR_SIZE):
-      self.queue.new_queue.put(None)
-    for idx, t in enumerate(self.processors):
-      print "waiting for processor thread %s..." % idx
-      t.join()
-    print "done threads"
+    try:
+      self.server_close()
+    finally:
+      if self.request_workers:
+        self.request_workers.TerminateWorkers()
+      if self.context:
+        self.context.jobqueue.Shutdown()
 
 
 class ClientRqHandler(SocketServer.BaseRequestHandler):
 
 
 class ClientRqHandler(SocketServer.BaseRequestHandler):
@@ -158,21 +149,39 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
     while True:
       msg = self.read_message()
       if msg is None:
     while True:
       msg = self.read_message()
       if msg is None:
-        print "client closed connection"
+        logging.debug("client closed connection")
         break
         break
-      request = simplejson.loads(msg)
+
+      request = serializer.LoadJson(msg)
+      logging.debug("request: %s", request)
       if not isinstance(request, dict):
       if not isinstance(request, dict):
-        print "wrong request received: %s" % msg
+        logging.error("wrong request received: %s", msg)
         break
         break
-      method = request.get('request', None)
-      data = request.get('data', None)
-      if method is None or data is None:
-        print "no method or data in request"
+
+      method = request.get(luxi.KEY_METHOD, None)
+      args = request.get(luxi.KEY_ARGS, None)
+      if method is None or args is None:
+        logging.error("no method or args in request")
         break
         break
-      print "request:", method, data
-      result = self._ops.handle_request(method, data)
-      print "result:", result
-      self.send_message(simplejson.dumps({'success': True, 'result': result}))
+
+      success = False
+      try:
+        result = self._ops.handle_request(method, args)
+        success = True
+      except errors.GenericError, err:
+        success = False
+        result = (err.__class__.__name__, err.args)
+      except:
+        logging.error("Unexpected exception", exc_info=True)
+        err = sys.exc_info()
+        result = "Caught exception: %s" % str(err[1])
+
+      response = {
+        luxi.KEY_SUCCESS: success,
+        luxi.KEY_RESULT: result,
+        }
+      logging.debug("response: %s", response)
+      self.send_message(serializer.DumpJson(response))
 
   def read_message(self):
     while not self._msgs:
 
   def read_message(self):
     while not self._msgs:
@@ -193,138 +202,190 @@ class ClientOps:
   """Class holding high-level client operations."""
   def __init__(self, server):
     self.server = server
   """Class holding high-level client operations."""
   def __init__(self, server):
     self.server = server
-    self._cpu = None
-
-  def _getcpu(self):
-    if self._cpu is None:
-      self._cpu = mcpu.Processor(lambda x: None)
-    return self._cpu
-
-  def handle_request(self, operation, args):
-    print operation, args
-    if operation == "submit":
-      return self.put(args)
-    elif operation == "query":
-      return self.query(args)
-    else:
-      raise ValueError("Invalid operation")
-
-  def put(self, args):
-    job = luxi.UnserializeJob(args)
-    rid = self.server.queue.put(job)
-    return rid
-
-  def query(self, args):
-    path = args["object"]
-    fields = args["fields"]
-    names = args["names"]
-    if path == "instances":
-      opclass = opcodes.OpQueryInstances
-    elif path == "jobs":
-      # early exit because job query-ing is special (not via opcodes)
-      return self.query_jobs(fields, names)
+
+  def handle_request(self, method, args):
+    queue = self.server.context.jobqueue
+
+    # TODO: Parameter validation
+
+    if method == luxi.REQ_SUBMIT_JOB:
+      logging.info("Received new job")
+      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
+      return queue.SubmitJob(ops)
+
+    if method == luxi.REQ_SUBMIT_MANY_JOBS:
+      logging.info("Received multiple jobs")
+      jobs = []
+      for ops in args:
+        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
+      return queue.SubmitManyJobs(jobs)
+
+    elif method == luxi.REQ_CANCEL_JOB:
+      job_id = args
+      logging.info("Received job cancel request for %s", job_id)
+      return queue.CancelJob(job_id)
+
+    elif method == luxi.REQ_ARCHIVE_JOB:
+      job_id = args
+      logging.info("Received job archive request for %s", job_id)
+      return queue.ArchiveJob(job_id)
+
+    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
+      (age, timeout) = args
+      logging.info("Received job autoarchive request for age %s, timeout %s",
+                   age, timeout)
+      return queue.AutoArchiveJobs(age, timeout)
+
+    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
+      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
+      logging.info("Received job poll request for %s", job_id)
+      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
+                                     prev_log_serial, timeout)
+
+    elif method == luxi.REQ_QUERY_JOBS:
+      (job_ids, fields) = args
+      if isinstance(job_ids, (tuple, list)) and job_ids:
+        msg = ", ".join(job_ids)
+      else:
+        msg = str(job_ids)
+      logging.info("Received job query request for %s", msg)
+      return queue.QueryJobs(job_ids, fields)
+
+    elif method == luxi.REQ_QUERY_INSTANCES:
+      (names, fields, use_locking) = args
+      logging.info("Received instance query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
+      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
+                                    use_locking=use_locking)
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUERY_NODES:
+      (names, fields, use_locking) = args
+      logging.info("Received node query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
+      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
+                                use_locking=use_locking)
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUERY_EXPORTS:
+      nodes, use_locking = args
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
+      logging.info("Received exports query request")
+      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
+      fields = args
+      logging.info("Received config values query request for %s", fields)
+      op = opcodes.OpQueryConfigValues(output_fields=fields)
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
+      logging.info("Received cluster info query request")
+      op = opcodes.OpQueryClusterInfo()
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
+      drain_flag = args
+      logging.info("Received queue drain flag change request to %s",
+                   drain_flag)
+      return queue.SetDrainFlag(drain_flag)
+
     else:
     else:
-      raise ValueError("Invalid object %s" % path)
+      logging.info("Received invalid request '%s'", method)
+      raise ValueError("Invalid operation '%s'" % method)
 
 
-    op = opclass(output_fields = fields, names=names)
-    cpu = self._getcpu()
-    result = cpu.ExecOpCode(op)
-    return result
+  def _DummyLog(self, *args):
+    pass
 
 
-  def query_jobs(self, fields, names):
-    return self.server.queue.query_jobs(fields, names)
+  def _Query(self, op):
+    """Runs the specified opcode and returns the result.
 
 
+    """
+    proc = mcpu.Processor(self.server.context)
+    # TODO: Where should log messages go?
+    return proc.ExecOpCode(op, self._DummyLog, None)
 
 
-def JobRunner(proc, job):
-  """Job executor.
 
 
-  This functions processes a single job in the context of given
-  processor instance.
+class GanetiContext(object):
+  """Context common to all ganeti threads.
 
 
-  Args:
-    proc: Ganeti Processor to run the job on
-    job: The job to run (unserialized format)
+  This class creates and holds common objects shared by all threads.
 
   """
 
   """
-  job.SetStatus(opcodes.Job.STATUS_RUNNING)
-  fail = False
-  for idx, op in enumerate(job.data.op_list):
-    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
-    try:
-      job.data.op_result[idx] = proc.ExecOpCode(op)
-      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
-    except (errors.OpPrereqError, errors.OpExecError), err:
-      fail = True
-      job.data.op_result[idx] = str(err)
-      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
-  if fail:
-    job.SetStatus(opcodes.Job.STATUS_FAIL)
-  else:
-    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
+  _instance = None
 
 
+  def __init__(self):
+    """Constructs a new GanetiContext object.
 
 
-def PoolWorker(worker_id, incoming_queue):
-  """A worker thread function.
+    There should be only a GanetiContext object at any time, so this
+    function raises an error if this is not the case.
 
 
-  This is the actual processor of a single thread of Job execution.
+    """
+    assert self.__class__._instance is None, "double GanetiContext instance"
 
 
-  Args:
-    worker_id: the unique id for this worker
-    incoming_queue: a queue to get jobs from
+    # Create global configuration object
+    self.cfg = config.ConfigWriter()
 
 
-  """
-  while True:
-    print "worker %s sleeping" % worker_id
-    item = incoming_queue.get(True)
-    if item is None:
-      break
-    print "worker %s processing job %s" % (worker_id, item.data.job_id)
-    #utils.Lock('cmd')
-    try:
-      proc = mcpu.Processor(feedback=lambda x: None)
-      try:
-        JobRunner(proc, item)
-      except errors.GenericError, err:
-        print "ganeti exception %s" % err
-    finally:
-      #utils.Unlock('cmd')
-      #utils.LockCleanup()
-      pass
-    print "worker %s finish job %s" % (worker_id, item.data.job_id)
-  print "worker %s exiting" % worker_id
+    # Locking manager
+    self.glm = locking.GanetiLockManager(
+                self.cfg.GetNodeList(),
+                self.cfg.GetInstanceList())
 
 
+    # Job queue
+    self.jobqueue = jqueue.JobQueue(self)
 
 
-def CheckMaster(debug):
-  """Checks the node setup.
+    # setting this also locks the class against attribute modifications
+    self.__class__._instance = self
 
 
-  If this is the master, the function will return. Otherwise it will
-  exit with an exit code based on the node status.
+  def __setattr__(self, name, value):
+    """Setting GanetiContext attributes is forbidden after initialization.
 
 
-  """
-  try:
-    ss = ssconf.SimpleStore()
-    master_name = ss.GetMasterNode()
-  except errors.ConfigurationError, err:
-    print "Cluster configuration incomplete: '%s'" % str(err)
-    sys.exit(EXIT_NODESETUP_ERROR)
+    """
+    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
+    object.__setattr__(self, name, value)
 
 
-  try:
-    myself = utils.HostInfo()
-  except errors.ResolverError, err:
-    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
-    sys.exit(EXIT_NODESETUP_ERROR)
+  def AddNode(self, node):
+    """Adds a node to the configuration and lock manager.
+
+    """
+    # Add it to the configuration
+    self.cfg.AddNode(node)
+
+    # If preseeding fails it'll not be added
+    self.jobqueue.AddNode(node)
 
 
-  if myself.name != master_name:
-    if debug:
-      sys.stderr.write("Not master, exiting.\n")
-    sys.exit(EXIT_NOTMASTER)
+    # Add the new node to the Ganeti Lock Manager
+    self.glm.add(locking.LEVEL_NODE, node.name)
+
+  def ReaddNode(self, node):
+    """Updates a node that's already in the configuration
+
+    """
+    # Synchronize the queue again
+    self.jobqueue.AddNode(node)
+
+  def RemoveNode(self, name):
+    """Removes a node from the configuration and lock manager.
+
+    """
+    # Remove node from configuration
+    self.cfg.RemoveNode(name)
+
+    # Notify job queue
+    self.jobqueue.RemoveNode(name)
+
+    # Remove the node from the Ganeti Lock Manager
+    self.glm.remove(locking.LEVEL_NODE, name)
 
 
 def ParseOptions():
   """Parse the command line options.
 
 
 
 def ParseOptions():
   """Parse the command line options.
 
-  Returns:
-    (options, args) as from OptionParser.parse_args()
+  @return: (options, args) as from OptionParser.parse_args()
 
   """
   parser = OptionParser(description="Ganeti master daemon",
 
   """
   parser = OptionParser(description="Ganeti master daemon",
@@ -338,10 +399,71 @@ def ParseOptions():
   parser.add_option("-d", "--debug", dest="debug",
                     help="Enable some debug messages",
                     default=False, action="store_true")
   parser.add_option("-d", "--debug", dest="debug",
                     help="Enable some debug messages",
                     default=False, action="store_true")
+  parser.add_option("--no-voting", dest="no_voting",
+                    help="Do not check that the nodes agree on this node"
+                    " being the master and start the daemon unconditionally",
+                    default=False, action="store_true")
   options, args = parser.parse_args()
   return options, args
 
 
   options, args = parser.parse_args()
   return options, args
 
 
+def CheckAgreement():
+  """Check the agreement on who is the master.
+
+  The function uses a very simple algorithm: we must get more positive
+  than negative answers. Since in most of the cases we are the master,
+  we'll use our own config file for getting the node list. In the
+  future we could collect the current node list from our (possibly
+  obsolete) known nodes.
+
+  In order to account for cold-start of all nodes, we retry for up to
+  a minute until we get a real answer as the top-voted one. If the
+  nodes are more out-of-sync, for now manual startup of the master
+  should be attempted.
+
+  Note that for a even number of nodes cluster, we need at least half
+  of the nodes (beside ourselves) to vote for us. This creates a
+  problem on two-node clusters, since in this case we require the
+  other node to be up too to confirm our status.
+
+  """
+  myself = utils.HostInfo().name
+  #temp instantiation of a config writer, used only to get the node list
+  cfg = config.ConfigWriter()
+  node_list = cfg.GetNodeList()
+  del cfg
+  retries = 6
+  while retries > 0:
+    votes = bootstrap.GatherMasterVotes(node_list)
+    if not votes:
+      # empty node list, this is a one node cluster
+      return True
+    if votes[0][0] is None:
+      retries -= 1
+      time.sleep(10)
+      continue
+    break
+  if retries == 0:
+    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
+                     " after multiple retries. Aborting startup")
+    return False
+  # here a real node is at the top of the list
+  all_votes = sum(item[1] for item in votes)
+  top_node, top_votes = votes[0]
+  result = False
+  if top_node != myself:
+    logging.critical("It seems we are not the master (top-voted node"
+                     " is %s with %d out of %d votes)", top_node, top_votes,
+                     all_votes)
+  elif top_votes < all_votes - top_votes:
+    logging.critical("It seems we are not the master (%d votes for,"
+                     " %d votes against)", top_votes, all_votes - top_votes)
+  else:
+    result = True
+
+  return result
+
+
 def main():
   """Main function"""
 
 def main():
   """Main function"""
 
@@ -349,31 +471,68 @@ def main():
   utils.debug = options.debug
   utils.no_fork = True
 
   utils.debug = options.debug
   utils.no_fork = True
 
-  CheckMaster(options.debug)
+  if options.fork:
+    utils.CloseFDs()
 
 
-  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
+  rpc.Init()
+  try:
+    ssconf.CheckMaster(options.debug)
+
+    # we believe we are the master, let's ask the other nodes...
+    if options.no_voting:
+      sys.stdout.write("The 'no voting' option has been selected.\n")
+      sys.stdout.write("This is dangerous, please confirm by"
+                       " typing uppercase 'yes': ")
+      sys.stdout.flush()
+      confirmation = sys.stdin.readline().strip()
+      if confirmation != "YES":
+        print "Aborting."
+        return
+    else:
+      if not CheckAgreement():
+        return
+
+    dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
+            (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
+           ]
+    utils.EnsureDirs(dirs)
+
+    # This is safe to do as the pid file guarantees against
+    # concurrent execution.
+    utils.RemoveFile(constants.MASTER_SOCKET)
+
+    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
+  finally:
+    rpc.Shutdown()
 
   # become a daemon
   if options.fork:
 
   # become a daemon
   if options.fork:
-    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
-                    noclose_fds=[master.fileno()])
+    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)
 
 
+  utils.WritePidFile(constants.MASTERD_PID)
   try:
   try:
-    utils.Lock('cmd', debug=options.debug)
-  except errors.LockError, err:
-    print >> sys.stderr, str(err)
-    master.server_cleanup()
-    return
+    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
+                       stderr_logging=not options.fork, multithreaded=True)
 
 
-  try:
-    master.setup_processors()
+    logging.info("Ganeti master daemon startup")
+
+    rpc.Init()
     try:
     try:
-      master.serve_forever()
+      # activate ip
+      master_node = ssconf.SimpleConfigReader().GetMasterNode()
+      if not rpc.RpcRunner.call_node_start_master(master_node, False):
+        logging.error("Can't activate master IP address")
+
+      master.setup_queue()
+      try:
+        master.serve_forever()
+      finally:
+        master.server_cleanup()
     finally:
     finally:
-      master.server_cleanup()
+      rpc.Shutdown()
   finally:
   finally:
-    utils.Unlock('cmd')
-    utils.LockCleanup()
+    utils.RemovePidFile(constants.MASTERD_PID)
+    utils.RemoveFile(constants.MASTER_SOCKET)
 
 
 if __name__ == "__main__":
 
 
 if __name__ == "__main__":