Revision ff699aa9

b/lib/bootstrap.py
43 43
from ganeti import netutils
44 44
from ganeti import backend
45 45
from ganeti import luxi
46
from ganeti import jstore
46 47

  
47 48

  
48 49
# ec_id for InitConfig's temporary reservation manager
......
660 661
                    " continuing but activating the master on the current"
661 662
                    " node will probably fail", total_timeout)
662 663

  
664
  if jstore.CheckDrainFlag():
665
    logging.info("Undraining job queue")
666
    jstore.SetDrainFlag(False)
667

  
663 668
  logging.info("Starting the master daemons on the new master")
664 669

  
665 670
  result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)
b/lib/jqueue.py
29 29

  
30 30
"""
31 31

  
32
import os
33 32
import logging
34 33
import errno
35 34
import re
......
1230 1229

  
1231 1230
    self._queue_size = 0
1232 1231
    self._UpdateQueueSizeUnlocked()
1233
    self._drained = self._IsQueueMarkedDrain()
1232
    self._drained = jstore.CheckDrainFlag()
1234 1233

  
1235 1234
    # Setup worker pool
1236 1235
    self._wpool = _JobQueueWorkerPool(self)
......
1629 1628
      logging.exception("Can't load/parse job %s", job_id)
1630 1629
      return None
1631 1630

  
1632
  @staticmethod
1633
  def _IsQueueMarkedDrain():
1634
    """Check if the queue is marked from drain.
1635

  
1636
    This currently uses the queue drain file, which makes it a
1637
    per-node flag. In the future this can be moved to the config file.
1638

  
1639
    @rtype: boolean
1640
    @return: True of the job queue is marked for draining
1641

  
1642
    """
1643
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1644

  
1645 1631
  def _UpdateQueueSizeUnlocked(self):
1646 1632
    """Update the queue size.
1647 1633

  
......
1657 1643
    @param drain_flag: Whether to set or unset the drain flag
1658 1644

  
1659 1645
    """
1660
    getents = runtime.GetEnts()
1661

  
1662
    if drain_flag:
1663
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1664
                      uid=getents.masterd_uid, gid=getents.masterd_gid)
1665
    else:
1666
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1646
    jstore.SetDrainFlag(drain_flag)
1667 1647

  
1668 1648
    self._drained = drain_flag
1669 1649

  
b/lib/jstore.py
22 22
"""Module implementing the job queue handling."""
23 23

  
24 24
import errno
25
import os
25 26

  
26 27
from ganeti import constants
27 28
from ganeti import errors
......
134 135
    raise
135 136

  
136 137
  return queue_lock
138

  
139

  
140
def CheckDrainFlag():
141
  """Check if the queue is marked to be drained.
142

  
143
  This currently uses the queue drain file, which makes it a per-node flag.
144
  In the future this can be moved to the config file.
145

  
146
  @rtype: boolean
147
  @return: True if the job queue is marked drained
148

  
149
  """
150
  return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
151

  
152

  
153
def SetDrainFlag(drain_flag):
154
  """Sets the drain flag for the queue.
155

  
156
  @type drain_flag: boolean
157
  @param drain_flag: Whether to set or unset the drain flag
158
  @attention: This function should only called the current holder of the queue
159
    lock
160

  
161
  """
162
  getents = runtime.GetEnts()
163

  
164
  if drain_flag:
165
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="",
166
                    uid=getents.masterd_uid, gid=getents.masterd_gid)
167
  else:
168
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
169

  
170
  assert (not drain_flag) ^ CheckDrainFlag()
b/qa/ganeti-qa.py
167 167
    ("cluster-command", qa_cluster.TestClusterCommand),
168 168
    ("cluster-burnin", qa_cluster.TestClusterBurnin),
169 169
    ("cluster-master-failover", qa_cluster.TestClusterMasterFailover),
170
    ("cluster-master-failover",
171
     qa_cluster.TestClusterMasterFailoverWithDrainedQueue),
170 172
    ("cluster-oob", qa_cluster.TestClusterOob),
171 173
    ("rapi", qa_rapi.TestVersion),
172 174
    ("rapi", qa_rapi.TestEmptyCluster),
b/qa/qa_cluster.py
385 385
  cmd = ["gnt-cluster", "master-failover"]
386 386
  try:
387 387
    AssertCommand(cmd, node=failovermaster)
388
    # Back to original master node
388 389
    AssertCommand(cmd, node=master)
389 390
  finally:
390 391
    qa_config.ReleaseNode(failovermaster)
391 392

  
392 393

  
394
def TestClusterMasterFailoverWithDrainedQueue():
395
  """gnt-cluster master-failover with drained queue"""
396
  drain_check = ["test", "-f", constants.JOB_QUEUE_DRAIN_FILE]
397

  
398
  master = qa_config.GetMasterNode()
399
  failovermaster = qa_config.AcquireNode(exclude=master)
400

  
401
  # Ensure queue is not drained
402
  for node in [master, failovermaster]:
403
    AssertCommand(drain_check, node=node, fail=True)
404

  
405
  # Drain queue on failover master
406
  AssertCommand(["touch", constants.JOB_QUEUE_DRAIN_FILE], node=failovermaster)
407

  
408
  cmd = ["gnt-cluster", "master-failover"]
409
  try:
410
    AssertCommand(drain_check, node=failovermaster)
411
    AssertCommand(cmd, node=failovermaster)
412
    AssertCommand(drain_check, fail=True)
413
    AssertCommand(drain_check, node=failovermaster, fail=True)
414

  
415
    # Back to original master node
416
    AssertCommand(cmd, node=master)
417
  finally:
418
    qa_config.ReleaseNode(failovermaster)
419

  
420
  AssertCommand(drain_check, fail=True)
421
  AssertCommand(drain_check, node=failovermaster, fail=True)
422

  
423

  
393 424
def TestClusterCopyfile():
394 425
  """gnt-cluster copyfile"""
395 426
  master = qa_config.GetMasterNode()

Also available in: Unified diff