Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ ea7032da

History | View | Annotate | Download (20.8 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , cancelQueuedJob
35
    , Timestamp
36
    , fromClockTime
37
    , noTimestamp
38
    , currentTimestamp
39
    , advanceTimestamp
40
    , setReceivedTimestamp
41
    , opStatusFinalized
42
    , extractOpSummary
43
    , calcJobStatus
44
    , jobStarted
45
    , jobFinalized
46
    , jobArchivable
47
    , calcJobPriority
48
    , jobFileName
49
    , liveJobFile
50
    , archivedJobFile
51
    , determineJobDirectories
52
    , getJobIDs
53
    , sortJobIDs
54
    , loadJobFromDisk
55
    , noSuchJob
56
    , readSerialFromDisk
57
    , allocateJobIds
58
    , allocateJobId
59
    , writeJobToDisk
60
    , replicateManyJobs
61
    , isQueueOpen
62
    , startJobs
63
    , cancelJob
64
    , queueDirPermissions
65
    , archiveJobs
66
    ) where
67

    
68
import Control.Applicative (liftA2, (<|>))
69
import Control.Arrow (first, second)
70
import Control.Concurrent (forkIO)
71
import Control.Concurrent.MVar
72
import Control.Exception
73
import Control.Monad
74
import Control.Monad.IO.Class
75
import Data.Functor ((<$))
76
import Data.List
77
import Data.Maybe
78
import Data.Ord (comparing)
79
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
80
import Prelude hiding (id, log)
81
import System.Directory
82
import System.FilePath
83
import System.IO.Error (isDoesNotExistError)
84
import System.Posix.Files
85
import System.Time
86
import qualified Text.JSON
87
import Text.JSON.Types
88

    
89
import Ganeti.BasicTypes
90
import qualified Ganeti.Config as Config
91
import qualified Ganeti.Constants as C
92
import Ganeti.Errors (ErrorResult)
93
import Ganeti.JSON
94
import Ganeti.Logging
95
import Ganeti.Luxi
96
import Ganeti.Objects (ConfigData, Node)
97
import Ganeti.OpCodes
98
import Ganeti.Path
99
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
100
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
101
import Ganeti.THH
102
import Ganeti.Types
103
import Ganeti.Utils
104
import Ganeti.VCluster (makeVirtualPath)
105

    
106
-- * Data types
107

    
108
-- | Timestamp representation used by the Ganeti job queue: a pair made of
-- the whole seconds since the epoch and the microseconds elapsed since the
-- start of that second.
type Timestamp = (Int, Int)
112

    
113
-- | Sentinel value standing in for a missing timestamp; both components
-- are set to @-1@.
noTimestamp :: Timestamp
noTimestamp = (missing, missing)
  where missing = -1
116

    
117
-- | Convert a 'ClockTime' into the job-queue 'Timestamp' format. The
-- sub-second component of the clock time is integer-divided by 1000000
-- (picoseconds to microseconds).
fromClockTime :: ClockTime -> Timestamp
fromClockTime (TOD secs picos) = (fromIntegral secs, fromIntegral usecs)
  where usecs = picos `div` 1000000
121

    
122
-- | Read the system clock and return it as a job-queue 'Timestamp'.
currentTimestamp :: IO Timestamp
currentTimestamp = do
  now <- getClockTime
  return (fromClockTime now)
125

    
126
-- | Shift a timestamp by the given number of seconds (which may be
-- negative). Only the seconds component changes; the microseconds are
-- carried over untouched.
advanceTimestamp :: Int -> Timestamp -> Timestamp
advanceTimestamp secs ts = (secs + fst ts, snd ts)
130

    
131
-- | An opcode as read from input: either successfully parsed, or kept in
-- its raw JSON form when parsing failed.
data InputOpCode
  = ValidOpCode MetaOpCode  -- ^ OpCode was parsed successfully
  | InvalidOpCode JSValue   -- ^ Invalid opcode, original JSON retained
  deriving (Show, Eq)
