Remove references to utils.debug
diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index 5e524d9..d9b8e74 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -36,7 +36,6 @@ import collections
 import Queue
 import random
 import signal
-import simplejson
 import logging
 
 from cStringIO import StringIO
@@ -55,6 +54,7 @@ from ganeti import ssconf
 from ganeti import workerpool
 from ganeti import rpc
 from ganeti import bootstrap
+from ganeti import serializer
 
 
 CLIENT_REQUEST_WORKERS = 16
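
Note: the serializer module imported above is ganeti's thin wrapper around
simplejson, so every daemon encodes and decodes JSON the same way. Roughly
what the two functions used later in this patch do (a sketch only; the real
lib/serializer.py may differ in detail, e.g. in indentation handling):

    import simplejson

    def DumpJson(data):
      """Serialize data to JSON, newline-terminated for readable logs."""
      txt = simplejson.dumps(data)
      if not txt.endswith("\n"):
        txt += "\n"
      return txt

    def LoadJson(txt):
      """Parse JSON text back into Python objects."""
      return simplejson.loads(txt)
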
@@ -149,10 +149,10 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
     while True:
       msg = self.read_message()
       if msg is None:
-        logging.info("client closed connection")
+        logging.debug("client closed connection")
         break
 
-      request = simplejson.loads(msg)
+      request = serializer.LoadJson(msg)
       logging.debug("request: %s", request)
       if not isinstance(request, dict):
         logging.error("wrong request received: %s", msg)
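
For context, each LUXI message is a JSON object keyed by the luxi.KEY_*
constants referenced in this handler. The request/response shapes look
roughly like this (a sketch; the literal key strings are the usual values
behind luxi.KEY_METHOD and friends, treat them as assumptions here):

    # What a request/response pair looks like on the wire (sketch).
    request = {
      "method": "SubmitJob",  # luxi.KEY_METHOD
      "args": [],             # luxi.KEY_ARGS, e.g. serialized opcodes
      }
    response = {
      "success": True,        # luxi.KEY_SUCCESS
      "result": 42,           # luxi.KEY_RESULT, e.g. a job id
      }
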
@@ -181,7 +181,7 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
         luxi.KEY_RESULT: result,
         }
       logging.debug("response: %s", response)
-      self.send_message(simplejson.dumps(response))
+      self.send_message(serializer.DumpJson(response))
 
   def read_message(self):
     while not self._msgs:
@@ -195,6 +195,7 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
 
   def send_message(self, msg):
     #print "sending", msg
+    # TODO: sendall is not guaranteed to send everything
     self.request.sendall(msg + self.EOM)
 
 
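
The new TODO is about the failure mode of socket.sendall(): it either sends
the whole buffer or raises, but after an exception the caller cannot tell
how much was actually written. An explicit loop makes the partial-write
handling visible (a sketch of the alternative, not part of the patch):

    def _SendAll(sock, data):
      """Send all of data, retrying the partial writes of socket.send()."""
      while data:
        sent = sock.send(data)
        data = data[sent:]
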
@@ -209,56 +210,94 @@ class ClientOps:
     # TODO: Parameter validation
 
     if method == luxi.REQ_SUBMIT_JOB:
+      logging.info("Received new job")
       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
       return queue.SubmitJob(ops)
 
+    if method == luxi.REQ_SUBMIT_MANY_JOBS:
+      logging.info("Received multiple jobs")
+      jobs = []
+      for ops in args:
+        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
+      return queue.SubmitManyJobs(jobs)
+
     elif method == luxi.REQ_CANCEL_JOB:
       job_id = args
+      logging.info("Received job cancel request for %s", job_id)
       return queue.CancelJob(job_id)
 
     elif method == luxi.REQ_ARCHIVE_JOB:
       job_id = args
+      logging.info("Received job archive request for %s", job_id)
       return queue.ArchiveJob(job_id)
 
     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
       (age, timeout) = args
+      logging.info("Received job autoarchive request for age %s, timeout %s",
+                   age, timeout)
       return queue.AutoArchiveJobs(age, timeout)
 
     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
+      logging.info("Received job poll request for %s", job_id)
       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                      prev_log_serial, timeout)
 
     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
+      if isinstance(job_ids, (tuple, list)) and job_ids:
+        msg = ", ".join(job_ids)
+      else:
+        msg = str(job_ids)
+      logging.info("Received job query request for %s", msg)
       return queue.QueryJobs(job_ids, fields)
 
     elif method == luxi.REQ_QUERY_INSTANCES:
-      (names, fields) = args
-      op = opcodes.OpQueryInstances(names=names, output_fields=fields)
+      (names, fields, use_locking) = args
+      logging.info("Received instance query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
+      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
+                                    use_locking=use_locking)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_NODES:
-      (names, fields) = args
-      op = opcodes.OpQueryNodes(names=names, output_fields=fields)
+      (names, fields, use_locking) = args
+      logging.info("Received node query request for %s", names)
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
+      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
+                                use_locking=use_locking)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_EXPORTS:
-      nodes = args
-      op = opcodes.OpQueryExports(nodes=nodes)
+      nodes, use_locking = args
+      if use_locking:
+        raise errors.OpPrereqError("Sync queries are not allowed")
+      logging.info("Received exports query request")
+      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
       fields = args
+      logging.info("Received config values query request for %s", fields)
       op = opcodes.OpQueryConfigValues(output_fields=fields)
       return self._Query(op)
 
+    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
+      logging.info("Received cluster info query request")
+      op = opcodes.OpQueryClusterInfo()
+      return self._Query(op)
+
     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
       drain_flag = args
+      logging.info("Received queue drain flag change request to %s",
+                   drain_flag)
       return queue.SetDrainFlag(drain_flag)
 
     else:
-      raise ValueError("Invalid operation")
+      logging.info("Received invalid request '%s'", method)
+      raise ValueError("Invalid operation '%s'" % method)
 
   def _DummyLog(self, *args):
     pass
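
Client side, the new REQ_SUBMIT_MANY_JOBS call would be driven through the
LUXI client roughly like this (a sketch; it assumes a matching
SubmitManyJobs wrapper on luxi.Client, which is not part of this diff):

    from ganeti import luxi
    from ganeti import opcodes

    client = luxi.Client()
    # one inner list of opcodes per job to submit
    jobs = [
      [opcodes.OpQueryClusterInfo()],
      [opcodes.OpQueryConfigValues(output_fields=["cluster_name"])],
      ]
    job_ids = client.SubmitManyJobs(jobs)
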
@@ -361,6 +400,14 @@ def ParseOptions():
   parser.add_option("-d", "--debug", dest="debug",
                     help="Enable some debug messages",
                     default=False, action="store_true")
+  parser.add_option("--no-voting", dest="no_voting",
+                    help="Do not check that the nodes agree on this node"
+                    " being the master and start the daemon unconditionally",
+                    default=False, action="store_true")
+  parser.add_option("--yes-do-it", dest="yes_do_it",
+                    help="Override interactive check for --no-voting",
+                    default=False, action="store_true")
+
   options, args = parser.parse_args()
   return options, args
 
@@ -402,9 +449,9 @@ def CheckAgreement():
       continue
     break
   if retries == 0:
-      logging.critical("Cluster inconsistent, most of the nodes didn't answer"
-                       " after multiple retries. Aborting startup")
-      return False
+    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
+                     " after multiple retries. Aborting startup")
+    return False
   # here a real node is at the top of the list
   all_votes = sum(item[1] for item in votes)
   top_node, top_votes = votes[0]
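
Past the retry loop, the agreement check reduces to simple arithmetic over
the vote list: this node must be the top entry and hold at least as many
votes as all other nodes combined. In isolation (a sketch of the logic that
follows this hunk; the exact comparison is not shown in the diff):

    def _HasMajority(votes, myself):
      """votes: list of (node_name, vote_count), sorted by count, descending."""
      all_votes = sum(count for (_, count) in votes)
      top_node, top_votes = votes[0]
      # as many votes for us as against us, and we are the top node
      return top_node == myself and top_votes >= all_votes - top_votes
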
@@ -426,29 +473,33 @@ def main():
   """Main function"""
 
   options, args = ParseOptions()
-  utils.debug = options.debug
   utils.no_fork = True
 
+  if options.fork:
+    utils.CloseFDs()
+
   rpc.Init()
   try:
     ssconf.CheckMaster(options.debug)
 
     # we believe we are the master, let's ask the other nodes...
-    if not CheckAgreement():
-      return
+    if options.no_voting and not options.yes_do_it:
+      sys.stdout.write("The 'no voting' option has been selected.\n")
+      sys.stdout.write("This is dangerous, please confirm by"
+                       " typing uppercase 'yes': ")
+      sys.stdout.flush()
+      confirmation = sys.stdin.readline().strip()
+      if confirmation != "YES":
+        print "Aborting."
+        return
+    elif not options.no_voting:
+      if not CheckAgreement():
+        return
 
     dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
             (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
            ]
-    for dir, mode in dirs:
-      try:
-        os.mkdir(dir, mode)
-      except EnvironmentError, err:
-        if err.errno != errno.EEXIST:
-          raise errors.GenericError("Cannot create needed directory"
-            " '%s': %s" % (constants.SOCKET_DIR, err))
-      if not os.path.isdir(dir):
-        raise errors.GenericError("%s is not a directory" % dir)
+    utils.EnsureDirs(dirs)
 
     # This is safe to do as the pid file guarantees against
     # concurrent execution.
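
utils.EnsureDirs() factors out the removed mkdir loop, which also carried a
copy/paste bug: its error message always named constants.SOCKET_DIR, no
matter which directory had failed. The helper is expected to behave roughly
like this (a sketch; the real implementation lives in utils.py and may
differ):

    import errno
    import os

    from ganeti import errors

    def EnsureDirs(dirs):
      """Make sure each (path, mode) pair in dirs exists as a directory."""
      for dir_name, dir_mode in dirs:
        try:
          os.mkdir(dir_name, dir_mode)
        except EnvironmentError, err:
          if err.errno != errno.EEXIST:
            raise errors.GenericError("Cannot create needed directory"
                                      " '%s': %s" % (dir_name, err))
        if not os.path.isdir(dir_name):
          raise errors.GenericError("%s is not a directory" % dir_name)
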
@@ -460,20 +511,19 @@ def main():
 
   # become a daemon
   if options.fork:
-    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
-                    noclose_fds=[master.fileno()])
+    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)
 
   utils.WritePidFile(constants.MASTERD_PID)
   try:
     utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
-                       stderr_logging=not options.fork)
+                       stderr_logging=not options.fork, multithreaded=True)
 
     logging.info("Ganeti master daemon startup")
 
     rpc.Init()
     try:
       # activate ip
-      master_node = ssconf.SimpleConfigReader().GetMasterNode()
+      master_node = ssconf.SimpleStore().GetMasterNode()
       if not rpc.RpcRunner.call_node_start_master(master_node, False):
         logging.error("Can't activate master IP address")
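
The switch from SimpleConfigReader to SimpleStore means the master node name
is read from the small ssconf key files instead of parsing the full cluster
configuration. Conceptually (a sketch; the path follows the usual ganeti
data-dir layout and is an assumption here, not taken from this diff):

    def _ReadSsconfValue(key, data_dir="/var/lib/ganeti"):
      """Read one ssconf value, e.g. key 'master_node'."""
      ss_file = open("%s/ssconf_%s" % (data_dir, key))
      try:
        return ss_file.read().strip()
      finally:
        ss_file.close()

    master_node = _ReadSsconfValue("master_node")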