Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ f7743189

History | View | Annotate | Download (12.7 kB)

1
{-| Implementation of a reader for the job queue.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.JQScheduler
27
  ( JQStatus
28
  , emptyJQStatus
29
  , initJQScheduler
30
  , enqueueNewJobs
31
  , dequeueJob
32
  ) where
33

    
34
import Control.Arrow
35
import Control.Concurrent
36
import Control.Exception
37
import Control.Monad
38
import Data.Function (on)
39
import Data.List
40
import Data.Maybe
41
import Data.IORef
42
import System.INotify
43

    
44
import Ganeti.BasicTypes
45
import Ganeti.Constants as C
46
import Ganeti.JQueue as JQ
47
import Ganeti.Logging
48
import Ganeti.Objects
49
import Ganeti.Path
50
import Ganeti.Types
51
import Ganeti.Utils
52

    
53
-- | A job together with the bookkeeping data the scheduler keeps for it.
data JobWithStat = JobWithStat
  { jINotify :: Maybe INotify -- ^ inotify handle stored for the job, if any
  , jStat :: FStat            -- ^ last fstat result observed for the job
  , jJob :: QueuedJob         -- ^ the job itself
  }
57
-- | The two lists of jobs tracked by the scheduler.
data Queue = Queue
  { qEnqueued :: [JobWithStat] -- ^ jobs not yet handed over for execution
  , qRunning :: [JobWithStat]  -- ^ jobs already handed over for execution
  }
58

    
59
{-| Representation of the job queue

Two lists of jobs are maintained, each entry carrying the last fstat
result observed for it: the jobs that are enqueued but not yet handed
over for execution, and the jobs that already were handed over. Both
lists live in one single IORef, so that they can be updated together
atomically; this matters in particular when scheduling moves jobs from
the first list to the second.

-}
data JQStatus = JQStatus
  { jqJobs :: IORef Queue                 -- ^ enqueued and running jobs
  , jqConfig :: IORef (Result ConfigData) -- ^ the configuration, if available
  }
73

    
74

    
75
-- | Create a job queue status that has neither enqueued nor running jobs
-- and that uses the given reference to look up the configuration.
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config =
  liftM (\jobsRef -> JQStatus { jqJobs = jobsRef, jqConfig = config })
    . newIORef $ Queue { qEnqueued = [], qRunning = [] }
79

    
80
-- | Transform the list of running jobs, keeping the enqueued jobs as is.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs transform q@(Queue { qRunning = running }) =
  q { qRunning = transform running }
83

    
84
-- | Transform the list of enqueued jobs, keeping the running jobs as is.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs transform q@(Queue { qEnqueued = waiting }) =
  q { qEnqueued = transform waiting }
87

    
88
-- | Wrap a QueuedJob into a JobWithStat that has no inotify handle and
-- carries the null fstat value.
unreadJob :: QueuedJob -> JobWithStat
unreadJob = JobWithStat Nothing nullFStat
91

    
92
-- | Interval, in microseconds, between two polls of the running jobs for
-- updates. Presumably the underlying constant is given in seconds.
watchInterval :: Int
watchInterval = microsecondsPerUnit * C.luxidJobqueuePollInterval
  where microsecondsPerUnit = 1000000
95

    
96
-- | Look up, in the configuration, the maximal number of jobs that may
-- run simultaneously. Should the configuration not be available, stay
-- on the safe side and answer with the smallest possible value, i.e., 1.
getMaxRunningJobs :: JQStatus -> IO Int
getMaxRunningJobs qstate = do
  config <- readIORef $ jqConfig qstate
  let fromConfig = clusterMaxRunningJobs . configCluster
  return $ genericResult (const 1) fromConfig config
103

    
104
-- | Atomically replace the queue stored in the queue status by the result
-- of applying the given function to it.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())
107

    
108
-- | Read a job again from disk, provided its file changed since the fstat
-- value stored with the job was taken. The answer is Nothing if the file
-- is unchanged or the job could not be read, and Just the refreshed job
-- otherwise. An IO error while checking the file counts as a change.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS = do
  qdir <- queueDir
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
      fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  changedResult <- try (needsReload (jStat jWS) fpath)
                     :: IO (Either IOError (Maybe FStat))
  -- a failed check is handled like a changed file with the null fstat
  case either (const $ Just nullFStat) id changedResult of
    Just fstat' -> do
      logInfo $ "Rereading job "  ++ jids
      readResult <- loadJobFromDisk qdir True jid
      case readResult of
        Ok (job', _) -> do
          logDebug $ "Read job " ++ jids ++ ", staus is "
                     ++ show (calcJobStatus job')
          -- the inotify handle stored with the job is kept as it was
          return $ Just jWS { jStat = fstat', jJob = job' }
        Bad s -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
          return Nothing
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
135

    
136
-- | Replace a job in a list of jobs by a newer version of it, if a job
-- with the same id is still in the list. This is the pure function used
-- to insert a change read earlier into the queue. Since the change comes
-- with its fstat value, a change read later overwriting a newer state is
-- no reason to worry: the fstat value will then be outdated, and the next
-- run of the poller corrects the entry.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' = map replaceIfSame
  where
    jid = qjId $ jJob job'
    replaceIfSame job
      | qjId (jJob job) == jid = job'
      | otherwise = job
145

    
146
-- | Bring a single running job up to date by rereading it from disk, if
-- necessary. If the job turns out to be finalized, or if no new version
-- of it was obtained, a separate thread is started that cleans up the
-- finished jobs and schedules new ones.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  jb' <- readJobStatus jb
  case jb' of
    Just fresh -> modifyJobs state . onRunningJobs $ updateJobStatus fresh
    Nothing -> return ()
  let considerFinished = maybe True (jobFinalized . jJob) jb'
  when considerFinished $ do
    _ <- forkIO $ do
      logDebug "Scheduler noticed a job to have finished."
      cleanupFinishedJobs state
      scheduleSomeJobs state
    return ()
155

    
156
-- | Split the finalized jobs off the running part of the queue. This is
-- the pure part of the cleanup; the answer consists of the queue without
-- those jobs and of the jobs taken out.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue = (queue { qRunning = stillRunning }, finished)
  where
    (finished, stillRunning) =
      partition (jobFinalized . jJob) (qRunning queue)
163

    
164
-- | Remove the finished jobs from the queue, log them, and shut down the
-- inotify handles stored with them. This is the IO wrapper around the
-- pure `sortoutFinishedJobs`.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  unless (null finished) $ do
    let describe jws =
          let job = jJob jws
          in show (fromJobId $ qjId job, calcJobStatus job)
    logInfo $ "Finished jobs: " ++ commaJoin (map describe finished)
  forM_ finished $ \jws ->
    case jINotify jws of
      Just inotify -> killINotify inotify
      Nothing -> return ()
174

    
175
-- | Watcher task for a job, to update it on file changes. When the event
-- received is Ignored and the job carries an inotify handle, the watcher
-- installs itself again on the job file before the job is updated.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheulder notify event for " ++ jids ++ ": " ++ show e
  -- Match on the handle directly instead of testing with isJust and
  -- extracting with the partial fromJust.
  case jINotify jWS of
    Just inotify | e == Ignored -> do
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
      _ <- addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS)
      return ()
    _ -> return ()
  updateJob state jWS
191

    
192
-- | Install the job watcher for a running job, unless the job already
-- carries an inotify handle. The new handle is stored with the job's
-- entry in the list of running jobs.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS =
  case jINotify jWS of
    Just _ -> return ()
    Nothing -> do
      inotify <- initINotify
      qdir <- queueDir
      let watched = jWS { jINotify = Just inotify }
          fpath = liveJobFile qdir (qjId (jJob jWS))
      logDebug $ "Attaching queue watcher for " ++ fpath
      _ <- addWatch inotify [Modify, Delete] fpath (jobWatcher state watched)
      modifyJobs state (onRunningJobs (updateJobStatus watched))
202

    
203
-- | Pick the jobs to be handed over for execution next. This is the pure
-- scheduling function: given the maximal number of running jobs, it moves
-- jobs from the front of the enqueued list to the end of the running list
-- and also returns the jobs moved.
selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat])
selectJobsToRun count queue = (queue', chosen)
  where
    running = qRunning queue
    freeSlots = count - length running
    (chosen, remain) = splitAt freeSlots $ qEnqueued queue
    queue' = queue { qEnqueued = remain, qRunning = running ++ chosen }
210

    
211
-- | Requeue jobs that were previously selected for execution
-- but couldn't be started.
--
-- The jobs are taken out of the list of running jobs and put back at the
-- front of the list of enqueued jobs in one single atomic update, so that
-- no other thread can observe a state where a job is in neither list.
-- The entries taken out of the running list may carry an inotify handle
-- stored there by `attachWatcher`; those handles are shut down, as the
-- entries are dropped. The jobs are enqueued without a handle, so that
-- `attachWatcher` sets up a new one once they are scheduled again.
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
requeueJobs qstate jobs err = do
  let jids = map (qjId . jJob) jobs
      jidsString = commaJoin $ map (show . fromJobId) jids
      isRequeued = (`elem` jids) . qjId . jJob
      unwatched = map (\job -> job { jINotify = Nothing }) jobs
      requeue queue =
        let (dropped, running') = partition isRequeued $ qRunning queue
        in ( queue { qEnqueued = unwatched ++ qEnqueued queue
                   , qRunning = running'
                   }
           , dropped
           )
  logWarning $ "Starting jobs failed: " ++ show err
  logWarning $ "Rescheduling jobs: " ++ jidsString
  dropped <- atomicModifyIORef (jqJobs qstate) requeue
  mapM_ (maybe (return ()) killINotify . jINotify) dropped
222

    
223
-- | Hand over jobs for execution. This is the IO wrapper around the pure
-- `selectJobsToRun`: the jobs chosen get a watcher attached and are
-- started; if starting them fails with an IO error, they are requeued.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  count <- getMaxRunningJobs qstate
  chosen <- atomicModifyIORef (jqJobs qstate) (selectJobsToRun count)
  let jobs = map jJob chosen
      jidsString = commaJoin $ map (show . fromJobId . qjId) jobs
  unless (null chosen) . logInfo $ "Starting jobs: " ++ jidsString
  forM_ chosen $ attachWatcher qstate
  result <- try $ JQ.startJobs jobs
  case result of
    Left err -> requeueJobs qstate chosen err
    Right started -> return started
235

    
236
-- | Render the ids of the waiting and of the running jobs as a compact,
-- human readable string.
showQueue :: Queue -> String
showQueue queue =
  "Waiting jobs: " ++ showids (qEnqueued queue)
    ++ "; running jobs: " ++ showids (qRunning queue)
  where
    showids jobs = show [fromJobId . qjId $ jJob job | job <- jobs]
242

    
243
-- | Watcher that, forever and at intervals of `watchInterval`, updates
-- all running jobs, cleans up the finished ones, logs the state of the
-- queue, and schedules further jobs.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever $ do
  threadDelay watchInterval
  logDebug "Job queue watcher timer fired"
  running <- liftM qRunning . readIORef $ jqJobs qstate
  forM_ running $ updateJob qstate
  cleanupFinishedJobs qstate
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
254

    
255
-- | Load one non-archived job, given by its id, from disk. The job comes
-- without an inotify handle and with the fstat value of its file; should
-- obtaining the latter fail with an IO error, the null fstat is used.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  statResult <- try (getFStat fpath) :: IO (Either IOError FStat)
  let fstat = case statResult of
                Right st -> st
                Left _ -> nullFStat
      toJobWithStat loaded =
        JobWithStat { jINotify = Nothing, jStat = fstat, jJob = fst loaded }
  liftM (liftM toJobWithStat) $ JQ.loadJobFromDisk qdir False jid
265

    
266
-- | Load the jobs found in the queue directory from disk. Jobs that fail
-- to load are left out; if the job ids cannot be obtained at all, no job
-- is loaded.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  eitherJids <- JQ.getJobIDs [qdir]
  let jids = case eitherJids of
               Right found -> JQ.sortJobIDs found
               Left _ -> []
  logInfo $ "Non-archived jobs on disk: "
            ++ commaJoin (map (show . fromJobId) jids)
  liftM justOk $ mapM readJobFromDisk jids
277

    
278
-- | Initialize the job scheduler: load the non-finalized jobs from disk
-- into the queue, schedule jobs, and start the time-based watcher that
-- monitors changes to the running jobs.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  alljobs <- readJobsFromDisk
  let unfinished = [job | job <- alljobs, not . jobFinalized $ jJob job]
      (running, queued) = partition (jobStarted . jJob) unfinished
  modifyJobs qstate $ onRunningJobs (++ running) . onQueuedJobs (++ queued)
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  forkIO (onTimeWatcher qstate) >> return ()
292

    
293
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
-- eventually. The jobs are inserted into the list of enqueued jobs by
-- their job id, and scheduling is triggered afterwards.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  logInfo $ "New jobs enqueued: "
            ++ commaJoin (map (show . fromJobId . qjId) jobs)
  let jobNumber = fromJobId . qjId . jJob
      insertJob queued job = insertBy (compare `on` jobNumber) job queued
      -- strict left fold, so that no chain of pending insertions builds up
      addJobs oldjobs = foldl' insertJob oldjobs $ map unreadJob jobs
  modifyJobs state (onQueuedJobs addJobs)
  scheduleSomeJobs state
304

    
305
-- | Pure function for removing a queued job from the job queue by
-- atomicModifyIORef. The answer is Just the job if the job could be removed
-- before being handed over to execution, Nothing if it already was started
-- and a Bad result if the job is not found in the queue. A Bad result is
-- also given, and the queue left untouched, if the job is found more than
-- once among the enqueued jobs.
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
rmJob jid q =
  let isJid = (jid ==) . qjId . jJob
      (found, queued') = partition isJid $ qEnqueued q
      isRunning = any isJid $ qRunning q
      sJid = "Job " ++ show (fromJobId jid)
  in case (found, isRunning) of
    ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job)
    -- the single-element case is handled above, so this means two or more;
    -- note the space separating the two sentences of the message
    (_:_, _) -> (q, Bad $ "Queue in inconsistent state. "
                           ++ sJid ++ " queued multiple times")
    (_, True) -> (q, Ok Nothing)
    _ -> (q, Bad $ sJid ++ " not found in queue")
321

    
322
-- | Attempt to take a queued job out of the job queue. The answer is True
-- if the job was removed from the queue before it was handed over to
-- execution, False if the job was started already, and a Bad result if
-- the job is not known.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
dequeueJob state jid = do
  removed <- atomicModifyIORef (jqJobs state) (rmJob jid)
  let answer = isJust `fmap` removed
  logDebug $ "Result of dequeing job " ++ show (fromJobId jid)
              ++ " is " ++ show answer
  return answer