Hs2Py constants: add 'osApiVersions'
[ganeti-local] / src / Ganeti / JQueue.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the job queue.
4
5 -}
6
7 {-
8
9 Copyright (C) 2010, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.JQueue
29     ( QueuedOpCode(..)
30     , QueuedJob(..)
31     , InputOpCode(..)
32     , Timestamp
33     , noTimestamp
34     , opStatusFinalized
35     , extractOpSummary
36     , calcJobStatus
37     , calcJobPriority
38     , jobFileName
39     , liveJobFile
40     , archivedJobFile
41     , determineJobDirectories
42     , getJobIDs
43     , sortJobIDs
44     , loadJobFromDisk
45     , noSuchJob
46     ) where
47
48 import Control.Exception
49 import Control.Monad
50 import Data.List
51 import Data.Ord (comparing)
52 -- workaround what seems to be a bug in ghc 7.4's TH shadowing code
53 import Prelude hiding (log, id)
54 import System.Directory
55 import System.FilePath
56 import System.IO.Error (isDoesNotExistError)
57 import System.Posix.Files
58 import qualified Text.JSON
59 import Text.JSON.Types
60
61 import Ganeti.BasicTypes
62 import qualified Ganeti.Constants as C
63 import Ganeti.JSON
64 import Ganeti.Logging
65 import Ganeti.OpCodes
66 import Ganeti.Path
67 import Ganeti.THH
68 import Ganeti.Types
69
70 -- * Data types
71
72 -- | The ganeti queue timestamp type
73 type Timestamp = (Int, Int)
74
75 -- | Missing timestamp type.
76 noTimestamp :: Timestamp
77 noTimestamp = (-1, -1)
78
79 -- | An input opcode.
80 data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
81                  | InvalidOpCode JSValue  -- ^ Invalid opcode
82                    deriving (Show, Eq)
83
84 -- | JSON instance for 'InputOpCode', trying to parse it and if
85 -- failing, keeping the original JSValue.
86 instance Text.JSON.JSON InputOpCode where
87   showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
88   showJSON (InvalidOpCode inv) = inv
89   readJSON v = case Text.JSON.readJSON v of
90                  Text.JSON.Error _ -> return $ InvalidOpCode v
91                  Text.JSON.Ok mo -> return $ ValidOpCode mo
92
93 -- | Invalid opcode summary.
94 invalidOp :: String
95 invalidOp = "INVALID_OP"
96
97 -- | Tries to extract the opcode summary from an 'InputOpCode'. This
98 -- duplicates some functionality from the 'opSummary' function in
99 -- "Ganeti.OpCodes".
100 extractOpSummary :: InputOpCode -> String
101 extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
102 extractOpSummary (InvalidOpCode (JSObject o)) =
103   case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
104     Just s -> drop 3 s -- drop the OP_ prefix
105     Nothing -> invalidOp
106 extractOpSummary _ = invalidOp
107
108 $(buildObject "QueuedOpCode" "qo"
109   [ simpleField "input"           [t| InputOpCode |]
110   , simpleField "status"          [t| OpStatus    |]
111   , simpleField "result"          [t| JSValue     |]
112   , defaultField [| [] |] $
113     simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
114   , simpleField "priority"        [t| Int         |]
115   , optionalNullSerField $
116     simpleField "start_timestamp" [t| Timestamp   |]
117   , optionalNullSerField $
118     simpleField "exec_timestamp"  [t| Timestamp   |]
119   , optionalNullSerField $
120     simpleField "end_timestamp"   [t| Timestamp   |]
121   ])
122
123 $(buildObject "QueuedJob" "qj"
124   [ simpleField "id"                 [t| JobId          |]
125   , simpleField "ops"                [t| [QueuedOpCode] |]
126   , optionalNullSerField $
127     simpleField "received_timestamp" [t| Timestamp      |]
128   , optionalNullSerField $
129     simpleField "start_timestamp"    [t| Timestamp      |]
130   , optionalNullSerField $
131     simpleField "end_timestamp"      [t| Timestamp      |]
132   ])
133
134 -- | Job file prefix.
135 jobFilePrefix :: String
136 jobFilePrefix = "job-"
137
138 -- | Computes the filename for a given job ID.
139 jobFileName :: JobId -> FilePath
140 jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
141
142 -- | Parses a job ID from a file name.
143 parseJobFileId :: (Monad m) => FilePath -> m JobId
144 parseJobFileId path =
145   case stripPrefix jobFilePrefix path of
146     Nothing -> fail $ "Job file '" ++ path ++
147                       "' doesn't have the correct prefix"
148     Just suffix -> makeJobIdS suffix
149
150 -- | Computes the full path to a live job.
151 liveJobFile :: FilePath -> JobId -> FilePath
152 liveJobFile rootdir jid = rootdir </> jobFileName jid
153
154 -- | Computes the full path to an archives job. BROKEN.
155 archivedJobFile :: FilePath -> JobId -> FilePath
156 archivedJobFile rootdir jid =
157   let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
158   in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
159
160 -- | Map from opcode status to job status.
161 opStatusToJob :: OpStatus -> JobStatus
162 opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
163 opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
164 opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
165 opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
166 opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
167 opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
168 opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
169
170 -- | Computes a queued job's status.
171 calcJobStatus :: QueuedJob -> JobStatus
172 calcJobStatus QueuedJob { qjOps = ops } =
173   extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
174     where
175       terminalStatus OP_STATUS_ERROR     = True
176       terminalStatus OP_STATUS_CANCELING = True
177       terminalStatus OP_STATUS_CANCELED  = True
178       terminalStatus _                   = False
179       softStatus     OP_STATUS_SUCCESS   = True
180       softStatus     OP_STATUS_QUEUED    = True
181       softStatus     _                   = False
182       extractOpSt [] _ True = JOB_STATUS_SUCCESS
183       extractOpSt [] d False = d
184       extractOpSt (x:xs) d old_all
185            | terminalStatus x = opStatusToJob x -- abort recursion
186            | softStatus x     = extractOpSt xs d new_all -- continue unchanged
187            | otherwise        = extractOpSt xs (opStatusToJob x) new_all
188            where new_all = x == OP_STATUS_SUCCESS && old_all
189
190 -- | Determine whether an opcode status is finalized.
191 opStatusFinalized :: OpStatus -> Bool
192 opStatusFinalized = (> OP_STATUS_RUNNING)
193
194 -- | Compute a job's priority.
195 calcJobPriority :: QueuedJob -> Int
196 calcJobPriority QueuedJob { qjOps = ops } =
197   helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
198     where helper [] = C.opPrioDefault
199           helper ps = minimum ps
200
201 -- | Log but ignore an 'IOError'.
202 ignoreIOError :: a -> Bool -> String -> IOError -> IO a
203 ignoreIOError a ignore_noent msg e = do
204   unless (isDoesNotExistError e && ignore_noent) .
205     logWarning $ msg ++ ": " ++ show e
206   return a
207
208 -- | Compute the list of existing archive directories. Note that I/O
209 -- exceptions are swallowed and ignored.
210 allArchiveDirs :: FilePath -> IO [FilePath]
211 allArchiveDirs rootdir = do
212   let adir = rootdir </> jobQueueArchiveSubDir
213   contents <- getDirectoryContents adir `Control.Exception.catch`
214                ignoreIOError [] False
215                  ("Failed to list queue directory " ++ adir)
216   let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
217   filterM (\path ->
218              liftM isDirectory (getFileStatus (adir </> path))
219                `Control.Exception.catch`
220                ignoreIOError False True
221                  ("Failed to stat archive path " ++ path)) fpaths
222
223 -- | Build list of directories containing job files. Note: compared to
224 -- the Python version, this doesn't ignore a potential lost+found
225 -- file.
226 determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
227 determineJobDirectories rootdir archived = do
228   other <- if archived
229              then allArchiveDirs rootdir
230              else return []
231   return $ rootdir:other
232
233 -- Function equivalent to the \'sequence\' function, that cannot be used because
234 -- of library version conflict on Lucid.
235 -- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
236 -- will not be required anymore.
237 sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
238 sequencer l = fmap reverse $ foldl seqFolder (Right []) l
239
240 -- | Folding function for joining multiple [JobIds] into one list.
241 seqFolder :: Either IOError [[JobId]]
242           -> Either IOError [JobId]
243           -> Either IOError [[JobId]]
244 seqFolder (Left e) _ = Left e
245 seqFolder (Right _) (Left e) = Left e
246 seqFolder (Right l) (Right el) = Right $ el:l
247
248 -- | Computes the list of all jobs in the given directories.
249 getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
250 getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
251
252 -- | Sorts the a list of job IDs.
253 sortJobIDs :: [JobId] -> [JobId]
254 sortJobIDs = sortBy (comparing fromJobId)
255
256 -- | Computes the list of jobs in a given directory.
257 getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
258 getDirJobIDs path = do
259   either_contents <-
260     try (getDirectoryContents path) :: IO (Either IOError [FilePath])
261   case either_contents of
262     Left e -> do
263       logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
264       return $ Left e
265     Right contents -> do
266       let jids = foldl (\ids file ->
267                          case parseJobFileId file of
268                            Nothing -> ids
269                            Just new_id -> new_id:ids) [] contents
270       return . Right $ reverse jids
271
272 -- | Reads the job data from disk.
273 readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
274 readJobDataFromDisk rootdir archived jid = do
275   let live_path = liveJobFile rootdir jid
276       archived_path = archivedJobFile rootdir jid
277       all_paths = if archived
278                     then [(live_path, False), (archived_path, True)]
279                     else [(live_path, False)]
280   foldM (\state (path, isarchived) ->
281            liftM (\r -> Just (r, isarchived)) (readFile path)
282              `Control.Exception.catch`
283              ignoreIOError state True
284                ("Failed to read job file " ++ path)) Nothing all_paths
285
286 -- | Failed to load job error.
287 noSuchJob :: Result (QueuedJob, Bool)
288 noSuchJob = Bad "Can't load job file"
289
290 -- | Loads a job from disk.
291 loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
292 loadJobFromDisk rootdir archived jid = do
293   raw <- readJobDataFromDisk rootdir archived jid
294   -- note: we need some stricness below, otherwise the wrapping in a
295   -- Result will create too much lazyness, and not close the file
296   -- descriptors for the individual jobs
297   return $! case raw of
298              Nothing -> noSuchJob
299              Just (str, arch) ->
300                liftM (\qj -> (qj, arch)) .
301                fromJResult "Parsing job file" $ Text.JSON.decode str