Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ cc5ab470

History | View | Annotate | Download (9.4 kB)

1 48e4da5c Klaus Aehlig
{-| Implementation of a reader for the job queue.
2 48e4da5c Klaus Aehlig
3 48e4da5c Klaus Aehlig
-}
4 48e4da5c Klaus Aehlig
5 48e4da5c Klaus Aehlig
{-
6 48e4da5c Klaus Aehlig
7 48e4da5c Klaus Aehlig
Copyright (C) 2013 Google Inc.
8 48e4da5c Klaus Aehlig
9 48e4da5c Klaus Aehlig
This program is free software; you can redistribute it and/or modify
10 48e4da5c Klaus Aehlig
it under the terms of the GNU General Public License as published by
11 48e4da5c Klaus Aehlig
the Free Software Foundation; either version 2 of the License, or
12 48e4da5c Klaus Aehlig
(at your option) any later version.
13 48e4da5c Klaus Aehlig
14 48e4da5c Klaus Aehlig
This program is distributed in the hope that it will be useful, but
15 48e4da5c Klaus Aehlig
WITHOUT ANY WARRANTY; without even the implied warranty of
16 48e4da5c Klaus Aehlig
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 48e4da5c Klaus Aehlig
General Public License for more details.
18 48e4da5c Klaus Aehlig
19 48e4da5c Klaus Aehlig
You should have received a copy of the GNU General Public License
20 48e4da5c Klaus Aehlig
along with this program; if not, write to the Free Software
21 48e4da5c Klaus Aehlig
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 48e4da5c Klaus Aehlig
02110-1301, USA.
23 48e4da5c Klaus Aehlig
24 48e4da5c Klaus Aehlig
-}
25 48e4da5c Klaus Aehlig
26 48e4da5c Klaus Aehlig
module Ganeti.JQScheduler
27 48e4da5c Klaus Aehlig
  ( JQStatus
28 48e4da5c Klaus Aehlig
  , emptyJQStatus
29 48e4da5c Klaus Aehlig
  , initJQScheduler
30 48e4da5c Klaus Aehlig
  , enqueueNewJobs
31 48e4da5c Klaus Aehlig
  ) where
32 48e4da5c Klaus Aehlig
33 48e4da5c Klaus Aehlig
import Control.Arrow
34 48e4da5c Klaus Aehlig
import Control.Concurrent
35 1c532d2d Klaus Aehlig
import Control.Exception
36 48e4da5c Klaus Aehlig
import Control.Monad
37 48e4da5c Klaus Aehlig
import Data.List
38 48e4da5c Klaus Aehlig
import Data.IORef
39 ed6cf449 Klaus Aehlig
import System.INotify
40 48e4da5c Klaus Aehlig
41 48e4da5c Klaus Aehlig
import Ganeti.BasicTypes
42 48e4da5c Klaus Aehlig
import Ganeti.Constants as C
43 48e4da5c Klaus Aehlig
import Ganeti.JQueue as JQ
44 48e4da5c Klaus Aehlig
import Ganeti.Logging
45 48e4da5c Klaus Aehlig
import Ganeti.Path
46 48e4da5c Klaus Aehlig
import Ganeti.Types
47 48e4da5c Klaus Aehlig
import Ganeti.Utils
48 48e4da5c Klaus Aehlig
49 ed6cf449 Klaus Aehlig
data JobWithStat = JobWithStat { jINotify :: Maybe INotify
50 ed6cf449 Klaus Aehlig
                               , jStat :: FStat
51 ed6cf449 Klaus Aehlig
                               , jJob :: QueuedJob
52 ed6cf449 Klaus Aehlig
                               }
53 48e4da5c Klaus Aehlig
data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] }
54 48e4da5c Klaus Aehlig
55 48e4da5c Klaus Aehlig
{-| Representation of the job queue
56 48e4da5c Klaus Aehlig
57 48e4da5c Klaus Aehlig
We keep two lists of jobs (together with information about the last
58 48e4da5c Klaus Aehlig
fstat result observed): the jobs that are enqueued, but not yet handed
59 48e4da5c Klaus Aehlig
over for execution, and the jobs already handed over for execution. They
60 48e4da5c Klaus Aehlig
are kept together in a single IORef, so that we can atomically update
61 48e4da5c Klaus Aehlig
both, in particular when scheduling jobs to be handed over for execution.
62 48e4da5c Klaus Aehlig
63 48e4da5c Klaus Aehlig
-}
64 48e4da5c Klaus Aehlig
65 48e4da5c Klaus Aehlig
data JQStatus = JQStatus
66 48e4da5c Klaus Aehlig
  { jqJobs :: IORef Queue
67 48e4da5c Klaus Aehlig
  }
