Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQScheduler.hs @ 48e4da5c

History | View | Annotate | Download (7.3 kB)

1 48e4da5c Klaus Aehlig
{-| Implementation of a reader for the job queue.
2 48e4da5c Klaus Aehlig
3 48e4da5c Klaus Aehlig
-}
4 48e4da5c Klaus Aehlig
5 48e4da5c Klaus Aehlig
{-
6 48e4da5c Klaus Aehlig
7 48e4da5c Klaus Aehlig
Copyright (C) 2013 Google Inc.
8 48e4da5c Klaus Aehlig
9 48e4da5c Klaus Aehlig
This program is free software; you can redistribute it and/or modify
10 48e4da5c Klaus Aehlig
it under the terms of the GNU General Public License as published by
11 48e4da5c Klaus Aehlig
the Free Software Foundation; either version 2 of the License, or
12 48e4da5c Klaus Aehlig
(at your option) any later version.
13 48e4da5c Klaus Aehlig
14 48e4da5c Klaus Aehlig
This program is distributed in the hope that it will be useful, but
15 48e4da5c Klaus Aehlig
WITHOUT ANY WARRANTY; without even the implied warranty of
16 48e4da5c Klaus Aehlig
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 48e4da5c Klaus Aehlig
General Public License for more details.
18 48e4da5c Klaus Aehlig
19 48e4da5c Klaus Aehlig
You should have received a copy of the GNU General Public License
20 48e4da5c Klaus Aehlig
along with this program; if not, write to the Free Software
21 48e4da5c Klaus Aehlig
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 48e4da5c Klaus Aehlig
02110-1301, USA.
23 48e4da5c Klaus Aehlig
24 48e4da5c Klaus Aehlig
-}
25 48e4da5c Klaus Aehlig
26 48e4da5c Klaus Aehlig
module Ganeti.JQScheduler
27 48e4da5c Klaus Aehlig
  ( JQStatus
28 48e4da5c Klaus Aehlig
  , emptyJQStatus
29 48e4da5c Klaus Aehlig
  , initJQScheduler
30 48e4da5c Klaus Aehlig
  , enqueueNewJobs
31 48e4da5c Klaus Aehlig
  ) where
32 48e4da5c Klaus Aehlig
33 48e4da5c Klaus Aehlig
import Control.Arrow
34 48e4da5c Klaus Aehlig
import Control.Concurrent
35 48e4da5c Klaus Aehlig
import Control.Monad
36 48e4da5c Klaus Aehlig
import Data.List
37 48e4da5c Klaus Aehlig
import Data.IORef
38 48e4da5c Klaus Aehlig
39 48e4da5c Klaus Aehlig
import Ganeti.BasicTypes
40 48e4da5c Klaus Aehlig
import Ganeti.Constants as C
41 48e4da5c Klaus Aehlig
import Ganeti.JQueue as JQ
42 48e4da5c Klaus Aehlig
import Ganeti.Logging
43 48e4da5c Klaus Aehlig
import Ganeti.Path
44 48e4da5c Klaus Aehlig
import Ganeti.Types
45 48e4da5c Klaus Aehlig
import Ganeti.Utils
46 48e4da5c Klaus Aehlig
47 48e4da5c Klaus Aehlig
data JobWithStat = JobWithStat { jStat :: FStat, jJob :: QueuedJob }
48 48e4da5c Klaus Aehlig
data Queue = Queue { qEnqueued :: [JobWithStat], qRunning :: [JobWithStat] }
49 48e4da5c Klaus Aehlig
50 48e4da5c Klaus Aehlig
{-| Representation of the job queue
51 48e4da5c Klaus Aehlig
52 48e4da5c Klaus Aehlig
We keep two lists of jobs (together with information about the last
53 48e4da5c Klaus Aehlig
fstat result observed): the jobs that are enqueued, but not yet handed
54 48e4da5c Klaus Aehlig
over for execution, and the jobs already handed over for execution. They
55 48e4da5c Klaus Aehlig
are kept together in a single IORef, so that we can atomically update
56 48e4da5c Klaus Aehlig
both, in particular when scheduling jobs to be handed over for execution.
57 48e4da5c Klaus Aehlig
58 48e4da5c Klaus Aehlig
-}
59 48e4da5c Klaus Aehlig
60 48e4da5c Klaus Aehlig
data JQStatus = JQStatus
61 48e4da5c Klaus Aehlig
  { jqJobs :: IORef Queue
62 48e4da5c Klaus Aehlig
  }
