Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ 13d26b66

History | View | Annotate | Download (13.5 kB)

1
{-| Implementation of a reader for the job queue.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.JQScheduler
27
  ( JQStatus
28
  , emptyJQStatus
29
  , initJQScheduler
30
  , enqueueNewJobs
31
  , dequeueJob
32
  , setJobPriority
33
  ) where
34

    
35
import Control.Arrow
36
import Control.Concurrent
37
import Control.Exception
38
import Control.Monad
39
import Control.Monad.IO.Class
40
import Data.Function (on)
41
import Data.List
42
import Data.Maybe
43
import Data.IORef
44
import System.INotify
45

    
46
import Ganeti.BasicTypes
47
import Ganeti.Constants as C
48
import Ganeti.JQueue as JQ
49
import Ganeti.Logging
50
import Ganeti.Objects
51
import Ganeti.Path
52
import Ganeti.Types
53
import Ganeti.Utils
54

    
55
-- | A job together with the bookkeeping data the scheduler keeps for it.
data JobWithStat = JobWithStat
  { jINotify :: Maybe INotify -- ^ inotify instance watching the job file,
                              -- if a watcher has been attached
  , jStat :: FStat            -- ^ last fstat result observed for the job file
  , jJob :: QueuedJob         -- ^ the job as last read
  }
59
-- | The two job lists the scheduler works on.
data Queue = Queue
  { qEnqueued :: [JobWithStat] -- ^ jobs not yet handed over for execution
  , qRunning :: [JobWithStat]  -- ^ jobs already handed over for execution
  }
60

    
61
{-| Representation of the job queue

Two lists of jobs are kept (each job together with the last fstat result
observed for it): the jobs that are enqueued, but not yet handed over for
execution, and the jobs already handed over for execution. Both lists
live in a single IORef, so that they can be updated atomically, in
particular when scheduling jobs to be handed over for execution.

-}
data JQStatus = JQStatus
  { jqJobs :: IORef Queue
    -- ^ the enqueued and the running jobs
  , jqConfig :: IORef (Result ConfigData)
    -- ^ reference to the configuration, if available
  }
75

    
76

    
77
-- | Create a job queue status that has neither enqueued nor running
-- jobs and refers to the given configuration reference.
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config =
  liftM (\jobsRef -> JQStatus { jqJobs = jobsRef, jqConfig = config })
    . newIORef $ Queue { qEnqueued = [], qRunning = [] }
81

    
82
-- | Transform the list of running jobs; the enqueued jobs are untouched.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs f queue@(Queue { qRunning = running }) =
  queue { qRunning = f running }
85

    
86
-- | Transform the list of enqueued jobs; the running jobs are untouched.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs f queue@(Queue { qEnqueued = waiting }) =
  queue { qEnqueued = f waiting }
89

    
90
-- | Wrap a QueuedJob into a JobWithStat that has no watcher attached
-- and carries the null fstat value.
unreadJob :: QueuedJob -> JobWithStat
unreadJob = JobWithStat Nothing nullFStat
93

    
94
-- | Interval, in microseconds, between two polls of the running jobs
-- for updates.
watchInterval :: Int
watchInterval = scale * C.luxidJobqueuePollInterval
  where scale = 1000000
97

    
98
-- | Get the maximal number of jobs to be run simultaneously from the
-- configuration. If the configuration is not available, be conservative
-- and use the smallest possible value, i.e., 1.
getMaxRunningJobs :: JQStatus -> IO Int
getMaxRunningJobs qstate = do
  config <- readIORef $ jqConfig qstate
  let maxJobs = clusterMaxRunningJobs . configCluster
  return $ genericResult (const 1) maxJobs config
105

    
106
-- | Atomically apply a pure transformation to the queue stored in the
-- job queue status.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())
109

    
110
-- | Reread a job from disk, if the file has changed.
--
-- The result is Nothing if the job file did not change or if the job
-- could not be read; otherwise it is the job with the new fstat value
-- and the newly read job data. If checking for a change raises an
-- IOError, the file is treated as changed.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  changedResult <- try $ needsReload (jStat jWS) fpath
                   :: IO (Either IOError (Maybe FStat))
  case either (const $ Just nullFStat) id changedResult of
    Just newStat -> do
      logInfo $ "Rereading job " ++ jids
      readResult <- loadJobFromDisk qdir True jid
      case readResult of
        Ok (newJob, _) -> do
          logDebug $ "Read job " ++ jids ++ ", staus is "
                       ++ show (calcJobStatus newJob)
          -- only fstat and job are replaced; jINotify is carried over
          return $ Just jWS { jStat = newStat, jJob = newJob }
        Bad msg -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ msg
          return Nothing
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
137

    
138
-- | Update a job in a list of jobs, if it is still there. This is the
-- pure function for inserting a previously read change into the queue:
-- every entry with the same job id is replaced by the given job, all
-- other entries are kept. As the change contains its time stamp, a later
-- read change overwriting a newer read state is not a problem; the fstat
-- value will then be outdated, so the next poller run will fix this.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' = map replaceIfSame
  where newId = qjId $ jJob job'
        replaceIfSame old
          | qjId (jJob old) == newId = job'
          | otherwise = old
