Explicitly pass params to activate_master_ip
[ganeti-local] / lib / server / masterd.py
index c7d953d..2948bb0 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -26,7 +26,7 @@ inheritance from parent classes requires it.
 
 """
 
-# pylint: disable-msg=C0103
+# pylint: disable=C0103
 # C0103: Invalid name ganeti-masterd
 
 import grp
@@ -66,7 +66,7 @@ EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
 class ClientRequestWorker(workerpool.BaseWorker):
-  # pylint: disable-msg=W0221
+  # pylint: disable=W0221
   def RunTask(self, server, message, client):
     """Process the request.
 
@@ -103,7 +103,7 @@ class ClientRequestWorker(workerpool.BaseWorker):
       client.send_message(reply)
       # awake the main thread so that it can write out the data.
       server.awaker.signal()
-    except: # pylint: disable-msg=W0702
+    except: # pylint: disable=W0702
       logging.exception("Send error")
       client.close_log()
 
@@ -113,6 +113,7 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
 
   """
   _MAX_UNHANDLED = 1
+
   def __init__(self, server, connected_socket, client_address, family):
     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                  client_address,
@@ -188,10 +189,13 @@ class ClientOps:
   def __init__(self, server):
     self.server = server
 
-  def handle_request(self, method, args): # pylint: disable-msg=R0911
+  def handle_request(self, method, args): # pylint: disable=R0911
     queue = self.server.context.jobqueue
 
     # TODO: Parameter validation
+    if not isinstance(args, (tuple, list)):
+      logging.info("Received invalid arguments of type '%s'", type(args))
+      raise ValueError("Invalid arguments type '%s'" % type(args))
 
     # TODO: Rewrite to not exit in each 'if/elif' branch
 
@@ -208,12 +212,12 @@ class ClientOps:
       return queue.SubmitManyJobs(jobs)
 
     elif method == luxi.REQ_CANCEL_JOB:
-      job_id = args
+      (job_id, ) = args
       logging.info("Received job cancel request for %s", job_id)
       return queue.CancelJob(job_id)
 
     elif method == luxi.REQ_ARCHIVE_JOB:
-      job_id = args
+      (job_id, ) = args
       logging.info("Received job archive request for %s", job_id)
       return queue.ArchiveJob(job_id)
 
@@ -230,16 +234,17 @@ class ClientOps:
                                      prev_log_serial, timeout)
 
     elif method == luxi.REQ_QUERY:
-      req = objects.QueryRequest.FromDict(args)
+      (what, fields, qfilter) = args
+      req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
 
-      if req.what in constants.QR_OP_QUERY:
+      if req.what in constants.QR_VIA_OP:
         result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
-                                             filter=req.filter))
+                                             qfilter=req.qfilter))
       elif req.what == constants.QR_LOCK:
-        if req.filter is not None:
+        if req.qfilter is not None:
           raise errors.OpPrereqError("Lock queries can't be filtered")
         return self.server.context.glm.QueryLocks(req.fields)
-      elif req.what in constants.QR_OP_LUXI:
+      elif req.what in constants.QR_VIA_LUXI:
         raise NotImplementedError
       else:
         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
@@ -248,20 +253,16 @@ class ClientOps:
       return result
 
     elif method == luxi.REQ_QUERY_FIELDS:
-      req = objects.QueryFieldsRequest.FromDict(args)
+      (what, fields) = args
+      req = objects.QueryFieldsRequest(what=what, fields=fields)
 
-      if req.what in constants.QR_OP_QUERY:
-        result = self._Query(opcodes.OpQueryFields(what=req.what,
-                                                   fields=req.fields))
-      elif req.what == constants.QR_LOCK:
-        return query.QueryFields(query.LOCK_FIELDS, req.fields)
-      elif req.what in constants.QR_OP_LUXI:
-        raise NotImplementedError
-      else:
+      try:
+        fielddefs = query.ALL_FIELDS[req.what]
+      except KeyError:
         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                    errors.ECODE_INVAL)
 
-      return result
+      return query.QueryFields(fielddefs, req.fields)
 
     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
@@ -278,8 +279,8 @@ class ClientOps:
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
-      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
-                                    use_locking=use_locking)
+      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
+                                   use_locking=use_locking)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_NODES:
@@ -288,8 +289,8 @@ class ClientOps:
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
-      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
-                                use_locking=use_locking)
+      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
+                               use_locking=use_locking)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_GROUPS:
@@ -298,11 +299,11 @@ class ClientOps:
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
-      op = opcodes.OpQueryGroups(names=names, output_fields=fields)
+      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_EXPORTS:
-      nodes, use_locking = args
+      (nodes, use_locking) = args
       if use_locking:
         raise errors.OpPrereqError("Sync queries are not allowed",
                                    errors.ECODE_INVAL)
@@ -311,7 +312,7 @@ class ClientOps:
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
-      fields = args
+      (fields, ) = args
       logging.info("Received config values query request for %s", fields)
       op = opcodes.OpClusterConfigQuery(output_fields=fields)
       return self._Query(op)
@@ -322,9 +323,9 @@ class ClientOps:
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_TAGS:
-      kind, name = args
+      (kind, name) = args
       logging.info("Received tags query request")
-      op = opcodes.OpGetTags(kind=kind, name=name)
+      op = opcodes.OpTagsGet(kind=kind, name=name)
       return self._Query(op)
 
     elif method == luxi.REQ_QUERY_LOCKS:
@@ -335,7 +336,7 @@ class ClientOps:
       return self.server.context.glm.OldStyleQueryLocks(fields)
 
     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
-      drain_flag = args
+      (drain_flag, ) = args
       logging.info("Received queue drain flag change request to %s",
                    drain_flag)
       return queue.SetDrainFlag(drain_flag)
@@ -378,7 +379,7 @@ class GanetiContext(object):
   This class creates and holds common objects shared by all threads.
 
   """
