Add a generic 'debug_level' attribute to opcodes
[ganeti-local] / daemons / ganeti-masterd
index e016d53..a789b7a 100755 (executable)
@@ -1,4 +1,4 @@
-#!/usr/bin/python -u
+#!/usr/bin/python
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
@@ -26,6 +26,8 @@ inheritance from parent classes requires it.
 
 """
 
+# pylint: disable-msg=C0103
+# C0103: Invalid name ganeti-masterd
 
 import os
 import sys
@@ -61,6 +63,7 @@ EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
 class ClientRequestWorker(workerpool.BaseWorker):
+   # pylint: disable-msg=W0221
   def RunTask(self, server, request, client_address):
     """Process the request.
 
@@ -70,7 +73,7 @@ class ClientRequestWorker(workerpool.BaseWorker):
     try:
       server.finish_request(request, client_address)
       server.close_request(request)
-    except:
+    except: # pylint: disable-msg=W0702
       server.handle_error(request, client_address)
       server.close_request(request)
 
@@ -98,7 +101,8 @@ class IOServer(SocketServer.UnixStreamServer):
 
   def setup_queue(self):
     self.context = GanetiContext()
-    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
+    self.request_workers = workerpool.WorkerPool("ClientReq",
+                                                 CLIENT_REQUEST_WORKERS,
                                                  ClientRequestWorker)
 
   def process_request(self, request, client_address):
@@ -108,7 +112,7 @@ class IOServer(SocketServer.UnixStreamServer):
     self.request_workers.AddTask(self, request, client_address)
 
   @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
-  def serve_forever(self, signal_handlers=None):
+  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
     """Handle one request at a time until told to quit."""
     assert isinstance(signal_handlers, dict) and \
            len(signal_handlers) > 0, \
@@ -141,6 +145,8 @@ class ClientRqHandler(SocketServer.BaseRequestHandler):
   READ_SIZE = 4096
 
   def setup(self):
+    # pylint: disable-msg=W0201
+    # setup() is the api for initialising for this class
     self._buffer = ""
     self._msgs = collections.deque()
     self._ops = ClientOps(self.server)
@@ -204,11 +210,13 @@ class ClientOps:
   def __init__(self, server):
     self.server = server
 
-  def handle_request(self, method, args):
+  def handle_request(self, method, args): # pylint: disable-msg=R0911
     queue = self.server.context.jobqueue
 
     # TODO: Parameter validation
 
+    # TODO: Rewrite to not exit in each 'if/elif' branch
+
     if method == luxi.REQ_SUBMIT_JOB:
       logging.info("Received new job")
       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
@@ -246,7 +254,7 @@ class ClientOps:
     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
       if isinstance(job_ids, (tuple, list)) and job_ids:
-        msg = ", ".join(job_ids)
+        msg = utils.CommaJoin(job_ids)
       else:
         msg = str(job_ids)
       logging.info("Received job query request for %s", msg)
@@ -256,7 +264,8 @@ class ClientOps:
       (names, fields, use_locking) = args
       logging.info("Received instance query request for %s", names)
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                     use_locking=use_locking)
       return self._Query(op)
@@ -265,7 +274,8 @@ class ClientOps:
       (names, fields, use_locking) = args
       logging.info("Received node query request for %s", names)
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                 use_locking=use_locking)
       return self._Query(op)
@@ -273,7 +283,8 @@ class ClientOps:
     elif method == luxi.REQ_QUERY_EXPORTS:
       nodes, use_locking = args
       if use_locking:
-        raise errors.OpPrereqError("Sync queries are not allowed")
+        raise errors.OpPrereqError("Sync queries are not allowed",
+                                   errors.ECODE_INVAL)
       logging.info("Received exports query request")
       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
       return self._Query(op)
@@ -289,6 +300,12 @@ class ClientOps:
       op = opcodes.OpQueryClusterInfo()
       return self._Query(op)
 
+    elif method == luxi.REQ_QUERY_TAGS:
+      kind, name = args
+      logging.info("Received tags query request")
+      op = opcodes.OpGetTags(kind=kind, name=name)
+      return self._Query(op)
+
     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
       drain_flag = args
       logging.info("Received queue drain flag change request to %s",
@@ -319,7 +336,8 @@ class ClientOps:
     """Runs the specified opcode and returns the result.
 
     """
-    proc = mcpu.Processor(self.server.context)
+    # Queries don't have a job id
+    proc = mcpu.Processor(self.server.context, None)
     return proc.ExecOpCode(op, None)
 
 
@@ -329,6 +347,8 @@ class GanetiContext(object):
   This class creates and holds common objects shared by all threads.
 
   """
+  # pylint: disable-msg=W0212
+  # we do want to ensure a singleton here
   _instance = None
 
   def __init__(self):
@@ -361,12 +381,12 @@ class GanetiContext(object):
     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
     object.__setattr__(self, name, value)
 
-  def AddNode(self, node):
+  def AddNode(self, node, ec_id):
     """Adds a node to the configuration and lock manager.
 
     """
     # Add it to the configuration
-    self.cfg.AddNode(node)
+    self.cfg.AddNode(node, ec_id)
 
     # If preseeding fails it'll not be added
     self.jobqueue.AddNode(node)
@@ -494,12 +514,12 @@ def _RunInSeparateProcess(fn):
       # Call function
       result = int(bool(fn()))
       assert result in (0, 1)
-    except:
+    except: # pylint: disable-msg=W0702
       logging.exception("Error while calling function in separate process")
       # 0 and 1 are reserved for the return value
       result = 33
 
-    os._exit(result)
+    os._exit(result) # pylint: disable-msg=W0212
 
   # Parent process
 
@@ -525,6 +545,10 @@ def CheckMasterd(options, args):
   """Initial checks whether to run or exit with a failure.
 
   """
+  if args: # masterd doesn't take any arguments
+    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
+    sys.exit(constants.EXIT_FAILURE)
+
   ssconf.CheckMaster(options.debug)
 
   # If CheckMaster didn't fail we believe we are the master, but we have to
@@ -540,7 +564,7 @@ def CheckMasterd(options, args):
 
     confirmation = sys.stdin.readline().strip()
     if confirmation != "YES":
-      print >>sys.stderr, "Aborting."
+      print >> sys.stderr, "Aborting."
       sys.exit(constants.EXIT_FAILURE)
 
     return
@@ -551,7 +575,7 @@ def CheckMasterd(options, args):
     sys.exit(constants.EXIT_FAILURE)
 
 
-def ExecMasterd (options, args):
+def ExecMasterd (options, args): # pylint: disable-msg=W0613
   """Main master daemon function, executed with the PID file held.
 
   """