root / src / Ganeti / JQScheduler.hs @ 48e4da5c
History | View | Annotate | Download (7.3 kB)
1 | 48e4da5c | Klaus Aehlig | {-| Implementation of a reader for the job queue. |
---|---|---|---|
2 | 48e4da5c | Klaus Aehlig | |
3 | 48e4da5c | Klaus Aehlig | -} |
4 | 48e4da5c | Klaus Aehlig | |
5 | 48e4da5c | Klaus Aehlig | {- |
6 | 48e4da5c | Klaus Aehlig | |
7 | 48e4da5c | Klaus Aehlig | Copyright (C) 2013 Google Inc. |
8 | 48e4da5c | Klaus Aehlig | |
9 | 48e4da5c | Klaus Aehlig | This program is free software; you can redistribute it and/or modify |
10 | 48e4da5c | Klaus Aehlig | it under the terms of the GNU General Public License as published by |
11 | 48e4da5c | Klaus Aehlig | the Free Software Foundation; either version 2 of the License, or |
12 | 48e4da5c | Klaus Aehlig | (at your option) any later version. |
13 | 48e4da5c | Klaus Aehlig | |
14 | 48e4da5c | Klaus Aehlig | This program is distributed in the hope that it will be useful, but |
15 | 48e4da5c | Klaus Aehlig | WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | 48e4da5c | Klaus Aehlig | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
17 | 48e4da5c | Klaus Aehlig | General Public License for more details. |
18 | 48e4da5c | Klaus Aehlig | |
19 | 48e4da5c | Klaus Aehlig | You should have received a copy of the GNU General Public License |
20 | 48e4da5c | Klaus Aehlig | along with this program; if not, write to the Free Software |
21 | 48e4da5c | Klaus Aehlig | Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
22 | 48e4da5c | Klaus Aehlig | 02110-1301, USA. |
23 | 48e4da5c | Klaus Aehlig | |
24 | 48e4da5c | Klaus Aehlig | -} |
25 | 48e4da5c | Klaus Aehlig | |
26 | 48e4da5c | Klaus Aehlig | module Ganeti.JQScheduler |
27 | 48e4da5c | Klaus Aehlig | ( JQStatus |
28 | 48e4da5c | Klaus Aehlig | , emptyJQStatus |
29 | 48e4da5c | Klaus Aehlig | , initJQScheduler |
30 | 48e4da5c | Klaus Aehlig | , enqueueNewJobs |
31 | 48e4da5c | Klaus Aehlig | ) where |
32 | 48e4da5c | Klaus Aehlig | |
33 | 48e4da5c | Klaus Aehlig | import Control.Arrow |
34 | 48e4da5c | Klaus Aehlig | import Control.Concurrent |
35 | 48e4da5c | Klaus Aehlig | import Control.Monad |
36 | 48e4da5c | Klaus Aehlig | import Data.List |
37 | 48e4da5c | Klaus Aehlig | import Data.IORef |
38 | 48e4da5c | Klaus Aehlig | |
39 | 48e4da5c | Klaus Aehlig | import Ganeti.BasicTypes |
40 | 48e4da5c | Klaus Aehlig | import Ganeti.Constants as C |
41 | 48e4da5c | Klaus Aehlig | import Ganeti.JQueue as JQ |
42 | 48e4da5c | Klaus Aehlig | import Ganeti.Logging |
43 | 48e4da5c | Klaus Aehlig | import Ganeti.Path |
44 | 48e4da5c | Klaus Aehlig | import Ganeti.Types |
45 | 48e4da5c | Klaus Aehlig | import Ganeti.Utils |
46 | 48e4da5c | Klaus Aehlig | |
47 | 48e4da5c | Klaus Aehlig | data JobWithStat = JobWithStat { jStat :: FStat, jJob :: QueuedJob } |
48 | 48e4da5c | Klaus Aehlig | data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] } |
49 | 48e4da5c | Klaus Aehlig | |
50 | 48e4da5c | Klaus Aehlig | {-| Representation of the job queue |
51 | 48e4da5c | Klaus Aehlig | |
52 | 48e4da5c | Klaus Aehlig | We keep two lists of jobs (together with information about the last |
53 | 48e4da5c | Klaus Aehlig | fstat result observed): the jobs that are enqueued, but not yet handed |
54 | 48e4da5c | Klaus Aehlig | over for execution, and the jobs already handed over for execution. They |
55 | 48e4da5c | Klaus Aehlig | are kept together in a single IORef, so that we can atomically update |
56 | 48e4da5c | Klaus Aehlig | both, in particular when scheduling jobs to be handed over for execution. |
57 | 48e4da5c | Klaus Aehlig | |
58 | 48e4da5c | Klaus Aehlig | -} |
59 | 48e4da5c | Klaus Aehlig | |
60 | 48e4da5c | Klaus Aehlig | data JQStatus = JQStatus |
61 | 48e4da5c | Klaus Aehlig | { jqJobs :: IORef Queue |
62 | 48e4da5c | Klaus Aehlig | } |
63 | 48e4da5c | Klaus Aehlig | |
64 | 48e4da5c | Klaus Aehlig | |
65 | 48e4da5c | Klaus Aehlig | emptyJQStatus :: IO JQStatus |
66 | 48e4da5c | Klaus Aehlig | emptyJQStatus = do |
67 | 48e4da5c | Klaus Aehlig | jqJ <- newIORef Queue {qEnqueued=[], qRunning=[]} |
68 | 48e4da5c | Klaus Aehlig | return JQStatus { jqJobs=jqJ } |
69 | 48e4da5c | Klaus Aehlig | |
70 | 48e4da5c | Klaus Aehlig | -- | Apply a function on the running jobs. |
71 | 48e4da5c | Klaus Aehlig | onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue |
72 | 48e4da5c | Klaus Aehlig | onRunningJobs f queue = queue {qRunning=f $ qRunning queue} |
73 | 48e4da5c | Klaus Aehlig | |
74 | 48e4da5c | Klaus Aehlig | -- | Apply a function on the queued jobs. |
75 | 48e4da5c | Klaus Aehlig | onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue |
76 | 48e4da5c | Klaus Aehlig | onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue} |
77 | 48e4da5c | Klaus Aehlig | |
78 | 48e4da5c | Klaus Aehlig | -- | Obtain a JobWithStat from a QueuedJob. |
79 | 48e4da5c | Klaus Aehlig | unreadJob :: QueuedJob -> JobWithStat |
80 | 48e4da5c | Klaus Aehlig | unreadJob job = JobWithStat {jJob=job, jStat=nullFStat} |
81 | 48e4da5c | Klaus Aehlig | |
82 | 48e4da5c | Klaus Aehlig | -- | Reload interval for polling the running jobs for updates in microseconds. |
83 | 48e4da5c | Klaus Aehlig | watchInterval :: Int |
84 | 48e4da5c | Klaus Aehlig | watchInterval = C.luxidJobqueuePollInterval * 1000000 |
85 | 48e4da5c | Klaus Aehlig | |
86 | 48e4da5c | Klaus Aehlig | -- | Maximal number of jobs to be running at the same time. |
87 | 48e4da5c | Klaus Aehlig | maxRunningJobs :: Int |
88 | 48e4da5c | Klaus Aehlig | maxRunningJobs = C.luxidMaximalRunningJobs |
89 | 48e4da5c | Klaus Aehlig | |
90 | 48e4da5c | Klaus Aehlig | -- | Wrapper function to atomically update the jobs in the queue status. |
91 | 48e4da5c | Klaus Aehlig | modifyJobs :: JQStatus -> (Queue -> Queue) -> IO () |
92 | 48e4da5c | Klaus Aehlig | modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) () . f) |
93 | 48e4da5c | Klaus Aehlig | |
94 | 48e4da5c | Klaus Aehlig | -- | Reread a job from disk, if the file has changed. |
95 | 48e4da5c | Klaus Aehlig | readJobStatus :: JobWithStat -> IO (Maybe JobWithStat) |
96 | 48e4da5c | Klaus Aehlig | readJobStatus (JobWithStat {jStat=fstat, jJob=job}) = do |
97 | 48e4da5c | Klaus Aehlig | let jid = qjId job |
98 | 48e4da5c | Klaus Aehlig | qdir <- queueDir |
99 | 48e4da5c | Klaus Aehlig | let fpath = liveJobFile qdir jid |
100 | 48e4da5c | Klaus Aehlig | logDebug $ "Checking if " ++ fpath ++ " changed on disk." |
101 | 48e4da5c | Klaus Aehlig | changed <- needsReload fstat fpath |
102 | 48e4da5c | Klaus Aehlig | case changed of |
103 | 48e4da5c | Klaus Aehlig | Nothing -> do |
104 | 48e4da5c | Klaus Aehlig | logDebug $ "File " ++ fpath ++ " not changed on disk." |
105 | 48e4da5c | Klaus Aehlig | return Nothing |
106 | 48e4da5c | Klaus Aehlig | Just fstat' -> do |
107 | 48e4da5c | Klaus Aehlig | logInfo $ "Rereading " ++ fpath |
108 | 48e4da5c | Klaus Aehlig | readResult <- loadJobFromDisk qdir False jid |
109 | 48e4da5c | Klaus Aehlig | let jids = show $ fromJobId jid |
110 | 48e4da5c | Klaus Aehlig | case readResult of |
111 | 48e4da5c | Klaus Aehlig | Bad s -> do |
112 | 48e4da5c | Klaus Aehlig | logWarning $ "Failed to read job " ++ jids ++ ": " ++ s |
113 | 48e4da5c | Klaus Aehlig | return Nothing |
114 | 48e4da5c | Klaus Aehlig | Ok (job', _) -> do |
115 | 48e4da5c | Klaus Aehlig | logDebug |
116 | 48e4da5c | Klaus Aehlig | $ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job') |
117 | 48e4da5c | Klaus Aehlig | return . Just $ JobWithStat {jStat=fstat', jJob=job'} |
118 | 48e4da5c | Klaus Aehlig | |
119 | 48e4da5c | Klaus Aehlig | -- | Update a job in the job queue, if it is still there. This is the |
120 | 48e4da5c | Klaus Aehlig | -- pure function for inserting a previously read change into the queue. |
121 | 48e4da5c | Klaus Aehlig | -- as the change contains its time stamp, we don't have to worry about a |
122 | 48e4da5c | Klaus Aehlig | -- later read change overwriting a newer read state. If this happens, the |
123 | 48e4da5c | Klaus Aehlig | -- fstat value will be outdated, so the next poller run will fix this. |
124 | 48e4da5c | Klaus Aehlig | updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat] |
125 | 48e4da5c | Klaus Aehlig | updateJobStatus job' = |
126 | 48e4da5c | Klaus Aehlig | let jid = qjId $ jJob job' in |
127 | 48e4da5c | Klaus Aehlig | map (\job -> if qjId (jJob job) == jid then job' else job) |
128 | 48e4da5c | Klaus Aehlig | |
129 | 48e4da5c | Klaus Aehlig | -- | Update a single job by reading it from disk, if necessary. |
130 | 48e4da5c | Klaus Aehlig | updateJob :: JQStatus -> JobWithStat -> IO () |
131 | 48e4da5c | Klaus Aehlig | updateJob state jb = do |
132 | 48e4da5c | Klaus Aehlig | jb' <- readJobStatus jb |
133 | 48e4da5c | Klaus Aehlig | maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb' |
134 | 48e4da5c | Klaus Aehlig | |
135 | 48e4da5c | Klaus Aehlig | -- | Sort out the finished jobs from the monitored part of the queue. |
136 | 48e4da5c | Klaus Aehlig | -- This is the pure part, splitting the queue into a remaining queue |
137 | 48e4da5c | Klaus Aehlig | -- and the jobs that were removed. |
138 | 48e4da5c | Klaus Aehlig | sortoutFinishedJobs :: Queue -> (Queue, [QueuedJob]) |
139 | 48e4da5c | Klaus Aehlig | sortoutFinishedJobs queue = |
140 | 48e4da5c | Klaus Aehlig | let (run', fin) = partition |
141 | 48e4da5c | Klaus Aehlig | ((<= JOB_STATUS_RUNNING) . calcJobStatus . jJob) |
142 | 48e4da5c | Klaus Aehlig | . qRunning $ queue |
143 | 48e4da5c | Klaus Aehlig | in (queue {qRunning=run'}, map jJob fin) |
144 | 48e4da5c | Klaus Aehlig | |
145 | 48e4da5c | Klaus Aehlig | -- | Actually clean up the finished jobs. This is the IO wrapper around |
146 | 48e4da5c | Klaus Aehlig | -- the pure `sortoutFinishedJobs`. |
147 | 48e4da5c | Klaus Aehlig | cleanupFinishedJobs :: JQStatus -> IO () |
148 | 48e4da5c | Klaus Aehlig | cleanupFinishedJobs qstate = do |
149 | 48e4da5c | Klaus Aehlig | finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs |
150 | 48e4da5c | Klaus Aehlig | let showJob = show . ((fromJobId . qjId) &&& calcJobStatus) |
151 | 48e4da5c | Klaus Aehlig | jlist = commaJoin $ map showJob finished |
152 | 48e4da5c | Klaus Aehlig | unless (null finished) |
153 | 48e4da5c | Klaus Aehlig | . logInfo $ "Finished jobs: " ++ jlist |
154 | 48e4da5c | Klaus Aehlig | |
155 | 48e4da5c | Klaus Aehlig | -- | Decide on which jobs to schedule next for execution. This is the |
156 | 48e4da5c | Klaus Aehlig | -- pure function doing the scheduling. |
157 | 48e4da5c | Klaus Aehlig | selectJobsToRun :: Queue -> (Queue, [QueuedJob]) |
158 | 48e4da5c | Klaus Aehlig | selectJobsToRun queue = |
159 | 48e4da5c | Klaus Aehlig | let n = maxRunningJobs - length (qRunning queue) |
160 | 48e4da5c | Klaus Aehlig | (chosen, remain) = splitAt n (qEnqueued queue) |
161 | 48e4da5c | Klaus Aehlig | in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen} |
162 | 48e4da5c | Klaus Aehlig | , map jJob chosen) |
163 | 48e4da5c | Klaus Aehlig | |
164 | 48e4da5c | Klaus Aehlig | -- | Schedule jobs to be run. This is the IO wrapper around the |
165 | 48e4da5c | Klaus Aehlig | -- pure `selectJobsToRun`. |
166 | 48e4da5c | Klaus Aehlig | scheduleSomeJobs :: JQStatus -> IO () |
167 | 48e4da5c | Klaus Aehlig | scheduleSomeJobs qstate = do |
168 | 48e4da5c | Klaus Aehlig | chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun |
169 | 48e4da5c | Klaus Aehlig | unless (null chosen) . logInfo . (++) "Staring jobs: " . commaJoin |
170 | 48e4da5c | Klaus Aehlig | $ map (show . fromJobId . qjId) chosen |
171 | 48e4da5c | Klaus Aehlig | JQ.startJobs chosen |
172 | 48e4da5c | Klaus Aehlig | |
173 | 48e4da5c | Klaus Aehlig | -- | Format the job queue status in a compact, human readable way. |
174 | 48e4da5c | Klaus Aehlig | showQueue :: Queue -> String |
175 | 48e4da5c | Klaus Aehlig | showQueue (Queue {qEnqueued=waiting, qRunning=running}) = |
176 | 48e4da5c | Klaus Aehlig | let showids = show . map (fromJobId . qjId . jJob) |
177 | 48e4da5c | Klaus Aehlig | in "Waiting jobs: " ++ showids waiting |
178 | 48e4da5c | Klaus Aehlig | ++ "; running jobs: " ++ showids running |
179 | 48e4da5c | Klaus Aehlig | |
180 | 48e4da5c | Klaus Aehlig | -- | Time-based watcher for updating the job queue. |
181 | 48e4da5c | Klaus Aehlig | onTimeWatcher :: JQStatus -> IO () |
182 | 48e4da5c | Klaus Aehlig | onTimeWatcher qstate = forever $ do |
183 | 48e4da5c | Klaus Aehlig | threadDelay watchInterval |
184 | 48e4da5c | Klaus Aehlig | logDebug "Watcher timer fired" |
185 | 48e4da5c | Klaus Aehlig | jobs <- readIORef (jqJobs qstate) |
186 | 48e4da5c | Klaus Aehlig | mapM_ (updateJob qstate) $ qRunning jobs |
187 | 48e4da5c | Klaus Aehlig | cleanupFinishedJobs qstate |
188 | 48e4da5c | Klaus Aehlig | jobs' <- readIORef (jqJobs qstate) |
189 | 48e4da5c | Klaus Aehlig | logInfo $ showQueue jobs' |
190 | 48e4da5c | Klaus Aehlig | scheduleSomeJobs qstate |
191 | 48e4da5c | Klaus Aehlig | |
192 | 48e4da5c | Klaus Aehlig | -- | Set up the job scheduler. This will also start the monitoring |
193 | 48e4da5c | Klaus Aehlig | -- of changes to the running jobs. |
194 | 48e4da5c | Klaus Aehlig | initJQScheduler :: JQStatus -> IO () |
195 | 48e4da5c | Klaus Aehlig | initJQScheduler qstate = do |
196 | 48e4da5c | Klaus Aehlig | logInfo "Starting time-based job queue watcher" |
197 | 48e4da5c | Klaus Aehlig | _ <- forkIO $ onTimeWatcher qstate |
198 | 48e4da5c | Klaus Aehlig | return () |
199 | 48e4da5c | Klaus Aehlig | |
200 | 48e4da5c | Klaus Aehlig | -- | Enqueue new jobs. This will guarantee that the jobs will be executed |
201 | 48e4da5c | Klaus Aehlig | -- eventually. |
202 | 48e4da5c | Klaus Aehlig | enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO () |
203 | 48e4da5c | Klaus Aehlig | enqueueNewJobs state jobs = do |
204 | 48e4da5c | Klaus Aehlig | logInfo . (++) "New jobs enqueued: " . commaJoin |
205 | 48e4da5c | Klaus Aehlig | $ map (show . fromJobId . qjId) jobs |
206 | 48e4da5c | Klaus Aehlig | let jobs' = map unreadJob jobs |
207 | 48e4da5c | Klaus Aehlig | modifyJobs state (onQueuedJobs (++ jobs')) |
208 | 48e4da5c | Klaus Aehlig | scheduleSomeJobs state |