Revision e5fba493

b/src/Ganeti/Query/Server.hs
34 34
import Control.Applicative
35 35
import Control.Concurrent
36 36
import Control.Exception
37
import Control.Monad (forever)
37
import Control.Monad (forever, when)
38 38
import Data.Bits (bitSize)
39 39
import qualified Data.Set as Set (toList)
40 40
import Data.IORef
......
52 52
import qualified Ganeti.Config as Config
53 53
import Ganeti.ConfigReader
54 54
import Ganeti.BasicTypes
55
import Ganeti.JQueue
55 56
import Ganeti.Logging
56 57
import Ganeti.Luxi
57 58
import qualified Ganeti.Query.Language as Qlang
58 59
import qualified Ganeti.Query.Cluster as QCluster
60
import Ganeti.Path (queueDir, jobQueueLockFile, defaultLuxiSocket)
59 61
import Ganeti.Query.Query
60 62
import Ganeti.Query.Filter (makeSimpleFilter)
61 63
import Ganeti.Types
64
import Ganeti.Utils (lockFile, exitIfBad)
62 65

  
63 66
-- | Helper for classic queries.
64 67
handleClassicQuery :: ConfigData      -- ^ Cluster config
......
76 79
  return $ showJSON <$> (qr >>= queryCompat)
77 80

  
78 81
-- | Minimal wrapper to handle the missing config case.
79
handleCallWrapper :: Result ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
80
handleCallWrapper (Bad msg) _ =
82
handleCallWrapper :: MVar () -> Result ConfigData 
83
                     -> LuxiOp -> IO (ErrorResult JSValue)
84
handleCallWrapper _ (Bad msg) _ =
81 85
  return . Bad . ConfigurationError $
82 86
           "I do not have access to a valid configuration, cannot\
83 87
           \ process queries: " ++ msg
84
handleCallWrapper (Ok config) op = handleCall config op
88
handleCallWrapper qlock (Ok config) op = handleCall qlock config op
85 89

  
86 90
-- | Actual luxi operation handler.
87
handleCall :: ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
88
handleCall cdata QueryClusterInfo =
91
handleCall :: MVar () -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
92
handleCall _ cdata QueryClusterInfo =
89 93
  let cluster = configCluster cdata
90 94
      master = QCluster.clusterMasterNodeName cdata
91 95
      hypervisors = clusterEnabledHypervisors cluster
......
154 158
    Ok _ -> return . Ok . J.makeObj $ obj
155 159
    Bad ex -> return $ Bad ex
156 160

  
157
handleCall cfg (QueryTags kind name) = do
161
handleCall _ cfg (QueryTags kind name) = do
158 162
  let tags = case kind of
159 163
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
160 164
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
......
165 169
                                        ECodeInval
166 170
  return (J.showJSON <$> tags)
167 171

  
168
handleCall cfg (Query qkind qfields qfilter) = do
172
handleCall _ cfg (Query qkind qfields qfilter) = do
169 173
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
170 174
  return $ J.showJSON <$> result
171 175

  
172
handleCall _ (QueryFields qkind qfields) = do
176
handleCall _ _ (QueryFields qkind qfields) = do
173 177
  let result = queryFields (Qlang.QueryFields qkind qfields)
174 178
  return $ J.showJSON <$> result
175 179

  
176
handleCall cfg (QueryNodes names fields lock) =
180
handleCall _ cfg (QueryNodes names fields lock) =
177 181
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
178 182
    (map Left names) fields lock
179 183

  
180
handleCall cfg (QueryGroups names fields lock) =
184
handleCall _ cfg (QueryGroups names fields lock) =
181 185
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
182 186
    (map Left names) fields lock
183 187

  
184
handleCall cfg (QueryJobs names fields) =
188
handleCall _ cfg (QueryJobs names fields) =
185 189
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
186 190
    (map (Right . fromIntegral . fromJobId) names)  fields False
187 191

  
188
handleCall cfg (QueryNetworks names fields lock) =
192
handleCall _ cfg (QueryNetworks names fields lock) =
189 193
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
190 194
    (map Left names) fields lock
191 195

  
192
handleCall _ op =
196
handleCall qlock cfg (SubmitJobToDrainedQueue ops) =
197
  do
