1 {-# LANGUAGE TemplateHaskell #-}
3 {-| Implementation of the job queue.
9 Copyright (C) 2010, 2012 Google Inc.
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
41 , determineJobDirectories
48 import Control.Exception
51 import Data.Ord (comparing)
52 -- workaround what seems to be a bug in ghc 7.4's TH shadowing code
53 import Prelude hiding (log, id)
54 import System.Directory
55 import System.FilePath
56 import System.IO.Error (isDoesNotExistError)
57 import System.Posix.Files
58 import qualified Text.JSON
59 import Text.JSON.Types
61 import Ganeti.BasicTypes
62 import qualified Ganeti.Constants as C
-- | Timestamp representation used by the job queue: a pair of
-- integers. NOTE(review): presumably @(seconds, microseconds)@ --
-- confirm against the code producing these values.
type Timestamp = (Int, Int)
-- | Sentinel 'Timestamp' standing in for a missing value; both
-- components are @-1@.
noTimestamp :: Timestamp
noTimestamp = (unset, unset)
  where unset = -1
-- | An input opcode: either parsed successfully, or kept as the raw
-- JSON value that could not be parsed.
data InputOpCode
  = ValidOpCode MetaOpCode  -- ^ OpCode was parsed successfully
  | InvalidOpCode JSValue   -- ^ Invalid opcode, original JSON retained
-- | JSON instance for 'InputOpCode'. Reading never fails: a value
-- that does not parse as an opcode is wrapped, unchanged, in
-- 'InvalidOpCode'. Showing an 'InvalidOpCode' gives back that
-- original value.
instance Text.JSON.JSON InputOpCode where
  showJSON op =
    case op of
      ValidOpCode parsed -> Text.JSON.showJSON parsed
      InvalidOpCode raw  -> raw
  readJSON raw =
    case Text.JSON.readJSON raw of
      Text.JSON.Ok parsed -> Text.JSON.Ok (ValidOpCode parsed)
      Text.JSON.Error _   -> Text.JSON.Ok (InvalidOpCode raw)
-- | Summary string reported for an opcode that could not be parsed.
invalidOp :: String
invalidOp = "INVALID_OP"
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function.
--
-- * For a valid opcode the summary is computed by 'opSummary'.
--
-- * For an invalid opcode that is a JSON object, the @OP_ID@ field is
--   looked up (defaulting to @"OP_" ++ 'invalidOp'@) and its first
--   three characters, i.e. the @OP_@ prefix, are dropped.
--
-- * Any other invalid opcode yields 'invalidOp'.
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
    Just s -> drop 3 s -- drop the OP_ prefix
    -- the lookup itself failed (presumably OP_ID is present but is
    -- not a string); fall back to the generic summary
    Nothing -> invalidOp
extractOpSummary _ = invalidOp
-- | Template Haskell declaration of the @QueuedOpCode@ object, with
-- field prefix @qo@ (the accessors 'qoStatus' and 'qoPriority' are
-- used further down in this module). The @log@ field defaults to the
-- empty list; the three timestamps are optional.
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"           [t| InputOpCode |]
  , simpleField "status"          [t| OpStatus    |]
  , simpleField "result"          [t| JSValue     |]
  , defaultField [| [] |] $
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority"        [t| Int         |]
  , optionalNullSerField $
    simpleField "start_timestamp" [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "exec_timestamp"  [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "end_timestamp"   [t| Timestamp   |]
  ])
-- | Template Haskell declaration of the @QueuedJob@ object, with field
-- prefix @qj@ (the accessor 'qjOps' is used further down in this
-- module). All three timestamps are optional.
$(buildObject "QueuedJob" "qj"
  [ simpleField "id"                 [t| JobId          |]
  , simpleField "ops"                [t| [QueuedOpCode] |]
  , optionalNullSerField $
    simpleField "received_timestamp" [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "start_timestamp"    [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "end_timestamp"      [t| Timestamp      |]
  ])
-- | Prefix shared by the names of all job files; see 'jobFileName'
-- and 'parseJobFileId'.
jobFilePrefix :: String
jobFilePrefix = "job-"
-- | Builds the file name (without any directory part) under which the
-- job with the given ID is stored: 'jobFilePrefix' followed by the
-- shown form of the ID's underlying value.
jobFileName :: JobId -> FilePath
jobFileName = (jobFilePrefix ++) . show . fromJobId
-- | Recovers a job ID from a job file name. Fails in the given monad
-- when the name does not start with 'jobFilePrefix'; otherwise the
-- remainder of the name is handed to 'makeJobIdS'.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  maybe badPrefix makeJobIdS (stripPrefix jobFilePrefix path)
  where
    badPrefix = fail $ "Job file '" ++ path ++
                       "' doesn't have the correct prefix"
-- | Full path of a live (non-archived) job: the job's file name placed
-- directly under the queue root directory.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir = (rootdir </>) . jobFileName
-- | Full path of an archived job. The job file lives in a
-- sub-directory of the archive directory whose name is the job ID
-- divided by 'C.jstoreJobsPerArchiveDirectory'.
--
-- NOTE: the original author flagged this function as BROKEN; the
-- reason is not recorded here.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  rootdir </> jobQueueArchiveSubDir </> bucket </> jobFileName jid
  where
    bucket = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
-- | Translates an opcode status into the job status of the same name.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob st =
  case st of
    OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
    OP_STATUS_WAITING   -> JOB_STATUS_WAITING
    OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
    OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
    OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
    OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
    OP_STATUS_ERROR     -> JOB_STATUS_ERROR
-- | Computes a queued job's status from the statuses of its opcodes.
--
-- The opcode statuses are scanned in order:
--
-- * an error, canceling or canceled opcode decides the job status
--   immediately;
--
-- * successful and queued opcodes leave the candidate status as is;
--
-- * any other opcode status replaces the candidate status;
--
-- * once the list is exhausted the job is successful if every opcode
--   was (which includes a job without opcodes), otherwise the
--   candidate status (initially queued) is returned.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      -- statuses that end the scan at once
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      -- statuses that do not influence the candidate job status
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      -- catch-all: without it the 'otherwise' guard below could never
      -- be reached and other statuses would be a pattern-match failure
      softStatus     _                   = False
      -- arguments: remaining statuses, candidate status, and whether
      -- all opcodes seen so far were successful
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all
-- | Tells whether an opcode status is finalized, i.e. whether it
-- compares strictly greater than 'OP_STATUS_RUNNING' under the 'Ord'
-- instance of 'OpStatus'.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized status = status > OP_STATUS_RUNNING
-- | Computes a job's priority: the minimum priority value among its
-- opcodes that are not yet finalized, or 'C.opPrioDefault' when no
-- such opcode exists.
calcJobPriority :: QueuedJob -> Int
calcJobPriority job =
  case pending of
    [] -> C.opPrioDefault
    _  -> minimum pending
  where
    -- priorities of the opcodes that are not finalized
    pending = [ qoPriority op
              | op <- qjOps job
              , not (opStatusFinalized (qoStatus op))
              ]
-- | Log but ignore an 'IOError', returning a fallback value instead.
-- Meant to be used as the handler argument of
-- 'Control.Exception.catch'.
--
-- The arguments are: the value to return, whether a "does not exist"
-- error should be skipped silently instead of logged, the message
-- prefix for the warning, and the error itself.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  unless (isDoesNotExistError e && ignore_noent) $
    logWarning (msg ++ ": " ++ show e)
  -- the error is swallowed: hand the caller its fallback value
  return a
-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored: a failure to list the archive
-- directory yields an empty result (and a warning), and an entry that
-- cannot be examined is treated as not being a directory.
--
-- Entries whose name starts with a dot are skipped. The returned paths
-- include the archive directory itself as prefix.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
      -- 'path' already carries the 'adir' prefix, so it must not be
      -- prepended a second time (that only works by accident when
      -- 'adir' is absolute)
      isDir path =
        liftM isDirectory (getFileStatus path)
          `Control.Exception.catch`
          ignoreIOError False True
            ("Failed to stat archive path " ++ path)
  flags <- mapM isDir fpaths
  return [ path | (path, True) <- zip fpaths flags ]
-- | Build list of directories containing job files: the queue root
-- directory, followed by all archive directories when archived jobs
-- are requested. Note: compared to the Python version, this doesn't
-- ignore a potential @lost+found@ entry.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
  other <- if archived
             then allArchiveDirs rootdir
             else return []
  return $ rootdir:other
-- | Collects the IDs of all jobs found in the given directories, in
-- directory order.
getJobIDs :: [FilePath] -> IO [JobId]
getJobIDs dirs = do
  perDir <- mapM getDirJobIDs dirs
  return (concat perDir)
-- | Sorts a list of job IDs in ascending order of their underlying
-- value (as given by 'fromJobId').
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy byValue
  where byValue a b = compare (fromJobId a) (fromJobId b)
-- | Computes the list of jobs in a given directory. Directory entries
-- whose name does not parse as a job file name (see 'parseJobFileId')
-- are skipped; the order of the directory listing is preserved. A
-- failure to list the directory is logged and yields an empty list.
getDirJobIDs :: FilePath -> IO [JobId]
getDirJobIDs path = do
  contents <- getDirectoryContents path `Control.Exception.catch`
                ignoreIOError [] False
                  ("Failed to list job directory " ++ path)
  -- parsing runs in the 'Maybe' monad; a failed pattern match simply
  -- drops the entry
  return [ jid | Just jid <- map parseJobFileId contents ]
-- | Reads the raw contents of a job file from disk. The live job file
-- is tried first and, if archived jobs are requested, the archived
-- one afterwards. The result pairs the contents with a flag telling
-- whether they came from the archive; 'Nothing' means no candidate
-- could be read.
--
-- Every candidate is attempted, so a later successful read replaces
-- an earlier one. Missing files are skipped silently, other I/O
-- errors are logged and skipped.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid =
  foldM attempt Nothing candidates
  where
    live = (liveJobFile rootdir jid, False)
    arch = (archivedJobFile rootdir jid, True)
    candidates
      | archived  = [live, arch]
      | otherwise = [live]
    -- on failure keep whatever the previous candidates produced
    attempt previous (path, isarchived) =
      (do contents <- readFile path
          return $ Just (contents, isarchived))
        `Control.Exception.catch`
        ignoreIOError previous True
          ("Failed to read job file " ++ path)
-- | The error result used when a job file could not be loaded.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad reason
  where reason = "Can't load job file"
-- | Loads a job from disk. On success the result pairs the parsed job
-- with a flag telling whether it was read from the archive. If no job
-- file could be read the result is 'noSuchJob'; if the contents do not
-- parse, the result is the parse error.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str