Use new class for wakeup file descriptor in import/export daemon
[ganeti-local] / daemons / ganeti-masterd
index b2e5bc1..1f909fc 100755 (executable)
@@ -1,4 +1,4 @@
-#!/usr/bin/python -u
+#!/usr/bin/python
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
@@ -26,8 +26,9 @@ inheritance from parent classes requires it.
 
 """
 
+# pylint: disable-msg=C0103
+# C0103: Invalid name ganeti-masterd
 
-import os
 import sys
 import SocketServer
 import time
@@ -35,7 +36,6 @@ import collections
 import signal
 import logging
 
-from cStringIO import StringIO
 from optparse import OptionParser
 
 from ganeti import config
@@ -52,7 +52,6 @@ from ganeti import ssconf
 from ganeti import workerpool
 from ganeti import rpc
 from ganeti import bootstrap
-from ganeti import serializer
 
 
 CLIENT_REQUEST_WORKERS = 16
@@ -62,6 +61,7 @@ EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
 class ClientRequestWorker(workerpool.BaseWorker):
+   # pylint: disable-msg=W0221
   def RunTask(self, server, request, client_address):
     """Process the request.
 
@@ -71,7 +71,7 @@ class ClientRequestWorker(workerpool.BaseWorker):
     try:
       server.finish_request(request, client_address)
       server.close_request(request)
-    except:
+    except: # pylint: disable-msg=W0702
       server.handle_error(request, client_address)
       server.close_request(request)
 
@@ -99,17 +99,25 @@ class IOServer(SocketServer.UnixStreamServer):
 
   def setup_queue(self):
     self.context = GanetiContext()
-    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
+    self.request_workers = workerpool.WorkerPool("ClientReq",
+                                                 CLIENT_REQUEST_WORKERS,
                                                  ClientRequestWorker)
 
   def process_request(self, request, client_address):
     """Add task to workerpool to process request.
 
     """
+    (pid, uid, gid) = utils.GetSocketCredentials(request)
+    logging.info("Accepted connection from pid=%s, uid=%s, gid=%s",
+                 pid, uid, gid)
+
     self.request_workers.AddTask(self, request, client_address)
 
+  def handle_error(self, request, client_address):
+    logging.exception("Error while handling request")
+
   @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
-  def serve_forever(self, signal_handlers=None):
+  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
     """Handle one request at a time until told to quit."""
     assert isinstance(signal_handlers, dict) and \
            len(signal_handlers) > 0, \
@@ -142,6 +150,8 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
   READ_SIZE = 4096
 
   def setup(self):
+    # pylint: disable-msg=W0201
+    # setup() is the api for initialising for this class
     self._buffer = ""
     self._msgs = collections.deque()
     self._ops = ClientOps(self.server)
@@ -153,36 +163,19 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
         logging.debug("client closed connection")
         break
 
-      request = serializer.LoadJson(msg)
-      logging.debug("request: %s", request)
-      if not isinstance(request, dict):
-        logging.error("wrong request received: %s", msg)
-        break
-
-      method = request.get(luxi.KEY_METHOD, None)
-      args = request.get(luxi.KEY_ARGS, None)
-      if method is None or args is None:
-        logging.error("no method or args in request")
-        break
+      (method, args) = luxi.ParseRequest(msg)
 
       success = False
       try:
         result = self._ops.handle_request(method, args)
         success = True
       except errors.GenericError, err:
-        success = False
         result = errors.EncodeException(err)
       except:
         logging.error("Unexpected exception", exc_info=True)
-        err = sys.exc_info()
-        result = "Caught exception: %s" % str(err[1])
+        result = "Caught exception: %s" % str(sys.exc_info()[1])
 
-      response = {
-        luxi.KEY_SUCCESS: success,
-        luxi.KEY_RESULT: result,
-        }
-      logging.debug("response: %s", response)
-      self.send_message(serializer.DumpJson(response))
+      self.send_message(luxi.FormatResponse(success, result))
 
   def read_message(self):
     while not self._msgs:
@@ -195,7 +188,6 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
     return self._msgs.popleft()
 
   def send_message(self, msg):
-    #print "sending", msg
     # TODO: sendall is not guaranteed to send everything
     self.request.sendall(msg + self.EOM)
 
@@ -205,11 +197,13 @@ class ClientOps:
   def __init__(self, server):
     self.server = server
 
-  def handle_request(self, method, args):
+  def handle_request(self, method, args): # pylint: disable-msg=R0911
     queue = self.server.context.jobqueue
 
     # TODO: Parameter validation
 
+    # TODO: Rewrite to not exit in each 'if/elif' branch
+
     if method == luxi.REQ_SUBMIT_JOB:
       logging.info("Received new job")
       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
@@ -247,7 +241,7 @@ class ClientOps:
     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
       if isinstance(job_ids, (tuple, list)) and job_ids:
-        msg = ", ".join(job_ids)
+        msg = utils.CommaJoin(job_ids)
       else:
         msg = str(job_ids)
       logging.info("Received job query request for %s", msg)
@@ -257,7 +251,8 @@ class ClientOps:
       (names, fields, use_locking) = args
       logging.info("Received instance query request for %s", names)
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                     use_locking=use_locking)
       return self._Query(op)
