Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ 5e671e0e

History | View | Annotate | Download (15.7 kB)

1
{-# LANGUAGE BangPatterns #-}
2

    
3
{-| Implementation of the Ganeti Query2 server.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2012, 2013 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Query.Server
29
  ( main
30
  , checkMain
31
  , prepMain
32
  ) where
33

    
34
import Control.Applicative
35
import Control.Concurrent
36
import Control.Exception
37
import Control.Monad (forever, when, zipWithM, liftM)
38
import Data.Bits (bitSize)
39
import qualified Data.Set as Set (toList)
40
import Data.IORef
41
import qualified Text.JSON as J
42
import Text.JSON (encode, showJSON, JSValue(..))
43
import System.Info (arch)
44

    
45
import qualified Ganeti.Constants as C
46
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
47
import Ganeti.Errors
48
import qualified Ganeti.Path as Path
49
import Ganeti.Daemon
50
import Ganeti.Objects
51
import qualified Ganeti.Config as Config
52
import Ganeti.ConfigReader
53
import Ganeti.BasicTypes
54
import Ganeti.JQueue
55
import Ganeti.JQScheduler
56
import Ganeti.Logging
57
import Ganeti.Luxi
58
import qualified Ganeti.Query.Language as Qlang
59
import qualified Ganeti.Query.Cluster as QCluster
60
import Ganeti.Path (queueDir, jobQueueLockFile)
61
import Ganeti.Query.Query
62
import Ganeti.Query.Filter (makeSimpleFilter)
63
import Ganeti.Types
64
import Ganeti.Utils (lockFile, exitIfBad, watchFile)
65
import qualified Ganeti.Version as Version
66

    
67
-- | Helper for classic queries.
68
handleClassicQuery :: ConfigData      -- ^ Cluster config
69
                   -> Qlang.ItemType  -- ^ Query type
70
                   -> [Either String Integer] -- ^ Requested names
71
                                              -- (empty means all)
72
                   -> [String]        -- ^ Requested fields
73
                   -> Bool            -- ^ Whether to do sync queries or not
74
                   -> IO (GenericResult GanetiException JSValue)
75
handleClassicQuery _ _ _ _ True =
76
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
77
handleClassicQuery cfg qkind names fields _ = do
78
  let flt = makeSimpleFilter (nameField qkind) names
79
  qr <- query cfg True (Qlang.Query qkind fields flt)
80
  return $ showJSON <$> (qr >>= queryCompat)
81

    
82
-- | Minimal wrapper to handle the missing config case.
83
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData 
84
                     -> LuxiOp -> IO (ErrorResult JSValue)
85
handleCallWrapper _ _ (Bad msg) _ =
86
  return . Bad . ConfigurationError $
87
           "I do not have access to a valid configuration, cannot\
88
           \ process queries: " ++ msg
89
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
90

    
91
-- | Actual luxi operation handler.
92
handleCall :: MVar () -> JQStatus 
93
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
94
handleCall _ _ cdata QueryClusterInfo =
95
  let cluster = configCluster cdata
96
      master = QCluster.clusterMasterNodeName cdata
97
      hypervisors = clusterEnabledHypervisors cluster
98
      diskTemplates = clusterEnabledDiskTemplates cluster
99
      def_hv = case hypervisors of
100
                 x:_ -> showJSON x
101
                 [] -> JSNull
102
      bits = show (bitSize (0::Int)) ++ "bits"
103
      arch_tuple = [bits, arch]
104
      obj = [ ("software_version", showJSON C.releaseVersion)
105
            , ("protocol_version", showJSON C.protocolVersion)
106
            , ("config_version", showJSON C.configVersion)
107
            , ("os_api_version", showJSON . maximum .
108
                                 Set.toList . ConstantUtils.unFrozenSet $
109
                                 C.osApiVersions)
110
            , ("export_version", showJSON C.exportVersion)
111
            , ("vcs_version", showJSON Version.version)
112
            , ("architecture", showJSON arch_tuple)
113
            , ("name", showJSON $ clusterClusterName cluster)
114
            , ("master", showJSON (case master of
115
                                     Ok name -> name
116
                                     _ -> undefined))
117
            , ("default_hypervisor", def_hv)
118
            , ("enabled_hypervisors", showJSON hypervisors)
119
            , ("hvparams", showJSON $ clusterHvparams cluster)
120
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
121
            , ("beparams", showJSON $ clusterBeparams cluster)
122
            , ("osparams", showJSON $ clusterOsparams cluster)
123
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
124
            , ("nicparams", showJSON $ clusterNicparams cluster)
125
            , ("ndparams", showJSON $ clusterNdparams cluster)
126
            , ("diskparams", showJSON $ clusterDiskparams cluster)
127
            , ("candidate_pool_size",
128
               showJSON $ clusterCandidatePoolSize cluster)
129
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
130
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
131
            , ("use_external_mip_script",
132
               showJSON $ clusterUseExternalMipScript cluster)
133
            , ("volume_group_name",
134
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
135
            , ("drbd_usermode_helper",
136
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
137
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
138
            , ("shared_file_storage_dir",
139
               showJSON $ clusterSharedFileStorageDir cluster)
140
            , ("maintain_node_health",
141
               showJSON $ clusterMaintainNodeHealth cluster)
142
            , ("ctime", showJSON $ clusterCtime cluster)
143
            , ("mtime", showJSON $ clusterMtime cluster)
144
            , ("uuid", showJSON $ clusterUuid cluster)
145
            , ("tags", showJSON $ clusterTags cluster)
146
            , ("uid_pool", showJSON $ clusterUidPool cluster)
147
            , ("default_iallocator",
148
               showJSON $ clusterDefaultIallocator cluster)
149
            , ("default_iallocator_params",
150
              showJSON $ clusterDefaultIallocatorParams cluster)
151
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
152
            , ("primary_ip_version",
153
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
154
            , ("prealloc_wipe_disks",
155
               showJSON $ clusterPreallocWipeDisks cluster)
156
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
157
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
158
            , ("enabled_disk_templates", showJSON diskTemplates)
159
            ]
160

    
161
  in case master of
162
    Ok _ -> return . Ok . J.makeObj $ obj
163
    Bad ex -> return $ Bad ex
164

    
165
handleCall _ _ cfg (QueryTags kind name) = do
166
  let tags = case kind of
167
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
168
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
169
               TagKindNode     -> nodeTags  <$> Config.getNode     cfg name
170
               TagKindInstance -> instTags  <$> Config.getInstance cfg name
171
               TagKindNetwork  -> Bad $ OpPrereqError
172
                                        "Network tag is not allowed"
173
                                        ECodeInval
174
  return (J.showJSON <$> tags)
175

    
176
handleCall _ _ cfg (Query qkind qfields qfilter) = do
177
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
178
  return $ J.showJSON <$> result
179

    
180
handleCall _ _ _ (QueryFields qkind qfields) = do
181
  let result = queryFields (Qlang.QueryFields qkind qfields)
182
  return $ J.showJSON <$> result
183

    
184
handleCall _ _ cfg (QueryNodes names fields lock) =
185
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
186
    (map Left names) fields lock
187

    
188
handleCall _ _ cfg (QueryInstances names fields lock) =
189
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
190
    (map Left names) fields lock
191

    
192
handleCall _ _ cfg (QueryGroups names fields lock) =
193
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
194
    (map Left names) fields lock
195

    
196
handleCall _ _ cfg (QueryJobs names fields) =
197
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
198
    (map (Right . fromIntegral . fromJobId) names)  fields False
199

    
200
handleCall _ _ cfg (QueryNetworks names fields lock) =
201
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
202
    (map Left names) fields lock
203

    
204
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) =
205
  do
206
    let mcs = Config.getMasterCandidates cfg
207
    jobid <- allocateJobId mcs qlock
208
    case jobid of
209
      Bad s -> return . Bad . GenericError $ s
210
      Ok jid -> do
211
        ts <- currentTimestamp
212
        job <- liftM (setReceivedTimestamp ts)
213
                 $ queuedJobFromOpCodes jid ops
214
        qDir <- queueDir
215
        write_result <- writeJobToDisk qDir job
216
        case write_result of
217
          Bad s -> return . Bad . GenericError $ s
218
          Ok () -> do
219
            _ <- replicateManyJobs qDir mcs [job]
220
            _ <- forkIO $ enqueueNewJobs qstat [job]
221
            return . Ok . showJSON . fromJobId $ jid
222

    
223
handleCall qlock qstat cfg (SubmitJob ops) =
224
  do
225
    open <- isQueueOpen
226
    if not open
227
       then return . Bad . GenericError $ "Queue drained"
228
       else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
229

    
230
handleCall qlock qstat cfg (SubmitManyJobs lops) =
231
  do
232
    open <- isQueueOpen
233
    if not open
234
      then return . Bad . GenericError $ "Queue drained"
235
      else do
236
        let mcs = Config.getMasterCandidates cfg
237
        result_jobids <- allocateJobIds mcs qlock (length lops)
238
        case result_jobids of
239
          Bad s -> return . Bad . GenericError $ s
240
          Ok jids -> do
241
            ts <- currentTimestamp
242
            jobs <- liftM (map $ setReceivedTimestamp ts)
243
                      $ zipWithM queuedJobFromOpCodes jids lops
244
            qDir <- queueDir
245
            write_results <- mapM (writeJobToDisk qDir) jobs
246
            let annotated_results = zip write_results jobs
247
                succeeded = map snd $ filter (isOk . fst) annotated_results
248
            when (any isBad write_results) . logWarning
249
              $ "Writing some jobs failed " ++ show annotated_results
250
            replicateManyJobs qDir mcs succeeded
251
            _ <- forkIO $ enqueueNewJobs qstat succeeded
252
            return . Ok . JSArray
253
              . map (\(res, job) ->
254
                      if isOk res
255
                        then showJSON (True, fromJobId $ qjId job)
256
                        else showJSON (False, genericResult id (const "") res))
257
              $ annotated_results
258

    
259
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
260
  let compute_fn = computeJobUpdate cfg jid fields prev_log 
261
  qDir <- queueDir
262
  -- verify if the job is finalized, and return immediately in this case
263
  jobresult <- loadJobFromDisk qDir False jid
264
  case jobresult of
265
    Ok (job, _) | not (jobFinalized job) -> do
266
      let jobfile = liveJobFile qDir jid
267
      answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
268
                  (prev_job, JSArray []) compute_fn
269
      return . Ok $ showJSON answer
270
    _ -> liftM (Ok . showJSON) compute_fn
271

    
272
handleCall _ _ _ op =
273
  return . Bad $
274
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
275

    
276
{-# ANN handleCall "HLint: ignore Too strict if" #-}
277

    
278
-- | Query the status of a job and return the requested fields
279
-- and the logs newer than the given log number.
280
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue 
281
                    -> IO (JSValue, JSValue)
282
computeJobUpdate cfg jid fields prev_log = do
283
  let sjid = show $ fromJobId jid
284
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
285
  let fromJSArray (JSArray xs) = xs
286
      fromJSArray _ = []
287
  let logFilter JSNull (JSArray _) = True
288
      logFilter (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
289
      logFilter _ _ = False
290
  let filterLogs n logs = JSArray (filter (logFilter n) (logs >>= fromJSArray))
291
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
292
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
293
  let (rfields, rlogs) = case jobQuery of
294
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
295
          (answer, filterLogs prev_log logs)
296
        _ -> (map (const JSNull) fields, JSArray [])
297
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
298
  return (JSArray rfields, rlogs)
299

    
300
-- | Given a decoded luxi request, executes it and sends the luxi
301
-- response back to the client.
302
handleClientMsg :: MVar () -> JQStatus -> Client -> ConfigReader
303
                   -> LuxiOp -> IO Bool
304
handleClientMsg qlock qstat client creader args = do
305
  cfg <- creader
306
  logDebug $ "Request: " ++ show args
307
  call_result <- handleCallWrapper qlock qstat cfg args
308
  (!status, !rval) <-
309
    case call_result of
310
      Bad err -> do
311
        logWarning $ "Failed to execute request " ++ show args ++ ": "
312
                     ++ show err
313
        return (False, showJSON err)
314
      Ok result -> do
315
        -- only log the first 2,000 chars of the result
316
        logDebug $ "Result (truncated): " ++ take 2000 (J.encode result)
317
        logInfo $ "Successfully handled " ++ strOfOp args
318
        return (True, result)
319
  sendMsg client $ buildResponse status rval
320
  return True
321

    
322
-- | Handles one iteration of the client protocol: receives message,
323
-- checks it for validity and decodes it, returns response.
324
handleClient :: MVar () -> JQStatus -> Client -> ConfigReader -> IO Bool
325
handleClient qlock qstat client creader = do
326
  !msg <- recvMsgExt client
327
  logDebug $ "Received message: " ++ show msg
328
  case msg of
329
    RecvConnClosed -> logDebug "Connection closed" >> return False
330
    RecvError err -> logWarning ("Error during message receiving: " ++ err) >>
331
                     return False
332
    RecvOk payload ->
333
      case validateCall payload >>= decodeCall of
334
        Bad err -> do
335
             let errmsg = "Failed to parse request: " ++ err
336
             logWarning errmsg
337
             sendMsg client $ buildResponse False (showJSON errmsg)
338
             return False
339
        Ok args -> handleClientMsg qlock qstat client creader args
340

    
341
-- | Main client loop: runs one loop of 'handleClient', and if that
342
-- doesn't report a finished (closed) connection, restarts itself.
343
clientLoop :: MVar () -> JQStatus -> Client -> ConfigReader -> IO ()
344
clientLoop qlock qstat client creader = do
345
  result <- handleClient qlock qstat client creader
346
  if result
347
    then clientLoop qlock qstat client creader
348
    else closeClient client
349

    
350
-- | Main listener loop: accepts clients, forks an I/O thread to handle
351
-- that client.
352
listener :: MVar () -> JQStatus -> ConfigReader -> Server -> IO ()
353
listener qlock qstat creader socket = do
354
  client <- acceptClient socket
355
  _ <- forkIO $ clientLoop qlock qstat client creader
356
  return ()
357

    
358
-- | Type alias for prepMain results
359
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
360

    
361
-- | Check function for luxid.
362
checkMain :: CheckFn ()
363
checkMain _ = return $ Right ()
364

    
365
-- | Prepare function for luxid.
366
prepMain :: PrepFn () PrepResult
367
prepMain _ _ = do
368
  socket_path <- Path.defaultQuerySocket
369
  cleanupSocket socket_path
370
  s <- describeError "binding to the Luxi socket"
371
         Nothing (Just socket_path) $ getLuxiServer True socket_path
372
  cref <- newIORef (Bad "Configuration not yet loaded")
373
  jq <- emptyJQStatus 
374
  return (s, cref, jq)
375

    
376
-- | Main function.
377
main :: MainFn () PrepResult
378
main _ _ (server, cref, jq) = do
379
  initConfigReader id cref
380
  let creader = readIORef cref
381
  initJQScheduler jq
382
  
383
  qlockFile <- jobQueueLockFile
384
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
385
  qlock <- newMVar ()
386

    
387
  finally
388
    (forever $ listener qlock jq creader server)
389
    (closeServer server)