Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ b81650b0

History | View | Annotate | Download (10.6 kB)

1
{-| Implementation of a reader for the job queue.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.JQScheduler
27
  ( JQStatus
28
  , emptyJQStatus
29
  , initJQScheduler
30
  , enqueueNewJobs
31
  ) where
32

    
33
import Control.Arrow
34
import Control.Concurrent
35
import Control.Exception
36
import Control.Monad
37
import Data.List
38
import Data.Maybe
39
import Data.IORef
40
import System.INotify
41

    
42
import Ganeti.BasicTypes
43
import Ganeti.Constants as C
44
import Ganeti.JQueue as JQ
45
import Ganeti.Logging
46
import Ganeti.Path
47
import Ganeti.Types
48
import Ganeti.Utils
49

    
50
data JobWithStat = JobWithStat { jINotify :: Maybe INotify
51
                               , jStat :: FStat
52
                               , jJob :: QueuedJob
53
                               }
54
data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] }
55

    
56
{-| Representation of the job queue
57

    
58
We keep two lists of jobs (together with information about the last
59
fstat result observed): the jobs that are enqueued, but not yet handed
60
over for execution, and the jobs already handed over for execution. They
61
are kept together in a single IORef, so that we can atomically update
62
both, in particular when scheduling jobs to be handed over for execution.
63

    
64
-}
65

    
66
data JQStatus = JQStatus
67
  { jqJobs :: IORef Queue
68
  }
69

    
70

    
71
emptyJQStatus :: IO JQStatus
72
emptyJQStatus = do
73
  jqJ <- newIORef Queue {qEnqueued=[], qRunning=[]}
74
  return JQStatus { jqJobs=jqJ }
75

    
76
-- | Apply a function on the running jobs.
77
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
78
onRunningJobs f queue = queue {qRunning=f $ qRunning queue}
79

    
80
-- | Apply a function on the queued jobs.
81
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
82
onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue}
83

    
84
-- | Obtain a JobWithStat from a QueuedJob.
85
unreadJob :: QueuedJob -> JobWithStat
86
unreadJob job = JobWithStat {jJob=job, jStat=nullFStat, jINotify=Nothing}
87

    
88
-- | Reload interval for polling the running jobs for updates in microseconds.
89
watchInterval :: Int
90
watchInterval = C.luxidJobqueuePollInterval * 1000000 
91

    
92
-- | Maximal number of jobs to be running at the same time.
93
maxRunningJobs :: Int
94
maxRunningJobs = C.luxidMaximalRunningJobs 
95

    
96
-- | Wrapper function to atomically update the jobs in the queue status.
97
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
98
modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) ()  . f)
99

    
100
-- | Reread a job from disk, if the file has changed.
101
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
102
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job})  = do
103
  let jid = qjId job
104
  qdir <- queueDir
105
  let fpath = liveJobFile qdir jid
106
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
107
  changedResult <- try $ needsReload fstat fpath
108
                   :: IO (Either IOError (Maybe FStat))
109
  let changed = either (const $ Just nullFStat) id changedResult
110
  case changed of
111
    Nothing -> do
112
      logDebug $ "File " ++ fpath ++ " not changed on disk."
113
      return Nothing
114
    Just fstat' -> do
115
      let jids = show $ fromJobId jid
116
      logInfo $ "Rereading job "  ++ jids
117
      readResult <- loadJobFromDisk qdir True jid
118
      case readResult of
119
        Bad s -> do
120
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
121
          return Nothing
