Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ cc5ab470

History | View | Annotate | Download (9.4 kB)

1
{-| Implementation of a reader for the job queue.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.JQScheduler
27
  ( JQStatus
28
  , emptyJQStatus
29
  , initJQScheduler
30
  , enqueueNewJobs
31
  ) where
32

    
33
import Control.Arrow
34
import Control.Concurrent
35
import Control.Exception
36
import Control.Monad
37
import Data.List
38
import Data.IORef
39
import System.INotify
40

    
41
import Ganeti.BasicTypes
42
import Ganeti.Constants as C
43
import Ganeti.JQueue as JQ
44
import Ganeti.Logging
45
import Ganeti.Path
46
import Ganeti.Types
47
import Ganeti.Utils
48

    
49
data JobWithStat = JobWithStat { jINotify :: Maybe INotify
50
                               , jStat :: FStat
51
                               , jJob :: QueuedJob
52
                               }
53
data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] }
54

    
55
{-| Representation of the job queue
56

    
57
We keep two lists of jobs (together with information about the last
58
fstat result observed): the jobs that are enqueued, but not yet handed
59
over for execution, and the jobs already handed over for execution. They
60
are kept together in a single IORef, so that we can atomically update
61
both, in particular when scheduling jobs to be handed over for execution.
62

    
63
-}
64

    
65
data JQStatus = JQStatus
66
  { jqJobs :: IORef Queue
67
  }
68

    
69

    
70
emptyJQStatus :: IO JQStatus
71
emptyJQStatus = do
72
  jqJ <- newIORef Queue {qEnqueued=[], qRunning=[]}
73
  return JQStatus { jqJobs=jqJ }
74

    
75
-- | Apply a function on the running jobs.
76
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
77
onRunningJobs f queue = queue {qRunning=f $ qRunning queue}
78

    
79
-- | Apply a function on the queued jobs.
80
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
81
onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue}
82

    
83
-- | Obtain a JobWithStat from a QueuedJob.
84
unreadJob :: QueuedJob -> JobWithStat
85
unreadJob job = JobWithStat {jJob=job, jStat=nullFStat, jINotify=Nothing}
86

    
87
-- | Reload interval for polling the running jobs for updates in microseconds.
88
watchInterval :: Int
89
watchInterval = C.luxidJobqueuePollInterval * 1000000 
90

    
91
-- | Maximal number of jobs to be running at the same time.
92
maxRunningJobs :: Int
93
maxRunningJobs = C.luxidMaximalRunningJobs 
94

    
95
-- | Wrapper function to atomically update the jobs in the queue status.
96
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
97
modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) ()  . f)
98

    
99
-- | Reread a job from disk, if the file has changed.
100
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
101
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job})  = do
102
  let jid = qjId job
103
  qdir <- queueDir
104
  let fpath = liveJobFile qdir jid
105
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
106
  changedResult <- try $ needsReload fstat fpath
107
                   :: IO (Either IOError (Maybe FStat))
108
  let changed = either (const $ Just nullFStat) id changedResult
109
  case changed of
110
    Nothing -> do
111
      logDebug $ "File " ++ fpath ++ " not changed on disk."
112
      return Nothing
113
    Just fstat' -> do
114
      let jids = show $ fromJobId jid
115
      logInfo $ "Rereading job "  ++ jids
116
      readResult <- loadJobFromDisk qdir True jid
117
      case readResult of
118
        Bad s -> do
119
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
120
          return Nothing
