Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ 1c532d2d

History | View | Annotate | Download (7.9 kB)

1
{-| Implementation of a reader for the job queue.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.JQScheduler
27
  ( JQStatus
28
  , emptyJQStatus
29
  , initJQScheduler
30
  , enqueueNewJobs
31
  ) where
32

    
33
import Control.Arrow
34
import Control.Concurrent
35
import Control.Exception
36
import Control.Monad
37
import Data.List
38
import Data.IORef
39

    
40
import Ganeti.BasicTypes
41
import Ganeti.Constants as C
42
import Ganeti.JQueue as JQ
43
import Ganeti.Logging
44
import Ganeti.Path
45
import Ganeti.Types
46
import Ganeti.Utils
47

    
48
-- | A queued job paired with the file status information last observed
-- for it.
data JobWithStat = JobWithStat
  { jStat :: FStat      -- ^ fstat result recorded for the job
  , jJob  :: QueuedJob  -- ^ the job itself
  }
49
-- | The two job lists that together make up the queue state.
data Queue = Queue
  { qEnqueued :: [JobWithStat]  -- ^ enqueued, not yet handed over
  , qRunning  :: [JobWithStat]  -- ^ already handed over for execution
  }
50

    
51
-- | Representation of the job queue.
--
-- Two lists of jobs are kept (each job together with the last fstat
-- result observed for it): the jobs that are enqueued but not yet handed
-- over for execution, and the jobs already handed over for execution.
-- Both lists live in one 'IORef', so that they can be updated together
-- atomically, in particular when scheduling jobs to be handed over for
-- execution.
data JQStatus = JQStatus { jqJobs :: IORef Queue }
64

    
65

    
66
-- | Create a fresh queue status with no enqueued and no running jobs.
emptyJQStatus :: IO JQStatus
emptyJQStatus =
  fmap JQStatus . newIORef $ Queue { qEnqueued = [], qRunning = [] }
70

    
71
-- | Transform the list of running jobs, leaving the rest of the queue
-- untouched.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs f queue@(Queue { qRunning = running }) =
  queue { qRunning = f running }
74

    
75
-- | Transform the list of enqueued jobs, leaving the rest of the queue
-- untouched.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs f queue@(Queue { qEnqueued = waiting }) =
  queue { qEnqueued = f waiting }
78

    
79
-- | Wrap a 'QueuedJob' into a 'JobWithStat', using 'nullFStat' as its
-- recorded file status.
unreadJob :: QueuedJob -> JobWithStat
unreadJob = JobWithStat nullFStat
82

    
83
-- | Interval, in microseconds, between two polls of the running jobs for
-- updates. This is the value handed to 'threadDelay' by the watcher.
watchInterval :: Int
watchInterval = microsecondsPerUnit * C.luxidJobqueuePollInterval
  where
    -- presumably the constant is given in seconds; verify in Constants
    microsecondsPerUnit = 1000000
86

    
87
-- | Upper bound on the number of jobs that may be running at the same
-- time; consulted when selecting jobs to hand over for execution.
maxRunningJobs :: Int
maxRunningJobs = C.luxidMaximalRunningJobs
90

    
91
-- | Atomically replace the queue stored in the status by the result of
-- applying the given pure function to it.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())
94

    
95
-- | Reread a job from disk, if the file has changed.
--
-- The result is 'Nothing' if the job file did not need reloading, or if
-- loading it failed (in which case a warning is logged). Otherwise the
-- freshly loaded job is returned together with the new fstat value
-- reported by 'needsReload'.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus (JobWithStat {jStat=fstat, jJob=job})  = do
  let jid = qjId job
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  changed <- needsReload fstat fpath
  case changed of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just fstat' -> do
      logInfo $ "Rereading " ++ fpath
      readResult <- loadJobFromDisk qdir False jid
      let jids = show $ fromJobId jid
      case readResult of
        Bad s -> do
          -- Keep the old state; the unchanged fstat value means the next
          -- call will attempt the reload again.
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
          return Nothing
        Ok (job', _) -> do
          logDebug
            $ "Read job " ++ jids ++ ", status is " ++ show (calcJobStatus job')
          return . Just $ JobWithStat {jStat=fstat', jJob=job'}
119

    
120
-- | Update a job in the job queue, if it is still there. This is the
-- pure function for inserting a previously read change into the queue.
-- As the change contains its time stamp, we don't have to worry about a
-- later read change overwriting a newer read state. If this happens, the
-- fstat value will be outdated, so the next poller run will fix this.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' = map replace
  where
    jid = qjId $ jJob job'
    -- Swap in the new version for every entry carrying the same job id.
    replace job
      | qjId (jJob job) == jid = job'
      | otherwise              = job
129

    
130
-- | Update a single job by reading it from disk, if necessary. When a
-- new version was read, it replaces the matching entry among the running
-- jobs; otherwise nothing is changed.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  reread <- readJobStatus jb
  case reread of
    Nothing  -> return ()
    Just jb' -> modifyJobs state . onRunningJobs $ updateJobStatus jb'
135

    
136
-- | Sort out the finished jobs from the monitored part of the queue.
-- This is the pure part: it returns the queue with only the unfinished
-- running jobs left, together with the jobs that were removed.
sortoutFinishedJobs :: Queue -> (Queue, [QueuedJob])
sortoutFinishedJobs queue = (queue { qRunning = active }, map jJob done)
  where
    -- A job is kept as long as its status is at most JOB_STATUS_RUNNING.
    notFinished = (<= JOB_STATUS_RUNNING) . calcJobStatus . jJob
    (active, done) = partition notFinished (qRunning queue)
145

    
146
-- | Actually clean up the finished jobs. This is the IO wrapper around
-- the pure `sortoutFinishedJobs`; the removed jobs are reported in the
-- log as (job id, status) pairs.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  let describe job = show (fromJobId (qjId job), calcJobStatus job)
  unless (null finished) $
    logInfo ("Finished jobs: " ++ commaJoin (map describe finished))
155

    
156
-- | Decide on which jobs to schedule next for execution. This is the
-- pure function doing the scheduling: it moves jobs from the front of
-- the enqueued list to the end of the running list and returns the
-- jobs moved.
selectJobsToRun :: Queue -> (Queue, [QueuedJob])
selectJobsToRun queue =
  ( queue { qEnqueued = waiting, qRunning = running ++ picked }
  , map jJob picked )
  where
    running = qRunning queue
    -- Non-positive when the limit is reached; 'splitAt' then picks nothing.
    freeSlots = maxRunningJobs - length running
    (picked, waiting) = splitAt freeSlots (qEnqueued queue)
164

    
165
-- | Requeue jobs that were previously selected for execution
-- but couldn't be started.
--
-- The jobs are removed from the running list and put back at the front
-- of the enqueued list (with a null fstat, via 'unreadJob'). Both lists
-- are changed in a single atomic modification, so no other thread can
-- observe a state in which the jobs are in neither list.
requeueJobs :: JQStatus -> [QueuedJob] -> IOError -> IO ()
requeueJobs qstate jobs err = do
  let jids = map qjId jobs
      jidsString = commaJoin $ map (show . fromJobId) jids
      rmJobs = filter ((`notElem` jids) . qjId . jJob)
      requeued = map unreadJob jobs
  logWarning $ "Starting jobs failed: " ++ show err
  logWarning $ "Rescheduling jobs: " ++ jidsString
  modifyJobs qstate (onQueuedJobs (requeued ++) . onRunningJobs rmJobs)
176

    
177
-- | Schedule jobs to be run. This is the IO wrapper around the
-- pure `selectJobsToRun`. If starting the selected jobs raises an
-- 'IOError', the jobs are handed to 'requeueJobs'.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = do
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
  let chosenIds = map (show . fromJobId . qjId) chosen
  unless (null chosen) $
    logInfo ("Starting jobs: " ++ commaJoin chosenIds)
  result <- try $ JQ.startJobs chosen
  case result of
    Left err      -> requeueJobs qstate chosen err
    Right started -> return started
186

    
187
-- | Render the queue compactly for humans: the ids of the waiting jobs
-- followed by the ids of the running jobs.
showQueue :: Queue -> String
showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
  concat [ "Waiting jobs: ", idsOf waiting
         , "; running jobs: ", idsOf running ]
  where
    idsOf = show . map (fromJobId . qjId . jJob)
193

    
194
-- | Time-based watcher for updating the job queue. Runs forever; each
-- round waits for 'watchInterval', rereads the running jobs, removes the
-- finished ones, logs the queue and finally schedules further jobs.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever $ do
  threadDelay watchInterval
  logDebug "Job queue watcher timer fired"
  readIORef queueRef >>= mapM_ (updateJob qstate) . qRunning
  cleanupFinishedJobs qstate
  -- Read the queue again, so that the log reflects the cleanup.
  readIORef queueRef >>= logInfo . showQueue
  scheduleSomeJobs qstate
  where
    queueRef = jqJobs qstate
205

    
206
-- | Set up the job scheduler. This forks the thread running
-- 'onTimeWatcher', which monitors changes to the running jobs; the
-- thread id is discarded.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate =
  logInfo "Starting time-based job queue watcher"
    >> forkIO (onTimeWatcher qstate)
    >> return ()
213

    
214
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
-- eventually. The jobs are appended to the end of the enqueued list and
-- a scheduling round is triggered right away.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  let jobIds = map (show . fromJobId . qjId) jobs
  logInfo $ "New jobs enqueued: " ++ commaJoin jobIds
  modifyJobs state . onQueuedJobs $ \queued -> queued ++ map unreadJob jobs
  scheduleSomeJobs state