Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ d79a6502

History | View | Annotate | Download (17.1 kB)

1
{-| Implementation of the Ganeti Query2 server.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2012, 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.Query.Server
27
  ( main
28
  , checkMain
29
  , prepMain
30
  ) where
31

    
32
import Control.Applicative
33
import Control.Concurrent
34
import Control.Exception
35
import Control.Monad (forever, when, zipWithM, liftM)
36
import Data.Bits (bitSize)
37
import qualified Data.Set as Set (toList)
38
import Data.IORef
39
import qualified Text.JSON as J
40
import Text.JSON (encode, showJSON, JSValue(..))
41
import System.Info (arch)
42

    
43
import qualified Ganeti.Constants as C
44
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
45
import Ganeti.Errors
46
import qualified Ganeti.Path as Path
47
import Ganeti.Daemon
48
import Ganeti.Objects
49
import qualified Ganeti.Config as Config
50
import Ganeti.ConfigReader
51
import Ganeti.BasicTypes
52
import Ganeti.JQueue
53
import Ganeti.JQScheduler
54
import Ganeti.Logging
55
import Ganeti.Luxi
56
import qualified Ganeti.Query.Language as Qlang
57
import qualified Ganeti.Query.Cluster as QCluster
58
import Ganeti.Path (queueDir, jobQueueLockFile)
59
import Ganeti.Query.Query
60
import Ganeti.Query.Filter (makeSimpleFilter)
61
import Ganeti.Types
62
import qualified Ganeti.UDSServer as U
63
import Ganeti.Utils (lockFile, exitIfBad, watchFile)
64
import qualified Ganeti.Version as Version
65

    
66
-- | Helper for classic queries.
67
handleClassicQuery :: ConfigData      -- ^ Cluster config
68
                   -> Qlang.ItemType  -- ^ Query type
69
                   -> [Either String Integer] -- ^ Requested names
70
                                              -- (empty means all)
71
                   -> [String]        -- ^ Requested fields
72
                   -> Bool            -- ^ Whether to do sync queries or not
73
                   -> IO (GenericResult GanetiException JSValue)
74
handleClassicQuery _ _ _ _ True =
75
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
76
handleClassicQuery cfg qkind names fields _ = do
77
  let flt = makeSimpleFilter (nameField qkind) names
78
  qr <- query cfg True (Qlang.Query qkind fields flt)
79
  return $ showJSON <$> (qr >>= queryCompat)
80

    
81
-- | Minimal wrapper to handle the missing config case.
82
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData 
83
                     -> LuxiOp -> IO (ErrorResult JSValue)
84
handleCallWrapper _ _ (Bad msg) _ =
85
  return . Bad . ConfigurationError $
86
           "I do not have access to a valid configuration, cannot\
87
           \ process queries: " ++ msg
88
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
89

    
90
-- | Actual luxi operation handler.
91
handleCall :: MVar () -> JQStatus 
92
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
93
handleCall _ _ cdata QueryClusterInfo =
94
  let cluster = configCluster cdata
95
      master = QCluster.clusterMasterNodeName cdata
96
      hypervisors = clusterEnabledHypervisors cluster
97
      diskTemplates = clusterEnabledDiskTemplates cluster
98
      def_hv = case hypervisors of
99
                 x:_ -> showJSON x
100
                 [] -> JSNull
101
      bits = show (bitSize (0::Int)) ++ "bits"
102
      arch_tuple = [bits, arch]
103
      obj = [ ("software_version", showJSON C.releaseVersion)
104
            , ("protocol_version", showJSON C.protocolVersion)
105
            , ("config_version", showJSON C.configVersion)
106
            , ("os_api_version", showJSON . maximum .
107
                                 Set.toList . ConstantUtils.unFrozenSet $
108
                                 C.osApiVersions)
109
            , ("export_version", showJSON C.exportVersion)
110
            , ("vcs_version", showJSON Version.version)
111
            , ("architecture", showJSON arch_tuple)
112
            , ("name", showJSON $ clusterClusterName cluster)
113
            , ("master", showJSON (case master of
114
                                     Ok name -> name
115
                                     _ -> undefined))
116
            , ("default_hypervisor", def_hv)
117
            , ("enabled_hypervisors", showJSON hypervisors)
118
            , ("hvparams", showJSON $ clusterHvparams cluster)
119
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
120
            , ("beparams", showJSON $ clusterBeparams cluster)
121
            , ("osparams", showJSON $ clusterOsparams cluster)
122
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
123
            , ("nicparams", showJSON $ clusterNicparams cluster)
124
            , ("ndparams", showJSON $ clusterNdparams cluster)
125
            , ("diskparams", showJSON $ clusterDiskparams cluster)
126
            , ("candidate_pool_size",
127
               showJSON $ clusterCandidatePoolSize cluster)
128
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
129
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
130
            , ("use_external_mip_script",
131
               showJSON $ clusterUseExternalMipScript cluster)
132
            , ("volume_group_name",
133
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
134
            , ("drbd_usermode_helper",
135
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
136
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
137
            , ("shared_file_storage_dir",
138
               showJSON $ clusterSharedFileStorageDir cluster)
139
            , ("maintain_node_health",
140
               showJSON $ clusterMaintainNodeHealth cluster)
141
            , ("ctime", showJSON $ clusterCtime cluster)
142
            , ("mtime", showJSON $ clusterMtime cluster)
143
            , ("uuid", showJSON $ clusterUuid cluster)
144
            , ("tags", showJSON $ clusterTags cluster)
145
            , ("uid_pool", showJSON $ clusterUidPool cluster)
146
            , ("default_iallocator",
147
               showJSON $ clusterDefaultIallocator cluster)
148
            , ("default_iallocator_params",
149
              showJSON $ clusterDefaultIallocatorParams cluster)
150
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
151
            , ("primary_ip_version",
152
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
153
            , ("prealloc_wipe_disks",
154
               showJSON $ clusterPreallocWipeDisks cluster)
155
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
156
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
157
            , ("enabled_disk_templates", showJSON diskTemplates)
158
            ]
159

    
160
  in case master of
161
    Ok _ -> return . Ok . J.makeObj $ obj
162
    Bad ex -> return $ Bad ex
163

    
164
handleCall _ _ cfg (QueryTags kind name) = do
165
  let tags = case kind of
166
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
167
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
168
               TagKindNode     -> nodeTags  <$> Config.getNode     cfg name
169
               TagKindInstance -> instTags  <$> Config.getInstance cfg name
170
               TagKindNetwork  -> Bad $ OpPrereqError
171
                                        "Network tag is not allowed"
172
                                        ECodeInval
173
  return (J.showJSON <$> tags)
174

    
175
handleCall _ _ cfg (Query qkind qfields qfilter) = do
176
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
177
  return $ J.showJSON <$> result
178

    
179
handleCall _ _ _ (QueryFields qkind qfields) = do
180
  let result = queryFields (Qlang.QueryFields qkind qfields)
181
  return $ J.showJSON <$> result
182

    
183
handleCall _ _ cfg (QueryNodes names fields lock) =
184
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
185
    (map Left names) fields lock
186

    
187
handleCall _ _ cfg (QueryInstances names fields lock) =
188
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
189
    (map Left names) fields lock
190

    
191
handleCall _ _ cfg (QueryGroups names fields lock) =
192
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
193
    (map Left names) fields lock
194

    
195
handleCall _ _ cfg (QueryJobs names fields) =
196
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
197
    (map (Right . fromIntegral . fromJobId) names)  fields False
198

    
199
handleCall _ _ cfg (QueryNetworks names fields lock) =
200
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
201
    (map Left names) fields lock
202

    
203
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) =
204
  do
205
    let mcs = Config.getMasterCandidates cfg
206
    jobid <- allocateJobId mcs qlock
207
    case jobid of
208
      Bad s -> return . Bad . GenericError $ s
209
      Ok jid -> do
210
        ts <- currentTimestamp
211
        job <- liftM (setReceivedTimestamp ts)
212
                 $ queuedJobFromOpCodes jid ops
213
        qDir <- queueDir
214
        write_result <- writeJobToDisk qDir job
215
        case write_result of
216
          Bad s -> return . Bad . GenericError $ s
217
          Ok () -> do
218
            _ <- replicateManyJobs qDir mcs [job]
219
            _ <- forkIO $ enqueueNewJobs qstat [job]
220
            return . Ok . showJSON . fromJobId $ jid
221

    
222
handleCall qlock qstat cfg (SubmitJob ops) =
223
  do
224
    open <- isQueueOpen
225
    if not open
226
       then return . Bad . GenericError $ "Queue drained"
227
       else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
228

    
229
handleCall qlock qstat cfg (SubmitManyJobs lops) =
230
  do
231
    open <- isQueueOpen
232
    if not open
233
      then return . Bad . GenericError $ "Queue drained"
234
      else do
235
        let mcs = Config.getMasterCandidates cfg
236
        result_jobids <- allocateJobIds mcs qlock (length lops)
237
        case result_jobids of
238
          Bad s -> return . Bad . GenericError $ s
239
          Ok jids -> do
240
            ts <- currentTimestamp
241
            jobs <- liftM (map $ setReceivedTimestamp ts)
242
                      $ zipWithM queuedJobFromOpCodes jids lops
243
            qDir <- queueDir
244
            write_results <- mapM (writeJobToDisk qDir) jobs
245
            let annotated_results = zip write_results jobs
246
                succeeded = map snd $ filter (isOk . fst) annotated_results
247
            when (any isBad write_results) . logWarning
248
              $ "Writing some jobs failed " ++ show annotated_results
249
            replicateManyJobs qDir mcs succeeded
250
            _ <- forkIO $ enqueueNewJobs qstat succeeded
251
            return . Ok . JSArray
252
              . map (\(res, job) ->
253
                      if isOk res
254
                        then showJSON (True, fromJobId $ qjId job)
255
                        else showJSON (False, genericResult id (const "") res))
256
              $ annotated_results
257

    
258
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
259
  let compute_fn = computeJobUpdate cfg jid fields prev_log 
260
  qDir <- queueDir
261
  -- verify if the job is finalized, and return immediately in this case
262
  jobresult <- loadJobFromDisk qDir False jid
263
  case jobresult of
264
    Ok (job, _) | not (jobFinalized job) -> do
265
      let jobfile = liveJobFile qDir jid
266
      answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
267
                  (prev_job, JSArray []) compute_fn
268
      return . Ok $ showJSON answer
269
    _ -> liftM (Ok . showJSON) compute_fn
270

    
271
handleCall _ _ _ op =
272
  return . Bad $
273
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
274

    
275
{-# ANN handleCall "HLint: ignore Too strict if" #-}
276

    
277
-- | Query the status of a job and return the requested fields
278
-- and the logs newer than the given log number.
279
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue 
280
                    -> IO (JSValue, JSValue)
281
computeJobUpdate cfg jid fields prev_log = do
282
  let sjid = show $ fromJobId jid
283
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
284
  let fromJSArray (JSArray xs) = xs
285
      fromJSArray _ = []
286
  let logFilter JSNull (JSArray _) = True
287
      logFilter (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
288
      logFilter _ _ = False
289
  let filterLogs n logs = JSArray (filter (logFilter n) (logs >>= fromJSArray))
290
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
291
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
292
  let (rfields, rlogs) = case jobQuery of
293
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
294
          (answer, filterLogs prev_log logs)
295
        _ -> (map (const JSNull) fields, JSArray [])
296
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
297
  return (JSArray rfields, rlogs)
298

    
299

    
300
type LuxiConfig = (MVar (), JQStatus, ConfigReader)
301

    
302
luxiExec
303
    :: LuxiConfig
304
    -> LuxiOp
305
    -> IO (Bool, GenericResult GanetiException JSValue)
306
luxiExec (qlock, qstat, creader) args = do
307
  cfg <- creader
308
  result <- handleCallWrapper qlock qstat cfg args
309
  return (True, result)
310

    
311
luxiHandler :: LuxiConfig -> U.Handler LuxiOp JSValue
312
luxiHandler cfg = U.Handler { U.hParse         = decodeLuxiCall
313
                            , U.hInputLogShort = strOfOp
314
                            , U.hInputLogLong  = show
315
                            , U.hExec          = luxiExec cfg
316
                            }
317

    
318

    
319
-- | Logs an outgoing message.
320
logMsg
321
    :: (Show e, J.JSON e, MonadLog m)
322
    => U.Handler i o
323
    -> i                          -- ^ the received request (used for logging)
324
    -> GenericResult e J.JSValue  -- ^ A message to be sent
325
    -> m ()
326
logMsg handler req (Bad err) =
327
  logWarning $ "Failed to execute request "
328
               ++ U.hInputLogLong handler req ++ ": "
329
               ++ show err
330
logMsg handler req (Ok result) = do
331
  -- only log the first 2,000 chars of the result
332
  logDebug $ "Result (truncated): " ++ take 2000 (J.encode result)
333
  logInfo $ "Successfully handled " ++ U.hInputLogShort handler req
334

    
335
-- | Prepares an outgoing message.
336
prepareMsg
337
    :: (J.JSON e)
338
    => GenericResult e J.JSValue  -- ^ A message to be sent
339
    -> (Bool, J.JSValue)
340
prepareMsg (Bad err)   = (False, J.showJSON err)
341
prepareMsg (Ok result) = (True, result)
342

    
343
handleJsonMessage
344
    :: (J.JSON o)
345
    => U.Handler i o              -- ^ handler
346
    -> i                        -- ^ parsed input
347
    -> U.HandlerResult J.JSValue
348
handleJsonMessage handler req = do
349
  (close, call_result) <- U.hExec handler req
350
  return (close, fmap J.showJSON call_result)
351

    
352
-- | Takes a request as a 'String', parses it, passes it to a handler and
353
-- formats its response.
354
handleRawMessage
355
    :: (J.JSON o)
356
    => U.Handler i o              -- ^ handler
357
    -> String                   -- ^ raw unparsed input
358
    -> IO (Bool, String)
359
handleRawMessage handler payload =
360
  case U.parseCall payload >>= uncurry (U.hParse handler) of
361
    Bad err -> do
362
         let errmsg = "Failed to parse request: " ++ err
363
         logWarning errmsg
364
         return (False, buildResponse False (J.showJSON errmsg))
365
    Ok req -> do
366
        logDebug $ "Request: " ++ U.hInputLogLong handler req
367
        (close, call_result_json) <- handleJsonMessage handler req
368
        logMsg handler req call_result_json
369
        let (status, response) = prepareMsg call_result_json
370
        return (close, buildResponse status response)
371

    
372
-- | Reads a request, passes it to a handler and sends a response back to the
373
-- client.
374
handleClient
375
    :: (J.JSON o)
376
    => U.Handler i o
377
    -> Client
378
    -> IO Bool
379
handleClient handler client = do
380
  msg <- recvMsgExt client
381
  logDebug $ "Received message: " ++ show msg
382
  case msg of
383
    RecvConnClosed -> logDebug "Connection closed" >>
384
                      return False
385
    RecvError err -> logWarning ("Error during message receiving: " ++ err) >>
386
                     return False
387
    RecvOk payload -> do
388
      (close, outMsg) <- handleRawMessage handler payload
389
      sendMsg client outMsg
390
      return close
391

    
392
-- | Main client loop: runs one loop of 'handleClient', and if that
393
-- doesn't report a finished (closed) connection, restarts itself.
394
clientLoop
395
    :: (J.JSON o)
396
    => U.Handler i o
397
    -> Client
398
    -> IO ()
399
clientLoop handler client = do
400
  result <- handleClient handler client
401
  if result
402
    then clientLoop handler client
403
    else closeClient client
404

    
405
-- | Main listener loop: accepts clients, forks an I/O thread to handle
406
-- that client.
407
listener
408
    :: (J.JSON o)
409
    => U.Handler i o
410
    -> Server
411
    -> IO ()
412
listener handler server = do
413
  client <- acceptClient server
414
  _ <- forkIO $ clientLoop handler client
415
  return ()
416

    
417

    
418
-- | Type alias for prepMain results
419
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
420

    
421
-- | Check function for luxid.
422
checkMain :: CheckFn ()
423
checkMain _ = return $ Right ()
424

    
425
-- | Prepare function for luxid.
426
prepMain :: PrepFn () PrepResult
427
prepMain _ _ = do
428
  socket_path <- Path.defaultQuerySocket
429
  cleanupSocket socket_path
430
  s <- describeError "binding to the Luxi socket"
431
         Nothing (Just socket_path) $ getLuxiServer True socket_path
432
  cref <- newIORef (Bad "Configuration not yet loaded")
433
  jq <- emptyJQStatus 
434
  return (s, cref, jq)
435

    
436
-- | Main function.
437
main :: MainFn () PrepResult
438
main _ _ (server, cref, jq) = do
439
  initConfigReader id cref
440
  let creader = readIORef cref
441
  initJQScheduler jq
442
  
443
  qlockFile <- jobQueueLockFile
444
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
445
  qlock <- newMVar ()
446

    
447
  finally
448
    (forever $ listener (luxiHandler (qlock, jq, creader)) server)
449
    (closeServer server)