Revision d9dd04b1 src/Ganeti/JQScheduler.hs
b/src/Ganeti/JQScheduler.hs | ||
---|---|---|
44 | 44 |
import Ganeti.Constants as C |
45 | 45 |
import Ganeti.JQueue as JQ |
46 | 46 |
import Ganeti.Logging |
47 |
import Ganeti.Objects (ConfigData)
|
|
47 |
import Ganeti.Objects |
|
48 | 48 |
import Ganeti.Path |
49 | 49 |
import Ganeti.Types |
50 | 50 |
import Ganeti.Utils |
... | ... | |
92 | 92 |
watchInterval :: Int |
93 | 93 |
watchInterval = C.luxidJobqueuePollInterval * 1000000 |
94 | 94 |
|
95 |
-- | Maximal number of jobs to be running at the same time. |
|
96 |
maxRunningJobs :: Int |
|
97 |
maxRunningJobs = C.luxidMaximalRunningJobs |
|
95 |
-- | Get the maximual number of jobs to be run simultaneously from the |
|
96 |
-- configuration. If the configuration is not available, be conservative |
|
97 |
-- and use the smallest possible value, i.e., 1. |
|
98 |
getMaxRunningJobs :: JQStatus -> IO Int |
|
99 |
getMaxRunningJobs = |
|
100 |
liftM (genericResult (const 1) (clusterMaxRunningJobs . configCluster)) |
|
101 |
. readIORef . jqConfig |
|
98 | 102 |
|
99 | 103 |
-- | Wrapper function to atomically update the jobs in the queue status. |
100 | 104 |
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO () |
... | ... | |
197 | 201 |
|
198 | 202 |
-- | Decide on which jobs to schedule next for execution. This is the |
199 | 203 |
-- pure function doing the scheduling. |
200 |
selectJobsToRun :: Queue -> (Queue, [JobWithStat]) |
|
201 |
selectJobsToRun queue = |
|
202 |
let n = maxRunningJobs - length (qRunning queue)
|
|
204 |
selectJobsToRun :: Int -> Queue -> (Queue, [JobWithStat])
|
|
205 |
selectJobsToRun count queue =
|
|
206 |
let n = count - length (qRunning queue)
|
|
203 | 207 |
(chosen, remain) = splitAt n (qEnqueued queue) |
204 | 208 |
in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen) |
205 | 209 |
|
... | ... | |
219 | 223 |
-- pure `selectJobsToRun`. |
220 | 224 |
scheduleSomeJobs :: JQStatus -> IO () |
221 | 225 |
scheduleSomeJobs qstate = do |
222 |
chosen <- atomicModifyIORef (jqJobs qstate) selectJobsToRun |
|
226 |
count <- getMaxRunningJobs qstate |
|
227 |
chosen <- atomicModifyIORef (jqJobs qstate) (selectJobsToRun count) |
|
223 | 228 |
let jobs = map jJob chosen |
224 | 229 |
unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin |
225 | 230 |
$ map (show . fromJobId . qjId) jobs |
Also available in: Unified diff