Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ 71dc39a1

History | View | Annotate | Download (13.5 kB)

1 48e4da5c Klaus Aehlig
{-| Implementation of a reader for the job queue.
2 48e4da5c Klaus Aehlig
3 48e4da5c Klaus Aehlig
-}
4 48e4da5c Klaus Aehlig
5 48e4da5c Klaus Aehlig
{-
6 48e4da5c Klaus Aehlig
7 48e4da5c Klaus Aehlig
Copyright (C) 2013 Google Inc.
8 48e4da5c Klaus Aehlig
9 48e4da5c Klaus Aehlig
This program is free software; you can redistribute it and/or modify
10 48e4da5c Klaus Aehlig
it under the terms of the GNU General Public License as published by
11 48e4da5c Klaus Aehlig
the Free Software Foundation; either version 2 of the License, or
12 48e4da5c Klaus Aehlig
(at your option) any later version.
13 48e4da5c Klaus Aehlig
14 48e4da5c Klaus Aehlig
This program is distributed in the hope that it will be useful, but
15 48e4da5c Klaus Aehlig
WITHOUT ANY WARRANTY; without even the implied warranty of
16 48e4da5c Klaus Aehlig
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 48e4da5c Klaus Aehlig
General Public License for more details.
18 48e4da5c Klaus Aehlig
19 48e4da5c Klaus Aehlig
You should have received a copy of the GNU General Public License
20 48e4da5c Klaus Aehlig
along with this program; if not, write to the Free Software
21 48e4da5c Klaus Aehlig
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 48e4da5c Klaus Aehlig
02110-1301, USA.
23 48e4da5c Klaus Aehlig
24 48e4da5c Klaus Aehlig
-}
25 48e4da5c Klaus Aehlig
26 48e4da5c Klaus Aehlig
module Ganeti.JQScheduler
27 48e4da5c Klaus Aehlig
  ( JQStatus
28 48e4da5c Klaus Aehlig
  , emptyJQStatus
29 48e4da5c Klaus Aehlig
  , initJQScheduler
30 48e4da5c Klaus Aehlig
  , enqueueNewJobs
31 bb62d52e Klaus Aehlig
  , dequeueJob
32 96d55b50 Klaus Aehlig
  , setJobPriority
33 48e4da5c Klaus Aehlig
  ) where
34 48e4da5c Klaus Aehlig
35 48e4da5c Klaus Aehlig
import Control.Arrow
36 48e4da5c Klaus Aehlig
import Control.Concurrent
37 1c532d2d Klaus Aehlig
import Control.Exception
38 48e4da5c Klaus Aehlig
import Control.Monad
39 96d55b50 Klaus Aehlig
import Control.Monad.IO.Class
40 f7743189 Klaus Aehlig
import Data.Function (on)
41 48e4da5c Klaus Aehlig
import Data.List
42 b81650b0 Klaus Aehlig
import Data.Maybe
43 48e4da5c Klaus Aehlig
import Data.IORef
44 ed6cf449 Klaus Aehlig
import System.INotify
45 48e4da5c Klaus Aehlig
46 48e4da5c Klaus Aehlig
import Ganeti.BasicTypes
47 48e4da5c Klaus Aehlig
import Ganeti.Constants as C
48 48e4da5c Klaus Aehlig
import Ganeti.JQueue as JQ
49 48e4da5c Klaus Aehlig
import Ganeti.Logging
50 d9dd04b1 Klaus Aehlig
import Ganeti.Objects
51 48e4da5c Klaus Aehlig
import Ganeti.Path
52 48e4da5c Klaus Aehlig
import Ganeti.Types
53 48e4da5c Klaus Aehlig
import Ganeti.Utils
54 48e4da5c Klaus Aehlig
55 ed6cf449 Klaus Aehlig
data JobWithStat = JobWithStat { jINotify :: Maybe INotify
56 ed6cf449 Klaus Aehlig
                               , jStat :: FStat
57 ed6cf449 Klaus Aehlig
                               , jJob :: QueuedJob
58 ed6cf449 Klaus Aehlig
                               }
