root / src / Ganeti / JQueue.hs @ 8e527d04
History | View | Annotate | Download (23.2 kB)
1 |
{-# LANGUAGE TemplateHaskell #-} |
---|---|
2 |
|
3 |
{-| Implementation of the job queue. |
4 |
|
5 |
-} |
6 |
|
7 |
{- |
8 |
|
9 |
Copyright (C) 2010, 2012 Google Inc. |
10 |
|
11 |
This program is free software; you can redistribute it and/or modify |
12 |
it under the terms of the GNU General Public License as published by |
13 |
the Free Software Foundation; either version 2 of the License, or |
14 |
(at your option) any later version. |
15 |
|
16 |
This program is distributed in the hope that it will be useful, but |
17 |
WITHOUT ANY WARRANTY; without even the implied warranty of |
18 |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
19 |
General Public License for more details. |
20 |
|
21 |
You should have received a copy of the GNU General Public License |
22 |
along with this program; if not, write to the Free Software |
23 |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
24 |
02110-1301, USA. |
25 |
|
26 |
-} |
27 |
|
28 |
module Ganeti.JQueue |
29 |
( QueuedOpCode(..) |
30 |
, QueuedJob(..) |
31 |
, InputOpCode(..) |
32 |
, queuedOpCodeFromMetaOpCode |
33 |
, queuedJobFromOpCodes |
34 |
, changeOpCodePriority |
35 |
, changeJobPriority |
36 |
, cancelQueuedJob |
37 |
, Timestamp |
38 |
, fromClockTime |
39 |
, noTimestamp |
40 |
, currentTimestamp |
41 |
, advanceTimestamp |
42 |
, setReceivedTimestamp |
43 |
, extendJobReasonTrail |
44 |
, opStatusFinalized |
45 |
, extractOpSummary |
46 |
, calcJobStatus |
47 |
, jobStarted |
48 |
, jobFinalized |
49 |
, jobArchivable |
50 |
, calcJobPriority |
51 |
, jobFileName |
52 |
, liveJobFile |
53 |
, archivedJobFile |
54 |
, determineJobDirectories |
55 |
, getJobIDs |
56 |
, sortJobIDs |
57 |
, loadJobFromDisk |
58 |
, noSuchJob |
59 |
, readSerialFromDisk |
60 |
, allocateJobIds |
61 |
, allocateJobId |
62 |
, writeJobToDisk |
63 |
, replicateManyJobs |
64 |
, isQueueOpen |
65 |
, startJobs |
66 |
, cancelJob |
67 |
, queueDirPermissions |
68 |
, archiveJobs |
69 |
) where |
70 |
|
71 |
import Control.Applicative (liftA2, (<|>)) |
72 |
import Control.Arrow (first, second) |
73 |
import Control.Concurrent (forkIO) |
74 |
import Control.Concurrent.MVar |
75 |
import Control.Exception |
76 |
import Control.Monad |
77 |
import Control.Monad.IO.Class |
78 |
import Data.Functor ((<$)) |
79 |
import Data.List |
80 |
import Data.Maybe |
81 |
import Data.Ord (comparing) |
82 |
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code |
83 |
import Prelude hiding (id, log) |
84 |
import System.Directory |
85 |
import System.FilePath |
86 |
import System.IO.Error (isDoesNotExistError) |
87 |
import System.Posix.Files |
88 |
import System.Time |
89 |
import qualified Text.JSON |
90 |
import Text.JSON.Types |
91 |
|
92 |
import Ganeti.BasicTypes |
93 |
import qualified Ganeti.Config as Config |
94 |
import qualified Ganeti.Constants as C |
95 |
import Ganeti.Errors (ErrorResult) |
96 |
import Ganeti.JSON |
97 |
import Ganeti.Logging |
98 |
import Ganeti.Luxi |
99 |
import Ganeti.Objects (ConfigData, Node) |
100 |
import Ganeti.OpCodes |
101 |
import Ganeti.Path |
102 |
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors, |
103 |
RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..)) |
104 |
import Ganeti.THH |
105 |
import Ganeti.Types |
106 |
import Ganeti.Utils |
107 |
import Ganeti.Utils.Atomic |
108 |
import Ganeti.VCluster (makeVirtualPath) |
109 |
|
110 |
-- * Data types |
111 |
|
112 |
-- | The ganeti queue timestamp type. It represents the time as the pair |
113 |
-- of seconds since the epoch and microseconds since the beginning of the |
114 |
-- second. |
115 |
type Timestamp = (Int, Int) |
116 |
|
117 |
-- | Missing timestamp type. |
118 |
noTimestamp :: Timestamp |
119 |
noTimestamp = (-1, -1) |
120 |
|
121 |
-- | Obtain a Timestamp from a given clock time |
122 |
fromClockTime :: ClockTime -> Timestamp |
123 |
fromClockTime (TOD ctime pico) = |
124 |
(fromIntegral ctime, fromIntegral $ pico `div` 1000000) |
125 |
|
126 |
-- | Get the current time in the job-queue timestamp format. |
127 |
currentTimestamp :: IO Timestamp |
128 |
currentTimestamp = fromClockTime `liftM` getClockTime |
129 |
|
130 |
-- | From a given timestamp, obtain the timestamp of the |
131 |
-- time that is the given number of seconds later. |
132 |
advanceTimestamp :: Int -> Timestamp -> Timestamp |
133 |
advanceTimestamp = first . (+) |
134 |
|
135 |
-- | An input opcode. |
136 |
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully |
137 |
| InvalidOpCode JSValue -- ^ Invalid opcode |
138 |
deriving (Show, Eq) |
139 |
|
140 |
-- | JSON instance for 'InputOpCode', trying to parse it and if |
141 |
-- failing, keeping the original JSValue. |
142 |
instance Text.JSON.JSON InputOpCode where |
143 |
showJSON (ValidOpCode mo) = Text.JSON.showJSON mo |
144 |
showJSON (InvalidOpCode inv) = inv |
145 |
readJSON v = case Text.JSON.readJSON v of |
146 |
Text.JSON.Error _ -> return $ InvalidOpCode v |
147 |
Text.JSON.Ok mo -> return $ ValidOpCode mo |
148 |
|
149 |
-- | Invalid opcode summary. |
150 |
invalidOp :: String |
151 |
invalidOp = "INVALID_OP" |
152 |
|
153 |
-- | Tries to extract the opcode summary from an 'InputOpCode'. This |
154 |
-- duplicates some functionality from the 'opSummary' function in |
155 |
-- "Ganeti.OpCodes". |
156 |
extractOpSummary :: InputOpCode -> String |
157 |
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop |
158 |
extractOpSummary (InvalidOpCode (JSObject o)) = |
159 |
case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of |
160 |
Just s -> drop 3 s -- drop the OP_ prefix |
161 |
Nothing -> invalidOp |
162 |
extractOpSummary _ = invalidOp |
163 |
|
164 |
$(buildObject "QueuedOpCode" "qo" |
165 |
[ simpleField "input" [t| InputOpCode |] |
166 |
, simpleField "status" [t| OpStatus |] |
167 |
, simpleField "result" [t| JSValue |] |
168 |
, defaultField [| [] |] $ |
169 |
simpleField "log" [t| [(Int, Timestamp, ELogType, JSValue)] |] |
170 |
, simpleField "priority" [t| Int |] |
171 |
, optionalNullSerField $ |
172 |
simpleField "start_timestamp" [t| Timestamp |] |
173 |
, optionalNullSerField $ |
174 |
simpleField "exec_timestamp" [t| Timestamp |] |
175 |
, optionalNullSerField $ |
176 |
simpleField "end_timestamp" [t| Timestamp |] |
177 |
]) |
178 |
|
179 |
$(buildObject "QueuedJob" "qj" |
180 |
[ simpleField "id" [t| JobId |] |
181 |
, simpleField "ops" [t| [QueuedOpCode] |] |
182 |
, optionalNullSerField $ |
183 |
simpleField "received_timestamp" [t| Timestamp |] |
184 |
, optionalNullSerField $ |
185 |
simpleField "start_timestamp" [t| Timestamp |] |
186 |
, optionalNullSerField $ |
187 |
simpleField "end_timestamp" [t| Timestamp |] |
188 |
]) |
189 |
|
190 |
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode |
191 |
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode |
192 |
queuedOpCodeFromMetaOpCode op = |
193 |
QueuedOpCode { qoInput = ValidOpCode op |
194 |
, qoStatus = OP_STATUS_QUEUED |
195 |
, qoPriority = opSubmitPriorityToRaw . opPriority . metaParams |
196 |
$ op |
197 |
, qoLog = [] |
198 |
, qoResult = JSNull |
199 |
, qoStartTimestamp = Nothing |
200 |
, qoEndTimestamp = Nothing |
201 |
, qoExecTimestamp = Nothing |
202 |
} |
203 |
|
204 |
-- | From a job-id and a list of op-codes create a job. This is |
205 |
-- the pure part of job creation, as allocating a new job id |
206 |
-- lives in IO. |
207 |
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob |
208 |
queuedJobFromOpCodes jobid ops = do |
209 |
ops' <- mapM (`resolveDependencies` jobid) ops |
210 |
return QueuedJob { qjId = jobid |
211 |
, qjOps = map queuedOpCodeFromMetaOpCode ops' |
212 |
, qjReceivedTimestamp = Nothing |
213 |
, qjStartTimestamp = Nothing |
214 |
, qjEndTimestamp = Nothing |
215 |
} |
216 |
|
217 |
-- | Attach a received timestamp to a Queued Job. |
218 |
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob |
219 |
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts } |
220 |
|
221 |
-- | Build a timestamp in the format expected by the reason trail (nanoseconds) |
222 |
-- starting from a JQueue Timestamp. |
223 |
reasonTrailTimestamp :: Timestamp -> Integer |
224 |
reasonTrailTimestamp (sec, micro) = |
225 |
let sec' = toInteger sec |
226 |
micro' = toInteger micro |
227 |
in sec' * 1000000000 + micro' * 1000 |
228 |
|
229 |
-- | Append an element to the reason trail of an input opcode. |
230 |
extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode |
231 |
-> InputOpCode |
232 |
extendInputOpCodeReasonTrail _ _ _ op@(InvalidOpCode _) = op |
233 |
extendInputOpCodeReasonTrail jid ts i (ValidOpCode vOp) = |
234 |
let metaP = metaParams vOp |
235 |
op = metaOpCode vOp |
236 |
trail = opReason metaP |
237 |
reasonSrc = opReasonSrcID op |
238 |
reasonText = "job=" ++ show (fromJobId jid) ++ ";index=" ++ show i |
239 |
reason = (reasonSrc, reasonText, reasonTrailTimestamp ts) |
240 |
trail' = trail ++ [reason] |
241 |
in ValidOpCode $ vOp { metaParams = metaP { opReason = trail' } } |
242 |
|
243 |
-- | Append an element to the reason trail of a queued opcode. |
244 |
extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode |
245 |
-> QueuedOpCode |
246 |
extendOpCodeReasonTrail jid ts i op = |
247 |
let inOp = qoInput op |
248 |
in op { qoInput = extendInputOpCodeReasonTrail jid ts i inOp } |
249 |
|
250 |
-- | Append an element to the reason trail of all the OpCodes of a queued job. |
251 |
extendJobReasonTrail :: QueuedJob -> QueuedJob |
252 |
extendJobReasonTrail job = |
253 |
let jobId = qjId job |
254 |
mTimestamp = qjReceivedTimestamp job |
255 |
-- This function is going to be called on QueuedJobs that already contain |
256 |
-- a timestamp. But for safety reasons we cannot assume mTimestamp will |
257 |
-- be (Just timestamp), so we use the value 0 in the extremely unlikely |
258 |
-- case this is not true. |
259 |
timestamp = fromMaybe (0, 0) mTimestamp |
260 |
in job |
261 |
{ qjOps = |
262 |
zipWith (extendOpCodeReasonTrail jobId timestamp) [0..] $ |
263 |
qjOps job |
264 |
} |
265 |
|
266 |
-- | Change the priority of a QueuedOpCode, if it is not already |
267 |
-- finalized. |
268 |
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode |
269 |
changeOpCodePriority prio op = |
270 |
if qoStatus op > OP_STATUS_RUNNING |
271 |
then op |
272 |
else op { qoPriority = prio } |
273 |
|
274 |
-- | Set the state of a QueuedOpCode to canceled. |
275 |
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode |
276 |
cancelOpCode now op = |
277 |
op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now } |
278 |
|
279 |
-- | Change the priority of a job, i.e., change the priority of the |
280 |
-- non-finalized opcodes. |
281 |
changeJobPriority :: Int -> QueuedJob -> QueuedJob |
282 |
changeJobPriority prio job = |
283 |
job { qjOps = map (changeOpCodePriority prio) $ qjOps job } |
284 |
|
285 |
-- | Transform a QueuedJob that has not been started into its canceled form. |
286 |
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob |
287 |
cancelQueuedJob now job = |
288 |
let ops' = map (cancelOpCode now) $ qjOps job |
289 |
in job { qjOps = ops', qjEndTimestamp = Just now} |
290 |
|
291 |
-- | Job file prefix. |
292 |
jobFilePrefix :: String |
293 |
jobFilePrefix = "job-" |
294 |
|
295 |
-- | Computes the filename for a given job ID. |
296 |
jobFileName :: JobId -> FilePath |
297 |
jobFileName jid = jobFilePrefix ++ show (fromJobId jid) |
298 |
|
299 |
-- | Parses a job ID from a file name. |
300 |
parseJobFileId :: (Monad m) => FilePath -> m JobId |
301 |
parseJobFileId path = |
302 |
case stripPrefix jobFilePrefix path of |
303 |
Nothing -> fail $ "Job file '" ++ path ++ |
304 |
"' doesn't have the correct prefix" |
305 |
Just suffix -> makeJobIdS suffix |
306 |
|
307 |
-- | Computes the full path to a live job. |
308 |
liveJobFile :: FilePath -> JobId -> FilePath |
309 |
liveJobFile rootdir jid = rootdir </> jobFileName jid |
310 |
|
311 |
-- | Computes the full path to an archives job. BROKEN. |
312 |
archivedJobFile :: FilePath -> JobId -> FilePath |
313 |
archivedJobFile rootdir jid = |
314 |
let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory) |
315 |
in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid |
316 |
|
317 |
-- | Map from opcode status to job status. |
318 |
opStatusToJob :: OpStatus -> JobStatus |
319 |
opStatusToJob OP_STATUS_QUEUED = JOB_STATUS_QUEUED |
320 |
opStatusToJob OP_STATUS_WAITING = JOB_STATUS_WAITING |
321 |
opStatusToJob OP_STATUS_SUCCESS = JOB_STATUS_SUCCESS |
322 |
opStatusToJob OP_STATUS_RUNNING = JOB_STATUS_RUNNING |
323 |
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING |
324 |
opStatusToJob OP_STATUS_CANCELED = JOB_STATUS_CANCELED |
325 |
opStatusToJob OP_STATUS_ERROR = JOB_STATUS_ERROR |
326 |
|
327 |
-- | Computes a queued job's status. |
328 |
calcJobStatus :: QueuedJob -> JobStatus |
329 |
calcJobStatus QueuedJob { qjOps = ops } = |
330 |
extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True |
331 |
where |
332 |
terminalStatus OP_STATUS_ERROR = True |
333 |
terminalStatus OP_STATUS_CANCELING = True |
334 |
terminalStatus OP_STATUS_CANCELED = True |
335 |
terminalStatus _ = False |
336 |
softStatus OP_STATUS_SUCCESS = True |
337 |
softStatus OP_STATUS_QUEUED = True |
338 |
softStatus _ = False |
339 |
extractOpSt [] _ True = JOB_STATUS_SUCCESS |
340 |
extractOpSt [] d False = d |
341 |
extractOpSt (x:xs) d old_all |
342 |
| terminalStatus x = opStatusToJob x -- abort recursion |
343 |
| softStatus x = extractOpSt xs d new_all -- continue unchanged |
344 |
| otherwise = extractOpSt xs (opStatusToJob x) new_all |
345 |
where new_all = x == OP_STATUS_SUCCESS && old_all |
346 |
|
347 |
-- | Determine if a job has started |
348 |
jobStarted :: QueuedJob -> Bool |
349 |
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus |
350 |
|
351 |
-- | Determine if a job is finalised. |
352 |
jobFinalized :: QueuedJob -> Bool |
353 |
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus |
354 |
|
355 |
-- | Determine if a job is finalized and its timestamp is before |
356 |
-- a given time. |
357 |
jobArchivable :: Timestamp -> QueuedJob -> Bool |
358 |
jobArchivable ts = liftA2 (&&) jobFinalized |
359 |
$ maybe False (< ts) |
360 |
. liftA2 (<|>) qjEndTimestamp qjStartTimestamp |
361 |
|
362 |
-- | Determine whether an opcode status is finalized. |
363 |
opStatusFinalized :: OpStatus -> Bool |
364 |
opStatusFinalized = (> OP_STATUS_RUNNING) |
365 |
|
366 |
-- | Compute a job's priority. |
367 |
calcJobPriority :: QueuedJob -> Int |
368 |
calcJobPriority QueuedJob { qjOps = ops } = |
369 |
helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops |
370 |
where helper [] = C.opPrioDefault |
371 |
helper ps = minimum ps |
372 |
|
373 |
-- | Log but ignore an 'IOError'. |
374 |
ignoreIOError :: a -> Bool -> String -> IOError -> IO a |
375 |
ignoreIOError a ignore_noent msg e = do |
376 |
unless (isDoesNotExistError e && ignore_noent) . |
377 |
logWarning $ msg ++ ": " ++ show e |
378 |
return a |
379 |
|
380 |
-- | Compute the list of existing archive directories. Note that I/O |
381 |
-- exceptions are swallowed and ignored. |
382 |
allArchiveDirs :: FilePath -> IO [FilePath] |
383 |
allArchiveDirs rootdir = do |
384 |
let adir = rootdir </> jobQueueArchiveSubDir |
385 |
contents <- getDirectoryContents adir `Control.Exception.catch` |
386 |
ignoreIOError [] False |
387 |
("Failed to list queue directory " ++ adir) |
388 |
let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents |
389 |
filterM (\path -> |
390 |
liftM isDirectory (getFileStatus (adir </> path)) |
391 |
`Control.Exception.catch` |
392 |
ignoreIOError False True |
393 |
("Failed to stat archive path " ++ path)) fpaths |
394 |
|
395 |
-- | Build list of directories containing job files. Note: compared to |
396 |
-- the Python version, this doesn't ignore a potential lost+found |
397 |
-- file. |
398 |
determineJobDirectories :: FilePath -> Bool -> IO [FilePath] |
399 |
determineJobDirectories rootdir archived = do |
400 |
other <- if archived |
401 |
then allArchiveDirs rootdir |
402 |
else return [] |
403 |
return $ rootdir:other |
404 |
|
405 |
-- | Computes the list of all jobs in the given directories. |
406 |
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId]) |
407 |
getJobIDs = runResultT . liftM concat . mapM getDirJobIDs |
408 |
|
409 |
-- | Sorts the a list of job IDs. |
410 |
sortJobIDs :: [JobId] -> [JobId] |
411 |
sortJobIDs = sortBy (comparing fromJobId) |
412 |
|
413 |
-- | Computes the list of jobs in a given directory. |
414 |
getDirJobIDs :: FilePath -> ResultT IOError IO [JobId] |
415 |
getDirJobIDs path = |
416 |
withErrorLogAt WARNING ("Failed to list job directory " ++ path) . |
417 |
liftM (mapMaybe parseJobFileId) $ liftIO (getDirectoryContents path) |
418 |
|
419 |
-- | Reads the job data from disk. |
420 |
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool)) |
421 |
readJobDataFromDisk rootdir archived jid = do |
422 |
let live_path = liveJobFile rootdir jid |
423 |
archived_path = archivedJobFile rootdir jid |
424 |
all_paths = if archived |
425 |
then [(live_path, False), (archived_path, True)] |
426 |
else [(live_path, False)] |
427 |
foldM (\state (path, isarchived) -> |
428 |
liftM (\r -> Just (r, isarchived)) (readFile path) |
429 |
`Control.Exception.catch` |
430 |
ignoreIOError state True |
431 |
("Failed to read job file " ++ path)) Nothing all_paths |
432 |
|
433 |
-- | Failed to load job error. |
434 |
noSuchJob :: Result (QueuedJob, Bool) |
435 |
noSuchJob = Bad "Can't load job file" |
436 |
|
437 |
-- | Loads a job from disk. |
438 |
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool)) |
439 |
loadJobFromDisk rootdir archived jid = do |
440 |
raw <- readJobDataFromDisk rootdir archived jid |
441 |
-- note: we need some stricness below, otherwise the wrapping in a |
442 |
-- Result will create too much lazyness, and not close the file |
443 |
-- descriptors for the individual jobs |
444 |
return $! case raw of |
445 |
Nothing -> noSuchJob |
446 |
Just (str, arch) -> |
447 |
liftM (\qj -> (qj, arch)) . |
448 |
fromJResult "Parsing job file" $ Text.JSON.decode str |
449 |
|
450 |
-- | Write a job to disk. |
451 |
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ()) |
452 |
writeJobToDisk rootdir job = do |
453 |
let filename = liveJobFile rootdir . qjId $ job |
454 |
content = Text.JSON.encode . Text.JSON.showJSON $ job |
455 |
tryAndLogIOError (atomicWriteFile filename content) |
456 |
("Failed to write " ++ filename) Ok |
457 |
|
458 |
-- | Replicate a job to all master candidates. |
459 |
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())] |
460 |
replicateJob rootdir mastercandidates job = do |
461 |
let filename = liveJobFile rootdir . qjId $ job |
462 |
content = Text.JSON.encode . Text.JSON.showJSON $ job |
463 |
filename' <- makeVirtualPath filename |
464 |
callresult <- executeRpcCall mastercandidates |
465 |
$ RpcCallJobqueueUpdate filename' content |
466 |
let result = map (second (() <$)) callresult |
467 |
_ <- logRpcErrors result |
468 |
return result |
469 |
|
470 |
-- | Replicate many jobs to all master candidates. |
471 |
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO () |
472 |
replicateManyJobs rootdir mastercandidates = |
473 |
mapM_ (replicateJob rootdir mastercandidates) |
474 |
|
475 |
-- | Read the job serial number from disk. |
476 |
readSerialFromDisk :: IO (Result JobId) |
477 |
readSerialFromDisk = do |
478 |
filename <- jobQueueSerialFile |
479 |
tryAndLogIOError (readFile filename) "Failed to read serial file" |
480 |
(makeJobIdS . rStripSpace) |
481 |
|
482 |
-- | Allocate new job ids. |
483 |
-- To avoid races while accessing the serial file, the threads synchronize |
484 |
-- over a lock, as usual provided by an MVar. |
485 |
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId]) |
486 |
allocateJobIds mastercandidates lock n = |
487 |
if n <= 0 |
488 |
then return . Bad $ "Can only allocate positive number of job ids" |
489 |
else do |
490 |
takeMVar lock |
491 |
rjobid <- readSerialFromDisk |
492 |
case rjobid of |
493 |
Bad s -> do |
494 |
putMVar lock () |
495 |
return . Bad $ s |
496 |
Ok jid -> do |
497 |
let current = fromJobId jid |
498 |
serial_content = show (current + n) ++ "\n" |
499 |
serial <- jobQueueSerialFile |
500 |
write_result <- try $ atomicWriteFile serial serial_content |
501 |
:: IO (Either IOError ()) |
502 |
case write_result of |
503 |
Left e -> do |
504 |
putMVar lock () |
505 |
let msg = "Failed to write serial file: " ++ show e |
506 |
logError msg |
507 |
return . Bad $ msg |
508 |
Right () -> do |
509 |
serial' <- makeVirtualPath serial |
510 |
_ <- executeRpcCall mastercandidates |
511 |
$ RpcCallJobqueueUpdate serial' serial_content |
512 |
putMVar lock () |
513 |
return $ mapM makeJobId [(current+1)..(current+n)] |
514 |
|
515 |
-- | Allocate one new job id. |
516 |
allocateJobId :: [Node] -> MVar () -> IO (Result JobId) |
517 |
allocateJobId mastercandidates lock = do |
518 |
jids <- allocateJobIds mastercandidates lock 1 |
519 |
return (jids >>= monadicThe "Failed to allocate precisely one Job ID") |
520 |
|
521 |
-- | Decide if job queue is open |
522 |
isQueueOpen :: IO Bool |
523 |
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist) |
524 |
|
525 |
-- | Start enqueued jobs, currently by handing them over to masterd. |
526 |
startJobs :: [QueuedJob] -> IO () |
527 |
startJobs jobs = do |
528 |
socketpath <- defaultMasterSocket |
529 |
client <- getLuxiClient socketpath |
530 |
pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs |
531 |
let failures = map show $ justBad pickupResults |
532 |
unless (null failures) |
533 |
. logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures |
534 |
|
535 |
-- | Try to cancel a job that has already been handed over to execution, |
536 |
-- currently by asking masterd to cancel it. |
537 |
cancelJob :: JobId -> IO (ErrorResult JSValue) |
538 |
cancelJob jid = do |
539 |
socketpath <- defaultMasterSocket |
540 |
client <- getLuxiClient socketpath |
541 |
callMethod (CancelJob jid) client |
542 |
|
543 |
-- | Permissions for the archive directories. |
544 |
queueDirPermissions :: FilePermissions |
545 |
queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser |
546 |
, fpGroup = Just C.daemonsGroup |
547 |
, fpPermissions = 0o0750 |
548 |
} |
549 |
|
550 |
-- | Try, at most until the given endtime, to archive some of the given |
551 |
-- jobs, if they are older than the specified cut-off time; also replicate |
552 |
-- archival of the additional jobs. Return the pair of the number of jobs |
553 |
-- archived, and the number of jobs remaining int he queue, asuming the |
554 |
-- given numbers about the not considered jobs. |
555 |
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function |
556 |
-> FilePath -- ^ queue root directory |
557 |
-> ClockTime -- ^ Endtime |
558 |
-> Timestamp -- ^ cut-off time for archiving jobs |
559 |
-> Int -- ^ number of jobs alread archived |
560 |
-> [JobId] -- ^ Additional jobs to replicate |
561 |
-> [JobId] -- ^ List of job-ids still to consider |
562 |
-> IO (Int, Int) |
563 |
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do |
564 |
unless (null torepl) . (>> return ()) |
565 |
. forkIO $ replicateFn torepl |
566 |
return (arch, 0) |
567 |
|
568 |
archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do |
569 |
let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt |
570 |
continue = archiveMore arch torepl jids |
571 |
jidname = show $ fromJobId jid |
572 |
time <- getClockTime |
573 |
if time >= endt |
574 |
then do |
575 |
_ <- forkIO $ replicateFn torepl |
576 |
return (arch, length (jid:jids)) |
577 |
else do |
578 |
logDebug $ "Inspecting job " ++ jidname ++ " for archival" |
579 |
loadResult <- loadJobFromDisk qDir False jid |
580 |
case loadResult of |
581 |
Bad _ -> continue |
582 |
Ok (job, _) -> |
583 |
if jobArchivable cutt job |
584 |
then do |
585 |
let live = liveJobFile qDir jid |
586 |
archive = archivedJobFile qDir jid |
587 |
renameResult <- safeRenameFile queueDirPermissions |
588 |
live archive |
589 |
case renameResult of |
590 |
Bad s -> do |
591 |
logWarning $ "Renaming " ++ live ++ " to " ++ archive |
592 |
++ " failed unexpectedly: " ++ s |
593 |
continue |
594 |
Ok () -> do |
595 |
let torepl' = jid:torepl |
596 |
if length torepl' >= 10 |
597 |
then do |
598 |
_ <- forkIO $ replicateFn torepl' |
599 |
archiveMore (arch + 1) [] jids |
600 |
else archiveMore (arch + 1) torepl' jids |
601 |
else continue |
602 |
|
603 |
-- | Archive jobs older than the given time, but do not exceed the timeout for |
604 |
-- carrying out this task. |
605 |
archiveJobs :: ConfigData -- ^ cluster configuration |
606 |
-> Int -- ^ time the job has to be in the past in order |
607 |
-- to be archived |
608 |
-> Int -- ^ timeout |
609 |
-> [JobId] -- ^ jobs to consider |
610 |
-> IO (Int, Int) |
611 |
archiveJobs cfg age timeout jids = do |
612 |
now <- getClockTime |
613 |
qDir <- queueDir |
614 |
let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now |
615 |
cuttime = if age < 0 then noTimestamp |
616 |
else advanceTimestamp (- age) (fromClockTime now) |
617 |
mcs = Config.getMasterCandidates cfg |
618 |
replicateFn jobs = do |
619 |
let olds = map (liveJobFile qDir) jobs |
620 |
news = map (archivedJobFile qDir) jobs |
621 |
_ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news |
622 |
return () |
623 |
archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids |