Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ c92b4671

History | View | Annotate | Download (12.3 kB)

1
{-| Implementation of a reader for the job queue.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.JQScheduler
27
  ( JQStatus
28
  , emptyJQStatus
29
  , initJQScheduler
30
  , enqueueNewJobs
31
  , dequeueJob
32
  ) where
33

    
34
import Control.Arrow
35
import Control.Concurrent
36
import Control.Exception
37
import Control.Monad
38
import Data.List
39
import Data.Maybe
40
import Data.IORef
41
import System.INotify
42

    
43
import Ganeti.BasicTypes
44
import Ganeti.Constants as C
45
import Ganeti.JQueue as JQ
46
import Ganeti.Logging
47
import Ganeti.Objects
48
import Ganeti.Path
49
import Ganeti.Types
50
import Ganeti.Utils
51

    
52
-- | A job as tracked by the scheduler, together with the bookkeeping
-- kept for it.
data JobWithStat = JobWithStat
  { jINotify :: Maybe INotify  -- ^ inotify instance watching the job, if any
  , jStat :: FStat             -- ^ last fstat result observed for the job
  , jJob :: QueuedJob          -- ^ the job itself
  }
56
-- | The two job lists maintained by the scheduler.
data Queue = Queue
  { qEnqueued :: [JobWithStat]  -- ^ jobs not yet handed over for execution
  , qRunning :: [JobWithStat]   -- ^ jobs already handed over for execution
  }
57

    
58
{-| Representation of the job queue.

Two lists of jobs are kept, each job paired with the last fstat result
observed for it: the jobs that are enqueued but not yet handed over for
execution, and the jobs already handed over for execution. Both lists
live in a single IORef, so that they can be updated together atomically,
in particular when scheduling moves jobs from the first list to the second.

-}
data JQStatus = JQStatus
  { jqJobs :: IORef Queue
    -- ^ enqueued and running jobs
  , jqConfig :: IORef (Result ConfigData)
    -- ^ reference to the configuration, which may be unavailable
  }
72

    
73

    
74
-- | Create a job queue status with no enqueued and no running jobs,
-- using the given reference for the configuration.
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config = do
  let noJobs = Queue { qEnqueued = [], qRunning = [] }
  jobsRef <- newIORef noJobs
  return $ JQStatus { jqJobs = jobsRef, jqConfig = config }
78

    
79
-- | Transform the list of running jobs, leaving the enqueued jobs alone.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs f queue@(Queue { qRunning = running }) =
  queue { qRunning = f running }
82

    
83
-- | Transform the list of enqueued jobs, leaving the running jobs alone.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs f queue@(Queue { qEnqueued = waiting }) =
  queue { qEnqueued = f waiting }
86

    
87
-- | Wrap a QueuedJob into a JobWithStat that has no inotify instance
-- attached and carries the null fstat.
unreadJob :: QueuedJob -> JobWithStat
unreadJob job = JobWithStat
  { jINotify = Nothing
  , jStat = nullFStat
  , jJob = job
  }
90

    
91
-- | Interval, in microseconds, at which the running jobs are polled
-- for updates.
watchInterval :: Int
watchInterval = scale * C.luxidJobqueuePollInterval
  where scale = 1000000
94

    
95
-- | Get the maximal number of jobs to be run simultaneously from the
-- configuration. If the configuration is not available, be conservative
-- and use the smallest possible value, i.e., 1.
getMaxRunningJobs :: JQStatus -> IO Int
getMaxRunningJobs qstat = do
  config <- readIORef $ jqConfig qstat
  let fromConfig = clusterMaxRunningJobs . configCluster
  return $ genericResult (const 1) fromConfig config
102

    
103
-- | Atomically replace the job lists in the queue status by the result
-- of applying the given function to them.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())
106

    
107
-- | Reread a job from disk, if the file has changed.
--
-- The result is Nothing both when the file did not change and when
-- rereading it failed; otherwise it is the argument with the job and
-- the fstat replaced by the freshly read values. The inotify instance of
-- the argument is carried over unchanged.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
      -- Load the job file again and pair the job with the given fstat.
      reread newStat = do
        logInfo $ "Rereading job "  ++ jids
        loaded <- loadJobFromDisk qdir True jid
        case loaded of
          Bad msg -> do
            logWarning $ "Failed to read job " ++ jids ++ ": " ++ msg
            return Nothing
          Ok (job', _) -> do
            logDebug $ "Read job " ++ jids ++ ", staus is "
                         ++ show (calcJobStatus job')
            return . Just $ jWS { jStat = newStat, jJob = job' }
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  changed <- try $ needsReload (jStat jWS) fpath
             :: IO (Either IOError (Maybe FStat))
  case changed of
    -- If the check itself fails, reread anyway, recording the null fstat.
    Left _ -> reread nullFStat
    Right Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Right (Just newStat) -> reread newStat
134

    
135
-- | Replace a job in a list of jobs by a newly read version of it, if
-- the job is still in the list. This is the pure function for inserting
-- a previously read change into the queue. As the change carries its
-- fstat value, a later-applied older read overwriting a newer state is
-- not a concern: the stored fstat will then be outdated, so the next
-- poller run will fix this.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' = map replaceIfSame
  where jid = qjId $ jJob job'
        replaceIfSame job
          | qjId (jJob job) == jid = job'
          | otherwise = job
144

    
145
-- | Update a single job by reading it from disk, if necessary.
--
-- A successfully reread job replaces its entry in the running list. If
-- the reread job is finalized, or if nothing was reread at all, a thread
-- is forked that cleans up finished jobs and schedules new ones.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  reread <- readJobStatus jb
  case reread of
    Nothing -> return ()
    Just fresh -> modifyJobs state . onRunningJobs $ updateJobStatus fresh
  let treatAsFinished = maybe True (jobFinalized . jJob) reread
  when treatAsFinished $ do
    _ <- forkIO $ do
      logDebug "Scheduler noticed a job to have finished."
      cleanupFinishedJobs state
      scheduleSomeJobs state
    return ()
154

    
155
-- | Split the finalized jobs off the running list. This is the pure
-- part of the cleanup: the result is the queue without those jobs,
-- paired with the jobs that were removed.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue = (queue { qRunning = stillRunning }, finished)
  where (finished, stillRunning) =
          partition (jobFinalized . jJob) (qRunning queue)
162

    
163
-- | Remove the finished jobs from the running list, log them (if there
-- are any) and kill the inotify instances attached to them. This is the
-- IO wrapper around the pure `sortoutFinishedJobs`.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  unless (null finished) $ do
    let describe jws =
          let job = jJob jws
          in show (fromJobId (qjId job), calcJobStatus job)
    logInfo $ "Finished jobs: " ++ commaJoin (map describe finished)
  forM_ finished $ \jws ->
    maybe (return ()) killINotify (jINotify jws)
173

    
174
-- | Watcher task for a job, to update it on file changes.
--
-- On an Ignored event, and provided the job carries an inotify instance,
-- the watch on the job file is registered again with this same watcher.
-- In every case the job is then updated from disk via `updateJob`.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheulder notify event for " ++ jids ++ ": " ++ show e
  -- Match on the handle directly instead of isJust/fromJust, so that no
  -- partial function is involved.
  case jINotify jWS of
    Just inotify | e == Ignored -> do
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
      _ <- addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS)
      return ()
    _ -> return ()
  updateJob state jWS
190

    
191
-- | Attach the job watcher to a running job.
--
-- Nothing is done if the job already carries an inotify instance.
-- Otherwise a fresh inotify instance is created, a watch for Modify and
-- Delete events on the job file is registered with `jobWatcher` as the
-- callback, and the instance is recorded in the job's entry of the
-- running list. If any step after creating the instance throws, the
-- instance is killed again before the exception propagates, so that a
-- failed attempt does not leak it.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do
  inotify <- initINotify
  flip onException (killINotify inotify) $ do
    qdir <- queueDir
    let fpath = liveJobFile qdir . qjId $ jJob jWS
        jWS' = jWS { jINotify=Just inotify }
    logDebug $ "Attaching queue watcher for " ++ fpath
    _ <- addWatch inotify [Modify, Delete] fpath $ jobWatcher state jWS'
    modifyJobs state . onRunningJobs $ updateJobStatus jWS'
201

    
202
-- | Decide which jobs to schedule next for execution. This is the pure
-- function doing the scheduling: given the maximal number of running
-- jobs, it moves jobs from the front of the enqueued list to the end of
-- the running list until that number is reached, and returns the new
-- queue together with the jobs that were moved.
selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat])
selectJobsToRun count queue =
  (queue { qEnqueued = remain, qRunning = running ++ chosen }, chosen)
  where running = qRunning queue
        freeSlots = count - length running
        (chosen, remain) = splitAt freeSlots $ qEnqueued queue
