Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ 560ef132

History | View | Annotate | Download (19.2 kB)

1
{-| Implementation of the Ganeti Query2 server.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2012, 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.Query.Server
27
  ( main
28
  , checkMain
29
  , prepMain
30
  ) where
31

    
32
import Control.Applicative
33
import Control.Concurrent
34
import Control.Exception
35
import Control.Monad (forever, when, zipWithM, liftM)
36
import Control.Monad.IO.Class
37
import Data.Bits (bitSize)
38
import qualified Data.Set as Set (toList)
39
import Data.IORef
40
import Data.Maybe (fromMaybe)
41
import qualified Text.JSON as J
42
import Text.JSON (encode, showJSON, JSValue(..))
43
import System.Info (arch)
44
import System.Directory
45

    
46
import qualified Ganeti.Constants as C
47
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
48
import Ganeti.Errors
49
import qualified Ganeti.Path as Path
50
import Ganeti.Daemon
51
import Ganeti.Objects
52
import qualified Ganeti.Config as Config
53
import Ganeti.ConfigReader
54
import Ganeti.BasicTypes
55
import Ganeti.JQueue
56
import Ganeti.JQScheduler
57
import Ganeti.JSON (TimeAsDoubleJSON(..))
58
import Ganeti.Logging
59
import Ganeti.Luxi
60
import qualified Ganeti.Query.Language as Qlang
61
import qualified Ganeti.Query.Cluster as QCluster
62
import Ganeti.Path ( queueDir, jobQueueLockFile, jobQueueDrainFile
63
                   , defaultMasterSocket)
64
import Ganeti.Rpc
65
import Ganeti.Query.Query
66
import Ganeti.Query.Filter (makeSimpleFilter)
67
import Ganeti.Types
68
import qualified Ganeti.UDSServer as U (Handler(..), listener)
69
import Ganeti.Utils (lockFile, exitIfBad, watchFile, safeRenameFile)
70
import qualified Ganeti.Version as Version
71

    
72
-- | Answer an old-style ("classic") query.
--
-- The requested names are turned into a simple filter on the name field
-- of the item type, the query is run, and the result is passed through
-- 'queryCompat' before being encoded as JSON.  Requests asking for a
-- synchronous query are refused with an 'OpPrereqError'.
handleClassicQuery :: ConfigData      -- ^ Cluster config
                   -> Qlang.ItemType  -- ^ Query type
                   -> [Either String Integer] -- ^ Requested names
                                              -- (empty means all)
                   -> [String]        -- ^ Requested fields
                   -> Bool            -- ^ Whether to do sync queries or not
                   -> IO (GenericResult GanetiException JSValue)
handleClassicQuery cfg qkind names fields sync
  | sync =
      return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
  | otherwise = do
      let nameFilter = makeSimpleFilter (nameField qkind) names
      answer <- query cfg True $ Qlang.Query qkind fields nameFilter
      return . fmap showJSON $ answer >>= queryCompat
86

    
87
-- | Thin wrapper around 'handleCall' for the missing config case.
--
-- A 'Bad' configuration is answered with a 'ConfigurationError' carrying
-- the original message; an 'Ok' one is handed on to 'handleCall' together
-- with the remaining arguments.
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData
                     -> LuxiOp -> IO (ErrorResult JSValue)
handleCallWrapper qlock qstat cfgResult op =
  case cfgResult of
    Ok config -> handleCall qlock qstat config op
    Bad msg ->
      return . Bad . ConfigurationError $
        "I do not have access to a valid configuration, cannot\
        \ process queries: " ++ msg
95

    
96
-- | Actual luxi operation handler.
--
-- Dispatches on the 'LuxiOp' constructor.  This first equation answers
-- 'QueryClusterInfo' with a JSON object describing the cluster, or with
-- the error reported when the master node name cannot be determined.
handleCall :: MVar () -> JQStatus
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
handleCall _ _ cdata QueryClusterInfo =
  let cluster = configCluster cdata
      hypervisors = clusterEnabledHypervisors cluster
      diskTemplates = clusterEnabledDiskTemplates cluster
      -- the first enabled hypervisor, if any, is reported as the default
      def_hv = case hypervisors of
                 x:_ -> showJSON x
                 [] -> JSNull
      bits = show (bitSize (0::Int)) ++ "bits"
      arch_tuple = [bits, arch]
      -- The object is a function of the master node name, so it can only
      -- be built once that name is known; no partial placeholder value
      -- is needed for the failure case.
      obj masterName =
            [ ("software_version", showJSON C.releaseVersion)
            , ("protocol_version", showJSON C.protocolVersion)
            , ("config_version", showJSON C.configVersion)
            , ("os_api_version", showJSON . maximum .
                                 Set.toList . ConstantUtils.unFrozenSet $
                                 C.osApiVersions)
            , ("export_version", showJSON C.exportVersion)
            , ("vcs_version", showJSON Version.version)
            , ("architecture", showJSON arch_tuple)
            , ("name", showJSON $ clusterClusterName cluster)
            , ("master", showJSON masterName)
            , ("default_hypervisor", def_hv)
            , ("enabled_hypervisors", showJSON hypervisors)
            , ("hvparams", showJSON $ clusterHvparams cluster)
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
            , ("beparams", showJSON $ clusterBeparams cluster)
            , ("osparams", showJSON $ clusterOsparams cluster)
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
            , ("nicparams", showJSON $ clusterNicparams cluster)
            , ("ndparams", showJSON $ clusterNdparams cluster)
            , ("diskparams", showJSON $ clusterDiskparams cluster)
            , ("candidate_pool_size",
               showJSON $ clusterCandidatePoolSize cluster)
            , ("max_running_jobs",
               showJSON $ clusterMaxRunningJobs cluster)
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
            , ("use_external_mip_script",
               showJSON $ clusterUseExternalMipScript cluster)
            , ("volume_group_name",
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
            , ("drbd_usermode_helper",
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
            , ("shared_file_storage_dir",
               showJSON $ clusterSharedFileStorageDir cluster)
            , ("gluster_storage_dir",
               showJSON $ clusterGlusterStorageDir cluster)
            , ("maintain_node_health",
               showJSON $ clusterMaintainNodeHealth cluster)
            , ("ctime", showJSON . TimeAsDoubleJSON $ clusterCtime cluster)
            , ("mtime", showJSON . TimeAsDoubleJSON $ clusterMtime cluster)
            , ("uuid", showJSON $ clusterUuid cluster)
            , ("tags", showJSON $ clusterTags cluster)
            , ("uid_pool", showJSON $ clusterUidPool cluster)
            , ("default_iallocator",
               showJSON $ clusterDefaultIallocator cluster)
            , ("default_iallocator_params",
              showJSON $ clusterDefaultIallocatorParams cluster)
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
            , ("primary_ip_version",
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
            , ("prealloc_wipe_disks",
               showJSON $ clusterPreallocWipeDisks cluster)
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
            , ("enabled_disk_templates", showJSON diskTemplates)
            ]

  in return $ case QCluster.clusterMasterNodeName cdata of
       Ok name -> Ok . J.makeObj $ obj name
       Bad ex -> Bad ex
173

    
174
-- Return the tags of the cluster, or of the named group, node, instance
-- or network, encoded as JSON.  The cluster tags are read directly from
-- the configuration; the other kinds go through the matching 'Config'
-- lookup and propagate its result.
handleCall _ _ cfg (QueryTags kind name) =
  return . fmap J.showJSON $ case kind of
    TagKindCluster  -> Ok (clusterTags (configCluster cfg))
    TagKindGroup    -> fmap groupTags   (Config.getGroup    cfg name)
    TagKindNode     -> fmap nodeTags    (Config.getNode     cfg name)
    TagKindInstance -> fmap instTags    (Config.getInstance cfg name)
    TagKindNetwork  -> fmap networkTags (Config.getNetwork  cfg name)
182

    
183
-- Run a query against the configuration and encode the outcome as JSON.
handleCall _ _ cfg (Query qkind qfields qfilter) =
  liftM (fmap J.showJSON) $ query cfg True (Qlang.Query qkind qfields qfilter)

-- Describe the requested fields of an item type; needs no configuration.
handleCall _ _ _ (QueryFields qkind qfields) =
  return . fmap J.showJSON . queryFields $ Qlang.QueryFields qkind qfields
190

    
191
-- The old-style queries below are all answered by 'handleClassicQuery';
-- they differ only in the item type and in how the names are wrapped
-- (plain names as 'Left', job ids as 'Right').

-- Old-style node query.
handleCall _ _ cfg (QueryNodes names fields lock) =
  let itemType = Qlang.ItemTypeOpCode Qlang.QRNode
  in handleClassicQuery cfg itemType [Left n | n <- names] fields lock

-- Old-style instance query.
handleCall _ _ cfg (QueryInstances names fields lock) =
  let itemType = Qlang.ItemTypeOpCode Qlang.QRInstance
  in handleClassicQuery cfg itemType [Left n | n <- names] fields lock

-- Old-style node group query.
handleCall _ _ cfg (QueryGroups names fields lock) =
  let itemType = Qlang.ItemTypeOpCode Qlang.QRGroup
  in handleClassicQuery cfg itemType [Left n | n <- names] fields lock

-- Old-style job query; always passes False as the sync flag.
handleCall _ _ cfg (QueryJobs names fields) =
  let itemType = Qlang.ItemTypeLuxi Qlang.QRJob
      jobNumbers = [Right . fromIntegral $ fromJobId jid | jid <- names]
  in handleClassicQuery cfg itemType jobNumbers fields False

-- Old-style network query.
handleCall _ _ cfg (QueryNetworks names fields lock) =
  let itemType = Qlang.ItemTypeOpCode Qlang.QRNetwork
  in handleClassicQuery cfg itemType [Left n | n <- names] fields lock
210

    
211
-- Answer a list of configuration value names with a JSON array holding
-- one value per requested name, in request order.  Names that are not in
-- the table below are answered with JSNull.
handleCall _ _ cfg (QueryConfigValues fields) = do
  let known = [ ("cluster_name",
                 return . showJSON . clusterClusterName $ configCluster cfg)
              , ("watcher_pause",
                 maybe JSNull showJSON <$> QCluster.isWatcherPaused)
              , ("master_node",
                 return . genericResult (const JSNull) showJSON
                   $ QCluster.clusterMasterNodeName cfg)
                -- the flag is the negation of the queue being open
              , ("drain_flag", showJSON . not <$> isQueueOpen)
              ] :: [(String, IO JSValue)]
      valueOf field = fromMaybe (return JSNull) (lookup field known)
  values <- mapM valueOf fields
  return . Ok $ showJSON values
223

    
224
-- Old-style export query: asks for the fixed fields "node" and "export"
-- of the given nodes through 'handleClassicQuery'.
handleCall _ _ cfg (QueryExports nodes lock) =
  handleClassicQuery cfg exportType [Left n | n <- nodes] exportFields lock
  where exportType = Qlang.ItemTypeOpCode Qlang.QRExport
        exportFields = ["node", "export"]
227

    
228
-- Submit a single job without checking whether the queue is open.
--
-- Steps, in order: allocate a job id, build the job and stamp it with the
-- current time, write it to the queue directory, replicate it to the
-- master candidates and hand it to the scheduler in a forked thread.
-- The answer is the new job id.
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) = runResultT $ do
    let candidates = Config.getMasterCandidates cfg
    jid <- mkResultT (allocateJobId candidates qlock)
    now <- liftIO currentTimestamp
    freshJob <- queuedJobFromOpCodes jid ops
    let job = setReceivedTimestamp now freshJob
    qDir <- liftIO queueDir
    mkResultT (writeJobToDisk qDir job)
    liftIO (replicateManyJobs qDir candidates [job])
    _ <- liftIO (forkIO (enqueueNewJobs qstat [job]))
    return (showJSON (fromJobId jid))
239

    
240
-- Submit a single job, refusing with a 'GenericError' when the queue is
-- not open; otherwise the work is delegated to the
-- 'SubmitJobToDrainedQueue' equation.
handleCall qlock qstat cfg (SubmitJob ops) = do
  queueOpen <- isQueueOpen
  if queueOpen
    then handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
    else return . Bad $ GenericError "Queue drained"
246

    
247
-- Submit several jobs at once.
--
-- Refused with a 'GenericError' when the queue is not open or when the
-- job ids cannot be allocated.  Otherwise every job is written to disk
-- individually; only the jobs written successfully are replicated and
-- enqueued.  The answer is a JSON array with one pair per submitted job:
-- (True, job id) on success, (False, error message) on a failed write.
handleCall qlock qstat cfg (SubmitManyJobs lops) = do
  queueOpen <- isQueueOpen
  if not queueOpen
    then return . Bad $ GenericError "Queue drained"
    else do
      let candidates = Config.getMasterCandidates cfg
      allocation <- allocateJobIds candidates qlock (length lops)
      case allocation of
        Bad err -> return . Bad $ GenericError err
        Ok jids -> do
          now <- currentTimestamp
          freshJobs <- zipWithM queuedJobFromOpCodes jids lops
          let jobs = map (setReceivedTimestamp now) freshJobs
          qDir <- queueDir
          outcomes <- mapM (writeJobToDisk qDir) jobs
          let annotated = zip outcomes jobs
              written = [job | (res, job) <- annotated, isOk res]
              -- per-job entry of the reply
              report (res, job)
                | isOk res = showJSON (True, fromJobId $ qjId job)
                | otherwise =
                    showJSON (False, genericResult id (const "") res)
          when (any isBad outcomes) $
            logWarning ("Writing some jobs failed " ++ show annotated)
          replicateManyJobs qDir candidates written
          _ <- forkIO (enqueueNewJobs qstat written)
          return . Ok . JSArray $ map report annotated
275

    
276
-- Wait for a change in a job.
--
-- If the job can be loaded and is not finalized, its live job file is
-- watched (for at most the smaller of the requested timeout and
-- 'C.luxiWfjcTimeout') and the resulting update is returned.  In every
-- other case the current state is computed and returned immediately.
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
  qDir <- queueDir
  loaded <- loadJobFromDisk qDir False jid
  let currentUpdate = computeJobUpdate cfg jid fields prev_log
      unfinished = case loaded of
                     Ok (job, _) -> not (jobFinalized job)
                     _ -> False
  if unfinished
    then do
      answer <- watchFile (liveJobFile qDir jid)
                  (min tmout C.luxiWfjcTimeout)
                  (prev_job, JSArray []) currentUpdate
      return . Ok $ showJSON answer
    else fmap (Ok . showJSON) currentUpdate
288

    
289
-- Set (or clear) the watcher pause.
--
-- 'RpcCallSetWatcherPause' is sent to the master node, if it can be
-- looked up in the configuration, followed by all master candidates; the
-- RPC results are discarded.  The reply is the given time as JSON, or
-- JSNull when no time was given.
handleCall _ _ cfg (SetWatcherPause time) = do
  let candidates = Config.getMasterCandidates cfg
      masterUuid = clusterMasterNode (configCluster cfg)
      masterNodes = case Config.getNode cfg masterUuid of
                      Ok node -> [node]
                      Bad _ -> []
  _ <- executeRpcCall (masterNodes ++ candidates)
         (RpcCallSetWatcherPause time)
  return . Ok $ maybe JSNull (showJSON . TimeAsDoubleJSON) time
296

    
297
-- Set or clear the job queue drain flag.
--
-- The flag is represented by the (empty) drain file: setting the flag
-- creates it, clearing the flag removes it.  Afterwards the new value is
-- sent to the master candidates via 'RpcCallSetDrainFlag'; the RPC
-- results are discarded and the reply is always True.
handleCall _ _ cfg (SetDrainFlag value) = do
  let mcs = Config.getMasterCandidates cfg
  fpath <- jobQueueDrainFile
  if value
    then writeFile fpath ""
    else do
      -- 'removeFile' throws when the file does not exist, so clearing a
      -- flag that is not set must be a no-op instead of an error.
      flagSet <- doesFileExist fpath
      when flagSet $ removeFile fpath
  _ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
  return . Ok . showJSON $ True
305

    
306
-- Change the priority of a job.
--
-- 'setJobPriority' has three outcomes: an error (reported to the client
-- as a (False, message) pair), an updated job (replicated to the master
-- candidates and acknowledged), or nothing, in which case the request is
-- forwarded over the master socket.
handleCall _ qstat cfg (ChangeJobPriority jid prio) = do
  outcome <- setJobPriority qstat jid prio
  case outcome of
    Ok (Just job) -> runResultT $ do
      qDir <- liftIO queueDir
      liftIO $ replicateManyJobs qDir (Config.getMasterCandidates cfg) [job]
      return $ showJSON
        (True, concat [ "Priorities of pending opcodes for job "
                      , show (fromJobId jid), " have been changed"
                      , " to ", show prio ])
    Ok Nothing -> runResultT $ do
      -- The job has already been started, so the request has to be
      -- forwarded to the job, which is currently handled by masterd.
      masterSocket <- liftIO defaultMasterSocket
      client <- liftIO (getLuxiClient masterSocket)
      ResultT (callMethod (ChangeJobPriority jid prio) client)
    Bad s -> return . Ok $ showJSON (False, s)
323

    
324
-- Cancel a job.
--
-- If the job could be removed from the queue, it is loaded from disk,
-- marked as cancelled, written back and replicated to the master
-- candidates; failures to read or write the job file are reported as a
-- (False, message) pair.  If the job was not queued, cancellation is
-- delegated to 'cancelJob'.  A failure to dequeue is reported as a
-- (False, message) pair as well.
handleCall _ qstat  cfg (CancelJob jid) = do
  let jName = "job " ++ show (fromJobId jid)
  dequeued <- dequeueJob qstat jid
  case dequeued of
    Bad s -> return . Ok $ showJSON (False, s)
    Ok False -> do
      logDebug (jName ++ " not queued; trying to cancel directly")
      cancelJob jid
    Ok True -> do
      logDebug (jName ++ " dequeued, marking as canceled")
      qDir <- queueDir
      loaded <- loadJobFromDisk qDir True jid
      let markingFailed :: String -> IO (ErrorResult JSValue)
          markingFailed reason =
            return . Ok $ showJSON
              ( False
              , ("Dequeued " ++ jName
                 ++ ", but failed to mark as cancelled: ") ++ reason )
      case loaded of
        Bad s -> markingFailed s
        Ok (job, _) -> do
          now <- currentTimestamp
          let cancelled = cancelQueuedJob now job
              candidates = Config.getMasterCandidates cfg
          written <- writeJobToDisk qDir cancelled
          case written of
            Bad s -> markingFailed s
            Ok () -> do
              replicateManyJobs qDir candidates [cancelled]
              return . Ok $ showJSON (True, "Dequeued " ++ jName)
352

    
353
-- Archive a single job.
--
-- While holding the queue lock, the job is loaded and, if it is
-- finalized, its live file is renamed to the archived location.  The
-- lock is managed with 'bracket_', so it is released even if loading or
-- renaming throws an exception.  After a successful rename the master
-- candidates are told to rename their copy as well (outside the lock).
--
-- The reply is False if the job could not be loaded or is not finalized,
-- True after a successful archival, and a 'JobQueueError' if the rename
-- itself failed.
handleCall qlock _ cfg (ArchiveJob jid) = do
  qDir <- queueDir
  let mcs = Config.getMasterCandidates cfg
      live = liveJobFile qDir jid
      archive = archivedJobFile qDir jid
  -- Nothing means "not archivable"; Just carries the rename outcome.
  renameResult <- bracket_ (takeMVar qlock) (putMVar qlock ()) $ do
    result <- loadJobFromDisk qDir False jid
    case result of
      Ok (job, _) | jobFinalized job ->
        liftM Just $ safeRenameFile queueDirPermissions live archive
      _ -> return Nothing
  case renameResult of
    Nothing -> return . Ok $ showJSON False
    Just (Bad s) -> return . Bad . JobQueueError
                      $ "Archiving failed in an unexpected way: "
                          ++ s
    Just (Ok ()) -> do
      _ <- executeRpcCall mcs
             $ RpcCallJobqueueRename [(live, archive)]
      return . Ok $ showJSON True
378

    
379
-- Archive jobs in bulk.
--
-- The job ids found in the queue directory are sorted and passed to
-- 'archiveJobs' together with the given age and timeout; the archival
-- runs while holding the queue lock.  A failure to list the job ids is
-- reported as a 'JobQueueError'.
handleCall qlock _ cfg (AutoArchiveJobs age timeout) = do
  qDir <- queueDir
  listing <- getJobIDs [qDir]
  case listing of
    Ok jids -> do
      let withQueueLock = bracket_ (takeMVar qlock) (putMVar qlock ())
      archived <- withQueueLock (archiveJobs cfg age timeout (sortJobIDs jids))
      return (Ok (showJSON archived))
    Bad s -> return . Bad $ JobQueueError (show s)
389

    
390
-- 'PickupJob' is always refused with a 'GenericError'.
handleCall _ _ _ (PickupJob _) =
  let refusal = "Luxi call 'PickupJob' is for internal use only"
  in return (Bad (GenericError refusal))

{-# ANN handleCall "HLint: ignore Too strict if" #-}
395

    
396
-- | Query the status of a job and return the requested fields
-- and the logs newer than the given log number.
--
-- The job is queried for the "oplog" field followed by the requested
-- fields.  If the answer does not have the expected shape, every
-- requested field is reported as JSNull and the log list is empty.
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
                    -> IO (JSValue, JSValue)
computeJobUpdate cfg jid fields prev_log = do
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
                [Right (fromIntegral (fromJobId jid))] ("oplog" : fields) False
  let (rfields, rlogs) = case jobQuery of
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
          (answer, newerLogs logs)
        _ -> ([JSNull | _ <- fields], JSArray [])
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
  return (JSArray rfields, rlogs)
  where
    sjid = show (fromJobId jid)
    -- elements of a JSON array; anything else counts as empty
    entries (JSArray xs) = xs
    entries _ = []
    -- With no previous log number every array entry is kept; otherwise
    -- only entries whose leading number exceeds the previous one.
    isNewer entry = case (prev_log, entry) of
      (JSNull, JSArray _) -> True
      (JSRational _ n, JSArray (JSRational _ m : _)) -> n < m
      _ -> False
    newerLogs = JSArray . filter isNewer . concatMap entries
417

    
418

    
419
-- | Everything a Luxi request needs to be served: an 'MVar' used as the
-- queue lock, the job queue status and the configuration reader.
type LuxiConfig = (MVar (), JQStatus, ConfigReader)

-- | Execute a single Luxi operation: read the current configuration and
-- pass it, with the operation, to 'handleCallWrapper'.  The first
-- component of the result is always True.
luxiExec
    :: LuxiConfig
    -> LuxiOp
    -> IO (Bool, GenericResult GanetiException JSValue)
luxiExec (qlock, qstat, creader) args =
  creader >>= \cfg ->
    liftM ((,) True) (handleCallWrapper qlock qstat cfg args)
429

    
430
-- | Build the server handler for Luxi requests: calls are parsed with
-- 'decodeLuxiCall', logged via 'strOfOp' (short form) and 'show' (long
-- form), and executed by 'luxiExec'.
luxiHandler :: LuxiConfig -> U.Handler LuxiOp JSValue
luxiHandler cfg =
  U.Handler
    { U.hExec          = luxiExec cfg
    , U.hParse         = decodeLuxiCall
    , U.hInputLogShort = strOfOp
    , U.hInputLogLong  = show
    }
436

    
437
-- | Result of 'prepMain': the listening server, the reference holding the
-- (possibly not yet loaded) configuration, and the job queue status.
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)

-- | Check function for luxid; it ignores its argument and always
-- succeeds.
checkMain :: CheckFn ()
checkMain = const (return (Right ()))
443

    
444
-- | Prepare function for luxid.
--
-- Cleans up and binds the query socket, and creates the configuration
-- reference (initially 'Bad', as nothing has been loaded yet) together
-- with an empty job queue status built on it.
prepMain :: PrepFn () PrepResult
prepMain _ _ = do
  sockPath <- Path.defaultQuerySocket
  cleanupSocket sockPath
  server <- describeError "binding to the Luxi socket" Nothing (Just sockPath)
              (getLuxiServer True sockPath)
  configRef <- newIORef (Bad "Configuration not yet loaded")
  status <- emptyJQStatus configRef
  return (server, configRef, status)
454

    
455
-- | Main function.
--
-- Starts the configuration reader and the job queue scheduler, obtains
-- the job-queue lock file (exiting on failure) and then serves Luxi
-- requests forever; the server is closed when the loop is left.
main :: MainFn () PrepResult
main _ _ (server, cref, jq) = do
  initConfigReader id cref
  initJQScheduler jq

  lockPath <- jobQueueLockFile
  exitIfBad "Failed to obtain the job-queue lock" =<< lockFile lockPath
  qlock <- newMVar ()

  let handler = luxiHandler (qlock, jq, readIORef cref)
  forever (U.listener handler server) `finally` closeServer server