DRBD: ensure peers are UpToDate for dual-primary
[ganeti-local] / lib / jqueue.py
index b855b77..110d386 100644 (file)
@@ -222,6 +222,22 @@ class _QueuedJob(object):
                "received_timestamp", "start_timestamp", "end_timestamp",
                "__weakref__", "processor_lock", "writable", "archived"]
 
+  def _AddReasons(self):
+    """Extend the reason trail of every opcode in this job.
+
+    Each opcode gets a (source, "job=<id>;index=<n>", utils.EpochNano())
+    entry appended to its C{reason} list; <n> is its position in the job.
+
+    """
+    for index, queued_op in enumerate(self.ops):
+      op = queued_op.input
+      reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
+      reason_text = "job=%d;index=%d" % (self.id, index)
+      # Opcodes without a "reason" attribute start from a fresh list
+      reason = getattr(op, "reason", [])
+      reason.append((reason_src, reason_text, utils.EpochNano()))
+      op.reason = reason
+
   def __init__(self, queue, job_id, ops, writable):
     """Constructor for the _QueuedJob.
 
@@ -242,6 +258,7 @@ class _QueuedJob(object):
     self.queue = queue
     self.id = int(job_id)
     self.ops = [_QueuedOpCode(op) for op in ops]
+    self._AddReasons()
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
@@ -689,7 +706,7 @@ class _JobChangesChecker(object):
 
 
 class _JobFileChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
     """Initializes this class.
 
     @type filename: string
@@ -697,7 +714,7 @@ class _JobFileChangesWaiter(object):
     @raises errors.InotifyError: if the notifier cannot be setup
 
     """
-    self._wm = pyinotify.WatchManager()
+    self._wm = _inotify_wm_cls()
     self._inotify_handler = \
       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
     self._notifier = \
@@ -739,7 +756,7 @@ class _JobFileChangesWaiter(object):
 
 
 class _JobChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
     """Initializes this class.
 
     @type filename: string
@@ -748,6 +765,7 @@ class _JobChangesWaiter(object):
     """
     self._filewaiter = None
     self._filename = filename
+    self._waiter_cls = _waiter_cls
 
   def Wait(self, timeout):
     """Waits for a job to change.
@@ -764,7 +782,7 @@ class _JobChangesWaiter(object):
     # If this point is reached, return immediately and let caller check the job
     # file again in case there were changes since the last check. This avoids a
     # race condition.
-    self._filewaiter = _JobFileChangesWaiter(self._filename)
+    self._filewaiter = self._waiter_cls(self._filename)
 
     return True
 
@@ -802,7 +820,8 @@ class _WaitForJobChangesHelper(object):
     return result
 
   def __call__(self, filename, job_load_fn,
-               fields, prev_job_info, prev_log_serial, timeout):
+               fields, prev_job_info, prev_log_serial, timeout,
+               _waiter_cls=_JobChangesWaiter):
     """Waits for changes on a job.
 
     @type filename: string
@@ -822,7 +841,7 @@ class _WaitForJobChangesHelper(object):
     counter = itertools.count()
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
-      waiter = _JobChangesWaiter(filename)
+      waiter = _waiter_cls(filename)
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
                                           counter, job_load_fn, check_fn),
@@ -830,7 +849,7 @@ class _WaitForJobChangesHelper(object):
                            wait_fn=waiter.Wait)
       finally:
         waiter.Close()
-    except (errors.InotifyError, errors.JobLost):
+    except errors.JobLost:
       return None
     except utils.RetryTimeout:
       return constants.JOB_NOTCHANGED
@@ -1883,7 +1902,8 @@ class JobQueue(object):
     """
     getents = runtime.GetEnts()
     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
-                    gid=getents.masterd_gid)
+                    gid=getents.daemons_gid,
+                    mode=constants.JOB_QUEUE_FILES_PERMS)
 
     if replicate:
       names, addrs = self._GetNodeIp()