@@ -266,7 +261,8 @@ class ClientOps:
       (names, fields, use_locking) = args
       logging.info("Received node query request for %s", names)
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                 use_locking=use_locking)
       return self._Query(op)
@@ -274,7 +270,8 @@ class ClientOps:
     elif method == luxi.REQ_QUERY_EXPORTS:
       nodes, use_locking = args
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       logging.info("Received exports query request")
       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
       return self._Query(op)
@@ -290,6 +287,12 @@ class ClientOps:
       op = opcodes.OpQueryClusterInfo()
       return self._Query(op)
 
+    elif method == luxi.REQ_QUERY_TAGS:
+      kind, name = args
+      logging.info("Received tags query request")
+      op = opcodes.OpGetTags(kind=kind, name=name)
+      return self._Query(op)
+
     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
       drain_flag = args
       logging.info("Received queue drain flag change request to %s",
@@ -320,7 +323,8 @@ class ClientOps:
     """Runs the specified opcode and returns the result.
 
     """
-    proc = mcpu.Processor(self.server.context)
+    # Queries don't have a job id
+    proc = mcpu.Processor(self.server.context, None)
     return proc.ExecOpCode(op, None)
 
 
@@ -330,6 +334,8 @@ class GanetiContext(object):
   This class creates and holds common objects shared by all threads.
 
   """
+  # pylint: disable-msg=W0212
+  # we do want to ensure a singleton here
   _instance = None
 
   def __init__(self):
@@ -362,12 +368,12 @@ class GanetiContext(object):
     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
     object.__setattr__(self, name, value)
 
-  def AddNode(self, node):
+  def AddNode(self, node, ec_id):
     """Adds a node to the configuration and lock manager.
 
     """
     # Add it to the configuration
-    self.cfg.AddNode(node)
+    self.cfg.AddNode(node, ec_id)
 
     # If preseeding fails it'll not be added
     self.jobqueue.AddNode(node)
@@ -451,6 +457,8 @@ def CheckAgreement():
   if retries == 0:
     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                      " after multiple retries. Aborting startup")
+    logging.critical("Use the --no-voting option if you understand what"
+                     " effects it has on the cluster state")
     return False
   # here a real node is at the top of the list
   all_votes = sum(item[1] for item in votes)
@@ -478,54 +486,14 @@ def CheckAgreementWithRpc():
     rpc.Shutdown()
 
 
-def _RunInSeparateProcess(fn):
-  """Runs a function in a separate process.
-
-  Note: Only boolean return values are supported.
-
-  @type fn: callable
-  @param fn: Function to be called
-  @rtype: bool
-
-  """
-  pid = os.fork()
-  if pid == 0:
-    # Child process
-    try:
-      # Call function
-      result = int(bool(fn()))
-      assert result in (0, 1)
-    except:
-      logging.exception("Error while calling function in separate process")
-      # 0 and 1 are reserved for the return value
-      result = 33
-
-    os._exit(result)
-
-  # Parent process
-
-  # Avoid zombies and check exit code
-  (_, status) = os.waitpid(pid, 0)
-
-  if os.WIFSIGNALED(status):
-    signum = os.WTERMSIG(status)
-    exitcode = None
-  else:
-    signum = None
-    exitcode = os.WEXITSTATUS(status)
-
-  if not (exitcode in (0, 1) and signum is None):
-    logging.error("Child program failed (code=%s, signal=%s)",
-                  exitcode, signum)
-    sys.exit(constants.EXIT_FAILURE)
-
-  return bool(exitcode)
-
-
 def CheckMasterd(options, args):
   """Initial checks whether to run or exit with a failure.
 
   """
+  if args: # masterd doesn't take any arguments
+    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
+    sys.exit(constants.EXIT_FAILURE)
+
   ssconf.CheckMaster(options.debug)
 
   # If CheckMaster didn't fail we believe we are the master, but we have to
@@ -541,18 +509,18 @@ def CheckMasterd(options, args):
 
     confirmation = sys.stdin.readline().strip()
     if confirmation != "YES":
-      print >>sys.stderr, "Aborting."
+      print >> sys.stderr, "Aborting."
       sys.exit(constants.EXIT_FAILURE)
 
     return
 
   # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
   # process before we call utils.Daemonize in the current process.
-  if not _RunInSeparateProcess(CheckAgreementWithRpc):
+  if not utils.RunInSeparateProcess(CheckAgreementWithRpc):
     sys.exit(constants.EXIT_FAILURE)
 
 
-def ExecMasterd (options, args):
+def ExecMasterd (options, args): # pylint: disable-msg=W0613
   """Main master daemon function, executed with the PID file held.
 
   """
@@ -567,7 +535,7 @@ def ExecMasterd (options, args):
       # activate ip
       master_node = ssconf.SimpleStore().GetMasterNode()
       result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
-      msg = result.RemoteFailMsg()
+      msg = result.fail_msg
       if msg:
         logging.error("Can't activate master IP address: %s", msg)
 
@@ -599,7 +567,8 @@ def main():
           (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
          ]
   daemon.GenericMain(constants.MASTERD, parser, dirs,
-                     CheckMasterd, ExecMasterd)
+                     CheckMasterd, ExecMasterd,
+                     multithreaded=True)
 
 
 if __name__ == "__main__":