Revision b5fa2700

b/src/Ganeti/Query/Server.hs
53 53
import Ganeti.ConfigReader
54 54
import Ganeti.BasicTypes
55 55
import Ganeti.JQueue
56
import Ganeti.JQScheduler
56 57
import Ganeti.Logging
57 58
import Ganeti.Luxi
58 59
import qualified Ganeti.Query.Language as Qlang
......
80 81
  return $ showJSON <$> (qr >>= queryCompat)
81 82

  
82 83
-- | Minimal wrapper to handle the missing config case.
83
handleCallWrapper :: MVar () -> Result ConfigData 
84
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData 
84 85
                     -> LuxiOp -> IO (ErrorResult JSValue)
85
handleCallWrapper _ (Bad msg) _ =
86
handleCallWrapper _ _ (Bad msg) _ =
86 87
  return . Bad . ConfigurationError $
87 88
           "I do not have access to a valid configuration, cannot\
88 89
           \ process queries: " ++ msg
89
handleCallWrapper qlock (Ok config) op = handleCall qlock config op
90
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
90 91

  
91 92
-- | Actual luxi operation handler.
92
handleCall :: MVar () -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
93
handleCall _ cdata QueryClusterInfo =
93
handleCall :: MVar () -> JQStatus 
94
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
95
handleCall _ _ cdata QueryClusterInfo =
94 96
  let cluster = configCluster cdata
95 97
      master = QCluster.clusterMasterNodeName cdata
96 98
      hypervisors = clusterEnabledHypervisors cluster
......
161 163
    Ok _ -> return . Ok . J.makeObj $ obj
162 164
    Bad ex -> return $ Bad ex
163 165

  
164
handleCall _ cfg (QueryTags kind name) = do
166
handleCall _ _ cfg (QueryTags kind name) = do
165 167
  let tags = case kind of
166 168
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
167 169
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
......
172 174
                                        ECodeInval
173 175
  return (J.showJSON <$> tags)
174 176

  
175
handleCall _ cfg (Query qkind qfields qfilter) = do
177
handleCall _ _ cfg (Query qkind qfields qfilter) = do
176 178
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
177 179
  return $ J.showJSON <$> result
178 180

  
179
handleCall _ _ (QueryFields qkind qfields) = do
181
handleCall _ _ _ (QueryFields qkind qfields) = do
180 182
  let result = queryFields (Qlang.QueryFields qkind qfields)
181 183
  return $ J.showJSON <$> result
182 184

  
183
handleCall _ cfg (QueryNodes names fields lock) =
185
handleCall _ _ cfg (QueryNodes names fields lock) =
184 186
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
185 187
    (map Left names) fields lock
186 188

  
187
handleCall _ cfg (QueryInstances names fields lock) =
189
handleCall _ _ cfg (QueryInstances names fields lock) =
188 190
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
189 191
    (map Left names) fields lock
190 192

  
191
handleCall _ cfg (QueryGroups names fields lock) =
193
handleCall _ _ cfg (QueryGroups names fields lock) =
192 194
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
193 195
    (map Left names) fields lock
194 196

  
195
handleCall _ cfg (QueryJobs names fields) =
197
handleCall _ _ cfg (QueryJobs names fields) =
196 198
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
197 199
    (map (Right . fromIntegral . fromJobId) names)  fields False
198 200

  
199
handleCall _ cfg (QueryNetworks names fields lock) =
201
handleCall _ _ cfg (QueryNetworks names fields lock) =
200 202
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
201 203
    (map Left names) fields lock
202 204

  
203
handleCall qlock cfg (SubmitJobToDrainedQueue ops) =
205
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) =
204 206
  do
205 207
    let mcs = Config.getMasterCandidates cfg
206 208
    jobid <- allocateJobId mcs qlock
......
216 218
          Bad s -> return . Bad . GenericError $ s
217 219
          Ok () -> do
218 220
            _ <- replicateManyJobs qDir mcs [job]
219
            _ <- forkIO $ startJobs [job]
221
            _ <- forkIO $ enqueueNewJobs qstat [job]
220 222
            return . Ok . showJSON . fromJobId $ jid
221 223

  
222
handleCall qlock cfg (SubmitJob ops) =
224
handleCall qlock qstat cfg (SubmitJob ops) =
223 225
  do
224 226
    open <- isQueueOpen
225 227
    if not open
226 228
       then return . Bad . GenericError $ "Queue drained"
227
       else handleCall qlock cfg (SubmitJobToDrainedQueue ops)
229
       else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
228 230

  
229
handleCall qlock cfg (SubmitManyJobs lops) =
231
handleCall qlock qstat cfg (SubmitManyJobs lops) =
230 232
  do
231 233
    open <- isQueueOpen
232 234
    if not open
......
247 249
            when (any isBad write_results) . logWarning