135

    
136
-- | JSON instance for 'InputOpCode'. Reading never fails: when the value
-- cannot be parsed as a 'MetaOpCode', the original 'JSValue' is stored in
-- an 'InvalidOpCode'. Showing an 'InvalidOpCode' emits that stored value
-- unchanged.
instance Text.JSON.JSON InputOpCode where
  showJSON input =
    case input of
      ValidOpCode mo    -> Text.JSON.showJSON mo
      InvalidOpCode raw -> raw
  readJSON v =
    case Text.JSON.readJSON v of
      Text.JSON.Ok mo   -> return (ValidOpCode mo)
      Text.JSON.Error _ -> return (InvalidOpCode v)
144

    
145
-- | Summary string reported for opcodes that could not be parsed.
invalidOp :: String
invalidOp = "INVALID_OP"
148

    
149
-- | Compute the summary of an 'InputOpCode'.
--
-- * For a valid opcode this is 'opSummary' of the wrapped opcode.
--
-- * For an invalid opcode that is a JSON object, the @OP_ID@ field is
--   looked up (defaulting to @\"OP_\" ++ 'invalidOp'@) and its first three
--   characters (the @OP_@ prefix) are dropped.
--
-- * Anything else yields 'invalidOp'.
--
-- This duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary input =
  case input of
    ValidOpCode metaop -> opSummary (metaOpCode metaop)
    InvalidOpCode (JSObject o) ->
      -- drop the OP_ prefix
      maybe invalidOp (drop 3) $
        fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp)
    _ -> invalidOp
159

    
160
-- Template Haskell declaration of the QueuedOpCode record (field prefix
-- "qo"). The order of the fields below is significant and must be kept.
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"    [t| InputOpCode |]
  , simpleField "status"   [t| OpStatus |]
  , simpleField "result"   [t| JSValue |]
  , defaultField [| [] |]
      (simpleField "log" [t| [(Int, Timestamp, ELogType, JSValue)] |])
  , simpleField "priority" [t| Int |]
  , optionalNullSerField (simpleField "start_timestamp" [t| Timestamp |])
  , optionalNullSerField (simpleField "exec_timestamp"  [t| Timestamp |])
  , optionalNullSerField (simpleField "end_timestamp"   [t| Timestamp |])
  ])
174

    
175
-- Template Haskell declaration of the QueuedJob record (field prefix
-- "qj"). The order of the fields below is significant and must be kept.
$(buildObject "QueuedJob" "qj"
  [ simpleField "id"  [t| JobId |]
  , simpleField "ops" [t| [QueuedOpCode] |]
  , optionalNullSerField (simpleField "received_timestamp" [t| Timestamp |])
  , optionalNullSerField (simpleField "start_timestamp"    [t| Timestamp |])
  , optionalNullSerField (simpleField "end_timestamp"      [t| Timestamp |])
  ])
185

    
186
-- | Wrap a 'MetaOpCode' into a fresh 'QueuedOpCode': status queued, empty
-- log, null result, no timestamps, and the priority taken from the
-- opcode's own meta parameters.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoResult = JSNull
               , qoLog = []
               , qoPriority = rawPriority
               , qoStartTimestamp = Nothing
               , qoExecTimestamp = Nothing
               , qoEndTimestamp = Nothing
               }
  where rawPriority = opSubmitPriorityToRaw (opPriority (metaParams op))
199

    
200
-- | Build a job from a job id and a list of opcodes. Every opcode is first
-- passed through 'resolveDependencies' together with the job id (which is
-- why the result lives in a monad); the job is created without any
-- timestamps. This is the pure part of job creation, as allocating a new
-- job id lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  resolved <- mapM (\op -> resolveDependencies op jobid) ops
  let queuedOps = map queuedOpCodeFromMetaOpCode resolved
  return QueuedJob { qjId = jobid
                   , qjOps = queuedOps
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   }
212

    
213
-- | Record the given time as the received timestamp of a job, replacing
-- any previous value.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = received }
  where received = Just ts
216

    
217
-- | Mark a single opcode as canceled, using the given time as its end
-- timestamp.
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op =
  op { qoEndTimestamp = Just now
     , qoStatus = OP_STATUS_CANCELED
     }
221

    
222
-- | Turn a job that has not been started into its canceled form: every
-- opcode is canceled via 'cancelOpCode' and the job's end timestamp is set
-- to the given time.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
  job { qjOps = map (cancelOpCode now) (qjOps job)
      , qjEndTimestamp = Just now
      }
