Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ c92b4671

History | View | Annotate | Download (12.3 kB)

1 48e4da5c Klaus Aehlig
{-| Implementation of a reader for the job queue.
2 48e4da5c Klaus Aehlig
3 48e4da5c Klaus Aehlig
-}
4 48e4da5c Klaus Aehlig
5 48e4da5c Klaus Aehlig
{-
6 48e4da5c Klaus Aehlig
7 48e4da5c Klaus Aehlig
Copyright (C) 2013 Google Inc.
8 48e4da5c Klaus Aehlig
9 48e4da5c Klaus Aehlig
This program is free software; you can redistribute it and/or modify
10 48e4da5c Klaus Aehlig
it under the terms of the GNU General Public License as published by
11 48e4da5c Klaus Aehlig
the Free Software Foundation; either version 2 of the License, or
12 48e4da5c Klaus Aehlig
(at your option) any later version.
13 48e4da5c Klaus Aehlig
14 48e4da5c Klaus Aehlig
This program is distributed in the hope that it will be useful, but
15 48e4da5c Klaus Aehlig
WITHOUT ANY WARRANTY; without even the implied warranty of
16 48e4da5c Klaus Aehlig
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 48e4da5c Klaus Aehlig
General Public License for more details.
18 48e4da5c Klaus Aehlig
19 48e4da5c Klaus Aehlig
You should have received a copy of the GNU General Public License
20 48e4da5c Klaus Aehlig
along with this program; if not, write to the Free Software
21 48e4da5c Klaus Aehlig
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 48e4da5c Klaus Aehlig
02110-1301, USA.
23 48e4da5c Klaus Aehlig
24 48e4da5c Klaus Aehlig
-}
25 48e4da5c Klaus Aehlig
26 48e4da5c Klaus Aehlig
module Ganeti.JQScheduler
27 48e4da5c Klaus Aehlig
  ( JQStatus
28 48e4da5c Klaus Aehlig
  , emptyJQStatus
29 48e4da5c Klaus Aehlig
  , initJQScheduler
30 48e4da5c Klaus Aehlig
  , enqueueNewJobs
31 bb62d52e Klaus Aehlig
  , dequeueJob
32 48e4da5c Klaus Aehlig
  ) where
33 48e4da5c Klaus Aehlig
34 48e4da5c Klaus Aehlig
import Control.Arrow
35 48e4da5c Klaus Aehlig
import Control.Concurrent
36 1c532d2d Klaus Aehlig
import Control.Exception
37 48e4da5c Klaus Aehlig
import Control.Monad
38 48e4da5c Klaus Aehlig
import Data.List
39 b81650b0 Klaus Aehlig
import Data.Maybe
40 48e4da5c Klaus Aehlig
import Data.IORef
41 ed6cf449 Klaus Aehlig
import System.INotify
42 48e4da5c Klaus Aehlig
43 48e4da5c Klaus Aehlig
import Ganeti.BasicTypes
44 48e4da5c Klaus Aehlig
import Ganeti.Constants as C
45 48e4da5c Klaus Aehlig
import Ganeti.JQueue as JQ
46 48e4da5c Klaus Aehlig
import Ganeti.Logging
47 d9dd04b1 Klaus Aehlig
import Ganeti.Objects
48 48e4da5c Klaus Aehlig
import Ganeti.Path
49 48e4da5c Klaus Aehlig
import Ganeti.Types
50 48e4da5c Klaus Aehlig
import Ganeti.Utils
51 48e4da5c Klaus Aehlig
52 ed6cf449 Klaus Aehlig
data JobWithStat = JobWithStat { jINotify :: Maybe INotify
53 ed6cf449 Klaus Aehlig
                               , jStat :: FStat
54 ed6cf449 Klaus Aehlig
                               , jJob :: QueuedJob
55 ed6cf449 Klaus Aehlig
                               }
56 48e4da5c Klaus Aehlig
data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] }
57 48e4da5c Klaus Aehlig
58 48e4da5c Klaus Aehlig
{-| Representation of the job queue
59 48e4da5c Klaus Aehlig
60 48e4da5c Klaus Aehlig
We keep two lists of jobs (together with information about the last
61 48e4da5c Klaus Aehlig
fstat result observed): the jobs that are enqueued, but not yet handed
62 48e4da5c Klaus Aehlig
over for execution, and the jobs already handed over for execution. They
63 48e4da5c Klaus Aehlig
are kept together in a single IORef, so that we can atomically update
64 48e4da5c Klaus Aehlig
both, in particular when scheduling jobs to be handed over for execution.
65 48e4da5c Klaus Aehlig
66 48e4da5c Klaus Aehlig
-}
67 48e4da5c Klaus Aehlig
68 48e4da5c Klaus Aehlig
data JQStatus = JQStatus
69 48e4da5c Klaus Aehlig
  { jqJobs :: IORef Queue
70 6046dca9 Klaus Aehlig
  , jqConfig :: IORef (Result ConfigData)
71 48e4da5c Klaus Aehlig
  }