248 250
              $ "Writing some jobs failed " ++ show annotated_results
249 251
            replicateManyJobs qDir mcs succeeded
250
            _ <- forkIO $ startJobs succeeded
252
            _ <- forkIO $ enqueueNewJobs qstat succeeded
251 253
            return . Ok . JSArray
252 254
              . map (\(res, job) ->
253 255
                      if isOk res
......
255 257
                        else showJSON (False, genericResult id (const "") res))
256 258
              $ annotated_results
257 259

  
258
handleCall _ _ op =
260
handleCall _ _ _ op =
259 261
  return . Bad $
260 262
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
261 263

  
......
263 265

  
264 266
-- | Given a decoded luxi request, executes it and sends the luxi
265 267
-- response back to the client.
266
handleClientMsg :: MVar () -> Client -> ConfigReader -> LuxiOp -> IO Bool
267
handleClientMsg qlock client creader args = do
268
handleClientMsg :: MVar () -> JQStatus -> Client -> ConfigReader
269
                   -> LuxiOp -> IO Bool
270
handleClientMsg qlock qstat client creader args = do
268 271
  cfg <- creader
269 272
  logDebug $ "Request: " ++ show args
270
  call_result <- handleCallWrapper qlock cfg args
273
  call_result <- handleCallWrapper qlock qstat cfg args
271 274
  (!status, !rval) <-
272 275
    case call_result of
273 276
      Bad err -> do
......
284 287

  
285 288
-- | Handles one iteration of the client protocol: receives message,
286 289
-- checks it for validity and decodes it, returns response.
287
handleClient :: MVar () -> Client -> ConfigReader -> IO Bool
288
handleClient qlock client creader = do
290
handleClient :: MVar () -> JQStatus -> Client -> ConfigReader -> IO Bool
291
handleClient qlock qstat client creader = do
289 292
  !msg <- recvMsgExt client
290 293
  logDebug $ "Received message: " ++ show msg
291 294
  case msg of
......
299 302
             logWarning errmsg
300 303
             sendMsg client $ buildResponse False (showJSON errmsg)
301 304
             return False
302
        Ok args -> handleClientMsg qlock client creader args
305
        Ok args -> handleClientMsg qlock qstat client creader args
303 306

  
304 307
-- | Main client loop: runs one loop of 'handleClient', and if that
305 308
-- doesn't report a finished (closed) connection, restarts itself.
306
clientLoop :: MVar () -> Client -> ConfigReader -> IO ()
307
clientLoop qlock client creader = do
308
  result <- handleClient qlock client creader
309
clientLoop :: MVar () -> JQStatus -> Client -> ConfigReader -> IO ()
310
clientLoop qlock qstat client creader = do
311
  result <- handleClient qlock qstat client creader
309 312
  if result
310
    then clientLoop qlock client creader
313
    then clientLoop qlock qstat client creader
311 314
    else closeClient client
312 315

  
313 316
-- | Main listener loop: accepts clients, forks an I/O thread to handle
314 317
-- that client.
315
listener :: MVar () -> ConfigReader -> S.Socket -> IO ()
316
listener qlock creader socket = do
318
listener :: MVar () -> JQStatus -> ConfigReader -> S.Socket -> IO ()
319
listener qlock qstat creader socket = do
317 320
  client <- acceptClient socket
318
  _ <- forkIO $ clientLoop qlock client creader
321
  _ <- forkIO $ clientLoop qlock qstat client creader
319 322
  return ()
320 323

  
321 324
-- | Type alias for prepMain results
322
type PrepResult = (FilePath, S.Socket, IORef (Result ConfigData))
325
type PrepResult = (FilePath, S.Socket, IORef (Result ConfigData), JQStatus)
323 326

  
324 327
-- | Check function for luxid.
325 328
checkMain :: CheckFn ()
......
333 336
  s <- describeError "binding to the Luxi socket"
334 337
         Nothing (Just socket_path) $ getServer True socket_path
335 338
  cref <- newIORef (Bad "Configuration not yet loaded")
336
  return (socket_path, s, cref)
339
  jq <- emptyJQStatus 
340
  return (socket_path, s, cref, jq)
337 341

  
338 342
-- | Main function.
339 343
main :: MainFn () PrepResult
340
main _ _ (socket_path, server, cref) = do
344
main _ _ (socket_path, server, cref, jq) = do
341 345
  initConfigReader id cref
342 346
  let creader = readIORef cref
347
  initJQScheduler jq
343 348
  
344 349
  qlockFile <- jobQueueLockFile
345 350
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
346 351
  qlock <- newMVar ()
347 352

  
348 353
  finally
349
    (forever $ listener qlock creader server)
354
    (forever $ listener qlock jq creader server)
350 355
    (closeServer socket_path server)

Also available in: Unified diff