root / src / Ganeti / JQueue.hs @ cef3f99f
History | View | Annotate | Download (12.2 kB)
1 |
{-# LANGUAGE TemplateHaskell #-} |
---|---|
2 |
|
3 |
{-| Implementation of the job queue. |
4 |
|
5 |
-} |
6 |
|
7 |
{- |
8 |
|
9 |
Copyright (C) 2010, 2012 Google Inc. |
10 |
|
11 |
This program is free software; you can redistribute it and/or modify |
12 |
it under the terms of the GNU General Public License as published by |
13 |
the Free Software Foundation; either version 2 of the License, or |
14 |
(at your option) any later version. |
15 |
|
16 |
This program is distributed in the hope that it will be useful, but |
17 |
WITHOUT ANY WARRANTY; without even the implied warranty of |
18 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
19 |
General Public License for more details. |
20 |
|
21 |
You should have received a copy of the GNU General Public License |
22 |
along with this program; if not, write to the Free Software |
23 |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
24 |
02110-1301, USA. |
25 |
|
26 |
-} |
27 |
|
28 |
module Ganeti.JQueue |
29 |
( QueuedOpCode(..) |
30 |
, QueuedJob(..) |
31 |
, InputOpCode(..) |
32 |
, queuedOpCodeFromMetaOpCode |
33 |
, queuedJobFromOpCodes |
34 |
, Timestamp |
35 |
, noTimestamp |
36 |
, opStatusFinalized |
37 |
, extractOpSummary |
38 |
, calcJobStatus |
39 |
, calcJobPriority |
40 |
, jobFileName |
41 |
, liveJobFile |
42 |
, archivedJobFile |
43 |
, determineJobDirectories |
44 |
, getJobIDs |
45 |
, sortJobIDs |
46 |
, loadJobFromDisk |
47 |
, noSuchJob |
48 |
, readSerialFromDisk |
49 |
) where |
50 |
|
51 |
import Control.Exception |
52 |
import Control.Monad |
53 |
import Data.List |
54 |
import Data.Maybe |
55 |
import Data.Ord (comparing) |
56 |
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code |
57 |
import Prelude hiding (log, id) |
58 |
import System.Directory |
59 |
import System.FilePath |
60 |
import System.IO.Error (isDoesNotExistError) |
61 |
import System.Posix.Files |
62 |
import qualified Text.JSON |
63 |
import Text.JSON.Types |
64 |
|
65 |
import Ganeti.BasicTypes |
66 |
import qualified Ganeti.Constants as C |
67 |
import Ganeti.JSON |
68 |
import Ganeti.Logging |
69 |
import Ganeti.OpCodes |
70 |
import Ganeti.Path |
71 |
import Ganeti.THH |
72 |
import Ganeti.Types |
73 |
import Ganeti.Utils |
74 |
|
75 |
-- * Data types |
76 |
|
77 |
-- | A job queue timestamp, represented as a pair of 'Int's.
--
-- NOTE(review): presumably (seconds, microseconds) as in the Python
-- job queue -- confirm against the serialised job files.
type Timestamp = (Int, Int)
79 |
|
80 |
-- | Sentinel 'Timestamp' standing for \"no timestamp available\";
-- both components are set to @-1@.
noTimestamp :: Timestamp
noTimestamp = (missing, missing)
  where missing = -1
83 |
|
84 |
-- | An opcode as found in a job: either one that was parsed
-- successfully, or the raw JSON value that could not be parsed.
data InputOpCode
  = ValidOpCode MetaOpCode   -- ^ OpCode was parsed successfully
  | InvalidOpCode JSValue    -- ^ Invalid opcode, kept as raw JSON
  deriving (Show, Eq)
88 |
|
89 |
-- | JSON instance for 'InputOpCode'. Reading never fails: a value
-- that cannot be parsed as an opcode is wrapped unchanged in
-- 'InvalidOpCode', and serialising such a value gives back the
-- original 'JSValue'.
instance Text.JSON.JSON InputOpCode where
  showJSON input =
    case input of
      ValidOpCode metaop -> Text.JSON.showJSON metaop
      InvalidOpCode raw  -> raw
  readJSON raw =
    case Text.JSON.readJSON raw of
      Text.JSON.Ok metaop -> Text.JSON.Ok (ValidOpCode metaop)
      Text.JSON.Error _   -> Text.JSON.Ok (InvalidOpCode raw)
97 |
|
98 |
-- | Summary text used for opcodes that could not be parsed.
invalidOp :: String
invalidOp = "INVALID_OP"
101 |
|
102 |
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
--
-- * A valid opcode is summarised through 'opSummary'.
--
-- * An invalid opcode that is a JSON object is summarised by its
--   @OP_ID@ field with a leading @OP_@ removed. An ID that does not
--   start with @OP_@ is returned unchanged (rather than blindly losing
--   its first three characters). A missing @OP_ID@ or one that cannot
--   be read as a string yields 'invalidOp'.
--
-- * Any other invalid opcode yields 'invalidOp'.
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
    -- only strip the prefix when it is really there
    Just s  -> fromMaybe s $ stripPrefix "OP_" s
    Nothing -> invalidOp
extractOpSummary _ = invalidOp
112 |
|
113 |
-- Template Haskell definition of the per-opcode queue record; the
-- generated accessors carry the @qo@ prefix (e.g. @qoStatus@). The
-- order of the fields below is significant and must not be changed.
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"    [t| InputOpCode |]
  , simpleField "status"   [t| OpStatus    |]
  , simpleField "result"   [t| JSValue     |]
  , defaultField [| [] |] $
      simpleField "log" [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority" [t| Int         |]
  , optionalNullSerField $
      simpleField "start_timestamp" [t| Timestamp |]
  , optionalNullSerField $
      simpleField "exec_timestamp"  [t| Timestamp |]
  , optionalNullSerField $
      simpleField "end_timestamp"   [t| Timestamp |]
  ])
