Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ ea174b21

History | View | Annotate | Download (10.8 kB)

1
{-| Implementation of a reader for the job queue.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.JQScheduler
27
  ( JQStatus
28
  , emptyJQStatus
29
  , initJQScheduler
30
  , enqueueNewJobs
31
  ) where
32

    
33
import Control.Arrow
34
import Control.Concurrent
35
import Control.Exception
36
import Control.Monad
37
import Data.List
38
import Data.Maybe
39
import Data.IORef
40
import System.INotify
41

    
42
import Ganeti.BasicTypes
43
import Ganeti.Constants as C
44
import Ganeti.JQueue as JQ
45
import Ganeti.Logging
46
import Ganeti.Path
47
import Ganeti.Types
48
import Ganeti.Utils
49

    
50
data JobWithStat = JobWithStat { jINotify :: Maybe INotify
51
                               , jStat :: FStat
52
                               , jJob :: QueuedJob
53
                               }
54
data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] }
55

    
56
{-| Representation of the job queue
57

    
58
We keep two lists of jobs (together with information about the last
59
fstat result observed): the jobs that are enqueued, but not yet handed
60
over for execution, and the jobs already handed over for execution. They
61
are kept together in a single IORef, so that we can atomically update
62
both, in particular when scheduling jobs to be handed over for execution.
63

    
64
-}
65

    
66
data JQStatus = JQStatus
67
  { jqJobs :: IORef Queue
68
  }
69

    
70

    
71
emptyJQStatus :: IO JQStatus
72
emptyJQStatus = do
73
  jqJ <- newIORef Queue {qEnqueued=[], qRunning=[]}
74
  return JQStatus { jqJobs=jqJ }
75

    
76
-- | Apply a function on the running jobs.
77
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
78
onRunningJobs f queue = queue {qRunning=f $ qRunning queue}
79

    
80
-- | Apply a function on the queued jobs.
81
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
82
onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue}
83

    
84
-- | Obtain a JobWithStat from a QueuedJob.
85
unreadJob :: QueuedJob -> JobWithStat
86
unreadJob job = JobWithStat {jJob=job, jStat=nullFStat, jINotify=Nothing}
87

    
88
-- | Reload interval for polling the running jobs for updates in microseconds.
89
watchInterval :: Int
90
watchInterval = C.luxidJobqueuePollInterval * 1000000 
91

    
92
-- | Maximal number of jobs to be running at the same time.
93
maxRunningJobs :: Int
94
maxRunningJobs = C.luxidMaximalRunningJobs 
95

    
96
-- | Wrapper function to atomically update the jobs in the queue status.
97
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
98
modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) ()  . f)
99

    
100
-- | Reread a job from disk, if the file has changed.
101
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
102
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job})  = do
103
  let jid = qjId job
104
  qdir <- queueDir
105
  let fpath = liveJobFile qdir jid
106
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
107
  changedResult <- try $ needsReload fstat fpath
108
                   :: IO (Either IOError (Maybe FStat))
109
  let changed = either (const $ Just nullFStat) id changedResult
110
  case changed of
111
    Nothing -> do
112
      logDebug $ "File " ++ fpath ++ " not changed on disk."
113
      return Nothing
114
    Just fstat' -> do
115
      let jids = show $ fromJobId jid
116
      logInfo $ "Rereading job "  ++ jids
117
      readResult <- loadJobFromDisk qdir True jid
118
      case readResult of
119
        Bad s -> do
120
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
121
          return Nothing
