|
1 |
{-| Implementation of a scheduler for the job queue.
|
|
2 |
|
|
3 |
-}
|
|
4 |
|
|
5 |
{-
|
|
6 |
|
|
7 |
Copyright (C) 2013 Google Inc.
|
|
8 |
|
|
9 |
This program is free software; you can redistribute it and/or modify
|
|
10 |
it under the terms of the GNU General Public License as published by
|
|
11 |
the Free Software Foundation; either version 2 of the License, or
|
|
12 |
(at your option) any later version.
|
|
13 |
|
|
14 |
This program is distributed in the hope that it will be useful, but
|
|
15 |
WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
16 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
17 |
General Public License for more details.
|
|
18 |
|
|
19 |
You should have received a copy of the GNU General Public License
|
|
20 |
along with this program; if not, write to the Free Software
|
|
21 |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
|
22 |
02110-1301, USA.
|
|
23 |
|
|
24 |
-}
|
|
25 |
|
|
26 |
module Ganeti.JQScheduler
|
|
27 |
( JQStatus
|
|
28 |
, emptyJQStatus
|
|
29 |
, initJQScheduler
|
|
30 |
, enqueueNewJobs
|
|
31 |
) where
|
|
32 |
|
|
33 |
import Control.Arrow
|
|
34 |
import Control.Concurrent
|
|
35 |
import Control.Monad
|
|
36 |
import Data.List
|
|
37 |
import Data.IORef
|
|
38 |
|
|
39 |
import Ganeti.BasicTypes
|
|
40 |
import Ganeti.Constants as C
|
|
41 |
import Ganeti.JQueue as JQ
|
|
42 |
import Ganeti.Logging
|
|
43 |
import Ganeti.Path
|
|
44 |
import Ganeti.Types
|
|
45 |
import Ganeti.Utils
|
|
46 |
|
|
47 |
-- | A queued job paired with the file status recorded for it.
data JobWithStat = JobWithStat
  { jStat :: FStat
    -- ^ the last fstat result observed for the job
  , jJob :: QueuedJob
    -- ^ the job itself
  }
|
|
48 |
-- | The two lists of jobs tracked by the scheduler.
data Queue = Queue
  { qEnqueued :: [JobWithStat]
    -- ^ jobs enqueued, but not yet handed over for execution
  , qRunning :: [JobWithStat]
    -- ^ jobs already handed over for execution
  }
|
|
49 |
|
|
50 |
{-| Representation of the job queue

Two lists of jobs are maintained, each job being stored together with
the last fstat result observed for it: the jobs that are enqueued but
not yet handed over for execution, and the jobs that have already been
handed over for execution. Both lists live in one single IORef, so that
they can be updated together atomically; this matters in particular when
jobs are scheduled, i.e., moved from the first list to the second.

-}

data JQStatus = JQStatus { jqJobs :: IORef Queue }
|
|
63 |
|
|
64 |
|
|
65 |
-- | Create a fresh queue status holding neither enqueued nor running jobs.
emptyJQStatus :: IO JQStatus
emptyJQStatus =
  let noJobs = Queue { qEnqueued = [], qRunning = [] }
  in liftM (\ref -> JQStatus { jqJobs = ref }) $ newIORef noJobs
|
|
69 |
|
|
70 |
-- | Transform the list of running jobs of a queue with the given
-- function; the list of enqueued jobs is left as it is.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs update q@(Queue { qRunning = running }) =
  q { qRunning = update running }
|
|
73 |
|
|
74 |
-- | Transform the list of enqueued jobs of a queue with the given
-- function; the list of running jobs is left as it is.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs update q@(Queue { qEnqueued = pending }) =
  q { qEnqueued = update pending }
|
|
77 |
|
|
78 |
-- | Wrap a 'QueuedJob' into a 'JobWithStat', recording 'nullFStat' as
-- the file status observed so far.
unreadJob :: QueuedJob -> JobWithStat
unreadJob queued = JobWithStat { jStat = nullFStat, jJob = queued }
|
|
81 |
|
|
82 |
-- | Interval, in microseconds, between two polls of the running jobs
-- for updates. The value is 'C.luxidJobqueuePollInterval' scaled by a
-- factor of one million (presumably that constant is given in seconds).
watchInterval :: Int
watchInterval = microsPerUnit * C.luxidJobqueuePollInterval
  where microsPerUnit = 1000000