127 |
|
128 |
-- Template Haskell definition of the queued job record; the generated
-- accessors carry the @qj@ prefix (e.g. @qjOps@). The order of the
-- fields below is significant and must not be changed.
$(buildObject "QueuedJob" "qj"
  [ simpleField "id"  [t| JobId          |]
  , simpleField "ops" [t| [QueuedOpCode] |]
  , optionalNullSerField $
      simpleField "received_timestamp" [t| Timestamp |]
  , optionalNullSerField $
      simpleField "start_timestamp"    [t| Timestamp |]
  , optionalNullSerField $
      simpleField "end_timestamp"      [t| Timestamp |]
  ])
138 |
|
139 |
-- | Wraps a 'MetaOpCode' into a fresh 'QueuedOpCode': status queued,
-- empty log, null result, no timestamps, and the priority taken from
-- the opcode's own meta parameters.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput          = ValidOpCode op
               , qoStatus         = OP_STATUS_QUEUED
               , qoResult         = JSNull
               , qoLog            = []
               , qoPriority       = rawPriority
               , qoStartTimestamp = Nothing
               , qoExecTimestamp  = Nothing
               , qoEndTimestamp   = Nothing
               }
  where rawPriority = opSubmitPriorityToRaw (opPriority (metaParams op))
152 |
|
153 |
-- | Builds a job out of a job ID and a list of opcodes. This is the
-- pure half of job creation (allocating the job ID itself lives in
-- IO). Every opcode is first passed through 'resolveDependencies'
-- together with the job ID; the resulting job carries no timestamps.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops =
  liftM mkJob $ mapM (\op -> resolveDependencies op jobid) ops
  where mkJob resolved =
          QueuedJob { qjId                = jobid
                    , qjOps               = map queuedOpCodeFromMetaOpCode
                                              resolved
                    , qjReceivedTimestamp = Nothing
                    , qjStartTimestamp    = Nothing
                    , qjEndTimestamp      = Nothing
                    }
165 |
|
166 |
-- | Prefix shared by the names of all job files.
jobFilePrefix :: String
jobFilePrefix = "job-"
169 |
|
170 |
-- | Name (without directory) of the file holding the given job:
-- 'jobFilePrefix' followed by the numeric job ID.
jobFileName :: JobId -> FilePath
jobFileName = (jobFilePrefix ++) . show . fromJobId
173 |
|
174 |
-- | Recovers the job ID from a job file name. Fails in the monad when
-- the name does not start with 'jobFilePrefix'; otherwise the rest of
-- the name is handed to 'makeJobIdS'.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  maybe badPrefix makeJobIdS $ stripPrefix jobFilePrefix path
  where badPrefix = fail $ "Job file '" ++ path ++
                           "' doesn't have the correct prefix"
