Replicate queue drain flag across all master candidates
authorMichael Hanselmann <hansmi@google.com>
Tue, 11 Dec 2012 12:56:35 +0000 (13:56 +0100)
committerMichael Hanselmann <hansmi@google.com>
Tue, 11 Dec 2012 13:37:32 +0000 (14:37 +0100)
Until now, the flag was unset on a master failover unless the
“$localstatedir/lib/ganeti/queue/drain” file existed.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>

NEWS
lib/jqueue.py
lib/rpc_defs.py
lib/server/noded.py

diff --git a/NEWS b/NEWS
index 5f07284..293378c 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -37,6 +37,9 @@ Version 2.7.0 beta1
   a cluster from a machine by stopping all daemons, removing
   certificates and ssconf files. Unless the ``--no-backup`` option is
   given, copies of the certificates are made.
+- Draining (``gnt-cluster queue drain``) and un-draining the job queue
+  (``gnt-cluster queue undrain``) now affects all nodes in a cluster and
+  the flag is not reset after a master failover.
 
 
 Version 2.6.1
index 5bde17d..b855b77 100644 (file)
@@ -1800,6 +1800,15 @@ class JobQueue(object):
         logging.error("Failed to upload file %s to node %s: %s",
                       file_name, node_name, msg)
 
+    # Set queue drained flag
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
+                                                       self._drained)
+    msg = result[node_name].fail_msg
+    if msg:
+      logging.error("Failed to set queue drained flag on node %s: %s",
+                    node_name, msg)
+
     self._nodes[node_name] = node.primary_ip
 
   @locking.ssynchronized(_LOCK)
@@ -2123,10 +2132,18 @@ class JobQueue(object):
     @param drain_flag: Whether to set or unset the drain flag
 
     """
+    # Change flag locally
     jstore.SetDrainFlag(drain_flag)
 
     self._drained = drain_flag
 
+    # ... and on all nodes
+    (names, addrs) = self._GetNodeIp()
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
+    self._CheckRpcResult(result, self._nodes,
+                         "Setting queue drain flag to %s" % drain_flag)
+
     return True
 
   @_RequireOpenQueue
index 6ecd2b2..ecea054 100644 (file)
@@ -522,6 +522,9 @@ CALLS = {
     ("jobqueue_rename", MULTI, None, constants.RPC_TMO_URGENT, [
       ("rename", None, None),
       ], None, None, "Rename job queue file"),
+    ("jobqueue_set_drain_flag", MULTI, None, constants.RPC_TMO_URGENT, [
+      ("flag", None, None),
+      ], None, None, "Set job queue drain flag"),
     ]),
   "RpcClientBootstrap": _Prepare([
     ("node_start_master_daemons", SINGLE, None, constants.RPC_TMO_FAST, [
index e04a2c7..91ce4ce 100644 (file)
@@ -944,6 +944,16 @@ class NodeRequestHandler(http.server.HttpServerHandler):
     # TODO: What if a file fails to rename?
     return [backend.JobQueueRename(old, new) for old, new in params[0]]
 
+  @staticmethod
+  @_RequireJobQueueLock
+  def perspective_jobqueue_set_drain_flag(params):
+    """Set job queue's drain flag.
+
+    """
+    (flag, ) = params
+
+    return jstore.SetDrainFlag(flag)
+
   # hypervisor ---------------
 
   @staticmethod