|
|
85 |
|
|
86 |
-- | Upper bound on the number of jobs that may be running at the same
-- time, as given by 'C.luxidMaximalRunningJobs'.
maxRunningJobs :: Int
maxRunningJobs = C.luxidMaximalRunningJobs
|
|
89 |
|
|
90 |
-- | Atomically replace the queue stored in the queue status by the
-- result of applying the given function to it.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())
|
|
93 |
|
|
94 |
-- | Reread a job from disk, if the file has changed.
--
-- The result is 'Nothing' if 'needsReload' reports no change for the
-- job file, or if loading the job from disk fails (the failure is
-- logged as a warning). Otherwise the freshly loaded job is returned
-- together with the new fstat value obtained from 'needsReload'.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus (JobWithStat {jStat=fstat, jJob=job}) = do
  let jid = qjId job
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  changed <- needsReload fstat fpath
  case changed of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just fstat' -> do
      logInfo $ "Rereading " ++ fpath
      readResult <- loadJobFromDisk qdir False jid
      let jids = show $ fromJobId jid
      case readResult of
        Bad s -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
          return Nothing
        Ok (job', _) -> do
          logDebug $ "Read job " ++ jids ++ ", status is "
                       ++ show (calcJobStatus job')
          -- Keep the new fstat with the job, so that the next check
          -- compares against the state that was just read.
          return . Just $ JobWithStat {jStat=fstat', jJob=job'}
|
|
118 |
|
|
119 |
-- | Replace a job in a list of jobs by a newer version of it, if a job
-- with the same job id is still in the list; otherwise the list is
-- returned unchanged. This is the pure function for inserting a
-- previously read change into the queue. As the change carries its own
-- time stamp, there is no need to worry about a change read later
-- overwriting a state read more recently: should that happen, the
-- stored fstat value will be outdated, so the next poller run will
-- fix this.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus fresh = map replaceIfSame
  where
    freshId = qjId (jJob fresh)
    replaceIfSame old
      | qjId (jJob old) == freshId = fresh
      | otherwise = old
|
|
128 |
|
|
129 |
-- | Bring a single job up to date: reread it via 'readJobStatus' and,
-- if that yields a new version, store it in the list of running jobs.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  reread <- readJobStatus jb
  case reread of
    Nothing -> return ()
    Just newer -> modifyJobs state $ onRunningJobs (updateJobStatus newer)
|
|
134 |
|
|
135 |
-- | Remove the finished jobs from the monitored (running) part of the
-- queue. This is the pure part: it returns the queue without those
-- jobs, together with the jobs that were removed. A job is kept as long
-- as its status is at most 'JOB_STATUS_RUNNING'.
sortoutFinishedJobs :: Queue -> (Queue, [QueuedJob])
sortoutFinishedJobs queue = (queue { qRunning = active }, map jJob done)
  where
    stillActive jws = calcJobStatus (jJob jws) <= JOB_STATUS_RUNNING
    (active, done) = partition stillActive (qRunning queue)
|
|
144 |
|
|
145 |
-- | Drop the finished jobs from the queue status. This is the IO
-- wrapper around the pure `sortoutFinishedJobs`; the removed jobs, if
-- any, are logged with their job id and status.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  done <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  unless (null done) $
    logInfo ("Finished jobs: " ++ commaJoin (map idAndStatus done))
  where
    idAndStatus j = show (fromJobId (qjId j), calcJobStatus j)
|
|
154 |
|
|
155 |
-- | Pick the jobs to be handed over for execution next. This is the
-- pure function doing the scheduling: jobs are taken from the front of
-- the enqueued list and appended to the running list, as many as fit
-- below 'maxRunningJobs'. The result is the updated queue and the jobs
-- that were picked.
selectJobsToRun :: Queue -> (Queue, [QueuedJob])
selectJobsToRun queue = (queue', map jJob picked)
  where
    running = qRunning queue
    -- May be zero or negative; splitAt then picks nothing.
    freeSlots = maxRunningJobs - length running
    (picked, waiting) = splitAt freeSlots (qEnqueued queue)
    queue' = queue { qEnqueued = waiting, qRunning = running ++ picked }
|
|
163 |
|
|
164 |
-- | Schedule jobs to be run. This is the IO wrapper around the
-- pure `selectJobsToRun`: the selection is applied atomically to the
-- queue status, the ids of the selected jobs are logged (if there are
-- any), and the selected jobs are passed on to 'JQ.startJobs'.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
    $ map (show . fromJobId . qjId) chosen
  JQ.startJobs chosen
|
|
172 |
|
|
173 |
-- | Render the job queue in a compact, human readable form, listing
-- the ids of the waiting jobs followed by the ids of the running jobs.
showQueue :: Queue -> String
showQueue (Queue { qEnqueued = pending, qRunning = active }) =
  concat [ "Waiting jobs: ", idsOf pending
         , "; running jobs: ", idsOf active
         ]
  where
    idsOf jobs = show [ fromJobId . qjId $ jJob j | j <- jobs ]
|
|
179 |
|
|
180 |
-- | Time-based watcher for updating the job queue. It loops forever;
-- each round waits for 'watchInterval', refreshes the running jobs
-- from disk, removes the finished ones, logs the resulting queue and
-- finally schedules further jobs.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever pollOnce
  where
    queueRef = jqJobs qstate
    pollOnce :: IO ()
    pollOnce = do
      threadDelay watchInterval
      logDebug "Watcher timer fired"
      readIORef queueRef >>= mapM_ (updateJob qstate) . qRunning
      cleanupFinishedJobs qstate
      -- Read the queue again, so the log shows the state after cleanup.
      readIORef queueRef >>= logInfo . showQueue
      scheduleSomeJobs qstate
|
|
191 |
|
|
192 |
-- | Set up the job scheduler. This forks a thread running
-- 'onTimeWatcher', which monitors the running jobs for changes.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate =
  logInfo "Starting time-based job queue watcher"
    >> forkIO (onTimeWatcher qstate)
    >> return ()
|
|
199 |
|
|
200 |
-- | Enqueue new jobs. The jobs are appended to the end of the list of
-- enqueued jobs and a scheduling run is triggered right away. This
-- will guarantee that the jobs will be executed eventually.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  let jobIds = [ show (fromJobId (qjId j)) | j <- jobs ]
  logInfo $ "New jobs enqueued: " ++ commaJoin jobIds
  modifyJobs state . onQueuedJobs $ \pending -> pending ++ map unreadJob jobs
  scheduleSomeJobs state
|