Revision c867cfe1

b/src/Ganeti/JQueue.hs
61 61
    , isQueueOpen
62 62
    , startJobs
63 63
    , cancelJob
64
    , archiveJobs
64 65
    ) where
65 66

  
66 67
import Control.Applicative (liftA2, (<|>))
67 68
import Control.Arrow (first, second)
69
import Control.Concurrent (forkIO)
68 70
import Control.Concurrent.MVar
69 71
import Control.Exception
70 72
import Control.Monad
......
83 85
import Text.JSON.Types
84 86

  
85 87
import Ganeti.BasicTypes
88
import qualified Ganeti.Config as Config
86 89
import qualified Ganeti.Constants as C
87 90
import Ganeti.Errors (ErrorResult)
88 91
import Ganeti.JSON
89 92
import Ganeti.Logging
90 93
import Ganeti.Luxi
91
import Ganeti.Objects (Node)
94
import Ganeti.Objects (ConfigData, Node)
92 95
import Ganeti.OpCodes
93 96
import Ganeti.Path
94 97
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
95
                   RpcCallJobqueueUpdate(..))
98
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
96 99
import Ganeti.THH
97 100
import Ganeti.Types
98 101
import Ganeti.Utils
......
496 499
  socketpath <- defaultMasterSocket
497 500
  client <- getLuxiClient socketpath
498 501
  callMethod (CancelJob jid) client
502

  
-- | Try, at most until the given endtime, to archive some of the given
-- jobs, if they are older than the specified cut-off time; also replicate
-- archival of the additional jobs. Return the pair of the number of jobs
-- archived, and the number of jobs remaining in the queue, assuming the
-- given numbers about the not considered jobs.
--
-- Job ids are inspected one at a time, in list order. A job that cannot be
-- loaded from disk, or that is not archivable with respect to the cut-off
-- time, is skipped. A job whose file cannot be renamed is skipped as well,
-- after logging a warning. Every successful local rename is queued for
-- replication; the replication function is run in a forked thread, either
-- once a full batch has been collected or when the function returns with
-- renames still pending. The replication function is never invoked on an
-- empty list.
--
-- The second component of the result is 0 if all given job ids were
-- considered, and the number of job ids not yet considered (including the
-- one under inspection) if the end time was reached first.
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
                        -> FilePath -- ^ queue root directory
                        -> ClockTime -- ^ Endtime
                        -> Timestamp -- ^ cut-off time for archiving jobs
                        -> Int -- ^ number of jobs already archived
                        -> [JobId] -- ^ Additional jobs to replicate
                        -> [JobId] -- ^ List of job-ids still to consider
                        -> IO (Int, Int)
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
  -- Nothing left to inspect: flush the pending renames, if there are any.
  unless (null torepl) . (>> return ())
   . forkIO $ replicateFn torepl
  return (arch, 0)

archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
      -- Skip the current job, keeping counters and pending renames as is.
      continue = archiveMore arch torepl jids
      jidname = show $ fromJobId jid
  time <- getClockTime
  if time >= endt
    then do
      -- Out of time: flush the pending renames, but only if there are any,
      -- so that no thread is forked just to replicate an empty list.
      unless (null torepl) . (>> return ())
       . forkIO $ replicateFn torepl
      return (arch, length (jid:jids))
    else do
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
      loadResult <- loadJobFromDisk qDir False jid
      case loadResult of
        Bad _ -> continue
        Ok (job, _) ->
          if jobArchivable cutt job
            then do
              let live = liveJobFile qDir jid
                  archive = archivedJobFile qDir jid
              renameResult <- safeRenameFile live archive
              case renameResult of
                Bad s -> do
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
                                 ++ " failed unexpectedly: " ++ s
                  continue
                Ok () -> do
                  let torepl' = jid:torepl
                      -- Number of pending renames that triggers replication.
                      replBatchSize = 10 :: Int
                  if length torepl' >= replBatchSize
                    then do
                      -- Batch is full: replicate it in the background and
                      -- start collecting a fresh one.
                      _ <- forkIO $ replicateFn torepl'
                      archiveMore (arch + 1) [] jids
                    else archiveMore (arch + 1) torepl' jids
            else continue
554
                   
-- | Archive those of the given jobs that are older than the given age,
-- giving up on further jobs once the timeout has elapsed.
--
-- The deadline is the current time plus the timeout, the latter being used
-- as a number of seconds. The cut-off time is the current time moved back
-- by the given age; for a negative age, 'noTimestamp' is used as cut-off
-- instead. Renames carried out locally are replicated to the master
-- candidates of the given configuration through a job queue rename RPC
-- call, the result of which is discarded.
--
-- The result is whatever 'archiveSomeJobsUntil' returns when started with
-- no jobs archived and no renames pending.
archiveJobs :: ConfigData -- ^ cluster configuration
               -> Int  -- ^ time the job has to be in the past in order
                       -- to be archived
               -> Int -- ^ timeout
               -> [JobId] -- ^ jobs to consider
               -> IO (Int, Int)
archiveJobs cfg age timeout jids = do
  startTime <- getClockTime
  rootDir <- queueDir
  let deadline = addToClockTime (noTimeDiff { tdSec = timeout }) startTime
      cutoff
        | age < 0 = noTimestamp
        | otherwise = advanceTimestamp (negate age) (fromClockTime startTime)
      candidates = Config.getMasterCandidates cfg
      -- Pair the live file name of a job with its archived file name.
      renamePair jid = (liveJobFile rootDir jid, archivedJobFile rootDir jid)
      replicateRenames batch = do
        _ <- executeRpcCall candidates . RpcCallJobqueueRename
               $ map renamePair batch
        return ()
  archiveSomeJobsUntil replicateRenames rootDir deadline cutoff 0 [] jids

Also available in: Unified diff