209

    
210
-- | Requeue jobs that were previously selected for execution
-- but couldn't be started.
--
-- The given jobs are removed from the running list and put back at the
-- front of the enqueued list; the error that prevented starting them is
-- logged. Both lists are changed in a single atomic update, so the jobs
-- are never absent from both lists (where `dequeueJob` would report them
-- as unknown). The inotify instances attached to the removed running
-- entries are killed, as the requeued jobs do not carry them any more.
requeueJobs :: JQStatus -> [JobWithStat] -> IOError -> IO ()
requeueJobs qstate jobs err = do
  let jids = map (qjId . jJob) jobs
      jidsString = commaJoin $ map (show . fromJobId) jids
      isRequeued = (`elem` jids) . qjId . jJob
      -- Pure update: move the jobs back and hand out the dropped entries.
      moveBack queue =
        let (dropped, kept) = partition isRequeued $ qRunning queue
        in ( queue { qRunning = kept, qEnqueued = jobs ++ qEnqueued queue }
           , dropped )
  logWarning $ "Starting jobs failed: " ++ show err
  logWarning $ "Rescheduling jobs: " ++ jidsString
  dropped <- atomicModifyIORef (jqJobs qstate) moveBack
  mapM_ (maybe (return ()) killINotify . jINotify) dropped