227

    
228
-- | Prefix shared by the names of all job files.
jobFilePrefix :: String
jobFilePrefix = "job-"
231

    
232
-- | File name (without directory) of the job with the given ID: the job
-- file prefix followed by the numeric ID.
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ idString
  where idString = show (fromJobId jid)
235

    
236
-- | Recover the job ID from a job file name. Fails in the monad when the
-- name does not start with the job file prefix; otherwise the remainder of
-- the name is handed to 'makeJobIdS'.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  maybe badPrefix makeJobIdS (stripPrefix jobFilePrefix path)
  where
    badPrefix = fail $ "Job file '" ++ path ++
                       "' doesn't have the correct prefix"
243

    
244
-- | Path of the live (non-archived) file of a job, directly below the
-- given queue root directory.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir = (rootdir </>) . jobFileName
247

    
248
-- | Path of the archived file of a job. Archived jobs are grouped into
-- numbered subdirectories of the archive directory; the subdirectory
-- number is the job ID integer-divided by
-- 'C.jstoreJobsPerArchiveDirectory'.
--
-- NOTE(review): the previous comment on this function was marked
-- \"BROKEN\" without saying what is broken - confirm before relying on it.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  rootdir </> jobQueueArchiveSubDir </> bucket </> jobFileName jid
  where
    bucket = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
253

    
254
-- | Translate an opcode status into the job status of the same name.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob st =
  case st of
    OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
    OP_STATUS_WAITING   -> JOB_STATUS_WAITING
    OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
    OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
    OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
    OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
    OP_STATUS_ERROR     -> JOB_STATUS_ERROR
263

    
264
-- | Derive the status of a job from the statuses of its opcodes, scanning
-- them in order:
--
-- * the first error, canceling or canceled opcode decides the job status
--   immediately;
--
-- * success and queued opcodes leave the pending status untouched;
--
-- * any other opcode status becomes the pending status.
--
-- When the scan reaches the end, the job is successful if every opcode was
-- successful (this includes a job without opcodes); otherwise the pending
-- status, initially queued, is returned.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  go JOB_STATUS_QUEUED True (map qoStatus ops)
    where
      isTerminal st =
        st `elem` [OP_STATUS_ERROR, OP_STATUS_CANCELING, OP_STATUS_CANCELED]
      isSoft st = st `elem` [OP_STATUS_SUCCESS, OP_STATUS_QUEUED]
      go pending allOk statuses =
        case statuses of
          []
            | allOk     -> JOB_STATUS_SUCCESS
            | otherwise -> pending
          (st:rest)
            | isTerminal st -> opStatusToJob st -- abort the scan
            | isSoft st     -> go pending allOk' rest
            | otherwise     -> go (opStatusToJob st) allOk' rest
            where allOk' = st == OP_STATUS_SUCCESS && allOk
283

    
284
-- | A job counts as started when its computed status compares greater
-- than 'JOB_STATUS_QUEUED'.
jobStarted :: QueuedJob -> Bool
jobStarted job = calcJobStatus job > JOB_STATUS_QUEUED
287

    
288
-- | A job counts as finalized when its computed status compares greater
-- than 'JOB_STATUS_RUNNING'.
jobFinalized :: QueuedJob -> Bool
jobFinalized job = calcJobStatus job > JOB_STATUS_RUNNING
291

    
292
-- | A job is archivable with respect to a cut-off time when it is
-- finalized and its end timestamp (or, if that is missing, its start
-- timestamp) is strictly before the cut-off. A job with neither timestamp
-- is never archivable.
jobArchivable :: Timestamp -> QueuedJob -> Bool
jobArchivable ts job = jobFinalized job && maybe False (< ts) reference
  where reference = qjEndTimestamp job <|> qjStartTimestamp job
298

    
299
-- | An opcode status is finalized when it compares greater than
-- 'OP_STATUS_RUNNING'.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized st = st > OP_STATUS_RUNNING
302

    
303
-- | Priority of a job: the minimum priority value among its opcodes that
-- are not yet finalized, or 'C.opPrioDefault' when no such opcode exists.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  case [ qoPriority op | op <- ops, not (opStatusFinalized (qoStatus op)) ] of
    []    -> C.opPrioDefault
    prios -> minimum prios