63 48e4da5c Klaus Aehlig
64 48e4da5c Klaus Aehlig
65 48e4da5c Klaus Aehlig
emptyJQStatus :: IO JQStatus
66 48e4da5c Klaus Aehlig
emptyJQStatus = do
67 48e4da5c Klaus Aehlig
  jqJ <- newIORef Queue {qEnqueued=[], qRunning=[]}
68 48e4da5c Klaus Aehlig
  return JQStatus { jqJobs=jqJ }
69 48e4da5c Klaus Aehlig
70 48e4da5c Klaus Aehlig
-- | Apply a function on the running jobs.
71 48e4da5c Klaus Aehlig
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
72 48e4da5c Klaus Aehlig
onRunningJobs f queue = queue {qRunning=f $ qRunning queue}
73 48e4da5c Klaus Aehlig
74 48e4da5c Klaus Aehlig
-- | Apply a function on the queued jobs.
75 48e4da5c Klaus Aehlig
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
76 48e4da5c Klaus Aehlig
onQueuedJobs f queue = queue {qEnqueued=f $ qEnqueued queue}
77 48e4da5c Klaus Aehlig
78 48e4da5c Klaus Aehlig
-- | Obtain a JobWithStat from a QueuedJob.
79 48e4da5c Klaus Aehlig
unreadJob :: QueuedJob -> JobWithStat
80 48e4da5c Klaus Aehlig
unreadJob job = JobWithStat {jJob=job, jStat=nullFStat}
81 48e4da5c Klaus Aehlig
82 48e4da5c Klaus Aehlig
-- | Reload interval for polling the running jobs for updates in microseconds.
83 48e4da5c Klaus Aehlig
watchInterval :: Int
84 48e4da5c Klaus Aehlig
watchInterval = C.luxidJobqueuePollInterval * 1000000 
85 48e4da5c Klaus Aehlig
86 48e4da5c Klaus Aehlig
-- | Maximal number of jobs to be running at the same time.
87 48e4da5c Klaus Aehlig
maxRunningJobs :: Int
88 48e4da5c Klaus Aehlig
maxRunningJobs = C.luxidMaximalRunningJobs 
89 48e4da5c Klaus Aehlig
90 48e4da5c Klaus Aehlig
-- | Wrapper function to atomically update the jobs in the queue status.
91 48e4da5c Klaus Aehlig
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
92 48e4da5c Klaus Aehlig
modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) ()  . f)
93 48e4da5c Klaus Aehlig
94 48e4da5c Klaus Aehlig
-- | Reread a job from disk, if the file has changed.
95 48e4da5c Klaus Aehlig
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
96 48e4da5c Klaus Aehlig
readJobStatus (JobWithStat {jStat=fstat, jJob=job})  = do
97 48e4da5c Klaus Aehlig
  let jid = qjId job
98 48e4da5c Klaus Aehlig
  qdir <- queueDir
99 48e4da5c Klaus Aehlig
  let fpath = liveJobFile qdir jid
100 48e4da5c Klaus Aehlig
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
101 48e4da5c Klaus Aehlig
  changed <- needsReload fstat fpath
102 48e4da5c Klaus Aehlig
  case changed of
103 48e4da5c Klaus Aehlig
    Nothing -> do
104 48e4da5c Klaus Aehlig
      logDebug $ "File " ++ fpath ++ " not changed on disk."
105 48e4da5c Klaus Aehlig
      return Nothing
106 48e4da5c Klaus Aehlig
    Just fstat' -> do
107 48e4da5c Klaus Aehlig
      logInfo $ "Rereading " ++ fpath
108 48e4da5c Klaus Aehlig
      readResult <- loadJobFromDisk qdir False jid
109 48e4da5c Klaus Aehlig
      let jids = show $ fromJobId jid
110 48e4da5c Klaus Aehlig
      case readResult of
111 48e4da5c Klaus Aehlig
        Bad s -> do
