Cluster: add nicparams, and update them on upgrade
[ganeti-local] / daemons / ganeti-masterd
old mode 100644 (file)
new mode 100755 (executable)
index b18a8bb..2305fc6
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python -u
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
@@ -27,24 +27,55 @@ inheritance from parent classes requires it.
 """
 
 
 """
 
 
+import os
+import errno
+import sys
 import SocketServer
 import SocketServer
-import threading
 import time
 import collections
 import Queue
 import random
 import signal
 import time
 import collections
 import Queue
 import random
 import signal
-import simplejson
-
+import logging
 
 from cStringIO import StringIO
 
 from cStringIO import StringIO
+from optparse import OptionParser
 
 
+from ganeti import config
 from ganeti import constants
 from ganeti import mcpu
 from ganeti import opcodes
 from ganeti import jqueue
 from ganeti import constants
 from ganeti import mcpu
 from ganeti import opcodes
 from ganeti import jqueue
+from ganeti import locking
 from ganeti import luxi
 from ganeti import utils
 from ganeti import luxi
 from ganeti import utils
+from ganeti import errors
+from ganeti import ssconf
+from ganeti import workerpool
+from ganeti import rpc
+from ganeti import bootstrap
+from ganeti import serializer
+
+
+CLIENT_REQUEST_WORKERS = 16
+
+EXIT_NOTMASTER = constants.EXIT_NOTMASTER
+EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
+
+
+class ClientRequestWorker(workerpool.BaseWorker):
+  def RunTask(self, server, request, client_address):
+    """Process the request.
+
+    This is copied from the code in ThreadingMixIn.
+
+    """
+    try:
+      server.finish_request(request, client_address)
+      server.close_request(request)
+    except:
+      server.handle_error(request, client_address)
+      server.close_request(request)
 
 
 class IOServer(SocketServer.UnixStreamServer):
 
 
 class IOServer(SocketServer.UnixStreamServer):
@@ -55,54 +86,53 @@ class IOServer(SocketServer.UnixStreamServer):
   cleanup at shutdown.
 
   """
   cleanup at shutdown.
 
   """
-  QUEUE_PROCESSOR_SIZE = 1
-
   def __init__(self, address, rqhandler):
   def __init__(self, address, rqhandler):
-    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
-    self.do_quit = False
-    self.queue = jqueue.QueueManager()
-    self.processors = []
-    for i in range(self.QUEUE_PROCESSOR_SIZE):
-      self.processors.append(threading.Thread(target=PoolWorker,
-                                              args=(i, self.queue.new_queue)))
-    for t in self.processors:
-      t.start()
-    signal.signal(signal.SIGINT, self.handle_sigint)
-
-  def process_request_thread(self, request, client_address):
-    """Process the request.
+    """IOServer constructor
 
 
-    This is copied from the code in ThreadingMixIn.
+    @param address: the address to bind this IOServer to
+    @param rqhandler: RequestHandler type object
 
     """
 
     """
-    try:
-      self.finish_request(request, client_address)
-      self.close_request(request)
-    except:
-      self.handle_error(request, client_address)
-      self.close_request(request)
+    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
 
 
-  def process_request(self, request, client_address):
-    """Start a new thread to process the request.
+    # We'll only start threads once we've forked.
+    self.context = None
+    self.request_workers = None
 
 
-    This is copied from the coode in ThreadingMixIn.
+  def setup_queue(self):
+    self.context = GanetiContext()
+    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
+                                                 ClientRequestWorker)
 
 
-    """
-    t = threading.Thread(target=self.process_request_thread,
-                         args=(request, client_address))
-    t.start()
+  def process_request(self, request, client_address):
+    """Add task to workerpool to process request.
 
 
-  def handle_sigint(self, signum, frame):
-    print "received %s in %s" % (signum, frame)
-    self.do_quit = True
-    self.server_close()
-    for i in range(self.QUEUE_PROCESSOR_SIZE):
-      self.queue.new_queue.put(None)
+    """
+    self.request_workers.AddTask(self, request, client_address)
 
   def serve_forever(self):
     """Handle one request at a time until told to quit."""
 
   def serve_forever(self):
     """Handle one request at a time until told to quit."""
-    while not self.do_quit:
-      self.handle_request()
+    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
+    try:
+      while not sighandler.called:
+        self.handle_request()
+    finally:
+      sighandler.Reset()
+
+  def server_cleanup(self):
+    """Cleanup the server.
+
+    This involves shutting down the processor threads and the master
+    socket.
+
+    """
+    try:
+      self.server_close()
+    finally:
+      if self.request_workers:
+        self.request_workers.TerminateWorkers()
+      if self.context:
+        self.context.jobqueue.Shutdown()
 
 
 class ClientRqHandler(SocketServer.BaseRequestHandler):
 
 
 class ClientRqHandler(SocketServer.BaseRequestHandler):
