rapi.testutils: Add exported functions to verify opcode input/result
[ganeti-local] / lib / jstore.py
index 7424b7b..88b15cc 100644 (file)
 
 """Module implementing the job queue handling."""
 
-import os
-import logging
 import errno
-import re
+import os
 
 from ganeti import constants
 from ganeti import errors
+from ganeti import runtime
 from ganeti import utils
 
 
@@ -39,17 +38,12 @@ def _ReadNumericFile(file_name):
 
   """
   try:
-    fd = open(file_name, "r")
-  except IOError, err:
+    return int(utils.ReadFile(file_name))
+  except EnvironmentError, err:
     if err.errno in (errno.ENOENT, ):
       return None
     raise
 
-  try:
-    return int(fd.read(128))
-  finally:
-    fd.close()
-
 
 def ReadSerial():
   """Read the serial file.
@@ -81,16 +75,10 @@ def InitAndVerifyQueue(must_lock):
            locking mode.
 
   """
-  # Make sure our directories exists
-  for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
-    try:
-      os.mkdir(path, 0700)
-    except OSError, err:
-      if err.errno not in (errno.EEXIST, ):
-        raise
+  getents = runtime.GetEnts()
 
   # Lock queue
-  queue_lock = utils.FileLock(constants.JOB_QUEUE_LOCK_FILE)
+  queue_lock = utils.FileLock.Open(constants.JOB_QUEUE_LOCK_FILE)
   try:
     # The queue needs to be locked in exclusive mode to write to the serial and
     # version files.
@@ -112,6 +100,7 @@ def InitAndVerifyQueue(must_lock):
       if version is None:
         # Write new version file
         utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
+                        uid=getents.masterd_uid, gid=getents.masterd_gid,
                         data="%s\n" % constants.JOB_QUEUE_VERSION)
 
         # Read again
@@ -125,6 +114,7 @@ def InitAndVerifyQueue(must_lock):
       if serial is None:
         # Write new serial file
         utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
+                        uid=getents.masterd_uid, gid=getents.masterd_gid,
                         data="%s\n" % 0)
 
         # Read again
@@ -132,10 +122,49 @@ def InitAndVerifyQueue(must_lock):
 
       if serial is None:
         # There must be a serious problem
-        raise errors.JobQueueError("Can't read/parse the job queue serial file")
+        raise errors.JobQueueError("Can't read/parse the job queue"
+                                   " serial file")
+
+      if not must_lock:
+        # There's no need for more error handling. Closing the lock
+        # file below in case of an error will unlock it anyway.
+        queue_lock.Unlock()
 
   except:
     queue_lock.Close()
     raise
 
   return queue_lock
+
+
+def CheckDrainFlag():
+  """Check if the queue is marked to be drained.
+
+  This currently uses the queue drain file, which makes it a per-node flag.
+  In the future this can be moved to the config file.
+
+  @rtype: boolean
+  @return: True if the job queue is marked drained
+
+  """
+  return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+
+
+def SetDrainFlag(drain_flag):
+  """Sets the drain flag for the queue.
+
+  @type drain_flag: boolean
+  @param drain_flag: Whether to set or unset the drain flag
+  @attention: This function should only called the current holder of the queue
+    lock
+
+  """
+  getents = runtime.GetEnts()
+
+  if drain_flag:
+    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="",
+                    uid=getents.masterd_uid, gid=getents.masterd_gid)
+  else:
+    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
+
+  assert (not drain_flag) ^ CheckDrainFlag()