1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the job queue.
9 Copyright (C) 2010, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
41 , determineJobDirectories
48 import Control.Exception
51 import Data.Ord (comparing)
52 -- workaround what seems to be a bug in ghc 7.4's TH shadowing code
53 import Prelude hiding (log, id)
54 import System.Directory
55 import System.FilePath
56 import System.IO.Error (isDoesNotExistError)
57 import System.Posix.Files
58 import qualified Text.JSON
59 import Text.JSON.Types
61 import Ganeti.BasicTypes
62 import qualified Ganeti.Constants as C
72 -- | The ganeti queue timestamp type
73 type Timestamp = (Int, Int)
75 -- | Missing timestamp type.
76 noTimestamp :: Timestamp
77 noTimestamp = (-1, -1)
80 data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
81 | InvalidOpCode JSValue -- ^ Invalid opcode
84 -- | JSON instance for 'InputOpCode', trying to parse it and if
85 -- failing, keeping the original JSValue.
86 instance Text.JSON.JSON InputOpCode where
87 showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
88 showJSON (InvalidOpCode inv) = inv
89 readJSON v = case Text.JSON.readJSON v of
90 Text.JSON.Error _ -> return $ InvalidOpCode v
91 Text.JSON.Ok mo -> return $ ValidOpCode mo
93 -- | Invalid opcode summary.
95 invalidOp = "INVALID_OP"
97 -- | Tries to extract the opcode summary from an 'InputOpCode'. This
98 -- duplicates some functionality from the 'opSummary' function in
100 extractOpSummary :: InputOpCode -> String
101 extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
102 extractOpSummary (InvalidOpCode (JSObject o)) =
103 case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
104 Just s -> drop 3 s -- drop the OP_ prefix
106 extractOpSummary _ = invalidOp
108 $(buildObject "QueuedOpCode" "qo"
109 [ simpleField "input" [t| InputOpCode |]
110 , simpleField "status" [t| OpStatus |]
111 , simpleField "result" [t| JSValue |]
112 , defaultField [| [] |] $
113 simpleField "log" [t| [(Int, Timestamp, ELogType, JSValue)] |]
114 , simpleField "priority" [t| Int |]
115 , optionalNullSerField $
116 simpleField "start_timestamp" [t| Timestamp |]
117 , optionalNullSerField $
118 simpleField "exec_timestamp" [t| Timestamp |]
119 , optionalNullSerField $
120 simpleField "end_timestamp" [t| Timestamp |]
123 $(buildObject "QueuedJob" "qj"
124 [ simpleField "id" [t| JobId |]
125 , simpleField "ops" [t| [QueuedOpCode] |]
126 , optionalNullSerField $
127 simpleField "received_timestamp" [t| Timestamp |]
128 , optionalNullSerField $
129 simpleField "start_timestamp" [t| Timestamp |]
130 , optionalNullSerField $
131 simpleField "end_timestamp" [t| Timestamp |]
134 -- | Job file prefix.
135 jobFilePrefix :: String
136 jobFilePrefix = "job-"
138 -- | Computes the filename for a given job ID.
139 jobFileName :: JobId -> FilePath
140 jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
142 -- | Parses a job ID from a file name.
143 parseJobFileId :: (Monad m) => FilePath -> m JobId
144 parseJobFileId path =
145 case stripPrefix jobFilePrefix path of
146 Nothing -> fail $ "Job file '" ++ path ++
147 "' doesn't have the correct prefix"
148 Just suffix -> makeJobIdS suffix
150 -- | Computes the full path to a live job.
151 liveJobFile :: FilePath -> JobId -> FilePath
152 liveJobFile rootdir jid = rootdir </> jobFileName jid
154 -- | Computes the full path to an archives job. BROKEN.
155 archivedJobFile :: FilePath -> JobId -> FilePath
156 archivedJobFile rootdir jid =
157 let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
158 in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
160 -- | Map from opcode status to job status.
161 opStatusToJob :: OpStatus -> JobStatus
162 opStatusToJob OP_STATUS_QUEUED = JOB_STATUS_QUEUED
163 opStatusToJob OP_STATUS_WAITING = JOB_STATUS_WAITING
164 opStatusToJob OP_STATUS_SUCCESS = JOB_STATUS_SUCCESS
165 opStatusToJob OP_STATUS_RUNNING = JOB_STATUS_RUNNING
166 opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
167 opStatusToJob OP_STATUS_CANCELED = JOB_STATUS_CANCELED
168 opStatusToJob OP_STATUS_ERROR = JOB_STATUS_ERROR
170 -- | Computes a queued job's status.
171 calcJobStatus :: QueuedJob -> JobStatus
172 calcJobStatus QueuedJob { qjOps = ops } =
173 extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
175 terminalStatus OP_STATUS_ERROR = True
176 terminalStatus OP_STATUS_CANCELING = True
177 terminalStatus OP_STATUS_CANCELED = True
178 terminalStatus _ = False
179 softStatus OP_STATUS_SUCCESS = True
180 softStatus OP_STATUS_QUEUED = True
182 extractOpSt [] _ True = JOB_STATUS_SUCCESS
183 extractOpSt [] d False = d
184 extractOpSt (x:xs) d old_all
185 | terminalStatus x = opStatusToJob x -- abort recursion
186 | softStatus x = extractOpSt xs d new_all -- continue unchanged
187 | otherwise = extractOpSt xs (opStatusToJob x) new_all
188 where new_all = x == OP_STATUS_SUCCESS && old_all
190 -- | Determine whether an opcode status is finalized.
191 opStatusFinalized :: OpStatus -> Bool
192 opStatusFinalized = (> OP_STATUS_RUNNING)
194 -- | Compute a job's priority.
195 calcJobPriority :: QueuedJob -> Int
196 calcJobPriority QueuedJob { qjOps = ops } =
197 helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
198 where helper [] = C.opPrioDefault
199 helper ps = minimum ps
201 -- | Log but ignore an 'IOError'.
202 ignoreIOError :: a -> Bool -> String -> IOError -> IO a
203 ignoreIOError a ignore_noent msg e = do
204 unless (isDoesNotExistError e && ignore_noent) .
205 logWarning $ msg ++ ": " ++ show e
208 -- | Compute the list of existing archive directories. Note that I/O
209 -- exceptions are swallowed and ignored.
210 allArchiveDirs :: FilePath -> IO [FilePath]
211 allArchiveDirs rootdir = do
212 let adir = rootdir </> jobQueueArchiveSubDir
213 contents <- getDirectoryContents adir `Control.Exception.catch`
214 ignoreIOError [] False
215 ("Failed to list queue directory " ++ adir)
216 let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
218 liftM isDirectory (getFileStatus (adir </> path))
219 `Control.Exception.catch`
220 ignoreIOError False True
221 ("Failed to stat archive path " ++ path)) fpaths
223 -- | Build list of directories containing job files. Note: compared to
224 -- the Python version, this doesn't ignore a potential lost+found
226 determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
227 determineJobDirectories rootdir archived = do
229 then allArchiveDirs rootdir
231 return $ rootdir:other
233 -- Function equivalent to the \'sequence\' function, that cannot be used because
234 -- of library version conflict on Lucid.
235 -- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
236 -- will not be required anymore.
237 sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
238 sequencer l = fmap reverse $ foldl seqFolder (Right []) l
240 -- | Folding function for joining multiple [JobIds] into one list.
241 seqFolder :: Either IOError [[JobId]]
242 -> Either IOError [JobId]
243 -> Either IOError [[JobId]]
244 seqFolder (Left e) _ = Left e
245 seqFolder (Right _) (Left e) = Left e
246 seqFolder (Right l) (Right el) = Right $ el:l
248 -- | Computes the list of all jobs in the given directories.
249 getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
250 getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
252 -- | Sorts the a list of job IDs.
253 sortJobIDs :: [JobId] -> [JobId]
254 sortJobIDs = sortBy (comparing fromJobId)
256 -- | Computes the list of jobs in a given directory.
257 getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
258 getDirJobIDs path = do
260 try (getDirectoryContents path) :: IO (Either IOError [FilePath])
261 case either_contents of
263 logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
266 let jids = foldl (\ids file ->
267 case parseJobFileId file of
269 Just new_id -> new_id:ids) [] contents
270 return . Right $ reverse jids
272 -- | Reads the job data from disk.
273 readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
274 readJobDataFromDisk rootdir archived jid = do
275 let live_path = liveJobFile rootdir jid
276 archived_path = archivedJobFile rootdir jid
277 all_paths = if archived
278 then [(live_path, False), (archived_path, True)]
279 else [(live_path, False)]
280 foldM (\state (path, isarchived) ->
281 liftM (\r -> Just (r, isarchived)) (readFile path)
282 `Control.Exception.catch`
283 ignoreIOError state True
284 ("Failed to read job file " ++ path)) Nothing all_paths
286 -- | Failed to load job error.
287 noSuchJob :: Result (QueuedJob, Bool)
288 noSuchJob = Bad "Can't load job file"
290 -- | Loads a job from disk.
291 loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
292 loadJobFromDisk rootdir archived jid = do
293 raw <- readJobDataFromDisk rootdir archived jid
294 -- note: we need some stricness below, otherwise the wrapping in a
295 -- Result will create too much lazyness, and not close the file
296 -- descriptors for the individual jobs
297 return $! case raw of
300 liftM (\qj -> (qj, arch)) .
301 fromJResult "Parsing job file" $ Text.JSON.decode str