Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ 7711f32b

History | View | Annotate | Download (19.1 kB)

1
{-| Implementation of the Ganeti Query2 server.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2012, 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.Query.Server
27
  ( main
28
  , checkMain
29
  , prepMain
30
  ) where
31

    
32
import Control.Applicative
33
import Control.Concurrent
34
import Control.Exception
35
import Control.Monad (forever, when, zipWithM, liftM)
36
import Control.Monad.IO.Class
37
import Data.Bits (bitSize)
38
import qualified Data.Set as Set (toList)
39
import Data.IORef
40
import Data.Maybe (fromMaybe)
41
import qualified Text.JSON as J
42
import Text.JSON (encode, showJSON, JSValue(..))
43
import System.Info (arch)
44
import System.Directory
45

    
46
import qualified Ganeti.Constants as C
47
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
48
import Ganeti.Errors
49
import qualified Ganeti.Path as Path
50
import Ganeti.Daemon
51
import Ganeti.Objects
52
import qualified Ganeti.Config as Config
53
import Ganeti.ConfigReader
54
import Ganeti.BasicTypes
55
import Ganeti.JQueue
56
import Ganeti.JQScheduler
57
import Ganeti.Logging
58
import Ganeti.Luxi
59
import qualified Ganeti.Query.Language as Qlang
60
import qualified Ganeti.Query.Cluster as QCluster
61
import Ganeti.Path ( queueDir, jobQueueLockFile, jobQueueDrainFile
62
                   , defaultMasterSocket)
63
import Ganeti.Rpc
64
import Ganeti.Query.Query
65
import Ganeti.Query.Filter (makeSimpleFilter)
66
import Ganeti.Types
67
import qualified Ganeti.UDSServer as U (Handler(..), listener)
68
import Ganeti.Utils (lockFile, exitIfBad, watchFile, safeRenameFile)
69
import qualified Ganeti.Version as Version
70

    
71
-- | Helper for classic queries.
72
handleClassicQuery :: ConfigData      -- ^ Cluster config
73
                   -> Qlang.ItemType  -- ^ Query type
74
                   -> [Either String Integer] -- ^ Requested names
75
                                              -- (empty means all)
76
                   -> [String]        -- ^ Requested fields
77
                   -> Bool            -- ^ Whether to do sync queries or not
78
                   -> IO (GenericResult GanetiException JSValue)
79
handleClassicQuery _ _ _ _ True =
80
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
81
handleClassicQuery cfg qkind names fields _ = do
82
  let flt = makeSimpleFilter (nameField qkind) names
83
  qr <- query cfg True (Qlang.Query qkind fields flt)
84
  return $ showJSON <$> (qr >>= queryCompat)
85

    
86
-- | Minimal wrapper to handle the missing config case.
87
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData
88
                     -> LuxiOp -> IO (ErrorResult JSValue)
89
handleCallWrapper _ _ (Bad msg) _ =
90
  return . Bad . ConfigurationError $
91
           "I do not have access to a valid configuration, cannot\
92
           \ process queries: " ++ msg
93
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
94

    
95
-- | Actual luxi operation handler.
96
handleCall :: MVar () -> JQStatus
97
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
98
handleCall _ _ cdata QueryClusterInfo =
99
  let cluster = configCluster cdata
100
      master = QCluster.clusterMasterNodeName cdata
101
      hypervisors = clusterEnabledHypervisors cluster
102
      diskTemplates = clusterEnabledDiskTemplates cluster
103
      def_hv = case hypervisors of
104
                 x:_ -> showJSON x
105
                 [] -> JSNull
106
      bits = show (bitSize (0::Int)) ++ "bits"
107
      arch_tuple = [bits, arch]
108
      obj = [ ("software_version", showJSON C.releaseVersion)
109
            , ("protocol_version", showJSON C.protocolVersion)
110
            , ("config_version", showJSON C.configVersion)
111
            , ("os_api_version", showJSON . maximum .
112
                                 Set.toList . ConstantUtils.unFrozenSet $
113
                                 C.osApiVersions)
114
            , ("export_version", showJSON C.exportVersion)
115
            , ("vcs_version", showJSON Version.version)
116
            , ("architecture", showJSON arch_tuple)
117
            , ("name", showJSON $ clusterClusterName cluster)
118
            , ("master", showJSON (case master of
119
                                     Ok name -> name
120
                                     _ -> undefined))
121
            , ("default_hypervisor", def_hv)
122
            , ("enabled_hypervisors", showJSON hypervisors)
123
            , ("hvparams", showJSON $ clusterHvparams cluster)
124
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
125
            , ("beparams", showJSON $ clusterBeparams cluster)
126
            , ("osparams", showJSON $ clusterOsparams cluster)
127
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
128
            , ("nicparams", showJSON $ clusterNicparams cluster)
129
            , ("ndparams", showJSON $ clusterNdparams cluster)
130
            , ("diskparams", showJSON $ clusterDiskparams cluster)
131
            , ("candidate_pool_size",
132
               showJSON $ clusterCandidatePoolSize cluster)
133
            , ("max_running_jobs",
134
               showJSON $ clusterMaxRunningJobs cluster)
135
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
136
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
137
            , ("use_external_mip_script",
138
               showJSON $ clusterUseExternalMipScript cluster)
139
            , ("volume_group_name",
140
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
141
            , ("drbd_usermode_helper",
142
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
143
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
144
            , ("shared_file_storage_dir",
145
               showJSON $ clusterSharedFileStorageDir cluster)
146
            , ("gluster_storage_dir",
147
               showJSON $ clusterGlusterStorageDir cluster)
148
            , ("maintain_node_health",
149
               showJSON $ clusterMaintainNodeHealth cluster)
150
            , ("ctime", showJSON $ clusterCtime cluster)
151
            , ("mtime", showJSON $ clusterMtime cluster)
152
            , ("uuid", showJSON $ clusterUuid cluster)
153
            , ("tags", showJSON $ clusterTags cluster)
154
            , ("uid_pool", showJSON $ clusterUidPool cluster)
155
            , ("default_iallocator",
156
               showJSON $ clusterDefaultIallocator cluster)
157
            , ("default_iallocator_params",
158
              showJSON $ clusterDefaultIallocatorParams cluster)
159
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
160
            , ("primary_ip_version",
161
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
162
            , ("prealloc_wipe_disks",
163
               showJSON $ clusterPreallocWipeDisks cluster)
164
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
165
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
166
            , ("enabled_disk_templates", showJSON diskTemplates)
167
            ]
168

    
169
  in case master of
170
    Ok _ -> return . Ok . J.makeObj $ obj
171
    Bad ex -> return $ Bad ex
172

    
173
handleCall _ _ cfg (QueryTags kind name) = do
174
  let tags = case kind of
175
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
176
               TagKindGroup    -> groupTags   <$> Config.getGroup    cfg name
177
               TagKindNode     -> nodeTags    <$> Config.getNode     cfg name
178
               TagKindInstance -> instTags    <$> Config.getInstance cfg name
179
               TagKindNetwork  -> networkTags <$> Config.getNetwork  cfg name
180
  return (J.showJSON <$> tags)
181

    
182
handleCall _ _ cfg (Query qkind qfields qfilter) = do
183
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
184
  return $ J.showJSON <$> result
185

    
186
handleCall _ _ _ (QueryFields qkind qfields) = do
187
  let result = queryFields (Qlang.QueryFields qkind qfields)
188
  return $ J.showJSON <$> result
189

    
190
handleCall _ _ cfg (QueryNodes names fields lock) =
191
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
192
    (map Left names) fields lock
193

    
194
handleCall _ _ cfg (QueryInstances names fields lock) =
195
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
196
    (map Left names) fields lock
197

    
198
handleCall _ _ cfg (QueryGroups names fields lock) =
199
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
200
    (map Left names) fields lock
201

    
202
handleCall _ _ cfg (QueryJobs names fields) =
203
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
204
    (map (Right . fromIntegral . fromJobId) names)  fields False
205

    
206
handleCall _ _ cfg (QueryNetworks names fields lock) =
207
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
208
    (map Left names) fields lock
209

    
210
handleCall _ _ cfg (QueryConfigValues fields) = do
211
  let params = [ ("cluster_name", return . showJSON . clusterClusterName
212
                                    . configCluster $ cfg)
213
               , ("watcher_pause", liftM (maybe JSNull showJSON)
214
                                     QCluster.isWatcherPaused)
215
               , ("master_node", return . genericResult (const JSNull) showJSON
216
                                   $ QCluster.clusterMasterNodeName cfg)
217
               , ("drain_flag", liftM (showJSON . not) isQueueOpen)
218
               ] :: [(String, IO JSValue)]
219
  let answer = map (fromMaybe (return JSNull) . flip lookup params) fields
220
  answerEval <- sequence answer
221
  return . Ok . showJSON $ answerEval
222

    
223
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) =
224
  do
225
    let mcs = Config.getMasterCandidates cfg
226
    jobid <- allocateJobId mcs qlock
227
    case jobid of
228
      Bad s -> return . Bad . GenericError $ s
229
      Ok jid -> do
230
        ts <- currentTimestamp
231
        job <- liftM (setReceivedTimestamp ts)
232
                 $ queuedJobFromOpCodes jid ops
233
        qDir <- queueDir
234
        write_result <- writeJobToDisk qDir job
235
        case write_result of
236
          Bad s -> return . Bad . GenericError $ s
237
          Ok () -> do
238
            _ <- replicateManyJobs qDir mcs [job]
239
            _ <- forkIO $ enqueueNewJobs qstat [job]
240
            return . Ok . showJSON . fromJobId $ jid
241

    
242
handleCall qlock qstat cfg (SubmitJob ops) =
243
  do
244
    open <- isQueueOpen
245
    if not open
246
       then return . Bad . GenericError $ "Queue drained"
247
       else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
248

    
249
handleCall qlock qstat cfg (SubmitManyJobs lops) =
250
  do
251
    open <- isQueueOpen
252
    if not open
253
      then return . Bad . GenericError $ "Queue drained"
254
      else do
255
        let mcs = Config.getMasterCandidates cfg
256
        result_jobids <- allocateJobIds mcs qlock (length lops)
257
        case result_jobids of
258
          Bad s -> return . Bad . GenericError $ s
259
          Ok jids -> do
260
            ts <- currentTimestamp
261
            jobs <- liftM (map $ setReceivedTimestamp ts)
262
                      $ zipWithM queuedJobFromOpCodes jids lops
263
            qDir <- queueDir
264
            write_results <- mapM (writeJobToDisk qDir) jobs
265
            let annotated_results = zip write_results jobs
266
                succeeded = map snd $ filter (isOk . fst) annotated_results
267
            when (any isBad write_results) . logWarning
268
              $ "Writing some jobs failed " ++ show annotated_results
269
            replicateManyJobs qDir mcs succeeded
270
            _ <- forkIO $ enqueueNewJobs qstat succeeded
271
            return . Ok . JSArray
272
              . map (\(res, job) ->
273
                      if isOk res
274
                        then showJSON (True, fromJobId $ qjId job)
275
                        else showJSON (False, genericResult id (const "") res))
276
              $ annotated_results
277

    
278
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
279
  let compute_fn = computeJobUpdate cfg jid fields prev_log
280
  qDir <- queueDir
281
  -- verify if the job is finalized, and return immediately in this case
282
  jobresult <- loadJobFromDisk qDir False jid
283
  case jobresult of
284
    Ok (job, _) | not (jobFinalized job) -> do
285
      let jobfile = liveJobFile qDir jid
286
      answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
287
                  (prev_job, JSArray []) compute_fn
288
      return . Ok $ showJSON answer
289
    _ -> liftM (Ok . showJSON) compute_fn
290

    
291
handleCall _ _ cfg (SetWatcherPause time) = do
292
  let mcs = Config.getMasterCandidates cfg
293
      masters = genericResult (const []) return
294
                  . Config.getNode cfg . clusterMasterNode
295
                  $ configCluster cfg
296
  _ <- executeRpcCall (masters ++ mcs) $ RpcCallSetWatcherPause time
297
  return . Ok . maybe JSNull showJSON $ time
298

    
299
handleCall _ _ cfg (SetDrainFlag value) = do
300
  let mcs = Config.getMasterCandidates cfg
301
  fpath <- jobQueueDrainFile
302
  if value
303
     then writeFile fpath ""
304
     else removeFile fpath
305
  _ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
306
  return . Ok . showJSON $ True
307

    
308
handleCall _ qstat cfg (ChangeJobPriority jid prio) = do
309
  maybeJob <- setJobPriority qstat jid prio
310
  case maybeJob of
311
    Bad s -> return . Ok $ showJSON (False, s)
312
    Ok (Just job) -> runResultT $ do
313
      let mcs = Config.getMasterCandidates cfg
314
      qDir <- liftIO queueDir
315
      liftIO $ replicateManyJobs qDir mcs [job]
316
      return $ showJSON (True, "Priorities of pending opcodes for job "
317
                               ++ show (fromJobId jid) ++ " have been changed"
318
                               ++ " to " ++ show prio)
319
    Ok Nothing -> runResultT $ do
320
      -- Job has already started; so we have to forward the request
321
      -- to the job, currently handled by masterd.
322
      socketpath <- liftIO defaultMasterSocket
323
      cl <- liftIO $ getLuxiClient socketpath
324
      ResultT $ callMethod (ChangeJobPriority jid prio) cl
325

    
326
handleCall _ qstat  cfg (CancelJob jid) = do
327
  let jName = (++) "job " . show $ fromJobId jid
328
  dequeueResult <- dequeueJob qstat jid
329
  case dequeueResult of
330
    Ok True -> do
331
      logDebug $ jName ++ " dequeued, marking as canceled"
332
      qDir <- queueDir
333
      readResult <- loadJobFromDisk qDir True jid
334
      let jobFileFailed = return . Ok . showJSON . (,) False
335
                            . (++) ("Dequeued " ++ jName
336
                                    ++ ", but failed to mark as cancelled: ")
337
                          :: String -> IO (ErrorResult JSValue)
338
      case readResult of
339
        Bad s -> jobFileFailed s
340
        Ok (job, _) -> do
341
          now <- currentTimestamp
342
          let job' = cancelQueuedJob now job
343
              mcs = Config.getMasterCandidates cfg
344
          write_result <- writeJobToDisk qDir job'
345
          case write_result of
346
            Bad s -> jobFileFailed s
347
            Ok () -> do
348
              replicateManyJobs qDir mcs [job']
349
              return . Ok . showJSON $ (True, "Dequeued " ++ jName)
350
    Ok False -> do
351
      logDebug $ jName ++ " not queued; trying to cancel directly"
352
      cancelJob jid
353
    Bad s -> return . Ok . showJSON $ (False, s)
354

    
355
handleCall qlock _ cfg (ArchiveJob jid) = do
356
  let archiveFailed = putMVar qlock  () >> (return . Ok $ showJSON False)
357
                      :: IO (ErrorResult JSValue)
358
  qDir <- queueDir
359
  takeMVar qlock
360
  result <- loadJobFromDisk qDir False jid
361
  case result of
362
    Bad _ -> archiveFailed
363
    Ok (job, _) -> if jobFinalized job
364
                     then do
365
                       let mcs = Config.getMasterCandidates cfg
366
                           live = liveJobFile qDir jid
367
                           archive = archivedJobFile qDir jid
368
                       renameResult <- safeRenameFile queueDirPermissions
369
                                         live archive
370
                       putMVar qlock ()
371
                       case renameResult of
372
                         Bad s -> return . Bad . JobQueueError
373
                                    $ "Archiving failed in an unexpected way: "
374
                                        ++ s
375
                         Ok () -> do
376
                           _ <- executeRpcCall mcs
377
                                  $ RpcCallJobqueueRename [(live, archive)]
378
                           return . Ok $ showJSON True
379
                     else archiveFailed
380

    
381
handleCall qlock _ cfg (AutoArchiveJobs age timeout) = do
382
  qDir <- queueDir
383
  eitherJids <- getJobIDs [qDir]
384
  case eitherJids of
385
    Left s -> return . Bad . JobQueueError $ show s
386
    Right jids -> do
387
      result <- bracket_ (takeMVar qlock) (putMVar qlock ())
388
                  . archiveJobs cfg age timeout
389
                  $ sortJobIDs jids
390
      return . Ok $ showJSON result
391

    
392
handleCall _ _ _ op =
393
  return . Bad $
394
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
395

    
396
{-# ANN handleCall "HLint: ignore Too strict if" #-}
397

    
398
-- | Query the status of a job and return the requested fields
399
-- and the logs newer than the given log number.
400
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
401
                    -> IO (JSValue, JSValue)
402
computeJobUpdate cfg jid fields prev_log = do
403
  let sjid = show $ fromJobId jid
404
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
405
  let fromJSArray (JSArray xs) = xs
406
      fromJSArray _ = []
407
  let logFilter JSNull (JSArray _) = True
408
      logFilter (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
409
      logFilter _ _ = False
410
  let filterLogs n logs = JSArray (filter (logFilter n) (logs >>= fromJSArray))
411
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
412
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
413
  let (rfields, rlogs) = case jobQuery of
414
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
415
          (answer, filterLogs prev_log logs)
416
        _ -> (map (const JSNull) fields, JSArray [])
417
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
418
  return (JSArray rfields, rlogs)
419

    
420

    
421
type LuxiConfig = (MVar (), JQStatus, ConfigReader)
422

    
423
luxiExec
424
    :: LuxiConfig
425
    -> LuxiOp
426
    -> IO (Bool, GenericResult GanetiException JSValue)
427
luxiExec (qlock, qstat, creader) args = do
428
  cfg <- creader
429
  result <- handleCallWrapper qlock qstat cfg args
430
  return (True, result)
431

    
432
luxiHandler :: LuxiConfig -> U.Handler LuxiOp JSValue
433
luxiHandler cfg = U.Handler { U.hParse         = decodeLuxiCall
434
                            , U.hInputLogShort = strOfOp
435
                            , U.hInputLogLong  = show
436
                            , U.hExec          = luxiExec cfg
437
                            }
438

    
439
-- | Type alias for prepMain results
440
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
441

    
442
-- | Check function for luxid.
443
checkMain :: CheckFn ()
444
checkMain _ = return $ Right ()
445

    
446
-- | Prepare function for luxid.
447
prepMain :: PrepFn () PrepResult
448
prepMain _ _ = do
449
  socket_path <- Path.defaultQuerySocket
450
  cleanupSocket socket_path
451
  s <- describeError "binding to the Luxi socket"
452
         Nothing (Just socket_path) $ getLuxiServer True socket_path
453
  cref <- newIORef (Bad "Configuration not yet loaded")
454
  jq <- emptyJQStatus cref
455
  return (s, cref, jq)
456

    
457
-- | Main function.
458
main :: MainFn () PrepResult
459
main _ _ (server, cref, jq) = do
460
  initConfigReader id cref
461
  let creader = readIORef cref
462
  initJQScheduler jq
463

    
464
  qlockFile <- jobQueueLockFile
465
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
466
  qlock <- newMVar ()
467

    
468
  finally
469
    (forever $ U.listener (luxiHandler (qlock, jq, creader)) server)
470
    (closeServer server)