Revision f5b765f0

b/src/Ganeti/Query/Server.hs
57 57
import Ganeti.Luxi
58 58
import qualified Ganeti.Query.Language as Qlang
59 59
import qualified Ganeti.Query.Cluster as QCluster
60
import Ganeti.Path (queueDir, jobQueueLockFile, defaultMasterSocket)
60
import Ganeti.Path (queueDir, jobQueueLockFile)
61 61
import Ganeti.Query.Query
62 62
import Ganeti.Query.Filter (makeSimpleFilter)
63 63
import Ganeti.Types
......
200 200

  
201 201
handleCall qlock cfg (SubmitJobToDrainedQueue ops) =
202 202
  do
203
    jobid <- allocateJobId (Config.getMasterCandidates cfg) qlock
203
    let mcs = Config.getMasterCandidates cfg
204
    jobid <- allocateJobId mcs qlock
204 205
    case jobid of
205 206
      Bad s -> return . Bad . GenericError $ s
206 207
      Ok jid -> do
......
210 211
        case write_result of
211 212
          Bad s -> return . Bad . GenericError $ s
212 213
          Ok () -> do
213
            socketpath <- defaultMasterSocket
214
            client <- getClient socketpath
215
            pickupResult <- callMethod (PickupJob jid) client
216
            closeClient client
217
            case pickupResult of
218
              Ok _ -> return ()
219
              Bad e -> logWarning $ "Failded to notify masterd: " ++ show e
214
            _ <- replicateManyJobs qDir mcs [job]
215
            _ <- forkIO $ enqueueJobs [job]
220 216
            return . Ok . showJSON . fromJobId $ jid
221 217

  
222 218
handleCall qlock cfg (SubmitJob ops) =
......
232 228
    if not open
233 229
      then return . Bad . GenericError $ "Queue drained"
234 230
      else do
235
        result_jobids <- allocateJobIds (Config.getMasterCandidates cfg)
236
                           qlock (length lops)
231
        let mcs = Config.getMasterCandidates cfg
232
        result_jobids <- allocateJobIds mcs qlock (length lops)
237 233
        case result_jobids of
238 234
          Bad s -> return . Bad . GenericError $ s
239 235
          Ok jids -> do
240 236
            jobs <- zipWithM queuedJobFromOpCodes jids lops
241 237
            qDir <- queueDir
242 238
            write_results <- mapM (writeJobToDisk qDir) jobs
243
            let annotated_results = zip write_results jids
239
            let annotated_results = zip write_results jobs
244 240
                succeeded = map snd $ filter (isOk . fst) annotated_results
245 241
            when (any isBad write_results) . logWarning
246 242
              $ "Writing some jobs failed " ++ show annotated_results
247
            socketpath <- defaultMasterSocket
248
            client <- getClient socketpath
249
            pickupResults <- mapM (flip callMethod client . PickupJob)
250
                               succeeded
251
            closeClient client
252
            when (any isBad pickupResults)
253
              . logWarning . (++)  "Failed to notify maserd: " . show
254
              $ zip succeeded pickupResults
243
            replicateManyJobs qDir mcs succeeded
244
            _ <- forkIO $ enqueueJobs succeeded
255 245
            return . Ok . JSArray
256
              . map (\(res, jid) ->
246
              . map (\(res, job) ->
257 247
                      if isOk res
258
                        then showJSON (True, fromJobId jid)
248
                        then showJSON (True, fromJobId $ qjId job)
259 249
                        else showJSON (False, genericResult id (const "") res))
260 250
              $ annotated_results
261 251

  

Also available in: Unified diff