Revision c867cfe1
b/src/Ganeti/JQueue.hs | ||
---|---|---|
61 | 61 |
, isQueueOpen |
62 | 62 |
, startJobs |
63 | 63 |
, cancelJob |
64 |
, archiveJobs |
|
64 | 65 |
) where |
65 | 66 |
|
66 | 67 |
import Control.Applicative (liftA2, (<|>)) |
67 | 68 |
import Control.Arrow (first, second) |
69 |
import Control.Concurrent (forkIO) |
|
68 | 70 |
import Control.Concurrent.MVar |
69 | 71 |
import Control.Exception |
70 | 72 |
import Control.Monad |
... | ... | |
83 | 85 |
import Text.JSON.Types |
84 | 86 |
|
85 | 87 |
import Ganeti.BasicTypes |
88 |
import qualified Ganeti.Config as Config |
|
86 | 89 |
import qualified Ganeti.Constants as C |
87 | 90 |
import Ganeti.Errors (ErrorResult) |
88 | 91 |
import Ganeti.JSON |
89 | 92 |
import Ganeti.Logging |
90 | 93 |
import Ganeti.Luxi |
91 |
import Ganeti.Objects (Node) |
|
94 |
import Ganeti.Objects (ConfigData, Node)
|
|
92 | 95 |
import Ganeti.OpCodes |
93 | 96 |
import Ganeti.Path |
94 | 97 |
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors, |
95 |
RpcCallJobqueueUpdate(..)) |
|
98 |
RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
|
|
96 | 99 |
import Ganeti.THH |
97 | 100 |
import Ganeti.Types |
98 | 101 |
import Ganeti.Utils |
... | ... | |
496 | 499 |
socketpath <- defaultMasterSocket |
497 | 500 |
client <- getLuxiClient socketpath |
498 | 501 |
callMethod (CancelJob jid) client |
502 |
|
|
503 |
-- | Try, at most until the given endtime, to archive some of the given
-- jobs, if they are older than the specified cut-off time; also replicate
-- archival of the additional jobs. Return the pair of the number of jobs
-- archived, and the number of jobs remaining in the queue, assuming the
-- given numbers about the not considered jobs.
--
-- Replication is always done in a forked thread, and only if there is
-- something to replicate.
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
                        -> FilePath -- ^ queue root directory
                        -> ClockTime -- ^ Endtime
                        -> Timestamp -- ^ cut-off time for archiving jobs
                        -> Int -- ^ number of jobs already archived
                        -> [JobId] -- ^ Additional jobs to replicate
                        -> [JobId] -- ^ List of job-ids still to consider
                        -> IO (Int, Int)
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
  -- Nothing left to inspect: flush the renames not yet replicated.
  unless (null torepl) . (>> return ())
    . forkIO $ replicateFn torepl
  return (arch, 0)

archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
      -- Skip the current job, keeping counter and pending batch unchanged.
      continue = archiveMore arch torepl jids
      jidname = show $ fromJobId jid
  time <- getClockTime
  if time >= endt
    then do
      -- Out of time: flush the pending batch, but do not fork a thread
      -- (and invoke the replication function) for an empty list; this
      -- mirrors the guard used when the job list is exhausted.
      unless (null torepl) . (>> return ())
        . forkIO $ replicateFn torepl
      -- The current job was not inspected, so it counts as remaining.
      return (arch, length (jid:jids))
    else do
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
      loadResult <- loadJobFromDisk qDir False jid
      case loadResult of
        -- Jobs that cannot be loaded are skipped.
        Bad _ -> continue
        Ok (job, _) ->
          if jobArchivable cutt job
            then do
              let live = liveJobFile qDir jid
                  archive = archivedJobFile qDir jid
              renameResult <- safeRenameFile live archive
              case renameResult of
                Bad s -> do
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
                                 ++ " failed unexpectedly: " ++ s
                  continue
                Ok () -> do
                  let torepl' = jid:torepl
                  -- Replicate in batches: once 10 renames have accumulated,
                  -- hand them to a background thread and start a new batch.
                  if length torepl' >= 10
                    then do
                      _ <- forkIO $ replicateFn torepl'
                      archiveMore (arch + 1) [] jids
                    else archiveMore (arch + 1) torepl' jids
            else continue
|
554 |
|
|
555 |
-- | Archive the given jobs if they are older than the given age, giving up
-- once the timeout has elapsed. A negative age selects 'noTimestamp' as the
-- cut-off. Successful renames are replicated to the master candidates of
-- the given configuration via the job queue rename RPC. The result is the
-- pair returned by 'archiveSomeJobsUntil'.
archiveJobs :: ConfigData -- ^ cluster configuration
               -> Int -- ^ time the job has to be in the past in order
                      -- to be archived
               -> Int -- ^ timeout
               -> [JobId] -- ^ jobs to consider
               -> IO (Int, Int)
archiveJobs cfg age timeout jids = do
  startTime <- getClockTime
  rootDir <- queueDir
  let deadline = addToClockTime (noTimeDiff { tdSec = timeout }) startTime
      cutoff
        | age < 0 = noTimestamp
        | otherwise = advanceTimestamp (negate age) (fromClockTime startTime)
      candidates = Config.getMasterCandidates cfg
      -- Pair each job's live file name with its archived file name.
      renamePairs jobs =
        [ (liveJobFile rootDir j, archivedJobFile rootDir j) | j <- jobs ]
      -- The RPC results are discarded.
      replicateRenames jobs =
        executeRpcCall candidates (RpcCallJobqueueRename (renamePairs jobs))
          >> return ()
  archiveSomeJobsUntil replicateRenames rootDir deadline cutoff 0 [] jids
Also available in: Unified diff