-  # pylint: disable-msg=W0212
+  # pylint: disable=W0212
   # we do want to ensure a singleton here
   _instance = None
 
@@ -403,6 +404,9 @@ class GanetiContext(object):
     # Job queue
     self.jobqueue = jqueue.JobQueue(self)
 
+    # RPC runner
+    self.rpc = rpc.RpcRunner(self)
+
     # setting this also locks the class against attribute modifications
     self.__class__._instance = self
 
@@ -425,6 +429,7 @@ class GanetiContext(object):
 
     # Add the new node to the Ganeti Lock Manager
     self.glm.add(locking.LEVEL_NODE, node.name)
+    self.glm.add(locking.LEVEL_NODE_RES, node.name)
 
   def ReaddNode(self, node):
     """Updates a node that's already in the configuration
@@ -445,6 +450,7 @@ class GanetiContext(object):
 
     # Remove the node from the Ganeti Lock Manager
     self.glm.remove(locking.LEVEL_NODE, name)
+    self.glm.remove(locking.LEVEL_NODE_RES, name)
 
 
 def _SetWatcherPause(until):
@@ -527,8 +533,11 @@ def CheckAgreement():
 @rpc.RunWithRPC
 def ActivateMasterIP():
   # activate ip
-  master_node = ssconf.SimpleStore().GetMasterNode()
-  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
+  cfg = config.ConfigWriter()
+  (master, ip, dev, netmask, family) = cfg.GetMasterNetworkParameters()
+  runner = rpc.BootstrapRunner()
+  result = runner.call_node_activate_master_ip(master, ip, netmask, dev, family)
+
   msg = result.fail_msg
   if msg:
     logging.error("Can't activate master IP address: %s", msg)
@@ -578,26 +587,24 @@ def CheckMasterd(options, args):
   # If CheckMaster didn't fail we believe we are the master, but we have to
   # confirm with the other nodes.
   if options.no_voting:
-    if options.yes_do_it:
-      return
+    if not options.yes_do_it:
+      sys.stdout.write("The 'no voting' option has been selected.\n")
+      sys.stdout.write("This is dangerous, please confirm by"
+                       " typing uppercase 'yes': ")
+      sys.stdout.flush()
 
-    sys.stdout.write("The 'no voting' option has been selected.\n")
-    sys.stdout.write("This is dangerous, please confirm by"
-                     " typing uppercase 'yes': ")
-    sys.stdout.flush()
+      confirmation = sys.stdin.readline().strip()
+      if confirmation != "YES":
+        print >> sys.stderr, "Aborting."
+        sys.exit(constants.EXIT_FAILURE)
 
-    confirmation = sys.stdin.readline().strip()
-    if confirmation != "YES":
-      print >> sys.stderr, "Aborting."
+  else:
+    # CheckAgreement uses RPC and threads, hence it needs to be run in
+    # a separate process before we call utils.Daemonize in the current
+    # process.
+    if not utils.RunInSeparateProcess(CheckAgreement):
       sys.exit(constants.EXIT_FAILURE)
 
-    return
-
-  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
-  # process before we call utils.Daemonize in the current process.
-  if not utils.RunInSeparateProcess(CheckAgreement):
-    sys.exit(constants.EXIT_FAILURE)
-
   # ActivateMasterIP also uses RPC/threads, so we run it again via a
   # separate process.
 
@@ -619,7 +626,7 @@ def PrepMasterd(options, _):
   return (mainloop, master)
 
 
-def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
+def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
   """Main master daemon function, executed with the PID file held.
 
   """