Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ ac0c5c6d

History | View | Annotate | Download (13.8 kB)

1
{-# LANGUAGE BangPatterns #-}
2

    
3
{-| Implementation of the Ganeti Query2 server.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2012, 2013 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Query.Server
29
  ( main
30
  , checkMain
31
  , prepMain
32
  ) where
33

    
34
import Control.Applicative
35
import Control.Concurrent
36
import Control.Exception
37
import Control.Monad (forever, when, zipWithM, liftM)
38
import Data.Bits (bitSize)
39
import qualified Data.Set as Set (toList)
40
import Data.IORef
41
import qualified Network.Socket as S
42
import qualified Text.JSON as J
43
import Text.JSON (showJSON, JSValue(..))
44
import System.Info (arch)
45

    
46
import qualified Ganeti.Constants as C
47
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
48
import Ganeti.Errors
49
import qualified Ganeti.Path as Path
50
import Ganeti.Daemon
51
import Ganeti.Objects
52
import qualified Ganeti.Config as Config
53
import Ganeti.ConfigReader
54
import Ganeti.BasicTypes
55
import Ganeti.JQueue
56
import Ganeti.Logging
57
import Ganeti.Luxi
58
import qualified Ganeti.Query.Language as Qlang
59
import qualified Ganeti.Query.Cluster as QCluster
60
import Ganeti.Path (queueDir, jobQueueLockFile)
61
import Ganeti.Query.Query
62
import Ganeti.Query.Filter (makeSimpleFilter)
63
import Ganeti.Types
64
import Ganeti.Utils (lockFile, exitIfBad)
65
import qualified Ganeti.Version as Version
66

    
67
-- | Helper for classic queries.
68
handleClassicQuery :: ConfigData      -- ^ Cluster config
69
                   -> Qlang.ItemType  -- ^ Query type
70
                   -> [Either String Integer] -- ^ Requested names
71
                                              -- (empty means all)
72
                   -> [String]        -- ^ Requested fields
73
                   -> Bool            -- ^ Whether to do sync queries or not
74
                   -> IO (GenericResult GanetiException JSValue)
75
handleClassicQuery _ _ _ _ True =
76
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
77
handleClassicQuery cfg qkind names fields _ = do
78
  let flt = makeSimpleFilter (nameField qkind) names
79
  qr <- query cfg True (Qlang.Query qkind fields flt)
80
  return $ showJSON <$> (qr >>= queryCompat)
81

    
82
-- | Minimal wrapper to handle the missing config case.
83
handleCallWrapper :: MVar () -> Result ConfigData 
84
                     -> LuxiOp -> IO (ErrorResult JSValue)
85
handleCallWrapper _ (Bad msg) _ =
86
  return . Bad . ConfigurationError $
87
           "I do not have access to a valid configuration, cannot\
88
           \ process queries: " ++ msg
89
handleCallWrapper qlock (Ok config) op = handleCall qlock config op
90

    
91
-- | Actual luxi operation handler.
92
handleCall :: MVar () -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
93
handleCall _ cdata QueryClusterInfo =
94
  let cluster = configCluster cdata
95
      master = QCluster.clusterMasterNodeName cdata
96
      hypervisors = clusterEnabledHypervisors cluster
97
      diskTemplates = clusterEnabledDiskTemplates cluster
98
      def_hv = case hypervisors of
99
                 x:_ -> showJSON x
100
                 [] -> JSNull
101
      bits = show (bitSize (0::Int)) ++ "bits"
102
      arch_tuple = [bits, arch]
103
      obj = [ ("software_version", showJSON C.releaseVersion)
104
            , ("protocol_version", showJSON C.protocolVersion)
105
            , ("config_version", showJSON C.configVersion)
106
            , ("os_api_version", showJSON . maximum .
107
                                 Set.toList . ConstantUtils.unFrozenSet $
108
                                 C.osApiVersions)
109
            , ("export_version", showJSON C.exportVersion)
110
            , ("vcs_version", showJSON Version.version)
111
            , ("architecture", showJSON arch_tuple)
112
            , ("name", showJSON $ clusterClusterName cluster)
113
            , ("master", showJSON (case master of
114
                                     Ok name -> name
115
                                     _ -> undefined))
116
            , ("default_hypervisor", def_hv)
117
            , ("enabled_hypervisors", showJSON hypervisors)
118
            , ("hvparams", showJSON $ clusterHvparams cluster)
119
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
120
            , ("beparams", showJSON $ clusterBeparams cluster)
121
            , ("osparams", showJSON $ clusterOsparams cluster)
122
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
123
            , ("nicparams", showJSON $ clusterNicparams cluster)
124
            , ("ndparams", showJSON $ clusterNdparams cluster)
125
            , ("diskparams", showJSON $ clusterDiskparams cluster)
126
            , ("candidate_pool_size",
127
               showJSON $ clusterCandidatePoolSize cluster)
128
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
129
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
130
            , ("use_external_mip_script",
131
               showJSON $ clusterUseExternalMipScript cluster)
132
            , ("volume_group_name",
133
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
134
            , ("drbd_usermode_helper",
135
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
136
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
137
            , ("shared_file_storage_dir",
138
               showJSON $ clusterSharedFileStorageDir cluster)
139
            , ("maintain_node_health",
140
               showJSON $ clusterMaintainNodeHealth cluster)
141
            , ("ctime", showJSON $ clusterCtime cluster)
142
            , ("mtime", showJSON $ clusterMtime cluster)
143
            , ("uuid", showJSON $ clusterUuid cluster)
144
            , ("tags", showJSON $ clusterTags cluster)
145
            , ("uid_pool", showJSON $ clusterUidPool cluster)
146
            , ("default_iallocator",
147
               showJSON $ clusterDefaultIallocator cluster)
148
            , ("default_iallocator_params",
149
              showJSON $ clusterDefaultIallocatorParams cluster)
150
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
151
            , ("primary_ip_version",
152
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
153
            , ("prealloc_wipe_disks",
154
               showJSON $ clusterPreallocWipeDisks cluster)
155
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
156
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
157
            , ("enabled_disk_templates", showJSON diskTemplates)
158
            ]
159

    
160
  in case master of
161
    Ok _ -> return . Ok . J.makeObj $ obj
162
    Bad ex -> return $ Bad ex
163

    
164
handleCall _ cfg (QueryTags kind name) = do
165
  let tags = case kind of
166
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
167
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
168
               TagKindNode     -> nodeTags  <$> Config.getNode     cfg name
169
               TagKindInstance -> instTags  <$> Config.getInstance cfg name
170
               TagKindNetwork  -> Bad $ OpPrereqError
171
                                        "Network tag is not allowed"
172
                                        ECodeInval
173
  return (J.showJSON <$> tags)
174

    
175
handleCall _ cfg (Query qkind qfields qfilter) = do
176
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
177
  return $ J.showJSON <$> result
178

    
179
handleCall _ _ (QueryFields qkind qfields) = do
180
  let result = queryFields (Qlang.QueryFields qkind qfields)
181
  return $ J.showJSON <$> result
182

    
183
handleCall _ cfg (QueryNodes names fields lock) =
184
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
185
    (map Left names) fields lock
186

    
187
handleCall _ cfg (QueryInstances names fields lock) =
188
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
189
    (map Left names) fields lock
190

    
191
handleCall _ cfg (QueryGroups names fields lock) =
192
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
193
    (map Left names) fields lock
194

    
195
handleCall _ cfg (QueryJobs names fields) =
196
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
197
    (map (Right . fromIntegral . fromJobId) names)  fields False
198

    
199
handleCall _ cfg (QueryNetworks names fields lock) =
200
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
201
    (map Left names) fields lock
202

    
203
handleCall qlock cfg (SubmitJobToDrainedQueue ops) =
204
  do
205
    let mcs = Config.getMasterCandidates cfg
206
    jobid <- allocateJobId mcs qlock
207
    case jobid of
208
      Bad s -> return . Bad . GenericError $ s
209
      Ok jid -> do
210
        ts <- currentTimestamp
211
        job <- liftM (setReceivedTimestamp ts)
212
                 $ queuedJobFromOpCodes jid ops
213
        qDir <- queueDir
214
        write_result <- writeJobToDisk qDir job
215
        case write_result of
216
          Bad s -> return . Bad . GenericError $ s
217
          Ok () -> do
218
            _ <- replicateManyJobs qDir mcs [job]
219
            _ <- forkIO $ startJobs [job]
220
            return . Ok . showJSON . fromJobId $ jid
221

    
222
handleCall qlock cfg (SubmitJob ops) =
223
  do
224
    open <- isQueueOpen
225
    if not open
226
       then return . Bad . GenericError $ "Queue drained"
227
       else handleCall qlock cfg (SubmitJobToDrainedQueue ops)
228

    
229
handleCall qlock cfg (SubmitManyJobs lops) =
230
  do
231
    open <- isQueueOpen
232
    if not open
233
      then return . Bad . GenericError $ "Queue drained"
234
      else do
235
        let mcs = Config.getMasterCandidates cfg
236
        result_jobids <- allocateJobIds mcs qlock (length lops)
237
        case result_jobids of
238
          Bad s -> return . Bad . GenericError $ s
239
          Ok jids -> do
240
            ts <- currentTimestamp
241
            jobs <- liftM (map $ setReceivedTimestamp ts)
242
                      $ zipWithM queuedJobFromOpCodes jids lops
243
            qDir <- queueDir
244
            write_results <- mapM (writeJobToDisk qDir) jobs
245
            let annotated_results = zip write_results jobs
246
                succeeded = map snd $ filter (isOk . fst) annotated_results
247
            when (any isBad write_results) . logWarning
248
              $ "Writing some jobs failed " ++ show annotated_results
249
            replicateManyJobs qDir mcs succeeded
250
            _ <- forkIO $ startJobs succeeded
251
            return . Ok . JSArray
252
              . map (\(res, job) ->
253
                      if isOk res
254
                        then showJSON (True, fromJobId $ qjId job)
255
                        else showJSON (False, genericResult id (const "") res))
256
              $ annotated_results
257

    
258
handleCall _ _ op =
259
  return . Bad $
260
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
261

    
262
{-# ANN handleCall "HLint: ignore Too strict if" #-}
263

    
264
-- | Given a decoded luxi request, executes it and sends the luxi
265
-- response back to the client.
266
handleClientMsg :: MVar () -> Client -> ConfigReader -> LuxiOp -> IO Bool
267
handleClientMsg qlock client creader args = do
268
  cfg <- creader
269
  logDebug $ "Request: " ++ show args
270
  call_result <- handleCallWrapper qlock cfg args
271
  (!status, !rval) <-
272
    case call_result of
273
      Bad err -> do
274
        logWarning $ "Failed to execute request " ++ show args ++ ": "
275
                     ++ show err
276
        return (False, showJSON err)
277
      Ok result -> do
278
        -- only log the first 2,000 chars of the result
279
        logDebug $ "Result (truncated): " ++ take 2000 (J.encode result)
280
        logInfo $ "Successfully handled " ++ strOfOp args
281
        return (True, result)
282
  sendMsg client $ buildResponse status rval
283
  return True
284

    
285
-- | Handles one iteration of the client protocol: receives message,
286
-- checks it for validity and decodes it, returns response.
287
handleClient :: MVar () -> Client -> ConfigReader -> IO Bool
288
handleClient qlock client creader = do
289
  !msg <- recvMsgExt client
290
  logDebug $ "Received message: " ++ show msg
291
  case msg of
292
    RecvConnClosed -> logDebug "Connection closed" >> return False
293
    RecvError err -> logWarning ("Error during message receiving: " ++ err) >>
294
                     return False
295
    RecvOk payload ->
296
      case validateCall payload >>= decodeCall of
297
        Bad err -> do
298
             let errmsg = "Failed to parse request: " ++ err
299
             logWarning errmsg
300
             sendMsg client $ buildResponse False (showJSON errmsg)
301
             return False
302
        Ok args -> handleClientMsg qlock client creader args
303

    
304
-- | Main client loop: runs one loop of 'handleClient', and if that
305
-- doesn't report a finished (closed) connection, restarts itself.
306
clientLoop :: MVar () -> Client -> ConfigReader -> IO ()
307
clientLoop qlock client creader = do
308
  result <- handleClient qlock client creader
309
  if result
310
    then clientLoop qlock client creader
311
    else closeClient client
312

    
313
-- | Main listener loop: accepts clients, forks an I/O thread to handle
314
-- that client.
315
listener :: MVar () -> ConfigReader -> S.Socket -> IO ()
316
listener qlock creader socket = do
317
  client <- acceptClient socket
318
  _ <- forkIO $ clientLoop qlock client creader
319
  return ()
320

    
321
-- | Type alias for prepMain results
322
type PrepResult = (FilePath, S.Socket, IORef (Result ConfigData))
323

    
324
-- | Check function for luxid.
325
checkMain :: CheckFn ()
326
checkMain _ = return $ Right ()
327

    
328
-- | Prepare function for luxid.
329
prepMain :: PrepFn () PrepResult
330
prepMain _ _ = do
331
  socket_path <- Path.defaultQuerySocket
332
  cleanupSocket socket_path
333
  s <- describeError "binding to the Luxi socket"
334
         Nothing (Just socket_path) $ getServer True socket_path
335
  cref <- newIORef (Bad "Configuration not yet loaded")
336
  return (socket_path, s, cref)
337

    
338
-- | Main function.
339
main :: MainFn () PrepResult
340
main _ _ (socket_path, server, cref) = do
341
  initConfigReader id cref
342
  let creader = readIORef cref
343
  
344
  qlockFile <- jobQueueLockFile
345
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
346
  qlock <- newMVar ()
347

    
348
  finally
349
    (forever $ listener qlock creader server)
350
    (closeServer socket_path server)