68 48e4da5c Klaus Aehlig
69 48e4da5c Klaus Aehlig
70 48e4da5c Klaus Aehlig
emptyJQStatus :: IO JQStatus
71 48e4da5c Klaus Aehlig
emptyJQStatus = do
72 48e4da5c Klaus Aehlig
  jqJ <- newIORef Queue {qEnqueued=[], qRunning=[]}
73 48e4da5c Klaus Aehlig
  return JQStatus { jqJobs=jqJ }
74 48e4da5c Klaus Aehlig
75 48e4da5c Klaus Aehlig
-- | Apply a function on the running jobs.
76 48e4da5c Klaus Aehlig
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
77 48e4da5c Klaus Aehlig
onRunningJobs f queue = queue {qRunning=f $ qRunning queue}
78 48e4da5c Klaus Aehlig
79 48e4da5c Klaus Aehlig
-- | Apply a function on the queued jobs.
80 48e4da5c Klaus Aehlig
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
81 48e4da5c Klaus Aehlig
onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue}
82 48e4da5c Klaus Aehlig
83 48e4da5c Klaus Aehlig
-- | Obtain a JobWithStat from a QueuedJob.
84 48e4da5c Klaus Aehlig
unreadJob :: QueuedJob -> JobWithStat
85 ed6cf449 Klaus Aehlig
unreadJob job = JobWithStat {jJob=job, jStat=nullFStat, jINotify=Nothing}
86 48e4da5c Klaus Aehlig
87 48e4da5c Klaus Aehlig
-- | Reload interval for polling the running jobs for updates in microseconds.
88 48e4da5c Klaus Aehlig
watchInterval :: Int
89 48e4da5c Klaus Aehlig
watchInterval = C.luxidJobqueuePollInterval * 1000000 
90 48e4da5c Klaus Aehlig
91 48e4da5c Klaus Aehlig
-- | Maximal number of jobs to be running at the same time.
92 48e4da5c Klaus Aehlig
maxRunningJobs :: Int
93 48e4da5c Klaus Aehlig
maxRunningJobs = C.luxidMaximalRunningJobs 
94 48e4da5c Klaus Aehlig
95 48e4da5c Klaus Aehlig
-- | Wrapper function to atomically update the jobs in the queue status.
96 48e4da5c Klaus Aehlig
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
97 48e4da5c Klaus Aehlig
modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) ()  . f)
98 48e4da5c Klaus Aehlig
99 48e4da5c Klaus Aehlig
-- | Reread a job from disk, if the file has changed.
100 48e4da5c Klaus Aehlig
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
101 ed6cf449 Klaus Aehlig
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job})  = do
102 48e4da5c Klaus Aehlig
  let jid = qjId job
103 48e4da5c Klaus Aehlig
  qdir <- queueDir
104 48e4da5c Klaus Aehlig
  let fpath = liveJobFile qdir jid
105 48e4da5c Klaus Aehlig
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
106 350f0759 Klaus Aehlig
  changedResult <- try $ needsReload fstat fpath
107 350f0759 Klaus Aehlig
                   :: IO (Either IOError (Maybe FStat))
108 350f0759 Klaus Aehlig
  let changed = either (const $ Just nullFStat) id changedResult
109 48e4da5c Klaus Aehlig
  case changed of
110 48e4da5c Klaus Aehlig
    Nothing -> do
111 48e4da5c Klaus Aehlig
      logDebug $ "File " ++ fpath ++ " not changed on disk."
112 48e4da5c Klaus Aehlig
      return Nothing
113 48e4da5c Klaus Aehlig
    Just fstat' -> do
114 48e4da5c Klaus Aehlig
      let jids = show $ fromJobId jid
