Revision f5b765f0
b/src/Ganeti/Query/Server.hs | ||
---|---|---|
57 | 57 |
import Ganeti.Luxi |
58 | 58 |
import qualified Ganeti.Query.Language as Qlang |
59 | 59 |
import qualified Ganeti.Query.Cluster as QCluster |
60 |
import Ganeti.Path (queueDir, jobQueueLockFile, defaultMasterSocket)
|
|
60 |
import Ganeti.Path (queueDir, jobQueueLockFile) |
|
61 | 61 |
import Ganeti.Query.Query |
62 | 62 |
import Ganeti.Query.Filter (makeSimpleFilter) |
63 | 63 |
import Ganeti.Types |
... | ... | |
200 | 200 |
|
201 | 201 |
handleCall qlock cfg (SubmitJobToDrainedQueue ops) = |
202 | 202 |
do |
203 |
jobid <- allocateJobId (Config.getMasterCandidates cfg) qlock |
|
203 |
let mcs = Config.getMasterCandidates cfg |
|
204 |
jobid <- allocateJobId mcs qlock |
|
204 | 205 |
case jobid of |
205 | 206 |
Bad s -> return . Bad . GenericError $ s |
206 | 207 |
Ok jid -> do |
... | ... | |
210 | 211 |
case write_result of |
211 | 212 |
Bad s -> return . Bad . GenericError $ s |
212 | 213 |
Ok () -> do |
213 |
socketpath <- defaultMasterSocket |
|
214 |
client <- getClient socketpath |
|
215 |
pickupResult <- callMethod (PickupJob jid) client |
|
216 |
closeClient client |
|
217 |
case pickupResult of |
|
218 |
Ok _ -> return () |
|
219 |
Bad e -> logWarning $ "Failded to notify masterd: " ++ show e |
|
214 |
_ <- replicateManyJobs qDir mcs [job] |
|
215 |
_ <- forkIO $ enqueueJobs [job] |
|
220 | 216 |
return . Ok . showJSON . fromJobId $ jid |
221 | 217 |
|
222 | 218 |
handleCall qlock cfg (SubmitJob ops) = |
... | ... | |
232 | 228 |
if not open |
233 | 229 |
then return . Bad . GenericError $ "Queue drained" |
234 | 230 |
else do |
235 |
result_jobids <- allocateJobIds (Config.getMasterCandidates cfg)
|
|
236 |
qlock (length lops)
|
|
231 |
let mcs = Config.getMasterCandidates cfg
|
|
232 |
result_jobids <- allocateJobIds mcs qlock (length lops)
|
|
237 | 233 |
case result_jobids of |
238 | 234 |
Bad s -> return . Bad . GenericError $ s |
239 | 235 |
Ok jids -> do |
240 | 236 |
jobs <- zipWithM queuedJobFromOpCodes jids lops |
241 | 237 |
qDir <- queueDir |
242 | 238 |
write_results <- mapM (writeJobToDisk qDir) jobs |
243 |
let annotated_results = zip write_results jids
|
|
239 |
let annotated_results = zip write_results jobs
|
|
244 | 240 |
succeeded = map snd $ filter (isOk . fst) annotated_results |
245 | 241 |
when (any isBad write_results) . logWarning |
246 | 242 |
$ "Writing some jobs failed " ++ show annotated_results |
247 |
socketpath <- defaultMasterSocket |
|
248 |
client <- getClient socketpath |
|
249 |
pickupResults <- mapM (flip callMethod client . PickupJob) |
|
250 |
succeeded |
|
251 |
closeClient client |
|
252 |
when (any isBad pickupResults) |
|
253 |
. logWarning . (++) "Failed to notify maserd: " . show |
|
254 |
$ zip succeeded pickupResults |
|
243 |
replicateManyJobs qDir mcs succeeded |
|
244 |
_ <- forkIO $ enqueueJobs succeeded |
|
255 | 245 |
return . Ok . JSArray |
256 |
. map (\(res, jid) ->
|
|
246 |
. map (\(res, job) ->
|
|
257 | 247 |
if isOk res |
258 |
then showJSON (True, fromJobId jid)
|
|
248 |
then showJSON (True, fromJobId $ qjId job)
|
|
259 | 249 |
else showJSON (False, genericResult id (const "") res)) |
260 | 250 |
$ annotated_results |
261 | 251 |
|
Also available in: Unified diff