147

    
148
-- | Update a single job by reading it from disk, if necessary.
--
-- If the job was reread, the running jobs are updated with the new
-- state. If the reread job is finalized, or if nothing was reread at
-- all, a cleanup and scheduling pass is started in a separate thread.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  jb' <- readJobStatus jb
  case jb' of
    Nothing -> return ()
    Just fresh -> modifyJobs state . onRunningJobs $ updateJobStatus fresh
  let mayHaveFinished = maybe True (jobFinalized . jJob) jb'
  when mayHaveFinished $ do
    _ <- forkIO $ do
      logDebug "Scheduler noticed a job to have finished."
      cleanupFinishedJobs state
      scheduleSomeJobs state
    return ()
157

    
158
-- | Sort out the finished jobs from the monitored part of the queue.
-- This is the pure part: the result is the queue with the finalized
-- jobs removed from the running jobs, paired with the removed jobs.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue = (queue { qRunning = stillRunning }, finished)
  where (finished, stillRunning) =
          partition (jobFinalized . jJob) $ qRunning queue
165

    
166
-- | Actually clean up the finished jobs. This is the IO wrapper around
-- the pure `sortoutFinishedJobs`: the finalized jobs are atomically
-- removed from the running jobs, logged (if there are any), and their
-- inotify instances, where present, are killed.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  let idAndStatus = (fromJobId . qjId) &&& calcJobStatus
      summary = commaJoin $ map (show . idAndStatus . jJob) finished
  unless (null finished) . logInfo $ "Finished jobs: " ++ summary
  forM_ finished $ maybe (return ()) killINotify . jINotify
176

    
177
-- | Watcher task for a job, to update it on file changes. Upon
-- receiving an Ignored event it also reinstantiates itself, provided
-- the job carries an inotify instance to register the new watch with.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheulder notify event for " ++ jids ++ ": " ++ show e
  case jINotify jWS of
    Just inotify | e == Ignored -> do
      qdir <- queueDir
      _ <- addWatch inotify [Modify, Delete] (liveJobFile qdir jid)
             (jobWatcher state jWS)
      return ()
    _ -> return ()
  updateJob state jWS
193

    
194
-- | Attach the job watcher to a running job.
--
-- Nothing is done if the job already carries an inotify instance.
-- Otherwise a new inotify instance is created, a watch for
-- modification and deletion of the job file is registered with it, and
-- the entry of the job in the running jobs is replaced by one that
-- remembers the inotify instance.
--
-- If registering the watch raises an exception, the new inotify
-- instance is killed before the exception is propagated, so that it is
-- not leaked.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do
  qdir <- queueDir
  let fpath = liveJobFile qdir . qjId $ jJob jWS
  logDebug $ "Attaching queue watcher for " ++ fpath
  inotify <- initINotify
  let jWS' = jWS { jINotify=Just inotify }
  -- nobody else knows about the inotify instance yet, so it has to be
  -- released here if the watch cannot be set up
  _ <- addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS')
         `onException` killINotify inotify
  modifyJobs state . onRunningJobs $ updateJobStatus jWS'