59 48e4da5c Klaus Aehlig
data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] }
60 48e4da5c Klaus Aehlig
61 48e4da5c Klaus Aehlig
{-| Representation of the job queue
62 48e4da5c Klaus Aehlig
63 48e4da5c Klaus Aehlig
We keep two lists of jobs (together with information about the last
64 48e4da5c Klaus Aehlig
fstat result observed): the jobs that are enqueued, but not yet handed
65 48e4da5c Klaus Aehlig
over for execution, and the jobs already handed over for execution. They
66 48e4da5c Klaus Aehlig
are kept together in a single IORef, so that we can atomically update
67 48e4da5c Klaus Aehlig
both, in particular when scheduling jobs to be handed over for execution.
68 48e4da5c Klaus Aehlig
69 48e4da5c Klaus Aehlig
-}
70 48e4da5c Klaus Aehlig
71 48e4da5c Klaus Aehlig
data JQStatus = JQStatus
72 48e4da5c Klaus Aehlig
  { jqJobs :: IORef Queue
73 6046dca9 Klaus Aehlig
  , jqConfig :: IORef (Result ConfigData)
74 48e4da5c Klaus Aehlig
  }
75 48e4da5c Klaus Aehlig
76 48e4da5c Klaus Aehlig
77 6046dca9 Klaus Aehlig
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
78 6046dca9 Klaus Aehlig
emptyJQStatus config = do
79 6046dca9 Klaus Aehlig
  jqJ <- newIORef Queue { qEnqueued = [], qRunning = []}
80 6046dca9 Klaus Aehlig
  return JQStatus { jqJobs = jqJ, jqConfig = config }
81 48e4da5c Klaus Aehlig
82 48e4da5c Klaus Aehlig
-- | Apply a function on the running jobs.
83 48e4da5c Klaus Aehlig
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
84 48e4da5c Klaus Aehlig
onRunningJobs f queue = queue {qRunning=f $ qRunning queue}
85 48e4da5c Klaus Aehlig
86 48e4da5c Klaus Aehlig
-- | Apply a function on the queued jobs.
87 48e4da5c Klaus Aehlig
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
88 48e4da5c Klaus Aehlig
onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue}
89 48e4da5c Klaus Aehlig
90 48e4da5c Klaus Aehlig
-- | Obtain a JobWithStat from a QueuedJob.
91 48e4da5c Klaus Aehlig
unreadJob :: QueuedJob -> JobWithStat
92 ed6cf449 Klaus Aehlig
unreadJob job = JobWithStat {jJob=job, jStat=nullFStat, jINotify=Nothing}
93 48e4da5c Klaus Aehlig
94 48e4da5c Klaus Aehlig
-- | Reload interval for polling the running jobs for updates in microseconds.
95 48e4da5c Klaus Aehlig
watchInterval :: Int
96 48e4da5c Klaus Aehlig
watchInterval = C.luxidJobqueuePollInterval * 1000000 
97 48e4da5c Klaus Aehlig
98 d9dd04b1 Klaus Aehlig
-- | Get the maximual number of jobs to be run simultaneously from the
99 d9dd04b1 Klaus Aehlig
-- configuration. If the configuration is not available, be conservative
100 d9dd04b1 Klaus Aehlig
-- and use the smallest possible value, i.e., 1.
101 d9dd04b1 Klaus Aehlig
getMaxRunningJobs :: JQStatus -> IO Int
102 d9dd04b1 Klaus Aehlig
getMaxRunningJobs =
103 d9dd04b1 Klaus Aehlig
  liftM (genericResult (const 1) (clusterMaxRunningJobs . configCluster))
104 d9dd04b1 Klaus Aehlig
  . readIORef . jqConfig
105 48e4da5c Klaus Aehlig
106 48e4da5c Klaus Aehlig
-- | Wrapper function to atomically update the jobs in the queue status.
107 48e4da5c Klaus Aehlig
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
108 48e4da5c Klaus Aehlig
modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) ()  . f)
109 48e4da5c Klaus Aehlig
110 48e4da5c Klaus Aehlig
-- | Reread a job from disk, if the file has changed.
111 48e4da5c Klaus Aehlig
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
112 ed6cf449 Klaus Aehlig
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job})  = do
113 48e4da5c Klaus Aehlig
  let jid = qjId job