309

    
310
-- | Exception handler that swallows an 'IOError' and returns the supplied
-- fallback value. The error is logged as a warning, prefixed with the
-- given message, unless it is a does-not-exist error and the flag asks for
-- those to be ignored silently.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  let silenced = isDoesNotExistError e && ignore_noent
  unless silenced $ logWarning (msg ++ ": " ++ show e)
  return a
316

    
317
-- | Compute the list of existing archive directories below the given queue
-- root. Entries whose name starts with a dot are skipped, and only entries
-- that are directories are returned (as paths including the archive
-- directory). Note that I/O exceptions are swallowed: a failure to list
-- the archive directory yields an empty list, and an entry that cannot be
-- stat-ed is dropped.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  -- The entries of fpaths already carry the adir prefix, so they are
  -- stat-ed as they are; joining adir a second time would point to
  -- adir/adir/entry whenever the queue root is a relative path.
  filterM (\path ->
             liftM isDirectory (getFileStatus path)
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths
331

    
332
-- | Build the list of directories that contain job files: always the
-- queue root itself, followed by all archive directories when archived
-- jobs are requested. Note: compared to the Python version, this doesn't
-- ignore a potential lost+found file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = liftM (rootdir :) extra
  where
    extra
      | archived  = allArchiveDirs rootdir
      | otherwise = return []
341

    
342
-- | Collect the IDs of all jobs found in the given directories, in
-- directory order; listing is delegated to 'getDirJobIDs' and the combined
-- 'ResultT' computation is run down to a plain result.
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
getJobIDs dirs = runResultT (liftM concat (mapM getDirJobIDs dirs))
345

    
346
-- | Sort a list of job IDs in ascending order of their numeric value.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy byNumber
  where byNumber a b = compare (fromJobId a) (fromJobId b)
349

    
350
-- | List the job IDs present in one directory. Directory entries whose
-- names do not parse as job file names are silently skipped. The listing
-- is wrapped in 'withErrorLogAt' at WARNING level with a message naming
-- the directory.
getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]
getDirJobIDs path =
  withErrorLogAt WARNING ("Failed to list job directory " ++ path) $ do
    entries <- liftIO (getDirectoryContents path)
    return (mapMaybe parseJobFileId entries)
355

    
356
-- | Read the raw contents of a job file. The live file is always tried;
-- the archived file is tried in addition when archived jobs are requested.
-- Every candidate is attempted in turn and a successful read replaces the
-- result of an earlier one, so the returned flag tells whether the data
-- came from the archive. Read failures are handled by 'ignoreIOError'
-- (missing files are not logged) and leave the previous result in place;
-- 'Nothing' means no candidate could be read.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid =
  foldM attempt Nothing candidates
  where
    liveCandidate = (liveJobFile rootdir jid, False)
    archiveCandidate = (archivedJobFile rootdir jid, True)
    candidates
      | archived  = [liveCandidate, archiveCandidate]
      | otherwise = [liveCandidate]
    attempt found (path, isarchived) =
      (readFile path >>= \contents -> return (Just (contents, isarchived)))
        `Control.Exception.catch`
        ignoreIOError found True
          ("Failed to read job file " ++ path)
369

    
370
-- | Error result returned when a job file cannot be loaded.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
373

    
374
-- | Load and parse a job from disk. The result pairs the job with a flag
-- telling whether it was read from the archive; 'noSuchJob' is returned
-- when no job file could be read, and a parse failure is reported as an
-- error prefixed with \"Parsing job file\".
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: the strict application below is required; otherwise wrapping
  -- the value in a Result leaves too much laziness around and the file
  -- descriptors of the individual jobs are not closed
  return $! maybe noSuchJob parseJob raw
  where
    parseJob (str, arch) =
      liftM (\qj -> (qj, arch)) .
      fromJResult "Parsing job file" $ Text.JSON.decode str
