Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ b81650b0

History | View | Annotate | Download (10.6 kB)

1 48e4da5c Klaus Aehlig
{-| Implementation of a reader for the job queue.
2 48e4da5c Klaus Aehlig
3 48e4da5c Klaus Aehlig
-}
4 48e4da5c Klaus Aehlig
5 48e4da5c Klaus Aehlig
{-
6 48e4da5c Klaus Aehlig
7 48e4da5c Klaus Aehlig
Copyright (C) 2013 Google Inc.
8 48e4da5c Klaus Aehlig
9 48e4da5c Klaus Aehlig
This program is free software; you can redistribute it and/or modify
10 48e4da5c Klaus Aehlig
it under the terms of the GNU General Public License as published by
11 48e4da5c Klaus Aehlig
the Free Software Foundation; either version 2 of the License, or
12 48e4da5c Klaus Aehlig
(at your option) any later version.
13 48e4da5c Klaus Aehlig
14 48e4da5c Klaus Aehlig
This program is distributed in the hope that it will be useful, but
15 48e4da5c Klaus Aehlig
WITHOUT ANY WARRANTY; without even the implied warranty of
16 48e4da5c Klaus Aehlig
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 48e4da5c Klaus Aehlig
General Public License for more details.
18 48e4da5c Klaus Aehlig
19 48e4da5c Klaus Aehlig
You should have received a copy of the GNU General Public License
20 48e4da5c Klaus Aehlig
along with this program; if not, write to the Free Software
21 48e4da5c Klaus Aehlig
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 48e4da5c Klaus Aehlig
02110-1301, USA.
23 48e4da5c Klaus Aehlig
24 48e4da5c Klaus Aehlig
-}
25 48e4da5c Klaus Aehlig
26 48e4da5c Klaus Aehlig
module Ganeti.JQScheduler
27 48e4da5c Klaus Aehlig
  ( JQStatus
28 48e4da5c Klaus Aehlig
  , emptyJQStatus
29 48e4da5c Klaus Aehlig
  , initJQScheduler
30 48e4da5c Klaus Aehlig
  , enqueueNewJobs
31 48e4da5c Klaus Aehlig
  ) where
32 48e4da5c Klaus Aehlig
33 48e4da5c Klaus Aehlig
import Control.Arrow
34 48e4da5c Klaus Aehlig
import Control.Concurrent
35 1c532d2d Klaus Aehlig
import Control.Exception
36 48e4da5c Klaus Aehlig
import Control.Monad
37 48e4da5c Klaus Aehlig
import Data.List
38 b81650b0 Klaus Aehlig
import Data.Maybe
39 48e4da5c Klaus Aehlig
import Data.IORef
40 ed6cf449 Klaus Aehlig
import System.INotify
41 48e4da5c Klaus Aehlig
42 48e4da5c Klaus Aehlig
import Ganeti.BasicTypes
43 48e4da5c Klaus Aehlig
import Ganeti.Constants as C
44 48e4da5c Klaus Aehlig
import Ganeti.JQueue as JQ
45 48e4da5c Klaus Aehlig
import Ganeti.Logging
46 48e4da5c Klaus Aehlig
import Ganeti.Path
47 48e4da5c Klaus Aehlig
import Ganeti.Types
48 48e4da5c Klaus Aehlig
import Ganeti.Utils
49 48e4da5c Klaus Aehlig
50 ed6cf449 Klaus Aehlig
-- | A job together with the bookkeeping the scheduler keeps for it.
-- The order of the fields matters, as the constructor is also applied
-- positionally.
data JobWithStat = JobWithStat
  { jINotify :: Maybe INotify -- ^ inotify instance watching the job file,
                              -- if a watcher has been attached
  , jStat :: FStat            -- ^ last fstat result observed for the job
  , jJob :: QueuedJob         -- ^ the job itself
  }
54 48e4da5c Klaus Aehlig
-- | The two lists of jobs the scheduler keeps track of.
data Queue = Queue
  { qEnqueued :: [JobWithStat] -- ^ jobs not yet handed over for execution
  , qRunning :: [JobWithStat]  -- ^ jobs already handed over for execution
  }