204

    
205
-- | Decide on which jobs to schedule next for execution. This is the
-- pure function doing the scheduling: given the maximal number of
-- running jobs, it moves jobs from the front of the enqueued jobs to
-- the end of the running jobs until that number is reached, and returns
-- the new queue together with the jobs moved. If there is no free
-- slot, no job is moved.
selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat])
selectJobsToRun count queue = (queue', chosen)
  where running = qRunning queue
        freeSlots = count - length running
        (chosen, remain) = splitAt freeSlots $ qEnqueued queue
        queue' = queue { qEnqueued = remain, qRunning = running ++ chosen }
212

    
213
-- | Requeue jobs that were previously selected for execution
-- but couldn't be started.
--
-- The jobs are removed from the running jobs and put back at the front
-- of the enqueued jobs in a single atomic update, so that no other
-- thread can observe the queue with the jobs in neither list. The
-- inotify instances attached to the removed running entries are
-- killed; the requeued entries carry no inotify instance, so a new
-- watcher is attached when the jobs are selected for execution again.
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
requeueJobs qstate jobs err = do
  let jids = map (qjId . jJob) jobs
      jidsString = commaJoin $ map (show . fromJobId) jids
      isRequeued = (`elem` jids) . qjId . jJob
      unwatched = map (\job -> job { jINotify = Nothing }) jobs
      moveBack queue =
        let (gone, kept) = partition isRequeued $ qRunning queue
        in ( queue { qRunning = kept
                   , qEnqueued = unwatched ++ qEnqueued queue }
           , gone )
  logWarning $ "Starting jobs failed: " ++ show err
  logWarning $ "Rescheduling jobs: " ++ jidsString
  stale <- atomicModifyIORef (jqJobs qstate) moveBack
  -- the running entries are the ones holding the attached watchers
  mapM_ (maybe (return ()) killINotify . jINotify) stale
224

    
225
-- | Schedule jobs to be run. This is the IO wrapper around the
-- pure `selectJobsToRun`: jobs are atomically selected, a watcher is
-- attached to each of them, and they are started. If starting raises
-- an IOError, the selected jobs are requeued.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  count <- getMaxRunningJobs qstate
  chosen <- atomicModifyIORef (jqJobs qstate) (selectJobsToRun count)
  let jobs = map jJob chosen
      jidsString = commaJoin $ map (show . fromJobId . qjId) jobs
  unless (null chosen) . logInfo $ "Starting jobs: " ++ jidsString
  forM_ chosen $ attachWatcher qstate
  result <- try $ JQ.startJobs jobs
  case result of
    Left err -> requeueJobs qstate chosen err
    Right res -> return res
237

    
238
-- | Format the job queue status in a compact, human readable way,
-- listing the ids of the waiting and of the running jobs.
showQueue :: Queue -> String
showQueue queue =
  "Waiting jobs: " ++ jobIds (qEnqueued queue)
    ++ "; running jobs: " ++ jobIds (qRunning queue)
  where jobIds = show . map (fromJobId . qjId . jJob)
244

    
245
-- | Time-based watcher for updating the job queue. Forever, after
-- waiting for `watchInterval`, it updates all running jobs, cleans up
-- the finished ones, logs the queue and schedules new jobs.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever pollOnce
  where
    pollOnce = do
      threadDelay watchInterval
      logDebug "Job queue watcher timer fired"
      running <- liftM qRunning . readIORef $ jqJobs qstate
      forM_ running $ updateJob qstate
      cleanupFinishedJobs qstate
      readIORef (jqJobs qstate) >>= logInfo . showQueue
      scheduleSomeJobs qstate
256

    
257
-- | Read a single, non-archived, job, specified by its id, from disk.
-- The job is returned without a watcher and together with the fstat
-- value of its file; if obtaining the fstat value raises an IOError,
-- the null fstat value is used instead.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  fstat <- liftM (either (const nullFStat) id)
             (try $ getFStat fpath :: IO (Either IOError FStat))
  loadResult <- JQ.loadJobFromDisk qdir False jid
  let withStat loaded =
        JobWithStat { jINotify = Nothing, jStat = fstat, jJob = fst loaded }
  return $ liftM withStat loadResult
267

    
268
-- | Read all non-archived jobs from the queue directory. Jobs that
-- cannot be read are left out of the result.
--
-- If the job ids cannot be obtained, a warning is logged and the
-- result is the empty list.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  eitherJids <- JQ.getJobIDs [qdir]
  jids <- case eitherJids of
    Ok ids -> return $ JQ.sortJobIDs ids
    Bad e -> do
      -- do not hide the failure; continuing with no jobs is kept as
      -- the best-effort behaviour
      logWarning $ "Failed to list the jobs in " ++ qdir ++ ": " ++ show e
      return []
  let jidsstring = commaJoin $ map (show . fromJobId) jids
  logInfo $ "Non-archived jobs on disk: " ++ jidsstring
  jobs <- mapM readJobFromDisk jids
  return $ justOk jobs
279

    
280
-- | Set up the job scheduler. The non-finalized jobs found on disk are
-- added to the queue (the started ones as running, the others as
-- enqueued), jobs are scheduled, and the time-based monitoring of
-- changes to the running jobs is started in a separate thread.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  alljobs <- readJobsFromDisk
  let pending = filter (not . jobFinalized . jJob) alljobs
      (running, queued) = partition (jobStarted . jJob) pending
      addQueued = onQueuedJobs (++ queued)
      addRunning = onRunningJobs (++ running)
  modifyJobs qstate $ addQueued . addRunning
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  _ <- forkIO $ onTimeWatcher qstate
  return ()
294

    
295
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
-- eventually.
--
-- Each new job is inserted into the enqueued jobs at the position
-- given by its job id, and afterwards a scheduling pass is run.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  logInfo . (++) "New jobs enqueued: " . commaJoin
    $ map (show . fromJobId . qjId) jobs
  let jobs' = map unreadJob jobs
      insertFn = insertBy (compare `on` fromJobId . qjId . jJob)
      -- strict left fold, to avoid building a chain of pending inserts
      addJobs oldjobs = foldl' (flip insertFn) oldjobs jobs'
  modifyJobs state (onQueuedJobs addJobs)
  scheduleSomeJobs state
306

    
307
-- | Pure function for removing a queued job from the job queue by
-- atomicModifyIORef. The answer is Just the job if the job could be removed
-- before being handed over to execution, Nothing if it already was started
-- and a Bad result if the job is not found in the queue.
--
-- A Bad result is also returned, and the queue left unchanged, if the
-- job is found more than once among the enqueued jobs.
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
rmJob jid q =
  let isJid = (jid ==) . qjId . jJob
      (found, queued') = partition isJid $ qEnqueued q
      isRunning = any isJid $ qRunning q
      sJid = (++) "Job " . show $ fromJobId jid
  in case (found, isRunning) of
    -- enqueued exactly once: remove it
    ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job)
    -- enqueued more than once
    (_:_, _) -> (q, Bad $ "Queue in inconsistent state. "
                           ++ sJid ++ " queued multiple times")
    -- not enqueued, but already handed over for execution
    (_, True) -> (q, Ok Nothing)
    _ -> (q, Bad $ sJid ++ " not found in queue")
323

    
324
-- | Try to remove a queued job from the job queue. Return True, if
-- the job could be removed from the queue before being handed over
-- to execution, False if the job already started, and a Bad result
-- if the job is unknown.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
dequeueJob state jid = do
  removed <- liftM (fmap isJust) . atomicModifyIORef (jqJobs state)
               $ rmJob jid
  logDebug $ "Result of dequeing job " ++ show (fromJobId jid)
              ++ " is " ++ show removed
  return removed
335

    
336
-- | Change the priority of a queued job (once the job is handed over
-- to execution, the job itself needs to be informed). To avoid the
-- job being started unmodified, it is temporarily unqueued during the
-- change. Return the modified job, if the job's priority was successfully
-- modified, Nothing, if the job already started, and a Bad value, if the job
-- is unknown.
--
-- If writing the modified job to disk fails, the unmodified job is put
-- back into the queue, so that it is not lost, and the failure is
-- returned as a Bad value.
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
setJobPriority state jid prio = runResultT $ do
  maybeJob <- mkResultT . atomicModifyIORef (jqJobs state) $ rmJob jid
  case maybeJob of
    Nothing -> return Nothing
    Just job -> do
      let job' = changeJobPriority prio job
      qDir <- liftIO queueDir
      written <- liftIO $ writeJobToDisk qDir job'
      -- the job was unqueued above; requeue it in any case, modified
      -- only if the modification was written to disk
      let requeued = genericResult (const job) (const job') written
      liftIO $ enqueueNewJobs state [requeued]
      -- report a failed write only after the job is back in the queue
      mkResultT $ return written
      return $ Just job'