122
        Ok (job', _) -> do
123
          logDebug
124
            $ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job')
125
          return . Just $ jWS {jStat=fstat', jJob=job'}
126
                          -- jINotify unchanged
127

    
128
-- | Update a job in the job queue, if it is still there. This is the
129
-- pure function for inserting a previously read change into the queue.
130
-- as the change contains its time stamp, we don't have to worry about a
131
-- later read change overwriting a newer read state. If this happens, the
132
-- fstat value will be outdated, so the next poller run will fix this.
133
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
134
updateJobStatus job' =
135
  let jid = qjId $ jJob job' in
136
  map (\job -> if qjId (jJob job) == jid then job' else job)
137

    
138
-- | Update a single job by reading it from disk, if necessary.
139
updateJob :: JQStatus -> JobWithStat -> IO ()
140
updateJob state jb = do
141
  jb' <- readJobStatus jb
142
  maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb'
143
  when (maybe True (jobFinalized . jJob) jb') . (>> return ()) . forkIO $ do
144
    logDebug "Scheduler noticed a job to have finished."
145
    cleanupFinishedJobs state
146
    scheduleSomeJobs state
147

    
148
-- | Sort out the finished jobs from the monitored part of the queue.
149
-- This is the pure part, splitting the queue into a remaining queue
150
-- and the jobs that were removed.
151
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
152
sortoutFinishedJobs queue =
153
  let (fin, run') = partition (jobFinalized . jJob) . qRunning $ queue
154
  in (queue {qRunning=run'}, fin)
155

    
156
-- | Actually clean up the finished jobs. This is the IO wrapper around
157
-- the pure `sortoutFinishedJobs`.
158
cleanupFinishedJobs :: JQStatus -> IO ()
159
cleanupFinishedJobs qstate = do
160
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
161
  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob
162
      jlist = commaJoin $ map showJob finished
163
  unless (null finished)
164
    . logInfo $ "Finished jobs: " ++ jlist
165
  mapM_ (maybe (return ()) killINotify . jINotify) finished
166

    
167
-- | Watcher task for a job, to update it on file changes. It also
168
-- reinstantiates itself upon receiving an Ignored event.
169
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
170
jobWatcher state jWS e = do
171
  let jid = qjId $ jJob jWS
172
      jids = show $ fromJobId jid
173
  logInfo $ "Scheduler notified of change of job " ++ jids
174
  logDebug $ "Scheulder notify event for " ++ jids ++ ": " ++ show e
175
  let inotify = jINotify jWS
176
  when (e == Ignored  && isJust inotify) $ do
177
    qdir <- queueDir
178
    let fpath = liveJobFile qdir jid
179
    _ <- addWatch (fromJust inotify) [Modify, Delete] fpath
180
           (jobWatcher state jWS)
181
    return ()
182
  updateJob state jWS
183

    
184
-- | Attach the job watcher to a running job.
185
attachWatcher :: JQStatus -> JobWithStat -> IO ()
186
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do
187
  inotify <- initINotify
188
  qdir <- queueDir
189
  let fpath = liveJobFile qdir . qjId $ jJob jWS
190
      jWS' = jWS { jINotify=Just inotify }
191
  logDebug $ "Attaching queue watcher for " ++ fpath
192
  _ <- addWatch inotify [Modify, Delete] fpath $ jobWatcher state jWS'
193
  modifyJobs state . onRunningJobs $ updateJobStatus jWS'
194

    
195
-- | Decide on which jobs to schedule next for execution. This is the
196
-- pure function doing the scheduling.
197
selectJobsToRun :: Queue -> (Queue, [JobWithStat])
198
selectJobsToRun queue =
199
  let n = maxRunningJobs - length (qRunning queue)
200
      (chosen, remain) = splitAt n (qEnqueued queue)
201
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
202

    
203
-- | Requeue jobs that were previously selected for execution
204
-- but couldn't be started.
205
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
206
requeueJobs qstate jobs err = do
207
  let jids = map (qjId . jJob) jobs
208
      jidsString = commaJoin $ map (show . fromJobId) jids
209
      rmJobs = filter ((`notElem` jids) . qjId . jJob)
210
  logWarning $ "Starting jobs failed: " ++ show err
211
  logWarning $ "Rescheduling jobs: " ++ jidsString
212
  modifyJobs qstate (onRunningJobs rmJobs)
213
  modifyJobs qstate (onQueuedJobs $ (++) jobs)
214

    
215
-- | Schedule jobs to be run. This is the IO wrapper around the
216
-- pure `selectJobsToRun`.
217
scheduleSomeJobs :: JQStatus -> IO ()
218
scheduleSomeJobs qstate = do
219
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
220
  let jobs = map jJob chosen
221
  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
222
    $ map (show . fromJobId . qjId) jobs
223
  mapM_ (attachWatcher qstate) chosen
224
  result <- try $ JQ.startJobs jobs
225
  either (requeueJobs qstate chosen) return result
226

    
227
-- | Format the job queue status in a compact, human readable way.
228
showQueue :: Queue -> String
229
showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
230
  let showids = show . map (fromJobId . qjId . jJob)
231
  in "Waiting jobs: " ++ showids waiting 
232
       ++ "; running jobs: " ++ showids running
233

    
234
-- | Time-based watcher for updating the job queue.
235
onTimeWatcher :: JQStatus -> IO ()
236
onTimeWatcher qstate = forever $ do
237
  threadDelay watchInterval
238
  logDebug "Job queue watcher timer fired"
239
  jobs <- readIORef (jqJobs qstate)
240
  mapM_ (updateJob qstate) $ qRunning jobs
241
  cleanupFinishedJobs qstate
242
  jobs' <- readIORef (jqJobs qstate)
243
  logInfo $ showQueue jobs'
244
  scheduleSomeJobs qstate
245

    
246
-- | Read a single, non-archived, job, specified by its id, from disk.
247
readJobFromDisk :: JobId -> IO (Result JobWithStat)
248
readJobFromDisk jid = do
249
  qdir <- queueDir
250
  let fpath = liveJobFile qdir jid
251
  logDebug $ "Reading " ++ fpath
252
  tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat)
253
  let fstat = either (const nullFStat) id tryFstat
254
  loadResult <- JQ.loadJobFromDisk qdir False jid
255
  return $ liftM (JobWithStat Nothing fstat . fst) loadResult
256

    
257
-- | Read all non-finalized jobs from disk.
258
readJobsFromDisk :: IO [JobWithStat]
259
readJobsFromDisk = do
260
  logInfo "Loading job queue"
261
  qdir <- queueDir
262
  eitherJids <- JQ.getJobIDs [qdir]
263
  let jids = either (const []) JQ.sortJobIDs eitherJids
264
      jidsstring = commaJoin $ map (show . fromJobId) jids
265
  logInfo $ "Non-archived jobs on disk: " ++ jidsstring
266
  jobs <- mapM readJobFromDisk jids
267
  return $ justOk jobs
268

    
269
-- | Set up the job scheduler. This will also start the monitoring
270
-- of changes to the running jobs.
271
initJQScheduler :: JQStatus -> IO ()
272
initJQScheduler qstate = do
273
  alljobs <- readJobsFromDisk
274
  let jobs = filter (not . jobFinalized . jJob) alljobs
275
      (running, queued) = partition (jobStarted . jJob) jobs
276
  modifyJobs qstate (onQueuedJobs (++ queued) . onRunningJobs (++ running))
277
  jqjobs <- readIORef (jqJobs qstate)
278
  logInfo $ showQueue jqjobs
279
  scheduleSomeJobs qstate
280
  logInfo "Starting time-based job queue watcher"
281
  _ <- forkIO $ onTimeWatcher qstate
282
  return ()
283

    
284
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
285
-- eventually.
286
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
287
enqueueNewJobs state jobs = do
288
  logInfo . (++) "New jobs enqueued: " . commaJoin
289
    $ map (show . fromJobId . qjId) jobs
290
  let jobs' = map unreadJob jobs
291
  modifyJobs state (onQueuedJobs (++ jobs'))
292
  scheduleSomeJobs state