121
        Ok (job', _) -> do
122
          logDebug
123
            $ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job')
124
          return . Just $ jWS {jStat=fstat', jJob=job'}
125
                          -- jINotify unchanged
126

    
127
-- | Update a job in the job queue, if it is still there. This is the
128
-- pure function for inserting a previously read change into the queue.
129
-- as the change contains its time stamp, we don't have to worry about a
130
-- later read change overwriting a newer read state. If this happens, the
131
-- fstat value will be outdated, so the next poller run will fix this.
132
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
133
updateJobStatus job' =
134
  let jid = qjId $ jJob job' in
135
  map (\job -> if qjId (jJob job) == jid then job' else job)
136

    
137
-- | Update a single job by reading it from disk, if necessary.
138
updateJob :: JQStatus -> JobWithStat -> IO ()
139
updateJob state jb = do
140
  jb' <- readJobStatus jb
141
  maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb'
142

    
143
-- | Sort out the finished jobs from the monitored part of the queue.
144
-- This is the pure part, splitting the queue into a remaining queue
145
-- and the jobs that were removed.
146
sortoutFinishedJobs :: Queue -> (Queue, [QueuedJob])
147
sortoutFinishedJobs queue =
148
  let (fin, run') = partition (jobFinalized . jJob) . qRunning $ queue
149
  in (queue {qRunning=run'}, map jJob fin)
150

    
151
-- | Actually clean up the finished jobs. This is the IO wrapper around
152
-- the pure `sortoutFinishedJobs`.
153
cleanupFinishedJobs :: JQStatus -> IO ()
154
cleanupFinishedJobs qstate = do
155
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
156
  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus)
157
      jlist = commaJoin $ map showJob finished
158
  unless (null finished)
159
    . logInfo $ "Finished jobs: " ++ jlist
160
  mapM_ (maybe (return ()) killINotify . jINotify) finished
161

    
162
-- | Decide on which jobs to schedule next for execution. This is the
163
-- pure function doing the scheduling.
164
selectJobsToRun :: Queue -> (Queue, [QueuedJob])
165
selectJobsToRun queue =
166
  let n = maxRunningJobs - length (qRunning queue)
167
      (chosen, remain) = splitAt n (qEnqueued queue)
168
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}
169
     , map jJob chosen)
170

    
171
-- | Requeue jobs that were previously selected for execution
172
-- but couldn't be started.
173
requeueJobs :: JQStatus -> [QueuedJob] -> IOError -> IO ()
174
requeueJobs qstate jobs err = do
175
  let jids = map qjId jobs
176
      jidsString = commaJoin $ map (show . fromJobId) jids
177
      rmJobs = filter ((`notElem` jids) . qjId . jJob)
178
  logWarning $ "Starting jobs failed: " ++ show err
179
  logWarning $ "Rescheduling jobs: " ++ jidsString
180
  modifyJobs qstate (onRunningJobs rmJobs)
181
  modifyJobs qstate (onQueuedJobs . (++) $ map unreadJob jobs)
182

    
183
-- | Schedule jobs to be run. This is the IO wrapper around the
184
-- pure `selectJobsToRun`.
185
scheduleSomeJobs :: JQStatus -> IO ()
186
scheduleSomeJobs qstate = do
187
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
188
  unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
189
    $ map (show . fromJobId . qjId) chosen
190
  result <- try $ JQ.startJobs chosen
191
  either (requeueJobs qstate chosen) return result
192

    
193
-- | Format the job queue status in a compact, human readable way.
194
showQueue :: Queue -> String
195
showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
196
  let showids = show . map (fromJobId . qjId . jJob)
197
  in "Waiting jobs: " ++ showids waiting 
198
       ++ "; running jobs: " ++ showids running
199

    
200
-- | Time-based watcher for updating the job queue.
201
onTimeWatcher :: JQStatus -> IO ()
202
onTimeWatcher qstate = forever $ do
203
  threadDelay watchInterval
204
  logDebug "Job queue watcher timer fired"
205
  jobs <- readIORef (jqJobs qstate)
206
  mapM_ (updateJob qstate) $ qRunning jobs
207
  cleanupFinishedJobs qstate
208
  jobs' <- readIORef (jqJobs qstate)
209
  logInfo $ showQueue jobs'
210
  scheduleSomeJobs qstate
211

    
212
-- | Read a single, non-archived, job, specified by its id, from disk.
213
readJobFromDisk :: JobId -> IO (Result JobWithStat)
214
readJobFromDisk jid = do
215
  qdir <- queueDir
216
  let fpath = liveJobFile qdir jid
217
  logDebug $ "Reading " ++ fpath
218
  tryFstat <- try $ getFStat fpath :: IO (Either IOError FStat)
219
  let fstat = either (const nullFStat) id tryFstat
220
  loadResult <- JQ.loadJobFromDisk qdir False jid
221
  return $ liftM (JobWithStat Nothing fstat . fst) loadResult
222

    
223
-- | Read all non-finalized jobs from disk.
224
readJobsFromDisk :: IO [JobWithStat]
225
readJobsFromDisk = do
226
  logInfo "Loading job queue"
227
  qdir <- queueDir
228
  eitherJids <- JQ.getJobIDs [qdir]
229
  let jids = either (const []) JQ.sortJobIDs eitherJids
230
      jidsstring = commaJoin $ map (show . fromJobId) jids
231
  logInfo $ "Non-archived jobs on disk: " ++ jidsstring
232
  jobs <- mapM readJobFromDisk jids
233
  return $ justOk jobs
234

    
235
-- | Set up the job scheduler. This will also start the monitoring
236
-- of changes to the running jobs.
237
initJQScheduler :: JQStatus -> IO ()
238
initJQScheduler qstate = do
239
  alljobs <- readJobsFromDisk
240
  let jobs = filter (not . jobFinalized . jJob) alljobs
241
      (running, queued) = partition (jobStarted . jJob) jobs
242
  modifyJobs qstate (onQueuedJobs (++ queued) . onRunningJobs (++ running))
243
  jqjobs <- readIORef (jqJobs qstate)
244
  logInfo $ showQueue jqjobs
245
  scheduleSomeJobs qstate
246
  logInfo "Starting time-based job queue watcher"
247
  _ <- forkIO $ onTimeWatcher qstate
248
  return ()
249

    
250
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
251
-- eventually.
252
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
253
enqueueNewJobs state jobs = do
254
  logInfo . (++) "New jobs enqueued: " . commaJoin
255
    $ map (show . fromJobId . qjId) jobs
256
  let jobs' = map unreadJob jobs
257
  modifyJobs state (onQueuedJobs (++ jobs'))
258
  scheduleSomeJobs state