72 48e4da5c Klaus Aehlig
73 48e4da5c Klaus Aehlig
74 6046dca9 Klaus Aehlig
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
75 6046dca9 Klaus Aehlig
emptyJQStatus config = do
76 6046dca9 Klaus Aehlig
  jqJ <- newIORef Queue { qEnqueued = [], qRunning = []}
77 6046dca9 Klaus Aehlig
  return JQStatus { jqJobs = jqJ, jqConfig = config }
78 48e4da5c Klaus Aehlig
79 48e4da5c Klaus Aehlig
-- | Apply a function on the running jobs.
80 48e4da5c Klaus Aehlig
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
81 48e4da5c Klaus Aehlig
onRunningJobs f queue = queue {qRunning=f $ qRunning queue}
82 48e4da5c Klaus Aehlig
83 48e4da5c Klaus Aehlig
-- | Apply a function on the queued jobs.
84 48e4da5c Klaus Aehlig
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
85 48e4da5c Klaus Aehlig
onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue}
86 48e4da5c Klaus Aehlig
87 48e4da5c Klaus Aehlig
-- | Obtain a JobWithStat from a QueuedJob.
88 48e4da5c Klaus Aehlig
unreadJob :: QueuedJob -> JobWithStat
89 ed6cf449 Klaus Aehlig
unreadJob job = JobWithStat {jJob=job, jStat=nullFStat, jINotify=Nothing}
90 48e4da5c Klaus Aehlig
91 48e4da5c Klaus Aehlig
-- | Reload interval for polling the running jobs for updates in microseconds.
92 48e4da5c Klaus Aehlig
watchInterval :: Int
93 48e4da5c Klaus Aehlig
watchInterval = C.luxidJobqueuePollInterval * 1000000 
94 48e4da5c Klaus Aehlig
95 d9dd04b1 Klaus Aehlig
-- | Get the maximual number of jobs to be run simultaneously from the
96 d9dd04b1 Klaus Aehlig
-- configuration. If the configuration is not available, be conservative
97 d9dd04b1 Klaus Aehlig
-- and use the smallest possible value, i.e., 1.
98 d9dd04b1 Klaus Aehlig
getMaxRunningJobs :: JQStatus -> IO Int
99 d9dd04b1 Klaus Aehlig
getMaxRunningJobs =
100 d9dd04b1 Klaus Aehlig
  liftM (genericResult (const 1) (clusterMaxRunningJobs . configCluster))
101 d9dd04b1 Klaus Aehlig
  . readIORef . jqConfig
102 48e4da5c Klaus Aehlig
103 48e4da5c Klaus Aehlig
-- | Wrapper function to atomically update the jobs in the queue status.
104 48e4da5c Klaus Aehlig
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
105 48e4da5c Klaus Aehlig
modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) ()  . f)
106 48e4da5c Klaus Aehlig
107 48e4da5c Klaus Aehlig
-- | Reread a job from disk, if the file has changed.
108 48e4da5c Klaus Aehlig
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
109 ed6cf449 Klaus Aehlig
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job})  = do
110 48e4da5c Klaus Aehlig
  let jid = qjId job
111 48e4da5c Klaus Aehlig
  qdir <- queueDir
112 48e4da5c Klaus Aehlig
  let fpath = liveJobFile qdir jid
113 48e4da5c Klaus Aehlig
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
114 350f0759 Klaus Aehlig
  changedResult <- try $ needsReload fstat fpath
115 350f0759 Klaus Aehlig
                   :: IO (Either IOError (Maybe FStat))
116 350f0759 Klaus Aehlig
  let changed = either (const $ Just nullFStat) id changedResult
117 48e4da5c Klaus Aehlig
  case changed of
118 48e4da5c Klaus Aehlig
    Nothing -> do
119 48e4da5c Klaus Aehlig
      logDebug $ "File " ++ fpath ++ " not changed on disk."
120 48e4da5c Klaus Aehlig
      return Nothing