114 48e4da5c Klaus Aehlig
  qdir <- queueDir
115 48e4da5c Klaus Aehlig
  let fpath = liveJobFile qdir jid
116 48e4da5c Klaus Aehlig
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
117 350f0759 Klaus Aehlig
  changedResult <- try $ needsReload fstat fpath
118 350f0759 Klaus Aehlig
                   :: IO (Either IOError (Maybe FStat))
119 350f0759 Klaus Aehlig
  let changed = either (const $ Just nullFStat) id changedResult
120 48e4da5c Klaus Aehlig
  case changed of
121 48e4da5c Klaus Aehlig
    Nothing -> do
122 48e4da5c Klaus Aehlig
      logDebug $ "File " ++ fpath ++ " not changed on disk."
123 48e4da5c Klaus Aehlig
      return Nothing
124 48e4da5c Klaus Aehlig
    Just fstat' -> do
125 48e4da5c Klaus Aehlig
      let jids = show $ fromJobId jid
126 350f0759 Klaus Aehlig
      logInfo $ "Rereading job "  ++ jids
127 350f0759 Klaus Aehlig
      readResult <- loadJobFromDisk qdir True jid
128 48e4da5c Klaus Aehlig
      case readResult of
129 48e4da5c Klaus Aehlig
        Bad s -> do
130 48e4da5c Klaus Aehlig
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
131 48e4da5c Klaus Aehlig
          return Nothing