@@ -119,21 +149,39 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
     while True:
       msg = self.read_message()
       if msg is None:
     while True:
       msg = self.read_message()
       if msg is None:
-        print "client closed connection"
+        logging.debug("client closed connection")
         break
         break
-      request = simplejson.loads(msg)
+
+      request = serializer.LoadJson(msg)
+      logging.debug("request: %s", request)
       if not isinstance(request, dict):
       if not isinstance(request, dict):
-        print "wrong request received: %s" % msg
+        logging.error("wrong request received: %s", msg)
         break
         break
-      method = request.get('request', None)
-      data = request.get('data', None)
-      if method is None or data is None:
-        print "no method or data in request"
+
+      method = request.get(luxi.KEY_METHOD, None)
+      args = request.get(luxi.KEY_ARGS, None)
+      if method is None or args is None:
+        logging.error("no method or args in request")
         break
         break
-      print "request:", method, data
-      result = self._ops.handle_request(method, data)
-      print "result:", result
-      self.send_message(simplejson.dumps({'success': True, 'result': result}))
+
+      success = False
+      try:
+        result = self._ops.handle_request(method, args)
+        success = True
+      except errors.GenericError, err:
+        success = False
+        result = (err.__class__.__name__, err.args)
+      except:
+        logging.error("Unexpected exception", exc_info=True)
+        err = sys.exc_info()
+        result = "Caught exception: %s" % str(err[1])
+
+      response = {
+        luxi.KEY_SUCCESS: success,
+        luxi.KEY_RESULT: result,
+        }
+      logging.debug("response: %s", response)
+      self.send_message(serializer.DumpJson(response))
 
   def read_message(self):
     while not self._msgs:
 
   def read_message(self):
     while not self._msgs:
@@ -154,92 +202,337 @@ class ClientOps:
   """Class holding high-level client operations."""
   def __init__(self, server):
     self.server = server
   """Class holding high-level client operations."""
   def __init__(self, server):
     self.server = server
-    self._cpu = None
-
-  def _getcpu(self):
-    if self._cpu is None:
-      self._cpu = mcpu.Processor(lambda x: None)
-    return self._cpu
-
-  def handle_request(self, operation, args):
-    print operation, args
-    if operation == "submit":
-      return self.put(args)
-    elif operation == "query":
-      return self.query(args)
-    else:
-      raise ValueError("Invalid operation")
-
-  def put(self, args):
-    job = luxi.UnserializeJob(args)
-    rid = self.server.queue.put(job)
-    return rid
-
-  def query(self, args):
-    path = args["object"]
-    fields = args["fields"]
-    names = args["names"]
-    if path == "instances":
-      opclass = opcodes.OpQueryInstances
-    elif path == "jobs":
-      # early exit because job query-ing is special (not via opcodes)
-      return self.query_jobs(fields, names)
+
+  def handle_request(self, method, args):
+    queue = self.server.context.jobqueue
+
+    # TODO: Parameter validation
+
+    if method == luxi.REQ_SUBMIT_JOB:
+      logging.info("Received new job")
+      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
+      return queue.SubmitJob(ops)
+
+    if method == luxi.REQ_SUBMIT_MANY_JOBS:
+      logging.info("Received multiple jobs")
+      jobs = []
+      for ops in args:
+        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
+      return queue.SubmitManyJobs(jobs)
+
+    elif method == luxi.REQ_CANCEL_JOB:
+      job_id = args
+      logging.info("Received job cancel request for %s", job_id)
+      return queue.CancelJob(job_id)
+
+    elif method == luxi.REQ_ARCHIVE_JOB:
+      job_id = args
+      logging.info("Received job archive request for %s", job_id)
+      return queue.ArchiveJob(job_id)
+
+    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
+      (age, timeout) = args
+      logging.info("Received job autoarchive request for age %s, timeout %s",
+                   age, timeout)
+      return queue.AutoArchiveJobs(age, timeout)
+
+    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
+      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
+      logging.info("Received job poll request for %s", job_id)
+      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
+                                     prev_log_serial, timeout)
+
+    elif method == luxi.REQ_QUERY_JOBS:
+      (job_ids, fields) = args
+      if isinstance(job_ids, (tuple, list)) and job_ids:
+        msg = ", ".join(job_ids)
+      else:
+        msg = str(job_ids)
+      logging.info("Received job query request for %s", msg)
+      return queue.QueryJobs(job_ids, fields)
+
+    elif method == luxi.REQ_QUERY_INSTANCES:
+      (names, fields, use_locking) = args
+      logging.info("Received instance query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
+      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
+                                    use_locking=use_locking)
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUERY_NODES:
+      (names, fields, use_locking) = args
+      logging.info("Received node query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
+      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
+                                use_locking=use_locking)
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUERY_EXPORTS:
+      nodes, use_locking = args
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
+      logging.info("Received exports query request")
+      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
+      fields = args
+      logging.info("Received config values query request for %s", fields)
+      op = opcodes.OpQueryConfigValues(output_fields=fields)
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
+      logging.info("Received cluster info query request")
+      op = opcodes.OpQueryClusterInfo()
+      return self._Query(op)
+
+    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
+      drain_flag = args
+      logging.info("Received queue drain flag change request to %s",
+                   drain_flag)
+      return queue.SetDrainFlag(drain_flag)
+
     else:
     else:
