{-# LANGUAGE TemplateHaskell #-}

{-| Implementation of the job queue.

-}

{-

Copyright (C) 2010, 2012 Google Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.

-}

module Ganeti.JQueue
    ( QueuedOpCode(..)
    , QueuedJob(..)
    , InputOpCode(..)
    , queuedOpCodeFromMetaOpCode
    , queuedJobFromOpCodes
    , changeOpCodePriority
    , cancelQueuedJob
    , Timestamp
    , fromClockTime
    , noTimestamp
    , currentTimestamp
    , advanceTimestamp
    , setReceivedTimestamp
    , opStatusFinalized
    , extractOpSummary
    , calcJobStatus
    , jobStarted
    , jobFinalized
    , jobArchivable
    , calcJobPriority
    , jobFileName
    , liveJobFile
    , archivedJobFile
    , determineJobDirectories
    , getJobIDs
    , sortJobIDs
    , loadJobFromDisk
    , noSuchJob
    , readSerialFromDisk
    , allocateJobIds
    , allocateJobId
    , writeJobToDisk
    , replicateManyJobs
    , isQueueOpen
    , startJobs
    , cancelJob
    , queueDirPermissions
    , archiveJobs
    ) where

import Control.Applicative (liftA2, (<|>))
import Control.Arrow (first, second)
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Data.Functor ((<$))
import Data.List
import Data.Maybe
import Data.Ord (comparing)
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
import Prelude hiding (id, log)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import System.Time
import qualified Text.JSON
import Text.JSON.Types

import Ganeti.BasicTypes
import qualified Ganeti.Config as Config
import qualified Ganeti.Constants as C
import Ganeti.Errors (ErrorResult)
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (ConfigData, Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils
import Ganeti.VCluster (makeVirtualPath)

107

    
108
-- | The ganeti queue timestamp type. It represents the time as the pair
109
-- of seconds since the epoch and microseconds since the beginning of the
110
-- second.
111
type Timestamp = (Int, Int)
112

    
113
-- | Missing timestamp type.
114
noTimestamp :: Timestamp
115
noTimestamp = (-1, -1)
116

    
117
-- | Obtain a Timestamp from a given clock time
118
fromClockTime :: ClockTime -> Timestamp
119
fromClockTime (TOD ctime pico) =
120
  (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
121

    
122
-- | Get the current time in the job-queue timestamp format.
123
currentTimestamp :: IO Timestamp
124
currentTimestamp = fromClockTime `liftM` getClockTime
125

    
126
-- | From a given timestamp, obtain the timestamp of the
127
-- time that is the given number of seconds later.
128
advanceTimestamp :: Int -> Timestamp -> Timestamp
129
advanceTimestamp = first . (+)
130

    
131
-- | An input opcode.
132
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
133
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
134
                   deriving (Show, Eq)
135

    
136
-- | JSON instance for 'InputOpCode', trying to parse it and if
137
-- failing, keeping the original JSValue.
138
instance Text.JSON.JSON InputOpCode where
139
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
140
  showJSON (InvalidOpCode inv) = inv
141
  readJSON v = case Text.JSON.readJSON v of
142
                 Text.JSON.Error _ -> return $ InvalidOpCode v
143
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
144

    
145
-- | Invalid opcode summary.
146
invalidOp :: String
147
invalidOp = "INVALID_OP"
148

    
149
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
150
-- duplicates some functionality from the 'opSummary' function in
151
-- "Ganeti.OpCodes".
152
extractOpSummary :: InputOpCode -> String
153
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
154
extractOpSummary (InvalidOpCode (JSObject o)) =
155
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
156
    Just s -> drop 3 s -- drop the OP_ prefix
157
    Nothing -> invalidOp
158
extractOpSummary _ = invalidOp
159

    
160
$(buildObject "QueuedOpCode" "qo"
161
  [ simpleField "input"           [t| InputOpCode |]
162
  , simpleField "status"          [t| OpStatus    |]
163
  , simpleField "result"          [t| JSValue     |]
164
  , defaultField [| [] |] $
165
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
166
  , simpleField "priority"        [t| Int         |]
167
  , optionalNullSerField $
168
    simpleField "start_timestamp" [t| Timestamp   |]
169
  , optionalNullSerField $
170
    simpleField "exec_timestamp"  [t| Timestamp   |]
171
  , optionalNullSerField $
172
    simpleField "end_timestamp"   [t| Timestamp   |]
173
  ])
174

    
175
$(buildObject "QueuedJob" "qj"
176
  [ simpleField "id"                 [t| JobId          |]
177
  , simpleField "ops"                [t| [QueuedOpCode] |]
178
  , optionalNullSerField $
179
    simpleField "received_timestamp" [t| Timestamp      |]
180
  , optionalNullSerField $
181
    simpleField "start_timestamp"    [t| Timestamp      |]
182
  , optionalNullSerField $
183
    simpleField "end_timestamp"      [t| Timestamp      |]
184
  ])
185

    
186
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
                              $ op
               , qoLog = []
               , qoResult = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp = Nothing
               , qoExecTimestamp = Nothing
               }

-- | From a job-id and a list of op-codes create a job. This is
-- the pure part of job creation, as allocating a new job id
-- lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  ops' <- mapM (`resolveDependencies` jobid) ops
  return QueuedJob { qjId = jobid
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   }

-- | Attach a received timestamp to a Queued Job.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }

-- | Change the priority of a QueuedOpCode, if it is not already
-- finalized.
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
changeOpCodePriority prio op =
  if qoStatus op > OP_STATUS_RUNNING
     then op
     else op { qoPriority = prio }

-- | Set the state of a QueuedOpCode to canceled.
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op =
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }

-- | Transform a QueuedJob that has not been started into its canceled form.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
  let ops' = map (cancelOpCode now) $ qjOps job
  in job { qjOps = ops', qjEndTimestamp = Just now }

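-- Illustrative sketch (for exposition only): cancelling a job that has not
-- been started yet, taking the current time as the end timestamp.
_cancelJobNow :: QueuedJob -> IO QueuedJob
_cancelJobNow job = (`cancelQueuedJob` job) `liftM` currentTimestamp
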
-- | Job file prefix.
jobFilePrefix :: String
jobFilePrefix = "job-"

-- | Computes the filename for a given job ID.
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)

-- | Parses a job ID from a file name.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  case stripPrefix jobFilePrefix path of
    Nothing -> fail $ "Job file '" ++ path ++
                      "' doesn't have the correct prefix"
    Just suffix -> makeJobIdS suffix

-- | Computes the full path to a live job.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid

-- | Computes the full path to an archived job.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid

-- | Map from opcode status to job status.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR

-- | Computes a queued job's status.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      softStatus     _                   = False
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all

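-- Illustrative sketch (for exposition only): the aggregation above applied
-- to a plain list of opcode statuses, wrapped in a minimal job.  For
-- instance, an all-OP_STATUS_SUCCESS list yields JOB_STATUS_SUCCESS, while
-- a single OP_STATUS_ERROR makes the whole job JOB_STATUS_ERROR.
_calcStatusOfOps :: JobId -> [OpStatus] -> JobStatus
_calcStatusOfOps jid sts =
  calcJobStatus QueuedJob { qjId = jid
                          , qjOps = map asOp sts
                          , qjReceivedTimestamp = Nothing
                          , qjStartTimestamp = Nothing
                          , qjEndTimestamp = Nothing
                          }
  where asOp st = QueuedOpCode { qoInput = InvalidOpCode JSNull
                               , qoStatus = st
                               , qoResult = JSNull
                               , qoLog = []
                               , qoPriority = C.opPrioDefault
                               , qoStartTimestamp = Nothing
                               , qoExecTimestamp = Nothing
                               , qoEndTimestamp = Nothing
                               }
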
-- | Determine if a job has started.
jobStarted :: QueuedJob -> Bool
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus

-- | Determine if a job is finalized.
jobFinalized :: QueuedJob -> Bool
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus

-- | Determine if a job is finalized and its timestamp is before
-- a given time.
jobArchivable :: Timestamp -> QueuedJob -> Bool
jobArchivable ts = liftA2 (&&) jobFinalized
  $ maybe False (< ts)
    . liftA2 (<|>) qjEndTimestamp qjStartTimestamp

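-- Illustrative sketch (for exposition only): a pointful spelling of
-- 'jobArchivable', equivalent to the applicative form above.
_jobArchivableExplicit :: Timestamp -> QueuedJob -> Bool
_jobArchivableExplicit ts job =
  jobFinalized job
    && case qjEndTimestamp job <|> qjStartTimestamp job of
         Just t  -> t < ts
         Nothing -> False
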
-- | Determine whether an opcode status is finalized.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)

-- | Compute a job's priority.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
    where helper [] = C.opPrioDefault
          helper ps = minimum ps

-- | Log but ignore an 'IOError'.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  unless (isDoesNotExistError e && ignore_noent) .
    logWarning $ msg ++ ": " ++ show e
  return a

-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             liftM isDirectory (getFileStatus (adir </> path))
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths

-- | Build list of directories containing job files. Note: compared to
-- the Python version, this doesn't ignore a potential lost+found
-- file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
  other <- if archived
             then allArchiveDirs rootdir
             else return []
  return $ rootdir:other

-- Function equivalent to the \'sequence\' function, which cannot be used
-- because of a library version conflict on Lucid.
-- FIXME: delete this and just use \'sequence\' instead once Lucid
-- compatibility is no longer required.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer l = fmap reverse $ foldl seqFolder (Right []) l

-- | Folding function for joining multiple [JobIds] into one list.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder (Left e) _ = Left e
seqFolder (Right _) (Left e) = Left e
seqFolder (Right l) (Right el) = Right $ el:l

-- | Computes the list of all jobs in the given directories.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)

-- | Sorts a list of job IDs.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)

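-- Illustrative sketch (for exposition only): combining the helpers above to
-- list every known job id, including archived ones, in ascending order.
_listAllJobIds :: FilePath -> IO (Either IOError [JobId])
_listAllJobIds rootdir = do
  dirs <- determineJobDirectories rootdir True
  liftM (fmap sortJobIDs) (getJobIDs dirs)
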
-- | Computes the list of jobs in a given directory.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  either_contents <-
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case either_contents of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
    Right contents -> do
      let jids = foldl (\ids file ->
                         case parseJobFileId file of
                           Nothing -> ids
                           Just new_id -> new_id:ids) [] contents
      return . Right $ reverse jids

-- | Reads the job data from disk.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
  let live_path = liveJobFile rootdir jid
      archived_path = archivedJobFile rootdir jid
      all_paths = if archived
                    then [(live_path, False), (archived_path, True)]
                    else [(live_path, False)]
  foldM (\state (path, isarchived) ->
           liftM (\r -> Just (r, isarchived)) (readFile path)
             `Control.Exception.catch`
             ignoreIOError state True
               ("Failed to read job file " ++ path)) Nothing all_paths

-- | Failed to load job error.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"

-- | Loads a job from disk.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str

-- | Write a job to disk.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok

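-- Illustrative sketch (for exposition only): how the pure job construction
-- and the disk helpers above fit together when a job is created: build the
-- job from its opcodes, stamp the reception time, and persist it under the
-- queue root.  A real submission additionally allocates the job id (see
-- 'allocateJobId' below) and replicates the file to the master candidates.
_buildAndWriteJob :: FilePath -> JobId -> [MetaOpCode] -> IO (Result ())
_buildAndWriteJob rootdir jid ops = do
  now <- currentTimestamp
  case queuedJobFromOpCodes jid ops of
    Bad msg -> return $ Bad msg
    Ok job  -> writeJobToDisk rootdir (setReceivedTimestamp now job)
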
-- | Replicate a job to all master candidates.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  filename' <- makeVirtualPath filename
  callresult <- executeRpcCall mastercandidates
                  $ RpcCallJobqueueUpdate filename' content
  let result = map (second (() <$)) callresult
  logRpcErrors result
  return result

-- | Replicate many jobs to all master candidates.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
  mapM_ (replicateJob rootdir mastercandidates)

-- | Read the job serial number from disk.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
  filename <- jobQueueSerialFile
  tryAndLogIOError (readFile filename) "Failed to read serial file"
                   (makeJobIdS . rStripSpace)

-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, provided as usual by an MVar.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate a positive number of job ids"
    else do
      takeMVar lock
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> do
          putMVar lock ()
          return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++ "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              putMVar lock ()
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              serial' <- makeVirtualPath serial
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial' serial_content
              putMVar lock ()
              return $ mapM makeJobId [(current+1)..(current+n)]

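-- Illustrative sketch (for exposition only): calling the allocator above.
-- A fresh lock is created here for brevity; in the daemon a single MVar is
-- shared by all callers that touch the serial file.
_allocateSomeIds :: [Node] -> Int -> IO (Result [JobId])
_allocateSomeIds mastercandidates n = do
  lock <- newMVar ()
  allocateJobIds mastercandidates lock n
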
-- | Allocate one new job id.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock = do
  jids <- allocateJobIds mastercandidates lock 1
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")

-- | Decide if the job queue is open.
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)

-- | Start enqueued jobs, currently by handing them over to masterd.
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
  socketpath <- defaultMasterSocket
  client <- getLuxiClient socketpath
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
  let failures = map show $ justBad pickupResults
  unless (null failures)
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures

-- | Try to cancel a job that has already been handed over to execution,
-- currently by asking masterd to cancel it.
cancelJob :: JobId -> IO (ErrorResult JSValue)
cancelJob jid = do
  socketpath <- defaultMasterSocket
  client <- getLuxiClient socketpath
  callMethod (CancelJob jid) client

-- | Permissions for the archive directories.
queueDirPermissions :: FilePermissions
queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser
                                      , fpGroup = Just C.daemonsGroup
                                      , fpPermissions = 0o0750
                                      }

-- | Try, at most until the given endtime, to archive some of the given
-- jobs, if they are older than the specified cut-off time; also replicate
-- archival of the additional jobs. Return the pair of the number of jobs
-- archived, and the number of jobs remaining in the queue, assuming the
-- given numbers for the jobs not considered.
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
                        -> FilePath -- ^ queue root directory
                        -> ClockTime -- ^ Endtime
                        -> Timestamp -- ^ cut-off time for archiving jobs
                        -> Int -- ^ number of jobs already archived
                        -> [JobId] -- ^ Additional jobs to replicate
                        -> [JobId] -- ^ List of job-ids still to consider
                        -> IO (Int, Int)
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
  unless (null torepl) . (>> return ())
   . forkIO $ replicateFn torepl
  return (arch, 0)

archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
      continue = archiveMore arch torepl jids
      jidname = show $ fromJobId jid
  time <- getClockTime
  if time >= endt
    then do
      _ <- forkIO $ replicateFn torepl
      return (arch, length (jid:jids))
    else do
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
      loadResult <- loadJobFromDisk qDir False jid
      case loadResult of
        Bad _ -> continue
        Ok (job, _) ->
          if jobArchivable cutt job
            then do
              let live = liveJobFile qDir jid
                  archive = archivedJobFile qDir jid
              renameResult <- safeRenameFile queueDirPermissions
                                live archive
              case renameResult of
                Bad s -> do
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
                                 ++ " failed unexpectedly: " ++ s
                  continue
                Ok () -> do
                  let torepl' = jid:torepl
                  if length torepl' >= 10
                    then do
                      _ <- forkIO $ replicateFn torepl'
                      archiveMore (arch + 1) [] jids
                    else archiveMore (arch + 1) torepl' jids
            else continue

-- | Archive jobs older than the given time, but do not exceed the timeout for
-- carrying out this task.
archiveJobs :: ConfigData -- ^ cluster configuration
               -> Int  -- ^ time the job has to be in the past in order
                       -- to be archived
               -> Int -- ^ timeout
               -> [JobId] -- ^ jobs to consider
               -> IO (Int, Int)
archiveJobs cfg age timeout jids = do
  now <- getClockTime
  qDir <- queueDir
  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
      cuttime = if age < 0 then noTimestamp
                           else advanceTimestamp (- age) (fromClockTime now)
      mcs = Config.getMasterCandidates cfg
      replicateFn jobs = do
        let olds = map (liveJobFile qDir) jobs
            news = map (archivedJobFile qDir) jobs
        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
        return ()
  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids
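
-- Illustrative sketch (for exposition only, with hypothetical numbers):
-- archive the given jobs if they finished more than an hour ago, spending
-- at most 60 seconds on the whole operation.
_archiveHourOldJobs :: ConfigData -> [JobId] -> IO (Int, Int)
_archiveHourOldJobs cfg = archiveJobs cfg 3600 60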