Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ 9131274c

History | View | Annotate | Download (15.5 kB)

1
{-| Implementation of the Ganeti Query2 server.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2012, 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.Query.Server
27
  ( main
28
  , checkMain
29
  , prepMain
30
  ) where
31

    
32
import Control.Applicative
33
import Control.Concurrent
34
import Control.Exception
35
import Control.Monad (forever, when, zipWithM, liftM)
36
import Data.Bits (bitSize)
37
import qualified Data.Set as Set (toList)
38
import Data.IORef
39
import Data.Maybe (fromMaybe)
40
import qualified Text.JSON as J
41
import Text.JSON (encode, showJSON, JSValue(..))
42
import System.Info (arch)
43
import System.Directory
44

    
45
import qualified Ganeti.Constants as C
46
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
47
import Ganeti.Errors
48
import qualified Ganeti.Path as Path
49
import Ganeti.Daemon
50
import Ganeti.Objects
51
import qualified Ganeti.Config as Config
52
import Ganeti.ConfigReader
53
import Ganeti.BasicTypes
54
import Ganeti.JQueue
55
import Ganeti.JQScheduler
56
import Ganeti.Logging
57
import Ganeti.Luxi
58
import qualified Ganeti.Query.Language as Qlang
59
import qualified Ganeti.Query.Cluster as QCluster
60
import Ganeti.Path (queueDir, jobQueueLockFile, jobQueueDrainFile)
61
import Ganeti.Rpc
62
import Ganeti.Query.Query
63
import Ganeti.Query.Filter (makeSimpleFilter)
64
import Ganeti.Types
65
import qualified Ganeti.UDSServer as U (Handler(..), listener)
66
import Ganeti.Utils (lockFile, exitIfBad, watchFile)
67
import qualified Ganeti.Version as Version
68

    
69
-- | Helper for classic queries.
70
handleClassicQuery :: ConfigData      -- ^ Cluster config
71
                   -> Qlang.ItemType  -- ^ Query type
72
                   -> [Either String Integer] -- ^ Requested names
73
                                              -- (empty means all)
74
                   -> [String]        -- ^ Requested fields
75
                   -> Bool            -- ^ Whether to do sync queries or not
76
                   -> IO (GenericResult GanetiException JSValue)
77
handleClassicQuery _ _ _ _ True =
78
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
79
handleClassicQuery cfg qkind names fields _ = do
80
  let flt = makeSimpleFilter (nameField qkind) names
81
  qr <- query cfg True (Qlang.Query qkind fields flt)
82
  return $ showJSON <$> (qr >>= queryCompat)
83

    
84
-- | Minimal wrapper to handle the missing config case.
85
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData
86
                     -> LuxiOp -> IO (ErrorResult JSValue)
87
handleCallWrapper _ _ (Bad msg) _ =
88
  return . Bad . ConfigurationError $
89
           "I do not have access to a valid configuration, cannot\
90
           \ process queries: " ++ msg
91
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
92

    
93
-- | Actual luxi operation handler.
94
handleCall :: MVar () -> JQStatus
95
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
96
handleCall _ _ cdata QueryClusterInfo =
97
  let cluster = configCluster cdata
98
      master = QCluster.clusterMasterNodeName cdata
99
      hypervisors = clusterEnabledHypervisors cluster
100
      diskTemplates = clusterEnabledDiskTemplates cluster
101
      def_hv = case hypervisors of
102
                 x:_ -> showJSON x
103
                 [] -> JSNull
104
      bits = show (bitSize (0::Int)) ++ "bits"
105
      arch_tuple = [bits, arch]
106
      obj = [ ("software_version", showJSON C.releaseVersion)
107
            , ("protocol_version", showJSON C.protocolVersion)
108
            , ("config_version", showJSON C.configVersion)
109
            , ("os_api_version", showJSON . maximum .
110
                                 Set.toList . ConstantUtils.unFrozenSet $
111
                                 C.osApiVersions)
112
            , ("export_version", showJSON C.exportVersion)
113
            , ("vcs_version", showJSON Version.version)
114
            , ("architecture", showJSON arch_tuple)
115
            , ("name", showJSON $ clusterClusterName cluster)
116
            , ("master", showJSON (case master of
117
                                     Ok name -> name
118
                                     _ -> undefined))
119
            , ("default_hypervisor", def_hv)
120
            , ("enabled_hypervisors", showJSON hypervisors)
121
            , ("hvparams", showJSON $ clusterHvparams cluster)
122
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
123
            , ("beparams", showJSON $ clusterBeparams cluster)
124
            , ("osparams", showJSON $ clusterOsparams cluster)
125
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
126
            , ("nicparams", showJSON $ clusterNicparams cluster)
127
            , ("ndparams", showJSON $ clusterNdparams cluster)
128
            , ("diskparams", showJSON $ clusterDiskparams cluster)
129
            , ("candidate_pool_size",
130
               showJSON $ clusterCandidatePoolSize cluster)
131
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
132
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
133
            , ("use_external_mip_script",
134
               showJSON $ clusterUseExternalMipScript cluster)
135
            , ("volume_group_name",
136
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
137
            , ("drbd_usermode_helper",
138
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
139
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
140
            , ("shared_file_storage_dir",
141
               showJSON $ clusterSharedFileStorageDir cluster)
142
            , ("gluster_storage_dir",
143
               showJSON $ clusterGlusterStorageDir cluster)
144
            , ("maintain_node_health",
145
               showJSON $ clusterMaintainNodeHealth cluster)
146
            , ("ctime", showJSON $ clusterCtime cluster)
147
            , ("mtime", showJSON $ clusterMtime cluster)
148
            , ("uuid", showJSON $ clusterUuid cluster)
149
            , ("tags", showJSON $ clusterTags cluster)
150
            , ("uid_pool", showJSON $ clusterUidPool cluster)
151
            , ("default_iallocator",
152
               showJSON $ clusterDefaultIallocator cluster)
153
            , ("default_iallocator_params",
154
              showJSON $ clusterDefaultIallocatorParams cluster)
155
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
156
            , ("primary_ip_version",
157
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
158
            , ("prealloc_wipe_disks",
159
               showJSON $ clusterPreallocWipeDisks cluster)
160
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
161
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
162
            , ("enabled_disk_templates", showJSON diskTemplates)
163
            ]
164

    
165
  in case master of
166
    Ok _ -> return . Ok . J.makeObj $ obj
167
    Bad ex -> return $ Bad ex
168

    
169
handleCall _ _ cfg (QueryTags kind name) = do
170
  let tags = case kind of
171
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
172
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
173
               TagKindNode     -> nodeTags  <$> Config.getNode     cfg name
174
               TagKindInstance -> instTags  <$> Config.getInstance cfg name
175
               TagKindNetwork  -> Bad $ OpPrereqError
176
                                        "Network tag is not allowed"
177
                                        ECodeInval
178
  return (J.showJSON <$> tags)
179

    
180
handleCall _ _ cfg (Query qkind qfields qfilter) = do
181
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
182
  return $ J.showJSON <$> result
183

    
184
handleCall _ _ _ (QueryFields qkind qfields) = do
185
  let result = queryFields (Qlang.QueryFields qkind qfields)
186
  return $ J.showJSON <$> result
187

    
188
handleCall _ _ cfg (QueryNodes names fields lock) =
189
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
190
    (map Left names) fields lock
191

    
192
handleCall _ _ cfg (QueryInstances names fields lock) =
193
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
194
    (map Left names) fields lock
195

    
196
handleCall _ _ cfg (QueryGroups names fields lock) =
197
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
198
    (map Left names) fields lock
199

    
200
handleCall _ _ cfg (QueryJobs names fields) =
201
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
202
    (map (Right . fromIntegral . fromJobId) names)  fields False
203

    
204
handleCall _ _ cfg (QueryNetworks names fields lock) =
205
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
206
    (map Left names) fields lock
207

    
208
handleCall _ _ cfg (QueryConfigValues fields) = do
209
  let params = [ ("cluster_name", return . showJSON . clusterClusterName
210
                                    . configCluster $ cfg)
211
               , ("watcher_pause", liftM (maybe JSNull showJSON)
212
                                     QCluster.isWatcherPaused)
213
               , ("master_node", return . genericResult (const JSNull) showJSON
214
                                   $ QCluster.clusterMasterNodeName cfg)
215
               , ("drain_flag", liftM (showJSON . not) isQueueOpen)
216
               ] :: [(String, IO JSValue)]
217
  let answer = map (fromMaybe (return JSNull) . flip lookup params) fields
218
  answerEval <- sequence answer
219
  return . Ok . showJSON $ answerEval
220

    
221
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) =
222
  do
223
    let mcs = Config.getMasterCandidates cfg
224
    jobid <- allocateJobId mcs qlock
225
    case jobid of
226
      Bad s -> return . Bad . GenericError $ s
227
      Ok jid -> do
228
        ts <- currentTimestamp
229
        job <- liftM (setReceivedTimestamp ts)
230
                 $ queuedJobFromOpCodes jid ops
231
        qDir <- queueDir
232
        write_result <- writeJobToDisk qDir job
233
        case write_result of
234
          Bad s -> return . Bad . GenericError $ s
235
          Ok () -> do
236
            _ <- replicateManyJobs qDir mcs [job]
237
            _ <- forkIO $ enqueueNewJobs qstat [job]
238
            return . Ok . showJSON . fromJobId $ jid
239

    
240
handleCall qlock qstat cfg (SubmitJob ops) =
241
  do
242
    open <- isQueueOpen
243
    if not open
244
       then return . Bad . GenericError $ "Queue drained"
245
       else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
246

    
247
handleCall qlock qstat cfg (SubmitManyJobs lops) =
248
  do
249
    open <- isQueueOpen
250
    if not open
251
      then return . Bad . GenericError $ "Queue drained"
252
      else do
253
        let mcs = Config.getMasterCandidates cfg
254
        result_jobids <- allocateJobIds mcs qlock (length lops)
255
        case result_jobids of
256
          Bad s -> return . Bad . GenericError $ s
257
          Ok jids -> do
258
            ts <- currentTimestamp
259
            jobs <- liftM (map $ setReceivedTimestamp ts)
260
                      $ zipWithM queuedJobFromOpCodes jids lops
261
            qDir <- queueDir
262
            write_results <- mapM (writeJobToDisk qDir) jobs
263
            let annotated_results = zip write_results jobs
264
                succeeded = map snd $ filter (isOk . fst) annotated_results
265
            when (any isBad write_results) . logWarning
266
              $ "Writing some jobs failed " ++ show annotated_results
267
            replicateManyJobs qDir mcs succeeded
268
            _ <- forkIO $ enqueueNewJobs qstat succeeded
269
            return . Ok . JSArray
270
              . map (\(res, job) ->
271
                      if isOk res
272
                        then showJSON (True, fromJobId $ qjId job)
273
                        else showJSON (False, genericResult id (const "") res))
274
              $ annotated_results
275

    
276
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
277
  let compute_fn = computeJobUpdate cfg jid fields prev_log
278
  qDir <- queueDir
279
  -- verify if the job is finalized, and return immediately in this case
280
  jobresult <- loadJobFromDisk qDir False jid
281
  case jobresult of
282
    Ok (job, _) | not (jobFinalized job) -> do
283
      let jobfile = liveJobFile qDir jid
284
      answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
285
                  (prev_job, JSArray []) compute_fn
286
      return . Ok $ showJSON answer
287
    _ -> liftM (Ok . showJSON) compute_fn
288

    
289
handleCall _ _ cfg (SetWatcherPause time) = do
290
  let mcs = Config.getMasterCandidates cfg
291
      masters = genericResult (const []) return
292
                  . Config.getNode cfg . clusterMasterNode
293
                  $ configCluster cfg
294
  _ <- executeRpcCall (masters ++ mcs) $ RpcCallSetWatcherPause time
295
  return . Ok . maybe JSNull showJSON $ time
296

    
297
handleCall _ _ cfg (SetDrainFlag value) = do
298
  let mcs = Config.getMasterCandidates cfg
299
  fpath <- jobQueueDrainFile
300
  if value
301
     then writeFile fpath ""
302
     else removeFile fpath
303
  _ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
304
  return . Ok . showJSON $ True
305

    
306
handleCall _ _ _ op =
307
  return . Bad $
308
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
309

    
310
{-# ANN handleCall "HLint: ignore Too strict if" #-}
311

    
312
-- | Query the status of a job and return the requested fields
313
-- and the logs newer than the given log number.
314
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
315
                    -> IO (JSValue, JSValue)
316
computeJobUpdate cfg jid fields prev_log = do
317
  let sjid = show $ fromJobId jid
318
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
319
  let fromJSArray (JSArray xs) = xs
320
      fromJSArray _ = []
321
  let logFilter JSNull (JSArray _) = True
322
      logFilter (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
323
      logFilter _ _ = False
324
  let filterLogs n logs = JSArray (filter (logFilter n) (logs >>= fromJSArray))
325
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
326
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
327
  let (rfields, rlogs) = case jobQuery of
328
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
329
          (answer, filterLogs prev_log logs)
330
        _ -> (map (const JSNull) fields, JSArray [])
331
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
332
  return (JSArray rfields, rlogs)
333

    
334

    
335
type LuxiConfig = (MVar (), JQStatus, ConfigReader)
336

    
337
luxiExec
338
    :: LuxiConfig
339
    -> LuxiOp
340
    -> IO (Bool, GenericResult GanetiException JSValue)
341
luxiExec (qlock, qstat, creader) args = do
342
  cfg <- creader
343
  result <- handleCallWrapper qlock qstat cfg args
344
  return (True, result)
345

    
346
luxiHandler :: LuxiConfig -> U.Handler LuxiOp JSValue
347
luxiHandler cfg = U.Handler { U.hParse         = decodeLuxiCall
348
                            , U.hInputLogShort = strOfOp
349
                            , U.hInputLogLong  = show
350
                            , U.hExec          = luxiExec cfg
351
                            }
352

    
353
-- | Type alias for prepMain results
354
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
355

    
356
-- | Check function for luxid.
357
checkMain :: CheckFn ()
358
checkMain _ = return $ Right ()
359

    
360
-- | Prepare function for luxid.
361
prepMain :: PrepFn () PrepResult
362
prepMain _ _ = do
363
  socket_path <- Path.defaultQuerySocket
364
  cleanupSocket socket_path
365
  s <- describeError "binding to the Luxi socket"
366
         Nothing (Just socket_path) $ getLuxiServer True socket_path
367
  cref <- newIORef (Bad "Configuration not yet loaded")
368
  jq <- emptyJQStatus
369
  return (s, cref, jq)
370

    
371
-- | Main function.
372
main :: MainFn () PrepResult
373
main _ _ (server, cref, jq) = do
374
  initConfigReader id cref
375
  let creader = readIORef cref
376
  initJQScheduler jq
377

    
378
  qlockFile <- jobQueueLockFile
379
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
380
  qlock <- newMVar ()
381

    
382
  finally
383
    (forever $ U.listener (luxiHandler (qlock, jq, creader)) server)
384
    (closeServer server)