55 48e4da5c Klaus Aehlig
56 48e4da5c Klaus Aehlig
-- | Representation of the job queue.
--
-- We keep two lists of jobs (together with information about the last
-- fstat result observed): the jobs that are enqueued, but not yet handed
-- over for execution, and the jobs already handed over for execution.
-- Both lists are kept together in a single 'IORef', so that we can update
-- them atomically, in particular when scheduling jobs to be handed over
-- for execution.
data JQStatus = JQStatus
  { jqJobs :: IORef Queue -- ^ the enqueued and the running jobs
  }
69 48e4da5c Klaus Aehlig
70 48e4da5c Klaus Aehlig
71 48e4da5c Klaus Aehlig
-- | Create a fresh queue status holding neither enqueued nor running jobs.
emptyJQStatus :: IO JQStatus
emptyJQStatus =
  liftM JQStatus . newIORef $ Queue { qEnqueued = [], qRunning = [] }
75 48e4da5c Klaus Aehlig
76 48e4da5c Klaus Aehlig
-- | Lift a transformation of the list of running jobs to the whole queue;
-- the enqueued jobs are left untouched.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs f queue@(Queue {qRunning = running}) =
  queue {qRunning = f running}
79 48e4da5c Klaus Aehlig
80 48e4da5c Klaus Aehlig
-- | Lift a transformation of the list of enqueued jobs to the whole queue;
-- the running jobs are left untouched.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs f queue@(Queue {qEnqueued = waiting}) =
  queue {qEnqueued = f waiting}
83 48e4da5c Klaus Aehlig
84 48e4da5c Klaus Aehlig
-- | Wrap a 'QueuedJob' into a 'JobWithStat' that has no watcher attached
-- and carries the null fstat value.
unreadJob :: QueuedJob -> JobWithStat
unreadJob job =
  JobWithStat { jINotify = Nothing, jStat = nullFStat, jJob = job }
87 48e4da5c Klaus Aehlig
88 48e4da5c Klaus Aehlig
-- | Interval, in microseconds, between two polls of the running jobs
-- for updates.
watchInterval :: Int
watchInterval = scale * C.luxidJobqueuePollInterval
  where scale = 1000000