386

    
387
-- | Serialize a job to JSON and write it to its live job file below the
-- given queue root, using 'atomicWriteFile'. The write is run through
-- 'tryAndLogIOError' with a message naming the file.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job =
  tryAndLogIOError (atomicWriteFile target serialized)
                   ("Failed to write " ++ target) Ok
  where
    target = liveJobFile rootdir (qjId job)
    serialized = Text.JSON.encode (Text.JSON.showJSON job)
394

    
395
-- | Send the serialized job to the given master candidates through a
-- job-queue update RPC, addressing the job file by its virtual path. The
-- per-node results, with any payload replaced by unit, are passed to
-- 'logRpcErrors' and then returned.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let localPath = liveJobFile rootdir (qjId job)
      payload = Text.JSON.encode (Text.JSON.showJSON job)
  virtualPath <- makeVirtualPath localPath
  callresult <- executeRpcCall mastercandidates
                  (RpcCallJobqueueUpdate virtualPath payload)
  let outcome = map (second (() <$)) callresult
  logRpcErrors outcome
  return outcome
406

    
407
-- | Replicate each of the given jobs, one after the other, to the master
-- candidates; the per-node results of 'replicateJob' are discarded.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates jobs =
  forM_ jobs (replicateJob rootdir mastercandidates)
411

    
412
-- | Read the job serial number from the serial file. The file contents,
-- with trailing whitespace stripped, are converted with 'makeJobIdS'; the
-- read is run through 'tryAndLogIOError'.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
  serialFile <- jobQueueSerialFile
  let parseSerial contents = makeJobIdS (rStripSpace contents)
  tryAndLogIOError (readFile serialFile) "Failed to read serial file"
                   parseSerial
418

    
419
-- | Allocate the given number of new, consecutive job ids.
--
-- The current serial is read from the serial file, advanced by @n@,
-- written back atomically and then pushed to the master candidates with a
-- job-queue update RPC (whose result is ignored). The ids following the
-- old serial are returned. Requests for a non-positive number of ids, a
-- serial file that cannot be read, and a failed write of the new serial
-- all yield a 'Bad' result.
--
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, as usual provided by an MVar. The lock is held through
-- 'withMVar', so it is released again even if one of the IO actions in the
-- critical section throws an exception.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate positive number of job ids"
    else withMVar lock $ \_ -> do
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++ "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              serial' <- makeVirtualPath serial
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial' serial_content
              return $ mapM makeJobId [(current + 1)..(current + n)]
451

    
452
-- | Allocate exactly one new job id by requesting a single id from
-- 'allocateJobIds' and extracting it from the returned list with
-- 'monadicThe'.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock =
  liftM (>>= monadicThe "Failed to allocate precisely one Job ID")
        (allocateJobIds mastercandidates lock 1)
457

    
458
-- | The job queue is open exactly when the drain file does not exist.
isQueueOpen :: IO Bool
isQueueOpen = do
  drainFile <- jobQueueDrainFile
  drained <- doesFileExist drainFile
  return (not drained)
461

    
462
-- | Start enqueued jobs, currently by handing them over to masterd: a
-- Luxi client is opened on the default master socket and one pickup
-- request is sent per job. Failed requests are not fatal; they are
-- collected and reported in a single warning.
--
-- NOTE(review): the client obtained here is not closed anywhere in this
-- function - confirm whether it needs an explicit close.
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
  client <- defaultMasterSocket >>= getLuxiClient
  pickupResults <- forM jobs $ \job ->
                     callMethod (PickupJob (qjId job)) client
  case map show (justBad pickupResults) of
    [] -> return ()
    failures ->
      logWarning $ "Failed to notify masterd: " ++ commaJoin failures
471

    
472
-- | Try to cancel a job that has already been handed over to execution,
-- currently by sending a cancel request for it to masterd over a Luxi
-- client opened on the default master socket. The reply is returned
-- unchanged.
cancelJob :: JobId -> IO (ErrorResult JSValue)
cancelJob jid =
  defaultMasterSocket >>= getLuxiClient >>= callMethod (CancelJob jid)
