Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ c6013594

History | View | Annotate | Download (13.7 kB)

1
{-# LANGUAGE BangPatterns #-}
2

    
3
{-| Implementation of the Ganeti Query2 server.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2012, 2013 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Query.Server
29
  ( main
30
  , checkMain
31
  , prepMain
32
  ) where
33

    
34
import Control.Applicative
35
import Control.Concurrent
36
import Control.Exception
37
import Control.Monad (forever, when, zipWithM, liftM)
38
import Data.Bits (bitSize)
39
import qualified Data.Set as Set (toList)
40
import Data.IORef
41
import qualified Network.Socket as S
42
import qualified Text.JSON as J
43
import Text.JSON (showJSON, JSValue(..))
44
import System.Info (arch)
45

    
46
import qualified Ganeti.Constants as C
47
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
48
import Ganeti.Errors
49
import qualified Ganeti.Path as Path
50
import Ganeti.Daemon
51
import Ganeti.Objects
52
import qualified Ganeti.Config as Config
53
import Ganeti.ConfigReader
54
import Ganeti.BasicTypes
55
import Ganeti.JQueue
56
import Ganeti.Logging
57
import Ganeti.Luxi
58
import qualified Ganeti.Query.Language as Qlang
59
import qualified Ganeti.Query.Cluster as QCluster
60
import Ganeti.Path (queueDir, jobQueueLockFile)
61
import Ganeti.Query.Query
62
import Ganeti.Query.Filter (makeSimpleFilter)
63
import Ganeti.Types
64
import Ganeti.Utils (lockFile, exitIfBad)
65
import qualified Ganeti.Version as Version
66

    
67
-- | Helper for classic queries.
68
handleClassicQuery :: ConfigData      -- ^ Cluster config
69
                   -> Qlang.ItemType  -- ^ Query type
70
                   -> [Either String Integer] -- ^ Requested names
71
                                              -- (empty means all)
72
                   -> [String]        -- ^ Requested fields
73
                   -> Bool            -- ^ Whether to do sync queries or not
74
                   -> IO (GenericResult GanetiException JSValue)
75
handleClassicQuery _ _ _ _ True =
76
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
77
handleClassicQuery cfg qkind names fields _ = do
78
  let flt = makeSimpleFilter (nameField qkind) names
79
  qr <- query cfg True (Qlang.Query qkind fields flt)
80
  return $ showJSON <$> (qr >>= queryCompat)
81

    
82
-- | Minimal wrapper to handle the missing config case.
83
handleCallWrapper :: MVar () -> Result ConfigData 
84
                     -> LuxiOp -> IO (ErrorResult JSValue)
85
handleCallWrapper _ (Bad msg) _ =
86
  return . Bad . ConfigurationError $
87
           "I do not have access to a valid configuration, cannot\
88
           \ process queries: " ++ msg
89
handleCallWrapper qlock (Ok config) op = handleCall qlock config op
90

    
91
-- | Actual luxi operation handler.
92
handleCall :: MVar () -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
93
handleCall _ cdata QueryClusterInfo =
94
  let cluster = configCluster cdata
95
      master = QCluster.clusterMasterNodeName cdata
96
      hypervisors = clusterEnabledHypervisors cluster
97
      diskTemplates = clusterEnabledDiskTemplates cluster
98
      def_hv = case hypervisors of
99
                 x:_ -> showJSON x
100
                 [] -> JSNull
101
      bits = show (bitSize (0::Int)) ++ "bits"
102
      arch_tuple = [bits, arch]
103
      obj = [ ("software_version", showJSON C.releaseVersion)
104
            , ("protocol_version", showJSON C.protocolVersion)
105
            , ("config_version", showJSON C.configVersion)
106
            , ("os_api_version", showJSON . maximum .
107
                                 Set.toList . ConstantUtils.unFrozenSet $
108
                                 C.osApiVersions)
109
            , ("export_version", showJSON C.exportVersion)
110
            , ("vcs_version", showJSON Version.version)
111
            , ("architecture", showJSON arch_tuple)
112
            , ("name", showJSON $ clusterClusterName cluster)
113
            , ("master", showJSON (case master of
114
                                     Ok name -> name
115
                                     _ -> undefined))
116
            , ("default_hypervisor", def_hv)
117
            , ("enabled_hypervisors", showJSON hypervisors)
118
            , ("hvparams", showJSON $ clusterHvparams cluster)
119
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
120
            , ("beparams", showJSON $ clusterBeparams cluster)
121
            , ("osparams", showJSON $ clusterOsparams cluster)
122
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
123
            , ("nicparams", showJSON $ clusterNicparams cluster)
124
            , ("ndparams", showJSON $ clusterNdparams cluster)
125
            , ("diskparams", showJSON $ clusterDiskparams cluster)
126
            , ("candidate_pool_size",
127
               showJSON $ clusterCandidatePoolSize cluster)
128
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
129
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
130
            , ("use_external_mip_script",
131
               showJSON $ clusterUseExternalMipScript cluster)
132
            , ("volume_group_name",
133
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
134
            , ("drbd_usermode_helper",
135
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
136
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
137
            , ("shared_file_storage_dir",
138
               showJSON $ clusterSharedFileStorageDir cluster)
139
            , ("maintain_node_health",
140
               showJSON $ clusterMaintainNodeHealth cluster)
141
            , ("ctime", showJSON $ clusterCtime cluster)
142
            , ("mtime", showJSON $ clusterMtime cluster)
143
            , ("uuid", showJSON $ clusterUuid cluster)
144
            , ("tags", showJSON $ clusterTags cluster)
145
            , ("uid_pool", showJSON $ clusterUidPool cluster)
146
            , ("default_iallocator",
147
               showJSON $ clusterDefaultIallocator cluster)
148
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
149
            , ("primary_ip_version",
150
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
151
            , ("prealloc_wipe_disks",
152
               showJSON $ clusterPreallocWipeDisks cluster)
153
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
154
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
155
            , ("enabled_disk_templates", showJSON diskTemplates)
156
            ]
157

    
158
  in case master of
159
    Ok _ -> return . Ok . J.makeObj $ obj
160
    Bad ex -> return $ Bad ex
161

    
162
handleCall _ cfg (QueryTags kind name) = do
163
  let tags = case kind of
164
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
165
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
166
               TagKindNode     -> nodeTags  <$> Config.getNode     cfg name
167
               TagKindInstance -> instTags  <$> Config.getInstance cfg name
168
               TagKindNetwork  -> Bad $ OpPrereqError
169
                                        "Network tag is not allowed"
170
                                        ECodeInval
171
  return (J.showJSON <$> tags)
172

    
173
handleCall _ cfg (Query qkind qfields qfilter) = do
174
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
175
  return $ J.showJSON <$> result
176

    
177
handleCall _ _ (QueryFields qkind qfields) = do
178
  let result = queryFields (Qlang.QueryFields qkind qfields)
179
  return $ J.showJSON <$> result
180

    
181
handleCall _ cfg (QueryNodes names fields lock) =
182
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
183
    (map Left names) fields lock
184

    
185
handleCall _ cfg (QueryInstances names fields lock) =
186
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
187
    (map Left names) fields lock
188

    
189
handleCall _ cfg (QueryGroups names fields lock) =
190
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
191
    (map Left names) fields lock
192

    
193
handleCall _ cfg (QueryJobs names fields) =
194
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
195
    (map (Right . fromIntegral . fromJobId) names)  fields False
196

    
197
handleCall _ cfg (QueryNetworks names fields lock) =
198
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
199
    (map Left names) fields lock
200

    
201
handleCall qlock cfg (SubmitJobToDrainedQueue ops) =
202
  do
203
    let mcs = Config.getMasterCandidates cfg
204
    jobid <- allocateJobId mcs qlock
205
    case jobid of
206
      Bad s -> return . Bad . GenericError $ s
207
      Ok jid -> do
208
        ts <- currentTimestamp
209
        job <- liftM (setReceivedTimestamp ts)
210
                 $ queuedJobFromOpCodes jid ops
211
        qDir <- queueDir
212
        write_result <- writeJobToDisk qDir job
213
        case write_result of
214
          Bad s -> return . Bad . GenericError $ s
215
          Ok () -> do
216
            _ <- replicateManyJobs qDir mcs [job]
217
            _ <- forkIO $ enqueueJobs [job]
218
            return . Ok . showJSON . fromJobId $ jid
219

    
220
handleCall qlock cfg (SubmitJob ops) =
221
  do
222
    open <- isQueueOpen
223
    if not open
224
       then return . Bad . GenericError $ "Queue drained"
225
       else handleCall qlock cfg (SubmitJobToDrainedQueue ops)
226

    
227
handleCall qlock cfg (SubmitManyJobs lops) =
228
  do
229
    open <- isQueueOpen
230
    if not open
231
      then return . Bad . GenericError $ "Queue drained"
232
      else do
233
        let mcs = Config.getMasterCandidates cfg
234
        result_jobids <- allocateJobIds mcs qlock (length lops)
235
        case result_jobids of
236
          Bad s -> return . Bad . GenericError $ s
237
          Ok jids -> do
238
            ts <- currentTimestamp
239
            jobs <- liftM (map $ setReceivedTimestamp ts)
240
                      $ zipWithM queuedJobFromOpCodes jids lops
241
            qDir <- queueDir
242
            write_results <- mapM (writeJobToDisk qDir) jobs
243
            let annotated_results = zip write_results jobs
244
                succeeded = map snd $ filter (isOk . fst) annotated_results
245
            when (any isBad write_results) . logWarning
246
              $ "Writing some jobs failed " ++ show annotated_results
247
            replicateManyJobs qDir mcs succeeded
248
            _ <- forkIO $ enqueueJobs succeeded
249
            return . Ok . JSArray
250
              . map (\(res, job) ->
251
                      if isOk res
252
                        then showJSON (True, fromJobId $ qjId job)
253
                        else showJSON (False, genericResult id (const "") res))
254
              $ annotated_results
255

    
256
handleCall _ _ op =
257
  return . Bad $
258
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
259

    
260
{-# ANN handleCall "HLint: ignore Too strict if" #-}
261

    
262
-- | Given a decoded luxi request, executes it and sends the luxi
263
-- response back to the client.
264
handleClientMsg :: MVar () -> Client -> ConfigReader -> LuxiOp -> IO Bool
265
handleClientMsg qlock client creader args = do
266
  cfg <- creader
267
  logDebug $ "Request: " ++ show args
268
  call_result <- handleCallWrapper qlock cfg args
269
  (!status, !rval) <-
270
    case call_result of
271
      Bad err -> do
272
        logWarning $ "Failed to execute request " ++ show args ++ ": "
273
                     ++ show err
274
        return (False, showJSON err)
275
      Ok result -> do
276
        -- only log the first 2,000 chars of the result
277
        logDebug $ "Result (truncated): " ++ take 2000 (J.encode result)
278
        logInfo $ "Successfully handled " ++ strOfOp args
279
        return (True, result)
280
  sendMsg client $ buildResponse status rval
281
  return True
282

    
283
-- | Handles one iteration of the client protocol: receives message,
284
-- checks it for validity and decodes it, returns response.
285
handleClient :: MVar () -> Client -> ConfigReader -> IO Bool
286
handleClient qlock client creader = do
287
  !msg <- recvMsgExt client
288
  logDebug $ "Received message: " ++ show msg
289
  case msg of
290
    RecvConnClosed -> logDebug "Connection closed" >> return False
291
    RecvError err -> logWarning ("Error during message receiving: " ++ err) >>
292
                     return False
293
    RecvOk payload ->
294
      case validateCall payload >>= decodeCall of
295
        Bad err -> do
296
             let errmsg = "Failed to parse request: " ++ err
297
             logWarning errmsg
298
             sendMsg client $ buildResponse False (showJSON errmsg)
299
             return False
300
        Ok args -> handleClientMsg qlock client creader args
301

    
302
-- | Main client loop: runs one loop of 'handleClient', and if that
303
-- doesn't report a finished (closed) connection, restarts itself.
304
clientLoop :: MVar () -> Client -> ConfigReader -> IO ()
305
clientLoop qlock client creader = do
306
  result <- handleClient qlock client creader
307
  if result
308
    then clientLoop qlock client creader
309
    else closeClient client
310

    
311
-- | Main listener loop: accepts clients, forks an I/O thread to handle
312
-- that client.
313
listener :: MVar () -> ConfigReader -> S.Socket -> IO ()
314
listener qlock creader socket = do
315
  client <- acceptClient socket
316
  _ <- forkIO $ clientLoop qlock client creader
317
  return ()
318

    
319
-- | Type alias for prepMain results
320
type PrepResult = (FilePath, S.Socket, IORef (Result ConfigData))
321

    
322
-- | Check function for luxid.
323
checkMain :: CheckFn ()
324
checkMain _ = return $ Right ()
325

    
326
-- | Prepare function for luxid.
327
prepMain :: PrepFn () PrepResult
328
prepMain _ _ = do
329
  socket_path <- Path.defaultQuerySocket
330
  cleanupSocket socket_path
331
  s <- describeError "binding to the Luxi socket"
332
         Nothing (Just socket_path) $ getServer True socket_path
333
  cref <- newIORef (Bad "Configuration not yet loaded")
334
  return (socket_path, s, cref)
335

    
336
-- | Main function.
337
main :: MainFn () PrepResult
338
main _ _ (socket_path, server, cref) = do
339
  initConfigReader id cref
340
  let creader = readIORef cref
341
  
342
  qlockFile <- jobQueueLockFile
343
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
344
  qlock <- newMVar ()
345

    
346
  finally
347
    (forever $ listener qlock creader server)
348
    (closeServer socket_path server)