Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ bb62d52e

History | View | Annotate | Download (11.9 kB)

1 48e4da5c Klaus Aehlig
{-| Implementation of a reader for the job queue.
2 48e4da5c Klaus Aehlig
3 48e4da5c Klaus Aehlig
-}
4 48e4da5c Klaus Aehlig
5 48e4da5c Klaus Aehlig
{-
6 48e4da5c Klaus Aehlig
7 48e4da5c Klaus Aehlig
Copyright (C) 2013 Google Inc.
8 48e4da5c Klaus Aehlig
9 48e4da5c Klaus Aehlig
This program is free software; you can redistribute it and/or modify
10 48e4da5c Klaus Aehlig
it under the terms of the GNU General Public License as published by
11 48e4da5c Klaus Aehlig
the Free Software Foundation; either version 2 of the License, or
12 48e4da5c Klaus Aehlig
(at your option) any later version.
13 48e4da5c Klaus Aehlig
14 48e4da5c Klaus Aehlig
This program is distributed in the hope that it will be useful, but
15 48e4da5c Klaus Aehlig
WITHOUT ANY WARRANTY; without even the implied warranty of
16 48e4da5c Klaus Aehlig
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 48e4da5c Klaus Aehlig
General Public License for more details.
18 48e4da5c Klaus Aehlig
19 48e4da5c Klaus Aehlig
You should have received a copy of the GNU General Public License
20 48e4da5c Klaus Aehlig
along with this program; if not, write to the Free Software
21 48e4da5c Klaus Aehlig
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 48e4da5c Klaus Aehlig
02110-1301, USA.
23 48e4da5c Klaus Aehlig
24 48e4da5c Klaus Aehlig
-}
25 48e4da5c Klaus Aehlig
26 48e4da5c Klaus Aehlig
module Ganeti.JQScheduler
27 48e4da5c Klaus Aehlig
  ( JQStatus
28 48e4da5c Klaus Aehlig
  , emptyJQStatus
29 48e4da5c Klaus Aehlig
  , initJQScheduler
30 48e4da5c Klaus Aehlig
  , enqueueNewJobs
31 bb62d52e Klaus Aehlig
  , dequeueJob
32 48e4da5c Klaus Aehlig
  ) where
33 48e4da5c Klaus Aehlig
34 48e4da5c Klaus Aehlig
import Control.Arrow
35 48e4da5c Klaus Aehlig
import Control.Concurrent
36 1c532d2d Klaus Aehlig
import Control.Exception
37 48e4da5c Klaus Aehlig
import Control.Monad
38 48e4da5c Klaus Aehlig
import Data.List
39 b81650b0 Klaus Aehlig
import Data.Maybe
40 48e4da5c Klaus Aehlig
import Data.IORef
41 ed6cf449 Klaus Aehlig
import System.INotify
42 48e4da5c Klaus Aehlig
43 48e4da5c Klaus Aehlig
import Ganeti.BasicTypes
44 48e4da5c Klaus Aehlig
import Ganeti.Constants as C
45 48e4da5c Klaus Aehlig
import Ganeti.JQueue as JQ
46 48e4da5c Klaus Aehlig
import Ganeti.Logging
47 48e4da5c Klaus Aehlig
import Ganeti.Path
48 48e4da5c Klaus Aehlig
import Ganeti.Types
49 48e4da5c Klaus Aehlig
import Ganeti.Utils
50 48e4da5c Klaus Aehlig
51 ed6cf449 Klaus Aehlig
data JobWithStat = JobWithStat { jINotify :: Maybe INotify
52 ed6cf449 Klaus Aehlig
                               , jStat :: FStat
53 ed6cf449 Klaus Aehlig
                               , jJob :: QueuedJob
54 ed6cf449 Klaus Aehlig
                               }
55 48e4da5c Klaus Aehlig
data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] }
56 48e4da5c Klaus Aehlig
57 48e4da5c Klaus Aehlig
{-| Representation of the job queue
58 48e4da5c Klaus Aehlig
59 48e4da5c Klaus Aehlig
We keep two lists of jobs (together with information about the last
60 48e4da5c Klaus Aehlig
fstat result observed): the jobs that are enqueued, but not yet handed
61 48e4da5c Klaus Aehlig
over for execution, and the jobs already handed over for execution. They
62 48e4da5c Klaus Aehlig
are kept together in a single IORef, so that we can atomically update
63 48e4da5c Klaus Aehlig
both, in particular when scheduling jobs to be handed over for execution.
64 48e4da5c Klaus Aehlig
65 48e4da5c Klaus Aehlig
-}
66 48e4da5c Klaus Aehlig
67 48e4da5c Klaus Aehlig
data JQStatus = JQStatus
68 48e4da5c Klaus Aehlig
  { jqJobs :: IORef Queue
69 48e4da5c Klaus Aehlig
  }