115 350f0759 Klaus Aehlig
      logInfo $ "Rereading job "  ++ jids
116 350f0759 Klaus Aehlig
      readResult <- loadJobFromDisk qdir True jid
117 48e4da5c Klaus Aehlig
      case readResult of
118 48e4da5c Klaus Aehlig
        Bad s -> do
119 48e4da5c Klaus Aehlig
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
120 48e4da5c Klaus Aehlig
          return Nothing
121 48e4da5c Klaus Aehlig
        Ok (job', _) -> do
122 48e4da5c Klaus Aehlig
          logDebug
123 48e4da5c Klaus Aehlig
            $ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job')
124 ed6cf449 Klaus Aehlig
          return . Just $ jWS {jStat=fstat', jJob=job'}
125 ed6cf449 Klaus Aehlig
                          -- jINotify unchanged
126 48e4da5c Klaus Aehlig
127 48e4da5c Klaus Aehlig
-- | Update a job in the job queue, if it is still there. This is the
128 48e4da5c Klaus Aehlig
-- pure function for inserting a previously read change into the queue.
129 48e4da5c Klaus Aehlig
-- as the change contains its time stamp, we don't have to worry about a
130 48e4da5c Klaus Aehlig
-- later read change overwriting a newer read state. If this happens, the
131 48e4da5c Klaus Aehlig
-- fstat value will be outdated, so the next poller run will fix this.
132 48e4da5c Klaus Aehlig
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
133 48e4da5c Klaus Aehlig
updateJobStatus job' =
134 48e4da5c Klaus Aehlig
  let jid = qjId $ jJob job' in
135 48e4da5c Klaus Aehlig
  map (\job -> if qjId (jJob job) == jid then job' else job)
136 48e4da5c Klaus Aehlig
137 48e4da5c Klaus Aehlig
-- | Update a single job by reading it from disk, if necessary.
138 48e4da5c Klaus Aehlig
updateJob :: JQStatus -> JobWithStat -> IO ()
139 48e4da5c Klaus Aehlig
updateJob state jb = do
140 48e4da5c Klaus Aehlig
  jb' <- readJobStatus jb
141 48e4da5c Klaus Aehlig
  maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb'
142 48e4da5c Klaus Aehlig
143 48e4da5c Klaus Aehlig
-- | Sort out the finished jobs from the monitored part of the queue.
144 48e4da5c Klaus Aehlig
-- This is the pure part, splitting the queue into a remaining queue
145 48e4da5c Klaus Aehlig
-- and the jobs that were removed.
146 48e4da5c Klaus Aehlig
sortoutFinishedJobs :: Queue -> (Queue, [QueuedJob])
147 48e4da5c Klaus Aehlig
sortoutFinishedJobs queue =
148 1b3bde96 Klaus Aehlig
  let (fin, run') = partition (jobFinalized . jJob) . qRunning $ queue
149 48e4da5c Klaus Aehlig
  in (queue {qRunning=run'}, map jJob fin)
150 48e4da5c Klaus Aehlig
151 48e4da5c Klaus Aehlig
-- | Actually clean up the finished jobs. This is the IO wrapper around
152 48e4da5c Klaus Aehlig
-- the pure `sortoutFinishedJobs`.
153 48e4da5c Klaus Aehlig
cleanupFinishedJobs :: JQStatus -> IO ()
154 48e4da5c Klaus Aehlig
cleanupFinishedJobs qstate = do
155 48e4da5c Klaus Aehlig
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
156 48e4da5c Klaus Aehlig
  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus)
157 48e4da5c Klaus Aehlig
      jlist = commaJoin $ map showJob finished
158 48e4da5c Klaus Aehlig
  unless (null finished)
159 48e4da5c Klaus Aehlig
    . logInfo $ "Finished jobs: " ++ jlist
160 cc5ab470 Klaus Aehlig
  mapM_ (maybe (return ()) killINotify . jINotify) finished
161 48e4da5c Klaus Aehlig
162 48e4da5c Klaus Aehlig
-- | Decide on which jobs to schedule next for execution. This is the
163 48e4da5c Klaus Aehlig
-- pure function doing the scheduling.
164 48e4da5c Klaus Aehlig
selectJobsToRun :: Queue -> (Queue, [QueuedJob])
165 48e4da5c Klaus Aehlig
selectJobsToRun queue =
166 48e4da5c Klaus Aehlig
  let n = maxRunningJobs - length (qRunning queue)
167 48e4da5c Klaus Aehlig
      (chosen, remain) = splitAt n (qEnqueued queue)
168 48e4da5c Klaus Aehlig
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}
169 48e4da5c Klaus Aehlig
     , map jJob chosen)
170 48e4da5c Klaus Aehlig
171 1c532d2d Klaus Aehlig
-- | Requeue jobs that were previously selected for execution
172 1c532d2d Klaus Aehlig
-- but couldn't be started.
173 1c532d2d Klaus Aehlig
requeueJobs :: JQStatus -> [QueuedJob] -> IOError -> IO ()
174 1c532d2d Klaus Aehlig
requeueJobs qstate jobs err = do
175 1c532d2d Klaus Aehlig
  let jids = map qjId jobs
176 1c532d2d Klaus Aehlig
      jidsString = commaJoin $ map (show . fromJobId) jids
177 1c532d2d Klaus Aehlig
      rmJobs = filter ((`notElem` jids) . qjId . jJob)
178 1c532d2d Klaus Aehlig
  logWarning $ "Starting jobs failed: " ++ show err
179 1c532d2d Klaus Aehlig
  logWarning $ "Rescheduling jobs: " ++ jidsString
180 1c532d2d Klaus Aehlig
  modifyJobs qstate (onRunningJobs rmJobs)
181 1c532d2d Klaus Aehlig
  modifyJobs qstate (onQueuedJobs . (++) $ map unreadJob jobs)
182 1c532d2d Klaus Aehlig
183 48e4da5c Klaus Aehlig
-- | Schedule jobs to be run. This is the IO wrapper around the
184 48e4da5c Klaus Aehlig
-- pure `selectJobsToRun`.
185 48e4da5c Klaus Aehlig
scheduleSomeJobs :: JQStatus -> IO ()
186 48e4da5c Klaus Aehlig
scheduleSomeJobs qstate = do
187 48e4da5c Klaus Aehlig
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
188 7dd21737 Klaus Aehlig
  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
189 48e4da5c Klaus Aehlig
    $ map (show . fromJobId . qjId) chosen
190 1c532d2d Klaus Aehlig
  result <- try $ JQ.startJobs chosen
191 1c532d2d Klaus Aehlig
  either (requeueJobs qstate chosen) return result
192 48e4da5c Klaus Aehlig
193 48e4da5c Klaus Aehlig
-- | Format the job queue status in a compact, human readable way.
194 48e4da5c Klaus Aehlig
showQueue :: Queue -> String
195 48e4da5c Klaus Aehlig
showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
196 48e4da5c Klaus Aehlig
  let showids = show . map (fromJobId . qjId . jJob)
197 48e4da5c Klaus Aehlig
  in "Waiting jobs: " ++ showids waiting 
198 48e4da5c Klaus Aehlig
       ++ "; running jobs: " ++ showids running
199 48e4da5c Klaus Aehlig
200 48e4da5c Klaus Aehlig
-- | Time-based watcher for updating the job queue.
201 48e4da5c Klaus Aehlig
onTimeWatcher :: JQStatus -> IO ()
202 48e4da5c Klaus Aehlig
onTimeWatcher qstate = forever $ do
203 48e4da5c Klaus Aehlig
  threadDelay watchInterval
204 fe50bb65 Klaus Aehlig
  logDebug "Job queue watcher timer fired"
205 48e4da5c Klaus Aehlig
  jobs <- readIORef (jqJobs qstate)
206 48e4da5c Klaus Aehlig
  mapM_ (updateJob qstate) $ qRunning jobs
207 48e4da5c Klaus Aehlig
  cleanupFinishedJobs qstate
208 48e4da5c Klaus Aehlig
  jobs' <- readIORef (jqJobs qstate)
209 48e4da5c Klaus Aehlig
  logInfo $ showQueue jobs'
210 48e4da5c Klaus Aehlig
  scheduleSomeJobs qstate
211 48e4da5c Klaus Aehlig
212 2713b91a Klaus Aehlig
-- | Read a single, non-archived, job, specified by its id, from disk.
213 2713b91a Klaus Aehlig
readJobFromDisk :: JobId -> IO (Result JobWithStat)
214 2713b91a Klaus Aehlig
readJobFromDisk jid = do
215 2713b91a Klaus Aehlig
  qdir <- queueDir
216 2713b91a Klaus Aehlig
  let fpath = liveJobFile qdir jid
217 2713b91a Klaus Aehlig
  logDebug $ "Reading " ++ fpath
218 2713b91a Klaus Aehlig
  tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat)
219 2713b91a Klaus Aehlig
  let fstat = either (const nullFStat) id tryFstat
220 2713b91a Klaus Aehlig
  loadResult <- JQ.loadJobFromDisk qdir False jid
221 ed6cf449 Klaus Aehlig
  return $ liftM (JobWithStat Nothing fstat . fst) loadResult
222 2713b91a Klaus Aehlig
223 2713b91a Klaus Aehlig
-- | Read all non-finalized jobs from disk.
224 2713b91a Klaus Aehlig
readJobsFromDisk :: IO [JobWithStat]
225 2713b91a Klaus Aehlig
readJobsFromDisk = do
226 2713b91a Klaus Aehlig
  logInfo "Loading job queue"
227 2713b91a Klaus Aehlig
  qdir <- queueDir
228 2713b91a Klaus Aehlig
  eitherJids <- JQ.getJobIDs [qdir]
229 2713b91a Klaus Aehlig
  let jids = either (const []) JQ.sortJobIDs eitherJids
230 2713b91a Klaus Aehlig
      jidsstring = commaJoin $ map (show . fromJobId) jids
231 2713b91a Klaus Aehlig
  logInfo $ "Non-archived jobs on disk: " ++ jidsstring
232 2713b91a Klaus Aehlig
  jobs <- mapM readJobFromDisk jids
233 2713b91a Klaus Aehlig
  return $ justOk jobs
234 2713b91a Klaus Aehlig
235 48e4da5c Klaus Aehlig
-- | Set up the job scheduler. This will also start the monitoring
236 48e4da5c Klaus Aehlig
-- of changes to the running jobs.
237 48e4da5c Klaus Aehlig
initJQScheduler :: JQStatus -> IO ()
238 48e4da5c Klaus Aehlig
initJQScheduler qstate = do
239 2713b91a Klaus Aehlig
  alljobs <- readJobsFromDisk
240 2713b91a Klaus Aehlig
  let jobs = filter (not . jobFinalized . jJob) alljobs
241 2713b91a Klaus Aehlig
      (running, queued) = partition (jobStarted . jJob) jobs
242 2713b91a Klaus Aehlig
  modifyJobs qstate (onQueuedJobs (++ queued) . onRunningJobs (++ running))
243 2713b91a Klaus Aehlig
  jqjobs <- readIORef (jqJobs qstate)
244 2713b91a Klaus Aehlig
  logInfo $ showQueue jqjobs
245 2713b91a Klaus Aehlig
  scheduleSomeJobs qstate
246 48e4da5c Klaus Aehlig
  logInfo "Starting time-based job queue watcher"
247 48e4da5c Klaus Aehlig
  _ <- forkIO $ onTimeWatcher qstate
248 48e4da5c Klaus Aehlig
  return ()
249 48e4da5c Klaus Aehlig
250 48e4da5c Klaus Aehlig
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
251 48e4da5c Klaus Aehlig
-- eventually.
252 48e4da5c Klaus Aehlig
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
253 48e4da5c Klaus Aehlig
enqueueNewJobs state jobs = do
254 48e4da5c Klaus Aehlig
  logInfo . (++) "New jobs enqueued: " . commaJoin
255 48e4da5c Klaus Aehlig
    $ map (show . fromJobId . qjId) jobs
256 48e4da5c Klaus Aehlig
  let jobs' = map unreadJob jobs
257 48e4da5c Klaus Aehlig
  modifyJobs state (onQueuedJobs (++ jobs'))
258 48e4da5c Klaus Aehlig
  scheduleSomeJobs state