Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ 74b3f734

History | View | Annotate | Download (16.7 kB)

1
{-| Implementation of the Ganeti Query2 server.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2012, 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.Query.Server
27
  ( main
28
  , checkMain
29
  , prepMain
30
  ) where
31

    
32
import Control.Applicative
33
import Control.Concurrent
34
import Control.Exception
35
import Control.Monad (forever, when, zipWithM, liftM)
36
import Data.Bits (bitSize)
37
import qualified Data.Set as Set (toList)
38
import Data.IORef
39
import Data.Maybe (fromMaybe)
40
import qualified Text.JSON as J
41
import Text.JSON (encode, showJSON, JSValue(..))
42
import System.Info (arch)
43
import System.Directory
44

    
45
import qualified Ganeti.Constants as C
46
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
47
import Ganeti.Errors
48
import qualified Ganeti.Path as Path
49
import Ganeti.Daemon
50
import Ganeti.Objects
51
import qualified Ganeti.Config as Config
52
import Ganeti.ConfigReader
53
import Ganeti.BasicTypes
54
import Ganeti.JQueue
55
import Ganeti.JQScheduler
56
import Ganeti.JSON (TimeAsDoubleJSON(..))
57
import Ganeti.Logging
58
import Ganeti.Luxi
59
import qualified Ganeti.Query.Language as Qlang
60
import qualified Ganeti.Query.Cluster as QCluster
61
import Ganeti.Path (queueDir, jobQueueLockFile, jobQueueDrainFile)
62
import Ganeti.Rpc
63
import Ganeti.Query.Query
64
import Ganeti.Query.Filter (makeSimpleFilter)
65
import Ganeti.Types
66
import qualified Ganeti.UDSServer as U (Handler(..), listener)
67
import Ganeti.Utils (lockFile, exitIfBad, watchFile)
68
import qualified Ganeti.Version as Version
69

    
70
-- | Helper for classic queries.
71
handleClassicQuery :: ConfigData      -- ^ Cluster config
72
                   -> Qlang.ItemType  -- ^ Query type
73
                   -> [Either String Integer] -- ^ Requested names
74
                                              -- (empty means all)
75
                   -> [String]        -- ^ Requested fields
76
                   -> Bool            -- ^ Whether to do sync queries or not
77
                   -> IO (GenericResult GanetiException JSValue)
78
handleClassicQuery _ _ _ _ True =
79
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
80
handleClassicQuery cfg qkind names fields _ = do
81
  let flt = makeSimpleFilter (nameField qkind) names
82
  qr <- query cfg True (Qlang.Query qkind fields flt)
83
  return $ showJSON <$> (qr >>= queryCompat)
84

    
85
-- | Minimal wrapper to handle the missing config case.
86
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData
87
                     -> LuxiOp -> IO (ErrorResult JSValue)
88
handleCallWrapper _ _ (Bad msg) _ =
89
  return . Bad . ConfigurationError $
90
           "I do not have access to a valid configuration, cannot\
91
           \ process queries: " ++ msg
92
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
93

    
94
-- | Actual luxi operation handler.
95
handleCall :: MVar () -> JQStatus
96
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
97
handleCall _ _ cdata QueryClusterInfo =
98
  let cluster = configCluster cdata
99
      master = QCluster.clusterMasterNodeName cdata
100
      hypervisors = clusterEnabledHypervisors cluster
101
      diskTemplates = clusterEnabledDiskTemplates cluster
102
      def_hv = case hypervisors of
103
                 x:_ -> showJSON x
104
                 [] -> JSNull
105
      bits = show (bitSize (0::Int)) ++ "bits"
106
      arch_tuple = [bits, arch]
107
      obj = [ ("software_version", showJSON C.releaseVersion)
108
            , ("protocol_version", showJSON C.protocolVersion)
109
            , ("config_version", showJSON C.configVersion)
110
            , ("os_api_version", showJSON . maximum .
111
                                 Set.toList . ConstantUtils.unFrozenSet $
112
                                 C.osApiVersions)
113
            , ("export_version", showJSON C.exportVersion)
114
            , ("vcs_version", showJSON Version.version)
115
            , ("architecture", showJSON arch_tuple)
116
            , ("name", showJSON $ clusterClusterName cluster)
117
            , ("master", showJSON (case master of
118
                                     Ok name -> name
119
                                     _ -> undefined))
120
            , ("default_hypervisor", def_hv)
121
            , ("enabled_hypervisors", showJSON hypervisors)
122
            , ("hvparams", showJSON $ clusterHvparams cluster)
123
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
124
            , ("beparams", showJSON $ clusterBeparams cluster)
125
            , ("osparams", showJSON $ clusterOsparams cluster)
126
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
127
            , ("nicparams", showJSON $ clusterNicparams cluster)
128
            , ("ndparams", showJSON $ clusterNdparams cluster)
129
            , ("diskparams", showJSON $ clusterDiskparams cluster)
130
            , ("candidate_pool_size",
131
               showJSON $ clusterCandidatePoolSize cluster)
132
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
133
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
134
            , ("use_external_mip_script",
135
               showJSON $ clusterUseExternalMipScript cluster)
136
            , ("volume_group_name",
137
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
138
            , ("drbd_usermode_helper",
139
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
140
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
141
            , ("shared_file_storage_dir",
142
               showJSON $ clusterSharedFileStorageDir cluster)
143
            , ("gluster_storage_dir",
144
               showJSON $ clusterGlusterStorageDir cluster)
145
            , ("maintain_node_health",
146
               showJSON $ clusterMaintainNodeHealth cluster)
147
            , ("ctime", showJSON . TimeAsDoubleJSON $ clusterCtime cluster)
148
            , ("mtime", showJSON . TimeAsDoubleJSON $ clusterMtime cluster)
149
            , ("uuid", showJSON $ clusterUuid cluster)
150
            , ("tags", showJSON $ clusterTags cluster)
151
            , ("uid_pool", showJSON $ clusterUidPool cluster)
152
            , ("default_iallocator",
153
               showJSON $ clusterDefaultIallocator cluster)
154
            , ("default_iallocator_params",
155
              showJSON $ clusterDefaultIallocatorParams cluster)
156
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
157
            , ("primary_ip_version",
158
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
159
            , ("prealloc_wipe_disks",
160
               showJSON $ clusterPreallocWipeDisks cluster)
161
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
162
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
163
            , ("enabled_disk_templates", showJSON diskTemplates)
164
            ]
165

    
166
  in case master of
167
    Ok _ -> return . Ok . J.makeObj $ obj
168
    Bad ex -> return $ Bad ex
169

    
170
handleCall _ _ cfg (QueryTags kind name) = do
171
  let tags = case kind of
172
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
173
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
174
               TagKindNode     -> nodeTags  <$> Config.getNode     cfg name
175
               TagKindInstance -> instTags  <$> Config.getInstance cfg name
176
               TagKindNetwork  -> Bad $ OpPrereqError
177
                                        "Network tag is not allowed"
178
                                        ECodeInval
179
  return (J.showJSON <$> tags)
180

    
181
handleCall _ _ cfg (Query qkind qfields qfilter) = do
182
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
183
  return $ J.showJSON <$> result
184

    
185
handleCall _ _ _ (QueryFields qkind qfields) = do
186
  let result = queryFields (Qlang.QueryFields qkind qfields)
187
  return $ J.showJSON <$> result
188

    
189
handleCall _ _ cfg (QueryNodes names fields lock) =
190
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
191
    (map Left names) fields lock
192

    
193
handleCall _ _ cfg (QueryInstances names fields lock) =
194
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
195
    (map Left names) fields lock
196

    
197
handleCall _ _ cfg (QueryGroups names fields lock) =
198
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
199
    (map Left names) fields lock
200

    
201
handleCall _ _ cfg (QueryJobs names fields) =
202
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
203
    (map (Right . fromIntegral . fromJobId) names)  fields False
204

    
205
handleCall _ _ cfg (QueryNetworks names fields lock) =
206
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
207
    (map Left names) fields lock
208

    
209
handleCall _ _ cfg (QueryConfigValues fields) = do
210
  let params = [ ("cluster_name", return . showJSON . clusterClusterName
211
                                    . configCluster $ cfg)
212
               , ("watcher_pause", liftM (maybe JSNull showJSON)
213
                                     QCluster.isWatcherPaused)
214
               , ("master_node", return . genericResult (const JSNull) showJSON
215
                                   $ QCluster.clusterMasterNodeName cfg)
216
               , ("drain_flag", liftM (showJSON . not) isQueueOpen)
217
               ] :: [(String, IO JSValue)]
218
  let answer = map (fromMaybe (return JSNull) . flip lookup params) fields
219
  answerEval <- sequence answer
220
  return . Ok . showJSON $ answerEval
221

    
222
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) =
223
  do
224
    let mcs = Config.getMasterCandidates cfg
225
    jobid <- allocateJobId mcs qlock
226
    case jobid of
227
      Bad s -> return . Bad . GenericError $ s
228
      Ok jid -> do
229
        ts <- currentTimestamp
230
        job <- liftM (setReceivedTimestamp ts)
231
                 $ queuedJobFromOpCodes jid ops
232
        qDir <- queueDir
233
        write_result <- writeJobToDisk qDir job
234
        case write_result of
235
          Bad s -> return . Bad . GenericError $ s
236
          Ok () -> do
237
            _ <- replicateManyJobs qDir mcs [job]
238
            _ <- forkIO $ enqueueNewJobs qstat [job]
239
            return . Ok . showJSON . fromJobId $ jid
240

    
241
handleCall qlock qstat cfg (SubmitJob ops) =
242
  do
243
    open <- isQueueOpen
244
    if not open
245
       then return . Bad . GenericError $ "Queue drained"
246
       else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
247

    
248
handleCall qlock qstat cfg (SubmitManyJobs lops) =
249
  do
250
    open <- isQueueOpen
251
    if not open
252
      then return . Bad . GenericError $ "Queue drained"
253
      else do
254
        let mcs = Config.getMasterCandidates cfg
255
        result_jobids <- allocateJobIds mcs qlock (length lops)
256
        case result_jobids of
257
          Bad s -> return . Bad . GenericError $ s
258
          Ok jids -> do
259
            ts <- currentTimestamp
260
            jobs <- liftM (map $ setReceivedTimestamp ts)
261
                      $ zipWithM queuedJobFromOpCodes jids lops
262
            qDir <- queueDir
263
            write_results <- mapM (writeJobToDisk qDir) jobs
264
            let annotated_results = zip write_results jobs
265
                succeeded = map snd $ filter (isOk . fst) annotated_results
266
            when (any isBad write_results) . logWarning
267
              $ "Writing some jobs failed " ++ show annotated_results
268
            replicateManyJobs qDir mcs succeeded
269
            _ <- forkIO $ enqueueNewJobs qstat succeeded
270
            return . Ok . JSArray
271
              . map (\(res, job) ->
272
                      if isOk res
273
                        then showJSON (True, fromJobId $ qjId job)
274
                        else showJSON (False, genericResult id (const "") res))
275
              $ annotated_results
276

    
277
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
278
  let compute_fn = computeJobUpdate cfg jid fields prev_log
279
  qDir <- queueDir
280
  -- verify if the job is finalized, and return immediately in this case
281
  jobresult <- loadJobFromDisk qDir False jid
282
  case jobresult of
283
    Ok (job, _) | not (jobFinalized job) -> do
284
      let jobfile = liveJobFile qDir jid
285
      answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
286
                  (prev_job, JSArray []) compute_fn
287
      return . Ok $ showJSON answer
288
    _ -> liftM (Ok . showJSON) compute_fn
289

    
290
handleCall _ _ cfg (SetWatcherPause time) = do
291
  let mcs = Config.getMasterCandidates cfg
292
      masters = genericResult (const []) return
293
                  . Config.getNode cfg . clusterMasterNode
294
                  $ configCluster cfg
295
  _ <- executeRpcCall (masters ++ mcs) $ RpcCallSetWatcherPause time
296
  return . Ok . maybe JSNull showJSON $ time
297

    
298
handleCall _ _ cfg (SetDrainFlag value) = do
299
  let mcs = Config.getMasterCandidates cfg
300
  fpath <- jobQueueDrainFile
301
  if value
302
     then writeFile fpath ""
303
     else removeFile fpath
304
  _ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
305
  return . Ok . showJSON $ True
306

    
307
handleCall _ qstat  cfg (CancelJob jid) = do
308
  let jName = (++) "job " . show $ fromJobId jid
309
  dequeueResult <- dequeueJob qstat jid
310
  case dequeueResult of
311
    Ok True -> do
312
      logDebug $ jName ++ " dequeued, marking as canceled"
313
      qDir <- queueDir
314
      readResult <- loadJobFromDisk qDir True jid
315
      let jobFileFailed = return . Ok . showJSON . (,) False
316
                            . (++) ("Dequeued " ++ jName
317
                                    ++ ", but failed to mark as cancelled: ")
318
                          :: String -> IO (ErrorResult JSValue)
319
      case readResult of
320
        Bad s -> jobFileFailed s
321
        Ok (job, _) -> do
322
          now <- currentTimestamp
323
          let job' = cancelQueuedJob now job
324
              mcs = Config.getMasterCandidates cfg
325
          write_result <- writeJobToDisk qDir job'
326
          case write_result of
327
            Bad s -> jobFileFailed s
328
            Ok () -> do
329
              replicateManyJobs qDir mcs [job']
330
              return . Ok . showJSON $ (True, "Dequeued " ++ jName)
331
    Ok False -> do
332
      logDebug $ jName ++ " not queued; trying to cancel directly"
333
      cancelJob jid
334
    Bad s -> return . Ok . showJSON $ (False, s)
335

    
336
handleCall _ _ _ op =
337
  return . Bad $
338
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
339

    
340
{-# ANN handleCall "HLint: ignore Too strict if" #-}
341

    
342
-- | Query the status of a job and return the requested fields
343
-- and the logs newer than the given log number.
344
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
345
                    -> IO (JSValue, JSValue)
346
computeJobUpdate cfg jid fields prev_log = do
347
  let sjid = show $ fromJobId jid
348
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
349
  let fromJSArray (JSArray xs) = xs
350
      fromJSArray _ = []
351
  let logFilter JSNull (JSArray _) = True
352
      logFilter (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
353
      logFilter _ _ = False
354
  let filterLogs n logs = JSArray (filter (logFilter n) (logs >>= fromJSArray))
355
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
356
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
357
  let (rfields, rlogs) = case jobQuery of
358
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
359
          (answer, filterLogs prev_log logs)
360
        _ -> (map (const JSNull) fields, JSArray [])
361
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
362
  return (JSArray rfields, rlogs)
363

    
364

    
365
type LuxiConfig = (MVar (), JQStatus, ConfigReader)
366

    
367
luxiExec
368
    :: LuxiConfig
369
    -> LuxiOp
370
    -> IO (Bool, GenericResult GanetiException JSValue)
371
luxiExec (qlock, qstat, creader) args = do
372
  cfg <- creader
373
  result <- handleCallWrapper qlock qstat cfg args
374
  return (True, result)
375

    
376
luxiHandler :: LuxiConfig -> U.Handler LuxiOp JSValue
377
luxiHandler cfg = U.Handler { U.hParse         = decodeLuxiCall
378
                            , U.hInputLogShort = strOfOp
379
                            , U.hInputLogLong  = show
380
                            , U.hExec          = luxiExec cfg
381
                            }
382

    
383
-- | Type alias for prepMain results
384
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
385

    
386
-- | Check function for luxid.
387
checkMain :: CheckFn ()
388
checkMain _ = return $ Right ()
389

    
390
-- | Prepare function for luxid.
391
prepMain :: PrepFn () PrepResult
392
prepMain _ _ = do
393
  socket_path <- Path.defaultQuerySocket
394
  cleanupSocket socket_path
395
  s <- describeError "binding to the Luxi socket"
396
         Nothing (Just socket_path) $ getLuxiServer True socket_path
397
  cref <- newIORef (Bad "Configuration not yet loaded")
398
  jq <- emptyJQStatus
399
  return (s, cref, jq)
400

    
401
-- | Main function.
402
main :: MainFn () PrepResult
403
main _ _ (server, cref, jq) = do
404
  initConfigReader id cref
405
  let creader = readIORef cref
406
  initJQScheduler jq
407

    
408
  qlockFile <- jobQueueLockFile
409
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
410
  qlock <- newMVar ()
411

    
412
  finally
413
    (forever $ U.listener (luxiHandler (qlock, jq, creader)) server)
414
    (closeServer server)