112 48e4da5c Klaus Aehlig
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
113 48e4da5c Klaus Aehlig
          return Nothing
114 48e4da5c Klaus Aehlig
        Ok (job', _) -> do
115 48e4da5c Klaus Aehlig
          logDebug
116 48e4da5c Klaus Aehlig
            $ "Read job " ++ jids ++ ", staus is " ++ show (calcJobStatus job')
117 48e4da5c Klaus Aehlig
          return . Just $ JobWithStat {jStat=fstat', jJob=job'}
118 48e4da5c Klaus Aehlig
119 48e4da5c Klaus Aehlig
-- | Update a job in the job queue, if it is still there. This is the
120 48e4da5c Klaus Aehlig
-- pure function for inserting a previously read change into the queue.
121 48e4da5c Klaus Aehlig
-- as the change contains its time stamp, we don't have to worry about a
122 48e4da5c Klaus Aehlig
-- later read change overwriting a newer read state. If this happens, the
123 48e4da5c Klaus Aehlig
-- fstat value will be outdated, so the next poller run will fix this.
124 48e4da5c Klaus Aehlig
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
125 48e4da5c Klaus Aehlig
updateJobStatus job' =
126 48e4da5c Klaus Aehlig
  let jid = qjId $ jJob job' in
127 48e4da5c Klaus Aehlig
  map (\job -> if qjId (jJob job) == jid then job' else job)
128 48e4da5c Klaus Aehlig
129 48e4da5c Klaus Aehlig
-- | Update a single job by reading it from disk, if necessary.
130 48e4da5c Klaus Aehlig
updateJob :: JQStatus -> JobWithStat -> IO ()
131 48e4da5c Klaus Aehlig
updateJob state jb = do
132 48e4da5c Klaus Aehlig
  jb' <- readJobStatus jb
133 48e4da5c Klaus Aehlig
  maybe (return ()) (modifyJobs state . onRunningJobs . updateJobStatus) jb'
134 48e4da5c Klaus Aehlig
135 48e4da5c Klaus Aehlig
-- | Sort out the finished jobs from the monitored part of the queue.
136 48e4da5c Klaus Aehlig
-- This is the pure part, splitting the queue into a remaining queue
137 48e4da5c Klaus Aehlig
-- and the jobs that were removed.
138 48e4da5c Klaus Aehlig
sortoutFinishedJobs :: Queue -> (Queue, [QueuedJob])
139 48e4da5c Klaus Aehlig
sortoutFinishedJobs queue =
140 48e4da5c Klaus Aehlig
  let (run', fin) = partition
141 48e4da5c Klaus Aehlig
                      ((<= JOB_STATUS_RUNNING) . calcJobStatus . jJob)
142 48e4da5c Klaus Aehlig
                      . qRunning $ queue
143 48e4da5c Klaus Aehlig
  in (queue {qRunning=run'}, map jJob fin)
144 48e4da5c Klaus Aehlig
145 48e4da5c Klaus Aehlig
-- | Actually clean up the finished jobs. This is the IO wrapper around
146 48e4da5c Klaus Aehlig
-- the pure `sortoutFinishedJobs`.
147 48e4da5c Klaus Aehlig
cleanupFinishedJobs :: JQStatus -> IO ()
148 48e4da5c Klaus Aehlig
cleanupFinishedJobs qstate = do
149 48e4da5c Klaus Aehlig
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
150 48e4da5c Klaus Aehlig
  let showJob = show . ((fromJobId . qjId) &&& calcJobStatus)
151 48e4da5c Klaus Aehlig
      jlist = commaJoin $ map showJob finished
152 48e4da5c Klaus Aehlig
  unless (null finished)
153 48e4da5c Klaus Aehlig
    . logInfo $ "Finished jobs: " ++ jlist
154 48e4da5c Klaus Aehlig
155 48e4da5c Klaus Aehlig
-- | Decide on which jobs to schedule next for execution. This is the
156 48e4da5c Klaus Aehlig
-- pure function doing the scheduling.
157 48e4da5c Klaus Aehlig
selectJobsToRun :: Queue -> (Queue, [QueuedJob])
158 48e4da5c Klaus Aehlig
selectJobsToRun queue =
159 48e4da5c Klaus Aehlig
  let n = maxRunningJobs - length (qRunning queue)
160 48e4da5c Klaus Aehlig
      (chosen, remain) = splitAt n (qEnqueued queue)
161 48e4da5c Klaus Aehlig
  in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}
162 48e4da5c Klaus Aehlig
     , map jJob chosen)
163 48e4da5c Klaus Aehlig
164 48e4da5c Klaus Aehlig
-- | Schedule jobs to be run. This is the IO wrapper around the
165 48e4da5c Klaus Aehlig
-- pure `selectJobsToRun`.
166 48e4da5c Klaus Aehlig
scheduleSomeJobs :: JQStatus -> IO ()
167 48e4da5c Klaus Aehlig
scheduleSomeJobs qstate = do
168 48e4da5c Klaus Aehlig
  chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun
169 48e4da5c Klaus Aehlig
  unless (null chosen) . logInfo . (++) "Staring jobs: " . commaJoin
170 48e4da5c Klaus Aehlig
    $ map (show . fromJobId . qjId) chosen
171 48e4da5c Klaus Aehlig
  JQ.startJobs chosen
172 48e4da5c Klaus Aehlig
173 48e4da5c Klaus Aehlig
-- | Format the job queue status in a compact, human readable way.
174 48e4da5c Klaus Aehlig
showQueue :: Queue -> String
175 48e4da5c Klaus Aehlig
showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
176 48e4da5c Klaus Aehlig
  let showids = show . map (fromJobId . qjId . jJob)
177 48e4da5c Klaus Aehlig
  in "Waiting jobs: " ++ showids waiting 
178 48e4da5c Klaus Aehlig
       ++ "; running jobs: " ++ showids running
179 48e4da5c Klaus Aehlig
180 48e4da5c Klaus Aehlig
-- | Time-based watcher for updating the job queue.
181 48e4da5c Klaus Aehlig
onTimeWatcher :: JQStatus -> IO ()
182 48e4da5c Klaus Aehlig
onTimeWatcher qstate = forever $ do
183 48e4da5c Klaus Aehlig
  threadDelay watchInterval
184 48e4da5c Klaus Aehlig
  logDebug "Watcher timer fired"
185 48e4da5c Klaus Aehlig
  jobs <- readIORef (jqJobs qstate)
186 48e4da5c Klaus Aehlig
  mapM_ (updateJob qstate) $ qRunning jobs
187 48e4da5c Klaus Aehlig
  cleanupFinishedJobs qstate
188 48e4da5c Klaus Aehlig
  jobs' <- readIORef (jqJobs qstate)
189 48e4da5c Klaus Aehlig
  logInfo $ showQueue jobs'
190 48e4da5c Klaus Aehlig
  scheduleSomeJobs qstate
191 48e4da5c Klaus Aehlig
192 48e4da5c Klaus Aehlig
-- | Set up the job scheduler. This will also start the monitoring
193 48e4da5c Klaus Aehlig
-- of changes to the running jobs.
194 48e4da5c Klaus Aehlig
initJQScheduler :: JQStatus -> IO ()
195 48e4da5c Klaus Aehlig
initJQScheduler qstate = do
196 48e4da5c Klaus Aehlig
  logInfo "Starting time-based job queue watcher"
197 48e4da5c Klaus Aehlig
  _ <- forkIO $ onTimeWatcher qstate
198 48e4da5c Klaus Aehlig
  return ()
199 48e4da5c Klaus Aehlig
200 48e4da5c Klaus Aehlig
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
201 48e4da5c Klaus Aehlig
-- eventually.
202 48e4da5c Klaus Aehlig
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
203 48e4da5c Klaus Aehlig
enqueueNewJobs state jobs = do
204 48e4da5c Klaus Aehlig
  logInfo . (++) "New jobs enqueued: " . commaJoin
205 48e4da5c Klaus Aehlig
    $ map (show . fromJobId . qjId) jobs
206 48e4da5c Klaus Aehlig
  let jobs' = map unreadJob jobs
207 48e4da5c Klaus Aehlig
  modifyJobs state (onQueuedJobs (++ jobs'))
208 48e4da5c Klaus Aehlig
  scheduleSomeJobs state