221

    
222
-- | Schedule jobs to be run. This is the IO wrapper around the
-- pure `selectJobsToRun`: the chosen jobs are logged (if there are any),
-- get a watcher attached and are then started. If starting them fails
-- with an IOError, they are requeued.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  maxRunning <- getMaxRunningJobs qstate
  chosen <- atomicModifyIORef (jqJobs qstate) (selectJobsToRun maxRunning)
  let jobs = map jJob chosen
      jobIds = commaJoin $ map (show . fromJobId . qjId) jobs
  unless (null chosen) . logInfo $ "Starting jobs: " ++ jobIds
  forM_ chosen $ attachWatcher qstate
  started <- try $ JQ.startJobs jobs
  case started of
    Left err -> requeueJobs qstate chosen err
    Right res -> return res
234

    
235
-- | Render the ids of the waiting and of the running jobs in a compact,
-- human readable way.
showQueue :: Queue -> String
showQueue queue =
  "Waiting jobs: " ++ idsOf (qEnqueued queue)
    ++ "; running jobs: " ++ idsOf (qRunning queue)
  where idsOf = show . map (fromJobId . qjId . jJob)
241

    
242
-- | Time-based watcher for updating the job queue. Forever, after each
-- watch interval, all running jobs are updated from disk, finished jobs
-- are cleaned up, the queue is logged and new jobs are scheduled.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever pollOnce
  where
    pollOnce = do
      threadDelay watchInterval
      logDebug "Job queue watcher timer fired"
      running <- liftM qRunning . readIORef $ jqJobs qstate
      forM_ running $ updateJob qstate
      cleanupFinishedJobs qstate
      readIORef (jqJobs qstate) >>= logInfo . showQueue
      scheduleSomeJobs qstate
253

    
254
-- | Read a single, non-archived, job, specified by its id, from disk.
-- The job is paired with the fstat of its file, or with the null fstat
-- if obtaining the fstat failed; no inotify instance is attached.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  statResult <- try $ getFStat fpath :: IO (Either IOError FStat)
  let fstat = case statResult of
                Left _ -> nullFStat
                Right st -> st
      withStat loaded =
        JobWithStat { jINotify = Nothing, jStat = fstat, jJob = fst loaded }
  liftM (liftM withStat) $ JQ.loadJobFromDisk qdir False jid
264

    
265
-- | Read all non-archived jobs from disk, in the order of their sorted
-- job ids. Jobs that cannot be read are left out; if the job ids cannot
-- be obtained at all, the result is empty.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  jidsOrError <- JQ.getJobIDs [qdir]
  let jids = case jidsOrError of
               Left _ -> []
               Right found -> JQ.sortJobIDs found
  logInfo $ "Non-archived jobs on disk: "
              ++ commaJoin (map (show . fromJobId) jids)
  liftM justOk $ mapM readJobFromDisk jids
276

    
277
-- | Set up the job scheduler: load the non-finalized jobs from disk,
-- add the already started ones to the running list and the others to
-- the enqueued list, schedule jobs, and fork the time-based watcher
-- that monitors the running jobs for changes.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  alljobs <- readJobsFromDisk
  let unfinished = filter (not . jobFinalized . jJob) alljobs
      (running, queued) = partition (jobStarted . jJob) unfinished
      addBoth = onQueuedJobs (++ queued) . onRunningJobs (++ running)
  modifyJobs qstate addBoth
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  _ <- forkIO $ onTimeWatcher qstate
  return ()
291

    
292
-- | Enqueue new jobs. The jobs are appended to the end of the enqueued
-- list and a scheduling round is triggered; this will guarantee that the
-- jobs will be executed eventually.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  let jobIds = commaJoin $ map (show . fromJobId . qjId) jobs
  logInfo $ "New jobs enqueued: " ++ jobIds
  modifyJobs state . onQueuedJobs $ \queued -> queued ++ map unreadJob jobs
  scheduleSomeJobs state
301

    
302
-- | Pure function for removing a queued job from the job queue, to be
-- used with atomicModifyIORef. The answer is True if the job was removed
-- from the enqueued list, i.e., before being handed over to execution,
-- False if it is in the running list (the queue is left unchanged then),
-- and a Bad result if the job is in neither list.
rmJob :: JobId -> Queue -> (Queue, Result Bool)
rmJob jid q
  | not (null found) = (q { qEnqueued = queued' }, Ok True)
  | any isJid (qRunning q) = (q, Ok False)
  | otherwise = (q, Bad $ "Job " ++ show (fromJobId jid)
                            ++ " unknown to the queue")
  where isJid = (jid ==) . qjId . jJob
        (found, queued') = partition isJid $ qEnqueued q
316

    
317
-- | Try to remove a queued job from the job queue, logging the outcome.
-- Return True, if the job could be removed from the queue before being
-- handed over to execution, False if the job already started, and a Bad
-- result if the job is unknown.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
dequeueJob state jid = do
  result <- atomicModifyIORef (jqJobs state) (rmJob jid)
  logDebug $ concat [ "Result of dequeing job ", show (fromJobId jid)
                    , " is ", show result ]
  return result