122
        Ok (job', _) -> do
123
          logDebug
124
            $ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job')
125
          return . Just $ jWS {jStat=fstat', jJob=job'}
126
                          -- jINotify unchanged
127

    
128
-- | Update a job in the job queue, if it is still there. This is the
129
-- pure function for inserting a previously read change into the queue.
130
-- as the change contains its time stamp, we don't have to worry about a
131
-- later read change overwriting a newer read state. If this happens, the
132
-- fstat value will be outdated, so the next poller run will fix this.
133
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
134
updateJobStatus job' =
135
  let jid = qjId $ jJob job' in
136
  map (\job -> if qjId (jJob job) == jid then job' else job)
137

    
138
-- | Update a single job by reading it from disk, if necessary.
139
updateJob :: JQStatus -> JobWithStat -> IO ()
140
updateJob state jb = do
141
  jb' <- readJobStatus jb
142
  maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb'
143

    
144
-- | Sort out the finished jobs from the monitored part of the queue.
145
-- This is the pure part, splitting the queue into a remaining queue
146
-- and the jobs that were removed.
147
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
148
sortoutFinishedJobs queue =
149
  let (fin, run') = partition (jobFinalized . jJob) . qRunning $ queue
150
  in (queue {qRunning=run'}, fin)
151

    
152
-- | Actually clean up the finished jobs. This is the IO wrapper around
153
-- the pure `sortoutFinishedJobs`.
154
cleanupFinishedJobs :: JQStatus -> IO ()
155
cleanupFinishedJobs qstate = do
156
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
157
  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob
158
      jlist = commaJoin $ map showJob finished
159
  unless (null finished)
160
    . logInfo $ "Finished jobs: " ++ jlist
161
  mapM_ (maybe (return ()) killINotify . jINotify) finished
162

    
163
-- | Watcher task for a job, to update it on file changes. It also
164
-- reinstantiates itself upon receiving an Ignored event.
165
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
166
jobWatcher state jWS e = do
167
  let jid = qjId $ jJob jWS
168
      jids = show $ fromJobId jid
169
  logInfo $ "Scheduler notified of change of job " ++ jids
170
  logDebug $ "Scheulder notify event for " ++ jids ++ ": " ++ show e
171
  let inotify = jINotify jWS
172
  when (e == Ignored  && isJust inotify) $ do
173
    qdir <- queueDir
174
    let fpath = liveJobFile qdir jid
175
    _ <- addWatch (fromJust inotify) [Modify, Delete] fpath
176
           (jobWatcher state jWS)
177
    return ()
178
  updateJob state jWS
179

    
180
-- | Attach the job watcher to a running job.
181
attachWatcher :: JQStatus -> JobWithStat -> IO ()
182
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do
183
  inotify <- initINotify
184
  qdir <- queueDir
185
  let fpath = liveJobFile qdir . qjId $ jJob jWS
186
      jWS' = jWS { jINotify=Just inotify }
187
  logDebug $ "Attaching queue watcher for " ++ fpath
188
  _ <- addWatch inotify [Modify, Delete] fpath $ jobWatcher state jWS'
189
  modifyJobs state . onRunningJobs $ updateJobStatus jWS'
190

    
191
-- | Decide on which jobs to schedule next for execution. This is the
192
-- pure function doing the scheduling.
193
selectJobsToRun :: Queue -> (Queue, [JobWithStat])
194
selectJobsToRun queue =
195
  let n = maxRunningJobs - length (qRunning queue)
196
      (chosen, remain) = splitAt n (qEnqueued queue)
197
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
198

    
199
-- | Requeue jobs that were previously selected for execution
200
-- but couldn't be started.
201
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
202
requeueJobs qstate jobs err = do
203
  let jids = map (qjId . jJob) jobs
204
      jidsString = commaJoin $ map (show . fromJobId) jids
205
      rmJobs = filter ((`notElem` jids) . qjId . jJob)
206
  logWarning $ "Starting jobs failed: " ++ show err
207
  logWarning $ "Rescheduling jobs: " ++ jidsString
208
  modifyJobs qstate (onRunningJobs rmJobs)
209
  modifyJobs qstate (onQueuedJobs $ (++) jobs)
210

    
211
-- | Schedule jobs to be run. This is the IO wrapper around the
212
-- pure `selectJobsToRun`.
213
scheduleSomeJobs :: JQStatus -> IO ()
214
scheduleSomeJobs qstate = do
215
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
216
  let jobs = map jJob chosen
217
  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
218
    $ map (show . fromJobId . qjId) jobs
219
  mapM_ (attachWatcher qstate) chosen
220
  result <- try $ JQ.startJobs jobs
221
  either (requeueJobs qstate chosen) return result
222

    
223
-- | Format the job queue status in a compact, human readable way.
224
showQueue :: Queue -> String
225
showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
226
  let showids = show . map (fromJobId . qjId . jJob)
227
  in "Waiting jobs: " ++ showids waiting 
228
       ++ "; running jobs: " ++ showids running
229

    
230
-- | Time-based watcher for updating the job queue.
231
onTimeWatcher :: JQStatus -> IO ()
232
onTimeWatcher qstate = forever $ do
233
  threadDelay watchInterval
234
  logDebug "Job queue watcher timer fired"
235
  jobs <- readIORef (jqJobs qstate)
236
  mapM_ (updateJob qstate) $ qRunning jobs
237
  cleanupFinishedJobs qstate
238
  jobs' <- readIORef (jqJobs qstate)
239
  logInfo $ showQueue jobs'
240
  scheduleSomeJobs qstate
241

    
242
-- | Read a single, non-archived, job, specified by its id, from disk.
243
readJobFromDisk :: JobId -> IO (Result JobWithStat)
244
readJobFromDisk jid = do
245
  qdir <- queueDir
246
  let fpath = liveJobFile qdir jid
247
  logDebug $ "Reading " ++ fpath
248
  tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat)
249
  let fstat = either (const nullFStat) id tryFstat
250
  loadResult <- JQ.loadJobFromDisk qdir False jid
251
  return $ liftM (JobWithStat Nothing fstat . fst) loadResult
252

    
253
-- | Read all non-finalized jobs from disk.
254
readJobsFromDisk :: IO [JobWithStat]
255
readJobsFromDisk = do
256
  logInfo "Loading job queue"
257
  qdir <- queueDir
258
  eitherJids <- JQ.getJobIDs [qdir]
259
  let jids = either (const []) JQ.sortJobIDs eitherJids
260
      jidsstring = commaJoin $ map (show . fromJobId) jids
261
  logInfo $ "Non-archived jobs on disk: " ++ jidsstring
262
  jobs <- mapM readJobFromDisk jids
263
  return $ justOk jobs
264

    
265
-- | Set up the job scheduler. This will also start the monitoring
266
-- of changes to the running jobs.
267
initJQScheduler :: JQStatus -> IO ()
268
initJQScheduler qstate = do
269
  alljobs <- readJobsFromDisk
270
  let jobs = filter (not . jobFinalized . jJob) alljobs
271
      (running, queued) = partition (jobStarted . jJob) jobs
272
  modifyJobs qstate (onQueuedJobs (++ queued) . onRunningJobs (++ running))
273
  jqjobs <- readIORef (jqJobs qstate)
274
  logInfo $ showQueue jqjobs
275
  scheduleSomeJobs qstate
276
  logInfo "Starting time-based job queue watcher"
277
  _ <- forkIO $ onTimeWatcher qstate
278
  return ()
279

    
280
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
281
-- eventually.
282
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
283
enqueueNewJobs state jobs = do
284
  logInfo . (++) "New jobs enqueued: " . commaJoin
285
    $ map (show . fromJobId . qjId) jobs
286
  let jobs' = map unreadJob jobs
287
  modifyJobs state (onQueuedJobs (++ jobs'))
288
  scheduleSomeJobs state