Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ 6222b3a3

History | View | Annotate | Download (15.8 kB)

1
{-# LANGUAGE BangPatterns #-}
2

    
3
{-| Implementation of the Ganeti Query2 server.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2012, 2013 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Query.Server
29
  ( main
30
  , checkMain
31
  , prepMain
32
  ) where
33

    
34
import Control.Applicative
35
import Control.Concurrent
36
import Control.Exception
37
import Control.Monad (forever, when, zipWithM, liftM)
38
import Data.Bits (bitSize)
39
import qualified Data.Set as Set (toList)
40
import Data.IORef
41
import qualified Network.Socket as S
42
import qualified Text.JSON as J
43
import Text.JSON (encode, showJSON, JSValue(..))
44
import System.Info (arch)
45

    
46
import qualified Ganeti.Constants as C
47
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
48
import Ganeti.Errors
49
import qualified Ganeti.Path as Path
50
import Ganeti.Daemon
51
import Ganeti.Objects
52
import qualified Ganeti.Config as Config
53
import Ganeti.ConfigReader
54
import Ganeti.BasicTypes
55
import Ganeti.JQueue
56
import Ganeti.JQScheduler
57
import Ganeti.Logging
58
import Ganeti.Luxi
59
import qualified Ganeti.Query.Language as Qlang
60
import qualified Ganeti.Query.Cluster as QCluster
61
import Ganeti.Path (queueDir, jobQueueLockFile)
62
import Ganeti.Query.Query
63
import Ganeti.Query.Filter (makeSimpleFilter)
64
import Ganeti.Types
65
import Ganeti.Utils (lockFile, exitIfBad, watchFile)
66
import qualified Ganeti.Version as Version
67

    
68
-- | Helper for classic queries.
69
handleClassicQuery :: ConfigData      -- ^ Cluster config
70
                   -> Qlang.ItemType  -- ^ Query type
71
                   -> [Either String Integer] -- ^ Requested names
72
                                              -- (empty means all)
73
                   -> [String]        -- ^ Requested fields
74
                   -> Bool            -- ^ Whether to do sync queries or not
75
                   -> IO (GenericResult GanetiException JSValue)
76
handleClassicQuery _ _ _ _ True =
77
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
78
handleClassicQuery cfg qkind names fields _ = do
79
  let flt = makeSimpleFilter (nameField qkind) names
80
  qr <- query cfg True (Qlang.Query qkind fields flt)
81
  return $ showJSON <$> (qr >>= queryCompat)
82

    
83
-- | Minimal wrapper to handle the missing config case.
84
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData 
85
                     -> LuxiOp -> IO (ErrorResult JSValue)
86
handleCallWrapper _ _ (Bad msg) _ =
87
  return . Bad . ConfigurationError $
88
           "I do not have access to a valid configuration, cannot\
89
           \ process queries: " ++ msg
90
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
91

    
92
-- | Actual luxi operation handler.
93
handleCall :: MVar () -> JQStatus 
94
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
95
handleCall _ _ cdata QueryClusterInfo =
96
  let cluster = configCluster cdata
97
      master = QCluster.clusterMasterNodeName cdata
98
      hypervisors = clusterEnabledHypervisors cluster
99
      diskTemplates = clusterEnabledDiskTemplates cluster
100
      def_hv = case hypervisors of
101
                 x:_ -> showJSON x
102
                 [] -> JSNull
103
      bits = show (bitSize (0::Int)) ++ "bits"
104
      arch_tuple = [bits, arch]
105
      obj = [ ("software_version", showJSON C.releaseVersion)
106
            , ("protocol_version", showJSON C.protocolVersion)
107
            , ("config_version", showJSON C.configVersion)
108
            , ("os_api_version", showJSON . maximum .
109
                                 Set.toList . ConstantUtils.unFrozenSet $
110
                                 C.osApiVersions)
111
            , ("export_version", showJSON C.exportVersion)
112
            , ("vcs_version", showJSON Version.version)
113
            , ("architecture", showJSON arch_tuple)
114
            , ("name", showJSON $ clusterClusterName cluster)
115
            , ("master", showJSON (case master of
116
                                     Ok name -> name
117
                                     _ -> undefined))
118
            , ("default_hypervisor", def_hv)
119
            , ("enabled_hypervisors", showJSON hypervisors)
120
            , ("hvparams", showJSON $ clusterHvparams cluster)
121
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
122
            , ("beparams", showJSON $ clusterBeparams cluster)
123
            , ("osparams", showJSON $ clusterOsparams cluster)
124
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
125
            , ("nicparams", showJSON $ clusterNicparams cluster)
126
            , ("ndparams", showJSON $ clusterNdparams cluster)
127
            , ("diskparams", showJSON $ clusterDiskparams cluster)
128
            , ("candidate_pool_size",
129
               showJSON $ clusterCandidatePoolSize cluster)
130
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
131
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
132
            , ("use_external_mip_script",
133
               showJSON $ clusterUseExternalMipScript cluster)
134
            , ("volume_group_name",
135
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
136
            , ("drbd_usermode_helper",
137
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
138
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
139
            , ("shared_file_storage_dir",
140
               showJSON $ clusterSharedFileStorageDir cluster)
141
            , ("maintain_node_health",
142
               showJSON $ clusterMaintainNodeHealth cluster)
143
            , ("ctime", showJSON $ clusterCtime cluster)
144
            , ("mtime", showJSON $ clusterMtime cluster)
145
            , ("uuid", showJSON $ clusterUuid cluster)
146
            , ("tags", showJSON $ clusterTags cluster)
147
            , ("uid_pool", showJSON $ clusterUidPool cluster)
148
            , ("default_iallocator",
149
               showJSON $ clusterDefaultIallocator cluster)
150
            , ("default_iallocator_params",
151
              showJSON $ clusterDefaultIallocatorParams cluster)
152
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
153
            , ("primary_ip_version",
154
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
155
            , ("prealloc_wipe_disks",
156
               showJSON $ clusterPreallocWipeDisks cluster)
157
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
158
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
159
            , ("enabled_disk_templates", showJSON diskTemplates)
160
            ]
161

    
162
  in case master of
163
    Ok _ -> return . Ok . J.makeObj $ obj
164
    Bad ex -> return $ Bad ex
165

    
166
handleCall _ _ cfg (QueryTags kind name) = do
167
  let tags = case kind of
168
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
169
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
170
               TagKindNode     -> nodeTags  <$> Config.getNode     cfg name
171
               TagKindInstance -> instTags  <$> Config.getInstance cfg name
172
               TagKindNetwork  -> Bad $ OpPrereqError
173
                                        "Network tag is not allowed"
174
                                        ECodeInval
175
  return (J.showJSON <$> tags)
176

    
177
handleCall _ _ cfg (Query qkind qfields qfilter) = do
178
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
179
  return $ J.showJSON <$> result
180

    
181
handleCall _ _ _ (QueryFields qkind qfields) = do
182
  let result = queryFields (Qlang.QueryFields qkind qfields)
183
  return $ J.showJSON <$> result
184

    
185
handleCall _ _ cfg (QueryNodes names fields lock) =
186
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
187
    (map Left names) fields lock
188

    
189
handleCall _ _ cfg (QueryInstances names fields lock) =
190
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
191
    (map Left names) fields lock
192

    
193
handleCall _ _ cfg (QueryGroups names fields lock) =
194
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
195
    (map Left names) fields lock
196

    
197
handleCall _ _ cfg (QueryJobs names fields) =
198
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
199
    (map (Right . fromIntegral . fromJobId) names)  fields False
200

    
201
handleCall _ _ cfg (QueryNetworks names fields lock) =
202
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
203
    (map Left names) fields lock
204

    
205
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) =
206
  do
207
    let mcs = Config.getMasterCandidates cfg
208
    jobid <- allocateJobId mcs qlock
209
    case jobid of
210
      Bad s -> return . Bad . GenericError $ s
211
      Ok jid -> do
212
        ts <- currentTimestamp
213
        job <- liftM (setReceivedTimestamp ts)
214
                 $ queuedJobFromOpCodes jid ops
215
        qDir <- queueDir
216
        write_result <- writeJobToDisk qDir job
217
        case write_result of
218
          Bad s -> return . Bad . GenericError $ s
219
          Ok () -> do
220
            _ <- replicateManyJobs qDir mcs [job]
221
            _ <- forkIO $ enqueueNewJobs qstat [job]
222
            return . Ok . showJSON . fromJobId $ jid
223

    
224
handleCall qlock qstat cfg (SubmitJob ops) =
225
  do
226
    open <- isQueueOpen
227
    if not open
228
       then return . Bad . GenericError $ "Queue drained"
229
       else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
230

    
231
handleCall qlock qstat cfg (SubmitManyJobs lops) =
232
  do
233
    open <- isQueueOpen
234
    if not open
235
      then return . Bad . GenericError $ "Queue drained"
236
      else do
237
        let mcs = Config.getMasterCandidates cfg
238
        result_jobids <- allocateJobIds mcs qlock (length lops)
239
        case result_jobids of
240
          Bad s -> return . Bad . GenericError $ s
241
          Ok jids -> do
242
            ts <- currentTimestamp
243
            jobs <- liftM (map $ setReceivedTimestamp ts)
244
                      $ zipWithM queuedJobFromOpCodes jids lops
245
            qDir <- queueDir
246
            write_results <- mapM (writeJobToDisk qDir) jobs
247
            let annotated_results = zip write_results jobs
248
                succeeded = map snd $ filter (isOk . fst) annotated_results
249
            when (any isBad write_results) . logWarning
250
              $ "Writing some jobs failed " ++ show annotated_results
251
            replicateManyJobs qDir mcs succeeded
252
            _ <- forkIO $ enqueueNewJobs qstat succeeded
253
            return . Ok . JSArray
254
              . map (\(res, job) ->
255
                      if isOk res
256
                        then showJSON (True, fromJobId $ qjId job)
257
                        else showJSON (False, genericResult id (const "") res))
258
              $ annotated_results
259

    
260
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
261
  let compute_fn = computeJobUpdate cfg jid fields prev_log 
262
  qDir <- queueDir
263
  -- verify if the job is finalized, and return immediately in this case
264
  jobresult <- loadJobFromDisk qDir False jid
265
  case jobresult of
266
    Ok (job, _) | not (jobFinalized job) -> do
267
      let jobfile = liveJobFile qDir jid
268
      answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
269
                  (prev_job, JSArray []) compute_fn
270
      return . Ok $ showJSON answer
271
    _ -> liftM (Ok . showJSON) compute_fn
272

    
273
handleCall _ _ _ op =
274
  return . Bad $
275
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
276

    
277
{-# ANN handleCall "HLint: ignore Too strict if" #-}
278

    
279
-- | Query the status of a job and return the requested fields
280
-- and the logs newer than the given log number.
281
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue 
282
                    -> IO (JSValue, JSValue)
283
computeJobUpdate cfg jid fields prev_log = do
284
  let sjid = show $ fromJobId jid
285
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
286
  let fromJSArray (JSArray xs) = xs
287
      fromJSArray _ = []
288
  let logFilter JSNull (JSArray _) = True
289
      logFilter (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
290
      logFilter _ _ = False
291
  let filterLogs n logs = JSArray (filter (logFilter n) (logs >>= fromJSArray))
292
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
293
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
294
  let (rfields, rlogs) = case jobQuery of
295
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
296
          (answer, filterLogs prev_log logs)
297
        _ -> (map (const JSNull) fields, JSArray [])
298
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
299
  return (JSArray rfields, rlogs)
300

    
301
-- | Given a decoded luxi request, executes it and sends the luxi
302
-- response back to the client.
303
handleClientMsg :: MVar () -> JQStatus -> Client -> ConfigReader
304
                   -> LuxiOp -> IO Bool
305
handleClientMsg qlock qstat client creader args = do
306
  cfg <- creader
307
  logDebug $ "Request: " ++ show args
308
  call_result <- handleCallWrapper qlock qstat cfg args
309
  (!status, !rval) <-
310
    case call_result of
311
      Bad err -> do
312
        logWarning $ "Failed to execute request " ++ show args ++ ": "
313
                     ++ show err
314
        return (False, showJSON err)
315
      Ok result -> do
316
        -- only log the first 2,000 chars of the result
317
        logDebug $ "Result (truncated): " ++ take 2000 (J.encode result)
318
        logInfo $ "Successfully handled " ++ strOfOp args
319
        return (True, result)
320
  sendMsg client $ buildResponse status rval
321
  return True
322

    
323
-- | Handles one iteration of the client protocol: receives message,
324
-- checks it for validity and decodes it, returns response.
325
handleClient :: MVar () -> JQStatus -> Client -> ConfigReader -> IO Bool
326
handleClient qlock qstat client creader = do
327
  !msg <- recvMsgExt client
328
  logDebug $ "Received message: " ++ show msg
329
  case msg of
330
    RecvConnClosed -> logDebug "Connection closed" >> return False
331
    RecvError err -> logWarning ("Error during message receiving: " ++ err) >>
332
                     return False
333
    RecvOk payload ->
334
      case validateCall payload >>= decodeCall of
335
        Bad err -> do
336
             let errmsg = "Failed to parse request: " ++ err
337
             logWarning errmsg
338
             sendMsg client $ buildResponse False (showJSON errmsg)
339
             return False
340
        Ok args -> handleClientMsg qlock qstat client creader args
341

    
342
-- | Main client loop: runs one loop of 'handleClient', and if that
343
-- doesn't report a finished (closed) connection, restarts itself.
344
clientLoop :: MVar () -> JQStatus -> Client -> ConfigReader -> IO ()
345
clientLoop qlock qstat client creader = do
346
  result <- handleClient qlock qstat client creader
347
  if result
348
    then clientLoop qlock qstat client creader
349
    else closeClient client
350

    
351
-- | Main listener loop: accepts clients, forks an I/O thread to handle
352
-- that client.
353
listener :: MVar () -> JQStatus -> ConfigReader -> S.Socket -> IO ()
354
listener qlock qstat creader socket = do
355
  client <- acceptClient socket
356
  _ <- forkIO $ clientLoop qlock qstat client creader
357
  return ()
358

    
359
-- | Type alias for prepMain results
360
type PrepResult = (FilePath, S.Socket, IORef (Result ConfigData), JQStatus)
361

    
362
-- | Check function for luxid.
363
checkMain :: CheckFn ()
364
checkMain _ = return $ Right ()
365

    
366
-- | Prepare function for luxid.
367
prepMain :: PrepFn () PrepResult
368
prepMain _ _ = do
369
  socket_path <- Path.defaultQuerySocket
370
  cleanupSocket socket_path
371
  s <- describeError "binding to the Luxi socket"
372
         Nothing (Just socket_path) $ getServer True socket_path
373
  cref <- newIORef (Bad "Configuration not yet loaded")
374
  jq <- emptyJQStatus 
375
  return (socket_path, s, cref, jq)
376

    
377
-- | Main function.
378
main :: MainFn () PrepResult
379
main _ _ (socket_path, server, cref, jq) = do
380
  initConfigReader id cref
381
  let creader = readIORef cref
382
  initJQScheduler jq
383
  
384
  qlockFile <- jobQueueLockFile
385
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
386
  qlock <- newMVar ()
387

    
388
  finally
389
    (forever $ listener qlock jq creader server)
390
    (closeServer socket_path server)