Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ b172b0ab

History | View | Annotate | Download (19.4 kB)

1
{-| Implementation of the Ganeti Query2 server.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2012, 2013, 2014 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.Query.Server
27
  ( main
28
  , checkMain
29
  , prepMain
30
  ) where
31

    
32
import Control.Applicative
33
import Control.Concurrent
34
import Control.Exception
35
import Control.Monad (forever, when, zipWithM, liftM)
36
import Control.Monad.IO.Class
37
import Data.Bits (bitSize)
38
import qualified Data.Set as Set (toList)
39
import Data.IORef
40
import Data.Maybe (fromMaybe)
41
import qualified Text.JSON as J
42
import Text.JSON (encode, showJSON, JSValue(..))
43
import System.Info (arch)
44
import System.Directory
45

    
46
import qualified Ganeti.Constants as C
47
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
48
import Ganeti.Errors
49
import qualified Ganeti.Path as Path
50
import Ganeti.Daemon
51
import Ganeti.Objects
52
import qualified Ganeti.Config as Config
53
import Ganeti.ConfigReader
54
import Ganeti.BasicTypes
55
import Ganeti.JQueue
56
import Ganeti.JQScheduler
57
import Ganeti.JSON (TimeAsDoubleJSON(..))
58
import Ganeti.Logging
59
import Ganeti.Luxi
60
import qualified Ganeti.Query.Language as Qlang
61
import qualified Ganeti.Query.Cluster as QCluster
62
import Ganeti.Path ( queueDir, jobQueueLockFile, jobQueueDrainFile
63
                   , defaultMasterSocket)
64
import Ganeti.Rpc
65
import Ganeti.Query.Query
66
import Ganeti.Query.Filter (makeSimpleFilter)
67
import Ganeti.Types
68
import qualified Ganeti.UDSServer as U (Handler(..), listener)
69
import Ganeti.Utils (lockFile, exitIfBad, watchFile, safeRenameFile)
70
import qualified Ganeti.Version as Version
71

    
72
-- | Run an old-style ("classic") query through the Query2 machinery.
--
-- A request asking for synchronous behaviour is rejected with an
-- 'OpPrereqError'. Otherwise the requested names are matched against
-- both the name field and the UUID field of the item kind, and the
-- query result is passed through 'queryCompat' before being serialised.
handleClassicQuery :: ConfigData      -- ^ Cluster config
                   -> Qlang.ItemType  -- ^ Query type
                   -> [Either String Integer] -- ^ Requested names
                                              -- (empty means all)
                   -> [String]        -- ^ Requested fields
                   -> Bool            -- ^ Whether to do sync queries or not
                   -> IO (GenericResult GanetiException JSValue)
handleClassicQuery cfg qkind names fields sync
  | sync =
      return (Bad (OpPrereqError "Sync queries are not allowed" ECodeInval))
  | otherwise = do
      outcome <- query cfg True (Qlang.Query qkind fields nameOrUuid)
      return (fmap showJSON (queryCompat =<< outcome))
  where
    -- Filter selecting the requested names on one identifying field.
    matchOn fieldOf = makeSimpleFilter (fieldOf qkind) names
    -- An item is selected if either its name or its UUID matches.
    nameOrUuid = Qlang.OrFilter [matchOn nameField, matchOn uuidField]
87

    
88
-- | Guard 'handleCall' against a missing configuration.
--
-- When the configuration could not be loaded, the request is answered
-- with a 'ConfigurationError' carrying the loader's message; otherwise
-- the call is passed on unchanged to 'handleCall'.
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData
                     -> LuxiOp -> IO (ErrorResult JSValue)
handleCallWrapper qlock qstat cfgResult op =
  case cfgResult of
    Ok config -> handleCall qlock qstat config op
    Bad msg -> return (Bad (ConfigurationError (noConfigPrefix ++ msg)))
  where
    noConfigPrefix =
           "I do not have access to a valid configuration, cannot\
           \ process queries: "
96

    
97
-- | Actual luxi operation handler.
--
-- The arguments are, in order: the 'MVar' used as job-queue lock, the
-- job queue scheduler status, the cluster configuration and the
-- operation to execute. Each equation handles one 'LuxiOp' constructor.
handleCall :: MVar () -> JQStatus
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
-- Cluster information: a JSON object describing the cluster. If the
-- name of the master node cannot be resolved, the lookup error is
-- returned instead of an object.
handleCall _ _ cdata QueryClusterInfo =
  let cluster = configCluster cdata
      hypervisors = clusterEnabledHypervisors cluster
      diskTemplates = clusterEnabledDiskTemplates cluster
      -- The first enabled hypervisor is reported as the default one.
      def_hv = case hypervisors of
                 x:_ -> showJSON x
                 [] -> JSNull
      bits = show (bitSize (0::Int)) ++ "bits"
      arch_tuple = [bits, arch]
      -- The object is parameterised on the resolved master node name, so
      -- that it can only be built once the lookup has succeeded; no
      -- placeholder value is needed for the failure case.
      obj masterName =
            [ ("software_version", showJSON C.releaseVersion)
            , ("protocol_version", showJSON C.protocolVersion)
            , ("config_version", showJSON C.configVersion)
            , ("os_api_version", showJSON . maximum .
                                 Set.toList . ConstantUtils.unFrozenSet $
                                 C.osApiVersions)
            , ("export_version", showJSON C.exportVersion)
            , ("vcs_version", showJSON Version.version)
            , ("architecture", showJSON arch_tuple)
            , ("name", showJSON $ clusterClusterName cluster)
            , ("master", showJSON masterName)
            , ("default_hypervisor", def_hv)
            , ("enabled_hypervisors", showJSON hypervisors)
            , ("hvparams", showJSON $ clusterHvparams cluster)
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
            , ("beparams", showJSON $ clusterBeparams cluster)
            , ("osparams", showJSON $ clusterOsparams cluster)
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
            , ("nicparams", showJSON $ clusterNicparams cluster)
            , ("ndparams", showJSON $ clusterNdparams cluster)
            , ("diskparams", showJSON $ clusterDiskparams cluster)
            , ("candidate_pool_size",
               showJSON $ clusterCandidatePoolSize cluster)
            , ("max_running_jobs",
               showJSON $ clusterMaxRunningJobs cluster)
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
            , ("use_external_mip_script",
               showJSON $ clusterUseExternalMipScript cluster)
            , ("volume_group_name",
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
            , ("drbd_usermode_helper",
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
            , ("shared_file_storage_dir",
               showJSON $ clusterSharedFileStorageDir cluster)
            , ("gluster_storage_dir",
               showJSON $ clusterGlusterStorageDir cluster)
            , ("maintain_node_health",
               showJSON $ clusterMaintainNodeHealth cluster)
            , ("ctime", showJSON . TimeAsDoubleJSON $ clusterCtime cluster)
            , ("mtime", showJSON . TimeAsDoubleJSON $ clusterMtime cluster)
            , ("uuid", showJSON $ clusterUuid cluster)
            , ("tags", showJSON $ clusterTags cluster)
            , ("uid_pool", showJSON $ clusterUidPool cluster)
            , ("default_iallocator",
               showJSON $ clusterDefaultIallocator cluster)
            , ("default_iallocator_params",
              showJSON $ clusterDefaultIallocatorParams cluster)
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
            , ("primary_ip_version",
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
            , ("prealloc_wipe_disks",
               showJSON $ clusterPreallocWipeDisks cluster)
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
            , ("enabled_disk_templates", showJSON diskTemplates)
            , ("instance_communication_network",
               showJSON (clusterInstanceCommunicationNetwork cluster))
            ]

  in case QCluster.clusterMasterNodeName cdata of
    Ok name -> return . Ok . J.makeObj $ obj name
    Bad ex -> return $ Bad ex
176

    
177
-- Tag query: return the tags of the cluster itself, or of the named
-- group, node, instance or network; a failed lookup is passed on as
-- the error result.
handleCall _ _ cfg (QueryTags kind name) =
  return . fmap J.showJSON $ tagsOf kind
  where
    tagsOf TagKindCluster  = Ok (clusterTags (configCluster cfg))
    tagsOf TagKindGroup    = fmap groupTags   (Config.getGroup    cfg name)
    tagsOf TagKindNode     = fmap nodeTags    (Config.getNode     cfg name)
    tagsOf TagKindInstance = fmap instTags    (Config.getInstance cfg name)
    tagsOf TagKindNetwork  = fmap networkTags (Config.getNetwork  cfg name)
185

    
186
-- Generic Query2 request: run the query and serialise whatever result
-- (or error) it produces.
handleCall _ _ cfg (Query qkind qfields qfilter) =
  liftM (fmap J.showJSON) (query cfg True request)
  where
    request = Qlang.Query qkind qfields qfilter
189

    
190
-- Field description request: answered purely, without consulting the
-- configuration.
handleCall _ _ _ (QueryFields qkind qfields) =
  return . fmap J.showJSON . queryFields $ Qlang.QueryFields qkind qfields
193

    
194
-- Classic node query, delegated to 'handleClassicQuery'.
handleCall _ _ cfg (QueryNodes names fields lock) =
  let kind = Qlang.ItemTypeOpCode Qlang.QRNode
      requested = [Left nodeName | nodeName <- names]
  in handleClassicQuery cfg kind requested fields lock
197

    
198
-- Classic instance query, delegated to 'handleClassicQuery'.
handleCall _ _ cfg (QueryInstances names fields lock) =
  let kind = Qlang.ItemTypeOpCode Qlang.QRInstance
      requested = [Left instName | instName <- names]
  in handleClassicQuery cfg kind requested fields lock
201

    
202
-- Classic node group query, delegated to 'handleClassicQuery'.
handleCall _ _ cfg (QueryGroups names fields lock) =
  let kind = Qlang.ItemTypeOpCode Qlang.QRGroup
      requested = [Left groupName | groupName <- names]
  in handleClassicQuery cfg kind requested fields lock
205

    
206
-- Classic job query: jobs are identified by their numeric id, and the
-- query is always run as a non-sync one.
handleCall _ _ cfg (QueryJobs names fields) =
  let kind = Qlang.ItemTypeLuxi Qlang.QRJob
      requested = [Right (fromIntegral (fromJobId jid)) | jid <- names]
  in handleClassicQuery cfg kind requested fields False
209

    
210
-- Classic network query, delegated to 'handleClassicQuery'.
handleCall _ _ cfg (QueryNetworks names fields lock) =
  let kind = Qlang.ItemTypeOpCode Qlang.QRNetwork
      requested = [Left netName | netName <- names]
  in handleClassicQuery cfg kind requested fields lock
213

    
214
-- Configuration value query: answer each requested field in order;
-- fields that are not known are answered with JSON null.
handleCall _ _ cfg (QueryConfigValues fields) =
  liftM (Ok . showJSON) (mapM valueOf fields)
  where
    -- Action computing the value of one requested field.
    valueOf field = fromMaybe (return JSNull) (lookup field known)
    -- The supported fields, each with the action computing its value.
    known :: [(String, IO JSValue)]
    known =
      [ ("cluster_name",
         return . showJSON . clusterClusterName . configCluster $ cfg)
      , ("watcher_pause",
         liftM (maybe JSNull showJSON) QCluster.isWatcherPaused)
      , ("master_node",
         return . genericResult (const JSNull) showJSON
           $ QCluster.clusterMasterNodeName cfg)
      , ("drain_flag", liftM (showJSON . not) isQueueOpen)
      ]
226

    
227
-- Export query: a classic query over the given nodes with the fixed
-- field list "node" and "export".
handleCall _ _ cfg (QueryExports nodes lock) =
  let kind = Qlang.ItemTypeOpCode Qlang.QRExport
      requested = [Left nodeName | nodeName <- nodes]
  in handleClassicQuery cfg kind requested ["node", "export"] lock
230

    
231
-- Job submission that does not check the drain flag: allocate a job
-- id, write the job to disk, replicate it to the master candidates and
-- hand it to the scheduler in a separate thread. The answer is the
-- numeric id of the new job.
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) = runResultT $ do
    let candidates = Config.getMasterCandidates cfg
    newId <- mkResultT (allocateJobId candidates qlock)
    received <- liftIO currentTimestamp
    unstamped <- queuedJobFromOpCodes newId ops
    let job = setReceivedTimestamp received unstamped
    jobDir <- liftIO queueDir
    mkResultT (writeJobToDisk jobDir job)
    liftIO (replicateManyJobs jobDir candidates [job])
    -- Enqueueing happens asynchronously; its outcome is not awaited.
    _ <- liftIO (forkIO (enqueueNewJobs qstat [job]))
    return (showJSON (fromJobId newId))
242

    
243
-- Regular job submission: refused while the queue is drained,
-- otherwise handled exactly like 'SubmitJobToDrainedQueue'.
handleCall qlock qstat cfg (SubmitJob ops) = do
  accepting <- isQueueOpen
  if accepting
    then handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
    else return (Bad (GenericError "Queue drained"))
249

    
250
-- Submission of several jobs at once: refused while the queue is
-- drained. Jobs whose file could not be written are reported as failed
-- in the answer; only the successfully written ones are replicated and
-- enqueued.
handleCall qlock qstat cfg (SubmitManyJobs lops) = do
  accepting <- isQueueOpen
  if accepting
    then submitAll
    else return (Bad (GenericError "Queue drained"))
  where
    candidates = Config.getMasterCandidates cfg

    submitAll = do
      allocation <- allocateJobIds candidates qlock (length lops)
      case allocation of
        Bad err -> return (Bad (GenericError err))
        Ok jids -> do
          received <- currentTimestamp
          unstamped <- zipWithM queuedJobFromOpCodes jids lops
          let jobs = map (setReceivedTimestamp received) unstamped
          jobDir <- queueDir
          outcomes <- mapM (writeJobToDisk jobDir) jobs
          let annotated = zip outcomes jobs
              written = [job | (outcome, job) <- annotated, isOk outcome]
          when (any isBad outcomes) $
            logWarning ("Writing some jobs failed " ++ show annotated)
          replicateManyJobs jobDir candidates written
          -- Enqueueing happens asynchronously; its outcome is not awaited.
          _ <- forkIO (enqueueNewJobs qstat written)
          return (Ok (JSArray (map report annotated)))

    -- Per-job answer: (True, job id) on success, (False, message) if
    -- writing the job file failed.
    report (outcome, job)
      | isOk outcome = showJSON (True, fromJobId (qjId job))
      | otherwise = showJSON (False, genericResult id (const "") outcome)
278

    
279
-- Wait for a job to change. If the job cannot be loaded or is already
-- finalized, the current state is computed and returned immediately;
-- otherwise the live job file is watched, for at most the smaller of
-- the requested timeout and 'C.luxiWfjcTimeout'.
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
  jobDir <- queueDir
  loaded <- loadJobFromDisk jobDir False jid
  liftM (Ok . showJSON) $ case loaded of
    Ok (job, _) | stillRunning job ->
      watchFile (liveJobFile jobDir jid) (min tmout C.luxiWfjcTimeout)
        (prev_job, JSArray []) recompute
    _ -> recompute
  where
    recompute = computeJobUpdate cfg jid fields prev_log
    stillRunning = not . jobFinalized
291

    
292
-- Set (or clear) the watcher pause: the request is sent by RPC to the
-- master node, if it can be looked up, and to all master candidates.
-- The RPC results are ignored and the requested time is echoed back.
handleCall _ _ cfg (SetWatcherPause time) = do
  _ <- executeRpcCall (masters ++ candidates) (RpcCallSetWatcherPause time)
  return (Ok (maybe JSNull (showJSON . TimeAsDoubleJSON) time))
  where
    candidates = Config.getMasterCandidates cfg
    masterLookup = Config.getNode cfg (clusterMasterNode (configCluster cfg))
    -- A failed lookup of the master node simply contributes no target.
    masters = case masterLookup of
                Ok node -> [node]
                Bad _ -> []
299

    
300
-- Set or clear the queue drain flag. The flag is represented by the
-- drain file: it is created (empty) to set the flag and removed to
-- clear it. The new value is then sent to the master candidates by
-- RPC; the RPC results are ignored and the answer is always True.
handleCall _ _ cfg (SetDrainFlag value) = do
  let mcs = Config.getMasterCandidates cfg
  fpath <- jobQueueDrainFile
  if value
     then writeFile fpath ""
     else do
       -- removeFile throws if the file does not exist; clearing a flag
       -- that is not set must be a no-op rather than an error.
       present <- doesFileExist fpath
       when present $ removeFile fpath
  _ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
  return . Ok . showJSON $ True
308

    
309
-- Change the priority of a job. If the scheduler could change it, the
-- updated job is replicated to the master candidates; if the scheduler
-- returned no job, the request is forwarded over the master socket.
handleCall _ qstat cfg (ChangeJobPriority jid prio) = do
  outcome <- setJobPriority qstat jid prio
  case outcome of
    Bad err -> return (Ok (showJSON (False, err)))
    Ok (Just job) -> runResultT $ do
      jobDir <- liftIO queueDir
      liftIO (replicateManyJobs jobDir candidates [job])
      return (showJSON (True, changedMsg))
    Ok Nothing -> runResultT $ do
      -- Job has already started; so we have to forward the request
      -- to the job, currently handled by masterd.
      masterSocket <- liftIO defaultMasterSocket
      -- NOTE(review): the client obtained here is never explicitly
      -- closed in this block -- confirm whether that is intended.
      client <- liftIO (getLuxiClient masterSocket)
      ResultT (callMethod (ChangeJobPriority jid prio) client)
  where
    candidates = Config.getMasterCandidates cfg
    changedMsg = concat [ "Priorities of pending opcodes for job "
                        , show (fromJobId jid)
                        , " have been changed"
                        , " to "
                        , show prio
                        ]
326

    
327
-- Cancel a job. A job that could be removed from the scheduler queue
-- is marked as cancelled on disk and replicated; a job that is not
-- queued is cancelled directly via 'cancelJob'.
handleCall _ qstat  cfg (CancelJob jid) = do
  dequeued <- dequeueJob qstat jid
  case dequeued of
    Bad err -> answer (False, err)
    Ok False -> do
      logDebug (jobLabel ++ " not queued; trying to cancel directly")
      cancelJob jid
    Ok True -> do
      logDebug (jobLabel ++ " dequeued, marking as canceled")
      jobDir <- queueDir
      loaded <- loadJobFromDisk jobDir True jid
      case loaded of
        Bad err -> markingFailed err
        Ok (job, _) -> do
          now <- currentTimestamp
          let cancelled = cancelQueuedJob now job
          written <- writeJobToDisk jobDir cancelled
          case written of
            Bad err -> markingFailed err
            Ok () -> do
              replicateManyJobs jobDir (Config.getMasterCandidates cfg)
                [cancelled]
              answer (True, "Dequeued " ++ jobLabel)
  where
    jobLabel = "job " ++ show (fromJobId jid)

    -- All non-delegated outcomes are reported as a (success, message)
    -- pair inside a successful Luxi answer.
    answer :: (Bool, String) -> IO (ErrorResult JSValue)
    answer = return . Ok . showJSON

    -- The job left the queue but its file could not be updated.
    markingFailed :: String -> IO (ErrorResult JSValue)
    markingFailed reason =
      answer (False, "Dequeued " ++ jobLabel
                     ++ ", but failed to mark as cancelled: " ++ reason)
355

    
356
-- Archive a single job. Under the queue lock, the job is loaded and,
-- if it is finalized, its file is renamed from the live to the
-- archived location; the rename is then propagated to the master
-- candidates by RPC. The answer is False if the job could not be
-- loaded or is not finalized, True after a successful rename, and a
-- 'JobQueueError' if the rename itself failed.
handleCall qlock _ cfg (ArchiveJob jid) = do
  qDir <- queueDir
  let live = liveJobFile qDir jid
      archive = archivedJobFile qDir jid
  -- bracket_ guarantees that the queue lock is released even if loading
  -- or renaming throws; a manual takeMVar/putMVar pair would leave the
  -- lock taken forever in that case.
  renameAttempt <- bracket_ (takeMVar qlock) (putMVar qlock ()) $ do
    loaded <- loadJobFromDisk qDir False jid
    case loaded of
      Ok (job, _) | jobFinalized job ->
        liftM Just $ safeRenameFile queueDirPermissions live archive
      -- Unreadable or not yet finalized: nothing to archive.
      _ -> return Nothing
  case renameAttempt of
    Nothing -> return . Ok $ showJSON False
    Just (Bad s) -> return . Bad . JobQueueError
                      $ "Archiving failed in an unexpected way: " ++ s
    Just (Ok ()) -> do
      let mcs = Config.getMasterCandidates cfg
      _ <- executeRpcCall mcs $ RpcCallJobqueueRename [(live, archive)]
      return . Ok $ showJSON True
381

    
382
-- Archive old jobs in bulk: the job ids found in the queue directory
-- are sorted and handed to 'archiveJobs', which runs while the queue
-- lock is held.
handleCall qlock _ cfg (AutoArchiveJobs age timeout) = do
  jobDir <- queueDir
  found <- getJobIDs [jobDir]
  case found of
    Ok jids ->
      liftM (Ok . showJSON) . withQueueLock
        $ archiveJobs cfg age timeout (sortJobIDs jids)
    Bad err -> return (Bad (JobQueueError (show err)))
  where
    -- Run an action with the queue lock taken, releasing it afterwards
    -- even if the action throws.
    withQueueLock = bracket_ (takeMVar qlock) (putMVar qlock ())
392

    
393
-- 'PickupJob' is never served by this handler; it is always answered
-- with an error.
handleCall _ _ _ (PickupJob _) =
  return (Bad (GenericError "Luxi call 'PickupJob' is for internal use only"))
396

    
397
{-# ANN handleCall "HLint: ignore Too strict if" #-}
398

    
399
-- | Query the current state of a job.
--
-- The result is a pair of the values of the requested fields and the
-- log entries newer than the given log serial. If the job query does
-- not return the expected shape, every field is reported as JSON null
-- and the list of log entries is empty.
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
                    -> IO (JSValue, JSValue)
computeJobUpdate cfg jid fields prev_log = do
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
  -- "oplog" is prepended to the requested fields so that the logs come
  -- first in the answer row and can be split off below.
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
                [Right (fromIntegral (fromJobId jid))] ("oplog" : fields)
                False
  let (rfields, rlogs) = case jobQuery of
        Ok (JSArray [JSArray (JSArray opLogs : answer)]) ->
          (answer, newerLogs opLogs)
        _ -> ([JSNull | _ <- fields], JSArray [])
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
  return (JSArray rfields, rlogs)
  where
    sjid = show (fromJobId jid)

    -- Elements of a JSON array; any other value contributes nothing.
    entriesOf (JSArray xs) = xs
    entriesOf _ = []

    -- With no previous serial (null) every array entry is new; with a
    -- numeric serial only entries whose first element is a larger
    -- number are.
    isNewer JSNull (JSArray _) = True
    isNewer (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
    isNewer _ _ = False

    newerLogs opLogs =
      JSArray [ entry
              | opLog <- opLogs
              , entry <- entriesOf opLog
              , isNewer prev_log entry
              ]
420

    
421

    
422
-- | Everything a Luxi request needs in order to be executed: the 'MVar'
-- used as job-queue lock, the job queue status and the action reading
-- the current configuration.
type LuxiConfig = (MVar (), JQStatus, ConfigReader)
423

    
424
-- | Execute one Luxi operation against the current configuration.
--
-- The configuration is re-read for every call. The first component of
-- the result is always 'True'.
-- NOTE(review): the meaning of that flag is defined by the consumer of
-- 'U.hExec', which is not visible here -- confirm.
luxiExec
    :: LuxiConfig
    -> LuxiOp
    -> IO (Bool, GenericResult GanetiException JSValue)
luxiExec (qlock, qstat, creader) args =
  liftM ((,) True) (creader >>= dispatch)
  where
    dispatch cfg = handleCallWrapper qlock qstat cfg args
432

    
433
-- | Build the request handler for the Luxi protocol: calls are decoded
-- with 'decodeLuxiCall', executed with 'luxiExec' and logged using
-- 'strOfOp' (short form) or 'show' (long form).
luxiHandler :: LuxiConfig -> U.Handler LuxiOp IO JSValue
luxiHandler luxiCfg =
  U.Handler
    { U.hExec          = luxiExec luxiCfg
    , U.hParse         = decodeLuxiCall
    , U.hInputLogLong  = show
    , U.hInputLogShort = strOfOp
    }
439

    
440
-- | Type alias for prepMain results
441
-- The components are the Luxi server, the reference holding the
-- (possibly not yet loaded) configuration and the job queue status.
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
442

    
443
-- | Check function for luxid; there is nothing to verify, so it always
-- succeeds.
checkMain :: CheckFn ()
checkMain = const (return (Right ()))
446

    
447
-- | Prepare function for luxid.
--
-- Cleans up and binds the query socket, then creates the configuration
-- reference (initially holding an error, as nothing is loaded yet) and
-- an empty job queue status on top of it.
prepMain :: PrepFn () PrepResult
prepMain _ _ = do
  sockPath <- Path.defaultQuerySocket
  cleanupSocket sockPath
  server <- describeError "binding to the Luxi socket" Nothing (Just sockPath)
              (getLuxiServer True sockPath)
  configRef <- newIORef (Bad "Configuration not yet loaded")
  queueStatus <- emptyJQStatus configRef
  return (server, configRef, queueStatus)
457

    
458
-- | Main function.
--
-- Starts the configuration reader and the job scheduler, obtains the
-- job-queue lock file (exiting if that fails) and then serves Luxi
-- requests forever; the server is closed when the loop is left.
main :: MainFn () PrepResult
main _ _ (server, cref, jq) = do
  initConfigReader id cref
  initJQScheduler jq

  lockPath <- jobQueueLockFile
  exitIfBad "Failed to obtain the job-queue lock" =<< lockFile lockPath
  qlock <- newMVar ()

  let handler = luxiHandler (qlock, jq, readIORef cref)
  forever (U.listener handler server) `finally` closeServer server