Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ 353bd75b

History | View | Annotate | Download (19.4 kB)

1
{-| Implementation of the Ganeti Query2 server.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2012, 2013, 2014 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.Query.Server
27
  ( main
28
  , checkMain
29
  , prepMain
30
  ) where
31

    
32
import Control.Applicative
33
import Control.Concurrent
34
import Control.Exception
35
import Control.Monad (forever, when, zipWithM, liftM)
36
import Control.Monad.IO.Class
37
import Data.Bits (bitSize)
38
import qualified Data.Set as Set (toList)
39
import Data.IORef
40
import Data.Maybe (fromMaybe)
41
import qualified Text.JSON as J
42
import Text.JSON (encode, showJSON, JSValue(..))
43
import System.Info (arch)
44
import System.Directory
45

    
46
import qualified Ganeti.Constants as C
47
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
48
import Ganeti.Errors
49
import qualified Ganeti.Path as Path
50
import Ganeti.Daemon
51
import Ganeti.Objects
52
import qualified Ganeti.Config as Config
53
import Ganeti.ConfigReader
54
import Ganeti.BasicTypes
55
import Ganeti.JQueue
56
import Ganeti.JQScheduler
57
import Ganeti.JSON (TimeAsDoubleJSON(..))
58
import Ganeti.Logging
59
import Ganeti.Luxi
60
import qualified Ganeti.Query.Language as Qlang
61
import qualified Ganeti.Query.Cluster as QCluster
62
import Ganeti.Path ( queueDir, jobQueueLockFile, jobQueueDrainFile
63
                   , defaultMasterSocket)
64
import Ganeti.Rpc
65
import Ganeti.Query.Query
66
import Ganeti.Query.Filter (makeSimpleFilter)
67
import Ganeti.Types
68
import qualified Ganeti.UDSServer as U (Handler(..), listener)
69
import Ganeti.Utils (lockFile, exitIfBad, watchFile, safeRenameFile)
70
import qualified Ganeti.Version as Version
71

    
72
-- | Helper for classic queries.
73
handleClassicQuery :: ConfigData      -- ^ Cluster config
74
                   -> Qlang.ItemType  -- ^ Query type
75
                   -> [Either String Integer] -- ^ Requested names
76
                                              -- (empty means all)
77
                   -> [String]        -- ^ Requested fields
78
                   -> Bool            -- ^ Whether to do sync queries or not
79
                   -> IO (GenericResult GanetiException JSValue)
80
handleClassicQuery _ _ _ _ True =
81
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
82
handleClassicQuery cfg qkind names fields _ = do
83
  let simpleNameFilter field = makeSimpleFilter (field qkind) names
84
      flt = Qlang.OrFilter $ map simpleNameFilter [nameField, uuidField]
85
  qr <- query cfg True (Qlang.Query qkind fields flt)
86
  return $ showJSON <$> (qr >>= queryCompat)
87

    
88
-- | Minimal wrapper to handle the missing config case.
89
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData
90
                     -> LuxiOp -> IO (ErrorResult JSValue)
91
handleCallWrapper _ _ (Bad msg) _ =
92
  return . Bad . ConfigurationError $
93
           "I do not have access to a valid configuration, cannot\
94
           \ process queries: " ++ msg
95
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
96

    
97
-- | Actual luxi operation handler.
98
handleCall :: MVar () -> JQStatus
99
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
100
handleCall _ _ cdata QueryClusterInfo =
101
  let cluster = configCluster cdata
102
      master = QCluster.clusterMasterNodeName cdata
103
      hypervisors = clusterEnabledHypervisors cluster
104
      diskTemplates = clusterEnabledDiskTemplates cluster
105
      def_hv = case hypervisors of
106
                 x:_ -> showJSON x
107
                 [] -> JSNull
108
      bits = show (bitSize (0::Int)) ++ "bits"
109
      arch_tuple = [bits, arch]
110
      obj = [ ("software_version", showJSON C.releaseVersion)
111
            , ("protocol_version", showJSON C.protocolVersion)
112
            , ("config_version", showJSON C.configVersion)
113
            , ("os_api_version", showJSON . maximum .
114
                                 Set.toList . ConstantUtils.unFrozenSet $
115
                                 C.osApiVersions)
116
            , ("export_version", showJSON C.exportVersion)
117
            , ("vcs_version", showJSON Version.version)
118
            , ("architecture", showJSON arch_tuple)
119
            , ("name", showJSON $ clusterClusterName cluster)
120
            , ("master", showJSON (case master of
121
                                     Ok name -> name
122
                                     _ -> undefined))
123
            , ("default_hypervisor", def_hv)
124
            , ("enabled_hypervisors", showJSON hypervisors)
125
            , ("hvparams", showJSON $ clusterHvparams cluster)
126
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
127
            , ("beparams", showJSON $ clusterBeparams cluster)
128
            , ("osparams", showJSON $ clusterOsparams cluster)
129
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
130
            , ("nicparams", showJSON $ clusterNicparams cluster)
131
            , ("ndparams", showJSON $ clusterNdparams cluster)
132
            , ("diskparams", showJSON $ clusterDiskparams cluster)
133
            , ("candidate_pool_size",
134
               showJSON $ clusterCandidatePoolSize cluster)
135
            , ("max_running_jobs",
136
               showJSON $ clusterMaxRunningJobs cluster)
137
            , ("mac_prefix",  showJSON $ clusterMacPrefix cluster)
138
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
139
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
140
            , ("use_external_mip_script",
141
               showJSON $ clusterUseExternalMipScript cluster)
142
            , ("volume_group_name",
143
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
144
            , ("drbd_usermode_helper",
145
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
146
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
147
            , ("shared_file_storage_dir",
148
               showJSON $ clusterSharedFileStorageDir cluster)
149
            , ("gluster_storage_dir",
150
               showJSON $ clusterGlusterStorageDir cluster)
151
            , ("maintain_node_health",
152
               showJSON $ clusterMaintainNodeHealth cluster)
153
            , ("ctime", showJSON . TimeAsDoubleJSON $ clusterCtime cluster)
154
            , ("mtime", showJSON . TimeAsDoubleJSON $ clusterMtime cluster)
155
            , ("uuid", showJSON $ clusterUuid cluster)
156
            , ("tags", showJSON $ clusterTags cluster)
157
            , ("uid_pool", showJSON $ clusterUidPool cluster)
158
            , ("default_iallocator",
159
               showJSON $ clusterDefaultIallocator cluster)
160
            , ("default_iallocator_params",
161
              showJSON $ clusterDefaultIallocatorParams cluster)
162
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
163
            , ("primary_ip_version",
164
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
165
            , ("prealloc_wipe_disks",
166
               showJSON $ clusterPreallocWipeDisks cluster)
167
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
168
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
169
            , ("enabled_disk_templates", showJSON diskTemplates)
170
            , ("instance_communication_network",
171
               showJSON (clusterInstanceCommunicationNetwork cluster))
172
            ]
173

    
174
  in case master of
175
    Ok _ -> return . Ok . J.makeObj $ obj
176
    Bad ex -> return $ Bad ex
177

    
178
handleCall _ _ cfg (QueryTags kind name) = do
179
  let tags = case kind of
180
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
181
               TagKindGroup    -> groupTags   <$> Config.getGroup    cfg name
182
               TagKindNode     -> nodeTags    <$> Config.getNode     cfg name
183
               TagKindInstance -> instTags    <$> Config.getInstance cfg name
184
               TagKindNetwork  -> networkTags <$> Config.getNetwork  cfg name
185
  return (J.showJSON <$> tags)
186

    
187
handleCall _ _ cfg (Query qkind qfields qfilter) = do
188
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
189
  return $ J.showJSON <$> result
190

    
191
handleCall _ _ _ (QueryFields qkind qfields) = do
192
  let result = queryFields (Qlang.QueryFields qkind qfields)
193
  return $ J.showJSON <$> result
194

    
195
handleCall _ _ cfg (QueryNodes names fields lock) =
196
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
197
    (map Left names) fields lock
198

    
199
handleCall _ _ cfg (QueryInstances names fields lock) =
200
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
201
    (map Left names) fields lock
202

    
203
handleCall _ _ cfg (QueryGroups names fields lock) =
204
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
205
    (map Left names) fields lock
206

    
207
handleCall _ _ cfg (QueryJobs names fields) =
208
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
209
    (map (Right . fromIntegral . fromJobId) names)  fields False
210

    
211
handleCall _ _ cfg (QueryNetworks names fields lock) =
212
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
213
    (map Left names) fields lock
214

    
215
handleCall _ _ cfg (QueryConfigValues fields) = do
216
  let params = [ ("cluster_name", return . showJSON . clusterClusterName
217
                                    . configCluster $ cfg)
218
               , ("watcher_pause", liftM (maybe JSNull showJSON)
219
                                     QCluster.isWatcherPaused)
220
               , ("master_node", return . genericResult (const JSNull) showJSON
221
                                   $ QCluster.clusterMasterNodeName cfg)
222
               , ("drain_flag", liftM (showJSON . not) isQueueOpen)
223
               ] :: [(String, IO JSValue)]
224
  let answer = map (fromMaybe (return JSNull) . flip lookup params) fields
225
  answerEval <- sequence answer
226
  return . Ok . showJSON $ answerEval
227

    
228
handleCall _ _ cfg (QueryExports nodes lock) =
229
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRExport)
230
    (map Left nodes) ["node", "export"] lock
231

    
232
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) = runResultT $ do
233
    let mcs = Config.getMasterCandidates cfg
234
    jid <- mkResultT $ allocateJobId mcs qlock
235
    ts <- liftIO currentTimestamp
236
    job <- liftM (setReceivedTimestamp ts)
237
             $ queuedJobFromOpCodes jid ops
238
    qDir <- liftIO queueDir
239
    mkResultT $ writeJobToDisk qDir job
240
    liftIO $ replicateManyJobs qDir mcs [job]
241
    _ <- liftIO . forkIO $ enqueueNewJobs qstat [job]
242
    return . showJSON . fromJobId $ jid
243

    
244
handleCall qlock qstat cfg (SubmitJob ops) =
245
  do
246
    open <- isQueueOpen
247
    if not open
248
       then return . Bad . GenericError $ "Queue drained"
249
       else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
250

    
251
handleCall qlock qstat cfg (SubmitManyJobs lops) =
252
  do
253
    open <- isQueueOpen
254
    if not open
255
      then return . Bad . GenericError $ "Queue drained"
256
      else do
257
        let mcs = Config.getMasterCandidates cfg
258
        result_jobids <- allocateJobIds mcs qlock (length lops)
259
        case result_jobids of
260
          Bad s -> return . Bad . GenericError $ s
261
          Ok jids -> do
262
            ts <- currentTimestamp
263
            jobs <- liftM (map $ setReceivedTimestamp ts)
264
                      $ zipWithM queuedJobFromOpCodes jids lops
265
            qDir <- queueDir
266
            write_results <- mapM (writeJobToDisk qDir) jobs
267
            let annotated_results = zip write_results jobs
268
                succeeded = map snd $ filter (isOk . fst) annotated_results
269
            when (any isBad write_results) . logWarning
270
              $ "Writing some jobs failed " ++ show annotated_results
271
            replicateManyJobs qDir mcs succeeded
272
            _ <- forkIO $ enqueueNewJobs qstat succeeded
273
            return . Ok . JSArray
274
              . map (\(res, job) ->
275
                      if isOk res
276
                        then showJSON (True, fromJobId $ qjId job)
277
                        else showJSON (False, genericResult id (const "") res))
278
              $ annotated_results
279

    
280
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
281
  let compute_fn = computeJobUpdate cfg jid fields prev_log
282
  qDir <- queueDir
283
  -- verify if the job is finalized, and return immediately in this case
284
  jobresult <- loadJobFromDisk qDir False jid
285
  case jobresult of
286
    Ok (job, _) | not (jobFinalized job) -> do
287
      let jobfile = liveJobFile qDir jid
288
      answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
289
                  (prev_job, JSArray []) compute_fn
290
      return . Ok $ showJSON answer
291
    _ -> liftM (Ok . showJSON) compute_fn
292

    
293
handleCall _ _ cfg (SetWatcherPause time) = do
294
  let mcs = Config.getMasterCandidates cfg
295
      masters = genericResult (const []) return
296
                  . Config.getNode cfg . clusterMasterNode
297
                  $ configCluster cfg
298
  _ <- executeRpcCall (masters ++ mcs) $ RpcCallSetWatcherPause time
299
  return . Ok . maybe JSNull showJSON $ fmap TimeAsDoubleJSON time
300

    
301
handleCall _ _ cfg (SetDrainFlag value) = do
302
  let mcs = Config.getMasterCandidates cfg
303
  fpath <- jobQueueDrainFile
304
  if value
305
     then writeFile fpath ""
306
     else removeFile fpath
307
  _ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
308
  return . Ok . showJSON $ True
309

    
310
handleCall _ qstat cfg (ChangeJobPriority jid prio) = do
311
  maybeJob <- setJobPriority qstat jid prio
312
  case maybeJob of
313
    Bad s -> return . Ok $ showJSON (False, s)
314
    Ok (Just job) -> runResultT $ do
315
      let mcs = Config.getMasterCandidates cfg
316
      qDir <- liftIO queueDir
317
      liftIO $ replicateManyJobs qDir mcs [job]
318
      return $ showJSON (True, "Priorities of pending opcodes for job "
319
                               ++ show (fromJobId jid) ++ " have been changed"
320
                               ++ " to " ++ show prio)
321
    Ok Nothing -> runResultT $ do
322
      -- Job has already started; so we have to forward the request
323
      -- to the job, currently handled by masterd.
324
      socketpath <- liftIO defaultMasterSocket
325
      cl <- liftIO $ getLuxiClient socketpath
326
      ResultT $ callMethod (ChangeJobPriority jid prio) cl
327

    
328
handleCall _ qstat  cfg (CancelJob jid) = do
329
  let jName = (++) "job " . show $ fromJobId jid
330
  dequeueResult <- dequeueJob qstat jid
331
  case dequeueResult of
332
    Ok True -> do
333
      logDebug $ jName ++ " dequeued, marking as canceled"
334
      qDir <- queueDir
335
      readResult <- loadJobFromDisk qDir True jid
336
      let jobFileFailed = return . Ok . showJSON . (,) False
337
                            . (++) ("Dequeued " ++ jName
338
                                    ++ ", but failed to mark as cancelled: ")
339
                          :: String -> IO (ErrorResult JSValue)
340
      case readResult of
341
        Bad s -> jobFileFailed s
342
        Ok (job, _) -> do
343
          now <- currentTimestamp
344
          let job' = cancelQueuedJob now job
345
              mcs = Config.getMasterCandidates cfg
346
          write_result <- writeJobToDisk qDir job'
347
          case write_result of
348
            Bad s -> jobFileFailed s
349
            Ok () -> do
350
              replicateManyJobs qDir mcs [job']
351
              return . Ok . showJSON $ (True, "Dequeued " ++ jName)
352
    Ok False -> do
353
      logDebug $ jName ++ " not queued; trying to cancel directly"
354
      cancelJob jid
355
    Bad s -> return . Ok . showJSON $ (False, s)
356

    
357
handleCall qlock _ cfg (ArchiveJob jid) = do
358
  let archiveFailed = putMVar qlock  () >> (return . Ok $ showJSON False)
359
                      :: IO (ErrorResult JSValue)
360
  qDir <- queueDir
361
  takeMVar qlock
362
  result <- loadJobFromDisk qDir False jid
363
  case result of
364
    Bad _ -> archiveFailed
365
    Ok (job, _) -> if jobFinalized job
366
                     then do
367
                       let mcs = Config.getMasterCandidates cfg
368
                           live = liveJobFile qDir jid
369
                           archive = archivedJobFile qDir jid
370
                       renameResult <- safeRenameFile queueDirPermissions
371
                                         live archive
372
                       putMVar qlock ()
373
                       case renameResult of
374
                         Bad s -> return . Bad . JobQueueError
375
                                    $ "Archiving failed in an unexpected way: "
376
                                        ++ s
377
                         Ok () -> do
378
                           _ <- executeRpcCall mcs
379
                                  $ RpcCallJobqueueRename [(live, archive)]
380
                           return . Ok $ showJSON True
381
                     else archiveFailed
382

    
383
handleCall qlock _ cfg (AutoArchiveJobs age timeout) = do
384
  qDir <- queueDir
385
  resultJids <- getJobIDs [qDir]
386
  case resultJids of
387
    Bad s -> return . Bad . JobQueueError $ show s
388
    Ok jids -> do
389
      result <- bracket_ (takeMVar qlock) (putMVar qlock ())
390
                  . archiveJobs cfg age timeout
391
                  $ sortJobIDs jids
392
      return . Ok $ showJSON result
393

    
394
handleCall _ _ _ (PickupJob _) =
395
  return . Bad
396
    $ GenericError "Luxi call 'PickupJob' is for internal use only"
397

    
398
{-# ANN handleCall "HLint: ignore Too strict if" #-}
399

    
400
-- | Query the status of a job and return the requested fields
401
-- and the logs newer than the given log number.
402
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
403
                    -> IO (JSValue, JSValue)
404
computeJobUpdate cfg jid fields prev_log = do
405
  let sjid = show $ fromJobId jid
406
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
407
  let fromJSArray (JSArray xs) = xs
408
      fromJSArray _ = []
409
  let logFilter JSNull (JSArray _) = True
410
      logFilter (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
411
      logFilter _ _ = False
412
  let filterLogs n logs = JSArray (filter (logFilter n) (logs >>= fromJSArray))
413
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
414
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
415
  let (rfields, rlogs) = case jobQuery of
416
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
417
          (answer, filterLogs prev_log logs)
418
        _ -> (map (const JSNull) fields, JSArray [])
419
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
420
  return (JSArray rfields, rlogs)
421

    
422

    
423
type LuxiConfig = (MVar (), JQStatus, ConfigReader)
424

    
425
luxiExec
426
    :: LuxiConfig
427
    -> LuxiOp
428
    -> IO (Bool, GenericResult GanetiException JSValue)
429
luxiExec (qlock, qstat, creader) args = do
430
  cfg <- creader
431
  result <- handleCallWrapper qlock qstat cfg args
432
  return (True, result)
433

    
434
luxiHandler :: LuxiConfig -> U.Handler LuxiOp IO JSValue
435
luxiHandler cfg = U.Handler { U.hParse         = decodeLuxiCall
436
                            , U.hInputLogShort = strOfOp
437
                            , U.hInputLogLong  = show
438
                            , U.hExec          = luxiExec cfg
439
                            }
440

    
441
-- | Type alias for prepMain results
442
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
443

    
444
-- | Check function for luxid.
445
checkMain :: CheckFn ()
446
checkMain _ = return $ Right ()
447

    
448
-- | Prepare function for luxid.
449
prepMain :: PrepFn () PrepResult
450
prepMain _ _ = do
451
  socket_path <- Path.defaultQuerySocket
452
  cleanupSocket socket_path
453
  s <- describeError "binding to the Luxi socket"
454
         Nothing (Just socket_path) $ getLuxiServer True socket_path
455
  cref <- newIORef (Bad "Configuration not yet loaded")
456
  jq <- emptyJQStatus cref
457
  return (s, cref, jq)
458

    
459
-- | Main function.
460
main :: MainFn () PrepResult
461
main _ _ (server, cref, jq) = do
462
  initConfigReader id cref
463
  let creader = readIORef cref
464
  initJQScheduler jq
465

    
466
  qlockFile <- jobQueueLockFile
467
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
468
  qlock <- newMVar ()
469

    
470
  finally
471
    (forever $ U.listener (luxiHandler (qlock, jq, creader)) server)
472
    (closeServer server)