479

    
480
-- | Ownership and mode used for the archive directories: owned by the
-- masterd user and the daemons group, with mode 0750.
queueDirPermissions :: FilePermissions
queueDirPermissions =
  FilePermissions
    { fpOwner = Just C.masterdUser
    , fpGroup = Just C.daemonsGroup
    , fpPermissions = 0o0750
    }
486

    
487
-- | Try, at most until the given endtime, to archive some of the given
-- jobs, if they are older than the specified cut-off time; also replicate
-- archival of the additional jobs. Return the pair of the number of jobs
-- archived, and the number of jobs remaining in the queue, assuming the
-- given numbers about the not considered jobs.
--
-- Jobs are inspected one at a time. A job that cannot be loaded, is not
-- archivable, or whose rename fails is skipped. Successfully renamed jobs
-- are collected and handed to the replication function, in a separate
-- thread, in batches of ten; whatever is still pending when the job list
-- or the time runs out is flushed the same way. The replication function
-- is never invoked with an empty list.
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
                        -> FilePath -- ^ queue root directory
                        -> ClockTime -- ^ Endtime
                        -> Timestamp -- ^ cut-off time for archiving jobs
                        -> Int -- ^ number of jobs already archived
                        -> [JobId] -- ^ Additional jobs to replicate
                        -> [JobId] -- ^ List of job-ids still to consider
                        -> IO (Int, Int)
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
  unless (null torepl) . (>> return ())
   . forkIO $ replicateFn torepl
  return (arch, 0)

archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
      continue = archiveMore arch torepl jids
      jidname = show $ fromJobId jid
  time <- getClockTime
  if time >= endt
    then do
      -- Out of time: flush what is pending, but (as in the equation for
      -- the empty job list) do not replicate an empty batch.
      unless (null torepl) . (>> return ())
       . forkIO $ replicateFn torepl
      return (arch, length (jid:jids))
    else do
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
      loadResult <- loadJobFromDisk qDir False jid
      case loadResult of
        Bad _ -> continue
        Ok (job, _) ->
          if jobArchivable cutt job
            then do
              let live = liveJobFile qDir jid
                  archive = archivedJobFile qDir jid
              renameResult <- safeRenameFile queueDirPermissions
                                live archive
              case renameResult of
                Bad s -> do
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
                                 ++ " failed unexpectedly: " ++ s
                  continue
                Ok () -> do
                  let torepl' = jid:torepl
                  -- replicate in batches of ten renamed jobs
                  if length torepl' >= 10
                    then do
                      _ <- forkIO $ replicateFn torepl'
                      archiveMore (arch + 1) [] jids
                    else archiveMore (arch + 1) torepl' jids
            else continue
539
                   
540
-- | Archive jobs older than the given time, but do not exceed the timeout
-- for carrying out this task. The deadline is the current time plus the
-- timeout in seconds, and the cut-off is the current time minus the age in
-- seconds. Archival is replicated by sending a job-queue rename RPC, with
-- the (live path, archived path) pairs of the affected jobs, to the master
-- candidates of the given configuration. The result is the one of
-- 'archiveSomeJobsUntil'.
--
-- NOTE(review): for a negative age the cut-off is 'noTimestamp', i.e.
-- @(-1, -1)@; as 'jobArchivable' requires a timestamp strictly below the
-- cut-off, no job with a non-negative timestamp qualifies in that case -
-- confirm this is intended.
archiveJobs :: ConfigData -- ^ cluster configuration
               -> Int  -- ^ time the job has to be in the past in order
                       -- to be archived
               -> Int -- ^ timeout
               -> [JobId] -- ^ jobs to consider
               -> IO (Int, Int)
archiveJobs cfg age timeout jids = do
  now <- getClockTime
  qDir <- queueDir
  let deadline = addToClockTime (noTimeDiff { tdSec = timeout }) now
      cutoff
        | age < 0   = noTimestamp
        | otherwise = advanceTimestamp (- age) (fromClockTime now)
      candidates = Config.getMasterCandidates cfg
      renamePair job = (liveJobFile qDir job, archivedJobFile qDir job)
      replicateFn jobs = do
        _ <- executeRpcCall candidates . RpcCallJobqueueRename
               $ map renamePair jobs
        return ()
  archiveSomeJobsUntil replicateFn qDir deadline cutoff 0 [] jids