-      raise ValueError("Invalid object %s" % path)
+      logging.info("Received invalid request '%s'", method)
+      raise ValueError("Invalid operation '%s'" % method)
 
 
-    op = opclass(output_fields = fields, names=names)
-    cpu = self._getcpu()
-    result = cpu.ExecOpCode(op)
-    return result
+  def _DummyLog(self, *args):
+    pass
 
 
-  def query_jobs(self, fields, names):
-    return self.server.queue.query_jobs(fields, names)
+  def _Query(self, op):
+    """Runs the specified opcode and returns the result.
+
+    """
+    proc = mcpu.Processor(self.server.context)
+    # TODO: Where should log messages go?
+    return proc.ExecOpCode(op, self._DummyLog, None)
 
 
 
 
-def JobRunner(proc, job):
-  """Job executor.
+class GanetiContext(object):
+  """Context common to all ganeti threads.
 
 
-  This functions processes a single job in the context of given
-  processor instance.
+  This class creates and holds common objects shared by all threads.
 
   """
 
   """
-  job.SetStatus(opcodes.Job.STATUS_RUNNING)
-  for op in job.data.op_list:
-    proc.ExecOpCode(op)
-  job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK)
+  _instance = None
 
 
+  def __init__(self):
+    """Constructs a new GanetiContext object.
 
 
-def PoolWorker(worker_id, incoming_queue):
-  """A worker thread function.
+    There should be only a GanetiContext object at any time, so this
+    function raises an error if this is not the case.
 
 
-  This is the actual processor of a single thread of Job execution.
+    """
+    assert self.__class__._instance is None, "double GanetiContext instance"
+
+    # Create global configuration object
+    self.cfg = config.ConfigWriter()
+
+    # Locking manager
+    self.glm = locking.GanetiLockManager(
+                self.cfg.GetNodeList(),
+                self.cfg.GetInstanceList())
+
+    # Job queue
+    self.jobqueue = jqueue.JobQueue(self)
+
+    # setting this also locks the class against attribute modifications
+    self.__class__._instance = self
+
+  def __setattr__(self, name, value):
+    """Setting GanetiContext attributes is forbidden after initialization.
+
+    """
+    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
+    object.__setattr__(self, name, value)
+
+  def AddNode(self, node):
+    """Adds a node to the configuration and lock manager.
+
+    """
+    # Add it to the configuration
+    self.cfg.AddNode(node)
+
+    # If preseeding fails it'll not be added
+    self.jobqueue.AddNode(node)
+
+    # Add the new node to the Ganeti Lock Manager
+    self.glm.add(locking.LEVEL_NODE, node.name)
+
+  def ReaddNode(self, node):
+    """Updates a node that's already in the configuration
+
+    """
+    # Synchronize the queue again
+    self.jobqueue.AddNode(node)
+
+  def RemoveNode(self, name):
+    """Removes a node from the configuration and lock manager.
+
+    """
+    # Remove node from configuration
+    self.cfg.RemoveNode(name)
+
+    # Notify job queue
+    self.jobqueue.RemoveNode(name)
+
+    # Remove the node from the Ganeti Lock Manager
+    self.glm.remove(locking.LEVEL_NODE, name)
+
+
+def ParseOptions():
+  """Parse the command line options.
+
+  @return: (options, args) as from OptionParser.parse_args()
 
   """
 
   """
