root / src / Ganeti / JQueue.hs @ ae858516
History | View | Annotate | Download (13.8 kB)
1 | aa79e62e | Iustin Pop | {-# LANGUAGE TemplateHaskell #-} |
---|---|---|---|
2 | aa79e62e | Iustin Pop | |
3 | aa79e62e | Iustin Pop | {-| Implementation of the job queue. |
4 | aa79e62e | Iustin Pop | |
5 | aa79e62e | Iustin Pop | -} |
6 | aa79e62e | Iustin Pop | |
7 | aa79e62e | Iustin Pop | {- |
8 | aa79e62e | Iustin Pop | |
9 | aa79e62e | Iustin Pop | Copyright (C) 2010, 2012 Google Inc. |
10 | aa79e62e | Iustin Pop | |
11 | aa79e62e | Iustin Pop | This program is free software; you can redistribute it and/or modify |
12 | aa79e62e | Iustin Pop | it under the terms of the GNU General Public License as published by |
13 | aa79e62e | Iustin Pop | the Free Software Foundation; either version 2 of the License, or |
14 | aa79e62e | Iustin Pop | (at your option) any later version. |
15 | aa79e62e | Iustin Pop | |
16 | aa79e62e | Iustin Pop | This program is distributed in the hope that it will be useful, but |
17 | aa79e62e | Iustin Pop | WITHOUT ANY WARRANTY; without even the implied warranty of |
18 | aa79e62e | Iustin Pop | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
19 | aa79e62e | Iustin Pop | General Public License for more details. |
20 | aa79e62e | Iustin Pop | |
21 | aa79e62e | Iustin Pop | You should have received a copy of the GNU General Public License |
22 | aa79e62e | Iustin Pop | along with this program; if not, write to the Free Software |
23 | aa79e62e | Iustin Pop | Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
24 | aa79e62e | Iustin Pop | 02110-1301, USA. |
25 | aa79e62e | Iustin Pop | |
26 | aa79e62e | Iustin Pop | -} |
27 | aa79e62e | Iustin Pop | |
28 | aa79e62e | Iustin Pop | module Ganeti.JQueue |
29 | aa79e62e | Iustin Pop | ( QueuedOpCode(..) |
30 | aa79e62e | Iustin Pop | , QueuedJob(..) |
31 | aa79e62e | Iustin Pop | , InputOpCode(..) |
32 | 4b49a72b | Klaus Aehlig | , queuedOpCodeFromMetaOpCode |
33 | 1c1132f4 | Klaus Aehlig | , queuedJobFromOpCodes |
34 | aa79e62e | Iustin Pop | , Timestamp |
35 | aa79e62e | Iustin Pop | , noTimestamp |
36 | aa79e62e | Iustin Pop | , opStatusFinalized |
37 | aa79e62e | Iustin Pop | , extractOpSummary |
38 | aa79e62e | Iustin Pop | , calcJobStatus |
39 | aa79e62e | Iustin Pop | , calcJobPriority |
40 | aa79e62e | Iustin Pop | , jobFileName |
41 | aa79e62e | Iustin Pop | , liveJobFile |
42 | aa79e62e | Iustin Pop | , archivedJobFile |
43 | aa79e62e | Iustin Pop | , determineJobDirectories |
44 | aa79e62e | Iustin Pop | , getJobIDs |
45 | aa79e62e | Iustin Pop | , sortJobIDs |
46 | aa79e62e | Iustin Pop | , loadJobFromDisk |
47 | aa79e62e | Iustin Pop | , noSuchJob |
48 | cef3f99f | Klaus Aehlig | , readSerialFromDisk |
49 | ae858516 | Klaus Aehlig | , allocateJobIds |
50 | ae858516 | Klaus Aehlig | , allocateJobId |
51 | aa79e62e | Iustin Pop | ) where |
52 | aa79e62e | Iustin Pop | |
53 | ae858516 | Klaus Aehlig | import Control.Concurrent.MVar |
54 | aa79e62e | Iustin Pop | import Control.Exception |
55 | aa79e62e | Iustin Pop | import Control.Monad |
56 | aa79e62e | Iustin Pop | import Data.List |
57 | 4b49a72b | Klaus Aehlig | import Data.Maybe |
58 | aa79e62e | Iustin Pop | import Data.Ord (comparing) |
59 | aa79e62e | Iustin Pop | -- workaround what seems to be a bug in ghc 7.4's TH shadowing code |
60 | aa79e62e | Iustin Pop | import Prelude hiding (log, id) |
61 | aa79e62e | Iustin Pop | import System.Directory |
62 | aa79e62e | Iustin Pop | import System.FilePath |
63 | aa79e62e | Iustin Pop | import System.IO.Error (isDoesNotExistError) |
64 | aa79e62e | Iustin Pop | import System.Posix.Files |
65 | aa79e62e | Iustin Pop | import qualified Text.JSON |
66 | aa79e62e | Iustin Pop | import Text.JSON.Types |
67 | aa79e62e | Iustin Pop | |
68 | aa79e62e | Iustin Pop | import Ganeti.BasicTypes |
69 | aa79e62e | Iustin Pop | import qualified Ganeti.Constants as C |
70 | aa79e62e | Iustin Pop | import Ganeti.JSON |
71 | aa79e62e | Iustin Pop | import Ganeti.Logging |
72 | ae858516 | Klaus Aehlig | import Ganeti.Objects (Node) |
73 | aa79e62e | Iustin Pop | import Ganeti.OpCodes |
74 | aa79e62e | Iustin Pop | import Ganeti.Path |
75 | ae858516 | Klaus Aehlig | import Ganeti.Rpc (executeRpcCall, RpcCallJobqueueUpdate(..)) |
76 | aa79e62e | Iustin Pop | import Ganeti.THH |
77 | aa79e62e | Iustin Pop | import Ganeti.Types |
78 | cef3f99f | Klaus Aehlig | import Ganeti.Utils |
79 | aa79e62e | Iustin Pop | |
80 | aa79e62e | Iustin Pop | -- * Data types |
81 | aa79e62e | Iustin Pop | |
-- | The ganeti queue timestamp type: a pair of integers.
--
-- NOTE(review): presumably (seconds, microseconds), as in the Python
-- job queue; nothing in this module fixes the meaning of the two
-- components, so confirm before relying on it.
type Timestamp = (Int, Int)
84 | aa79e62e | Iustin Pop | |
-- | Sentinel 'Timestamp' standing for a missing timestamp; both
-- components are set to @-1@.
noTimestamp :: Timestamp
noTimestamp = (unset, unset)
  where unset = -1
88 | aa79e62e | Iustin Pop | |
-- | An input opcode: either a successfully parsed 'MetaOpCode', or the
-- raw JSON value of an opcode that could not be parsed.
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
                   deriving (Show, Eq)
93 | aa79e62e | Iustin Pop | |
-- | JSON instance for 'InputOpCode'. Serialisation emits either the
-- parsed opcode or the preserved raw value. Deserialisation never
-- fails: whenever the value cannot be parsed as an opcode, the original
-- 'JSValue' is kept, wrapped in 'InvalidOpCode'.
instance Text.JSON.JSON InputOpCode where
  showJSON input =
    case input of
      ValidOpCode parsed -> Text.JSON.showJSON parsed
      InvalidOpCode raw  -> raw
  readJSON raw =
    return $ case Text.JSON.readJSON raw of
               Text.JSON.Ok parsed -> ValidOpCode parsed
               Text.JSON.Error _   -> InvalidOpCode raw
102 | aa79e62e | Iustin Pop | |
-- | Summary string reported for opcodes whose summary cannot be
-- extracted (see 'extractOpSummary').
invalidOp :: String
invalidOp = "INVALID_OP"
106 | aa79e62e | Iustin Pop | |
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
--
-- For a valid opcode the summary comes from 'opSummary'. For an invalid
-- opcode that is a JSON object, the @OP_ID@ entry is looked up
-- (defaulting to @OP_@ followed by 'invalidOp') and its first three
-- characters are dropped. Anything else yields 'invalidOp'.
extractOpSummary :: InputOpCode -> String
extractOpSummary input =
  case input of
    ValidOpCode metaop -> opSummary (metaOpCode metaop)
    InvalidOpCode (JSObject obj) ->
      -- the three dropped characters are meant to be the OP_ prefix; they
      -- are removed unconditionally, whatever the entry actually contains
      maybe invalidOp (drop 3) $
        fromObjWithDefault (fromJSObject obj) "OP_ID" ("OP_" ++ invalidOp)
    _ -> invalidOp
117 | aa79e62e | Iustin Pop | |
-- Generates the QueuedOpCode record (field names prefixed with "qo")
-- from the field list below. The order of the list is significant and
-- must be kept. The three timestamp fields share one definition: an
-- optional field of type Timestamp.
$(buildObject "QueuedOpCode" "qo" $
  let timestampField name =
        optionalNullSerField $ simpleField name [t| Timestamp |]
  in [ simpleField "input" [t| InputOpCode |]
     , simpleField "status" [t| OpStatus |]
     , simpleField "result" [t| JSValue |]
     , defaultField [| [] |] $
       simpleField "log" [t| [(Int, Timestamp, ELogType, JSValue)] |]
     , simpleField "priority" [t| Int |]
     , timestampField "start_timestamp"
     , timestampField "exec_timestamp"
     , timestampField "end_timestamp"
     ])
132 | aa79e62e | Iustin Pop | |
-- Generates the QueuedJob record (field names prefixed with "qj") from
-- the field list below. The order of the list is significant and must
-- be kept. The three timestamp fields share one definition: an optional
-- field of type Timestamp.
$(buildObject "QueuedJob" "qj" $
  let timestampField name =
        optionalNullSerField $ simpleField name [t| Timestamp |]
  in [ simpleField "id" [t| JobId |]
     , simpleField "ops" [t| [QueuedOpCode] |]
     , timestampField "received_timestamp"
     , timestampField "start_timestamp"
     , timestampField "end_timestamp"
     ])
143 | aa79e62e | Iustin Pop | |
-- | Wraps a 'MetaOpCode' into a fresh 'QueuedOpCode': status queued,
-- empty log, null result, no timestamps, and the priority taken from
-- the opcode's submit priority.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode metaop =
  QueuedOpCode { qoInput = ValidOpCode metaop
               , qoStatus = OP_STATUS_QUEUED
               , qoResult = JSNull
               , qoLog = []
               , qoPriority = priority
               , qoStartTimestamp = Nothing
               , qoExecTimestamp = Nothing
               , qoEndTimestamp = Nothing
               }
  where priority = opSubmitPriorityToRaw (opPriority (metaParams metaop))
157 | 4b49a72b | Klaus Aehlig | |
-- | Builds a job from a job id and a list of opcodes. This is the pure
-- part of job creation; allocating the job id itself lives in IO.
-- Each opcode first goes through 'resolveDependencies' with the job id,
-- so the whole construction fails in the monad if any of them does.
-- The new job carries no timestamps.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops =
  liftM mkJob $ mapM (\op -> resolveDependencies op jobid) ops
  where
    mkJob resolved =
      QueuedJob { qjId = jobid
                , qjOps = map queuedOpCodeFromMetaOpCode resolved
                , qjReceivedTimestamp = Nothing
                , qjStartTimestamp = Nothing
                , qjEndTimestamp = Nothing
                }
170 | 1c1132f4 | Klaus Aehlig | |
-- | Job file prefix: job file names consist of this prefix followed by
-- the job id (see 'jobFileName' and 'parseJobFileId').
jobFilePrefix :: String
jobFilePrefix = "job-"
174 | aa79e62e | Iustin Pop | |
-- | Computes the file name (without directory) for a given job ID:
-- 'jobFilePrefix' followed by the numeric id.
jobFileName :: JobId -> FilePath
jobFileName = (jobFilePrefix ++) . show . fromJobId
178 | aa79e62e | Iustin Pop | |
-- | Parses a job ID from a file name. Fails in the monad if the name
-- does not start with 'jobFilePrefix'; otherwise the remainder of the
-- name is handed to 'makeJobIdS'.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path
  | Just suffix <- stripPrefix jobFilePrefix path = makeJobIdS suffix
  | otherwise = fail $ "Job file '" ++ path ++
                       "' doesn't have the correct prefix"
186 | aa79e62e | Iustin Pop | |
-- | Computes the full path to a live job: the job's file name placed
-- directly inside the queue root directory.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir = (rootdir </>) . jobFileName
190 | aa79e62e | Iustin Pop | |
-- | Computes the full path to an archived job. The file lives below the
-- archive sub-directory of the queue root, in a directory named after
-- the job id divided by 'C.jstoreJobsPerArchiveDirectory'.
--
-- NOTE(review): the original author flagged this function as BROKEN
-- without saying why; confirm before relying on it.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  rootdir </> jobQueueArchiveSubDir </> bucket </> jobFileName jid
  where bucket = show $ fromJobId jid `div` C.jstoreJobsPerArchiveDirectory
196 | aa79e62e | Iustin Pop | |
-- | Map from opcode status to the job status of the same name.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob status =
  case status of
    OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
    OP_STATUS_WAITING   -> JOB_STATUS_WAITING
    OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
    OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
    OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
    OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
    OP_STATUS_ERROR     -> JOB_STATUS_ERROR
206 | aa79e62e | Iustin Pop | |
-- | Computes a queued job's status from the statuses of its opcodes,
-- scanning them in order:
--
-- * the first error, canceling or canceled opcode decides the job
--   status immediately;
--
-- * success and queued opcodes leave the status found so far untouched;
--
-- * any other opcode status replaces the status found so far;
--
-- * if the scan reaches the end and every opcode was successful (which
--   includes a job without opcodes), the job is successful; otherwise
--   the status found so far, initially queued, is returned.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  scan JOB_STATUS_QUEUED True (map qoStatus ops)
  where
    aborting = [OP_STATUS_ERROR, OP_STATUS_CANCELING, OP_STATUS_CANCELED]
    neutral = [OP_STATUS_SUCCESS, OP_STATUS_QUEUED]
    scan _ True [] = JOB_STATUS_SUCCESS
    scan current False [] = current
    scan current allOk (st:rest)
      | st `elem` aborting = opStatusToJob st -- stop scanning
      | st `elem` neutral = scan current allOk' rest
      | otherwise = scan (opStatusToJob st) allOk' rest
      where allOk' = allOk && st == OP_STATUS_SUCCESS
226 | aa79e62e | Iustin Pop | |
-- | Determine whether an opcode status is finalized, i.e. whether it
-- compares greater than 'OP_STATUS_RUNNING' in the 'Ord' instance of
-- 'OpStatus'.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized status = status > OP_STATUS_RUNNING
230 | aa79e62e | Iustin Pop | |
-- | Compute a job's priority: the numerically smallest priority among
-- its opcodes that are not yet finalized, or 'C.opPrioDefault' when no
-- such opcode is left.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  case [qoPriority op | op <- ops, not . opStatusFinalized $ qoStatus op] of
    [] -> C.opPrioDefault
    pending -> minimum pending
237 | aa79e62e | Iustin Pop | |
-- | Log but ignore an 'IOError', returning the given fallback value.
-- The warning is suppressed only when the error is a "does not exist"
-- error and the caller asked (second argument) for those to be ignored.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError fallback ignore_noent msg err = do
  when (not ignore_noent || not (isDoesNotExistError err)) $
    logWarning (msg ++ ": " ++ show err)
  return fallback
244 | aa79e62e | Iustin Pop | |
-- | Compute the list of existing archive directories below the given
-- queue root. Note that I/O exceptions are swallowed and ignored: a
-- failure to list the archive directory is logged and yields an empty
-- list, and an entry that cannot be examined is dropped (silently if it
-- merely vanished, with a warning otherwise).
--
-- Entries whose name starts with a dot are skipped. The returned paths
-- include the archive directory prefix.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  -- The entries in fpaths already carry the adir prefix, so they are
  -- examined as they are. Prefixing them a second time only works when
  -- adir is absolute; for a relative root it points at a non-existent
  -- path and every directory would be silently dropped.
  filterM (\path ->
             liftM isDirectory (getFileStatus path)
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths
259 | aa79e62e | Iustin Pop | |
-- | Build list of directories containing job files: always the queue
-- root, followed by all archive directories when the second argument is
-- true. Note: compared to the Python version, this doesn't ignore a
-- potential lost+found file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived =
  liftM (rootdir :) archiveDirs
  where archiveDirs | archived = allArchiveDirs rootdir
                    | otherwise = return []
269 | aa79e62e | Iustin Pop | |
-- Function equivalent to the \'sequence\' function, that cannot be used
-- because of library version conflict on Lucid: collects all the 'Right'
-- values in their original order, or returns the first 'Left'.
-- FIXME: delete this and just use \'sequence\' instead when Lucid
-- compatibility will not be required anymore.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer =
  -- seqFolder accumulates in reverse, hence the final reverse
  fmap reverse . foldl seqFolder (Right [])
276 | 3cecd73c | Michele Tartara | |
-- | Folding function for joining multiple [JobIds] into one list. An
-- error already in the accumulator is kept; otherwise an error in the
-- new element replaces the accumulator, and a successful element is
-- prepended to the accumulated list.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder acc next =
  case (acc, next) of
    (Left err, _) -> Left err
    (Right _, Left err) -> Left err
    (Right collected, Right ids) -> Right (ids : collected)
284 | 3cecd73c | Michele Tartara | |
-- | Computes the list of all jobs in the given directories. Every
-- directory is scanned; the result is the concatenation of the
-- per-directory lists, or the first listing error if any scan failed.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = do
  perDir <- mapM getDirJobIDs paths
  return . fmap concat $ sequencer perDir
288 | aa79e62e | Iustin Pop | |
-- | Sorts a list of job IDs in ascending order of their numeric value.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy byNumericId
  where byNumericId = comparing fromJobId
292 | aa79e62e | Iustin Pop | |
-- | Computes the list of jobs in a given directory. A failure to list
-- the directory is logged as a warning and returned as 'Left'. On
-- success, the ids of all entries whose name parses as a job file name
-- are returned in directory-listing order; other entries are skipped.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  listing <- try (getDirectoryContents path)
               :: IO (Either IOError [FilePath])
  case listing of
    Right entries ->
      -- parseJobFileId is run in the Maybe monad, so non-job files
      -- simply yield Nothing and are dropped
      return . Right $ mapMaybe parseJobFileId entries
    Left err -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show err
      return $ Left err
308 | aa79e62e | Iustin Pop | |
-- | Reads the job data from disk. Returns the file contents paired with
-- a flag telling whether they came from the archive, or 'Nothing' if
-- no candidate file could be read.
--
-- The live job file is always tried. If the second argument is true,
-- the archived job file is tried afterwards as well, and a successful
-- later read replaces an earlier result. Read failures are ignored;
-- they are logged unless the file simply does not exist.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid =
  foldM attempt Nothing candidates
  where
    live = (liveJobFile rootdir jid, False)
    old = (archivedJobFile rootdir jid, True)
    candidates = live : [old | archived]
    attempt previous (path, isarchived) =
      fmap (\contents -> Just (contents, isarchived)) (readFile path)
        `Control.Exception.catch`
        ignoreIOError previous True ("Failed to read job file " ++ path)
322 | aa79e62e | Iustin Pop | |
-- | Failed to load job error: the result returned by 'loadJobFromDisk'
-- when no job file could be read.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
326 | aa79e62e | Iustin Pop | |
-- | Loads a job from disk. Returns the parsed job together with a flag
-- telling whether it came from the archive, 'noSuchJob' if no job file
-- could be read, or a parse error if the file contents are not a valid
-- job.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! maybe noSuchJob parse raw
  where
    parse (str, arch) =
      liftM (\qj -> (qj, arch)) .
      fromJResult "Parsing job file" $ Text.JSON.decode str
339 | cef3f99f | Klaus Aehlig | |
-- | Read the job serial number from disk. The serial file's contents,
-- with trailing whitespace stripped, are parsed as a job id; an I/O
-- failure while reading is handled by 'tryAndLogIOError'.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk =
  jobQueueSerialFile >>= \serialFile ->
    tryAndLogIOError (readFile serialFile) "Failed to read serial file"
                     (makeJobIdS . rStripSpace)
346 | ae858516 | Klaus Aehlig | |
-- | Allocate new job ids.
--
-- Reads the current serial number from the serial file, writes it back
-- increased by @n@, pushes the new serial file content to the given
-- master candidates, and returns the @n@ job ids following the old
-- serial number. A non-positive @n@, a failure to read the serial file
-- or a failure to write it results in 'Bad'; the result of the RPC to
-- the master candidates is ignored.
--
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, as usual provided by an MVar. The lock is held via
-- 'withMVar' for the read-modify-write cycle only, so it is released
-- even if an exception escapes from that section; logging and the RPC
-- happen after the lock has been released.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n
  | n <= 0 = return . Bad $ "Can only allocate positive number of job ids"
  | otherwise = do
      -- critical section: read the serial and write the increased value
      locked <- withMVar lock $ \_ -> do
        rjobid <- readSerialFromDisk
        case rjobid of
          Bad s -> return $ Bad s
          Ok jid -> do
            let current = fromJobId jid
                serial_content = show (current + n) ++ "\n"
            serial <- jobQueueSerialFile
            write_result <- try $ atomicWriteFile serial serial_content
                            :: IO (Either IOError ())
            return $ Ok (current, serial, serial_content, write_result)
      case locked of
        Bad s -> return . Bad $ s
        Ok (_, _, _, Left e) -> do
          let msg = "Failed to write serial file: " ++ show e
          logError msg
          return . Bad $ msg
        Ok (current, serial, serial_content, Right ()) -> do
          _ <- executeRpcCall mastercandidates
                 $ RpcCallJobqueueUpdate serial serial_content
          return $ mapM makeJobId [(current+1)..(current+n)]
377 | ae858516 | Klaus Aehlig | |
-- | Allocate one new job id: requests a single id from 'allocateJobIds'
-- and fails unless exactly one id comes back.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock =
  liftM (>>= monadicThe "Failed to allocate precisely one Job ID") $
    allocateJobIds mastercandidates lock 1