181 |
|
182 |
-- | Full path of a live (non-archived) job file: the job's file name
-- directly under the queue root directory.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir = (rootdir </>) . jobFileName
185 |
|
186 |
-- | Full path of an archived job file. Archived jobs live below the
-- archive sub-directory of the queue root, in a directory named after
-- the job ID divided by 'C.jstoreJobsPerArchiveDirectory'.
--
-- NOTE(review): the original author marked this function as BROKEN
-- without stating the reason -- confirm before relying on it.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  rootdir </> jobQueueArchiveSubDir </> bucket </> jobFileName jid
  where bucket = show $ fromJobId jid `div` C.jstoreJobsPerArchiveDirectory
191 |
|
192 |
-- | Translates an opcode status into the job status of the same name.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob st =
  case st of
    OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
    OP_STATUS_WAITING   -> JOB_STATUS_WAITING
    OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
    OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
    OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
    OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
    OP_STATUS_ERROR     -> JOB_STATUS_ERROR
201 |
|
202 |
-- | Derives the status of a job from the statuses of its opcodes,
-- looked at in order:
--
-- * the first error, canceling or canceled opcode decides the job
--   status immediately;
--
-- * success and queued opcodes leave the status computed so far
--   untouched;
--
-- * any other opcode status replaces the status computed so far.
--
-- If the end of the list is reached and every opcode was successful
-- (this includes a job without opcodes) the job is successful;
-- otherwise the status computed so far, initially queued, is returned.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  go JOB_STATUS_QUEUED True $ map qoStatus ops
  where
    isTerminal st = st `elem` [ OP_STATUS_ERROR
                              , OP_STATUS_CANCELING
                              , OP_STATUS_CANCELED
                              ]
    isSoft st = st `elem` [OP_STATUS_SUCCESS, OP_STATUS_QUEUED]
    go current allOk [] = if allOk then JOB_STATUS_SUCCESS else current
    go current allOk (st:rest)
      | isTerminal st = opStatusToJob st             -- abort recursion
      | isSoft st     = go current allOk' rest       -- keep the status
      | otherwise     = go (opStatusToJob st) allOk' rest
      where allOk' = allOk && st == OP_STATUS_SUCCESS
221 |
|
222 |
-- | Tells whether an opcode status is final, i.e. whether it compares
-- greater than 'OP_STATUS_RUNNING' under the 'Ord' instance of
-- 'OpStatus'.
--
-- NOTE(review): this relies on the ordering of the 'OpStatus'
-- constructors, which is declared outside this module -- confirm.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized st = st > OP_STATUS_RUNNING
225 |
|
226 |
-- | Priority of a job: the numerically smallest priority among its
-- opcodes that are not yet finalized, or 'C.opPrioDefault' when no
-- such opcode is left.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  case [ qoPriority op | op <- ops, not . opStatusFinalized $ qoStatus op ] of
    []      -> C.opPrioDefault
    pending -> minimum pending  -- safe: the list is non-empty here
232 |
|
233 |
-- | Exception handler that swallows an 'IOError' and returns the given
-- fallback value. The error is logged as a warning, prefixed by the
-- given message, unless it is a \"does not exist\" error and the
-- boolean argument asks for those to be ignored silently.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  let silenced = ignore_noent && isDoesNotExistError e
  unless silenced $ logWarning (msg ++ ": " ++ show e)
  return a
239 |
|
240 |
-- | Compute the list of existing archive directories, as paths that
-- include the queue root. Note that I/O exceptions are swallowed and
-- ignored: a failure to list the archive directory is logged and
-- yields an empty result, and an entry that cannot be examined is
-- dropped (silently if it vanished in the meantime, with a warning
-- otherwise). Entries whose name starts with a dot are skipped.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
                ignoreIOError [] False
                  ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM isArchiveDir fpaths
  where
    -- The candidates already carry the archive directory prefix, so
    -- they must be examined as-is; prefixing them a second time only
    -- happens to work when the queue root is an absolute path.
    isArchiveDir path =
      liftM isDirectory (getFileStatus path)
        `Control.Exception.catch`
        ignoreIOError False True
          ("Failed to stat archive path " ++ path)
254 |
|
255 |
-- | Lists the directories that may contain job files: always the
-- queue root, followed by all archive directories when archived jobs
-- were requested. Note: compared to the Python version, this doesn't
-- ignore a potential lost+found file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived =
  liftM (rootdir :) archiveDirs
  where archiveDirs | archived  = allArchiveDirs rootdir
                    | otherwise = return []
264 |
|
265 |
-- | Stand-in for \'sequence\' on a list of 'Either' values: gives the
-- first error of the list if there is one, otherwise all results in
-- their original order. \'sequence\' itself cannot be used because of
-- a library version conflict on Lucid.
-- FIXME: delete this and just use \'sequence\' once Lucid
-- compatibility is not required anymore.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer = fmap reverse . foldl seqFolder (Right [])
271 |
|
272 |
-- | Folding step used by 'sequencer': an error already held in the
-- accumulator is kept, a new error replaces a good accumulator, and
-- a good element is put in front of the accumulated results.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder acc next =
  case (acc, next) of
    (Left e, _)         -> Left e
    (Right _, Left e)   -> Left e
    (Right l, Right el) -> Right (el : l)
279 |
|
280 |
-- | Collects the IDs of all jobs found in the given directories. All
-- directories are scanned; if any of them could not be listed, the
-- error of the first failing one is returned instead of the IDs.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = do
  perDir <- mapM getDirJobIDs paths
  return . fmap concat $ sequencer perDir
283 |
|
284 |
-- | Sorts a list of job IDs in ascending order of their numeric
-- value.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs jids = sortBy (comparing fromJobId) jids
287 |
|
288 |
-- | Computes the list of jobs in a given directory. Directory entries
-- whose names do not parse as job file names are skipped; the IDs are
-- returned in the order in which the directory listing produced them.
-- If the directory cannot be listed, a warning is logged and the
-- 'IOError' is returned.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  either_contents <-
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case either_contents of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
    -- 'parseJobFileId' is used in the 'Maybe' monad here, so names
    -- that are not job files simply drop out
    Right contents -> return . Right $ mapMaybe parseJobFileId contents
303 |
|
304 |
-- | Reads the raw contents of a job file. The live job file is tried
-- first and, if archived jobs were requested, the archived one after
-- it; the result of the last successful read is returned together
-- with a flag telling whether it came from the archive. Missing files
-- are skipped silently, other I/O errors are logged and skipped.
-- 'Nothing' means that no candidate could be read.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid =
  foldM tryRead Nothing candidates
  where
    live    = (liveJobFile rootdir jid, False)
    archive = (archivedJobFile rootdir jid, True)
    candidates | archived  = [live, archive]
               | otherwise = [live]
    -- on failure keep whatever was found so far
    tryRead found (path, isarchived) =
      liftM (\contents -> Just (contents, isarchived)) (readFile path)
        `Control.Exception.catch`
        ignoreIOError found True ("Failed to read job file " ++ path)
317 |
|
318 |
-- | Error result returned when a job file could not be loaded.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
321 |
|
322 |
-- | Loads and parses a job from disk. The result pairs the job with a
-- flag telling whether it was read from the archive; 'noSuchJob' is
-- returned when no job file could be read, and a parse failure is
-- reported as a bad 'Result'.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: the strict application below is needed; otherwise wrapping
  -- the value in a Result is too lazy and the file descriptors of the
  -- individual jobs are not closed
  return $! maybe noSuchJob parseJob raw
  where parseJob (str, arch) =
          liftM (\qj -> (qj, arch)) .
            fromJResult "Parsing job file" $ Text.JSON.decode str
334 |
|
335 |
-- | Reads the job serial number from the queue's serial file. The
-- file contents, with trailing whitespace removed, are parsed as a
-- job ID; I/O errors are handled by 'tryAndLogIOError' using the
-- message given below.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk =
  jobQueueSerialFile >>= \filename ->
    tryAndLogIOError (readFile filename) "Failed to read serial file"
                     (makeJobIdS . rStripSpace)