Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ d3e6fd0e

History | View | Annotate | Download (15.2 kB)

1
{-| Implementation of the Ganeti Query2 server.
2

    
3
-}
4

    
5
{-
6

    
7
Copyright (C) 2012, 2013 Google Inc.
8

    
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation; either version 2 of the License, or
12
(at your option) any later version.
13

    
14
This program is distributed in the hope that it will be useful, but
15
WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
General Public License for more details.
18

    
19
You should have received a copy of the GNU General Public License
20
along with this program; if not, write to the Free Software
21
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22
02110-1301, USA.
23

    
24
-}
25

    
26
module Ganeti.Query.Server
27
  ( main
28
  , checkMain
29
  , prepMain
30
  ) where
31

    
32
import Control.Applicative
33
import Control.Concurrent
34
import Control.Exception
35
import Control.Monad (forever, when, zipWithM, liftM)
36
import Data.Bits (bitSize)
37
import qualified Data.Set as Set (toList)
38
import Data.IORef
39
import Data.Maybe (fromMaybe)
40
import qualified Text.JSON as J
41
import Text.JSON (encode, showJSON, JSValue(..))
42
import System.Info (arch)
43

    
44
import qualified Ganeti.Constants as C
45
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
46
import Ganeti.Errors
47
import qualified Ganeti.Path as Path
48
import Ganeti.Daemon
49
import Ganeti.Objects
50
import qualified Ganeti.Config as Config
51
import Ganeti.ConfigReader
52
import Ganeti.BasicTypes
53
import Ganeti.JQueue
54
import Ganeti.JQScheduler
55
import Ganeti.Logging
56
import Ganeti.Luxi
57
import qualified Ganeti.Query.Language as Qlang
58
import qualified Ganeti.Query.Cluster as QCluster
59
import Ganeti.Path (queueDir, jobQueueLockFile)
60
import Ganeti.Rpc
61
import Ganeti.Query.Query
62
import Ganeti.Query.Filter (makeSimpleFilter)
63
import Ganeti.Types
64
import qualified Ganeti.UDSServer as U (Handler(..), listener)
65
import Ganeti.Utils (lockFile, exitIfBad, watchFile)
66
import qualified Ganeti.Version as Version
67

    
68
-- | Helper for classic queries.
69
handleClassicQuery :: ConfigData      -- ^ Cluster config
70
                   -> Qlang.ItemType  -- ^ Query type
71
                   -> [Either String Integer] -- ^ Requested names
72
                                              -- (empty means all)
73
                   -> [String]        -- ^ Requested fields
74
                   -> Bool            -- ^ Whether to do sync queries or not
75
                   -> IO (GenericResult GanetiException JSValue)
76
handleClassicQuery _ _ _ _ True =
77
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
78
handleClassicQuery cfg qkind names fields _ = do
79
  let flt = makeSimpleFilter (nameField qkind) names
80
  qr <- query cfg True (Qlang.Query qkind fields flt)
81
  return $ showJSON <$> (qr >>= queryCompat)
82

    
83
-- | Minimal wrapper to handle the missing config case.
84
handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData 
85
                     -> LuxiOp -> IO (ErrorResult JSValue)
86
handleCallWrapper _ _ (Bad msg) _ =
87
  return . Bad . ConfigurationError $
88
           "I do not have access to a valid configuration, cannot\
89
           \ process queries: " ++ msg
90
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
91

    
92
-- | Actual luxi operation handler.
93
handleCall :: MVar () -> JQStatus 
94
              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
95
handleCall _ _ cdata QueryClusterInfo =
96
  let cluster = configCluster cdata
97
      master = QCluster.clusterMasterNodeName cdata
98
      hypervisors = clusterEnabledHypervisors cluster
99
      diskTemplates = clusterEnabledDiskTemplates cluster
100
      def_hv = case hypervisors of
101
                 x:_ -> showJSON x
102
                 [] -> JSNull
