Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / Query / Server.hs @ e817723c

History | View | Annotate | Download (13.8 kB)

1
{-# LANGUAGE BangPatterns #-}
2

    
3
{-| Implementation of the Ganeti Query2 server.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2012, 2013 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.Query.Server
29
  ( main
30
  , checkMain
31
  , prepMain
32
  ) where
33

    
34
import Control.Applicative
35
import Control.Concurrent
36
import Control.Exception
37
import Control.Monad (forever, when, zipWithM)
38
import Data.Bits (bitSize)
39
import qualified Data.Set as Set (toList)
40
import Data.IORef
41
import qualified Network.Socket as S
42
import qualified Text.JSON as J
43
import Text.JSON (showJSON, JSValue(..))
44
import System.Info (arch)
45

    
46
import qualified Ganeti.Constants as C
47
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
48
import Ganeti.Errors
49
import qualified Ganeti.Path as Path
50
import Ganeti.Daemon
51
import Ganeti.Objects
52
import qualified Ganeti.Config as Config
53
import Ganeti.ConfigReader
54
import Ganeti.BasicTypes
55
import Ganeti.JQueue
56
import Ganeti.Logging
57
import Ganeti.Luxi
58
import qualified Ganeti.Query.Language as Qlang
59
import qualified Ganeti.Query.Cluster as QCluster
60
import Ganeti.Path (queueDir, jobQueueLockFile, defaultLuxiSocket)
61
import Ganeti.Query.Query
62
import Ganeti.Query.Filter (makeSimpleFilter)
63
import Ganeti.Types
64
import Ganeti.Utils (lockFile, exitIfBad)
65

    
66
-- | Helper for classic queries.
67
handleClassicQuery :: ConfigData      -- ^ Cluster config
68
                   -> Qlang.ItemType  -- ^ Query type
69
                   -> [Either String Integer] -- ^ Requested names
70
                                              -- (empty means all)
71
                   -> [String]        -- ^ Requested fields
72
                   -> Bool            -- ^ Whether to do sync queries or not
73
                   -> IO (GenericResult GanetiException JSValue)
74
handleClassicQuery _ _ _ _ True =
75
  return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
76
handleClassicQuery cfg qkind names fields _ = do
77
  let flt = makeSimpleFilter (nameField qkind) names
78
  qr <- query cfg True (Qlang.Query qkind fields flt)
79
  return $ showJSON <$> (qr >>= queryCompat)
80

    
81
-- | Minimal wrapper to handle the missing config case.
82
handleCallWrapper :: MVar () -> Result ConfigData 
83
                     -> LuxiOp -> IO (ErrorResult JSValue)
84
handleCallWrapper _ (Bad msg) _ =
85
  return . Bad . ConfigurationError $
86
           "I do not have access to a valid configuration, cannot\
87
           \ process queries: " ++ msg
88
handleCallWrapper qlock (Ok config) op = handleCall qlock config op
89

    
90
-- | Actual luxi operation handler.
91
handleCall :: MVar () -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
92
handleCall _ cdata QueryClusterInfo =
93
  let cluster = configCluster cdata
94
      master = QCluster.clusterMasterNodeName cdata
95
      hypervisors = clusterEnabledHypervisors cluster
96
      diskTemplates = clusterEnabledDiskTemplates cluster
97
      def_hv = case hypervisors of
98
                 x:_ -> showJSON x
99
                 [] -> JSNull
100
      bits = show (bitSize (0::Int)) ++ "bits"
101
      arch_tuple = [bits, arch]
102
      obj = [ ("software_version", showJSON C.releaseVersion)
103
            , ("protocol_version", showJSON C.protocolVersion)
104
            , ("config_version", showJSON C.configVersion)
105
            , ("os_api_version", showJSON . maximum .
106
                                 Set.toList . ConstantUtils.unFrozenSet $
107
                                 C.osApiVersions)
108
            , ("export_version", showJSON C.exportVersion)
109
            , ("vcs_version", showJSON C.vcsVersion)
110
            , ("architecture", showJSON arch_tuple)
111
            , ("name", showJSON $ clusterClusterName cluster)
112
            , ("master", showJSON (case master of
113
                                     Ok name -> name
114
                                     _ -> undefined))
115
            , ("default_hypervisor", def_hv)
116
            , ("enabled_hypervisors", showJSON hypervisors)
117
            , ("hvparams", showJSON $ clusterHvparams cluster)
118
            , ("os_hvp", showJSON $ clusterOsHvp cluster)
119
            , ("beparams", showJSON $ clusterBeparams cluster)
120
            , ("osparams", showJSON $ clusterOsparams cluster)
121
            , ("ipolicy", showJSON $ clusterIpolicy cluster)
122
            , ("nicparams", showJSON $ clusterNicparams cluster)
123
            , ("ndparams", showJSON $ clusterNdparams cluster)
124
            , ("diskparams", showJSON $ clusterDiskparams cluster)
125
            , ("candidate_pool_size",
126
               showJSON $ clusterCandidatePoolSize cluster)
127
            , ("master_netdev",  showJSON $ clusterMasterNetdev cluster)
128
            , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
129
            , ("use_external_mip_script",
130
               showJSON $ clusterUseExternalMipScript cluster)
131
            , ("volume_group_name",
132
               maybe JSNull showJSON (clusterVolumeGroupName cluster))
133
            , ("drbd_usermode_helper",
134
               maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
135
            , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
136
            , ("shared_file_storage_dir",
137
               showJSON $ clusterSharedFileStorageDir cluster)
138
            , ("maintain_node_health",
139
               showJSON $ clusterMaintainNodeHealth cluster)
140
            , ("ctime", showJSON $ clusterCtime cluster)
141
            , ("mtime", showJSON $ clusterMtime cluster)
142
            , ("uuid", showJSON $ clusterUuid cluster)
143
            , ("tags", showJSON $ clusterTags cluster)
144
            , ("uid_pool", showJSON $ clusterUidPool cluster)
145
            , ("default_iallocator",
146
               showJSON $ clusterDefaultIallocator cluster)
147
            , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
148
            , ("primary_ip_version",
149
               showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
150
            , ("prealloc_wipe_disks",
151
               showJSON $ clusterPreallocWipeDisks cluster)
152
            , ("hidden_os", showJSON $ clusterHiddenOs cluster)
153
            , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
154
            , ("enabled_disk_templates", showJSON diskTemplates)
155
            ]
156

    
157
  in case master of
158
    Ok _ -> return . Ok . J.makeObj $ obj
159
    Bad ex -> return $ Bad ex
160

    
161
handleCall _ cfg (QueryTags kind name) = do
162
  let tags = case kind of
163
               TagKindCluster  -> Ok . clusterTags $ configCluster cfg
164
               TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
165
               TagKindNode     -> nodeTags  <$> Config.getNode     cfg name
166
               TagKindInstance -> instTags  <$> Config.getInstance cfg name
167
               TagKindNetwork  -> Bad $ OpPrereqError
168
                                        "Network tag is not allowed"
169
                                        ECodeInval
170
  return (J.showJSON <$> tags)
171

    
172
handleCall _ cfg (Query qkind qfields qfilter) = do
173
  result <- query cfg True (Qlang.Query qkind qfields qfilter)
174
  return $ J.showJSON <$> result
175

    
176
handleCall _ _ (QueryFields qkind qfields) = do
177
  let result = queryFields (Qlang.QueryFields qkind qfields)
178
  return $ J.showJSON <$> result
179

    
180
handleCall _ cfg (QueryNodes names fields lock) =
181
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
182
    (map Left names) fields lock
183

    
184
handleCall _ cfg (QueryGroups names fields lock) =
185
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
186
    (map Left names) fields lock
187

    
188
handleCall _ cfg (QueryJobs names fields) =
189
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
190
    (map (Right . fromIntegral . fromJobId) names)  fields False
191

    
192
handleCall _ cfg (QueryNetworks names fields lock) =
193
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
194
    (map Left names) fields lock
195

    
196
handleCall qlock cfg (SubmitJobToDrainedQueue ops) =
197
  do
198
    jobid <- allocateJobId (Config.getMasterCandidates cfg) qlock
199
    case jobid of
200
      Bad s -> return . Bad . GenericError $ s
201
      Ok jid -> do
202
        qDir <- queueDir
203
        job <- queuedJobFromOpCodes jid ops
204
        write_result <- writeJobToDisk qDir job
205
        case write_result of
206
          Bad s -> return . Bad . GenericError $ s
207
          Ok () -> do
208
            socketpath <- defaultLuxiSocket
209
            client <- getClient socketpath
210
            pickupResult <- callMethod (PickupJob jid) client
211
            closeClient client
212
            case pickupResult of
213
              Ok _ -> return ()
214
              Bad e -> logWarning $ "Failded to notify masterd: " ++ show e
215
            return . Ok . showJSON . fromJobId $ jid
216

    
217
handleCall qlock cfg (SubmitJob ops) =
218
  do
219
    open <- isQueueOpen
220
    if not open
221
       then return . Bad . GenericError $ "Queue drained"
222
       else handleCall qlock cfg (SubmitJobToDrainedQueue ops)
223

    
224
handleCall qlock cfg (SubmitManyJobs lops) =
225
  do
226
    open <- isQueueOpen
227
    if not open
228
      then return . Bad . GenericError $ "Queue drained"
229
      else do
230
        result_jobids <- allocateJobIds (Config.getMasterCandidates cfg)
231
                           qlock (length lops)
232
        case result_jobids of
233
          Bad s -> return . Bad . GenericError $ s
234
          Ok jids -> do
235
            jobs <- zipWithM queuedJobFromOpCodes jids lops
236
            qDir <- queueDir
237
            write_results <- mapM (writeJobToDisk qDir) jobs
238
            let annotated_results = zip write_results jids
239
                succeeded = map snd $ filter (isOk . fst) annotated_results
240
            when (any isBad write_results) . logWarning
241
              $ "Writing some jobs failed " ++ show annotated_results
242
            socketpath <- defaultLuxiSocket
243
            client <- getClient socketpath
244
            pickupResults <- mapM (flip callMethod client . PickupJob)
245
                               succeeded
246
            closeClient client
247
            when (any isBad pickupResults)
248
              . logWarning . (++)  "Failed to notify maserd: " . show
249
              $ zip succeeded pickupResults
250
            return . Ok . JSArray
251
              . map (\(res, jid) ->
252
                      if isOk res
253
                        then showJSON (True, fromJobId jid)
254
                        else showJSON (False, genericResult id (const "") res))
255
              $ annotated_results
256

    
257
handleCall _ _ op =
258
  return . Bad $
259
    GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
260

    
261
{-# ANN handleCall "HLint: ignore Too strict if" #-}
262

    
263
-- | Given a decoded luxi request, executes it and sends the luxi
264
-- response back to the client.
265
handleClientMsg :: MVar () -> Client -> ConfigReader -> LuxiOp -> IO Bool
266
handleClientMsg qlock client creader args = do
267
  cfg <- creader
268
  logDebug $ "Request: " ++ show args
269
  call_result <- handleCallWrapper qlock cfg args
270
  (!status, !rval) <-
271
    case call_result of
272
      Bad err -> do
273
        logWarning $ "Failed to execute request " ++ show args ++ ": "
274
                     ++ show err
275
        return (False, showJSON err)
276
      Ok result -> do
277
        -- only log the first 2,000 chars of the result
278
        logDebug $ "Result (truncated): " ++ take 2000 (J.encode result)
279
        logInfo $ "Successfully handled " ++ strOfOp args
280
        return (True, result)
281
  sendMsg client $ buildResponse status rval
282
  return True
283

    
284
-- | Handles one iteration of the client protocol: receives message,
285
-- checks it for validity and decodes it, returns response.
286
handleClient :: MVar () -> Client -> ConfigReader -> IO Bool
287
handleClient qlock client creader = do
288
  !msg <- recvMsgExt client
289
  logDebug $ "Received message: " ++ show msg
290
  case msg of
291
    RecvConnClosed -> logDebug "Connection closed" >> return False
292
    RecvError err -> logWarning ("Error during message receiving: " ++ err) >>
293
                     return False
294
    RecvOk payload ->
295
      case validateCall payload >>= decodeCall of
296
        Bad err -> do
297
             let errmsg = "Failed to parse request: " ++ err
298
             logWarning errmsg
299
             sendMsg client $ buildResponse False (showJSON errmsg)
300
             return False
301
        Ok args -> handleClientMsg qlock client creader args
302

    
303
-- | Main client loop: runs one loop of 'handleClient', and if that
304
-- doesn't report a finished (closed) connection, restarts itself.
305
clientLoop :: MVar () -> Client -> ConfigReader -> IO ()
306
clientLoop qlock client creader = do
307
  result <- handleClient qlock client creader
308
  if result
309
    then clientLoop qlock client creader
310
    else closeClient client
311

    
312
-- | Main listener loop: accepts clients, forks an I/O thread to handle
313
-- that client.
314
listener :: MVar () -> ConfigReader -> S.Socket -> IO ()
315
listener qlock creader socket = do
316
  client <- acceptClient socket
317
  _ <- forkIO $ clientLoop qlock client creader
318
  return ()
319

    
320
-- | Type alias for prepMain results
321
type PrepResult = (FilePath, S.Socket, IORef (Result ConfigData))
322

    
323
-- | Check function for luxid.
324
checkMain :: CheckFn ()
325
checkMain _ = return $ Right ()
326

    
327
-- | Prepare function for luxid.
328
prepMain :: PrepFn () PrepResult
329
prepMain _ _ = do
330
  socket_path <- Path.defaultQuerySocket
331
  cleanupSocket socket_path
332
  s <- describeError "binding to the Luxi socket"
333
         Nothing (Just socket_path) $ getServer True socket_path
334
  cref <- newIORef (Bad "Configuration not yet loaded")
335
  return (socket_path, s, cref)
336

    
337
-- | Main function.
338
main :: MainFn () PrepResult
339
main _ _ (socket_path, server, cref) = do
340
  initConfigReader id cref
341
  let creader = readIORef cref
342
  
343
  qlockFile <- jobQueueLockFile
344
  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
345
  qlock <- newMVar ()
346

    
347
  finally
348
    (forever $ listener qlock creader server)
349
    (closeServer socket_path server)