70 48e4da5c Klaus Aehlig
71 48e4da5c Klaus Aehlig
72 48e4da5c Klaus Aehlig
emptyJQStatus :: IO JQStatus
73 48e4da5c Klaus Aehlig
emptyJQStatus = do
74 48e4da5c Klaus Aehlig
  jqJ <- newIORef Queue {qEnqueued=[], qRunning=[]}
75 48e4da5c Klaus Aehlig
  return JQStatus { jqJobs=jqJ }
76 48e4da5c Klaus Aehlig
77 48e4da5c Klaus Aehlig
-- | Apply a function on the running jobs.
78 48e4da5c Klaus Aehlig
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
79 48e4da5c Klaus Aehlig
onRunningJobs f queue = queue {qRunning=f $ qRunning queue}
80 48e4da5c Klaus Aehlig
81 48e4da5c Klaus Aehlig
-- | Apply a function on the queued jobs.
82 48e4da5c Klaus Aehlig
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
83 48e4da5c Klaus Aehlig
onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue}
84 48e4da5c Klaus Aehlig
85 48e4da5c Klaus Aehlig
-- | Obtain a JobWithStat from a QueuedJob.
86 48e4da5c Klaus Aehlig
unreadJob :: QueuedJob -> JobWithStat
87 ed6cf449 Klaus Aehlig
unreadJob job = JobWithStat {jJob=job, jStat=nullFStat, jINotify=Nothing}
88 48e4da5c Klaus Aehlig
89 48e4da5c Klaus Aehlig
-- | Reload interval for polling the running jobs for updates in microseconds.
90 48e4da5c Klaus Aehlig
watchInterval :: Int
91 48e4da5c Klaus Aehlig
watchInterval = C.luxidJobqueuePollInterval * 1000000 
92 48e4da5c Klaus Aehlig
93 48e4da5c Klaus Aehlig
-- | Maximal number of jobs to be running at the same time.
94 48e4da5c Klaus Aehlig
maxRunningJobs :: Int
95 48e4da5c Klaus Aehlig
maxRunningJobs = C.luxidMaximalRunningJobs 
96 48e4da5c Klaus Aehlig
97 48e4da5c Klaus Aehlig
-- | Wrapper function to atomically update the jobs in the queue status.
98 48e4da5c Klaus Aehlig
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
99 48e4da5c Klaus Aehlig
modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) ()  . f)
100 48e4da5c Klaus Aehlig
101 48e4da5c Klaus Aehlig
-- | Reread a job from disk, if the file has changed.
102 48e4da5c Klaus Aehlig
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
103 ed6cf449 Klaus Aehlig
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job})  = do
104 48e4da5c Klaus Aehlig
  let jid = qjId job
105 48e4da5c Klaus Aehlig
  qdir <- queueDir
106 48e4da5c Klaus Aehlig
  let fpath = liveJobFile qdir jid
107 48e4da5c Klaus Aehlig
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
108 350f0759 Klaus Aehlig
  changedResult <- try $ needsReload fstat fpath
109 350f0759 Klaus Aehlig
                   :: IO (Either IOError (Maybe FStat))
110 350f0759 Klaus Aehlig
  let changed = either (const $ Just nullFStat) id changedResult
111 48e4da5c Klaus Aehlig
  case changed of
112 48e4da5c Klaus Aehlig
    Nothing -> do
113 48e4da5c Klaus Aehlig
      logDebug $ "File " ++ fpath ++ " not changed on disk."
114 48e4da5c Klaus Aehlig
      return Nothing
115 48e4da5c Klaus Aehlig
    Just fstat' -> do
116 48e4da5c Klaus Aehlig
      let jids = show $ fromJobId jid
117 350f0759 Klaus Aehlig
      logInfo $ "Rereading job "  ++ jids
118 350f0759 Klaus Aehlig
      readResult <- loadJobFromDisk qdir True jid
