root / src / Ganeti / JQScheduler.hs @ 71dc39a1
History | View | Annotate | Download (13.5 kB)
1 | 48e4da5c | Klaus Aehlig | {-| Implementation of a reader for the job queue. |
---|---|---|---|
2 | 48e4da5c | Klaus Aehlig | |
3 | 48e4da5c | Klaus Aehlig | -} |
4 | 48e4da5c | Klaus Aehlig | |
5 | 48e4da5c | Klaus Aehlig | {- |
6 | 48e4da5c | Klaus Aehlig | |
7 | 48e4da5c | Klaus Aehlig | Copyright (C) 2013 Google Inc. |
8 | 48e4da5c | Klaus Aehlig | |
9 | 48e4da5c | Klaus Aehlig | This program is free software; you can redistribute it and/or modify |
10 | 48e4da5c | Klaus Aehlig | it under the terms of the GNU General Public License as published by |
11 | 48e4da5c | Klaus Aehlig | the Free Software Foundation; either version 2 of the License, or |
12 | 48e4da5c | Klaus Aehlig | (at your option) any later version. |
13 | 48e4da5c | Klaus Aehlig | |
14 | 48e4da5c | Klaus Aehlig | This program is distributed in the hope that it will be useful, but |
15 | 48e4da5c | Klaus Aehlig | WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | 48e4da5c | Klaus Aehlig | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
17 | 48e4da5c | Klaus Aehlig | General Public License for more details. |
18 | 48e4da5c | Klaus Aehlig | |
19 | 48e4da5c | Klaus Aehlig | You should have received a copy of the GNU General Public License |
20 | 48e4da5c | Klaus Aehlig | along with this program; if not, write to the Free Software |
21 | 48e4da5c | Klaus Aehlig | Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
22 | 48e4da5c | Klaus Aehlig | 02110-1301, USA. |
23 | 48e4da5c | Klaus Aehlig | |
24 | 48e4da5c | Klaus Aehlig | -} |
25 | 48e4da5c | Klaus Aehlig | |
26 | 48e4da5c | Klaus Aehlig | module Ganeti.JQScheduler |
27 | 48e4da5c | Klaus Aehlig | ( JQStatus |
28 | 48e4da5c | Klaus Aehlig | , emptyJQStatus |
29 | 48e4da5c | Klaus Aehlig | , initJQScheduler |
30 | 48e4da5c | Klaus Aehlig | , enqueueNewJobs |
31 | bb62d52e | Klaus Aehlig | , dequeueJob |
32 | 96d55b50 | Klaus Aehlig | , setJobPriority |
33 | 48e4da5c | Klaus Aehlig | ) where |
34 | 48e4da5c | Klaus Aehlig | |
35 | 48e4da5c | Klaus Aehlig | import Control.Arrow |
36 | 48e4da5c | Klaus Aehlig | import Control.Concurrent |
37 | 1c532d2d | Klaus Aehlig | import Control.Exception |
38 | 48e4da5c | Klaus Aehlig | import Control.Monad |
39 | 96d55b50 | Klaus Aehlig | import Control.Monad.IO.Class |
40 | f7743189 | Klaus Aehlig | import Data.Function (on) |
41 | 48e4da5c | Klaus Aehlig | import Data.List |
42 | b81650b0 | Klaus Aehlig | import Data.Maybe |
43 | 48e4da5c | Klaus Aehlig | import Data.IORef |
44 | ed6cf449 | Klaus Aehlig | import System.INotify |
45 | 48e4da5c | Klaus Aehlig | |
46 | 48e4da5c | Klaus Aehlig | import Ganeti.BasicTypes |
47 | 48e4da5c | Klaus Aehlig | import Ganeti.Constants as C |
48 | 48e4da5c | Klaus Aehlig | import Ganeti.JQueue as JQ |
49 | 48e4da5c | Klaus Aehlig | import Ganeti.Logging |
50 | d9dd04b1 | Klaus Aehlig | import Ganeti.Objects |
51 | 48e4da5c | Klaus Aehlig | import Ganeti.Path |
52 | 48e4da5c | Klaus Aehlig | import Ganeti.Types |
53 | 48e4da5c | Klaus Aehlig | import Ganeti.Utils |
54 | 48e4da5c | Klaus Aehlig | |
55 | ed6cf449 | Klaus Aehlig | data JobWithStat = JobWithStat { jINotify :: Maybe INotify |
56 | ed6cf449 | Klaus Aehlig | , jStat :: FStat |
57 | ed6cf449 | Klaus Aehlig | , jJob :: QueuedJob |
58 | ed6cf449 | Klaus Aehlig | } |
59 | 48e4da5c | Klaus Aehlig | data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] } |
60 | 48e4da5c | Klaus Aehlig | |
61 | 48e4da5c | Klaus Aehlig | {-| Representation of the job queue |
62 | 48e4da5c | Klaus Aehlig | |
63 | 48e4da5c | Klaus Aehlig | We keep two lists of jobs (together with information about the last |
64 | 48e4da5c | Klaus Aehlig | fstat result observed): the jobs that are enqueued, but not yet handed |
65 | 48e4da5c | Klaus Aehlig | over for execution, and the jobs already handed over for execution. They |
66 | 48e4da5c | Klaus Aehlig | are kept together in a single IORef, so that we can atomically update |
67 | 48e4da5c | Klaus Aehlig | both, in particular when scheduling jobs to be handed over for execution. |
68 | 48e4da5c | Klaus Aehlig | |
69 | 48e4da5c | Klaus Aehlig | -} |
70 | 48e4da5c | Klaus Aehlig | |
71 | 48e4da5c | Klaus Aehlig | data JQStatus = JQStatus |
72 | 48e4da5c | Klaus Aehlig | { jqJobs :: IORef Queue |
73 | 6046dca9 | Klaus Aehlig | , jqConfig :: IORef (Result ConfigData) |
74 | 48e4da5c | Klaus Aehlig | } |
75 | 48e4da5c | Klaus Aehlig | |
76 | 48e4da5c | Klaus Aehlig | |
77 | 6046dca9 | Klaus Aehlig | emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus |
78 | 6046dca9 | Klaus Aehlig | emptyJQStatus config = do |
79 | 6046dca9 | Klaus Aehlig | jqJ <- newIORef Queue { qEnqueued = [], qRunning = []} |
80 | 6046dca9 | Klaus Aehlig | return JQStatus { jqJobs = jqJ, jqConfig = config } |
81 | 48e4da5c | Klaus Aehlig | |
82 | 48e4da5c | Klaus Aehlig | -- | Apply a function on the running jobs. |
83 | 48e4da5c | Klaus Aehlig | onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue |
84 | 48e4da5c | Klaus Aehlig | onRunningJobs f queue = queue {qRunning=f $ qRunning queue} |
85 | 48e4da5c | Klaus Aehlig | |
86 | 48e4da5c | Klaus Aehlig | -- | Apply a function on the queued jobs. |
87 | 48e4da5c | Klaus Aehlig | onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue |
88 | 48e4da5c | Klaus Aehlig | onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue} |
89 | 48e4da5c | Klaus Aehlig | |
90 | 48e4da5c | Klaus Aehlig | -- | Obtain a JobWithStat from a QueuedJob. |
91 | 48e4da5c | Klaus Aehlig | unreadJob :: QueuedJob -> JobWithStat |
92 | ed6cf449 | Klaus Aehlig | unreadJob job = JobWithStat {jJob=job, jStat=nullFStat, jINotify=Nothing} |
93 | 48e4da5c | Klaus Aehlig | |
94 | 48e4da5c | Klaus Aehlig | -- | Reload interval for polling the running jobs for updates in microseconds. |
95 | 48e4da5c | Klaus Aehlig | watchInterval :: Int |
96 | 48e4da5c | Klaus Aehlig | watchInterval = C.luxidJobqueuePollInterval * 1000000 |
97 | 48e4da5c | Klaus Aehlig | |
98 | d9dd04b1 | Klaus Aehlig | -- | Get the maximual number of jobs to be run simultaneously from the |
99 | d9dd04b1 | Klaus Aehlig | -- configuration. If the configuration is not available, be conservative |
100 | d9dd04b1 | Klaus Aehlig | -- and use the smallest possible value, i.e., 1. |
101 | d9dd04b1 | Klaus Aehlig | getMaxRunningJobs :: JQStatus -> IO Int |
102 | d9dd04b1 | Klaus Aehlig | getMaxRunningJobs = |
103 | d9dd04b1 | Klaus Aehlig | liftM (genericResult (const 1) (clusterMaxRunningJobs . configCluster)) |
104 | d9dd04b1 | Klaus Aehlig | . readIORef . jqConfig |
105 | 48e4da5c | Klaus Aehlig | |
106 | 48e4da5c | Klaus Aehlig | -- | Wrapper function to atomically update the jobs in the queue status. |
107 | 48e4da5c | Klaus Aehlig | modifyJobs :: JQStatus -> (Queue -> Queue) -> IO () |
108 | 48e4da5c | Klaus Aehlig | modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) () . f) |
109 | 48e4da5c | Klaus Aehlig | |
110 | 48e4da5c | Klaus Aehlig | -- | Reread a job from disk, if the file has changed. |
111 | 48e4da5c | Klaus Aehlig | readJobStatus :: JobWithStat -> IO (Maybe JobWithStat) |
112 | ed6cf449 | Klaus Aehlig | readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job}) = do |
113 | 48e4da5c | Klaus Aehlig | let jid = qjId job |
114 | 48e4da5c | Klaus Aehlig | qdir <- queueDir |
115 | 48e4da5c | Klaus Aehlig | let fpath = liveJobFile qdir jid |
116 | 48e4da5c | Klaus Aehlig | logDebug $ "Checking if " ++ fpath ++ " changed on disk." |
117 | 350f0759 | Klaus Aehlig | changedResult <- try $ needsReload fstat fpath |
118 | 350f0759 | Klaus Aehlig | :: IO (Either IOError (Maybe FStat)) |
119 | 350f0759 | Klaus Aehlig | let changed = either (const $ Just nullFStat) id changedResult |
120 | 48e4da5c | Klaus Aehlig | case changed of |
121 | 48e4da5c | Klaus Aehlig | Nothing -> do |
122 | 48e4da5c | Klaus Aehlig | logDebug $ "File " ++ fpath ++ " not changed on disk." |
123 | 48e4da5c | Klaus Aehlig | return Nothing |
124 | 48e4da5c | Klaus Aehlig | Just fstat' -> do |
125 | 48e4da5c | Klaus Aehlig | let jids = show $ fromJobId jid |
126 | 350f0759 | Klaus Aehlig | logInfo $ "Rereading job " ++ jids |
127 | 350f0759 | Klaus Aehlig | readResult <- loadJobFromDisk qdir True jid |
128 | 48e4da5c | Klaus Aehlig | case readResult of |
129 | 48e4da5c | Klaus Aehlig | Bad s -> do |
130 | 48e4da5c | Klaus Aehlig | logWarning $ "Failed to read job " ++ jids ++ ": " ++ s |
131 | 48e4da5c | Klaus Aehlig | return Nothing |
132 | 48e4da5c | Klaus Aehlig | Ok (job', _) -> do |
133 | 48e4da5c | Klaus Aehlig | logDebug |
134 | 48e4da5c | Klaus Aehlig | $ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job') |
135 | ed6cf449 | Klaus Aehlig | return . Just $ jWS {jStat=fstat', jJob=job'} |
136 | ed6cf449 | Klaus Aehlig | -- jINotify unchanged |
137 | 48e4da5c | Klaus Aehlig | |
138 | 48e4da5c | Klaus Aehlig | -- | Update a job in the job queue, if it is still there. This is the |
139 | 48e4da5c | Klaus Aehlig | -- pure function for inserting a previously read change into the queue. |
140 | 48e4da5c | Klaus Aehlig | -- as the change contains its time stamp, we don't have to worry about a |
141 | 48e4da5c | Klaus Aehlig | -- later read change overwriting a newer read state. If this happens, the |
142 | 48e4da5c | Klaus Aehlig | -- fstat value will be outdated, so the next poller run will fix this. |
143 | 48e4da5c | Klaus Aehlig | updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat] |
144 | 48e4da5c | Klaus Aehlig | updateJobStatus job' = |
145 | 48e4da5c | Klaus Aehlig | let jid = qjId $ jJob job' in |
146 | 48e4da5c | Klaus Aehlig | map (\job -> if qjId (jJob job) == jid then job' else job) |
147 | 48e4da5c | Klaus Aehlig | |
148 | 48e4da5c | Klaus Aehlig | -- | Update a single job by reading it from disk, if necessary. |
149 | 48e4da5c | Klaus Aehlig | updateJob :: JQStatus -> JobWithStat -> IO () |
150 | 48e4da5c | Klaus Aehlig | updateJob state jb = do |
151 | 48e4da5c | Klaus Aehlig | jb' <- readJobStatus jb |
152 | 48e4da5c | Klaus Aehlig | maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb' |
153 | ea174b21 | Klaus Aehlig | when (maybe True (jobFinalized . jJob) jb') . (>> return ()) . forkIO $ do |
154 | ea174b21 | Klaus Aehlig | logDebug "Scheduler noticed a job to have finished." |
155 | ea174b21 | Klaus Aehlig | cleanupFinishedJobs state |
156 | ea174b21 | Klaus Aehlig | scheduleSomeJobs state |
157 | 48e4da5c | Klaus Aehlig | |
158 | 48e4da5c | Klaus Aehlig | -- | Sort out the finished jobs from the monitored part of the queue. |
159 | 48e4da5c | Klaus Aehlig | -- This is the pure part, splitting the queue into a remaining queue |
160 | 48e4da5c | Klaus Aehlig | -- and the jobs that were removed. |
161 | a2977f53 | Klaus Aehlig | sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat]) |
162 | 48e4da5c | Klaus Aehlig | sortoutFinishedJobs queue = |
163 | 1b3bde96 | Klaus Aehlig | let (fin, run') = partition (jobFinalized . jJob) . qRunning $ queue |
164 | a2977f53 | Klaus Aehlig | in (queue {qRunning=run'}, fin) |
165 | 48e4da5c | Klaus Aehlig | |
166 | 48e4da5c | Klaus Aehlig | -- | Actually clean up the finished jobs. This is the IO wrapper around |
167 | 48e4da5c | Klaus Aehlig | -- the pure `sortoutFinishedJobs`. |
168 | 48e4da5c | Klaus Aehlig | cleanupFinishedJobs :: JQStatus -> IO () |
169 | 48e4da5c | Klaus Aehlig | cleanupFinishedJobs qstate = do |
170 | 48e4da5c | Klaus Aehlig | finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs |
171 | a2977f53 | Klaus Aehlig | let showJob = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob |
172 | 48e4da5c | Klaus Aehlig | jlist = commaJoin $ map showJob finished |
173 | 48e4da5c | Klaus Aehlig | unless (null finished) |
174 | 48e4da5c | Klaus Aehlig | . logInfo $ "Finished jobs: " ++ jlist |
175 | cc5ab470 | Klaus Aehlig | mapM_ (maybe (return ()) killINotify . jINotify) finished |
176 | 48e4da5c | Klaus Aehlig | |
177 | b81650b0 | Klaus Aehlig | -- | Watcher task for a job, to update it on file changes. It also |
178 | b81650b0 | Klaus Aehlig | -- reinstantiates itself upon receiving an Ignored event. |
179 | b81650b0 | Klaus Aehlig | jobWatcher :: JQStatus -> JobWithStat -> Event -> IO () |
180 | b81650b0 | Klaus Aehlig | jobWatcher state jWS e = do |
181 | b81650b0 | Klaus Aehlig | let jid = qjId $ jJob jWS |
182 | b81650b0 | Klaus Aehlig | jids = show $ fromJobId jid |
183 | b81650b0 | Klaus Aehlig | logInfo $ "Scheduler notified of change of job " ++ jids |
184 | b81650b0 | Klaus Aehlig | logDebug $ "Scheulder notify event for " ++ jids ++ ": " ++ show e |
185 | b81650b0 | Klaus Aehlig | let inotify = jINotify jWS |
186 | b81650b0 | Klaus Aehlig | when (e == Ignored && isJust inotify) $ do |
187 | b81650b0 | Klaus Aehlig | qdir <- queueDir |
188 | b81650b0 | Klaus Aehlig | let fpath = liveJobFile qdir jid |
189 | b81650b0 | Klaus Aehlig | _ <- addWatch (fromJust inotify) [Modify, Delete] fpath |
190 | b81650b0 | Klaus Aehlig | (jobWatcher state jWS) |
191 | b81650b0 | Klaus Aehlig | return () |
192 | b81650b0 | Klaus Aehlig | updateJob state jWS |
193 | b81650b0 | Klaus Aehlig | |
194 | b81650b0 | Klaus Aehlig | -- | Attach the job watcher to a running job. |
195 | b81650b0 | Klaus Aehlig | attachWatcher :: JQStatus -> JobWithStat -> IO () |
196 | b81650b0 | Klaus Aehlig | attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do |
197 | b81650b0 | Klaus Aehlig | inotify <- initINotify |
198 | b81650b0 | Klaus Aehlig | qdir <- queueDir |
199 | b81650b0 | Klaus Aehlig | let fpath = liveJobFile qdir . qjId $ jJob jWS |
200 | b81650b0 | Klaus Aehlig | jWS' = jWS { jINotify=Just inotify } |
201 | b81650b0 | Klaus Aehlig | logDebug $ "Attaching queue watcher for " ++ fpath |
202 | b81650b0 | Klaus Aehlig | _ <- addWatch inotify [Modify, Delete] fpath $ jobWatcher state jWS' |
203 | b81650b0 | Klaus Aehlig | modifyJobs state . onRunningJobs $ updateJobStatus jWS' |
204 | b81650b0 | Klaus Aehlig | |
205 | 48e4da5c | Klaus Aehlig | -- | Decide on which jobs to schedule next for execution. This is the |
206 | 48e4da5c | Klaus Aehlig | -- pure function doing the scheduling. |
207 | d9dd04b1 | Klaus Aehlig | selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat]) |
208 | d9dd04b1 | Klaus Aehlig | selectJobsToRun count queue = |
209 | d9dd04b1 | Klaus Aehlig | let n = count - length (qRunning queue) |
210 | 48e4da5c | Klaus Aehlig | (chosen, remain) = splitAt n (qEnqueued queue) |
211 | a2977f53 | Klaus Aehlig | in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen) |
212 | 48e4da5c | Klaus Aehlig | |
213 | 1c532d2d | Klaus Aehlig | -- | Requeue jobs that were previously selected for execution |
214 | 1c532d2d | Klaus Aehlig | -- but couldn't be started. |
215 | a2977f53 | Klaus Aehlig | requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO () |
216 | 1c532d2d | Klaus Aehlig | requeueJobs qstate jobs err = do |
217 | a2977f53 | Klaus Aehlig | let jids = map (qjId . jJob) jobs |
218 | 1c532d2d | Klaus Aehlig | jidsString = commaJoin $ map (show . fromJobId) jids |
219 | 1c532d2d | Klaus Aehlig | rmJobs = filter ((`notElem` jids) . qjId . jJob) |
220 | 1c532d2d | Klaus Aehlig | logWarning $ "Starting jobs failed: " ++ show err |
221 | 1c532d2d | Klaus Aehlig | logWarning $ "Rescheduling jobs: " ++ jidsString |
222 | 1c532d2d | Klaus Aehlig | modifyJobs qstate (onRunningJobs rmJobs) |
223 | a2977f53 | Klaus Aehlig | modifyJobs qstate (onQueuedJobs $ (++) jobs) |
224 | 1c532d2d | Klaus Aehlig | |
225 | 48e4da5c | Klaus Aehlig | -- | Schedule jobs to be run. This is the IO wrapper around the |
226 | 48e4da5c | Klaus Aehlig | -- pure `selectJobsToRun`. |
227 | 48e4da5c | Klaus Aehlig | scheduleSomeJobs :: JQStatus -> IO () |
228 | 48e4da5c | Klaus Aehlig | scheduleSomeJobs qstate = do |
229 | d9dd04b1 | Klaus Aehlig | count <- getMaxRunningJobs qstate |
230 | d9dd04b1 | Klaus Aehlig | chosen <- atomicModifyIORef (jqJobs qstate) (selectJobsToRun count) |
231 | a2977f53 | Klaus Aehlig | let jobs = map jJob chosen |
232 | 7dd21737 | Klaus Aehlig | unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin |
233 | a2977f53 | Klaus Aehlig | $ map (show . fromJobId . qjId) jobs |
234 | b81650b0 | Klaus Aehlig | mapM_ (attachWatcher qstate) chosen |
235 | a2977f53 | Klaus Aehlig | result <- try $ JQ.startJobs jobs |
236 | 1c532d2d | Klaus Aehlig | either (requeueJobs qstate chosen) return result |
237 | 48e4da5c | Klaus Aehlig | |
238 | 48e4da5c | Klaus Aehlig | -- | Format the job queue status in a compact, human readable way. |
239 | 48e4da5c | Klaus Aehlig | showQueue :: Queue -> String |
240 | 48e4da5c | Klaus Aehlig | showQueue (Queue {qEnqueued=waiting, qRunning=running}) = |
241 | 48e4da5c | Klaus Aehlig | let showids = show . map (fromJobId . qjId . jJob) |
242 | 48e4da5c | Klaus Aehlig | in "Waiting jobs: " ++ showids waiting |
243 | 48e4da5c | Klaus Aehlig | ++ "; running jobs: " ++ showids running |
244 | 48e4da5c | Klaus Aehlig | |
245 | 48e4da5c | Klaus Aehlig | -- | Time-based watcher for updating the job queue. |
246 | 48e4da5c | Klaus Aehlig | onTimeWatcher :: JQStatus -> IO () |
247 | 48e4da5c | Klaus Aehlig | onTimeWatcher qstate = forever $ do |
248 | 48e4da5c | Klaus Aehlig | threadDelay watchInterval |
249 | fe50bb65 | Klaus Aehlig | logDebug "Job queue watcher timer fired" |
250 | 48e4da5c | Klaus Aehlig | jobs <- readIORef (jqJobs qstate) |
251 | 48e4da5c | Klaus Aehlig | mapM_ (updateJob qstate) $ qRunning jobs |
252 | 48e4da5c | Klaus Aehlig | cleanupFinishedJobs qstate |
253 | 48e4da5c | Klaus Aehlig | jobs' <- readIORef (jqJobs qstate) |
254 | 48e4da5c | Klaus Aehlig | logInfo $ showQueue jobs' |
255 | 48e4da5c | Klaus Aehlig | scheduleSomeJobs qstate |
256 | 48e4da5c | Klaus Aehlig | |
257 | 2713b91a | Klaus Aehlig | -- | Read a single, non-archived, job, specified by its id, from disk. |
258 | 2713b91a | Klaus Aehlig | readJobFromDisk :: JobId -> IO (Result JobWithStat) |
259 | 2713b91a | Klaus Aehlig | readJobFromDisk jid = do |
260 | 2713b91a | Klaus Aehlig | qdir <- queueDir |
261 | 2713b91a | Klaus Aehlig | let fpath = liveJobFile qdir jid |
262 | 2713b91a | Klaus Aehlig | logDebug $ "Reading " ++ fpath |
263 | 2713b91a | Klaus Aehlig | tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat) |
264 | 2713b91a | Klaus Aehlig | let fstat = either (const nullFStat) id tryFstat |
265 | 2713b91a | Klaus Aehlig | loadResult <- JQ.loadJobFromDisk qdir False jid |
266 | ed6cf449 | Klaus Aehlig | return $ liftM (JobWithStat Nothing fstat . fst) loadResult |
267 | 2713b91a | Klaus Aehlig | |
268 | 2713b91a | Klaus Aehlig | -- | Read all non-finalized jobs from disk. |
269 | 2713b91a | Klaus Aehlig | readJobsFromDisk :: IO [JobWithStat] |
270 | 2713b91a | Klaus Aehlig | readJobsFromDisk = do |
271 | 2713b91a | Klaus Aehlig | logInfo "Loading job queue" |
272 | 2713b91a | Klaus Aehlig | qdir <- queueDir |
273 | 2713b91a | Klaus Aehlig | eitherJids <- JQ.getJobIDs [qdir] |
274 | ea7032da | Petr Pudlak | let jids = genericResult (const []) JQ.sortJobIDs eitherJids |
275 | 2713b91a | Klaus Aehlig | jidsstring = commaJoin $ map (show . fromJobId) jids |
276 | 2713b91a | Klaus Aehlig | logInfo $ "Non-archived jobs on disk: " ++ jidsstring |
277 | 2713b91a | Klaus Aehlig | jobs <- mapM readJobFromDisk jids |
278 | 2713b91a | Klaus Aehlig | return $ justOk jobs |
279 | 2713b91a | Klaus Aehlig | |
280 | 48e4da5c | Klaus Aehlig | -- | Set up the job scheduler. This will also start the monitoring |
281 | 48e4da5c | Klaus Aehlig | -- of changes to the running jobs. |
282 | 48e4da5c | Klaus Aehlig | initJQScheduler :: JQStatus -> IO () |
283 | 48e4da5c | Klaus Aehlig | initJQScheduler qstate = do |
284 | 2713b91a | Klaus Aehlig | alljobs <- readJobsFromDisk |
285 | 2713b91a | Klaus Aehlig | let jobs = filter (not . jobFinalized . jJob) alljobs |
286 | 2713b91a | Klaus Aehlig | (running, queued) = partition (jobStarted . jJob) jobs |
287 | 2713b91a | Klaus Aehlig | modifyJobs qstate (onQueuedJobs (++ queued) . onRunningJobs (++ running)) |
288 | 2713b91a | Klaus Aehlig | jqjobs <- readIORef (jqJobs qstate) |
289 | 2713b91a | Klaus Aehlig | logInfo $ showQueue jqjobs |
290 | 2713b91a | Klaus Aehlig | scheduleSomeJobs qstate |
291 | 48e4da5c | Klaus Aehlig | logInfo "Starting time-based job queue watcher" |
292 | 48e4da5c | Klaus Aehlig | _ <- forkIO $ onTimeWatcher qstate |
293 | 48e4da5c | Klaus Aehlig | return () |
294 | 48e4da5c | Klaus Aehlig | |
295 | 48e4da5c | Klaus Aehlig | -- | Enqueue new jobs. This will guarantee that the jobs will be executed |
296 | 48e4da5c | Klaus Aehlig | -- eventually. |
297 | 48e4da5c | Klaus Aehlig | enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO () |
298 | 48e4da5c | Klaus Aehlig | enqueueNewJobs state jobs = do |
299 | 48e4da5c | Klaus Aehlig | logInfo . (++) "New jobs enqueued: " . commaJoin |
300 | 48e4da5c | Klaus Aehlig | $ map (show . fromJobId . qjId) jobs |
301 | 48e4da5c | Klaus Aehlig | let jobs' = map unreadJob jobs |
302 | f7743189 | Klaus Aehlig | insertFn = insertBy (compare `on` fromJobId . qjId . jJob) |
303 | f7743189 | Klaus Aehlig | addJobs oldjobs = foldl (flip insertFn) oldjobs jobs' |
304 | f7743189 | Klaus Aehlig | modifyJobs state (onQueuedJobs addJobs) |
305 | 48e4da5c | Klaus Aehlig | scheduleSomeJobs state |
306 | bb62d52e | Klaus Aehlig | |
307 | bb62d52e | Klaus Aehlig | -- | Pure function for removing a queued job from the job queue by |
308 | 155df343 | Klaus Aehlig | -- atomicModifyIORef. The answer is Just the job if the job could be removed |
309 | 155df343 | Klaus Aehlig | -- before being handed over to execution, Nothing if it already was started |
310 | bb62d52e | Klaus Aehlig | -- and a Bad result if the job is not found in the queue. |
311 | 155df343 | Klaus Aehlig | rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob)) |
312 | bb62d52e | Klaus Aehlig | rmJob jid q = |
313 | bb62d52e | Klaus Aehlig | let isJid = (jid ==) . qjId . jJob |
314 | bb62d52e | Klaus Aehlig | (found, queued') = partition isJid $ qEnqueued q |
315 | 155df343 | Klaus Aehlig | isRunning = any isJid $ qRunning q |
316 | 155df343 | Klaus Aehlig | sJid = (++) "Job " . show $ fromJobId jid |
317 | 155df343 | Klaus Aehlig | in case (found, isRunning) of |
318 | 155df343 | Klaus Aehlig | ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job) |
319 | 155df343 | Klaus Aehlig | (_:_, _) -> (q, Bad $ "Queue in inconsistent state." |
320 | 155df343 | Klaus Aehlig | ++ sJid ++ " queued multiple times") |
321 | 155df343 | Klaus Aehlig | (_, True) -> (q, Ok Nothing) |
322 | 155df343 | Klaus Aehlig | _ -> (q, Bad $ sJid ++ " not found in queue") |
323 | bb62d52e | Klaus Aehlig | |
324 | bb62d52e | Klaus Aehlig | -- | Try to remove a queued job from the job queue. Return True, if |
325 | bb62d52e | Klaus Aehlig | -- the job could be removed from the queue before being handed over |
326 | bb62d52e | Klaus Aehlig | -- to execution, False if the job already started, and a Bad result |
327 | bb62d52e | Klaus Aehlig | -- if the job is unknown. |
328 | bb62d52e | Klaus Aehlig | dequeueJob :: JQStatus -> JobId -> IO (Result Bool) |
329 | bb62d52e | Klaus Aehlig | dequeueJob state jid = do |
330 | bb62d52e | Klaus Aehlig | result <- atomicModifyIORef (jqJobs state) $ rmJob jid |
331 | 155df343 | Klaus Aehlig | let result' = fmap isJust result |
332 | bb62d52e | Klaus Aehlig | logDebug $ "Result of dequeing job " ++ show (fromJobId jid) |
333 | 155df343 | Klaus Aehlig | ++ " is " ++ show result' |
334 | 155df343 | Klaus Aehlig | return result' |
335 | 96d55b50 | Klaus Aehlig | |
336 | 96d55b50 | Klaus Aehlig | -- | Change the priority of a queued job (once the job is handed over |
337 | 96d55b50 | Klaus Aehlig | -- to execution, the job itself needs to be informed). To avoid the |
338 | 96d55b50 | Klaus Aehlig | -- job being started unmodified, it is temporarily unqueued during the |
339 | 96d55b50 | Klaus Aehlig | -- change. Return the modified job, if the job's priority was sucessfully |
340 | 96d55b50 | Klaus Aehlig | -- modified, Nothing, if the job already started, and a Bad value, if the job |
341 | 96d55b50 | Klaus Aehlig | -- is unkown. |
342 | 96d55b50 | Klaus Aehlig | setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob)) |
343 | 96d55b50 | Klaus Aehlig | setJobPriority state jid prio = runResultT $ do |
344 | 96d55b50 | Klaus Aehlig | maybeJob <- mkResultT . atomicModifyIORef (jqJobs state) $ rmJob jid |
345 | 96d55b50 | Klaus Aehlig | case maybeJob of |
346 | 96d55b50 | Klaus Aehlig | Nothing -> return Nothing |
347 | 96d55b50 | Klaus Aehlig | Just job -> do |
348 | 96d55b50 | Klaus Aehlig | let job' = changeJobPriority prio job |
349 | 96d55b50 | Klaus Aehlig | qDir <- liftIO queueDir |
350 | 96d55b50 | Klaus Aehlig | mkResultT $ writeJobToDisk qDir job' |
351 | 96d55b50 | Klaus Aehlig | liftIO $ enqueueNewJobs state [job'] |
352 | 96d55b50 | Klaus Aehlig | return $ Just job' |