91 48e4da5c Klaus Aehlig
92 48e4da5c Klaus Aehlig
-- | Upper bound on the number of jobs that may be running at the same
-- time.
maxRunningJobs :: Int
maxRunningJobs = C.luxidMaximalRunningJobs
95 48e4da5c Klaus Aehlig
96 48e4da5c Klaus Aehlig
-- | Atomically replace the queue stored in the queue status by the
-- result of applying the given function to it.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())
99 48e4da5c Klaus Aehlig
100 48e4da5c Klaus Aehlig
-- | Reread a job from disk, if the file has changed.
--
-- Returns 'Nothing' if the job file is found unchanged or if the job
-- could not be read; otherwise returns the job as read from disk together
-- with the new fstat value. The watcher of the argument is carried over.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job}) = do
  let jid = qjId job
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  changedResult <- try $ needsReload fstat fpath
                   :: IO (Either IOError (Maybe FStat))
  -- If the check itself fails with an IOError, fall back to rereading
  -- the job, recording the null fstat value for it.
  let changed = either (const $ Just nullFStat) id changedResult
  case changed of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just fstat' -> do
      let jids = show $ fromJobId jid
      logInfo $ "Rereading job " ++ jids
      readResult <- loadJobFromDisk qdir True jid
      case readResult of
        Bad s -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
          return Nothing
        Ok (job', _) -> do
          logDebug
            $ "Read job " ++ jids ++ ", status is " ++ show (calcJobStatus job')
          return . Just $ jWS {jStat=fstat', jJob=job'}
                          -- jINotify unchanged
127 48e4da5c Klaus Aehlig
128 48e4da5c Klaus Aehlig
-- | Replace a job in a list of jobs by a newly read version of it; the
-- list is returned unchanged if the job is no longer in it. This is the
-- pure function for inserting a previously read change into the queue.
-- As the change carries its own fstat value, we do not have to worry
-- about a change that was read earlier overwriting a newer state: should
-- that happen, the stored fstat value is outdated, so the next poller run
-- will fix this.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' = map replaceIfSame
  where
    jid = qjId $ jJob job'
    replaceIfSame job
      | qjId (jJob job) == jid = job'
      | otherwise = job
137 48e4da5c Klaus Aehlig
138 48e4da5c Klaus Aehlig
-- | Reread a single job from disk and, if a new version was obtained,
-- store it in the list of running jobs.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  reread <- readJobStatus jb
  case reread of
    Nothing -> return ()
    Just jb' -> modifyJobs state . onRunningJobs $ updateJobStatus jb'
143 48e4da5c Klaus Aehlig
144 48e4da5c Klaus Aehlig
-- | Split off the finalized jobs from the running part of the queue.
-- This is the pure part; it returns the queue without those jobs,
-- together with the jobs that were removed.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue = (queue {qRunning = stillRunning}, finished)
  where
    isFinished = jobFinalized . jJob
    (finished, stillRunning) = partition isFinished (qRunning queue)
151 48e4da5c Klaus Aehlig
152 48e4da5c Klaus Aehlig
-- | Remove the finished jobs from the queue, log them, and shut down the
-- watchers attached to them. This is the IO wrapper around the pure
-- `sortoutFinishedJobs`.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  let describe jWS =
        let job = jJob jWS
        in show (fromJobId $ qjId job, calcJobStatus job)
  unless (null finished) . logInfo
    $ "Finished jobs: " ++ commaJoin (map describe finished)
  forM_ finished $ \jWS -> maybe (return ()) killINotify (jINotify jWS)
162 48e4da5c Klaus Aehlig
163 b81650b0 Klaus Aehlig
-- | Watcher task for a job, to update it on file changes. It also
-- reinstantiates itself upon receiving an Ignored event, provided the
-- job carries an inotify instance to add the watch to.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheduler notify event for " ++ jids ++ ": " ++ show e
  -- Match on the event and the inotify instance together, so that no
  -- partial function is needed to get hold of the instance.
  case (e, jINotify jWS) of
    (Ignored, Just inotify) -> do
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
      _ <- addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS)
      return ()
    _ -> return ()
  updateJob state jWS
179 b81650b0 Klaus Aehlig
180 b81650b0 Klaus Aehlig
-- | Attach the job watcher to a running job. Nothing is done if the job
-- already carries an inotify instance. Otherwise a new instance is
-- created, a watch on the job file is added to it, and the instance is
-- recorded in the job's entry in the list of running jobs.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do
  inotify <- initINotify
  qdir <- queueDir
  let fpath = liveJobFile qdir . qjId $ jJob jWS
      jWS' = jWS { jINotify=Just inotify }
  logDebug $ "Attaching queue watcher for " ++ fpath
  -- If adding the watch fails, the instance is not recorded anywhere, so
  -- release it here before the exception propagates to the caller.
  _ <- addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS')
         `onException` killINotify inotify
  modifyJobs state . onRunningJobs $ updateJobStatus jWS'
190 b81650b0 Klaus Aehlig
191 48e4da5c Klaus Aehlig
-- | Pick the jobs to be handed over for execution next. This is the pure
-- function doing the scheduling: jobs are taken from the front of the
-- enqueued jobs until `maxRunningJobs` jobs are running. It returns the
-- queue with the chosen jobs appended to the running ones, together with
-- the chosen jobs.
selectJobsToRun :: Queue -> (Queue, [JobWithStat])
selectJobsToRun queue = (queue', chosen)
  where
    running = qRunning queue
    freeSlots = maxRunningJobs - length running
    (chosen, remain) = splitAt freeSlots (qEnqueued queue)
    queue' = queue {qEnqueued = remain, qRunning = running ++ chosen}
198 48e4da5c Klaus Aehlig
199 1c532d2d Klaus Aehlig
-- | Requeue jobs that were previously selected for execution
-- but couldn't be started.
--
-- The jobs are removed from the running jobs and put back at the front of
-- the enqueued jobs in a single atomic update of the queue, so at no point
-- are they missing from both lists. The inotify instances found on the
-- removed running entries are shut down afterwards; the requeued jobs
-- carry no instance, so a new watcher is attached once they are scheduled
-- again.
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
requeueJobs qstate jobs err = do
  let jids = map (qjId . jJob) jobs
      jidsString = commaJoin $ map (show . fromJobId) jids
      isRequeued = (`elem` jids) . qjId . jJob
      -- The instances of the removed entries are killed below, so the
      -- requeued copies must not refer to one.
      requeued = map (\jWS -> jWS {jINotify = Nothing}) jobs
      moveBack queue =
        let (dropped, kept) = partition isRequeued $ qRunning queue
            queue' = queue { qRunning = kept
                           , qEnqueued = requeued ++ qEnqueued queue
                           }
        in (queue', dropped)
  logWarning $ "Starting jobs failed: " ++ show err
  logWarning $ "Rescheduling jobs: " ++ jidsString
  dropped <- atomicModifyIORef (jqJobs qstate) moveBack
  mapM_ (maybe (return ()) killINotify . jINotify) dropped
210 1c532d2d Klaus Aehlig
211 48e4da5c Klaus Aehlig
-- | Hand over jobs for execution. This is the IO wrapper around the
-- pure `selectJobsToRun`: the chosen jobs get a watcher attached and are
-- then started; if starting them fails with an IOError, they are
-- requeued.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
  let jobs = map jJob chosen
      jidsString = commaJoin $ map (show . fromJobId . qjId) jobs
  unless (null chosen) . logInfo $ "Starting jobs: " ++ jidsString
  forM_ chosen $ attachWatcher qstate
  result <- try $ JQ.startJobs jobs
  case result of
    Left err -> requeueJobs qstate chosen err
    Right res -> return res
222 48e4da5c Klaus Aehlig
223 48e4da5c Klaus Aehlig
-- | Render the ids of the waiting and of the running jobs as a compact,
-- human readable, single-line string.
showQueue :: Queue -> String
showQueue queue =
  concat [ "Waiting jobs: ", jobIds (qEnqueued queue)
         , "; running jobs: ", jobIds (qRunning queue)
         ]
  where jobIds = show . map (fromJobId . qjId . jJob)
229 48e4da5c Klaus Aehlig
230 48e4da5c Klaus Aehlig
-- | Time-based watcher for the job queue. Every `watchInterval`
-- microseconds it updates the running jobs from disk, cleans up the
-- finished ones, logs the queue and schedules further jobs. It never
-- returns.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever $ do
  threadDelay watchInterval
  logDebug "Job queue watcher timer fired"
  readIORef (jqJobs qstate) >>= mapM_ (updateJob qstate) . qRunning
  cleanupFinishedJobs qstate
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
241 48e4da5c Klaus Aehlig
242 2713b91a Klaus Aehlig
-- | Load a single, non-archived, job, given by its id, from disk. The
-- result carries the fstat value of the job file, or the null fstat value
-- if obtaining it failed with an IOError, and no watcher.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat)
  let fstat = case tryFstat of
                Left _ -> nullFStat
                Right st -> st
      withStat loaded =
        JobWithStat { jINotify = Nothing, jStat = fstat, jJob = fst loaded }
  loadResult <- JQ.loadJobFromDisk qdir False jid
  return $ withStat `liftM` loadResult
252 2713b91a Klaus Aehlig
253 2713b91a Klaus Aehlig
-- | Read all non-archived jobs from the queue directory. If the job ids
-- cannot be obtained, no jobs are read; the loaded jobs are passed
-- through `justOk`, presumably keeping only those read successfully.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  eitherJids <- JQ.getJobIDs [qdir]
  let jids = case eitherJids of
               Left _ -> []
               Right ids -> JQ.sortJobIDs ids
  logInfo $ "Non-archived jobs on disk: "
            ++ commaJoin (map (show . fromJobId) jids)
  justOk `liftM` mapM readJobFromDisk jids
264 2713b91a Klaus Aehlig
265 48e4da5c Klaus Aehlig
-- | Set up the job scheduler: load the non-finalized jobs from disk into
-- the queue, schedule jobs for execution, and fork the time-based
-- watcher monitoring changes to the running jobs.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  alljobs <- readJobsFromDisk
  let pending = filter (not . jobFinalized . jJob) alljobs
      (running, queued) = partition (jobStarted . jJob) pending
      addJobs queue = queue { qEnqueued = qEnqueued queue ++ queued
                            , qRunning = qRunning queue ++ running
                            }
  modifyJobs qstate addJobs
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  _ <- forkIO (onTimeWatcher qstate)
  return ()
279 48e4da5c Klaus Aehlig
280 48e4da5c Klaus Aehlig
-- | Append new jobs to the enqueued jobs and trigger a scheduling run.
-- This will guarantee that the jobs will be executed eventually.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  let jidsString = commaJoin $ map (show . fromJobId . qjId) jobs
  logInfo $ "New jobs enqueued: " ++ jidsString
  modifyJobs state . onQueuedJobs $ \waiting -> waiting ++ map unreadJob jobs
  scheduleSomeJobs state