Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ 350f0759

History | View | Annotate | Download (8.1 kB)

1
{-| Implementation of a reader for the job queue.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.JQScheduler
27
  ( JQStatus
28
  , emptyJQStatus
29
  , initJQScheduler
30
  , enqueueNewJobs
31
  ) where
32

    
33
import Control.Arrow
34
import Control.Concurrent
35
import Control.Exception
36
import Control.Monad
37
import Data.List
38
import Data.IORef
39

    
40
import Ganeti.BasicTypes
41
import Ganeti.Constants as C
42
import Ganeti.JQueue as JQ
43
import Ganeti.Logging
44
import Ganeti.Path
45
import Ganeti.Types
46
import Ganeti.Utils
47

    
48
-- | A queued job together with the last fstat result observed for it.
data JobWithStat = JobWithStat
  { jStat :: FStat      -- ^ last fstat result observed for the job
  , jJob  :: QueuedJob  -- ^ the job itself
  }
49
-- | The two job lists managed by the scheduler.
data Queue = Queue
  { qEnqueued :: [JobWithStat]  -- ^ enqueued, not yet handed over for execution
  , qRunning  :: [JobWithStat]  -- ^ already handed over for execution
  }
50

    
51
-- | Representation of the job queue.
--
-- Two lists of jobs are tracked, each job being paired with the last
-- fstat result observed for it: the jobs that are enqueued but have not
-- yet been handed over for execution, and the jobs that have already
-- been handed over. Both lists live in one 'IORef', so that they can be
-- updated together atomically, in particular when jobs are scheduled
-- and move from the first list to the second.
data JQStatus = JQStatus { jqJobs :: IORef Queue }
64

    
65

    
66
-- | Create a fresh queue status with no enqueued and no running jobs.
emptyJQStatus :: IO JQStatus
emptyJQStatus = fmap JQStatus (newIORef noJobs)
  where
    noJobs = Queue { qEnqueued = [], qRunning = [] }
70

    
71
-- | Transform the list of running jobs, leaving the enqueued jobs as
-- they are.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs transform q@(Queue { qRunning = running }) =
  q { qRunning = transform running }
74

    
75
-- | Transform the list of enqueued jobs, leaving the running jobs as
-- they are.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs transform q@(Queue { qEnqueued = waiting }) =
  q { qEnqueued = transform waiting }
78

    
79
-- | Wrap a 'QueuedJob' into a 'JobWithStat', using 'nullFStat' as the
-- recorded fstat value.
unreadJob :: QueuedJob -> JobWithStat
unreadJob job = JobWithStat nullFStat job
82

    
83
-- | Interval, in microseconds, between two polls of the running jobs
-- for updates. Derived from 'C.luxidJobqueuePollInterval' by scaling
-- with a factor of one million (presumably that constant is given in
-- seconds; confirm against its definition).
watchInterval :: Int
watchInterval = scale * C.luxidJobqueuePollInterval
  where
    scale = 1000000 :: Int
86

    
87
-- | Upper bound on the number of jobs running simultaneously; taken
-- from 'C.luxidMaximalRunningJobs'.
maxRunningJobs :: Int
maxRunningJobs = C.luxidMaximalRunningJobs
90

    
91
-- | Atomically apply a pure transformation to the queue stored in the
-- given queue status. No result is returned.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())
94

    
95
-- | Reread a job from disk, if the file has changed.
--
-- The job file is checked with 'needsReload' against the fstat value
-- recorded in the given 'JobWithStat'. The result is
--
-- * 'Nothing' if the file is reported as unchanged, or if rereading
--   the job fails (a warning is logged in the latter case);
--
-- * 'Just' the freshly read job, paired with the new fstat value,
--   otherwise.
--
-- An 'IOError' raised by the change check itself is treated like a
-- change, with 'nullFStat' as the new fstat value, so that a reread is
-- attempted nevertheless.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus (JobWithStat {jStat=fstat, jJob=job})  = do
  let jid = qjId job
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  changedResult <- try $ needsReload fstat fpath
                   :: IO (Either IOError (Maybe FStat))
  -- A failing check must not hide an update: fall back to "changed".
  let changed = either (const $ Just nullFStat) id changedResult
  case changed of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just fstat' -> do
      let jids = show $ fromJobId jid
      logInfo $ "Rereading job "  ++ jids
      readResult <- loadJobFromDisk qdir True jid
      case readResult of
        Bad s -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
          return Nothing
        -- Only the job is used; the second component is ignored.
        Ok (job', _) -> do
          logDebug $ "Read job " ++ jids ++ ", status is "
                     ++ show (calcJobStatus job')
          return . Just $ JobWithStat {jStat=fstat', jJob=job'}
121

    
122
-- | Replace a job in a list of jobs by a newly read version of it, if
-- the job is still present; jobs are identified by their job id. This
-- is the pure function for inserting a previously read change into the
-- queue. As the change carries its own fstat value, a later-read
-- change overwriting a newer state is not a concern: should that
-- happen, the stored fstat value will be outdated, and the next poller
-- run will fix this.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus updated = map replaceIfSame
  where
    targetId = qjId (jJob updated)
    replaceIfSame current
      | qjId (jJob current) == targetId = updated
      | otherwise                       = current
131

    
132
-- | Bring a single job up to date: reread it via 'readJobStatus' and,
-- if a new version was obtained, store it in the list of running jobs.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  reread <- readJobStatus jb
  case reread of
    Nothing    -> return ()
    Just fresh -> modifyJobs state (onRunningJobs (updateJobStatus fresh))
137

    
138
-- | Split off the finished jobs from the running part of the queue.
-- This is the pure part: it yields the queue with only those running
-- jobs whose status is at most 'JOB_STATUS_RUNNING', together with the
-- jobs that were removed.
sortoutFinishedJobs :: Queue -> (Queue, [QueuedJob])
sortoutFinishedJobs queue = (queue { qRunning = active }, map jJob done)
  where
    stillActive jws = calcJobStatus (jJob jws) <= JOB_STATUS_RUNNING
    (active, done) = partition stillActive (qRunning queue)
147

    
148
-- | Remove the finished jobs from the queue status. This is the IO
-- wrapper around the pure 'sortoutFinishedJobs'; if any jobs were
-- removed, their ids and statuses are logged.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  unless (null finished) $
    logInfo ("Finished jobs: " ++ commaJoin (map describe finished))
  where
    describe job = show (fromJobId (qjId job), calcJobStatus job)
157

    
158
-- | Pure scheduling decision: move jobs from the front of the enqueued
-- list to the end of the running list, as many as there are free slots
-- with respect to 'maxRunningJobs'. Returns the updated queue and the
-- jobs that were chosen. With no free slots, nothing is chosen, as
-- 'splitAt' yields an empty prefix for non-positive counts.
selectJobsToRun :: Queue -> (Queue, [QueuedJob])
selectJobsToRun queue = (queue', map jJob chosen)
  where
    running = qRunning queue
    freeSlots = maxRunningJobs - length running
    (chosen, remain) = splitAt freeSlots (qEnqueued queue)
    queue' = queue { qEnqueued = remain, qRunning = running ++ chosen }
166

    
167
-- | Requeue jobs that were previously selected for execution
-- but couldn't be started.
--
-- The given error is logged, then the jobs are removed from the list
-- of running jobs and put back at the front of the enqueued list, with
-- their fstat information reset (see 'unreadJob'). Both lists are
-- changed in a single atomic update, so no other thread can observe a
-- state in which the jobs are in neither list.
requeueJobs :: JQStatus -> [QueuedJob] -> IOError -> IO ()
requeueJobs qstate jobs err = do
  let jids = map qjId jobs
      jidsString = commaJoin $ map (show . fromJobId) jids
      rmJobs = filter ((`notElem` jids) . qjId . jJob)
      requeued = map unreadJob jobs
  logWarning $ "Starting jobs failed: " ++ show err
  logWarning $ "Rescheduling jobs: " ++ jidsString
  -- The two transformations touch different fields of the queue, so
  -- their order within the composition is irrelevant.
  modifyJobs qstate (onQueuedJobs (requeued ++) . onRunningJobs rmJobs)
178

    
179
-- | Pick jobs for execution and start them. This is the IO wrapper
-- around the pure 'selectJobsToRun': the selection is applied to the
-- queue atomically, the chosen jobs are handed to 'JQ.startJobs', and
-- if that raises an 'IOError', the chosen jobs are requeued.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
  let chosenIds = commaJoin [show (fromJobId (qjId job)) | job <- chosen]
  unless (null chosen) $ logInfo ("Starting jobs: " ++ chosenIds)
  outcome <- try (JQ.startJobs chosen)
  case outcome of
    Left err  -> requeueJobs qstate chosen err
    Right res -> return res
188

    
189
-- | Render the queue compactly for humans: the ids of the waiting
-- jobs, followed by the ids of the running jobs.
showQueue :: Queue -> String
showQueue (Queue { qEnqueued = pending, qRunning = active }) =
  concat [ "Waiting jobs: ", idList pending
         , "; running jobs: ", idList active
         ]
  where
    idList jobs = show [fromJobId (qjId (jJob j)) | j <- jobs]
195

    
196
-- | Timer-driven watcher keeping the job queue up to date. Loops
-- forever; each round waits for 'watchInterval', rereads the running
-- jobs where necessary, removes the finished ones, logs the state of
-- the queue and finally schedules further jobs.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever pollOnce
  where
    queueRef = jqJobs qstate
    pollOnce = do
      threadDelay watchInterval
      logDebug "Job queue watcher timer fired"
      snapshot <- readIORef queueRef
      forM_ (qRunning snapshot) (updateJob qstate)
      cleanupFinishedJobs qstate
      readIORef queueRef >>= logInfo . showQueue
      scheduleSomeJobs qstate
207

    
208
-- | Initialise the job scheduler by forking a thread that runs
-- 'onTimeWatcher', which monitors the running jobs for changes.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate =
  logInfo "Starting time-based job queue watcher"
    >> forkIO (onTimeWatcher qstate)
    >> return ()
215

    
216
-- | Add new jobs to the end of the list of enqueued jobs and trigger a
-- scheduling round right away. This will guarantee that the jobs will
-- be executed eventually.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  let newIds = commaJoin [show (fromJobId (qjId job)) | job <- jobs]
  logInfo ("New jobs enqueued: " ++ newIds)
  modifyJobs state $ onQueuedJobs (++ map unreadJob jobs)
  scheduleSomeJobs state