Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ 36cb6837

History | View | Annotate | Download (16.6 kB)

1
{-| Implementation of the Ganeti Query2 server.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2012, 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.Query.Server
27
  ( main
28
  , checkMain
29
  , prepMain
30
  ) where
31

    
32
import Control.Applicative
33
import Control.Concurrent
34
import Control.Exception
35
import Control.Monad (forever, when, zipWithM, liftM)
36
import Data.Bits (bitSize)
37
import qualified Data.Set as Set (toList)
38
import Data.IORef
39
import Data.Maybe (fromMaybe)
40
import qualified Text.JSON as J
41
import Text.JSON (encode, showJSON, JSValue(..))
42
import System.Info (arch)
43
import System.Directory
44

    
45
import qualified Ganeti.Constants as C
46
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
47
import Ganeti.Errors
48
import qualified Ganeti.Path as Path
49
import Ganeti.Daemon
50
import Ganeti.Objects
51
import qualified Ganeti.Config as Config
52
import Ganeti.ConfigReader
53
import Ganeti.BasicTypes
54
import Ganeti.JQueue
55
import Ganeti.JQScheduler
56
import Ganeti.Logging
57
import Ganeti.Luxi
58
import qualified Ganeti.Query.Language as Qlang
59
import qualified Ganeti.Query.Cluster as QCluster
60
import Ganeti.Path (queueDir, jobQueueLockFile, jobQueueDrainFile)
61
import Ganeti.Rpc
62
import Ganeti.Query.Query
63
import Ganeti.Query.Filter (makeSimpleFilter)
64
import Ganeti.Types
65
import qualified Ganeti.UDSServer as U (Handler(..), listener)
66
import Ganeti.Utils (lockFile, exitIfBad, watchFile)
67
import qualified Ganeti.Version as Version
68

    
69
-- | Helper for classic queries.
70
handleClassicQuery :: ConfigData      -- ^ Cluster config
71
                   -> Qlang.ItemType  -- ^ Query type
72
                   -> [Either String Integer] -- ^ Requested names
73
                                              -- (empty means all)
74
                   -> [String]        -- ^ Requested fields
75
                   -> Bool            -- ^ Whether to do sync queries or not
76
                   -> IO (GenericResult GanetiException JSValue)
77
handleClassicQuery _ _ _ _ True =
78
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
79
handleClassicQuery cfg qkind names fields _ = do
80
  let flt = makeSimpleFilter (nameField qkind) names
81
  qr <- query cfg True (Qlang.Query qkind fields flt)
82
  return $ showJSON <$> (qr >>= queryCompat)
83

    
84
-- | Minimal wrapper to handle the missing config case.
85
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData
86
                     -> LuxiOp -> IO (ErrorResult JSValue)
87
handleCallWrapper _ _ (Bad msg) _ =
88
  return . Bad . ConfigurationError $
89
           "I do not have access to a valid configuration, cannot\
90
           \ process queries: " ++ msg
91
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
92

    
93
-- | Actual luxi operation handler.
94
handleCall :: MVar () -> JQStatus
95
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
96
handleCall _ _ cdata QueryClusterInfo =
97
  let cluster = configCluster cdata
98
      master = QCluster.clusterMasterNodeName cdata
99
      hypervisors = clusterEnabledHypervisors cluster
100
      diskTemplates = clusterEnabledDiskTemplates cluster
101
      def_hv = case hypervisors of
102
                 x:_ -> showJSON x
103
                 [] -> JSNull
104
      bits = show (bitSize (0::Int)) ++ "bits"
105
      arch_tuple = [bits, arch]
106
      obj = [ ("software_version", showJSON C.releaseVersion)
107
            , ("protocol_version", showJSON C.protocolVersion)
108
            , ("config_version", showJSON C.configVersion)
109
            , ("os_api_version", showJSON . maximum .
110
                                 Set.toList . ConstantUtils.unFrozenSet $
111
                                 C.osApiVersions)
112
            , ("export_version", showJSON C.exportVersion)
113
            , ("vcs_version", showJSON Version.version)
114
            , ("architecture", showJSON arch_tuple)
115
            , ("name", showJSON $ clusterClusterName cluster)
116
            , ("master", showJSON (case master of
117
                                     Ok name -> name
118
                                     _ -> undefined))
119
            , ("default_hypervisor", def_hv)
120
            , ("enabled_hypervisors", showJSON hypervisors)
121
            , ("hvparams", showJSON $ clusterHvparams cluster)
122
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
123
            , ("beparams", showJSON $ clusterBeparams cluster)
124
            , ("osparams", showJSON $ clusterOsparams cluster)
125
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
126
            , ("nicparams", showJSON $ clusterNicparams cluster)
127
            , ("ndparams", showJSON $ clusterNdparams cluster)
128
            , ("diskparams", showJSON $ clusterDiskparams cluster)
129
            , ("candidate_pool_size",
130
               showJSON $ clusterCandidatePoolSize cluster)
131
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
132
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
133
            , ("use_external_mip_script",
134
               showJSON $ clusterUseExternalMipScript cluster)
135
            , ("volume_group_name",
136
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
137
            , ("drbd_usermode_helper",
138
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
139
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
140
            , ("shared_file_storage_dir",
141
               showJSON $ clusterSharedFileStorageDir cluster)
142
            , ("gluster_storage_dir",
143
               showJSON $ clusterGlusterStorageDir cluster)
144
            , ("maintain_node_health",
145
               showJSON $ clusterMaintainNodeHealth cluster)
146
            , ("ctime", showJSON $ clusterCtime cluster)
147
            , ("mtime", showJSON $ clusterMtime cluster)
148
            , ("uuid", showJSON $ clusterUuid cluster)
149
            , ("tags", showJSON $ clusterTags cluster)
150
            , ("uid_pool", showJSON $ clusterUidPool cluster)
151
            , ("default_iallocator",
152
               showJSON $ clusterDefaultIallocator cluster)
153
            , ("default_iallocator_params",
154
              showJSON $ clusterDefaultIallocatorParams cluster)
155
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
156
            , ("primary_ip_version",
157
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
158
            , ("prealloc_wipe_disks",
159
               showJSON $ clusterPreallocWipeDisks cluster)
160
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
161
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
162
            , ("enabled_disk_templates", showJSON diskTemplates)
163
            ]
164

    
165
  in case master of
166
    Ok _ -> return . Ok . J.makeObj $ obj
167
    Bad ex -> return $ Bad ex
168

    
169
handleCall _ _ cfg (QueryTags kind name) = do
170
  let tags = case kind of
171
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
172
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
173
               TagKindNode     -> nodeTags  <$> Config.getNode     cfg name
174
               TagKindInstance -> instTags  <$> Config.getInstance cfg name
175
               TagKindNetwork  -> Bad $ OpPrereqError
176
                                        "Network tag is not allowed"
177
                                        ECodeInval
178
  return (J.showJSON <$> tags)
179

    
180
handleCall _ _ cfg (Query qkind qfields qfilter) = do
181
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
182
  return $ J.showJSON <$> result
183

    
184
handleCall _ _ _ (QueryFields qkind qfields) = do
185
  let result = queryFields (Qlang.QueryFields qkind qfields)
186
  return $ J.showJSON <$> result
187

    
188
handleCall _ _ cfg (QueryNodes names fields lock) =
189
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
190
    (map Left names) fields lock
191

    
192
handleCall _ _ cfg (QueryInstances names fields lock) =
193
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
194
    (map Left names) fields lock
195

    
196
handleCall _ _ cfg (QueryGroups names fields lock) =
197
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
198
    (map Left names) fields lock
199

    
200
handleCall _ _ cfg (QueryJobs names fields) =
201
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
202
    (map (Right . fromIntegral . fromJobId) names)  fields False
203

    
204
handleCall _ _ cfg (QueryNetworks names fields lock) =
205
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
206
    (map Left names) fields lock
207

    
208
handleCall _ _ cfg (QueryConfigValues fields) = do
209
  let params = [ ("cluster_name", return . showJSON . clusterClusterName
210
                                    . configCluster $ cfg)
211
               , ("watcher_pause", liftM (maybe JSNull showJSON)
212
                                     QCluster.isWatcherPaused)
213
               , ("master_node", return . genericResult (const JSNull) showJSON
214
                                   $ QCluster.clusterMasterNodeName cfg)
215
               , ("drain_flag", liftM (showJSON . not) isQueueOpen)
216
               ] :: [(String, IO JSValue)]
217
  let answer = map (fromMaybe (return JSNull) . flip lookup params) fields
218
  answerEval <- sequence answer
219
  return . Ok . showJSON $ answerEval
220

    
221
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) =
222
  do
223
    let mcs = Config.getMasterCandidates cfg
224
    jobid <- allocateJobId mcs qlock
225
    case jobid of
226
      Bad s -> return . Bad . GenericError $ s
227
      Ok jid -> do
228
        ts <- currentTimestamp
229
        job <- liftM (setReceivedTimestamp ts)
230
                 $ queuedJobFromOpCodes jid ops
231
        qDir <- queueDir
232
        write_result <- writeJobToDisk qDir job
233
        case write_result of
234
          Bad s -> return . Bad . GenericError $ s
235
          Ok () -> do
236
            _ <- replicateManyJobs qDir mcs [job]
237
            _ <- forkIO $ enqueueNewJobs qstat [job]
238
            return . Ok . showJSON . fromJobId $ jid
239

    
240
handleCall qlock qstat cfg (SubmitJob ops) =
241
  do
242
    open <- isQueueOpen
243
    if not open
244
       then return . Bad . GenericError $ "Queue drained"
245
       else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
246

    
247
handleCall qlock qstat cfg (SubmitManyJobs lops) =
248
  do
249
    open <- isQueueOpen
250
    if not open
251
      then return . Bad . GenericError $ "Queue drained"
252
      else do
253
        let mcs = Config.getMasterCandidates cfg
254
        result_jobids <- allocateJobIds mcs qlock (length lops)
255
        case result_jobids of
256
          Bad s -> return . Bad . GenericError $ s
257
          Ok jids -> do
258
            ts <- currentTimestamp
259
            jobs <- liftM (map $ setReceivedTimestamp ts)
260
                      $ zipWithM queuedJobFromOpCodes jids lops
261
            qDir <- queueDir
262
            write_results <- mapM (writeJobToDisk qDir) jobs
263
            let annotated_results = zip write_results jobs
264
                succeeded = map snd $ filter (isOk . fst) annotated_results
265
            when (any isBad write_results) . logWarning
266
              $ "Writing some jobs failed " ++ show annotated_results
267
            replicateManyJobs qDir mcs succeeded
268
            _ <- forkIO $ enqueueNewJobs qstat succeeded
269
            return . Ok . JSArray
270
              . map (\(res, job) ->
271
                      if isOk res
272
                        then showJSON (True, fromJobId $ qjId job)
273
                        else showJSON (False, genericResult id (const "") res))
274
              $ annotated_results
275

    
276
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
277
  let compute_fn = computeJobUpdate cfg jid fields prev_log
278
  qDir <- queueDir
279
  -- verify if the job is finalized, and return immediately in this case
280
  jobresult <- loadJobFromDisk qDir False jid
281
  case jobresult of
282
    Ok (job, _) | not (jobFinalized job) -> do
283
      let jobfile = liveJobFile qDir jid
284
      answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
285
                  (prev_job, JSArray []) compute_fn
286
      return . Ok $ showJSON answer
287
    _ -> liftM (Ok . showJSON) compute_fn
288

    
289
handleCall _ _ cfg (SetWatcherPause time) = do
290
  let mcs = Config.getMasterCandidates cfg
291
      masters = genericResult (const []) return
292
                  . Config.getNode cfg . clusterMasterNode
293
                  $ configCluster cfg
294
  _ <- executeRpcCall (masters ++ mcs) $ RpcCallSetWatcherPause time
295
  return . Ok . maybe JSNull showJSON $ time
296

    
297
handleCall _ _ cfg (SetDrainFlag value) = do
298
  let mcs = Config.getMasterCandidates cfg
299
  fpath <- jobQueueDrainFile
300
  if value
301
     then writeFile fpath ""
302
     else removeFile fpath
303
  _ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
304
  return . Ok . showJSON $ True
305

    
306
handleCall _ qstat  cfg (CancelJob jid) = do
307
  let jName = (++) "job " . show $ fromJobId jid
308
  dequeueResult <- dequeueJob qstat jid
309
  case dequeueResult of
310
    Ok True -> do
311
      logDebug $ jName ++ " dequeued, marking as canceled"
312
      qDir <- queueDir
313
      readResult <- loadJobFromDisk qDir True jid
314
      let jobFileFailed = return . Ok . showJSON . (,) False
315
                            . (++) ("Dequeued " ++ jName
316
                                    ++ ", but failed to mark as cancelled: ")
317
                          :: String -> IO (ErrorResult JSValue)
318
      case readResult of
319
        Bad s -> jobFileFailed s
320
        Ok (job, _) -> do
321
          now <- currentTimestamp
322
          let job' = cancelQueuedJob now job
323
              mcs = Config.getMasterCandidates cfg
324
          write_result <- writeJobToDisk qDir job'
325
          case write_result of
326
            Bad s -> jobFileFailed s
327
            Ok () -> do
328
              replicateManyJobs qDir mcs [job']
329
              return . Ok . showJSON $ (True, "Dequeued " ++ jName)
330
    Ok False -> do
331
      logDebug $ jName ++ " not queued; trying to cancel directly"
332
      cancelJob jid
333
    Bad s -> return . Ok . showJSON $ (False, s)
334

    
335
handleCall _ _ _ op =
336
  return . Bad $
337
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
338

    
339
{-# ANN handleCall "HLint: ignore Too strict if" #-}
340

    
341
-- | Query the status of a job and return the requested fields
342
-- and the logs newer than the given log number.
343
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
344
                    -> IO (JSValue, JSValue)
345
computeJobUpdate cfg jid fields prev_log = do
346
  let sjid = show $ fromJobId jid
347
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
348
  let fromJSArray (JSArray xs) = xs
349
      fromJSArray _ = []
350
  let logFilter JSNull (JSArray _) = True
351
      logFilter (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
352
      logFilter _ _ = False
353
  let filterLogs n logs = JSArray (filter (logFilter n) (logs >>= fromJSArray))
354
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
355
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
356
  let (rfields, rlogs) = case jobQuery of
357
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
358
          (answer, filterLogs prev_log logs)
359
        _ -> (map (const JSNull) fields, JSArray [])
360
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
361
  return (JSArray rfields, rlogs)
362

    
363

    
364
type LuxiConfig = (MVar (), JQStatus, ConfigReader)
365

    
366
luxiExec
367
    :: LuxiConfig
368
    -> LuxiOp
369
    -> IO (Bool, GenericResult GanetiException JSValue)
370
luxiExec (qlock, qstat, creader) args = do
371
  cfg <- creader
372
  result <- handleCallWrapper qlock qstat cfg args
373
  return (True, result)
374

    
375
luxiHandler :: LuxiConfig -> U.Handler LuxiOp JSValue
376
luxiHandler cfg = U.Handler { U.hParse         = decodeLuxiCall
377
                            , U.hInputLogShort = strOfOp
378
                            , U.hInputLogLong  = show
379
                            , U.hExec          = luxiExec cfg
380
                            }
381

    
382
-- | Type alias for prepMain results
383
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
384

    
385
-- | Check function for luxid.
386
checkMain :: CheckFn ()
387
checkMain _ = return $ Right ()
388

    
389
-- | Prepare function for luxid.
390
prepMain :: PrepFn () PrepResult
391
prepMain _ _ = do
392
  socket_path <- Path.defaultQuerySocket
393
  cleanupSocket socket_path
394
  s <- describeError "binding to the Luxi socket"
395
         Nothing (Just socket_path) $ getLuxiServer True socket_path
396
  cref <- newIORef (Bad "Configuration not yet loaded")
397
  jq <- emptyJQStatus
398
  return (s, cref, jq)
399

    
400
-- | Main function.
401
main :: MainFn () PrepResult
402
main _ _ (server, cref, jq) = do
403
  initConfigReader id cref
404
  let creader = readIORef cref
405
  initJQScheduler jq
406

    
407
  qlockFile <- jobQueueLockFile
408
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
409
  qlock <- newMVar ()
410

    
411
  finally
412
    (forever $ U.listener (luxiHandler (qlock, jq, creader)) server)
413
    (closeServer server)