root / src / Ganeti / JQueue.hs @ e181c8cd
History | View | Annotate | Download (25.4 kB)
1 | aa79e62e | Iustin Pop | {-| Implementation of the job queue. |
---|---|---|---|
2 | aa79e62e | Iustin Pop | |
3 | aa79e62e | Iustin Pop | -} |
4 | aa79e62e | Iustin Pop | |
5 | aa79e62e | Iustin Pop | {- |
6 | aa79e62e | Iustin Pop | |
7 | aa79e62e | Iustin Pop | Copyright (C) 2010, 2012 Google Inc. |
8 | aa79e62e | Iustin Pop | |
9 | aa79e62e | Iustin Pop | This program is free software; you can redistribute it and/or modify |
10 | aa79e62e | Iustin Pop | it under the terms of the GNU General Public License as published by |
11 | aa79e62e | Iustin Pop | the Free Software Foundation; either version 2 of the License, or |
12 | aa79e62e | Iustin Pop | (at your option) any later version. |
13 | aa79e62e | Iustin Pop | |
14 | aa79e62e | Iustin Pop | This program is distributed in the hope that it will be useful, but |
15 | aa79e62e | Iustin Pop | WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | aa79e62e | Iustin Pop | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
17 | aa79e62e | Iustin Pop | General Public License for more details. |
18 | aa79e62e | Iustin Pop | |
19 | aa79e62e | Iustin Pop | You should have received a copy of the GNU General Public License |
20 | aa79e62e | Iustin Pop | along with this program; if not, write to the Free Software |
21 | aa79e62e | Iustin Pop | Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
22 | aa79e62e | Iustin Pop | 02110-1301, USA. |
23 | aa79e62e | Iustin Pop | |
24 | aa79e62e | Iustin Pop | -} |
25 | aa79e62e | Iustin Pop | |
26 | aa79e62e | Iustin Pop | module Ganeti.JQueue |
27 | a35a4f52 | Klaus Aehlig | ( queuedOpCodeFromMetaOpCode |
28 | 1c1132f4 | Klaus Aehlig | , queuedJobFromOpCodes |
29 | a7ab381a | Klaus Aehlig | , changeOpCodePriority |
30 | a6b33b72 | Klaus Aehlig | , changeJobPriority |
31 | 363dc9d6 | Klaus Aehlig | , cancelQueuedJob |
32 | 4b887066 | Petr Pudlak | , failQueuedJob |
33 | ae66f3a9 | Klaus Aehlig | , fromClockTime |
34 | aa79e62e | Iustin Pop | , noTimestamp |
35 | c3a70209 | Klaus Aehlig | , currentTimestamp |
36 | 8b5a4b9a | Klaus Aehlig | , advanceTimestamp |
37 | 17aa37ff | Klaus Aehlig | , reasonTrailTimestamp |
38 | 2af22d70 | Klaus Aehlig | , setReceivedTimestamp |
39 | 65a3ff88 | Michele Tartara | , extendJobReasonTrail |
40 | 6a2e71d9 | Klaus Aehlig | , getJobDependencies |
41 | aa79e62e | Iustin Pop | , opStatusFinalized |
42 | aa79e62e | Iustin Pop | , extractOpSummary |
43 | aa79e62e | Iustin Pop | , calcJobStatus |
44 | 02e40b13 | Klaus Aehlig | , jobStarted |
45 | 847df9e9 | Klaus Aehlig | , jobFinalized |
46 | 370f63be | Klaus Aehlig | , jobArchivable |
47 | aa79e62e | Iustin Pop | , calcJobPriority |
48 | aa79e62e | Iustin Pop | , jobFileName |
49 | aa79e62e | Iustin Pop | , liveJobFile |
50 | aa79e62e | Iustin Pop | , archivedJobFile |
51 | aa79e62e | Iustin Pop | , determineJobDirectories |
52 | aa79e62e | Iustin Pop | , getJobIDs |
53 | aa79e62e | Iustin Pop | , sortJobIDs |
54 | aa79e62e | Iustin Pop | , loadJobFromDisk |
55 | aa79e62e | Iustin Pop | , noSuchJob |
56 | cef3f99f | Klaus Aehlig | , readSerialFromDisk |
57 | ae858516 | Klaus Aehlig | , allocateJobIds |
58 | ae858516 | Klaus Aehlig | , allocateJobId |
59 | b498ed42 | Klaus Aehlig | , writeJobToDisk |
60 | 9fd653a4 | Klaus Aehlig | , replicateManyJobs |
61 | 7b0a9096 | Petr Pudlak | , writeAndReplicateJob |
62 | 1b94c0db | Klaus Aehlig | , isQueueOpen |
63 | ac0c5c6d | Klaus Aehlig | , startJobs |
64 | 47c3c7b1 | Klaus Aehlig | , cancelJob |
65 | f23daea8 | Klaus Aehlig | , queueDirPermissions |
66 | c867cfe1 | Klaus Aehlig | , archiveJobs |
67 | a35a4f52 | Klaus Aehlig | -- re-export |
68 | a35a4f52 | Klaus Aehlig | , Timestamp |
69 | a35a4f52 | Klaus Aehlig | , InputOpCode(..) |
70 | a35a4f52 | Klaus Aehlig | , QueuedOpCode(..) |
71 | a35a4f52 | Klaus Aehlig | , QueuedJob(..) |
72 | aa79e62e | Iustin Pop | ) where |
73 | aa79e62e | Iustin Pop | |
74 | 370f63be | Klaus Aehlig | import Control.Applicative (liftA2, (<|>)) |
75 | 8b5a4b9a | Klaus Aehlig | import Control.Arrow (first, second) |
76 | ec98ea2b | Petr Pudlak | import Control.Concurrent (forkIO, threadDelay) |
77 | ae858516 | Klaus Aehlig | import Control.Concurrent.MVar |
78 | aa79e62e | Iustin Pop | import Control.Exception |
79 | 58d29849 | Klaus Aehlig | import Control.Lens (over) |
80 | aa79e62e | Iustin Pop | import Control.Monad |
81 | ea7032da | Petr Pudlak | import Control.Monad.IO.Class |
82 | ec98ea2b | Petr Pudlak | import Control.Monad.Trans (lift) |
83 | ec98ea2b | Petr Pudlak | import Control.Monad.Trans.Maybe |
84 | ec98ea2b | Petr Pudlak | import Data.Functor ((<$), (<$>)) |
85 | aa79e62e | Iustin Pop | import Data.List |
86 | 4b49a72b | Klaus Aehlig | import Data.Maybe |
87 | aa79e62e | Iustin Pop | import Data.Ord (comparing) |
88 | aa79e62e | Iustin Pop | -- workaround what seems to be a bug in ghc 7.4's TH shadowing code |
89 | 557f5dad | Klaus Aehlig | import Prelude hiding (id, log) |
90 | aa79e62e | Iustin Pop | import System.Directory |
91 | aa79e62e | Iustin Pop | import System.FilePath |
92 | aa79e62e | Iustin Pop | import System.IO.Error (isDoesNotExistError) |
93 | aa79e62e | Iustin Pop | import System.Posix.Files |
94 | ec98ea2b | Petr Pudlak | import System.Posix.Signals (sigTERM, signalProcess) |
95 | c3a70209 | Klaus Aehlig | import System.Time |
96 | aa79e62e | Iustin Pop | import qualified Text.JSON |
97 | aa79e62e | Iustin Pop | import Text.JSON.Types |
98 | aa79e62e | Iustin Pop | |
99 | aa79e62e | Iustin Pop | import Ganeti.BasicTypes |
100 | c867cfe1 | Klaus Aehlig | import qualified Ganeti.Config as Config |
101 | aa79e62e | Iustin Pop | import qualified Ganeti.Constants as C |
102 | ec98ea2b | Petr Pudlak | import Ganeti.Errors (ErrorResult, ResultG) |
103 | 58d29849 | Klaus Aehlig | import Ganeti.JQueue.Lens (qoInputL, validOpCodeL) |
104 | a35a4f52 | Klaus Aehlig | import Ganeti.JQueue.Objects |
105 | aa79e62e | Iustin Pop | import Ganeti.JSON |
106 | aa79e62e | Iustin Pop | import Ganeti.Logging |
107 | 493d6920 | Klaus Aehlig | import Ganeti.Luxi |
108 | c867cfe1 | Klaus Aehlig | import Ganeti.Objects (ConfigData, Node) |
109 | aa79e62e | Iustin Pop | import Ganeti.OpCodes |
110 | 58d29849 | Klaus Aehlig | import Ganeti.OpCodes.Lens (metaParamsL, opReasonL) |
111 | aa79e62e | Iustin Pop | import Ganeti.Path |
112 | efb4c025 | Petr Pudlak | import Ganeti.Query.Exec as Exec |
113 | b5a96995 | Klaus Aehlig | import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors, |
114 | c867cfe1 | Klaus Aehlig | RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..)) |
115 | aa79e62e | Iustin Pop | import Ganeti.Types |
116 | cef3f99f | Klaus Aehlig | import Ganeti.Utils |
117 | 5f6515b6 | Petr Pudlak | import Ganeti.Utils.Atomic |
118 | ec98ea2b | Petr Pudlak | import Ganeti.Utils.Livelock (Livelock, isDead) |
119 | 77676415 | Klaus Aehlig | import Ganeti.VCluster (makeVirtualPath) |
120 | aa79e62e | Iustin Pop | |
121 | aa79e62e | Iustin Pop | -- * Data types |
122 | aa79e62e | Iustin Pop | |
123 | aa79e62e | Iustin Pop | -- | Missing timestamp type. |
124 | aa79e62e | Iustin Pop | noTimestamp :: Timestamp |
125 | aa79e62e | Iustin Pop | noTimestamp = (-1, -1) |
126 | aa79e62e | Iustin Pop | |
127 | ae66f3a9 | Klaus Aehlig | -- | Obtain a Timestamp from a given clock time |
128 | ae66f3a9 | Klaus Aehlig | fromClockTime :: ClockTime -> Timestamp |
129 | ae66f3a9 | Klaus Aehlig | fromClockTime (TOD ctime pico) = |
130 | ae66f3a9 | Klaus Aehlig | (fromIntegral ctime, fromIntegral $ pico `div` 1000000) |
131 | ae66f3a9 | Klaus Aehlig | |
132 | c3a70209 | Klaus Aehlig | -- | Get the current time in the job-queue timestamp format. |
133 | c3a70209 | Klaus Aehlig | currentTimestamp :: IO Timestamp |
134 | ae66f3a9 | Klaus Aehlig | currentTimestamp = fromClockTime `liftM` getClockTime |
135 | c3a70209 | Klaus Aehlig | |
136 | 8b5a4b9a | Klaus Aehlig | -- | From a given timestamp, obtain the timestamp of the |
137 | 8b5a4b9a | Klaus Aehlig | -- time that is the given number of seconds later. |
138 | 8b5a4b9a | Klaus Aehlig | advanceTimestamp :: Int -> Timestamp -> Timestamp |
139 | 8b5a4b9a | Klaus Aehlig | advanceTimestamp = first . (+) |
140 | 8b5a4b9a | Klaus Aehlig | |
141 | aa79e62e | Iustin Pop | |
142 | 6a2e71d9 | Klaus Aehlig | -- | From an InputOpCode obtain the MetaOpCode, if any. |
143 | 6a2e71d9 | Klaus Aehlig | toMetaOpCode :: InputOpCode -> [MetaOpCode] |
144 | 6a2e71d9 | Klaus Aehlig | toMetaOpCode (ValidOpCode mopc) = [mopc] |
145 | 6a2e71d9 | Klaus Aehlig | toMetaOpCode _ = [] |
146 | 6a2e71d9 | Klaus Aehlig | |
147 | aa79e62e | Iustin Pop | -- | Invalid opcode summary. |
148 | aa79e62e | Iustin Pop | invalidOp :: String |
149 | aa79e62e | Iustin Pop | invalidOp = "INVALID_OP" |
150 | aa79e62e | Iustin Pop | |
151 | aa79e62e | Iustin Pop | -- | Tries to extract the opcode summary from an 'InputOpCode'. This |
152 | aa79e62e | Iustin Pop | -- duplicates some functionality from the 'opSummary' function in |
153 | aa79e62e | Iustin Pop | -- "Ganeti.OpCodes". |
154 | aa79e62e | Iustin Pop | extractOpSummary :: InputOpCode -> String |
155 | aa79e62e | Iustin Pop | extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop |
156 | aa79e62e | Iustin Pop | extractOpSummary (InvalidOpCode (JSObject o)) = |
157 | aa79e62e | Iustin Pop | case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of |
158 | aa79e62e | Iustin Pop | Just s -> drop 3 s -- drop the OP_ prefix |
159 | aa79e62e | Iustin Pop | Nothing -> invalidOp |
160 | aa79e62e | Iustin Pop | extractOpSummary _ = invalidOp |
161 | aa79e62e | Iustin Pop | |
162 | 4b49a72b | Klaus Aehlig | -- | Convenience function to obtain a QueuedOpCode from a MetaOpCode |
163 | 4b49a72b | Klaus Aehlig | queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode |
164 | 4b49a72b | Klaus Aehlig | queuedOpCodeFromMetaOpCode op = |
165 | 4b49a72b | Klaus Aehlig | QueuedOpCode { qoInput = ValidOpCode op |
166 | 4b49a72b | Klaus Aehlig | , qoStatus = OP_STATUS_QUEUED |
167 | 4b49a72b | Klaus Aehlig | , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams |
168 | 4b49a72b | Klaus Aehlig | $ op |
169 | 4b49a72b | Klaus Aehlig | , qoLog = [] |
170 | 4b49a72b | Klaus Aehlig | , qoResult = JSNull |
171 | 4b49a72b | Klaus Aehlig | , qoStartTimestamp = Nothing |
172 | 4b49a72b | Klaus Aehlig | , qoEndTimestamp = Nothing |
173 | 4b49a72b | Klaus Aehlig | , qoExecTimestamp = Nothing |
174 | 4b49a72b | Klaus Aehlig | } |
175 | 4b49a72b | Klaus Aehlig | |
176 | 1c1132f4 | Klaus Aehlig | -- | From a job-id and a list of op-codes create a job. This is |
177 | 1c1132f4 | Klaus Aehlig | -- the pure part of job creation, as allocating a new job id |
178 | 1c1132f4 | Klaus Aehlig | -- lives in IO. |
179 | 1c1132f4 | Klaus Aehlig | queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob |
180 | 1c1132f4 | Klaus Aehlig | queuedJobFromOpCodes jobid ops = do |
181 | 1c1132f4 | Klaus Aehlig | ops' <- mapM (`resolveDependencies` jobid) ops |
182 | 1c1132f4 | Klaus Aehlig | return QueuedJob { qjId = jobid |
183 | 1c1132f4 | Klaus Aehlig | , qjOps = map queuedOpCodeFromMetaOpCode ops' |
184 | 65a3ff88 | Michele Tartara | , qjReceivedTimestamp = Nothing |
185 | 1c1132f4 | Klaus Aehlig | , qjStartTimestamp = Nothing |
186 | 1c1132f4 | Klaus Aehlig | , qjEndTimestamp = Nothing |
187 | 76b4ac58 | Petr Pudlak | , qjLivelock = Nothing |
188 | 76b4ac58 | Petr Pudlak | , qjProcessId = Nothing |
189 | 1c1132f4 | Klaus Aehlig | } |
190 | 1c1132f4 | Klaus Aehlig | |
191 | 2af22d70 | Klaus Aehlig | -- | Attach a received timestamp to a Queued Job. |
192 | 2af22d70 | Klaus Aehlig | setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob |
193 | 2af22d70 | Klaus Aehlig | setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts } |
194 | 2af22d70 | Klaus Aehlig | |
195 | 65a3ff88 | Michele Tartara | -- | Build a timestamp in the format expected by the reason trail (nanoseconds) |
196 | 65a3ff88 | Michele Tartara | -- starting from a JQueue Timestamp. |
197 | 65a3ff88 | Michele Tartara | reasonTrailTimestamp :: Timestamp -> Integer |
198 | 65a3ff88 | Michele Tartara | reasonTrailTimestamp (sec, micro) = |
199 | 65a3ff88 | Michele Tartara | let sec' = toInteger sec |
200 | 65a3ff88 | Michele Tartara | micro' = toInteger micro |
201 | 65a3ff88 | Michele Tartara | in sec' * 1000000000 + micro' * 1000 |
202 | 65a3ff88 | Michele Tartara | |
203 | 65a3ff88 | Michele Tartara | -- | Append an element to the reason trail of an input opcode. |
204 | 65a3ff88 | Michele Tartara | extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode |
205 | 65a3ff88 | Michele Tartara | -> InputOpCode |
206 | 65a3ff88 | Michele Tartara | extendInputOpCodeReasonTrail _ _ _ op@(InvalidOpCode _) = op |
207 | 65a3ff88 | Michele Tartara | extendInputOpCodeReasonTrail jid ts i (ValidOpCode vOp) = |
208 | 65a3ff88 | Michele Tartara | let metaP = metaParams vOp |
209 | 65a3ff88 | Michele Tartara | op = metaOpCode vOp |
210 | 65a3ff88 | Michele Tartara | trail = opReason metaP |
211 | 65a3ff88 | Michele Tartara | reasonSrc = opReasonSrcID op |
212 | 65a3ff88 | Michele Tartara | reasonText = "job=" ++ show (fromJobId jid) ++ ";index=" ++ show i |
213 | 65a3ff88 | Michele Tartara | reason = (reasonSrc, reasonText, reasonTrailTimestamp ts) |
214 | 65a3ff88 | Michele Tartara | trail' = trail ++ [reason] |
215 | 65a3ff88 | Michele Tartara | in ValidOpCode $ vOp { metaParams = metaP { opReason = trail' } } |
216 | 65a3ff88 | Michele Tartara | |
217 | 65a3ff88 | Michele Tartara | -- | Append an element to the reason trail of a queued opcode. |
218 | 65a3ff88 | Michele Tartara | extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode |
219 | 65a3ff88 | Michele Tartara | -> QueuedOpCode |
220 | 65a3ff88 | Michele Tartara | extendOpCodeReasonTrail jid ts i op = |
221 | 65a3ff88 | Michele Tartara | let inOp = qoInput op |
222 | 65a3ff88 | Michele Tartara | in op { qoInput = extendInputOpCodeReasonTrail jid ts i inOp } |
223 | 65a3ff88 | Michele Tartara | |
224 | 65a3ff88 | Michele Tartara | -- | Append an element to the reason trail of all the OpCodes of a queued job. |
225 | 65a3ff88 | Michele Tartara | extendJobReasonTrail :: QueuedJob -> QueuedJob |
226 | 65a3ff88 | Michele Tartara | extendJobReasonTrail job = |
227 | 65a3ff88 | Michele Tartara | let jobId = qjId job |
228 | 65a3ff88 | Michele Tartara | mTimestamp = qjReceivedTimestamp job |
229 | 65a3ff88 | Michele Tartara | -- This function is going to be called on QueuedJobs that already contain |
230 | 65a3ff88 | Michele Tartara | -- a timestamp. But for safety reasons we cannot assume mTimestamp will |
231 | 65a3ff88 | Michele Tartara | -- be (Just timestamp), so we use the value 0 in the extremely unlikely |
232 | 65a3ff88 | Michele Tartara | -- case this is not true. |
233 | 65a3ff88 | Michele Tartara | timestamp = fromMaybe (0, 0) mTimestamp |
234 | 65a3ff88 | Michele Tartara | in job |
235 | 65a3ff88 | Michele Tartara | { qjOps = |
236 | 65a3ff88 | Michele Tartara | zipWith (extendOpCodeReasonTrail jobId timestamp) [0..] $ |
237 | 65a3ff88 | Michele Tartara | qjOps job |
238 | 65a3ff88 | Michele Tartara | } |
239 | 65a3ff88 | Michele Tartara | |
240 | 6a2e71d9 | Klaus Aehlig | -- | From a queued job obtain the list of jobs it depends on. |
241 | 6a2e71d9 | Klaus Aehlig | getJobDependencies :: QueuedJob -> [JobId] |
242 | 6a2e71d9 | Klaus Aehlig | getJobDependencies job = do |
243 | 6a2e71d9 | Klaus Aehlig | op <- qjOps job |
244 | 6a2e71d9 | Klaus Aehlig | mopc <- toMetaOpCode $ qoInput op |
245 | 6a2e71d9 | Klaus Aehlig | dep <- fromMaybe [] . opDepends $ metaParams mopc |
246 | 6a2e71d9 | Klaus Aehlig | getJobIdFromDependency dep |
247 | 6a2e71d9 | Klaus Aehlig | |
248 | a7ab381a | Klaus Aehlig | -- | Change the priority of a QueuedOpCode, if it is not already |
249 | a7ab381a | Klaus Aehlig | -- finalized. |
250 | a7ab381a | Klaus Aehlig | changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode |
251 | a7ab381a | Klaus Aehlig | changeOpCodePriority prio op = |
252 | a7ab381a | Klaus Aehlig | if qoStatus op > OP_STATUS_RUNNING |
253 | a7ab381a | Klaus Aehlig | then op |
254 | a7ab381a | Klaus Aehlig | else op { qoPriority = prio } |
255 | a7ab381a | Klaus Aehlig | |
256 | 363dc9d6 | Klaus Aehlig | -- | Set the state of a QueuedOpCode to canceled. |
257 | 363dc9d6 | Klaus Aehlig | cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode |
258 | 363dc9d6 | Klaus Aehlig | cancelOpCode now op = |
259 | 363dc9d6 | Klaus Aehlig | op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now } |
260 | 363dc9d6 | Klaus Aehlig | |
261 | a6b33b72 | Klaus Aehlig | -- | Change the priority of a job, i.e., change the priority of the |
262 | a6b33b72 | Klaus Aehlig | -- non-finalized opcodes. |
263 | a6b33b72 | Klaus Aehlig | changeJobPriority :: Int -> QueuedJob -> QueuedJob |
264 | a6b33b72 | Klaus Aehlig | changeJobPriority prio job = |
265 | a6b33b72 | Klaus Aehlig | job { qjOps = map (changeOpCodePriority prio) $ qjOps job } |
266 | a6b33b72 | Klaus Aehlig | |
267 | 363dc9d6 | Klaus Aehlig | -- | Transform a QueuedJob that has not been started into its canceled form. |
268 | 363dc9d6 | Klaus Aehlig | cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob |
269 | 363dc9d6 | Klaus Aehlig | cancelQueuedJob now job = |
270 | 363dc9d6 | Klaus Aehlig | let ops' = map (cancelOpCode now) $ qjOps job |
271 | 4b887066 | Petr Pudlak | in job { qjOps = ops', qjEndTimestamp = Just now } |
272 | 4b887066 | Petr Pudlak | |
273 | 4b887066 | Petr Pudlak | -- | Set the state of a QueuedOpCode to canceled. |
274 | 58d29849 | Klaus Aehlig | failOpCode :: ReasonElem -> Timestamp -> QueuedOpCode -> QueuedOpCode |
275 | 58d29849 | Klaus Aehlig | failOpCode reason now op = |
276 | 58d29849 | Klaus Aehlig | over (qoInputL . validOpCodeL . metaParamsL . opReasonL) (++ [reason]) |
277 | 4b887066 | Petr Pudlak | op { qoStatus = OP_STATUS_ERROR, qoEndTimestamp = Just now } |
278 | 4b887066 | Petr Pudlak | |
279 | 4b887066 | Petr Pudlak | -- | Transform a QueuedJob that has not been started into its canceled form. |
280 | 58d29849 | Klaus Aehlig | failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob |
281 | 58d29849 | Klaus Aehlig | failQueuedJob reason now job = |
282 | 58d29849 | Klaus Aehlig | let ops' = map (failOpCode reason now) $ qjOps job |
283 | 4b887066 | Petr Pudlak | in job { qjOps = ops', qjEndTimestamp = Just now } |
284 | 363dc9d6 | Klaus Aehlig | |
285 | aa79e62e | Iustin Pop | -- | Job file prefix. |
286 | aa79e62e | Iustin Pop | jobFilePrefix :: String |
287 | aa79e62e | Iustin Pop | jobFilePrefix = "job-" |
288 | aa79e62e | Iustin Pop | |
289 | aa79e62e | Iustin Pop | -- | Computes the filename for a given job ID. |
290 | aa79e62e | Iustin Pop | jobFileName :: JobId -> FilePath |
291 | aa79e62e | Iustin Pop | jobFileName jid = jobFilePrefix ++ show (fromJobId jid) |
292 | aa79e62e | Iustin Pop | |
293 | aa79e62e | Iustin Pop | -- | Parses a job ID from a file name. |
294 | aa79e62e | Iustin Pop | parseJobFileId :: (Monad m) => FilePath -> m JobId |
295 | aa79e62e | Iustin Pop | parseJobFileId path = |
296 | aa79e62e | Iustin Pop | case stripPrefix jobFilePrefix path of |
297 | aa79e62e | Iustin Pop | Nothing -> fail $ "Job file '" ++ path ++ |
298 | aa79e62e | Iustin Pop | "' doesn't have the correct prefix" |
299 | aa79e62e | Iustin Pop | Just suffix -> makeJobIdS suffix |
300 | aa79e62e | Iustin Pop | |
301 | aa79e62e | Iustin Pop | -- | Computes the full path to a live job. |
302 | aa79e62e | Iustin Pop | liveJobFile :: FilePath -> JobId -> FilePath |
303 | aa79e62e | Iustin Pop | liveJobFile rootdir jid = rootdir </> jobFileName jid |
304 | aa79e62e | Iustin Pop | |
305 | aa79e62e | Iustin Pop | -- | Computes the full path to an archives job. BROKEN. |
306 | aa79e62e | Iustin Pop | archivedJobFile :: FilePath -> JobId -> FilePath |
307 | aa79e62e | Iustin Pop | archivedJobFile rootdir jid = |
308 | aa79e62e | Iustin Pop | let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory) |
309 | aa79e62e | Iustin Pop | in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid |
310 | aa79e62e | Iustin Pop | |
311 | aa79e62e | Iustin Pop | -- | Map from opcode status to job status. |
312 | aa79e62e | Iustin Pop | opStatusToJob :: OpStatus -> JobStatus |
313 | aa79e62e | Iustin Pop | opStatusToJob OP_STATUS_QUEUED = JOB_STATUS_QUEUED |
314 | aa79e62e | Iustin Pop | opStatusToJob OP_STATUS_WAITING = JOB_STATUS_WAITING |
315 | aa79e62e | Iustin Pop | opStatusToJob OP_STATUS_SUCCESS = JOB_STATUS_SUCCESS |
316 | aa79e62e | Iustin Pop | opStatusToJob OP_STATUS_RUNNING = JOB_STATUS_RUNNING |
317 | aa79e62e | Iustin Pop | opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING |
318 | aa79e62e | Iustin Pop | opStatusToJob OP_STATUS_CANCELED = JOB_STATUS_CANCELED |
319 | aa79e62e | Iustin Pop | opStatusToJob OP_STATUS_ERROR = JOB_STATUS_ERROR |
320 | aa79e62e | Iustin Pop | |
321 | aa79e62e | Iustin Pop | -- | Computes a queued job's status. |
322 | aa79e62e | Iustin Pop | calcJobStatus :: QueuedJob -> JobStatus |
323 | aa79e62e | Iustin Pop | calcJobStatus QueuedJob { qjOps = ops } = |
324 | aa79e62e | Iustin Pop | extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True |
325 | aa79e62e | Iustin Pop | where |
326 | aa79e62e | Iustin Pop | terminalStatus OP_STATUS_ERROR = True |
327 | aa79e62e | Iustin Pop | terminalStatus OP_STATUS_CANCELING = True |
328 | aa79e62e | Iustin Pop | terminalStatus OP_STATUS_CANCELED = True |
329 | aa79e62e | Iustin Pop | terminalStatus _ = False |
330 | aa79e62e | Iustin Pop | softStatus OP_STATUS_SUCCESS = True |
331 | aa79e62e | Iustin Pop | softStatus OP_STATUS_QUEUED = True |
332 | aa79e62e | Iustin Pop | softStatus _ = False |
333 | aa79e62e | Iustin Pop | extractOpSt [] _ True = JOB_STATUS_SUCCESS |
334 | aa79e62e | Iustin Pop | extractOpSt [] d False = d |
335 | aa79e62e | Iustin Pop | extractOpSt (x:xs) d old_all |
336 | aa79e62e | Iustin Pop | | terminalStatus x = opStatusToJob x -- abort recursion |
337 | aa79e62e | Iustin Pop | | softStatus x = extractOpSt xs d new_all -- continue unchanged |
338 | aa79e62e | Iustin Pop | | otherwise = extractOpSt xs (opStatusToJob x) new_all |
339 | aa79e62e | Iustin Pop | where new_all = x == OP_STATUS_SUCCESS && old_all |
340 | aa79e62e | Iustin Pop | |
341 | 02e40b13 | Klaus Aehlig | -- | Determine if a job has started |
342 | 02e40b13 | Klaus Aehlig | jobStarted :: QueuedJob -> Bool |
343 | 02e40b13 | Klaus Aehlig | jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus |
344 | 02e40b13 | Klaus Aehlig | |
345 | 847df9e9 | Klaus Aehlig | -- | Determine if a job is finalised. |
346 | 847df9e9 | Klaus Aehlig | jobFinalized :: QueuedJob -> Bool |
347 | 847df9e9 | Klaus Aehlig | jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus |
348 | 847df9e9 | Klaus Aehlig | |
349 | 370f63be | Klaus Aehlig | -- | Determine if a job is finalized and its timestamp is before |
350 | 370f63be | Klaus Aehlig | -- a given time. |
351 | 370f63be | Klaus Aehlig | jobArchivable :: Timestamp -> QueuedJob -> Bool |
352 | 65a3ff88 | Michele Tartara | jobArchivable ts = liftA2 (&&) jobFinalized |
353 | 370f63be | Klaus Aehlig | $ maybe False (< ts) |
354 | 370f63be | Klaus Aehlig | . liftA2 (<|>) qjEndTimestamp qjStartTimestamp |
355 | 370f63be | Klaus Aehlig | |
356 | aa79e62e | Iustin Pop | -- | Determine whether an opcode status is finalized. |
357 | aa79e62e | Iustin Pop | opStatusFinalized :: OpStatus -> Bool |
358 | aa79e62e | Iustin Pop | opStatusFinalized = (> OP_STATUS_RUNNING) |
359 | aa79e62e | Iustin Pop | |
360 | aa79e62e | Iustin Pop | -- | Compute a job's priority. |
361 | aa79e62e | Iustin Pop | calcJobPriority :: QueuedJob -> Int |
362 | aa79e62e | Iustin Pop | calcJobPriority QueuedJob { qjOps = ops } = |
363 | aa79e62e | Iustin Pop | helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops |
364 | aa79e62e | Iustin Pop | where helper [] = C.opPrioDefault |
365 | aa79e62e | Iustin Pop | helper ps = minimum ps |
366 | aa79e62e | Iustin Pop | |
367 | aa79e62e | Iustin Pop | -- | Log but ignore an 'IOError'. |
368 | aa79e62e | Iustin Pop | ignoreIOError :: a -> Bool -> String -> IOError -> IO a |
369 | aa79e62e | Iustin Pop | ignoreIOError a ignore_noent msg e = do |
370 | aa79e62e | Iustin Pop | unless (isDoesNotExistError e && ignore_noent) . |
371 | aa79e62e | Iustin Pop | logWarning $ msg ++ ": " ++ show e |
372 | aa79e62e | Iustin Pop | return a |
373 | aa79e62e | Iustin Pop | |
374 | aa79e62e | Iustin Pop | -- | Compute the list of existing archive directories. Note that I/O |
375 | aa79e62e | Iustin Pop | -- exceptions are swallowed and ignored. |
376 | aa79e62e | Iustin Pop | allArchiveDirs :: FilePath -> IO [FilePath] |
377 | aa79e62e | Iustin Pop | allArchiveDirs rootdir = do |
378 | aa79e62e | Iustin Pop | let adir = rootdir </> jobQueueArchiveSubDir |
379 | aa79e62e | Iustin Pop | contents <- getDirectoryContents adir `Control.Exception.catch` |
380 | aa79e62e | Iustin Pop | ignoreIOError [] False |
381 | aa79e62e | Iustin Pop | ("Failed to list queue directory " ++ adir) |
382 | aa79e62e | Iustin Pop | let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents |
383 | aa79e62e | Iustin Pop | filterM (\path -> |
384 | aa79e62e | Iustin Pop | liftM isDirectory (getFileStatus (adir </> path)) |
385 | aa79e62e | Iustin Pop | `Control.Exception.catch` |
386 | aa79e62e | Iustin Pop | ignoreIOError False True |
387 | aa79e62e | Iustin Pop | ("Failed to stat archive path " ++ path)) fpaths |
388 | aa79e62e | Iustin Pop | |
389 | aa79e62e | Iustin Pop | -- | Build list of directories containing job files. Note: compared to |
390 | aa79e62e | Iustin Pop | -- the Python version, this doesn't ignore a potential lost+found |
391 | aa79e62e | Iustin Pop | -- file. |
392 | aa79e62e | Iustin Pop | determineJobDirectories :: FilePath -> Bool -> IO [FilePath] |
393 | aa79e62e | Iustin Pop | determineJobDirectories rootdir archived = do |
394 | aa79e62e | Iustin Pop | other <- if archived |
395 | aa79e62e | Iustin Pop | then allArchiveDirs rootdir |
396 | aa79e62e | Iustin Pop | else return [] |
397 | aa79e62e | Iustin Pop | return $ rootdir:other |
398 | aa79e62e | Iustin Pop | |
399 | aa79e62e | Iustin Pop | -- | Computes the list of all jobs in the given directories. |
400 | ea7032da | Petr Pudlak | getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId]) |
401 | ea7032da | Petr Pudlak | getJobIDs = runResultT . liftM concat . mapM getDirJobIDs |
402 | aa79e62e | Iustin Pop | |
403 | aa79e62e | Iustin Pop | -- | Sorts the a list of job IDs. |
404 | aa79e62e | Iustin Pop | sortJobIDs :: [JobId] -> [JobId] |
405 | aa79e62e | Iustin Pop | sortJobIDs = sortBy (comparing fromJobId) |
406 | aa79e62e | Iustin Pop | |
407 | aa79e62e | Iustin Pop | -- | Computes the list of jobs in a given directory. |
408 | ea7032da | Petr Pudlak | getDirJobIDs :: FilePath -> ResultT IOError IO [JobId] |
409 | ea7032da | Petr Pudlak | getDirJobIDs path = |
410 | ea7032da | Petr Pudlak | withErrorLogAt WARNING ("Failed to list job directory " ++ path) . |
411 | ea7032da | Petr Pudlak | liftM (mapMaybe parseJobFileId) $ liftIO (getDirectoryContents path) |
412 | aa79e62e | Iustin Pop | |
413 | aa79e62e | Iustin Pop | -- | Reads the job data from disk. |
414 | aa79e62e | Iustin Pop | readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool)) |
415 | aa79e62e | Iustin Pop | readJobDataFromDisk rootdir archived jid = do |
416 | aa79e62e | Iustin Pop | let live_path = liveJobFile rootdir jid |
417 | aa79e62e | Iustin Pop | archived_path = archivedJobFile rootdir jid |
418 | aa79e62e | Iustin Pop | all_paths = if archived |
419 | aa79e62e | Iustin Pop | then [(live_path, False), (archived_path, True)] |
420 | aa79e62e | Iustin Pop | else [(live_path, False)] |
421 | aa79e62e | Iustin Pop | foldM (\state (path, isarchived) -> |
422 | aa79e62e | Iustin Pop | liftM (\r -> Just (r, isarchived)) (readFile path) |
423 | aa79e62e | Iustin Pop | `Control.Exception.catch` |
424 | aa79e62e | Iustin Pop | ignoreIOError state True |
425 | aa79e62e | Iustin Pop | ("Failed to read job file " ++ path)) Nothing all_paths |
426 | aa79e62e | Iustin Pop | |
427 | aa79e62e | Iustin Pop | -- | Failed to load job error. |
428 | aa79e62e | Iustin Pop | noSuchJob :: Result (QueuedJob, Bool) |
429 | aa79e62e | Iustin Pop | noSuchJob = Bad "Can't load job file" |
430 | aa79e62e | Iustin Pop | |
431 | aa79e62e | Iustin Pop | -- | Loads a job from disk. |
432 | aa79e62e | Iustin Pop | loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool)) |
433 | aa79e62e | Iustin Pop | loadJobFromDisk rootdir archived jid = do |
434 | aa79e62e | Iustin Pop | raw <- readJobDataFromDisk rootdir archived jid |
435 | aa79e62e | Iustin Pop | -- note: we need some stricness below, otherwise the wrapping in a |
436 | aa79e62e | Iustin Pop | -- Result will create too much lazyness, and not close the file |
437 | aa79e62e | Iustin Pop | -- descriptors for the individual jobs |
438 | aa79e62e | Iustin Pop | return $! case raw of |
439 | aa79e62e | Iustin Pop | Nothing -> noSuchJob |
440 | aa79e62e | Iustin Pop | Just (str, arch) -> |
441 | aa79e62e | Iustin Pop | liftM (\qj -> (qj, arch)) . |
442 | aa79e62e | Iustin Pop | fromJResult "Parsing job file" $ Text.JSON.decode str |
443 | cef3f99f | Klaus Aehlig | |
444 | b498ed42 | Klaus Aehlig | -- | Write a job to disk. |
445 | b498ed42 | Klaus Aehlig | writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ()) |
446 | b498ed42 | Klaus Aehlig | writeJobToDisk rootdir job = do |
447 | b498ed42 | Klaus Aehlig | let filename = liveJobFile rootdir . qjId $ job |
448 | b498ed42 | Klaus Aehlig | content = Text.JSON.encode . Text.JSON.showJSON $ job |
449 | b498ed42 | Klaus Aehlig | tryAndLogIOError (atomicWriteFile filename content) |
450 | b498ed42 | Klaus Aehlig | ("Failed to write " ++ filename) Ok |
451 | b498ed42 | Klaus Aehlig | |
452 | b5a96995 | Klaus Aehlig | -- | Replicate a job to all master candidates. |
453 | b5a96995 | Klaus Aehlig | replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())] |
454 | b5a96995 | Klaus Aehlig | replicateJob rootdir mastercandidates job = do |
455 | b5a96995 | Klaus Aehlig | let filename = liveJobFile rootdir . qjId $ job |
456 | b5a96995 | Klaus Aehlig | content = Text.JSON.encode . Text.JSON.showJSON $ job |
457 | 77676415 | Klaus Aehlig | filename' <- makeVirtualPath filename |
458 | 557f5dad | Klaus Aehlig | callresult <- executeRpcCall mastercandidates |
459 | 77676415 | Klaus Aehlig | $ RpcCallJobqueueUpdate filename' content |
460 | 557f5dad | Klaus Aehlig | let result = map (second (() <$)) callresult |
461 | 8e527d04 | Petr Pudlak | _ <- logRpcErrors result |
462 | b5a96995 | Klaus Aehlig | return result |
463 | b5a96995 | Klaus Aehlig | |
464 | 9fd653a4 | Klaus Aehlig | -- | Replicate many jobs to all master candidates. |
465 | 9fd653a4 | Klaus Aehlig | replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO () |
466 | 9fd653a4 | Klaus Aehlig | replicateManyJobs rootdir mastercandidates = |
467 | 9fd653a4 | Klaus Aehlig | mapM_ (replicateJob rootdir mastercandidates) |
468 | 9fd653a4 | Klaus Aehlig | |
469 | 7b0a9096 | Petr Pudlak | -- | Writes a job to a file and replicates it to master candidates. |
470 | 7b0a9096 | Petr Pudlak | writeAndReplicateJob :: (Error e) |
471 | 7b0a9096 | Petr Pudlak | => ConfigData -> FilePath -> QueuedJob |
472 | 7b0a9096 | Petr Pudlak | -> ResultT e IO [(Node, ERpcError ())] |
473 | 7b0a9096 | Petr Pudlak | writeAndReplicateJob cfg rootdir job = do |
474 | 7b0a9096 | Petr Pudlak | mkResultT $ writeJobToDisk rootdir job |
475 | 7b0a9096 | Petr Pudlak | liftIO $ replicateJob rootdir (Config.getMasterCandidates cfg) job |
476 | 7b0a9096 | Petr Pudlak | |
477 | cef3f99f | Klaus Aehlig | -- | Read the job serial number from disk. |
478 | cef3f99f | Klaus Aehlig | readSerialFromDisk :: IO (Result JobId) |
479 | cef3f99f | Klaus Aehlig | readSerialFromDisk = do |
480 | cef3f99f | Klaus Aehlig | filename <- jobQueueSerialFile |
481 | cef3f99f | Klaus Aehlig | tryAndLogIOError (readFile filename) "Failed to read serial file" |
482 | cef3f99f | Klaus Aehlig | (makeJobIdS . rStripSpace) |
483 | ae858516 | Klaus Aehlig | |
484 | ae858516 | Klaus Aehlig | -- | Allocate new job ids. |
485 | ae858516 | Klaus Aehlig | -- To avoid races while accessing the serial file, the threads synchronize |
486 | ae858516 | Klaus Aehlig | -- over a lock, as usual provided by an MVar. |
487 | ae858516 | Klaus Aehlig | allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId]) |
488 | ae858516 | Klaus Aehlig | allocateJobIds mastercandidates lock n = |
489 | ae858516 | Klaus Aehlig | if n <= 0 |
490 | ae858516 | Klaus Aehlig | then return . Bad $ "Can only allocate positive number of job ids" |
491 | ae858516 | Klaus Aehlig | else do |
492 | ae858516 | Klaus Aehlig | takeMVar lock |
493 | ae858516 | Klaus Aehlig | rjobid <- readSerialFromDisk |
494 | ae858516 | Klaus Aehlig | case rjobid of |
495 | ae858516 | Klaus Aehlig | Bad s -> do |
496 | ae858516 | Klaus Aehlig | putMVar lock () |
497 | ae858516 | Klaus Aehlig | return . Bad $ s |
498 | ae858516 | Klaus Aehlig | Ok jid -> do |
499 | ae858516 | Klaus Aehlig | let current = fromJobId jid |
500 | ae858516 | Klaus Aehlig | serial_content = show (current + n) ++ "\n" |
501 | ae858516 | Klaus Aehlig | serial <- jobQueueSerialFile |
502 | ae858516 | Klaus Aehlig | write_result <- try $ atomicWriteFile serial serial_content |
503 | ae858516 | Klaus Aehlig | :: IO (Either IOError ()) |
504 | ae858516 | Klaus Aehlig | case write_result of |
505 | ae858516 | Klaus Aehlig | Left e -> do |
506 | f7819050 | Klaus Aehlig | putMVar lock () |
507 | ae858516 | Klaus Aehlig | let msg = "Failed to write serial file: " ++ show e |
508 | ae858516 | Klaus Aehlig | logError msg |
509 | 65a3ff88 | Michele Tartara | return . Bad $ msg |
510 | ae858516 | Klaus Aehlig | Right () -> do |
511 | 77676415 | Klaus Aehlig | serial' <- makeVirtualPath serial |
512 | ae858516 | Klaus Aehlig | _ <- executeRpcCall mastercandidates |
513 | 77676415 | Klaus Aehlig | $ RpcCallJobqueueUpdate serial' serial_content |
514 | f7819050 | Klaus Aehlig | putMVar lock () |
515 | ae858516 | Klaus Aehlig | return $ mapM makeJobId [(current+1)..(current+n)] |
516 | ae858516 | Klaus Aehlig | |
517 | ae858516 | Klaus Aehlig | -- | Allocate one new job id. |
518 | ae858516 | Klaus Aehlig | allocateJobId :: [Node] -> MVar () -> IO (Result JobId) |
519 | ae858516 | Klaus Aehlig | allocateJobId mastercandidates lock = do |
520 | ae858516 | Klaus Aehlig | jids <- allocateJobIds mastercandidates lock 1 |
521 | ae858516 | Klaus Aehlig | return (jids >>= monadicThe "Failed to allocate precisely one Job ID") |
522 | 1b94c0db | Klaus Aehlig | |
523 | 1b94c0db | Klaus Aehlig | -- | Decide if job queue is open |
524 | 1b94c0db | Klaus Aehlig | isQueueOpen :: IO Bool |
525 | 1b94c0db | Klaus Aehlig | isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist) |
526 | 493d6920 | Klaus Aehlig | |
527 | efb4c025 | Petr Pudlak | -- | Start enqueued jobs by executing the Python code. |
528 | efb4c025 | Petr Pudlak | startJobs :: ConfigData |
529 | efb4c025 | Petr Pudlak | -> Livelock -- ^ Luxi's livelock path |
530 | efb4c025 | Petr Pudlak | -> [QueuedJob] -- ^ the list of jobs to start |
531 | efb4c025 | Petr Pudlak | -> IO [ErrorResult QueuedJob] |
532 | efb4c025 | Petr Pudlak | startJobs cfg luxiLivelock jobs = do |
533 | efb4c025 | Petr Pudlak | qdir <- queueDir |
534 | ec98ea2b | Petr Pudlak | let updateJob job llfile = |
535 | ec98ea2b | Petr Pudlak | void . writeAndReplicateJob cfg qdir $ job { qjLivelock = Just llfile } |
536 | efb4c025 | Petr Pudlak | let runJob job = do |
537 | ec98ea2b | Petr Pudlak | (llfile, _) <- Exec.forkJobProcess (qjId job) luxiLivelock |
538 | ec98ea2b | Petr Pudlak | (updateJob job) |
539 | efb4c025 | Petr Pudlak | return $ job { qjLivelock = Just llfile } |
540 | efb4c025 | Petr Pudlak | mapM (runResultT . runJob) jobs |
541 | 47c3c7b1 | Klaus Aehlig | |
542 | ec98ea2b | Petr Pudlak | -- | Waits for a job to finalize its execution. |
543 | ec98ea2b | Petr Pudlak | waitForJob :: JobId -> Int -> ResultG (Bool, String) |
544 | ec98ea2b | Petr Pudlak | waitForJob jid tmout = do |
545 | ec98ea2b | Petr Pudlak | qDir <- liftIO queueDir |
546 | ec98ea2b | Petr Pudlak | let jobfile = liveJobFile qDir jid |
547 | ec98ea2b | Petr Pudlak | load = liftM fst <$> loadJobFromDisk qDir False jid |
548 | cab9400a | Petr Pudlak | finalizedR = genericResult (const False) jobFinalized |
549 | cab9400a | Petr Pudlak | jobR <- liftIO $ watchFileBy jobfile tmout finalizedR load |
550 | ec98ea2b | Petr Pudlak | case calcJobStatus <$> jobR of |
551 | ec98ea2b | Petr Pudlak | Ok s | s == JOB_STATUS_CANCELED -> |
552 | ec98ea2b | Petr Pudlak | return (True, "Job successfully cancelled") |
553 | cab9400a | Petr Pudlak | | finalizedR jobR -> |
554 | cab9400a | Petr Pudlak | return (False, "Job exited before it could have been canceled,\ |
555 | cab9400a | Petr Pudlak | \ status " ++ show s) |
556 | ec98ea2b | Petr Pudlak | | otherwise -> |
557 | cab9400a | Petr Pudlak | return (False, "Job could not have been cancelel, status " |
558 | cab9400a | Petr Pudlak | ++ show s) |
559 | ec98ea2b | Petr Pudlak | Bad e -> failError $ "Can't read job status: " ++ e |
560 | ec98ea2b | Petr Pudlak | |
561 | 47c3c7b1 | Klaus Aehlig | -- | Try to cancel a job that has already been handed over to execution, |
562 | ec98ea2b | Petr Pudlak | -- by terminating the process. |
563 | c666e6aa | Petr Pudlak | cancelJob :: Livelock -- ^ Luxi's livelock path |
564 | c666e6aa | Petr Pudlak | -> JobId -- ^ the job to cancel |
565 | c666e6aa | Petr Pudlak | -> IO (ErrorResult (Bool, String)) |
566 | c666e6aa | Petr Pudlak | cancelJob luxiLivelock jid = runResultT $ do |
567 | ec98ea2b | Petr Pudlak | -- we can't terminate the job if it's just being started, so |
568 | ec98ea2b | Petr Pudlak | -- retry several times in such a case |
569 | ec98ea2b | Petr Pudlak | result <- runMaybeT . msum . flip map [0..5 :: Int] $ \tryNo -> do |
570 | ec98ea2b | Petr Pudlak | -- if we're retrying, sleep for some time |
571 | ec98ea2b | Petr Pudlak | when (tryNo > 0) . liftIO . threadDelay $ 100000 * (2 ^ tryNo) |
572 | ec98ea2b | Petr Pudlak | |
573 | ec98ea2b | Petr Pudlak | -- first check if the job is alive so that we don't kill some other |
574 | ec98ea2b | Petr Pudlak | -- process by accident |
575 | ec98ea2b | Petr Pudlak | qDir <- liftIO queueDir |
576 | ec98ea2b | Petr Pudlak | (job, _) <- lift . mkResultT $ loadJobFromDisk qDir True jid |
577 | ec98ea2b | Petr Pudlak | let jName = ("Job " ++) . show . fromJobId . qjId $ job |
578 | c666e6aa | Petr Pudlak | dead <- maybe (return False) (liftIO . isDead) |
579 | c666e6aa | Petr Pudlak | . mfilter (/= luxiLivelock) |
580 | c666e6aa | Petr Pudlak | $ qjLivelock job |
581 | ec98ea2b | Petr Pudlak | case qjProcessId job of |
582 | ec98ea2b | Petr Pudlak | _ | dead -> |
583 | ec98ea2b | Petr Pudlak | return (True, jName ++ " has been already dead") |
584 | ec98ea2b | Petr Pudlak | Just pid -> do |
585 | ec98ea2b | Petr Pudlak | liftIO $ signalProcess sigTERM pid |
586 | ec98ea2b | Petr Pudlak | lift $ waitForJob jid C.luxiCancelJobTimeout |
587 | ec98ea2b | Petr Pudlak | _ -> do |
588 | ec98ea2b | Petr Pudlak | logDebug $ jName ++ " in its startup phase, retrying" |
589 | ec98ea2b | Petr Pudlak | mzero |
590 | ec98ea2b | Petr Pudlak | return $ fromMaybe (False, "Timeout: job still in its startup phase") result |
591 | c867cfe1 | Klaus Aehlig | |
592 | f23daea8 | Klaus Aehlig | -- | Permissions for the archive directories. |
593 | f23daea8 | Klaus Aehlig | queueDirPermissions :: FilePermissions |
594 | f23daea8 | Klaus Aehlig | queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser |
595 | f23daea8 | Klaus Aehlig | , fpGroup = Just C.daemonsGroup |
596 | f23daea8 | Klaus Aehlig | , fpPermissions = 0o0750 |
597 | f23daea8 | Klaus Aehlig | } |
598 | f23daea8 | Klaus Aehlig | |
599 | c867cfe1 | Klaus Aehlig | -- | Try, at most until the given endtime, to archive some of the given |
600 | c867cfe1 | Klaus Aehlig | -- jobs, if they are older than the specified cut-off time; also replicate |
601 | c867cfe1 | Klaus Aehlig | -- archival of the additional jobs. Return the pair of the number of jobs |
602 | c867cfe1 | Klaus Aehlig | -- archived, and the number of jobs remaining int he queue, asuming the |
603 | c867cfe1 | Klaus Aehlig | -- given numbers about the not considered jobs. |
604 | c867cfe1 | Klaus Aehlig | archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function |
605 | c867cfe1 | Klaus Aehlig | -> FilePath -- ^ queue root directory |
606 | c867cfe1 | Klaus Aehlig | -> ClockTime -- ^ Endtime |
607 | c867cfe1 | Klaus Aehlig | -> Timestamp -- ^ cut-off time for archiving jobs |
608 | c867cfe1 | Klaus Aehlig | -> Int -- ^ number of jobs alread archived |
609 | c867cfe1 | Klaus Aehlig | -> [JobId] -- ^ Additional jobs to replicate |
610 | c867cfe1 | Klaus Aehlig | -> [JobId] -- ^ List of job-ids still to consider |
611 | c867cfe1 | Klaus Aehlig | -> IO (Int, Int) |
612 | c867cfe1 | Klaus Aehlig | archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do |
613 | c867cfe1 | Klaus Aehlig | unless (null torepl) . (>> return ()) |
614 | c867cfe1 | Klaus Aehlig | . forkIO $ replicateFn torepl |
615 | c867cfe1 | Klaus Aehlig | return (arch, 0) |
616 | c867cfe1 | Klaus Aehlig | |
617 | c867cfe1 | Klaus Aehlig | archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do |
618 | c867cfe1 | Klaus Aehlig | let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt |
619 | c867cfe1 | Klaus Aehlig | continue = archiveMore arch torepl jids |
620 | c867cfe1 | Klaus Aehlig | jidname = show $ fromJobId jid |
621 | c867cfe1 | Klaus Aehlig | time <- getClockTime |
622 | c867cfe1 | Klaus Aehlig | if time >= endt |
623 | c867cfe1 | Klaus Aehlig | then do |
624 | c867cfe1 | Klaus Aehlig | _ <- forkIO $ replicateFn torepl |
625 | c867cfe1 | Klaus Aehlig | return (arch, length (jid:jids)) |
626 | c867cfe1 | Klaus Aehlig | else do |
627 | c867cfe1 | Klaus Aehlig | logDebug $ "Inspecting job " ++ jidname ++ " for archival" |
628 | c867cfe1 | Klaus Aehlig | loadResult <- loadJobFromDisk qDir False jid |
629 | c867cfe1 | Klaus Aehlig | case loadResult of |
630 | c867cfe1 | Klaus Aehlig | Bad _ -> continue |
631 | 65a3ff88 | Michele Tartara | Ok (job, _) -> |
632 | c867cfe1 | Klaus Aehlig | if jobArchivable cutt job |
633 | c867cfe1 | Klaus Aehlig | then do |
634 | c867cfe1 | Klaus Aehlig | let live = liveJobFile qDir jid |
635 | c867cfe1 | Klaus Aehlig | archive = archivedJobFile qDir jid |
636 | 0c09ecc2 | Klaus Aehlig | renameResult <- safeRenameFile queueDirPermissions |
637 | 0c09ecc2 | Klaus Aehlig | live archive |
638 | 65a3ff88 | Michele Tartara | case renameResult of |
639 | c867cfe1 | Klaus Aehlig | Bad s -> do |
640 | c867cfe1 | Klaus Aehlig | logWarning $ "Renaming " ++ live ++ " to " ++ archive |
641 | c867cfe1 | Klaus Aehlig | ++ " failed unexpectedly: " ++ s |
642 | c867cfe1 | Klaus Aehlig | continue |
643 | c867cfe1 | Klaus Aehlig | Ok () -> do |
644 | c867cfe1 | Klaus Aehlig | let torepl' = jid:torepl |
645 | c867cfe1 | Klaus Aehlig | if length torepl' >= 10 |
646 | c867cfe1 | Klaus Aehlig | then do |
647 | c867cfe1 | Klaus Aehlig | _ <- forkIO $ replicateFn torepl' |
648 | c867cfe1 | Klaus Aehlig | archiveMore (arch + 1) [] jids |
649 | c867cfe1 | Klaus Aehlig | else archiveMore (arch + 1) torepl' jids |
650 | c867cfe1 | Klaus Aehlig | else continue |
651 | 65a3ff88 | Michele Tartara | |
652 | c867cfe1 | Klaus Aehlig | -- | Archive jobs older than the given time, but do not exceed the timeout for |
653 | c867cfe1 | Klaus Aehlig | -- carrying out this task. |
654 | c867cfe1 | Klaus Aehlig | archiveJobs :: ConfigData -- ^ cluster configuration |
655 | c867cfe1 | Klaus Aehlig | -> Int -- ^ time the job has to be in the past in order |
656 | c867cfe1 | Klaus Aehlig | -- to be archived |
657 | c867cfe1 | Klaus Aehlig | -> Int -- ^ timeout |
658 | c867cfe1 | Klaus Aehlig | -> [JobId] -- ^ jobs to consider |
659 | c867cfe1 | Klaus Aehlig | -> IO (Int, Int) |
660 | c867cfe1 | Klaus Aehlig | archiveJobs cfg age timeout jids = do |
661 | c867cfe1 | Klaus Aehlig | now <- getClockTime |
662 | c867cfe1 | Klaus Aehlig | qDir <- queueDir |
663 | c867cfe1 | Klaus Aehlig | let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now |
664 | c867cfe1 | Klaus Aehlig | cuttime = if age < 0 then noTimestamp |
665 | c867cfe1 | Klaus Aehlig | else advanceTimestamp (- age) (fromClockTime now) |
666 | c867cfe1 | Klaus Aehlig | mcs = Config.getMasterCandidates cfg |
667 | c867cfe1 | Klaus Aehlig | replicateFn jobs = do |
668 | c867cfe1 | Klaus Aehlig | let olds = map (liveJobFile qDir) jobs |
669 | c867cfe1 | Klaus Aehlig | news = map (archivedJobFile qDir) jobs |
670 | c867cfe1 | Klaus Aehlig | _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news |
671 | c867cfe1 | Klaus Aehlig | return () |
672 | c867cfe1 | Klaus Aehlig | archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids |