Revision aa79e62e
b/Makefile.am | ||
---|---|---|
483 | 483 |
htools/Ganeti/HTools/Program/Hspace.hs \ |
484 | 484 |
htools/Ganeti/HTools/Types.hs \ |
485 | 485 |
htools/Ganeti/Hash.hs \ |
486 |
htools/Ganeti/JQueue.hs \ |
|
486 | 487 |
htools/Ganeti/JSON.hs \ |
487 | 488 |
htools/Ganeti/Jobs.hs \ |
488 | 489 |
htools/Ganeti/Logging.hs \ |
... | ... | |
530 | 531 |
htest/Test/Ganeti/HTools/Types.hs \ |
531 | 532 |
htest/Test/Ganeti/JSON.hs \ |
532 | 533 |
htest/Test/Ganeti/Jobs.hs \ |
534 |
htest/Test/Ganeti/JQueue.hs \ |
|
533 | 535 |
htest/Test/Ganeti/Luxi.hs \ |
534 | 536 |
htest/Test/Ganeti/Network.hs \ |
535 | 537 |
htest/Test/Ganeti/Objects.hs \ |
b/htest/Test/Ganeti/JQueue.hs | ||
---|---|---|
1 |
{-# LANGUAGE TemplateHaskell #-} |
|
2 |
|
|
3 |
{-| Unittests for the job queue functionality. |
|
4 |
|
|
5 |
-} |
|
6 |
|
|
7 |
{- |
|
8 |
|
|
9 |
Copyright (C) 2012 Google Inc. |
|
10 |
|
|
11 |
This program is free software; you can redistribute it and/or modify |
|
12 |
it under the terms of the GNU General Public License as published by |
|
13 |
the Free Software Foundation; either version 2 of the License, or |
|
14 |
(at your option) any later version. |
|
15 |
|
|
16 |
This program is distributed in the hope that it will be useful, but |
|
17 |
WITHOUT ANY WARRANTY; without even the implied warranty of |
|
18 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
19 |
General Public License for more details. |
|
20 |
|
|
21 |
You should have received a copy of the GNU General Public License |
|
22 |
along with this program; if not, write to the Free Software |
|
23 |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
|
24 |
02110-1301, USA. |
|
25 |
|
|
26 |
-} |
|
27 |
|
|
28 |
module Test.Ganeti.JQueue (testJQueue) where |
|
29 |
|
|
30 |
import Control.Applicative |
|
31 |
import Control.Monad (when) |
|
32 |
import Data.Char (isAscii) |
|
33 |
import Data.List (nub, sort) |
|
34 |
import System.Directory |
|
35 |
import System.FilePath |
|
36 |
import System.IO.Temp |
|
37 |
import System.Posix.Files |
|
38 |
import Test.HUnit |
|
39 |
import Test.QuickCheck as QuickCheck |
|
40 |
import Test.QuickCheck.Monadic |
|
41 |
import Text.JSON |
|
42 |
|
|
43 |
import Test.Ganeti.TestCommon |
|
44 |
import Test.Ganeti.TestHelper |
|
45 |
import Test.Ganeti.Types () |
|
46 |
import Test.Ganeti.OpCodes |
|
47 |
|
|
48 |
import Ganeti.BasicTypes |
|
49 |
import qualified Ganeti.Constants as C |
|
50 |
import Ganeti.JQueue |
|
51 |
import Ganeti.OpCodes |
|
52 |
import Ganeti.Path |
|
53 |
import Ganeti.Types as Types |
|
54 |
|
|
55 |
{-# ANN module "HLint: ignore Use camelCase" #-} |
|
56 |
|
|
57 |
-- * Helpers |
|
58 |
|
|
59 |
-- | noTimestamp in Just form. |
|
60 |
justNoTs :: Maybe Timestamp |
|
61 |
justNoTs = Just noTimestamp |
|
62 |
|
|
63 |
-- | Generates a simple queued opcode. |
|
64 |
genQueuedOpCode :: Gen QueuedOpCode |
|
65 |
genQueuedOpCode = |
|
66 |
QueuedOpCode <$> pure (ValidOpCode $ wrapOpCode OpClusterQuery) <*> |
|
67 |
arbitrary <*> pure JSNull <*> pure [] <*> |
|
68 |
choose (C.opPrioLowest, C.opPrioHighest) <*> |
|
69 |
pure justNoTs <*> pure justNoTs <*> pure justNoTs |
|
70 |
|
|
71 |
-- | Generates an static, empty job. |
|
72 |
emptyJob :: (Monad m) => m QueuedJob |
|
73 |
emptyJob = do |
|
74 |
jid0 <- makeJobId 0 |
|
75 |
return $ QueuedJob jid0 [] justNoTs justNoTs justNoTs |
|
76 |
|
|
77 |
-- | Generates a job ID. |
|
78 |
genJobId :: Gen JobId |
|
79 |
genJobId = do |
|
80 |
p <- arbitrary::Gen (Types.NonNegative Int) |
|
81 |
makeJobId $ fromNonNegative p |
|
82 |
|
|
83 |
-- * Test cases |
|
84 |
|
|
85 |
-- | Tests default priority value. |
|
86 |
case_JobPriorityDef :: Assertion |
|
87 |
case_JobPriorityDef = do |
|
88 |
ej <- emptyJob |
|
89 |
assertEqual "for default priority" C.opPrioDefault $ calcJobPriority ej |
|
90 |
|
|
91 |
-- | Test arbitrary priorities. |
|
92 |
prop_JobPriority :: Property |
|
93 |
prop_JobPriority = |
|
94 |
forAll (listOf1 (genQueuedOpCode `suchThat` |
|
95 |
(not . opStatusFinalized . qoStatus))) $ \ops -> do |
|
96 |
jid0 <- makeJobId 0 |
|
97 |
let job = QueuedJob jid0 ops justNoTs justNoTs justNoTs |
|
98 |
calcJobPriority job ==? minimum (map qoPriority ops) |
|
99 |
|
|
100 |
-- | Tests default job status. |
|
101 |
case_JobStatusDef :: Assertion |
|
102 |
case_JobStatusDef = do |
|
103 |
ej <- emptyJob |
|
104 |
assertEqual "for job status" JOB_STATUS_SUCCESS $ calcJobStatus ej |
|
105 |
|
|
106 |
-- | Test some job status properties. |
|
107 |
prop_JobStatus :: Property |
|
108 |
prop_JobStatus = |
|
109 |
forAll genJobId $ \jid -> |
|
110 |
forAll genQueuedOpCode $ \op -> |
|
111 |
let job1 = QueuedJob jid [op] justNoTs justNoTs justNoTs |
|
112 |
st1 = calcJobStatus job1 |
|
113 |
op_succ = op { qoStatus = OP_STATUS_SUCCESS } |
|
114 |
op_err = op { qoStatus = OP_STATUS_ERROR } |
|
115 |
op_cnl = op { qoStatus = OP_STATUS_CANCELING } |
|
116 |
op_cnd = op { qoStatus = OP_STATUS_CANCELED } |
|
117 |
-- computes status for a job with an added opcode before |
|
118 |
st_pre_op pop = calcJobStatus (job1 { qjOps = pop:qjOps job1 }) |
|
119 |
-- computes status for a job with an added opcode after |
|
120 |
st_post_op pop = calcJobStatus (job1 { qjOps = qjOps job1 ++ [pop] }) |
|
121 |
in conjoin |
|
122 |
[ printTestCase "pre-success doesn't change status" |
|
123 |
(st_pre_op op_succ ==? st1) |
|
124 |
, printTestCase "post-success doesn't change status" |
|
125 |
(st_post_op op_succ ==? st1) |
|
126 |
, printTestCase "pre-error is error" |
|
127 |
(st_pre_op op_err ==? JOB_STATUS_ERROR) |
|
128 |
, printTestCase "pre-canceling is canceling" |
|
129 |
(st_pre_op op_cnl ==? JOB_STATUS_CANCELING) |
|
130 |
, printTestCase "pre-canceled is canceled" |
|
131 |
(st_pre_op op_cnd ==? JOB_STATUS_CANCELED) |
|
132 |
] |
|
133 |
|
|
134 |
-- | Tests job status equivalence with Python. Very similar to OpCodes test. |
|
135 |
case_JobStatusPri_py_equiv :: Assertion |
|
136 |
case_JobStatusPri_py_equiv = do |
|
137 |
let num_jobs = 2000::Int |
|
138 |
sample_jobs <- sample' (vectorOf num_jobs $ do |
|
139 |
num_ops <- choose (1, 5) |
|
140 |
ops <- vectorOf num_ops genQueuedOpCode |
|
141 |
jid <- genJobId |
|
142 |
return $ QueuedJob jid ops justNoTs justNoTs |
|
143 |
justNoTs) |
|
144 |
let jobs = head sample_jobs |
|
145 |
serialized = encode jobs |
|
146 |
-- check for non-ASCII fields, usually due to 'arbitrary :: String' |
|
147 |
mapM_ (\job -> when (any (not . isAscii) (encode job)) . |
|
148 |
assertFailure $ "Job has non-ASCII fields: " ++ show job |
|
149 |
) jobs |
|
150 |
py_stdout <- |
|
151 |
runPython "from ganeti import jqueue\n\ |
|
152 |
\from ganeti import serializer\n\ |
|
153 |
\import sys\n\ |
|
154 |
\job_data = serializer.Load(sys.stdin.read())\n\ |
|
155 |
\decoded = [jqueue._QueuedJob.Restore(None, o, False, False)\n\ |
|
156 |
\ for o in job_data]\n\ |
|
157 |
\encoded = [(job.CalcStatus(), job.CalcPriority())\n\ |
|
158 |
\ for job in decoded]\n\ |
|
159 |
\print serializer.Dump(encoded)" serialized |
|
160 |
>>= checkPythonResult |
|
161 |
let deserialised = decode py_stdout::Text.JSON.Result [(String, Int)] |
|
162 |
decoded <- case deserialised of |
|
163 |
Text.JSON.Ok jobs' -> return jobs' |
|
164 |
Error msg -> |
|
165 |
assertFailure ("Unable to decode jobs: " ++ msg) |
|
166 |
-- this already raised an expection, but we need it |
|
167 |
-- for proper types |
|
168 |
>> fail "Unable to decode jobs" |
|
169 |
assertEqual "Mismatch in number of returned jobs" |
|
170 |
(length decoded) (length jobs) |
|
171 |
mapM_ (\(py_sp, job) -> |
|
172 |
let hs_sp = (jobStatusToRaw $ calcJobStatus job, |
|
173 |
calcJobPriority job) |
|
174 |
in assertEqual ("Different result after encoding/decoding for " ++ |
|
175 |
show job) py_sp hs_sp |
|
176 |
) $ zip decoded jobs |
|
177 |
|
|
178 |
-- | Tests listing of Job ids. |
|
179 |
prop_ListJobIDs :: Property |
|
180 |
prop_ListJobIDs = monadicIO $ do |
|
181 |
jobs <- pick $ resize 10 (listOf1 genJobId `suchThat` (\l -> l == nub l)) |
|
182 |
(e, f, g) <- |
|
183 |
run . withSystemTempDirectory "jqueue-test." $ \tempdir -> do |
|
184 |
empty_dir <- getJobIDs [tempdir] |
|
185 |
mapM_ (\jid -> writeFile (tempdir </> jobFileName jid) "") jobs |
|
186 |
full_dir <- getJobIDs [tempdir] |
|
187 |
invalid_dir <- getJobIDs [tempdir </> "no-such-dir"] |
|
188 |
return (empty_dir, sortJobIDs full_dir, invalid_dir) |
|
189 |
stop $ conjoin [ printTestCase "empty directory" $ e ==? [] |
|
190 |
, printTestCase "directory with valid names" $ |
|
191 |
f ==? sortJobIDs jobs |
|
192 |
, printTestCase "invalid directory" $ g ==? [] |
|
193 |
] |
|
194 |
|
|
195 |
-- | Tests loading jobs from disk. |
|
196 |
prop_LoadJobs :: Property |
|
197 |
prop_LoadJobs = monadicIO $ do |
|
198 |
ops <- pick $ resize 5 (listOf1 genQueuedOpCode) |
|
199 |
jid <- pick genJobId |
|
200 |
let job = QueuedJob jid ops justNoTs justNoTs justNoTs |
|
201 |
job_s = encode job |
|
202 |
-- check that jobs in the right directories are parsed correctly |
|
203 |
(missing, current, archived, missing_current, broken) <- |
|
204 |
run . withSystemTempDirectory "jqueue-test." $ \tempdir -> do |
|
205 |
let load a = loadJobFromDisk tempdir a jid |
|
206 |
live_path = liveJobFile tempdir jid |
|
207 |
arch_path = archivedJobFile tempdir jid |
|
208 |
createDirectory $ tempdir </> jobQueueArchiveSubDir |
|
209 |
createDirectory $ dropFileName arch_path |
|
210 |
-- missing job |
|
211 |
missing <- load True |
|
212 |
writeFile live_path job_s |
|
213 |
-- this should exist |
|
214 |
current <- load False |
|
215 |
removeFile live_path |
|
216 |
writeFile arch_path job_s |
|
217 |
-- this should exist (archived) |
|
218 |
archived <- load True |
|
219 |
-- this should be missing |
|
220 |
missing_current <- load False |
|
221 |
removeFile arch_path |
|
222 |
writeFile live_path "invalid job" |
|
223 |
broken <- load True |
|
224 |
return (missing, current, archived, missing_current, broken) |
|
225 |
stop $ conjoin [ missing ==? noSuchJob |
|
226 |
, current ==? Ganeti.BasicTypes.Ok (job, False) |
|
227 |
, archived ==? Ganeti.BasicTypes.Ok (job, True) |
|
228 |
, missing_current ==? noSuchJob |
|
229 |
, printTestCase "broken job" (isBad broken) |
|
230 |
] |
|
231 |
|
|
232 |
-- | Tests computing job directories. Creates random directories, |
|
233 |
-- files and stale symlinks in a directory, and checks that we return |
|
234 |
-- \"the right thing\". |
|
235 |
prop_DetermineDirs :: Property |
|
236 |
prop_DetermineDirs = monadicIO $ do |
|
237 |
count <- pick $ choose (2, 10) |
|
238 |
nums <- pick $ genUniquesList count |
|
239 |
(arbitrary::Gen (QuickCheck.Positive Int)) |
|
240 |
let (valid, invalid) = splitAt (count `div` 2) $ |
|
241 |
map (\(QuickCheck.Positive i) -> show i) nums |
|
242 |
(tempdir, non_arch, with_arch, invalid_root) <- |
|
243 |
run . withSystemTempDirectory "jqueue-test." $ \tempdir -> do |
|
244 |
let arch_dir = tempdir </> jobQueueArchiveSubDir |
|
245 |
createDirectory arch_dir |
|
246 |
mapM_ (createDirectory . (arch_dir </>)) valid |
|
247 |
mapM_ (\p -> writeFile (arch_dir </> p) "") invalid |
|
248 |
mapM_ (\p -> createSymbolicLink "/dev/null/no/such/file" |
|
249 |
(arch_dir </> p <.> "missing")) invalid |
|
250 |
non_arch <- determineJobDirectories tempdir False |
|
251 |
with_arch <- determineJobDirectories tempdir True |
|
252 |
invalid_root <- determineJobDirectories (tempdir </> "no-such-subdir") True |
|
253 |
return (tempdir, non_arch, with_arch, invalid_root) |
|
254 |
let arch_dir = tempdir </> jobQueueArchiveSubDir |
|
255 |
stop $ conjoin [ non_arch ==? [tempdir] |
|
256 |
, sort with_arch ==? sort (tempdir:map (arch_dir </>) valid) |
|
257 |
, invalid_root ==? [tempdir </> "no-such-subdir"] |
|
258 |
] |
|
259 |
|
|
260 |
-- | Tests the JSON serialisation for 'InputOpCode'. |
|
261 |
prop_InputOpCode :: MetaOpCode -> Int -> Property |
|
262 |
prop_InputOpCode meta i = |
|
263 |
conjoin [ readJSON (showJSON valid) ==? Text.JSON.Ok valid |
|
264 |
, readJSON (showJSON invalid) ==? Text.JSON.Ok invalid |
|
265 |
] |
|
266 |
where valid = ValidOpCode meta |
|
267 |
invalid = InvalidOpCode (showJSON i) |
|
268 |
|
|
269 |
-- | Tests 'extractOpSummary'. |
|
270 |
prop_extractOpSummary :: MetaOpCode -> Int -> Property |
|
271 |
prop_extractOpSummary meta i = |
|
272 |
conjoin [ printTestCase "valid opcode" $ |
|
273 |
extractOpSummary (ValidOpCode meta) ==? summary |
|
274 |
, printTestCase "invalid opcode, correct object" $ |
|
275 |
extractOpSummary (InvalidOpCode jsobj) ==? summary |
|
276 |
, printTestCase "invalid opcode, empty object" $ |
|
277 |
extractOpSummary (InvalidOpCode emptyo) ==? invalid |
|
278 |
, printTestCase "invalid opcode, object with invalid OP_ID" $ |
|
279 |
extractOpSummary (InvalidOpCode invobj) ==? invalid |
|
280 |
, printTestCase "invalid opcode, not jsobject" $ |
|
281 |
extractOpSummary (InvalidOpCode jsinval) ==? invalid |
|
282 |
] |
|
283 |
where summary = opSummary (metaOpCode meta) |
|
284 |
jsobj = showJSON $ toJSObject [("OP_ID", |
|
285 |
showJSON ("OP_" ++ summary))] |
|
286 |
emptyo = showJSON $ toJSObject ([]::[(String, JSValue)]) |
|
287 |
invobj = showJSON $ toJSObject [("OP_ID", showJSON False)] |
|
288 |
jsinval = showJSON i |
|
289 |
invalid = "INVALID_OP" |
|
290 |
|
|
291 |
testSuite "JQueue" |
|
292 |
[ 'case_JobPriorityDef |
|
293 |
, 'prop_JobPriority |
|
294 |
, 'case_JobStatusDef |
|
295 |
, 'prop_JobStatus |
|
296 |
, 'case_JobStatusPri_py_equiv |
|
297 |
, 'prop_ListJobIDs |
|
298 |
, 'prop_LoadJobs |
|
299 |
, 'prop_DetermineDirs |
|
300 |
, 'prop_InputOpCode |
|
301 |
, 'prop_extractOpSummary |
|
302 |
] |
b/htest/test.hs | ||
---|---|---|
53 | 53 |
import Test.Ganeti.HTools.Types |
54 | 54 |
import Test.Ganeti.JSON |
55 | 55 |
import Test.Ganeti.Jobs |
56 |
import Test.Ganeti.JQueue |
|
56 | 57 |
import Test.Ganeti.Luxi |
57 | 58 |
import Test.Ganeti.Network |
58 | 59 |
import Test.Ganeti.Objects |
... | ... | |
103 | 104 |
, testHTools_Types |
104 | 105 |
, testJSON |
105 | 106 |
, testJobs |
107 |
, testJQueue |
|
106 | 108 |
, testLuxi |
107 | 109 |
, testNetwork |
108 | 110 |
, testObjects |
b/htools/Ganeti/JQueue.hs | ||
---|---|---|
1 |
{-# LANGUAGE TemplateHaskell #-} |
|
2 |
|
|
3 |
{-| Implementation of the job queue. |
|
4 |
|
|
5 |
-} |
|
6 |
|
|
7 |
{- |
|
8 |
|
|
9 |
Copyright (C) 2010, 2012 Google Inc. |
|
10 |
|
|
11 |
This program is free software; you can redistribute it and/or modify |
|
12 |
it under the terms of the GNU General Public License as published by |
|
13 |
the Free Software Foundation; either version 2 of the License, or |
|
14 |
(at your option) any later version. |
|
15 |
|
|
16 |
This program is distributed in the hope that it will be useful, but |
|
17 |
WITHOUT ANY WARRANTY; without even the implied warranty of |
|
18 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
19 |
General Public License for more details. |
|
20 |
|
|
21 |
You should have received a copy of the GNU General Public License |
|
22 |
along with this program; if not, write to the Free Software |
|
23 |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
|
24 |
02110-1301, USA. |
|
25 |
|
|
26 |
-} |
|
27 |
|
|
28 |
module Ganeti.JQueue |
|
29 |
( QueuedOpCode(..) |
|
30 |
, QueuedJob(..) |
|
31 |
, InputOpCode(..) |
|
32 |
, Timestamp |
|
33 |
, noTimestamp |
|
34 |
, opStatusFinalized |
|
35 |
, extractOpSummary |
|
36 |
, calcJobStatus |
|
37 |
, calcJobPriority |
|
38 |
, jobFileName |
|
39 |
, liveJobFile |
|
40 |
, archivedJobFile |
|
41 |
, determineJobDirectories |
|
42 |
, getJobIDs |
|
43 |
, sortJobIDs |
|
44 |
, loadJobFromDisk |
|
45 |
, noSuchJob |
|
46 |
) where |
|
47 |
|
|
48 |
import Control.Exception |
|
49 |
import Control.Monad |
|
50 |
import Data.List |
|
51 |
import Data.Ord (comparing) |
|
52 |
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code |
|
53 |
import Prelude hiding (log, id) |
|
54 |
import System.Directory |
|
55 |
import System.FilePath |
|
56 |
import System.IO.Error (isDoesNotExistError) |
|
57 |
import System.Posix.Files |
|
58 |
import qualified Text.JSON |
|
59 |
import Text.JSON.Types |
|
60 |
|
|
61 |
import Ganeti.BasicTypes |
|
62 |
import qualified Ganeti.Constants as C |
|
63 |
import Ganeti.JSON |
|
64 |
import Ganeti.Logging |
|
65 |
import Ganeti.OpCodes |
|
66 |
import Ganeti.Path |
|
67 |
import Ganeti.THH |
|
68 |
import Ganeti.Types |
|
69 |
|
|
70 |
-- * Data types |
|
71 |
|
|
72 |
-- | The ganeti queue timestamp type |
|
73 |
type Timestamp = (Int, Int) |
|
74 |
|
|
75 |
-- | Missing timestamp type. |
|
76 |
noTimestamp :: Timestamp |
|
77 |
noTimestamp = (-1, -1) |
|
78 |
|
|
79 |
-- | An input opcode. |
|
80 |
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully |
|
81 |
| InvalidOpCode JSValue -- ^ Invalid opcode |
|
82 |
deriving (Show, Eq) |
|
83 |
|
|
84 |
-- | JSON instance for 'InputOpCode', trying to parse it and if |
|
85 |
-- failing, keeping the original JSValue. |
|
86 |
instance Text.JSON.JSON InputOpCode where |
|
87 |
showJSON (ValidOpCode mo) = Text.JSON.showJSON mo |
|
88 |
showJSON (InvalidOpCode inv) = inv |
|
89 |
readJSON v = case Text.JSON.readJSON v of |
|
90 |
Text.JSON.Error _ -> return $ InvalidOpCode v |
|
91 |
Text.JSON.Ok mo -> return $ ValidOpCode mo |
|
92 |
|
|
93 |
-- | Invalid opcode summary. |
|
94 |
invalidOp :: String |
|
95 |
invalidOp = "INVALID_OP" |
|
96 |
|
|
97 |
-- | Tries to extract the opcode summary from an 'InputOpCode'. This |
|
98 |
-- duplicates some functionality from the 'opSummary' function in |
|
99 |
-- "Ganeti.OpCodes". |
|
100 |
extractOpSummary :: InputOpCode -> String |
|
101 |
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop |
|
102 |
extractOpSummary (InvalidOpCode (JSObject o)) = |
|
103 |
case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of |
|
104 |
Just s -> drop 3 s -- drop the OP_ prefix |
|
105 |
Nothing -> invalidOp |
|
106 |
extractOpSummary _ = invalidOp |
|
107 |
|
|
108 |
$(buildObject "QueuedOpCode" "qo" |
|
109 |
[ simpleField "input" [t| InputOpCode |] |
|
110 |
, simpleField "status" [t| OpStatus |] |
|
111 |
, simpleField "result" [t| JSValue |] |
|
112 |
, defaultField [| [] |] $ |
|
113 |
simpleField "log" [t| [(Int, Timestamp, ELogType, JSValue)] |] |
|
114 |
, simpleField "priority" [t| Int |] |
|
115 |
, optionalNullSerField $ |
|
116 |
simpleField "start_timestamp" [t| Timestamp |] |
|
117 |
, optionalNullSerField $ |
|
118 |
simpleField "exec_timestamp" [t| Timestamp |] |
|
119 |
, optionalNullSerField $ |
|
120 |
simpleField "end_timestamp" [t| Timestamp |] |
|
121 |
]) |
|
122 |
|
|
123 |
$(buildObject "QueuedJob" "qj" |
|
124 |
[ simpleField "id" [t| JobId |] |
|
125 |
, simpleField "ops" [t| [QueuedOpCode] |] |
|
126 |
, optionalNullSerField $ |
|
127 |
simpleField "received_timestamp" [t| Timestamp |] |
|
128 |
, optionalNullSerField $ |
|
129 |
simpleField "start_timestamp" [t| Timestamp |] |
|
130 |
, optionalNullSerField $ |
|
131 |
simpleField "end_timestamp" [t| Timestamp |] |
|
132 |
]) |
|
133 |
|
|
134 |
-- | Job file prefix. |
|
135 |
jobFilePrefix :: String |
|
136 |
jobFilePrefix = "job-" |
|
137 |
|
|
138 |
-- | Computes the filename for a given job ID. |
|
139 |
jobFileName :: JobId -> FilePath |
|
140 |
jobFileName jid = jobFilePrefix ++ show (fromJobId jid) |
|
141 |
|
|
142 |
-- | Parses a job ID from a file name. |
|
143 |
parseJobFileId :: (Monad m) => FilePath -> m JobId |
|
144 |
parseJobFileId path = |
|
145 |
case stripPrefix jobFilePrefix path of |
|
146 |
Nothing -> fail $ "Job file '" ++ path ++ |
|
147 |
"' doesn't have the correct prefix" |
|
148 |
Just suffix -> makeJobIdS suffix |
|
149 |
|
|
150 |
-- | Computes the full path to a live job. |
|
151 |
liveJobFile :: FilePath -> JobId -> FilePath |
|
152 |
liveJobFile rootdir jid = rootdir </> jobFileName jid |
|
153 |
|
|
154 |
-- | Computes the full path to an archives job. BROKEN. |
|
155 |
archivedJobFile :: FilePath -> JobId -> FilePath |
|
156 |
archivedJobFile rootdir jid = |
|
157 |
let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory) |
|
158 |
in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid |
|
159 |
|
|
160 |
-- | Map from opcode status to job status. |
|
161 |
opStatusToJob :: OpStatus -> JobStatus |
|
162 |
opStatusToJob OP_STATUS_QUEUED = JOB_STATUS_QUEUED |
|
163 |
opStatusToJob OP_STATUS_WAITING = JOB_STATUS_WAITING |
|
164 |
opStatusToJob OP_STATUS_SUCCESS = JOB_STATUS_SUCCESS |
|
165 |
opStatusToJob OP_STATUS_RUNNING = JOB_STATUS_RUNNING |
|
166 |
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING |
|
167 |
opStatusToJob OP_STATUS_CANCELED = JOB_STATUS_CANCELED |
|
168 |
opStatusToJob OP_STATUS_ERROR = JOB_STATUS_ERROR |
|
169 |
|
|
170 |
-- | Computes a queued job's status. |
|
171 |
calcJobStatus :: QueuedJob -> JobStatus |
|
172 |
calcJobStatus QueuedJob { qjOps = ops } = |
|
173 |
extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True |
|
174 |
where |
|
175 |
terminalStatus OP_STATUS_ERROR = True |
|
176 |
terminalStatus OP_STATUS_CANCELING = True |
|
177 |
terminalStatus OP_STATUS_CANCELED = True |
|
178 |
terminalStatus _ = False |
|
179 |
softStatus OP_STATUS_SUCCESS = True |
|
180 |
softStatus OP_STATUS_QUEUED = True |
|
181 |
softStatus _ = False |
|
182 |
extractOpSt [] _ True = JOB_STATUS_SUCCESS |
|
183 |
extractOpSt [] d False = d |
|
184 |
extractOpSt (x:xs) d old_all |
|
185 |
| terminalStatus x = opStatusToJob x -- abort recursion |
|
186 |
| softStatus x = extractOpSt xs d new_all -- continue unchanged |
|
187 |
| otherwise = extractOpSt xs (opStatusToJob x) new_all |
|
188 |
where new_all = x == OP_STATUS_SUCCESS && old_all |
|
189 |
|
|
190 |
-- | Determine whether an opcode status is finalized. |
|
191 |
opStatusFinalized :: OpStatus -> Bool |
|
192 |
opStatusFinalized = (> OP_STATUS_RUNNING) |
|
193 |
|
|
194 |
-- | Compute a job's priority. |
|
195 |
calcJobPriority :: QueuedJob -> Int |
|
196 |
calcJobPriority QueuedJob { qjOps = ops } = |
|
197 |
helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops |
|
198 |
where helper [] = C.opPrioDefault |
|
199 |
helper ps = minimum ps |
|
200 |
|
|
201 |
-- | Log but ignore an 'IOError'. |
|
202 |
ignoreIOError :: a -> Bool -> String -> IOError -> IO a |
|
203 |
ignoreIOError a ignore_noent msg e = do |
|
204 |
unless (isDoesNotExistError e && ignore_noent) . |
|
205 |
logWarning $ msg ++ ": " ++ show e |
|
206 |
return a |
|
207 |
|
|
208 |
-- | Compute the list of existing archive directories. Note that I/O |
|
209 |
-- exceptions are swallowed and ignored. |
|
210 |
allArchiveDirs :: FilePath -> IO [FilePath] |
|
211 |
allArchiveDirs rootdir = do |
|
212 |
let adir = rootdir </> jobQueueArchiveSubDir |
|
213 |
contents <- getDirectoryContents adir `Control.Exception.catch` |
|
214 |
ignoreIOError [] False |
|
215 |
("Failed to list queue directory " ++ adir) |
|
216 |
let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents |
|
217 |
filterM (\path -> |
|
218 |
liftM isDirectory (getFileStatus (adir </> path)) |
|
219 |
`Control.Exception.catch` |
|
220 |
ignoreIOError False True |
|
221 |
("Failed to stat archive path " ++ path)) fpaths |
|
222 |
|
|
223 |
-- | Build list of directories containing job files. Note: compared to |
|
224 |
-- the Python version, this doesn't ignore a potential lost+found |
|
225 |
-- file. |
|
226 |
determineJobDirectories :: FilePath -> Bool -> IO [FilePath] |
|
227 |
determineJobDirectories rootdir archived = do |
|
228 |
other <- if archived |
|
229 |
then allArchiveDirs rootdir |
|
230 |
else return [] |
|
231 |
return $ rootdir:other |
|
232 |
|
|
233 |
-- | Computes the list of all jobs in the given directories. |
|
234 |
getJobIDs :: [FilePath] -> IO [JobId] |
|
235 |
getJobIDs = liftM concat . mapM getDirJobIDs |
|
236 |
|
|
237 |
-- | Sorts the a list of job IDs. |
|
238 |
sortJobIDs :: [JobId] -> [JobId] |
|
239 |
sortJobIDs = sortBy (comparing fromJobId) |
|
240 |
|
|
241 |
-- | Computes the list of jobs in a given directory. |
|
242 |
getDirJobIDs :: FilePath -> IO [JobId] |
|
243 |
getDirJobIDs path = do |
|
244 |
contents <- getDirectoryContents path `Control.Exception.catch` |
|
245 |
ignoreIOError [] False |
|
246 |
("Failed to list job directory " ++ path) |
|
247 |
let jids = foldl (\ids file -> |
|
248 |
case parseJobFileId file of |
|
249 |
Nothing -> ids |
|
250 |
Just new_id -> new_id:ids) [] contents |
|
251 |
return $ reverse jids |
|
252 |
|
|
253 |
-- | Reads the job data from disk. |
|
254 |
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool)) |
|
255 |
readJobDataFromDisk rootdir archived jid = do |
|
256 |
let live_path = liveJobFile rootdir jid |
|
257 |
archived_path = archivedJobFile rootdir jid |
|
258 |
all_paths = if archived |
|
259 |
then [(live_path, False), (archived_path, True)] |
|
260 |
else [(live_path, False)] |
|
261 |
foldM (\state (path, isarchived) -> |
|
262 |
liftM (\r -> Just (r, isarchived)) (readFile path) |
|
263 |
`Control.Exception.catch` |
|
264 |
ignoreIOError state True |
|
265 |
("Failed to read job file " ++ path)) Nothing all_paths |
|
266 |
|
|
267 |
-- | Failed to load job error. |
|
268 |
noSuchJob :: Result (QueuedJob, Bool) |
|
269 |
noSuchJob = Bad "Can't load job file" |
|
270 |
|
|
271 |
-- | Loads a job from disk. |
|
272 |
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool)) |
|
273 |
loadJobFromDisk rootdir archived jid = do |
|
274 |
raw <- readJobDataFromDisk rootdir archived jid |
|
275 |
-- note: we need some stricness below, otherwise the wrapping in a |
|
276 |
-- Result will create too much lazyness, and not close the file |
|
277 |
-- descriptors for the individual jobs |
|
278 |
return $! case raw of |
|
279 |
Nothing -> noSuchJob |
|
280 |
Just (str, arch) -> |
|
281 |
liftM (\qj -> (qj, arch)) . |
|
282 |
fromJResult "Parsing job file" $ Text.JSON.decode str |
Also available in: Unified diff