-  while True:
-    print "worker %s sleeping" % worker_id
-    item = incoming_queue.get(True)
-    if item is None:
-      break
-    print "worker %s processing job %s" % (worker_id, item.data.job_id)
-    utils.Lock('cmd')
-    try:
-      proc = mcpu.Processor(feedback=lambda x: None)
-      try:
-        JobRunner(proc, item)
-      except errors.GenericError, err:
-        print "ganeti exception %s" % err
-    finally:
-      utils.Unlock('cmd')
-      utils.LockCleanup()
-    print "worker %s finish job %s" % (worker_id, item.data.job_id)
-  print "worker %s exiting" % worker_id
+  parser = OptionParser(description="Ganeti master daemon",
+                        usage="%prog [-f] [-d]",
+                        version="%%prog (ganeti) %s" %
+                        constants.RELEASE_VERSION)
+
+  parser.add_option("-f", "--foreground", dest="fork",
+                    help="Don't detach from the current terminal",
+                    default=True, action="store_false")
+  parser.add_option("-d", "--debug", dest="debug",
+                    help="Enable some debug messages",
+                    default=False, action="store_true")
+  parser.add_option("--no-voting", dest="no_voting",
+                    help="Do not check that the nodes agree on this node"
+                    " being the master and start the daemon unconditionally",
+                    default=False, action="store_true")
+  options, args = parser.parse_args()
+  return options, args
+
+
+def CheckAgreement():
+  """Check the agreement on who is the master.
+
+  The function uses a very simple algorithm: we must get more positive
+  than negative answers. Since in most of the cases we are the master,
+  we'll use our own config file for getting the node list. In the
+  future we could collect the current node list from our (possibly
+  obsolete) known nodes.
+
+  In order to account for cold-start of all nodes, we retry for up to
+  a minute until we get a real answer as the top-voted one. If the
+  nodes are more out-of-sync, for now manual startup of the master
+  should be attempted.
+
+  Note that for a even number of nodes cluster, we need at least half
+  of the nodes (beside ourselves) to vote for us. This creates a
+  problem on two-node clusters, since in this case we require the
+  other node to be up too to confirm our status.
+
+  """
+  myself = utils.HostInfo().name
+  #temp instantiation of a config writer, used only to get the node list
+  cfg = config.ConfigWriter()
+  node_list = cfg.GetNodeList()
+  del cfg
+  retries = 6
+  while retries > 0:
+    votes = bootstrap.GatherMasterVotes(node_list)
+    if not votes:
+      # empty node list, this is a one node cluster
+      return True
+    if votes[0][0] is None:
+      retries -= 1
+      time.sleep(10)
+      continue
+    break
+  if retries == 0:
+    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
+                     " after multiple retries. Aborting startup")
+    return False
+  # here a real node is at the top of the list
+  all_votes = sum(item[1] for item in votes)
+  top_node, top_votes = votes[0]
+  result = False
+  if top_node != myself:
+    logging.critical("It seems we are not the master (top-voted node"
+                     " is %s with %d out of %d votes)", top_node, top_votes,
+                     all_votes)
+  elif top_votes < all_votes - top_votes:
+    logging.critical("It seems we are not the master (%d votes for,"
+                     " %d votes against)", top_votes, all_votes - top_votes)
+  else:
+    result = True
+
+  return result
 
 
 def main():
   """Main function"""
 
 
 
 def main():
   """Main function"""
 
-  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
-  master.serve_forever()
+  options, args = ParseOptions()
+  utils.debug = options.debug
+  utils.no_fork = True
+
+  if options.fork:
+    utils.CloseFDs()
+
+  rpc.Init()
+  try:
+    ssconf.CheckMaster(options.debug)
+
+    # we believe we are the master, let's ask the other nodes...
+    if options.no_voting:
+      sys.stdout.write("The 'no voting' option has been selected.\n")
+      sys.stdout.write("This is dangerous, please confirm by"
+                       " typing uppercase 'yes': ")
+      sys.stdout.flush()
+      confirmation = sys.stdin.readline().strip()
+      if confirmation != "YES":
+        print "Aborting."
+        return
+    else:
+      if not CheckAgreement():
+        return
+
+    dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
+            (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
+           ]
+    utils.EnsureDirs(dirs)
+
+    # This is safe to do as the pid file guarantees against
+    # concurrent execution.
+    utils.RemoveFile(constants.MASTER_SOCKET)
+
+    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
+  finally:
+    rpc.Shutdown()
+
+  # become a daemon
+  if options.fork:
+    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)
+
+  utils.WritePidFile(constants.MASTERD_PID)
+  try:
+    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
+                       stderr_logging=not options.fork, multithreaded=True)
+
+    logging.info("Ganeti master daemon startup")
+
+    rpc.Init()
+    try:
+      # activate ip
+      master_node = ssconf.SimpleConfigReader().GetMasterNode()
+      if not rpc.RpcRunner.call_node_start_master(master_node, False):
+        logging.error("Can't activate master IP address")
+
+      master.setup_queue()
+      try:
+        master.serve_forever()
+      finally:
+        master.server_cleanup()
+    finally:
+      rpc.Shutdown()
+  finally:
+    utils.RemovePidFile(constants.MASTERD_PID)
+    utils.RemoveFile(constants.MASTER_SOCKET)
 
 
 if __name__ == "__main__":
 
 
 if __name__ == "__main__":