119 48e4da5c Klaus Aehlig
      case readResult of
120 48e4da5c Klaus Aehlig
        Bad s -> do
121 48e4da5c Klaus Aehlig
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
122 48e4da5c Klaus Aehlig
          return Nothing
123 48e4da5c Klaus Aehlig
        Ok (job', _) -> do
124 48e4da5c Klaus Aehlig
          logDebug
125 48e4da5c Klaus Aehlig
            $ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job')
126 ed6cf449 Klaus Aehlig
          return . Just $ jWS {jStat=fstat', jJob=job'}
127 ed6cf449 Klaus Aehlig
                          -- jINotify unchanged
128 48e4da5c Klaus Aehlig
129 48e4da5c Klaus Aehlig
-- | Update a job in the job queue, if it is still there. This is the
130 48e4da5c Klaus Aehlig
-- pure function for inserting a previously read change into the queue.
131 48e4da5c Klaus Aehlig
-- as the change contains its time stamp, we don't have to worry about a
132 48e4da5c Klaus Aehlig
-- later read change overwriting a newer read state. If this happens, the
133 48e4da5c Klaus Aehlig
-- fstat value will be outdated, so the next poller run will fix this.
134 48e4da5c Klaus Aehlig
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
135 48e4da5c Klaus Aehlig
updateJobStatus job' =
136 48e4da5c Klaus Aehlig
  let jid = qjId $ jJob job' in
137 48e4da5c Klaus Aehlig
  map (\job -> if qjId (jJob job) == jid then job' else job)
138 48e4da5c Klaus Aehlig
139 48e4da5c Klaus Aehlig
-- | Update a single job by reading it from disk, if necessary.
140 48e4da5c Klaus Aehlig
updateJob :: JQStatus -> JobWithStat -> IO ()
141 48e4da5c Klaus Aehlig
updateJob state jb = do
142 48e4da5c Klaus Aehlig
  jb' <- readJobStatus jb
143 48e4da5c Klaus Aehlig
  maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb'
144 ea174b21 Klaus Aehlig
  when (maybe True (jobFinalized . jJob) jb') . (>> return ()) . forkIO $ do
145 ea174b21 Klaus Aehlig
    logDebug "Scheduler noticed a job to have finished."
146 ea174b21 Klaus Aehlig
    cleanupFinishedJobs state
147 ea174b21 Klaus Aehlig
    scheduleSomeJobs state
148 48e4da5c Klaus Aehlig
149 48e4da5c Klaus Aehlig
-- | Sort out the finished jobs from the monitored part of the queue.
150 48e4da5c Klaus Aehlig
-- This is the pure part, splitting the queue into a remaining queue
151 48e4da5c Klaus Aehlig
-- and the jobs that were removed.
152 a2977f53 Klaus Aehlig
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
153 48e4da5c Klaus Aehlig
sortoutFinishedJobs queue =
154 1b3bde96 Klaus Aehlig
  let (fin, run') = partition (jobFinalized . jJob) . qRunning $ queue
155 a2977f53 Klaus Aehlig
  in (queue {qRunning=run'}, fin)
156 48e4da5c Klaus Aehlig
157 48e4da5c Klaus Aehlig
-- | Actually clean up the finished jobs. This is the IO wrapper around
158 48e4da5c Klaus Aehlig
-- the pure `sortoutFinishedJobs`.
159 48e4da5c Klaus Aehlig
cleanupFinishedJobs :: JQStatus -> IO ()
160 48e4da5c Klaus Aehlig
cleanupFinishedJobs qstate = do
161 48e4da5c Klaus Aehlig
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
162 a2977f53 Klaus Aehlig
  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob
163 48e4da5c Klaus Aehlig
      jlist = commaJoin $ map showJob finished
164 48e4da5c Klaus Aehlig
  unless (null finished)
165 48e4da5c Klaus Aehlig
    . logInfo $ "Finished jobs: " ++ jlist
166 cc5ab470 Klaus Aehlig
  mapM_ (maybe (return ()) killINotify . jINotify) finished
167 48e4da5c Klaus Aehlig
168 b81650b0 Klaus Aehlig
-- | Watcher task for a job, to update it on file changes. It also
169 b81650b0 Klaus Aehlig
-- reinstantiates itself upon receiving an Ignored event.
170 b81650b0 Klaus Aehlig
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
171 b81650b0 Klaus Aehlig
jobWatcher state jWS e = do
172 b81650b0 Klaus Aehlig
  let jid = qjId $ jJob jWS
173 b81650b0 Klaus Aehlig
      jids = show $ fromJobId jid
174 b81650b0 Klaus Aehlig
  logInfo $ "Scheduler notified of change of job " ++ jids
175 b81650b0 Klaus Aehlig
  logDebug $ "Scheulder notify event for " ++ jids ++ ": " ++ show e
176 b81650b0 Klaus Aehlig
  let inotify = jINotify jWS
177 b81650b0 Klaus Aehlig
  when (e == Ignored  && isJust inotify) $ do
178 b81650b0 Klaus Aehlig
    qdir <- queueDir
179 b81650b0 Klaus Aehlig
    let fpath = liveJobFile qdir jid
180 b81650b0 Klaus Aehlig
    _ <- addWatch (fromJust inotify) [Modify, Delete] fpath
181 b81650b0 Klaus Aehlig
           (jobWatcher state jWS)
182 b81650b0 Klaus Aehlig
    return ()
183 b81650b0 Klaus Aehlig
  updateJob state jWS
184 b81650b0 Klaus Aehlig
185 b81650b0 Klaus Aehlig
-- | Attach the job watcher to a running job.
186 b81650b0 Klaus Aehlig
attachWatcher :: JQStatus -> JobWithStat -> IO ()
187 b81650b0 Klaus Aehlig
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do
188 b81650b0 Klaus Aehlig
  inotify <- initINotify
189 b81650b0 Klaus Aehlig
  qdir <- queueDir
190 b81650b0 Klaus Aehlig
  let fpath = liveJobFile qdir . qjId $ jJob jWS
191 b81650b0 Klaus Aehlig
      jWS' = jWS { jINotify=Just inotify }
192 b81650b0 Klaus Aehlig
  logDebug $ "Attaching queue watcher for " ++ fpath
193 b81650b0 Klaus Aehlig
  _ <- addWatch inotify [Modify, Delete] fpath $ jobWatcher state jWS'
194 b81650b0 Klaus Aehlig
  modifyJobs state . onRunningJobs $ updateJobStatus jWS'
195 b81650b0 Klaus Aehlig
196 48e4da5c Klaus Aehlig
-- | Decide on which jobs to schedule next for execution. This is the
197 48e4da5c Klaus Aehlig
-- pure function doing the scheduling.
198 a2977f53 Klaus Aehlig
selectJobsToRun :: Queue -> (Queue, [JobWithStat])
199 48e4da5c Klaus Aehlig
selectJobsToRun queue =
200 48e4da5c Klaus Aehlig
  let n = maxRunningJobs - length (qRunning queue)
201 48e4da5c Klaus Aehlig
      (chosen, remain) = splitAt n (qEnqueued queue)
202 a2977f53 Klaus Aehlig
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
203 48e4da5c Klaus Aehlig
204 1c532d2d Klaus Aehlig
-- | Requeue jobs that were previously selected for execution
205 1c532d2d Klaus Aehlig
-- but couldn't be started.
206 a2977f53 Klaus Aehlig
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
207 1c532d2d Klaus Aehlig
requeueJobs qstate jobs err = do
208 a2977f53 Klaus Aehlig
  let jids = map (qjId . jJob) jobs
209 1c532d2d Klaus Aehlig
      jidsString = commaJoin $ map (show . fromJobId) jids
210 1c532d2d Klaus Aehlig
      rmJobs = filter ((`notElem` jids) . qjId . jJob)
211 1c532d2d Klaus Aehlig
  logWarning $ "Starting jobs failed: " ++ show err
212 1c532d2d Klaus Aehlig
  logWarning $ "Rescheduling jobs: " ++ jidsString
213 1c532d2d Klaus Aehlig
  modifyJobs qstate (onRunningJobs rmJobs)
214 a2977f53 Klaus Aehlig
  modifyJobs qstate (onQueuedJobs $ (++) jobs)
215 1c532d2d Klaus Aehlig
216 48e4da5c Klaus Aehlig
-- | Schedule jobs to be run. This is the IO wrapper around the
217 48e4da5c Klaus Aehlig
-- pure `selectJobsToRun`.
218 48e4da5c Klaus Aehlig
scheduleSomeJobs :: JQStatus -> IO ()
219 48e4da5c Klaus Aehlig
scheduleSomeJobs qstate = do
220 48e4da5c Klaus Aehlig
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
221 a2977f53 Klaus Aehlig
  let jobs = map jJob chosen
222 7dd21737 Klaus Aehlig
  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
223 a2977f53 Klaus Aehlig
    $ map (show . fromJobId . qjId) jobs
224 b81650b0 Klaus Aehlig
  mapM_ (attachWatcher qstate) chosen
225 a2977f53 Klaus Aehlig
  result <- try $ JQ.startJobs jobs
226 1c532d2d Klaus Aehlig
  either (requeueJobs qstate chosen) return result
227 48e4da5c Klaus Aehlig
228 48e4da5c Klaus Aehlig
-- | Format the job queue status in a compact, human readable way.
229 48e4da5c Klaus Aehlig
showQueue :: Queue -> String
230 48e4da5c Klaus Aehlig
showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
231 48e4da5c Klaus Aehlig
  let showids = show . map (fromJobId . qjId . jJob)
232 48e4da5c Klaus Aehlig
  in "Waiting jobs: " ++ showids waiting 
233 48e4da5c Klaus Aehlig
       ++ "; running jobs: " ++ showids running
234 48e4da5c Klaus Aehlig
235 48e4da5c Klaus Aehlig
-- | Time-based watcher for updating the job queue.
236 48e4da5c Klaus Aehlig
onTimeWatcher :: JQStatus -> IO ()
237 48e4da5c Klaus Aehlig
onTimeWatcher qstate = forever $ do
238 48e4da5c Klaus Aehlig
  threadDelay watchInterval
239 fe50bb65 Klaus Aehlig
  logDebug "Job queue watcher timer fired"
240 48e4da5c Klaus Aehlig
  jobs <- readIORef (jqJobs qstate)
241 48e4da5c Klaus Aehlig
  mapM_ (updateJob qstate) $ qRunning jobs
242 48e4da5c Klaus Aehlig
  cleanupFinishedJobs qstate
243 48e4da5c Klaus Aehlig
  jobs' <- readIORef (jqJobs qstate)
244 48e4da5c Klaus Aehlig
  logInfo $ showQueue jobs'
245 48e4da5c Klaus Aehlig
  scheduleSomeJobs qstate
246 48e4da5c Klaus Aehlig
247 2713b91a Klaus Aehlig
-- | Read a single, non-archived, job, specified by its id, from disk.
248 2713b91a Klaus Aehlig
readJobFromDisk :: JobId -> IO (Result JobWithStat)
249 2713b91a Klaus Aehlig
readJobFromDisk jid = do
250 2713b91a Klaus Aehlig
  qdir <- queueDir
251 2713b91a Klaus Aehlig
  let fpath = liveJobFile qdir jid
252 2713b91a Klaus Aehlig
  logDebug $ "Reading " ++ fpath
253 2713b91a Klaus Aehlig
  tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat)
254 2713b91a Klaus Aehlig
  let fstat = either (const nullFStat) id tryFstat
255 2713b91a Klaus Aehlig
  loadResult <- JQ.loadJobFromDisk qdir False jid
256 ed6cf449 Klaus Aehlig
  return $ liftM (JobWithStat Nothing fstat . fst) loadResult
257 2713b91a Klaus Aehlig
258 2713b91a Klaus Aehlig
-- | Read all non-finalized jobs from disk.
259 2713b91a Klaus Aehlig
readJobsFromDisk :: IO [JobWithStat]
260 2713b91a Klaus Aehlig
readJobsFromDisk = do
261 2713b91a Klaus Aehlig
  logInfo "Loading job queue"
262 2713b91a Klaus Aehlig
  qdir <- queueDir
263 2713b91a Klaus Aehlig
  eitherJids <- JQ.getJobIDs [qdir]
264 2713b91a Klaus Aehlig
  let jids = either (const []) JQ.sortJobIDs eitherJids
265 2713b91a Klaus Aehlig
      jidsstring = commaJoin $ map (show . fromJobId) jids
266 2713b91a Klaus Aehlig
  logInfo $ "Non-archived jobs on disk: " ++ jidsstring
267 2713b91a Klaus Aehlig
  jobs <- mapM readJobFromDisk jids
268 2713b91a Klaus Aehlig
  return $ justOk jobs
269 2713b91a Klaus Aehlig
270 48e4da5c Klaus Aehlig
-- | Set up the job scheduler. This will also start the monitoring
271 48e4da5c Klaus Aehlig
-- of changes to the running jobs.
272 48e4da5c Klaus Aehlig
initJQScheduler :: JQStatus -> IO ()
273 48e4da5c Klaus Aehlig
initJQScheduler qstate = do
274 2713b91a Klaus Aehlig
  alljobs <- readJobsFromDisk
275 2713b91a Klaus Aehlig
  let jobs = filter (not . jobFinalized . jJob) alljobs
276 2713b91a Klaus Aehlig
      (running, queued) = partition (jobStarted . jJob) jobs
277 2713b91a Klaus Aehlig
  modifyJobs qstate (onQueuedJobs (++ queued) . onRunningJobs (++ running))
278 2713b91a Klaus Aehlig
  jqjobs <- readIORef (jqJobs qstate)
279 2713b91a Klaus Aehlig
  logInfo $ showQueue jqjobs
280 2713b91a Klaus Aehlig
  scheduleSomeJobs qstate
281 48e4da5c Klaus Aehlig
  logInfo "Starting time-based job queue watcher"
282 48e4da5c Klaus Aehlig
  _ <- forkIO $ onTimeWatcher qstate
283 48e4da5c Klaus Aehlig
  return ()
284 48e4da5c Klaus Aehlig
285 48e4da5c Klaus Aehlig
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
286 48e4da5c Klaus Aehlig
-- eventually.
287 48e4da5c Klaus Aehlig
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
288 48e4da5c Klaus Aehlig
enqueueNewJobs state jobs = do
289 48e4da5c Klaus Aehlig
  logInfo . (++) "New jobs enqueued: " . commaJoin
290 48e4da5c Klaus Aehlig
    $ map (show . fromJobId . qjId) jobs
291 48e4da5c Klaus Aehlig
  let jobs' = map unreadJob jobs
292 48e4da5c Klaus Aehlig
  modifyJobs state (onQueuedJobs (++ jobs'))
293 48e4da5c Klaus Aehlig
  scheduleSomeJobs state
294 bb62d52e Klaus Aehlig
295 bb62d52e Klaus Aehlig
-- | Pure function for removing a queued job from the job queue by
296 bb62d52e Klaus Aehlig
-- atomicModifyIORef. The answer is True if the job could be removed
297 bb62d52e Klaus Aehlig
-- before being handed over to execution, False if it already was started
298 bb62d52e Klaus Aehlig
-- and a Bad result if the job is not found in the queue.
299 bb62d52e Klaus Aehlig
rmJob :: JobId -> Queue -> (Queue, Result Bool)
300 bb62d52e Klaus Aehlig
rmJob jid q =
301 bb62d52e Klaus Aehlig
  let isJid = (jid ==) . qjId . jJob
302 bb62d52e Klaus Aehlig
      (found, queued') = partition isJid $ qEnqueued q
303 bb62d52e Klaus Aehlig
  in if null found
304 bb62d52e Klaus Aehlig
       then if any isJid $ qRunning q
305 bb62d52e Klaus Aehlig
              then (q, Ok False)
306 bb62d52e Klaus Aehlig
              else (q, Bad $ "Job " ++ show (fromJobId jid)
307 bb62d52e Klaus Aehlig
                              ++ " unknown to the queue")
308 bb62d52e Klaus Aehlig
       else (q {qEnqueued = queued'}, Ok True)
309 bb62d52e Klaus Aehlig
310 bb62d52e Klaus Aehlig
-- | Try to remove a queued job from the job queue. Return True, if
311 bb62d52e Klaus Aehlig
-- the job could be removed from the queue before being handed over
312 bb62d52e Klaus Aehlig
-- to execution, False if the job already started, and a Bad result
313 bb62d52e Klaus Aehlig
-- if the job is unknown.
314 bb62d52e Klaus Aehlig
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
315 bb62d52e Klaus Aehlig
dequeueJob state jid = do
316 bb62d52e Klaus Aehlig
  result <- atomicModifyIORef (jqJobs state) $ rmJob jid
317 bb62d52e Klaus Aehlig
  logDebug $ "Result of dequeing job " ++ show (fromJobId jid)
318 bb62d52e Klaus Aehlig
              ++ " is " ++ show result
319 bb62d52e Klaus Aehlig
  return result