198
    jobid <- allocateJobId (Config.getMasterCandidates cfg) qlock
199
    case jobid of
200
      Bad s -> return . Bad . GenericError $ s
201
      Ok jid -> do
202
        qDir <- queueDir
203
        job <- queuedJobFromOpCodes jid ops
204
        write_result <- writeJobToDisk qDir job
205
        case write_result of
206
          Bad s -> return . Bad . GenericError $ s
207
          Ok () -> do
208
            socketpath <- defaultLuxiSocket
209
            client <- getClient socketpath
210
            pickupResult <- callMethod (PickupJob jid) client
211
            closeClient client
212
            case pickupResult of
213
              Ok _ -> return ()
214
              Bad e -> logWarning $ "Failded to notify masterd: " ++ show e
215
            return . Ok . showJSON . fromJobId $ jid
216

  
217
handleCall qlock cfg (SubmitJob ops) =
218
  do
219
    open <- isQueueOpen
220
    if not open
221
       then return . Bad . GenericError $ "Queue drained"
222
       else handleCall qlock cfg (SubmitJobToDrainedQueue ops)
223

  
224
handleCall _ _ op =
193 225
  return . Bad $
194 226
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
195 227

  
196 228
-- | Given a decoded luxi request, executes it and sends the luxi
197 229
-- response back to the client.
198
handleClientMsg :: Client -> ConfigReader -> LuxiOp -> IO Bool
199
handleClientMsg client creader args = do
230
handleClientMsg :: MVar () -> Client -> ConfigReader -> LuxiOp -> IO Bool
231
handleClientMsg qlock client creader args = do
200 232
  cfg <- creader
201 233
  logDebug $ "Request: " ++ show args
202
  call_result <- handleCallWrapper cfg args
234
  call_result <- handleCallWrapper qlock cfg args
203 235
  (!status, !rval) <-
204 236
    case call_result of
205 237
      Bad err -> do
......
216 248

  
217 249
-- | Handles one iteration of the client protocol: receives message,
218 250
-- checks it for validity and decodes it, returns response.
219
handleClient :: Client -> ConfigReader -> IO Bool
220
handleClient client creader = do
251
handleClient :: MVar () -> Client -> ConfigReader -> IO Bool
252
handleClient qlock client creader = do
221 253
  !msg <- recvMsgExt client
222 254
  logDebug $ "Received message: " ++ show msg
223 255
  case msg of
......
231 263
             logWarning errmsg
232 264
             sendMsg client $ buildResponse False (showJSON errmsg)
233 265
             return False
234
        Ok args -> handleClientMsg client creader args
266
        Ok args -> handleClientMsg qlock client creader args
235 267

  
236 268
-- | Main client loop: runs one loop of 'handleClient', and if that
237 269
-- doesn't report a finished (closed) connection, restarts itself.
238
clientLoop :: Client -> ConfigReader -> IO ()
239
clientLoop client creader = do
240
  result <- handleClient client creader
270
clientLoop :: MVar () -> Client -> ConfigReader -> IO ()
271
clientLoop qlock client creader = do
272
  result <- handleClient qlock client creader
241 273
  if result
242
    then clientLoop client creader
274
    then clientLoop qlock client creader
243 275
    else closeClient client
244 276

  
245 277
-- | Main listener loop: accepts clients, forks an I/O thread to handle
246 278
-- that client.
247
listener :: ConfigReader -> S.Socket -> IO ()
248
listener creader socket = do
279
listener :: MVar () -> ConfigReader -> S.Socket -> IO ()
280
listener qlock creader socket = do
249 281
  client <- acceptClient socket
250
  _ <- forkIO $ clientLoop client creader
282
  _ <- forkIO $ clientLoop qlock client creader
251 283
  return ()
252 284

  
253 285
-- | Type alias for prepMain results
......
272 304
main _ _ (socket_path, server, cref) = do
273 305
  initConfigReader id cref
274 306
  let creader = readIORef cref
307
  
308
  qlockFile <- jobQueueLockFile
309
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
310
  qlock <- newMVar ()
275 311

  
276 312
  finally
277
    (forever $ listener creader server)
313
    (forever $ listener qlock creader server)
278 314
    (closeServer socket_path server)

Also available in: Unified diff