root / src / Ganeti / JQScheduler.hs @ 48e4da5c
History | View | Annotate | Download (7.3 kB)
1 |
{-| Implementation of a reader for the job queue. |
---|---|
2 |
|
3 |
-} |
4 |
|
5 |
{- |
6 |
|
7 |
Copyright (C) 2013 Google Inc. |
8 |
|
9 |
This program is free software; you can redistribute it and/or modify |
10 |
it under the terms of the GNU General Public License as published by |
11 |
the Free Software Foundation; either version 2 of the License, or |
12 |
(at your option) any later version. |
13 |
|
14 |
This program is distributed in the hope that it will be useful, but |
15 |
WITHOUT ANY WARRANTY; without even the implied warranty of |
16 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
17 |
General Public License for more details. |
18 |
|
19 |
You should have received a copy of the GNU General Public License |
20 |
along with this program; if not, write to the Free Software |
21 |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
22 |
02110-1301, USA. |
23 |
|
24 |
-} |
25 |
|
26 |
module Ganeti.JQScheduler |
27 |
( JQStatus |
28 |
, emptyJQStatus |
29 |
, initJQScheduler |
30 |
, enqueueNewJobs |
31 |
) where |
32 |
|
33 |
import Control.Arrow |
34 |
import Control.Concurrent |
35 |
import Control.Monad |
36 |
import Data.List |
37 |
import Data.IORef |
38 |
|
39 |
import Ganeti.BasicTypes |
40 |
import Ganeti.Constants as C |
41 |
import Ganeti.JQueue as JQ |
42 |
import Ganeti.Logging |
43 |
import Ganeti.Path |
44 |
import Ganeti.Types |
45 |
import Ganeti.Utils |
46 |
|
47 |
data JobWithStat = JobWithStat { jStat :: FStat, jJob :: QueuedJob } |
48 |
data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] } |
49 |
|
50 |
{-| Representation of the job queue |
51 |
|
52 |
We keep two lists of jobs (together with information about the last |
53 |
fstat result observed): the jobs that are enqueued, but not yet handed |
54 |
over for execution, and the jobs already handed over for execution. They |
55 |
are kept together in a single IORef, so that we can atomically update |
56 |
both, in particular when scheduling jobs to be handed over for execution. |
57 |
|
58 |
-} |
59 |
|
60 |
data JQStatus = JQStatus |
61 |
{ jqJobs :: IORef Queue |
62 |
} |
63 |
|
64 |
|
65 |
emptyJQStatus :: IO JQStatus |
66 |
emptyJQStatus = do |
67 |
jqJ <- newIORef Queue {qEnqueued=[], qRunning=[]} |
68 |
return JQStatus { jqJobs=jqJ } |
69 |
|
70 |
-- | Apply a function on the running jobs. |
71 |
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue |
72 |
onRunningJobs f queue = queue {qRunning=f $ qRunning queue} |
73 |
|
74 |
-- | Apply a function on the queued jobs. |
75 |
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue |
76 |
onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue} |
77 |
|
78 |
-- | Obtain a JobWithStat from a QueuedJob. |
79 |
unreadJob :: QueuedJob -> JobWithStat |
80 |
unreadJob job = JobWithStat {jJob=job, jStat=nullFStat} |
81 |
|
82 |
-- | Reload interval for polling the running jobs for updates in microseconds. |
83 |
watchInterval :: Int |
84 |
watchInterval = C.luxidJobqueuePollInterval * 1000000 |
85 |
|
86 |
-- | Maximal number of jobs to be running at the same time. |
87 |
maxRunningJobs :: Int |
88 |
maxRunningJobs = C.luxidMaximalRunningJobs |
89 |
|
90 |
-- | Wrapper function to atomically update the jobs in the queue status. |
91 |
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO () |
92 |
modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) () . f) |
93 |
|
94 |
-- | Reread a job from disk, if the file has changed. |
95 |
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat) |
96 |
readJobStatus (JobWithStat {jStat=fstat, jJob=job}) = do |
97 |
let jid = qjId job |
98 |
qdir <- queueDir |
99 |
let fpath = liveJobFile qdir jid |
100 |
logDebug $ "Checking if " ++ fpath ++ " changed on disk." |
101 |
changed <- needsReload fstat fpath |
102 |
case changed of |
103 |
Nothing -> do |
104 |
logDebug $ "File " ++ fpath ++ " not changed on disk." |
105 |
return Nothing |
106 |
Just fstat' -> do |
107 |
logInfo $ "Rereading " ++ fpath |
108 |
readResult <- loadJobFromDisk qdir False jid |
109 |
let jids = show $ fromJobId jid |
110 |
case readResult of |
111 |
Bad s -> do |
112 |
logWarning $ "Failed to read job " ++ jids ++ ": " ++ s |
113 |
return Nothing |
114 |
Ok (job', _) -> do |
115 |
logDebug |
116 |
$ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job') |
117 |
return . Just $ JobWithStat {jStat=fstat', jJob=job'} |
118 |
|
119 |
-- | Update a job in the job queue, if it is still there. This is the |
120 |
-- pure function for inserting a previously read change into the queue. |
121 |
-- as the change contains its time stamp, we don't have to worry about a |
122 |
-- later read change overwriting a newer read state. If this happens, the |
123 |
-- fstat value will be outdated, so the next poller run will fix this. |
124 |
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat] |
125 |
updateJobStatus job' = |
126 |
let jid = qjId $ jJob job' in |
127 |
map (\job -> if qjId (jJob job) == jid then job' else job) |
128 |
|
129 |
-- | Update a single job by reading it from disk, if necessary. |
130 |
updateJob :: JQStatus -> JobWithStat -> IO () |
131 |
updateJob state jb = do |
132 |
jb' <- readJobStatus jb |
133 |
maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb' |
134 |
|
135 |
-- | Sort out the finished jobs from the monitored part of the queue. |
136 |
-- This is the pure part, splitting the queue into a remaining queue |
137 |
-- and the jobs that were removed. |
138 |
sortoutFinishedJobs :: Queue -> (Queue, [QueuedJob]) |
139 |
sortoutFinishedJobs queue = |
140 |
let (run', fin) = partition |
141 |
((<= JOB_STATUS_RUNNING) . calcJobStatus . jJob) |
142 |
. qRunning $ queue |
143 |
in (queue {qRunning=run'}, map jJob fin) |
144 |
|
145 |
-- | Actually clean up the finished jobs. This is the IO wrapper around |
146 |
-- the pure `sortoutFinishedJobs`. |
147 |
cleanupFinishedJobs :: JQStatus -> IO () |
148 |
cleanupFinishedJobs qstate = do |
149 |
finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs |
150 |
let showJob = show . ((fromJobId . qjId) &&& calcJobStatus) |
151 |
jlist = commaJoin $ map showJob finished |
152 |
unless (null finished) |
153 |
. logInfo $ "Finished jobs: " ++ jlist |
154 |
|
155 |
-- | Decide on which jobs to schedule next for execution. This is the |
156 |
-- pure function doing the scheduling. |
157 |
selectJobsToRun :: Queue -> (Queue, [QueuedJob]) |
158 |
selectJobsToRun queue = |
159 |
let n = maxRunningJobs - length (qRunning queue) |
160 |
(chosen, remain) = splitAt n (qEnqueued queue) |
161 |
in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen} |
162 |
, map jJob chosen) |
163 |
|
164 |
-- | Schedule jobs to be run. This is the IO wrapper around the |
165 |
-- pure `selectJobsToRun`. |
166 |
scheduleSomeJobs :: JQStatus -> IO () |
167 |
scheduleSomeJobs qstate = do |
168 |
chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun |
169 |
unless (null chosen) . logInfo . (++) "Staring jobs: " . commaJoin |
170 |
$ map (show . fromJobId . qjId) chosen |
171 |
JQ.startJobs chosen |
172 |
|
173 |
-- | Format the job queue status in a compact, human readable way. |
174 |
showQueue :: Queue -> String |
175 |
showQueue (Queue {qEnqueued=waiting, qRunning=running}) = |
176 |
let showids = show . map (fromJobId . qjId . jJob) |
177 |
in "Waiting jobs: " ++ showids waiting |
178 |
++ "; running jobs: " ++ showids running |
179 |
|
180 |
-- | Time-based watcher for updating the job queue. |
181 |
onTimeWatcher :: JQStatus -> IO () |
182 |
onTimeWatcher qstate = forever $ do |
183 |
threadDelay watchInterval |
184 |
logDebug "Watcher timer fired" |
185 |
jobs <- readIORef (jqJobs qstate) |
186 |
mapM_ (updateJob qstate) $ qRunning jobs |
187 |
cleanupFinishedJobs qstate |
188 |
jobs' <- readIORef (jqJobs qstate) |
189 |
logInfo $ showQueue jobs' |
190 |
scheduleSomeJobs qstate |
191 |
|
192 |
-- | Set up the job scheduler. This will also start the monitoring |
193 |
-- of changes to the running jobs. |
194 |
initJQScheduler :: JQStatus -> IO () |
195 |
initJQScheduler qstate = do |
196 |
logInfo "Starting time-based job queue watcher" |
197 |
_ <- forkIO $ onTimeWatcher qstate |
198 |
return () |
199 |
|
200 |
-- | Enqueue new jobs. This will guarantee that the jobs will be executed |
201 |
-- eventually. |
202 |
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO () |
203 |
enqueueNewJobs state jobs = do |
204 |
logInfo . (++) "New jobs enqueued: " . commaJoin |
205 |
$ map (show . fromJobId . qjId) jobs |
206 |
let jobs' = map unreadJob jobs |
207 |
modifyJobs state (onQueuedJobs (++ jobs')) |
208 |
scheduleSomeJobs state |