root / src / Ganeti / JQScheduler.hs @ a6a6a1b5
History | View | Annotate | Download (13.5 kB)
1 |
{-| Implementation of a reader for the job queue. |
---|---|
2 |
|
3 |
-} |
4 |
|
5 |
{- |
6 |
|
7 |
Copyright (C) 2013 Google Inc. |
8 |
|
9 |
This program is free software; you can redistribute it and/or modify |
10 |
it under the terms of the GNU General Public License as published by |
11 |
the Free Software Foundation; either version 2 of the License, or |
12 |
(at your option) any later version. |
13 |
|
14 |
This program is distributed in the hope that it will be useful, but |
15 |
WITHOUT ANY WARRANTY; without even the implied warranty of |
16 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
17 |
General Public License for more details. |
18 |
|
19 |
You should have received a copy of the GNU General Public License |
20 |
along with this program; if not, write to the Free Software |
21 |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
22 |
02110-1301, USA. |
23 |
|
24 |
-} |
25 |
|
26 |
module Ganeti.JQScheduler |
27 |
( JQStatus |
28 |
, emptyJQStatus |
29 |
, initJQScheduler |
30 |
, enqueueNewJobs |
31 |
, dequeueJob |
32 |
, setJobPriority |
33 |
) where |
34 |
|
35 |
import Control.Arrow |
36 |
import Control.Concurrent |
37 |
import Control.Exception |
38 |
import Control.Monad |
39 |
import Control.Monad.IO.Class |
40 |
import Data.Function (on) |
41 |
import Data.List |
42 |
import Data.Maybe |
43 |
import Data.IORef |
44 |
import System.INotify |
45 |
|
46 |
import Ganeti.BasicTypes |
47 |
import Ganeti.Constants as C |
48 |
import Ganeti.JQueue as JQ |
49 |
import Ganeti.Logging |
50 |
import Ganeti.Objects |
51 |
import Ganeti.Path |
52 |
import Ganeti.Types |
53 |
import Ganeti.Utils |
54 |
|
55 |
data JobWithStat = JobWithStat { jINotify :: Maybe INotify |
56 |
, jStat :: FStat |
57 |
, jJob :: QueuedJob |
58 |
} |
59 |
data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] } |
60 |
|
61 |
{-| Representation of the job queue |
62 |
|
63 |
We keep two lists of jobs (together with information about the last |
64 |
fstat result observed): the jobs that are enqueued, but not yet handed |
65 |
over for execution, and the jobs already handed over for execution. They |
66 |
are kept together in a single IORef, so that we can atomically update |
67 |
both, in particular when scheduling jobs to be handed over for execution. |
68 |
|
69 |
-} |
70 |
|
71 |
data JQStatus = JQStatus |
72 |
{ jqJobs :: IORef Queue |
73 |
, jqConfig :: IORef (Result ConfigData) |
74 |
} |
75 |
|
76 |
|
77 |
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus |
78 |
emptyJQStatus config = do |
79 |
jqJ <- newIORef Queue { qEnqueued = [], qRunning = []} |
80 |
return JQStatus { jqJobs = jqJ, jqConfig = config } |
81 |
|
82 |
-- | Apply a function on the running jobs. |
83 |
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue |
84 |
onRunningJobs f queue = queue {qRunning=f $ qRunning queue} |
85 |
|
86 |
-- | Apply a function on the queued jobs. |
87 |
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue |
88 |
onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue} |
89 |
|
90 |
-- | Obtain a JobWithStat from a QueuedJob. |
91 |
unreadJob :: QueuedJob -> JobWithStat |
92 |
unreadJob job = JobWithStat {jJob=job, jStat=nullFStat, jINotify=Nothing} |
93 |
|
94 |
-- | Reload interval for polling the running jobs for updates in microseconds. |
95 |
watchInterval :: Int |
96 |
watchInterval = C.luxidJobqueuePollInterval * 1000000 |
97 |
|
98 |
-- | Get the maximual number of jobs to be run simultaneously from the |
99 |
-- configuration. If the configuration is not available, be conservative |
100 |
-- and use the smallest possible value, i.e., 1. |
101 |
getMaxRunningJobs :: JQStatus -> IO Int |
102 |
getMaxRunningJobs = |
103 |
liftM (genericResult (const 1) (clusterMaxRunningJobs . configCluster)) |
104 |
. readIORef . jqConfig |
105 |
|
106 |
-- | Wrapper function to atomically update the jobs in the queue status. |
107 |
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO () |
108 |
modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) () . f) |
109 |
|
110 |
-- | Reread a job from disk, if the file has changed. |
111 |
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat) |
112 |
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job}) = do |
113 |
let jid = qjId job |
114 |
qdir <- queueDir |
115 |
let fpath = liveJobFile qdir jid |
116 |
logDebug $ "Checking if " ++ fpath ++ " changed on disk." |
117 |
changedResult <- try $ needsReload fstat fpath |
118 |
:: IO (Either IOError (Maybe FStat)) |
119 |
let changed = either (const $ Just nullFStat) id changedResult |
120 |
case changed of |
121 |
Nothing -> do |
122 |
logDebug $ "File " ++ fpath ++ " not changed on disk." |
123 |
return Nothing |
124 |
Just fstat' -> do |
125 |
let jids = show $ fromJobId jid |
126 |
logInfo $ "Rereading job " ++ jids |
127 |
readResult <- loadJobFromDisk qdir True jid |
128 |
case readResult of |
129 |
Bad s -> do |
130 |
logWarning $ "Failed to read job " ++ jids ++ ": " ++ s |
131 |
return Nothing |
132 |
Ok (job', _) -> do |
133 |
logDebug |
134 |
$ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job') |
135 |
return . Just $ jWS {jStat=fstat', jJob=job'} |
136 |
-- jINotify unchanged |
137 |
|
138 |
-- | Update a job in the job queue, if it is still there. This is the |
139 |
-- pure function for inserting a previously read change into the queue. |
140 |
-- as the change contains its time stamp, we don't have to worry about a |
141 |
-- later read change overwriting a newer read state. If this happens, the |
142 |
-- fstat value will be outdated, so the next poller run will fix this. |
143 |
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat] |
144 |
updateJobStatus job' = |
145 |
let jid = qjId $ jJob job' in |
146 |
map (\job -> if qjId (jJob job) == jid then job' else job) |
147 |
|
148 |
-- | Update a single job by reading it from disk, if necessary. |
149 |
updateJob :: JQStatus -> JobWithStat -> IO () |
150 |
updateJob state jb = do |
151 |
jb' <- readJobStatus jb |
152 |
maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb' |
153 |
when (maybe True (jobFinalized . jJob) jb') . (>> return ()) . forkIO $ do |
154 |
logDebug "Scheduler noticed a job to have finished." |
155 |
cleanupFinishedJobs state |
156 |
scheduleSomeJobs state |
157 |
|
158 |
-- | Sort out the finished jobs from the monitored part of the queue. |
159 |
-- This is the pure part, splitting the queue into a remaining queue |
160 |
-- and the jobs that were removed. |
161 |
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat]) |
162 |
sortoutFinishedJobs queue = |
163 |
let (fin, run') = partition (jobFinalized . jJob) . qRunning $ queue |
164 |
in (queue {qRunning=run'}, fin) |
165 |
|
166 |
-- | Actually clean up the finished jobs. This is the IO wrapper around |
167 |
-- the pure `sortoutFinishedJobs`. |
168 |
cleanupFinishedJobs :: JQStatus -> IO () |
169 |
cleanupFinishedJobs qstate = do |
170 |
finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs |
171 |
let showJob = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob |
172 |
jlist = commaJoin $ map showJob finished |
173 |
unless (null finished) |
174 |
. logInfo $ "Finished jobs: " ++ jlist |
175 |
mapM_ (maybe (return ()) killINotify . jINotify) finished |
176 |
|
177 |
-- | Watcher task for a job, to update it on file changes. It also |
178 |
-- reinstantiates itself upon receiving an Ignored event. |
179 |
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO () |
180 |
jobWatcher state jWS e = do |
181 |
let jid = qjId $ jJob jWS |
182 |
jids = show $ fromJobId jid |
183 |
logInfo $ "Scheduler notified of change of job " ++ jids |
184 |
logDebug $ "Scheulder notify event for " ++ jids ++ ": " ++ show e |
185 |
let inotify = jINotify jWS |
186 |
when (e == Ignored && isJust inotify) $ do |
187 |
qdir <- queueDir |
188 |
let fpath = liveJobFile qdir jid |
189 |
_ <- addWatch (fromJust inotify) [Modify, Delete] fpath |
190 |
(jobWatcher state jWS) |
191 |
return () |
192 |
updateJob state jWS |
193 |
|
194 |
-- | Attach the job watcher to a running job. |
195 |
attachWatcher :: JQStatus -> JobWithStat -> IO () |
196 |
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do |
197 |
inotify <- initINotify |
198 |
qdir <- queueDir |
199 |
let fpath = liveJobFile qdir . qjId $ jJob jWS |
200 |
jWS' = jWS { jINotify=Just inotify } |
201 |
logDebug $ "Attaching queue watcher for " ++ fpath |
202 |
_ <- addWatch inotify [Modify, Delete] fpath $ jobWatcher state jWS' |
203 |
modifyJobs state . onRunningJobs $ updateJobStatus jWS' |
204 |
|
205 |
-- | Decide on which jobs to schedule next for execution. This is the |
206 |
-- pure function doing the scheduling. |
207 |
selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat]) |
208 |
selectJobsToRun count queue = |
209 |
let n = count - length (qRunning queue) |
210 |
(chosen, remain) = splitAt n (qEnqueued queue) |
211 |
in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen) |
212 |
|
213 |
-- | Requeue jobs that were previously selected for execution |
214 |
-- but couldn't be started. |
215 |
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO () |
216 |
requeueJobs qstate jobs err = do |
217 |
let jids = map (qjId . jJob) jobs |
218 |
jidsString = commaJoin $ map (show . fromJobId) jids |
219 |
rmJobs = filter ((`notElem` jids) . qjId . jJob) |
220 |
logWarning $ "Starting jobs failed: " ++ show err |
221 |
logWarning $ "Rescheduling jobs: " ++ jidsString |
222 |
modifyJobs qstate (onRunningJobs rmJobs) |
223 |
modifyJobs qstate (onQueuedJobs $ (++) jobs) |
224 |
|
225 |
-- | Schedule jobs to be run. This is the IO wrapper around the |
226 |
-- pure `selectJobsToRun`. |
227 |
scheduleSomeJobs :: JQStatus -> IO () |
228 |
scheduleSomeJobs qstate = do |
229 |
count <- getMaxRunningJobs qstate |
230 |
chosen <- atomicModifyIORef (jqJobs qstate) (selectJobsToRun count) |
231 |
let jobs = map jJob chosen |
232 |
unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin |
233 |
$ map (show . fromJobId . qjId) jobs |
234 |
mapM_ (attachWatcher qstate) chosen |
235 |
result <- try $ JQ.startJobs jobs |
236 |
either (requeueJobs qstate chosen) return result |
237 |
|
238 |
-- | Format the job queue status in a compact, human readable way. |
239 |
showQueue :: Queue -> String |
240 |
showQueue (Queue {qEnqueued=waiting, qRunning=running}) = |
241 |
let showids = show . map (fromJobId . qjId . jJob) |
242 |
in "Waiting jobs: " ++ showids waiting |
243 |
++ "; running jobs: " ++ showids running |
244 |
|
245 |
-- | Time-based watcher for updating the job queue. |
246 |
onTimeWatcher :: JQStatus -> IO () |
247 |
onTimeWatcher qstate = forever $ do |
248 |
threadDelay watchInterval |
249 |
logDebug "Job queue watcher timer fired" |
250 |
jobs <- readIORef (jqJobs qstate) |
251 |
mapM_ (updateJob qstate) $ qRunning jobs |
252 |
cleanupFinishedJobs qstate |
253 |
jobs' <- readIORef (jqJobs qstate) |
254 |
logInfo $ showQueue jobs' |
255 |
scheduleSomeJobs qstate |
256 |
|
257 |
-- | Read a single, non-archived, job, specified by its id, from disk. |
258 |
readJobFromDisk :: JobId -> IO (Result JobWithStat) |
259 |
readJobFromDisk jid = do |
260 |
qdir <- queueDir |
261 |
let fpath = liveJobFile qdir jid |
262 |
logDebug $ "Reading " ++ fpath |
263 |
tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat) |
264 |
let fstat = either (const nullFStat) id tryFstat |
265 |
loadResult <- JQ.loadJobFromDisk qdir False jid |
266 |
return $ liftM (JobWithStat Nothing fstat . fst) loadResult |
267 |
|
268 |
-- | Read all non-finalized jobs from disk. |
269 |
readJobsFromDisk :: IO [JobWithStat] |
270 |
readJobsFromDisk = do |
271 |
logInfo "Loading job queue" |
272 |
qdir <- queueDir |
273 |
eitherJids <- JQ.getJobIDs [qdir] |
274 |
let jids = genericResult (const []) JQ.sortJobIDs eitherJids |
275 |
jidsstring = commaJoin $ map (show . fromJobId) jids |
276 |
logInfo $ "Non-archived jobs on disk: " ++ jidsstring |
277 |
jobs <- mapM readJobFromDisk jids |
278 |
return $ justOk jobs |
279 |
|
280 |
-- | Set up the job scheduler. This will also start the monitoring |
281 |
-- of changes to the running jobs. |
282 |
initJQScheduler :: JQStatus -> IO () |
283 |
initJQScheduler qstate = do |
284 |
alljobs <- readJobsFromDisk |
285 |
let jobs = filter (not . jobFinalized . jJob) alljobs |
286 |
(running, queued) = partition (jobStarted . jJob) jobs |
287 |
modifyJobs qstate (onQueuedJobs (++ queued) . onRunningJobs (++ running)) |
288 |
jqjobs <- readIORef (jqJobs qstate) |
289 |
logInfo $ showQueue jqjobs |
290 |
scheduleSomeJobs qstate |
291 |
logInfo "Starting time-based job queue watcher" |
292 |
_ <- forkIO $ onTimeWatcher qstate |
293 |
return () |
294 |
|
295 |
-- | Enqueue new jobs. This will guarantee that the jobs will be executed |
296 |
-- eventually. |
297 |
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO () |
298 |
enqueueNewJobs state jobs = do |
299 |
logInfo . (++) "New jobs enqueued: " . commaJoin |
300 |
$ map (show . fromJobId . qjId) jobs |
301 |
let jobs' = map unreadJob jobs |
302 |
insertFn = insertBy (compare `on` fromJobId . qjId . jJob) |
303 |
addJobs oldjobs = foldl (flip insertFn) oldjobs jobs' |
304 |
modifyJobs state (onQueuedJobs addJobs) |
305 |
scheduleSomeJobs state |
306 |
|
307 |
-- | Pure function for removing a queued job from the job queue by |
308 |
-- atomicModifyIORef. The answer is Just the job if the job could be removed |
309 |
-- before being handed over to execution, Nothing if it already was started |
310 |
-- and a Bad result if the job is not found in the queue. |
311 |
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob)) |
312 |
rmJob jid q = |
313 |
let isJid = (jid ==) . qjId . jJob |
314 |
(found, queued') = partition isJid $ qEnqueued q |
315 |
isRunning = any isJid $ qRunning q |
316 |
sJid = (++) "Job " . show $ fromJobId jid |
317 |
in case (found, isRunning) of |
318 |
([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job) |
319 |
(_:_, _) -> (q, Bad $ "Queue in inconsistent state." |
320 |
++ sJid ++ " queued multiple times") |
321 |
(_, True) -> (q, Ok Nothing) |
322 |
_ -> (q, Bad $ sJid ++ " not found in queue") |
323 |
|
324 |
-- | Try to remove a queued job from the job queue. Return True, if |
325 |
-- the job could be removed from the queue before being handed over |
326 |
-- to execution, False if the job already started, and a Bad result |
327 |
-- if the job is unknown. |
328 |
dequeueJob :: JQStatus -> JobId -> IO (Result Bool) |
329 |
dequeueJob state jid = do |
330 |
result <- atomicModifyIORef (jqJobs state) $ rmJob jid |
331 |
let result' = fmap isJust result |
332 |
logDebug $ "Result of dequeing job " ++ show (fromJobId jid) |
333 |
++ " is " ++ show result' |
334 |
return result' |
335 |
|
336 |
-- | Change the priority of a queued job (once the job is handed over |
337 |
-- to execution, the job itself needs to be informed). To avoid the |
338 |
-- job being started unmodified, it is temporarily unqueued during the |
339 |
-- change. Return the modified job, if the job's priority was sucessfully |
340 |
-- modified, Nothing, if the job already started, and a Bad value, if the job |
341 |
-- is unkown. |
342 |
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob)) |
343 |
setJobPriority state jid prio = runResultT $ do |
344 |
maybeJob <- mkResultT . atomicModifyIORef (jqJobs state) $ rmJob jid |
345 |
case maybeJob of |
346 |
Nothing -> return Nothing |
347 |
Just job -> do |
348 |
let job' = changeJobPriority prio job |
349 |
qDir <- liftIO queueDir |
350 |
mkResultT $ writeJobToDisk qDir job' |
351 |
liftIO $ enqueueNewJobs state [job'] |
352 |
return $ Just job' |