121 48e4da5c Klaus Aehlig
    Just fstat' -> do
122 48e4da5c Klaus Aehlig
      let jids = show $ fromJobId jid
123 350f0759 Klaus Aehlig
      logInfo $ "Rereading job "  ++ jids
124 350f0759 Klaus Aehlig
      readResult <- loadJobFromDisk qdir True jid
125 48e4da5c Klaus Aehlig
      case readResult of
126 48e4da5c Klaus Aehlig
        Bad s -> do
127 48e4da5c Klaus Aehlig
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
128 48e4da5c Klaus Aehlig
          return Nothing
129 48e4da5c Klaus Aehlig
        Ok (job', _) -> do
130 48e4da5c Klaus Aehlig
          logDebug
131 48e4da5c Klaus Aehlig
            $ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job')
132 ed6cf449 Klaus Aehlig
          return . Just $ jWS {jStat=fstat', jJob=job'}
133 ed6cf449 Klaus Aehlig
                          -- jINotify unchanged
134 48e4da5c Klaus Aehlig
135 48e4da5c Klaus Aehlig
-- | Update a job in the job queue, if it is still there. This is the
136 48e4da5c Klaus Aehlig
-- pure function for inserting a previously read change into the queue.
137 48e4da5c Klaus Aehlig
-- as the change contains its time stamp, we don't have to worry about a
138 48e4da5c Klaus Aehlig
-- later read change overwriting a newer read state. If this happens, the
139 48e4da5c Klaus Aehlig
-- fstat value will be outdated, so the next poller run will fix this.
140 48e4da5c Klaus Aehlig
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
141 48e4da5c Klaus Aehlig
updateJobStatus job' =
142 48e4da5c Klaus Aehlig
  let jid = qjId $ jJob job' in
143 48e4da5c Klaus Aehlig
  map (\job -> if qjId (jJob job) == jid then job' else job)
144 48e4da5c Klaus Aehlig
145 48e4da5c Klaus Aehlig
-- | Update a single job by reading it from disk, if necessary.
146 48e4da5c Klaus Aehlig
updateJob :: JQStatus -> JobWithStat -> IO ()
147 48e4da5c Klaus Aehlig
updateJob state jb = do
148 48e4da5c Klaus Aehlig
  jb' <- readJobStatus jb
149 48e4da5c Klaus Aehlig
  maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb'
150 ea174b21 Klaus Aehlig
  when (maybe True (jobFinalized . jJob) jb') . (>> return ()) . forkIO $ do
151 ea174b21 Klaus Aehlig
    logDebug "Scheduler noticed a job to have finished."
152 ea174b21 Klaus Aehlig
    cleanupFinishedJobs state
153 ea174b21 Klaus Aehlig
    scheduleSomeJobs state
154 48e4da5c Klaus Aehlig
155 48e4da5c Klaus Aehlig
-- | Sort out the finished jobs from the monitored part of the queue.
156 48e4da5c Klaus Aehlig
-- This is the pure part, splitting the queue into a remaining queue
157 48e4da5c Klaus Aehlig
-- and the jobs that were removed.
158 a2977f53 Klaus Aehlig
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
159 48e4da5c Klaus Aehlig
sortoutFinishedJobs queue =
160 1b3bde96 Klaus Aehlig
  let (fin, run') = partition (jobFinalized . jJob) . qRunning $ queue
161 a2977f53 Klaus Aehlig
  in (queue {qRunning=run'}, fin)
162 48e4da5c Klaus Aehlig
163 48e4da5c Klaus Aehlig
-- | Actually clean up the finished jobs. This is the IO wrapper around
164 48e4da5c Klaus Aehlig
-- the pure `sortoutFinishedJobs`.
165 48e4da5c Klaus Aehlig
cleanupFinishedJobs :: JQStatus -> IO ()
166 48e4da5c Klaus Aehlig
cleanupFinishedJobs qstate = do
167 48e4da5c Klaus Aehlig
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
168 a2977f53 Klaus Aehlig
  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob
169 48e4da5c Klaus Aehlig
      jlist = commaJoin $ map showJob finished
170 48e4da5c Klaus Aehlig
  unless (null finished)
171 48e4da5c Klaus Aehlig
    . logInfo $ "Finished jobs: " ++ jlist
172 cc5ab470 Klaus Aehlig
  mapM_ (maybe (return ()) killINotify . jINotify) finished
173 48e4da5c Klaus Aehlig
174 b81650b0 Klaus Aehlig
-- | Watcher task for a job, to update it on file changes. It also
175 b81650b0 Klaus Aehlig
-- reinstantiates itself upon receiving an Ignored event.
176 b81650b0 Klaus Aehlig
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
177 b81650b0 Klaus Aehlig
jobWatcher state jWS e = do
178 b81650b0 Klaus Aehlig
  let jid = qjId $ jJob jWS
179 b81650b0 Klaus Aehlig
      jids = show $ fromJobId jid
180 b81650b0 Klaus Aehlig
  logInfo $ "Scheduler notified of change of job " ++ jids
181 b81650b0 Klaus Aehlig
  logDebug $ "Scheulder notify event for " ++ jids ++ ": " ++ show e
182 b81650b0 Klaus Aehlig
  let inotify = jINotify jWS
183 b81650b0 Klaus Aehlig
  when (e == Ignored  && isJust inotify) $ do
184 b81650b0 Klaus Aehlig
    qdir <- queueDir
185 b81650b0 Klaus Aehlig
    let fpath = liveJobFile qdir jid
186 b81650b0 Klaus Aehlig
    _ <- addWatch (fromJust inotify) [Modify, Delete] fpath
187 b81650b0 Klaus Aehlig
           (jobWatcher state jWS)
188 b81650b0 Klaus Aehlig
    return ()
189 b81650b0 Klaus Aehlig
  updateJob state jWS
190 b81650b0 Klaus Aehlig
191 b81650b0 Klaus Aehlig
-- | Attach the job watcher to a running job.
192 b81650b0 Klaus Aehlig
attachWatcher :: JQStatus -> JobWithStat -> IO ()
193 b81650b0 Klaus Aehlig
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do
194 b81650b0 Klaus Aehlig
  inotify <- initINotify
195 b81650b0 Klaus Aehlig
  qdir <- queueDir
196 b81650b0 Klaus Aehlig
  let fpath = liveJobFile qdir . qjId $ jJob jWS
197 b81650b0 Klaus Aehlig
      jWS' = jWS { jINotify=Just inotify }
198 b81650b0 Klaus Aehlig
  logDebug $ "Attaching queue watcher for " ++ fpath
199 b81650b0 Klaus Aehlig
  _ <- addWatch inotify [Modify, Delete] fpath $ jobWatcher state jWS'
200 b81650b0 Klaus Aehlig
  modifyJobs state . onRunningJobs $ updateJobStatus jWS'
201 b81650b0 Klaus Aehlig
202 48e4da5c Klaus Aehlig
-- | Decide on which jobs to schedule next for execution. This is the
203 48e4da5c Klaus Aehlig
-- pure function doing the scheduling.
204 d9dd04b1 Klaus Aehlig
selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat])
205 d9dd04b1 Klaus Aehlig
selectJobsToRun count queue =
206 d9dd04b1 Klaus Aehlig
  let n = count - length (qRunning queue)
207 48e4da5c Klaus Aehlig
      (chosen, remain) = splitAt n (qEnqueued queue)
208 a2977f53 Klaus Aehlig
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
209 48e4da5c Klaus Aehlig
210 1c532d2d Klaus Aehlig
-- | Requeue jobs that were previously selected for execution
211 1c532d2d Klaus Aehlig
-- but couldn't be started.
212 a2977f53 Klaus Aehlig
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
213 1c532d2d Klaus Aehlig
requeueJobs qstate jobs err = do
214 a2977f53 Klaus Aehlig
  let jids = map (qjId . jJob) jobs
215 1c532d2d Klaus Aehlig
      jidsString = commaJoin $ map (show . fromJobId) jids
216 1c532d2d Klaus Aehlig
      rmJobs = filter ((`notElem` jids) . qjId . jJob)
217 1c532d2d Klaus Aehlig
  logWarning $ "Starting jobs failed: " ++ show err
218 1c532d2d Klaus Aehlig
  logWarning $ "Rescheduling jobs: " ++ jidsString
219 1c532d2d Klaus Aehlig
  modifyJobs qstate (onRunningJobs rmJobs)
220 a2977f53 Klaus Aehlig
  modifyJobs qstate (onQueuedJobs $ (++) jobs)
221 1c532d2d Klaus Aehlig
222 48e4da5c Klaus Aehlig
-- | Schedule jobs to be run. This is the IO wrapper around the
223 48e4da5c Klaus Aehlig
-- pure `selectJobsToRun`.
224 48e4da5c Klaus Aehlig
scheduleSomeJobs :: JQStatus -> IO ()
225 48e4da5c Klaus Aehlig
scheduleSomeJobs qstate = do
226 d9dd04b1 Klaus Aehlig
  count <- getMaxRunningJobs qstate
227 d9dd04b1 Klaus Aehlig
  chosen <- atomicModifyIORef (jqJobs qstate) (selectJobsToRun count)
228 a2977f53 Klaus Aehlig
  let jobs = map jJob chosen
229 7dd21737 Klaus Aehlig
  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
230 a2977f53 Klaus Aehlig
    $ map (show . fromJobId . qjId) jobs
231 b81650b0 Klaus Aehlig
  mapM_ (attachWatcher qstate) chosen
232 a2977f53 Klaus Aehlig
  result <- try $ JQ.startJobs jobs
233 1c532d2d Klaus Aehlig
  either (requeueJobs qstate chosen) return result
234 48e4da5c Klaus Aehlig
235 48e4da5c Klaus Aehlig
-- | Format the job queue status in a compact, human readable way.
236 48e4da5c Klaus Aehlig
showQueue :: Queue -> String
237 48e4da5c Klaus Aehlig
showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
238 48e4da5c Klaus Aehlig
  let showids = show . map (fromJobId . qjId . jJob)
239 48e4da5c Klaus Aehlig
  in "Waiting jobs: " ++ showids waiting 
240 48e4da5c Klaus Aehlig
       ++ "; running jobs: " ++ showids running
241 48e4da5c Klaus Aehlig
242 48e4da5c Klaus Aehlig
-- | Time-based watcher for updating the job queue.
243 48e4da5c Klaus Aehlig
onTimeWatcher :: JQStatus -> IO ()
244 48e4da5c Klaus Aehlig
onTimeWatcher qstate = forever $ do
245 48e4da5c Klaus Aehlig
  threadDelay watchInterval
246 fe50bb65 Klaus Aehlig
  logDebug "Job queue watcher timer fired"
247 48e4da5c Klaus Aehlig
  jobs <- readIORef (jqJobs qstate)
248 48e4da5c Klaus Aehlig
  mapM_ (updateJob qstate) $ qRunning jobs
249 48e4da5c Klaus Aehlig
  cleanupFinishedJobs qstate
250 48e4da5c Klaus Aehlig
  jobs' <- readIORef (jqJobs qstate)
251 48e4da5c Klaus Aehlig
  logInfo $ showQueue jobs'
252 48e4da5c Klaus Aehlig
  scheduleSomeJobs qstate
253 48e4da5c Klaus Aehlig
254 2713b91a Klaus Aehlig
-- | Read a single, non-archived, job, specified by its id, from disk.
255 2713b91a Klaus Aehlig
readJobFromDisk :: JobId -> IO (Result JobWithStat)
256 2713b91a Klaus Aehlig
readJobFromDisk jid = do
257 2713b91a Klaus Aehlig
  qdir <- queueDir
258 2713b91a Klaus Aehlig
  let fpath = liveJobFile qdir jid
259 2713b91a Klaus Aehlig
  logDebug $ "Reading " ++ fpath
260 2713b91a Klaus Aehlig
  tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat)
261 2713b91a Klaus Aehlig
  let fstat = either (const nullFStat) id tryFstat
262 2713b91a Klaus Aehlig
  loadResult <- JQ.loadJobFromDisk qdir False jid
263 ed6cf449 Klaus Aehlig
  return $ liftM (JobWithStat Nothing fstat . fst) loadResult
264 2713b91a Klaus Aehlig
265 2713b91a Klaus Aehlig
-- | Read all non-finalized jobs from disk.
266 2713b91a Klaus Aehlig
readJobsFromDisk :: IO [JobWithStat]
267 2713b91a Klaus Aehlig
readJobsFromDisk = do
268 2713b91a Klaus Aehlig
  logInfo "Loading job queue"
269 2713b91a Klaus Aehlig
  qdir <- queueDir
270 2713b91a Klaus Aehlig
  eitherJids <- JQ.getJobIDs [qdir]
271 2713b91a Klaus Aehlig
  let jids = either (const []) JQ.sortJobIDs eitherJids
272 2713b91a Klaus Aehlig
      jidsstring = commaJoin $ map (show . fromJobId) jids
273 2713b91a Klaus Aehlig
  logInfo $ "Non-archived jobs on disk: " ++ jidsstring
274 2713b91a Klaus Aehlig
  jobs <- mapM readJobFromDisk jids
275 2713b91a Klaus Aehlig
  return $ justOk jobs
276 2713b91a Klaus Aehlig
277 48e4da5c Klaus Aehlig
-- | Set up the job scheduler. This will also start the monitoring
278 48e4da5c Klaus Aehlig
-- of changes to the running jobs.
279 48e4da5c Klaus Aehlig
initJQScheduler :: JQStatus -> IO ()
280 48e4da5c Klaus Aehlig
initJQScheduler qstate = do
281 2713b91a Klaus Aehlig
  alljobs <- readJobsFromDisk
282 2713b91a Klaus Aehlig
  let jobs = filter (not . jobFinalized . jJob) alljobs
283 2713b91a Klaus Aehlig
      (running, queued) = partition (jobStarted . jJob) jobs
284 2713b91a Klaus Aehlig
  modifyJobs qstate (onQueuedJobs (++ queued) . onRunningJobs (++ running))
285 2713b91a Klaus Aehlig
  jqjobs <- readIORef (jqJobs qstate)
286 2713b91a Klaus Aehlig
  logInfo $ showQueue jqjobs
287 2713b91a Klaus Aehlig
  scheduleSomeJobs qstate
288 48e4da5c Klaus Aehlig
  logInfo "Starting time-based job queue watcher"
289 48e4da5c Klaus Aehlig
  _ <- forkIO $ onTimeWatcher qstate
290 48e4da5c Klaus Aehlig
  return ()
291 48e4da5c Klaus Aehlig
292 48e4da5c Klaus Aehlig
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
293 48e4da5c Klaus Aehlig
-- eventually.
294 48e4da5c Klaus Aehlig
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
295 48e4da5c Klaus Aehlig
enqueueNewJobs state jobs = do
296 48e4da5c Klaus Aehlig
  logInfo . (++) "New jobs enqueued: " . commaJoin
297 48e4da5c Klaus Aehlig
    $ map (show . fromJobId . qjId) jobs
298 48e4da5c Klaus Aehlig
  let jobs' = map unreadJob jobs
299 48e4da5c Klaus Aehlig
  modifyJobs state (onQueuedJobs (++ jobs'))
300 48e4da5c Klaus Aehlig
  scheduleSomeJobs state
301 bb62d52e Klaus Aehlig
302 bb62d52e Klaus Aehlig
-- | Pure function for removing a queued job from the job queue by
303 bb62d52e Klaus Aehlig
-- atomicModifyIORef. The answer is True if the job could be removed
304 bb62d52e Klaus Aehlig
-- before being handed over to execution, False if it already was started
305 bb62d52e Klaus Aehlig
-- and a Bad result if the job is not found in the queue.
306 bb62d52e Klaus Aehlig
rmJob :: JobId -> Queue -> (Queue, Result Bool)
307 bb62d52e Klaus Aehlig
rmJob jid q =
308 bb62d52e Klaus Aehlig
  let isJid = (jid ==) . qjId . jJob
309 bb62d52e Klaus Aehlig
      (found, queued') = partition isJid $ qEnqueued q
310 bb62d52e Klaus Aehlig
  in if null found
311 bb62d52e Klaus Aehlig
       then if any isJid $ qRunning q
312 bb62d52e Klaus Aehlig
              then (q, Ok False)
313 bb62d52e Klaus Aehlig
              else (q, Bad $ "Job " ++ show (fromJobId jid)
314 bb62d52e Klaus Aehlig
                              ++ " unknown to the queue")
315 bb62d52e Klaus Aehlig
       else (q {qEnqueued = queued'}, Ok True)
316 bb62d52e Klaus Aehlig
317 bb62d52e Klaus Aehlig
-- | Try to remove a queued job from the job queue. Return True, if
318 bb62d52e Klaus Aehlig
-- the job could be removed from the queue before being handed over
319 bb62d52e Klaus Aehlig
-- to execution, False if the job already started, and a Bad result
320 bb62d52e Klaus Aehlig
-- if the job is unknown.
321 bb62d52e Klaus Aehlig
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
322 bb62d52e Klaus Aehlig
dequeueJob state jid = do
323 bb62d52e Klaus Aehlig
  result <- atomicModifyIORef (jqJobs state) $ rmJob jid
324 bb62d52e Klaus Aehlig
  logDebug $ "Result of dequeing job " ++ show (fromJobId jid)
325 bb62d52e Klaus Aehlig
              ++ " is " ++ show result
326 bb62d52e Klaus Aehlig
  return result