103
      bits = show (bitSize (0::Int)) ++ "bits"
104
      arch_tuple = [bits, arch]
105
      obj = [ ("software_version", showJSON C.releaseVersion)
106
            , ("protocol_version", showJSON C.protocolVersion)
107
            , ("config_version", showJSON C.configVersion)
108
            , ("os_api_version", showJSON . maximum .
109
                                 Set.toList . ConstantUtils.unFrozenSet $
110
                                 C.osApiVersions)
111
            , ("export_version", showJSON C.exportVersion)
112
            , ("vcs_version", showJSON Version.version)
113
            , ("architecture", showJSON arch_tuple)
114
            , ("name", showJSON $ clusterClusterName cluster)
115
            , ("master", showJSON (case master of
116
                                     Ok name -> name
117
                                     _ -> undefined))
118
            , ("default_hypervisor", def_hv)
119
            , ("enabled_hypervisors", showJSON hypervisors)
120
            , ("hvparams", showJSON $ clusterHvparams cluster)
121
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
122
            , ("beparams", showJSON $ clusterBeparams cluster)
123
            , ("osparams", showJSON $ clusterOsparams cluster)
124
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
125
            , ("nicparams", showJSON $ clusterNicparams cluster)
126
            , ("ndparams", showJSON $ clusterNdparams cluster)
127
            , ("diskparams", showJSON $ clusterDiskparams cluster)
128
            , ("candidate_pool_size",
129
               showJSON $ clusterCandidatePoolSize cluster)
130
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
131
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
132
            , ("use_external_mip_script",
133
               showJSON $ clusterUseExternalMipScript cluster)
134
            , ("volume_group_name",
135
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
136
            , ("drbd_usermode_helper",
137
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
138
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
139
            , ("shared_file_storage_dir",
140
               showJSON $ clusterSharedFileStorageDir cluster)
141
            , ("gluster_storage_dir",
142
               showJSON $ clusterGlusterStorageDir cluster)
143
            , ("maintain_node_health",
144
               showJSON $ clusterMaintainNodeHealth cluster)
145
            , ("ctime", showJSON $ clusterCtime cluster)
146
            , ("mtime", showJSON $ clusterMtime cluster)
147
            , ("uuid", showJSON $ clusterUuid cluster)
148
            , ("tags", showJSON $ clusterTags cluster)
149
            , ("uid_pool", showJSON $ clusterUidPool cluster)
150
            , ("default_iallocator",
151
               showJSON $ clusterDefaultIallocator cluster)
152
            , ("default_iallocator_params",
153
              showJSON $ clusterDefaultIallocatorParams cluster)
154
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
155
            , ("primary_ip_version",
156
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
157
            , ("prealloc_wipe_disks",
158
               showJSON $ clusterPreallocWipeDisks cluster)
159
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
160
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
161
            , ("enabled_disk_templates", showJSON diskTemplates)
162
            ]
163

    
164
  in case master of
165
    Ok _ -> return . Ok . J.makeObj $ obj
166
    Bad ex -> return $ Bad ex
167

    
168
handleCall _ _ cfg (QueryTags kind name) = do
169
  let tags = case kind of
170
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
171
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
172
               TagKindNode     -> nodeTags  <$> Config.getNode     cfg name
173
               TagKindInstance -> instTags  <$> Config.getInstance cfg name
174
               TagKindNetwork  -> Bad $ OpPrereqError
175
                                        "Network tag is not allowed"
176
                                        ECodeInval
177
  return (J.showJSON <$> tags)
178

    
179
handleCall _ _ cfg (Query qkind qfields qfilter) = do
180
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
181
  return $ J.showJSON <$> result
182

    
183
handleCall _ _ _ (QueryFields qkind qfields) = do
184
  let result = queryFields (Qlang.QueryFields qkind qfields)
185
  return $ J.showJSON <$> result
186

    
187
handleCall _ _ cfg (QueryNodes names fields lock) =
188
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
189
    (map Left names) fields lock
190

    
191
handleCall _ _ cfg (QueryInstances names fields lock) =
192
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
193
    (map Left names) fields lock
194

    
195
handleCall _ _ cfg (QueryGroups names fields lock) =
196
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
197
    (map Left names) fields lock
198

    
199
handleCall _ _ cfg (QueryJobs names fields) =
200
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
201
    (map (Right . fromIntegral . fromJobId) names)  fields False
202

    
203
handleCall _ _ cfg (QueryNetworks names fields lock) =
204
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
205
    (map Left names) fields lock
206

    
207
handleCall _ _ cfg (QueryConfigValues fields) = do
208
  let params = [ ("cluster_name", return . showJSON . clusterClusterName
209
                                    . configCluster $ cfg)
210
               , ("watcher_pause", liftM (maybe JSNull showJSON)
211
                                     QCluster.isWatcherPaused)
212
               , ("master_node", return . genericResult (const JSNull) showJSON
213
                                   $ QCluster.clusterMasterNodeName cfg)
214
               , ("drain_flag", liftM showJSON isQueueOpen) 
215
               ] :: [(String, IO JSValue)]
216
  let answer = map (fromMaybe (return JSNull) . flip lookup params) fields
217
  answerEval <- sequence answer
218
  return . Ok . showJSON $ answerEval
219

    
220
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) =
221
  do
222
    let mcs = Config.getMasterCandidates cfg
223
    jobid <- allocateJobId mcs qlock
224
    case jobid of
225
      Bad s -> return . Bad . GenericError $ s
226
      Ok jid -> do
227
        ts <- currentTimestamp
228
        job <- liftM (setReceivedTimestamp ts)
229
                 $ queuedJobFromOpCodes jid ops
230
        qDir <- queueDir
231
        write_result <- writeJobToDisk qDir job
232
        case write_result of
233
          Bad s -> return . Bad . GenericError $ s
234
          Ok () -> do
235
            _ <- replicateManyJobs qDir mcs [job]
236
            _ <- forkIO $ enqueueNewJobs qstat [job]
237
            return . Ok . showJSON . fromJobId $ jid
238

    
239
handleCall qlock qstat cfg (SubmitJob ops) =
240
  do
241
    open <- isQueueOpen
242
    if not open
243
       then return . Bad . GenericError $ "Queue drained"
244
       else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
245

    
246
handleCall qlock qstat cfg (SubmitManyJobs lops) =
247
  do
248
    open <- isQueueOpen
249
    if not open
250
      then return . Bad . GenericError $ "Queue drained"
251
      else do
252
        let mcs = Config.getMasterCandidates cfg
253
        result_jobids <- allocateJobIds mcs qlock (length lops)
254
        case result_jobids of
255
          Bad s -> return . Bad . GenericError $ s
256
          Ok jids -> do
257
            ts <- currentTimestamp
258
            jobs <- liftM (map $ setReceivedTimestamp ts)
259
                      $ zipWithM queuedJobFromOpCodes jids lops
260
            qDir <- queueDir
261
            write_results <- mapM (writeJobToDisk qDir) jobs
262
            let annotated_results = zip write_results jobs
263
                succeeded = map snd $ filter (isOk . fst) annotated_results
264
            when (any isBad write_results) . logWarning
265
              $ "Writing some jobs failed " ++ show annotated_results
266
            replicateManyJobs qDir mcs succeeded
267
            _ <- forkIO $ enqueueNewJobs qstat succeeded
268
            return . Ok . JSArray
269
              . map (\(res, job) ->
270
                      if isOk res
271
                        then showJSON (True, fromJobId $ qjId job)
272
                        else showJSON (False, genericResult id (const "") res))
273
              $ annotated_results
274

    
275
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
276
  let compute_fn = computeJobUpdate cfg jid fields prev_log 
277
  qDir <- queueDir
278
  -- verify if the job is finalized, and return immediately in this case
279
  jobresult <- loadJobFromDisk qDir False jid
280
  case jobresult of
281
    Ok (job, _) | not (jobFinalized job) -> do
282
      let jobfile = liveJobFile qDir jid
283
      answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
284
                  (prev_job, JSArray []) compute_fn
285
      return . Ok $ showJSON answer
286
    _ -> liftM (Ok . showJSON) compute_fn
287

    
288
handleCall _ _ cfg (SetWatcherPause time) = do
289
  let mcs = Config.getMasterCandidates cfg
290
      masters = genericResult (const []) return
291
                  . Config.getNode cfg . clusterMasterNode
292
                  $ configCluster cfg
293
  _ <- executeRpcCall (masters ++ mcs) $ RpcCallSetWatcherPause time
294
  return . Ok . maybe JSNull showJSON $ time
295

    
296
handleCall _ _ _ op =
297
  return . Bad $
298
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
299

    
300
{-# ANN handleCall "HLint: ignore Too strict if" #-}
301

    
302
-- | Query the status of a job and return the requested fields
303
-- and the logs newer than the given log number.
304
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue 
305
                    -> IO (JSValue, JSValue)
306
computeJobUpdate cfg jid fields prev_log = do
307
  let sjid = show $ fromJobId jid
308
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
309
  let fromJSArray (JSArray xs) = xs
310
      fromJSArray _ = []
311
  let logFilter JSNull (JSArray _) = True
312
      logFilter (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
313
      logFilter _ _ = False
314
  let filterLogs n logs = JSArray (filter (logFilter n) (logs >>= fromJSArray))
315
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
316
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
317
  let (rfields, rlogs) = case jobQuery of
318
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
319
          (answer, filterLogs prev_log logs)
320
        _ -> (map (const JSNull) fields, JSArray [])
321
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
322
  return (JSArray rfields, rlogs)
323

    
324

    
325
type LuxiConfig = (MVar (), JQStatus, ConfigReader)
326

    
327
luxiExec
328
    :: LuxiConfig
329
    -> LuxiOp
330
    -> IO (Bool, GenericResult GanetiException JSValue)
331
luxiExec (qlock, qstat, creader) args = do
332
  cfg <- creader
333
  result <- handleCallWrapper qlock qstat cfg args
334
  return (True, result)
335

    
336
luxiHandler :: LuxiConfig -> U.Handler LuxiOp JSValue
337
luxiHandler cfg = U.Handler { U.hParse         = decodeLuxiCall
338
                            , U.hInputLogShort = strOfOp
339
                            , U.hInputLogLong  = show
340
                            , U.hExec          = luxiExec cfg
341
                            }
342

    
343

    
344
-- | Type alias for prepMain results
345
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
346

    
347
-- | Check function for luxid.
348
checkMain :: CheckFn ()
349
checkMain _ = return $ Right ()
350

    
351
-- | Prepare function for luxid.
352
prepMain :: PrepFn () PrepResult
353
prepMain _ _ = do
354
  socket_path <- Path.defaultQuerySocket
355
  cleanupSocket socket_path
356
  s <- describeError "binding to the Luxi socket"
357
         Nothing (Just socket_path) $ getLuxiServer True socket_path
358
  cref <- newIORef (Bad "Configuration not yet loaded")
359
  jq <- emptyJQStatus 
360
  return (s, cref, jq)
361

    
362
-- | Main function.
363
main :: MainFn () PrepResult
364
main _ _ (server, cref, jq) = do
365
  initConfigReader id cref
366
  let creader = readIORef cref
367
  initJQScheduler jq
368
  
369
  qlockFile <- jobQueueLockFile
370
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
371
  qlock <- newMVar ()
372

    
373
  finally
374
    (forever $ U.listener (luxiHandler (qlock, jq, creader)) server)
375
    (closeServer server)