132 48e4da5c Klaus Aehlig
        Ok (job', _) -> do
133 48e4da5c Klaus Aehlig
          logDebug
134 48e4da5c Klaus Aehlig
            $ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job')
135 ed6cf449 Klaus Aehlig
          return . Just $ jWS {jStat=fstat', jJob=job'}
136 ed6cf449 Klaus Aehlig
                          -- jINotify unchanged
137 48e4da5c Klaus Aehlig
138 48e4da5c Klaus Aehlig
-- | Update a job in the job queue, if it is still there. This is the
139 48e4da5c Klaus Aehlig
-- pure function for inserting a previously read change into the queue.
140 48e4da5c Klaus Aehlig
-- as the change contains its time stamp, we don't have to worry about a
141 48e4da5c Klaus Aehlig
-- later read change overwriting a newer read state. If this happens, the
142 48e4da5c Klaus Aehlig
-- fstat value will be outdated, so the next poller run will fix this.
143 48e4da5c Klaus Aehlig
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
144 48e4da5c Klaus Aehlig
updateJobStatus job' =
145 48e4da5c Klaus Aehlig
  let jid = qjId $ jJob job' in
146 48e4da5c Klaus Aehlig
  map (\job -> if qjId (jJob job) == jid then job' else job)
147 48e4da5c Klaus Aehlig
148 48e4da5c Klaus Aehlig
-- | Update a single job by reading it from disk, if necessary.
149 48e4da5c Klaus Aehlig
updateJob :: JQStatus -> JobWithStat -> IO ()
150 48e4da5c Klaus Aehlig
updateJob state jb = do
151 48e4da5c Klaus Aehlig
  jb' <- readJobStatus jb
152 48e4da5c Klaus Aehlig
  maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb'
153 ea174b21 Klaus Aehlig
  when (maybe True (jobFinalized . jJob) jb') . (>> return ()) . forkIO $ do
154 ea174b21 Klaus Aehlig
    logDebug "Scheduler noticed a job to have finished."
155 ea174b21 Klaus Aehlig
    cleanupFinishedJobs state
156 ea174b21 Klaus Aehlig
    scheduleSomeJobs state
157 48e4da5c Klaus Aehlig
158 48e4da5c Klaus Aehlig
-- | Sort out the finished jobs from the monitored part of the queue.
159 48e4da5c Klaus Aehlig
-- This is the pure part, splitting the queue into a remaining queue
160 48e4da5c Klaus Aehlig
-- and the jobs that were removed.
161 a2977f53 Klaus Aehlig
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
162 48e4da5c Klaus Aehlig
sortoutFinishedJobs queue =
163 1b3bde96 Klaus Aehlig
  let (fin, run') = partition (jobFinalized . jJob) . qRunning $ queue
164 a2977f53 Klaus Aehlig
  in (queue {qRunning=run'}, fin)
165 48e4da5c Klaus Aehlig
166 48e4da5c Klaus Aehlig
-- | Actually clean up the finished jobs. This is the IO wrapper around
167 48e4da5c Klaus Aehlig
-- the pure `sortoutFinishedJobs`.
168 48e4da5c Klaus Aehlig
cleanupFinishedJobs :: JQStatus -> IO ()
169 48e4da5c Klaus Aehlig
cleanupFinishedJobs qstate = do
170 48e4da5c Klaus Aehlig
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
171 a2977f53 Klaus Aehlig
  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob
172 48e4da5c Klaus Aehlig
      jlist = commaJoin $ map showJob finished
173 48e4da5c Klaus Aehlig
  unless (null finished)
174 48e4da5c Klaus Aehlig
    . logInfo $ "Finished jobs: " ++ jlist
175 cc5ab470 Klaus Aehlig
  mapM_ (maybe (return ()) killINotify . jINotify) finished
176 48e4da5c Klaus Aehlig
177 b81650b0 Klaus Aehlig
-- | Watcher task for a job, to update it on file changes. It also
178 b81650b0 Klaus Aehlig
-- reinstantiates itself upon receiving an Ignored event.
179 b81650b0 Klaus Aehlig
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
180 b81650b0 Klaus Aehlig
jobWatcher state jWS e = do
181 b81650b0 Klaus Aehlig
  let jid = qjId $ jJob jWS
182 b81650b0 Klaus Aehlig
      jids = show $ fromJobId jid
183 b81650b0 Klaus Aehlig
  logInfo $ "Scheduler notified of change of job " ++ jids
184 b81650b0 Klaus Aehlig
  logDebug $ "Scheulder notify event for " ++ jids ++ ": " ++ show e
185 b81650b0 Klaus Aehlig
  let inotify = jINotify jWS
186 b81650b0 Klaus Aehlig
  when (e == Ignored  && isJust inotify) $ do
187 b81650b0 Klaus Aehlig
    qdir <- queueDir
188 b81650b0 Klaus Aehlig
    let fpath = liveJobFile qdir jid
189 b81650b0 Klaus Aehlig
    _ <- addWatch (fromJust inotify) [Modify, Delete] fpath
190 b81650b0 Klaus Aehlig
           (jobWatcher state jWS)
191 b81650b0 Klaus Aehlig
    return ()
192 b81650b0 Klaus Aehlig
  updateJob state jWS
193 b81650b0 Klaus Aehlig
194 b81650b0 Klaus Aehlig
-- | Attach the job watcher to a running job.
195 b81650b0 Klaus Aehlig
attachWatcher :: JQStatus -> JobWithStat -> IO ()
196 b81650b0 Klaus Aehlig
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do
197 b81650b0 Klaus Aehlig
  inotify <- initINotify
198 b81650b0 Klaus Aehlig
  qdir <- queueDir
199 b81650b0 Klaus Aehlig
  let fpath = liveJobFile qdir . qjId $ jJob jWS
200 b81650b0 Klaus Aehlig
      jWS' = jWS { jINotify=Just inotify }
201 b81650b0 Klaus Aehlig
  logDebug $ "Attaching queue watcher for " ++ fpath
202 b81650b0 Klaus Aehlig
  _ <- addWatch inotify [Modify, Delete] fpath $ jobWatcher state jWS'
203 b81650b0 Klaus Aehlig
  modifyJobs state . onRunningJobs $ updateJobStatus jWS'
204 b81650b0 Klaus Aehlig
205 48e4da5c Klaus Aehlig
-- | Decide on which jobs to schedule next for execution. This is the
206 48e4da5c Klaus Aehlig
-- pure function doing the scheduling.
207 d9dd04b1 Klaus Aehlig
selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat])
208 d9dd04b1 Klaus Aehlig
selectJobsToRun count queue =
209 d9dd04b1 Klaus Aehlig
  let n = count - length (qRunning queue)
210 48e4da5c Klaus Aehlig
      (chosen, remain) = splitAt n (qEnqueued queue)
211 a2977f53 Klaus Aehlig
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
212 48e4da5c Klaus Aehlig
213 1c532d2d Klaus Aehlig
-- | Requeue jobs that were previously selected for execution
214 1c532d2d Klaus Aehlig
-- but couldn't be started.
215 a2977f53 Klaus Aehlig
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
216 1c532d2d Klaus Aehlig
requeueJobs qstate jobs err = do
217 a2977f53 Klaus Aehlig
  let jids = map (qjId . jJob) jobs
218 1c532d2d Klaus Aehlig
      jidsString = commaJoin $ map (show . fromJobId) jids
219 1c532d2d Klaus Aehlig
      rmJobs = filter ((`notElem` jids) . qjId . jJob)
220 1c532d2d Klaus Aehlig
  logWarning $ "Starting jobs failed: " ++ show err
221 1c532d2d Klaus Aehlig
  logWarning $ "Rescheduling jobs: " ++ jidsString
222 1c532d2d Klaus Aehlig
  modifyJobs qstate (onRunningJobs rmJobs)
223 a2977f53 Klaus Aehlig
  modifyJobs qstate (onQueuedJobs $ (++) jobs)
224 1c532d2d Klaus Aehlig
225 48e4da5c Klaus Aehlig
-- | Schedule jobs to be run. This is the IO wrapper around the
226 48e4da5c Klaus Aehlig
-- pure `selectJobsToRun`.
227 48e4da5c Klaus Aehlig
scheduleSomeJobs :: JQStatus -> IO ()
228 48e4da5c Klaus Aehlig
scheduleSomeJobs qstate = do
229 d9dd04b1 Klaus Aehlig
  count <- getMaxRunningJobs qstate
230 d9dd04b1 Klaus Aehlig
  chosen <- atomicModifyIORef (jqJobs qstate) (selectJobsToRun count)
231 a2977f53 Klaus Aehlig
  let jobs = map jJob chosen
232 7dd21737 Klaus Aehlig
  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
233 a2977f53 Klaus Aehlig
    $ map (show . fromJobId . qjId) jobs
234 b81650b0 Klaus Aehlig
  mapM_ (attachWatcher qstate) chosen
235 a2977f53 Klaus Aehlig
  result <- try $ JQ.startJobs jobs
236 1c532d2d Klaus Aehlig
  either (requeueJobs qstate chosen) return result
237 48e4da5c Klaus Aehlig
238 48e4da5c Klaus Aehlig
-- | Format the job queue status in a compact, human readable way.
239 48e4da5c Klaus Aehlig
showQueue :: Queue -> String
240 48e4da5c Klaus Aehlig
showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
241 48e4da5c Klaus Aehlig
  let showids = show . map (fromJobId . qjId . jJob)
242 48e4da5c Klaus Aehlig
  in "Waiting jobs: " ++ showids waiting 
243 48e4da5c Klaus Aehlig
       ++ "; running jobs: " ++ showids running
244 48e4da5c Klaus Aehlig
245 48e4da5c Klaus Aehlig
-- | Time-based watcher for updating the job queue.
246 48e4da5c Klaus Aehlig
onTimeWatcher :: JQStatus -> IO ()
247 48e4da5c Klaus Aehlig
onTimeWatcher qstate = forever $ do
248 48e4da5c Klaus Aehlig
  threadDelay watchInterval
249 fe50bb65 Klaus Aehlig
  logDebug "Job queue watcher timer fired"
250 48e4da5c Klaus Aehlig
  jobs <- readIORef (jqJobs qstate)
251 48e4da5c Klaus Aehlig
  mapM_ (updateJob qstate) $ qRunning jobs
252 48e4da5c Klaus Aehlig
  cleanupFinishedJobs qstate
253 48e4da5c Klaus Aehlig
  jobs' <- readIORef (jqJobs qstate)
254 48e4da5c Klaus Aehlig
  logInfo $ showQueue jobs'
255 48e4da5c Klaus Aehlig
  scheduleSomeJobs qstate
256 48e4da5c Klaus Aehlig
257 2713b91a Klaus Aehlig
-- | Read a single, non-archived, job, specified by its id, from disk.
258 2713b91a Klaus Aehlig
readJobFromDisk :: JobId -> IO (Result JobWithStat)
259 2713b91a Klaus Aehlig
readJobFromDisk jid = do
260 2713b91a Klaus Aehlig
  qdir <- queueDir
261 2713b91a Klaus Aehlig
  let fpath = liveJobFile qdir jid
262 2713b91a Klaus Aehlig
  logDebug $ "Reading " ++ fpath
263 2713b91a Klaus Aehlig
  tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat)
264 2713b91a Klaus Aehlig
  let fstat = either (const nullFStat) id tryFstat
265 2713b91a Klaus Aehlig
  loadResult <- JQ.loadJobFromDisk qdir False jid
266 ed6cf449 Klaus Aehlig
  return $ liftM (JobWithStat Nothing fstat . fst) loadResult
267 2713b91a Klaus Aehlig
268 2713b91a Klaus Aehlig
-- | Read all non-finalized jobs from disk.
269 2713b91a Klaus Aehlig
readJobsFromDisk :: IO [JobWithStat]
270 2713b91a Klaus Aehlig
readJobsFromDisk = do
271 2713b91a Klaus Aehlig
  logInfo "Loading job queue"
272 2713b91a Klaus Aehlig
  qdir <- queueDir
273 2713b91a Klaus Aehlig
  eitherJids <- JQ.getJobIDs [qdir]
274 ea7032da Petr Pudlak
  let jids = genericResult (const []) JQ.sortJobIDs eitherJids
275 2713b91a Klaus Aehlig
      jidsstring = commaJoin $ map (show . fromJobId) jids
276 2713b91a Klaus Aehlig
  logInfo $ "Non-archived jobs on disk: " ++ jidsstring
277 2713b91a Klaus Aehlig
  jobs <- mapM readJobFromDisk jids
278 2713b91a Klaus Aehlig
  return $ justOk jobs
279 2713b91a Klaus Aehlig
280 48e4da5c Klaus Aehlig
-- | Set up the job scheduler. This will also start the monitoring
281 48e4da5c Klaus Aehlig
-- of changes to the running jobs.
282 48e4da5c Klaus Aehlig
initJQScheduler :: JQStatus -> IO ()
283 48e4da5c Klaus Aehlig
initJQScheduler qstate = do
284 2713b91a Klaus Aehlig
  alljobs <- readJobsFromDisk
285 2713b91a Klaus Aehlig
  let jobs = filter (not . jobFinalized . jJob) alljobs
286 2713b91a Klaus Aehlig
      (running, queued) = partition (jobStarted . jJob) jobs
287 2713b91a Klaus Aehlig
  modifyJobs qstate (onQueuedJobs (++ queued) . onRunningJobs (++ running))
288 2713b91a Klaus Aehlig
  jqjobs <- readIORef (jqJobs qstate)
289 2713b91a Klaus Aehlig
  logInfo $ showQueue jqjobs
290 2713b91a Klaus Aehlig
  scheduleSomeJobs qstate
291 48e4da5c Klaus Aehlig
  logInfo "Starting time-based job queue watcher"
292 48e4da5c Klaus Aehlig
  _ <- forkIO $ onTimeWatcher qstate
293 48e4da5c Klaus Aehlig
  return ()
294 48e4da5c Klaus Aehlig
295 48e4da5c Klaus Aehlig
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
296 48e4da5c Klaus Aehlig
-- eventually.
297 48e4da5c Klaus Aehlig
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
298 48e4da5c Klaus Aehlig
enqueueNewJobs state jobs = do
299 48e4da5c Klaus Aehlig
  logInfo . (++) "New jobs enqueued: " . commaJoin
300 48e4da5c Klaus Aehlig
    $ map (show . fromJobId . qjId) jobs
301 48e4da5c Klaus Aehlig
  let jobs' = map unreadJob jobs
302 f7743189 Klaus Aehlig
      insertFn = insertBy (compare `on` fromJobId . qjId . jJob)
303 f7743189 Klaus Aehlig
      addJobs oldjobs = foldl (flip insertFn) oldjobs jobs'
304 f7743189 Klaus Aehlig
  modifyJobs state (onQueuedJobs addJobs)
305 48e4da5c Klaus Aehlig
  scheduleSomeJobs state
306 bb62d52e Klaus Aehlig
307 bb62d52e Klaus Aehlig
-- | Pure function for removing a queued job from the job queue by
308 155df343 Klaus Aehlig
-- atomicModifyIORef. The answer is Just the job if the job could be removed
309 155df343 Klaus Aehlig
-- before being handed over to execution, Nothing if it already was started
310 bb62d52e Klaus Aehlig
-- and a Bad result if the job is not found in the queue.
311 155df343 Klaus Aehlig
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
312 bb62d52e Klaus Aehlig
rmJob jid q =
313 bb62d52e Klaus Aehlig
  let isJid = (jid ==) . qjId . jJob
314 bb62d52e Klaus Aehlig
      (found, queued') = partition isJid $ qEnqueued q
315 155df343 Klaus Aehlig
      isRunning = any isJid $ qRunning q
316 155df343 Klaus Aehlig
      sJid = (++) "Job " . show $ fromJobId jid
317 155df343 Klaus Aehlig
  in case (found, isRunning) of
318 155df343 Klaus Aehlig
    ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job)
319 155df343 Klaus Aehlig
    (_:_, _) -> (q, Bad $ "Queue in inconsistent state."
320 155df343 Klaus Aehlig
                           ++ sJid ++ " queued multiple times")
321 155df343 Klaus Aehlig
    (_, True) -> (q, Ok Nothing)
322 155df343 Klaus Aehlig
    _ -> (q, Bad $ sJid ++ " not found in queue")
323 bb62d52e Klaus Aehlig
324 bb62d52e Klaus Aehlig
-- | Try to remove a queued job from the job queue. Return True, if
325 bb62d52e Klaus Aehlig
-- the job could be removed from the queue before being handed over
326 bb62d52e Klaus Aehlig
-- to execution, False if the job already started, and a Bad result
327 bb62d52e Klaus Aehlig
-- if the job is unknown.
328 bb62d52e Klaus Aehlig
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
329 bb62d52e Klaus Aehlig
dequeueJob state jid = do
330 bb62d52e Klaus Aehlig
  result <- atomicModifyIORef (jqJobs state) $ rmJob jid
331 155df343 Klaus Aehlig
  let result' = fmap isJust result
332 bb62d52e Klaus Aehlig
  logDebug $ "Result of dequeing job " ++ show (fromJobId jid)
333 155df343 Klaus Aehlig
              ++ " is " ++ show result'
334 155df343 Klaus Aehlig
  return result'
335 96d55b50 Klaus Aehlig
336 96d55b50 Klaus Aehlig
-- | Change the priority of a queued job (once the job is handed over
337 96d55b50 Klaus Aehlig
-- to execution, the job itself needs to be informed). To avoid the
338 96d55b50 Klaus Aehlig
-- job being started unmodified, it is temporarily unqueued during the
339 96d55b50 Klaus Aehlig
-- change. Return the modified job, if the job's priority was sucessfully
340 96d55b50 Klaus Aehlig
-- modified, Nothing, if the job already started, and a Bad value, if the job
341 96d55b50 Klaus Aehlig
-- is unkown.
342 96d55b50 Klaus Aehlig
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
343 96d55b50 Klaus Aehlig
setJobPriority state jid prio = runResultT $ do
344 96d55b50 Klaus Aehlig
  maybeJob <- mkResultT . atomicModifyIORef (jqJobs state) $ rmJob jid
345 96d55b50 Klaus Aehlig
  case maybeJob of
346 96d55b50 Klaus Aehlig
    Nothing -> return Nothing
347 96d55b50 Klaus Aehlig
    Just job -> do
348 96d55b50 Klaus Aehlig
      let job' = changeJobPriority prio job
349 96d55b50 Klaus Aehlig
      qDir <- liftIO queueDir
350 96d55b50 Klaus Aehlig
      mkResultT $ writeJobToDisk qDir job'
351 96d55b50 Klaus Aehlig
      liftIO $ enqueueNewJobs state [job']
352 96d55b50 Klaus Aehlig
      return $ Just job'