changed = True
- # Note: this also changes the on-disk priority ("op.priority" is only in
- # memory)
- op.input.priority = priority
+ # Set new priority (doesn't modify opcode input)
op.priority = priority
if changed:
class _JobFileChangesWaiter(object):
- def __init__(self, filename):
+ def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
"""Initializes this class.
@type filename: string
@raises errors.InotifyError: if the notifier cannot be setup
"""
- self._wm = pyinotify.WatchManager()
+ self._wm = _inotify_wm_cls()
self._inotify_handler = \
asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
self._notifier = \
class _JobChangesWaiter(object):
- def __init__(self, filename):
+ def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
"""Initializes this class.
@type filename: string
"""
self._filewaiter = None
self._filename = filename
+ self._waiter_cls = _waiter_cls
def Wait(self, timeout):
"""Waits for a job to change.
# If this point is reached, return immediately and let caller check the job
# file again in case there were changes since the last check. This avoids a
# race condition.
- self._filewaiter = _JobFileChangesWaiter(self._filename)
+ self._filewaiter = self._waiter_cls(self._filename)
return True
return result
def __call__(self, filename, job_load_fn,
- fields, prev_job_info, prev_log_serial, timeout):
+ fields, prev_job_info, prev_log_serial, timeout,
+ _waiter_cls=_JobChangesWaiter):
"""Waits for changes on a job.
@type filename: string
counter = itertools.count()
try:
check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
- waiter = _JobChangesWaiter(filename)
+ waiter = _waiter_cls(filename)
try:
return utils.Retry(compat.partial(self._CheckForChanges,
counter, job_load_fn, check_fn),
wait_fn=waiter.Wait)
finally:
waiter.Close()
- except (errors.InotifyError, errors.JobLost):
+ except errors.JobLost:
return None
except utils.RetryTimeout:
return constants.JOB_NOTCHANGED
logging.error("Failed to upload file %s to node %s: %s",
file_name, node_name, msg)
+ # Set queue drained flag
+ result = \
+ self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
+ self._drained)
+ msg = result[node_name].fail_msg
+ if msg:
+ logging.error("Failed to set queue drained flag on node %s: %s",
+ node_name, msg)
+
self._nodes[node_name] = node.primary_ip
@locking.ssynchronized(_LOCK)
@param drain_flag: Whether to set or unset the drain flag
"""
+ # Change flag locally
jstore.SetDrainFlag(drain_flag)
self._drained = drain_flag
+ # ... and on all nodes
+ (names, addrs) = self._GetNodeIp()
+ result = \
+ self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
+ self._CheckRpcResult(result, self._nodes,
+ "Setting queue drain flag to %s" % drain_flag)
+
return True
@_RequireOpenQueue