Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ a6b33b72

History | View | Annotate | Download (22.3 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , changeOpCodePriority
35
    , changeJobPriority
36
    , cancelQueuedJob
37
    , Timestamp
38
    , fromClockTime
39
    , noTimestamp
40
    , currentTimestamp
41
    , advanceTimestamp
42
    , setReceivedTimestamp
43
    , opStatusFinalized
44
    , extractOpSummary
45
    , calcJobStatus
46
    , jobStarted
47
    , jobFinalized
48
    , jobArchivable
49
    , calcJobPriority
50
    , jobFileName
51
    , liveJobFile
52
    , archivedJobFile
53
    , determineJobDirectories
54
    , getJobIDs
55
    , sortJobIDs
56
    , loadJobFromDisk
57
    , noSuchJob
58
    , readSerialFromDisk
59
    , allocateJobIds
60
    , allocateJobId
61
    , writeJobToDisk
62
    , replicateManyJobs
63
    , isQueueOpen
64
    , startJobs
65
    , cancelJob
66
    , queueDirPermissions
67
    , archiveJobs
68
    ) where
69

    
70
import Control.Applicative (liftA2, (<|>))
71
import Control.Arrow (first, second)
72
import Control.Concurrent (forkIO)
73
import Control.Concurrent.MVar
74
import Control.Exception
75
import Control.Monad
76
import Data.Functor ((<$))
77
import Data.List
78
import Data.Maybe
79
import Data.Ord (comparing)
80
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
81
import Prelude hiding (id, log)
82
import System.Directory
83
import System.FilePath
84
import System.IO.Error (isDoesNotExistError)
85
import System.Posix.Files
86
import System.Time
87
import qualified Text.JSON
88
import Text.JSON.Types
89

    
90
import Ganeti.BasicTypes
91
import qualified Ganeti.Config as Config
92
import qualified Ganeti.Constants as C
93
import Ganeti.Errors (ErrorResult)
94
import Ganeti.JSON
95
import Ganeti.Logging
96
import Ganeti.Luxi
97
import Ganeti.Objects (ConfigData, Node)
98
import Ganeti.OpCodes
99
import Ganeti.Path
100
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
101
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
102
import Ganeti.THH
103
import Ganeti.Types
104
import Ganeti.Utils
105
import Ganeti.VCluster (makeVirtualPath)
106

    
107
-- * Data types
108

    
109
-- | The Ganeti job queue timestamp type. It represents a point in time as
110
-- the pair of seconds since the epoch and microseconds since the beginning
111
-- of that second.
112
type Timestamp = (Int, Int)
113

    
114
-- | Placeholder value standing for a missing timestamp: both the seconds
-- and the microseconds component are @-1@.
noTimestamp :: Timestamp
noTimestamp = (unset, unset)
  where unset = -1
117

    
118
-- | Convert a 'ClockTime' into a 'Timestamp'. The seconds are taken over
-- as they are; the picosecond part is scaled down to microseconds.
fromClockTime :: ClockTime -> Timestamp
fromClockTime (TOD secs picos) = (fromIntegral secs, fromIntegral micros)
  where micros = picos `div` 1000000
122

    
123
-- | Read the system clock and return it in the job-queue timestamp format.
currentTimestamp :: IO Timestamp
currentTimestamp = do
  now <- getClockTime
  return $ fromClockTime now
126

    
127
-- | Shift a timestamp by the given number of seconds. Only the seconds
-- component is changed; the microseconds are carried over untouched.
advanceTimestamp :: Int -> Timestamp -> Timestamp
advanceTimestamp delta ts = (delta + fst ts, snd ts)
131

    
132
-- | An opcode as submitted to the queue: either successfully parsed, or
-- kept as the raw JSON value it arrived as.
data InputOpCode
  = ValidOpCode MetaOpCode  -- ^ OpCode was parsed successfully
  | InvalidOpCode JSValue   -- ^ Invalid opcode
  deriving (Show, Eq)
136

    
137
-- | JSON instance for 'InputOpCode'. Reading never fails: a value that
-- does not parse as an opcode is wrapped, unchanged, in 'InvalidOpCode',
-- and serialising such a value gives the original 'JSValue' back.
instance Text.JSON.JSON InputOpCode where
  showJSON input =
    case input of
      ValidOpCode parsed -> Text.JSON.showJSON parsed
      InvalidOpCode raw  -> raw
  readJSON raw = Text.JSON.Ok . classify $ Text.JSON.readJSON raw
    where classify (Text.JSON.Ok parsed) = ValidOpCode parsed
          classify (Text.JSON.Error _)   = InvalidOpCode raw
145

    
146
-- | Summary string reported for opcodes whose real summary cannot be
-- determined (see 'extractOpSummary').
147
invalidOp :: String
148
invalidOp = "INVALID_OP"
149

    
150
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
--
-- For an unparsable opcode given as a JSON object, the @OP_ID@ field is
-- looked up and its first three characters (the @OP_@ prefix) are dropped;
-- anything else yields 'invalidOp'.
extractOpSummary :: InputOpCode -> String
extractOpSummary input =
  case input of
    ValidOpCode metaop -> opSummary (metaOpCode metaop)
    InvalidOpCode (JSObject obj) ->
      maybe invalidOp (drop 3) $ -- drop the OP_ prefix
        fromObjWithDefault (fromJSObject obj) "OP_ID" ("OP_" ++ invalidOp)
    _ -> invalidOp
160

    
161
-- Record describing one opcode of a queued job. Fields get the "qo"
-- prefix; "log" falls back to the empty list, and the three timestamps
-- are optional.
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"    [t| InputOpCode |]
  , simpleField "status"   [t| OpStatus    |]
  , simpleField "result"   [t| JSValue     |]
  , defaultField [| [] |]
      (simpleField "log" [t| [(Int, Timestamp, ELogType, JSValue)] |])
  , simpleField "priority" [t| Int         |]
  , optionalNullSerField
      (simpleField "start_timestamp" [t| Timestamp |])
  , optionalNullSerField
      (simpleField "exec_timestamp"  [t| Timestamp |])
  , optionalNullSerField
      (simpleField "end_timestamp"   [t| Timestamp |])
  ])
175

    
176
-- Record describing a queued job: its id, its opcodes and three optional
-- timestamps. Fields get the "qj" prefix.
$(buildObject "QueuedJob" "qj"
  [ simpleField "id"  [t| JobId          |]
  , simpleField "ops" [t| [QueuedOpCode] |]
  , optionalNullSerField
      (simpleField "received_timestamp" [t| Timestamp |])
  , optionalNullSerField
      (simpleField "start_timestamp"    [t| Timestamp |])
  , optionalNullSerField
      (simpleField "end_timestamp"      [t| Timestamp |])
  ])
186

    
187
-- | Wrap a 'MetaOpCode' into a freshly queued 'QueuedOpCode': status
-- queued, no result, empty log, no timestamps, and the priority taken
-- from the opcode's own submit priority.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode
    { qoInput          = ValidOpCode op
    , qoStatus         = OP_STATUS_QUEUED
    , qoResult         = JSNull
    , qoLog            = []
    , qoPriority       = submitPriority
    , qoStartTimestamp = Nothing
    , qoExecTimestamp  = Nothing
    , qoEndTimestamp   = Nothing
    }
  where submitPriority = opSubmitPriorityToRaw (opPriority (metaParams op))
200

    
201
-- | Build a job from a job id and a list of opcodes. This is the pure
-- part of job creation; allocating the job id itself lives in IO.
-- Dependencies of every opcode are resolved against the given job id,
-- and the resulting job carries no timestamps yet.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops =
  liftM mkJob $ mapM (\op -> resolveDependencies op jobid) ops
  where
    mkJob resolved =
      QueuedJob { qjId = jobid
                , qjOps = map queuedOpCodeFromMetaOpCode resolved
                , qjReceivedTimestamp = Nothing
                , qjStartTimestamp = Nothing
                , qjEndTimestamp = Nothing
                }
213

    
214
-- | Record the time at which a job was received, replacing any
-- previously stored received timestamp.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = received }
  where received = Just ts
217

    
218
-- | Set the priority of a 'QueuedOpCode', unless its status already
-- ranks above 'OP_STATUS_RUNNING' (i.e. it is finalized), in which case
-- the opcode is returned unchanged.
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
changeOpCodePriority prio op
  | qoStatus op > OP_STATUS_RUNNING = op
  | otherwise                       = op { qoPriority = prio }
225

    
226
-- | Mark a 'QueuedOpCode' as canceled, using the given time as its end
-- timestamp. The previous status is not inspected.
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op = op { qoEndTimestamp = Just now
                         , qoStatus = OP_STATUS_CANCELED
                         }
230

    
231
-- | Change the priority of a job, i.e., apply 'changeOpCodePriority' to
-- each of its opcodes (which leaves finalized opcodes alone).
changeJobPriority :: Int -> QueuedJob -> QueuedJob
changeJobPriority prio job =
  job { qjOps = [changeOpCodePriority prio op | op <- qjOps job] }
236

    
237
-- | Turn a 'QueuedJob' that has not been started into its canceled form:
-- every opcode is canceled and the job's end timestamp is set to the
-- given time.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
  job { qjOps = canceledOps, qjEndTimestamp = Just now }
  where canceledOps = [cancelOpCode now op | op <- qjOps job]
242

    
243
-- | Prefix shared by all job file names; used both to build file names
-- ('jobFileName') and to recognise them ('parseJobFileId').
244
jobFilePrefix :: String
245
jobFilePrefix = "job-"
246

    
247
-- | Name (without directory) of the file holding the job with the given
-- ID: the job file prefix followed by the numeric ID.
jobFileName :: JobId -> FilePath
jobFileName = (jobFilePrefix ++) . show . fromJobId
250

    
251
-- | Recover the job ID from a job file name. Fails in the monad if the
-- name lacks the job file prefix; otherwise the remainder of the name is
-- handed to 'makeJobIdS'.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  maybe (fail $ "Job file '" ++ path ++ "' doesn't have the correct prefix")
        makeJobIdS
        (stripPrefix jobFilePrefix path)
258

    
259
-- | Full path of a live (not archived) job file below the given queue
-- root directory.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir = (rootdir </>) . jobFileName
262

    
263
-- | Computes the full path to an archived job. Archived jobs are grouped
-- into numbered subdirectories of the archive directory, the number being
-- the job ID divided by 'C.jstoreJobsPerArchiveDirectory'.
--
-- NOTE(review): the original comment marked this function as BROKEN
-- without saying why -- confirm before relying on it.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  rootdir </> jobQueueArchiveSubDir </> bucket </> jobFileName jid
  where bucket = show $ fromJobId jid `div` C.jstoreJobsPerArchiveDirectory
268

    
269
-- | Translate an opcode status into the job status of the same name.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob st =
  case st of
    OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
    OP_STATUS_WAITING   -> JOB_STATUS_WAITING
    OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
    OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
    OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
    OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
    OP_STATUS_ERROR     -> JOB_STATUS_ERROR
278

    
279
-- | Computes a queued job's status from the statuses of its opcodes,
-- scanned in order:
--
-- * the first error, canceling or canceled opcode decides the job status
--   immediately;
--
-- * success and queued opcodes leave the status seen so far unchanged;
--
-- * any other opcode status (waiting, running) becomes the status seen
--   so far.
--
-- If the scan reaches the end and every opcode succeeded (including the
-- case of no opcodes at all), the job is successful; otherwise the status
-- seen so far is returned, starting from queued.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus job = go (map qoStatus (qjOps job)) JOB_STATUS_QUEUED True
  where
    isTerminal =
      (`elem` [OP_STATUS_ERROR, OP_STATUS_CANCELING, OP_STATUS_CANCELED])
    isSoft = (`elem` [OP_STATUS_SUCCESS, OP_STATUS_QUEUED])
    -- arguments: remaining statuses, status seen so far, and whether all
    -- opcodes inspected up to here were successful
    go [] fallback allOk = if allOk then JOB_STATUS_SUCCESS else fallback
    go (st:rest) fallback allOk
      | isTerminal st = opStatusToJob st -- abort recursion
      | isSoft st     = go rest fallback allOk' -- continue unchanged
      | otherwise     = go rest (opStatusToJob st) allOk'
      where allOk' = st == OP_STATUS_SUCCESS && allOk
298

    
299
-- | Determine if a job has started, i.e. whether its computed status
-- ranks above 'JOB_STATUS_QUEUED'.
jobStarted :: QueuedJob -> Bool
jobStarted job = calcJobStatus job > JOB_STATUS_QUEUED
302

    
303
-- | Determine if a job is finalized, i.e. whether its computed status
-- ranks above 'JOB_STATUS_RUNNING'.
jobFinalized :: QueuedJob -> Bool
jobFinalized job = calcJobStatus job > JOB_STATUS_RUNNING
306

    
307
-- | Determine if a job is finalized and its timestamp is strictly before
-- a given time. The end timestamp is used when present, the start
-- timestamp otherwise; a job with neither is never archivable.
jobArchivable :: Timestamp -> QueuedJob -> Bool
jobArchivable ts job = jobFinalized job && maybe False (< ts) lastActivity
  where lastActivity = qjEndTimestamp job <|> qjStartTimestamp job
313

    
314
-- | Determine whether an opcode status is finalized, i.e. ranks above
-- 'OP_STATUS_RUNNING'.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized st = st > OP_STATUS_RUNNING
317

    
318
-- | Compute a job's priority: the numerically smallest priority among
-- its not yet finalized opcodes, or 'C.opPrioDefault' if there are none.
calcJobPriority :: QueuedJob -> Int
calcJobPriority job =
  case [ qoPriority op
       | op <- qjOps job
       , not (opStatusFinalized (qoStatus op)) ] of
    []    -> C.opPrioDefault
    prios -> minimum prios
324

    
325
-- | Exception handler that logs an 'IOError' as a warning and then
-- returns the given fallback value. When the second argument is 'True',
-- "does not exist" errors are not even logged.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  let expected = isDoesNotExistError e && ignore_noent
  when (not expected) $
    logWarning (msg ++ ": " ++ show e)
  return a
331

    
332
-- | Compute the list of existing archive directories, as full paths below
-- the archive subdirectory of the given queue root. Entries whose name
-- starts with a dot are skipped. Note that I/O exceptions are swallowed
-- and ignored: failing to list the archive directory yields the empty
-- list (with a warning), and an entry that cannot be stat'ed is dropped.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  -- The candidates already carry the archive directory prefix, so they
  -- are stat'ed as they are; prepending the prefix a second time would
  -- point to a non-existent path whenever the queue root is relative.
  filterM (\path ->
             liftM isDirectory (getFileStatus path)
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths
346

    
347
-- | Build the list of directories containing job files: always the queue
-- root itself, followed by all archive directories if archived jobs were
-- requested. Note: compared to the Python version, this doesn't ignore a
-- potential lost+found file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = liftM (rootdir :) archiveDirs
  where archiveDirs
          | archived  = allArchiveDirs rootdir
          | otherwise = return []
356

    
357
-- Function equivalent to the \'sequence\' function, that cannot be used
-- because of a library version conflict on Lucid: collects all the lists
-- in their original order, or returns the first error encountered.
-- FIXME: delete this and just use \'sequence\' instead when Lucid
-- compatibility is not required anymore.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer = fmap reverse . foldl seqFolder (Right [])
363

    
364
-- | Folding step used by 'sequencer': prepends the next list of job IDs
-- to the lists accumulated so far. An error already in the accumulator
-- is kept; otherwise an error in the next element takes over.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder acc next =
  case (acc, next) of
    (Left e, _)            -> Left e
    (Right _, Left e)      -> Left e
    (Right sofar, Right l) -> Right (l : sofar)
371

    
372
-- | Computes the list of all jobs in the given directories, concatenated
-- in directory order. If any directory cannot be listed, the first such
-- error is returned instead.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = do
  perDir <- mapM getDirJobIDs paths
  return $ fmap concat (sequencer perDir)
375

    
376
-- | Sorts a list of job IDs in ascending order of their numeric value.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (\a b -> compare (fromJobId a) (fromJobId b))
379

    
380
-- | Computes the list of jobs in a given directory: every directory
-- entry that parses as a job file name contributes its job ID, in
-- listing order; all other entries are ignored. If the directory cannot
-- be listed, a warning is logged and the error is returned.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  listing <- try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  either listFailed (return . Right . mapMaybe parseJobFileId) listing
  where
    listFailed e = do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
395

    
396
-- | Reads the raw job data from disk. The live job file is always tried;
-- the archived one is tried as well if the second argument is 'True'.
-- The result pairs the file contents with a flag telling whether they
-- came from the archive; 'Nothing' means no candidate could be read.
--
-- Candidates are tried in order and a later successful read replaces an
-- earlier one. Read errors are swallowed; those other than "does not
-- exist" are logged as warnings.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = foldM tryRead Nothing candidates
  where
    liveCandidate = (liveJobFile rootdir jid, False)
    archivedCandidate = (archivedJobFile rootdir jid, True)
    candidates
      | archived  = [liveCandidate, archivedCandidate]
      | otherwise = [liveCandidate]
    -- on failure keep whatever was found so far
    tryRead found (path, isArchived) =
      (do contents <- readFile path
          return $ Just (contents, isArchived))
        `Control.Exception.catch`
        ignoreIOError found True ("Failed to read job file " ++ path)
409

    
410
-- | Error value returned by 'loadJobFromDisk' when no job file could be
-- read.
411
noSuchJob :: Result (QueuedJob, Bool)
412
noSuchJob = Bad "Can't load job file"
413

    
414
-- | Loads a job from disk and parses it. The 'Bool' argument selects
-- whether the archive is consulted too; the 'Bool' in the result tells
-- whether the job was actually read from the archive. Yields 'noSuchJob'
-- if no file could be read, and a parse error if the contents are not a
-- valid job.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! maybe noSuchJob parse raw
  where
    parse (contents, isArchived) =
      liftM (\job -> (job, isArchived))
            (fromJResult "Parsing job file" (Text.JSON.decode contents))
426

    
427
-- | Serialise a job to JSON and write it to its live job file below the
-- given queue root. An I/O failure is logged and reported as a 'Bad'
-- result.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job =
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok
  where filename = liveJobFile rootdir (qjId job)
        content = Text.JSON.encode (Text.JSON.showJSON job)
434

    
435
-- | Replicate a job to all master candidates by sending them the
-- serialised job under the virtual path of its live job file. RPC errors
-- are logged; the per-node outcome (with the payload of successful
-- answers discarded) is returned.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  virtualName <- makeVirtualPath . liveJobFile rootdir $ qjId job
  let payload = Text.JSON.encode $ Text.JSON.showJSON job
  answers <- executeRpcCall mastercandidates
               (RpcCallJobqueueUpdate virtualName payload)
  let outcome = [ (node, () <$ reply) | (node, reply) <- answers ]
  logRpcErrors outcome
  return outcome
446

    
447
-- | Replicate many jobs to all master candidates, one job after the
-- other. The individual results are discarded.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates jobs =
  forM_ jobs $ replicateJob rootdir mastercandidates
451

    
452
-- | Read the job serial number from the queue's serial file. Trailing
-- whitespace is stripped before parsing; an I/O failure is logged and
-- reported as a 'Bad' result.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk =
  jobQueueSerialFile >>= \filename ->
    tryAndLogIOError (readFile filename) "Failed to read serial file"
                     (makeJobIdS . rStripSpace)
458

    
459
-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, as usual provided by an MVar.
--
-- The current serial is read from disk, advanced by the requested number
-- of ids, written back and pushed to the master candidates; the ids
-- following the old serial are returned. Asking for a non-positive number
-- of ids, or failing to read or write the serial file, gives a 'Bad'
-- result. The outcome of the replication RPC is ignored.
--
-- The lock is held through 'withMVar', so it is released even if an
-- exception escapes from the critical section; releasing it by hand on
-- every branch would leave the queue locked forever in that case.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n
  | n <= 0 = return . Bad $ "Can only allocate positive number of job ids"
  | otherwise = withMVar lock $ \_ -> do
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> return $ Bad s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++  "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return $ Bad msg
            Right () -> do
              serial' <- makeVirtualPath serial
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial' serial_content
              return $ mapM makeJobId [(current+1)..(current+n)]
491

    
492
-- | Allocate one new job id, by requesting a batch of size one and
-- insisting that exactly one id comes back.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock =
  liftM (>>= monadicThe "Failed to allocate precisely one Job ID")
        (allocateJobIds mastercandidates lock 1)
497

    
498
-- | Decide if the job queue is open, which is the case exactly when the
-- queue's drain file does not exist.
isQueueOpen :: IO Bool
isQueueOpen = do
  drainFile <- jobQueueDrainFile
  drained <- doesFileExist drainFile
  return $ not drained
501

    
502
-- | Start enqueued jobs, currently by handing them over to masterd: a
-- pickup request is sent for every job over the default master socket,
-- and all failures are reported together in a single warning.
--
-- NOTE(review): the Luxi client obtained here is never explicitly
-- closed in this function -- confirm whether that is intended.
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
  client <- getLuxiClient =<< defaultMasterSocket
  pickupResults <- forM jobs $ \job ->
    callMethod (PickupJob (qjId job)) client
  let failures = map show (justBad pickupResults)
  when (not (null failures)) $
    logWarning ("Failed to notify masterd: " ++ commaJoin failures)
511

    
512
-- | Try to cancel a job that has already been handed over to execution,
-- currently by asking masterd, over the default master socket, to cancel
-- it. The answer of masterd is returned as is.
--
-- NOTE(review): the Luxi client obtained here is never explicitly
-- closed in this function -- confirm whether that is intended.
cancelJob :: JobId -> IO (ErrorResult JSValue)
cancelJob jid =
  defaultMasterSocket >>= getLuxiClient >>= callMethod (CancelJob jid)
519

    
520
-- | Ownership and mode applied to the archive directories: owned by the
-- masterd user and the daemons group, with mode 0750.
queueDirPermissions :: FilePermissions
queueDirPermissions =
  FilePermissions
    { fpPermissions = 0o0750
    , fpOwner       = Just C.masterdUser
    , fpGroup       = Just C.daemonsGroup
    }
526

    
527
-- | Try, at most until the given end time, to archive some of the given
-- jobs, if they are older than the specified cut-off time; also replicate
-- the archival of the additional jobs. Returns the pair of the number of
-- jobs archived and the number of jobs remaining in the queue, assuming
-- the given numbers about the jobs not considered.
--
-- Jobs that cannot be loaded, are not archivable, or whose file cannot be
-- renamed are skipped. Renames are replicated in background threads, in
-- batches: a batch is handed to the replication function as soon as it
-- holds 10 jobs, and whatever is still pending is flushed when the job
-- list is exhausted or the end time is reached. An empty batch is never
-- replicated.
--
-- NOTE(review): the replication threads are forked and forgotten; what
-- happens to exceptions raised in them is not visible here.
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
                        -> FilePath -- ^ queue root directory
                        -> ClockTime -- ^ Endtime
                        -> Timestamp -- ^ cut-off time for archiving jobs
                        -> Int -- ^ number of jobs already archived
                        -> [JobId] -- ^ Additional jobs to replicate
                        -> [JobId] -- ^ List of job-ids still to consider
                        -> IO (Int, Int)
archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl pending =
  case pending of
    [] -> do
      flush torepl
      return (arch, 0)
    jid:jids -> do
      let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
          continue = archiveMore arch torepl jids
          jidname = show $ fromJobId jid
      time <- getClockTime
      if time >= endt
        then do
          -- out of time: everything not yet inspected counts as remaining
          flush torepl
          return (arch, length pending)
        else do
          logDebug $ "Inspecting job " ++ jidname ++ " for archival"
          loadResult <- loadJobFromDisk qDir False jid
          case loadResult of
            Ok (job, _) | jobArchivable cutt job -> do
              let live = liveJobFile qDir jid
                  archive = archivedJobFile qDir jid
              renameResult <- safeRenameFile queueDirPermissions
                                live archive
              case renameResult of
                Bad s -> do
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
                                 ++ " failed unexpectedly: " ++ s
                  continue
                Ok () -> do
                  let torepl' = jid:torepl
                  if length torepl' >= replicationBatchSize
                    then do
                      flush torepl'
                      archiveMore (arch + 1) [] jids
                    else archiveMore (arch + 1) torepl' jids
            -- unreadable or not (yet) archivable: leave the job alone
            _ -> continue
  where
    -- number of archived jobs collected before a replication is started
    replicationBatchSize = 10 :: Int
    -- replicate a batch in the background, unless it is empty
    flush [] = return ()
    flush jobs = forkIO (replicateFn jobs) >> return ()
579
                   
580
-- | Archive jobs older than the given time, but do not exceed the timeout
-- for carrying out this task. The renames are replicated to the master
-- candidates of the given configuration. Returns the number of jobs
-- archived and the number of jobs left unconsidered.
--
-- NOTE(review): for a negative age the cut-off is 'noTimestamp', and
-- 'jobArchivable' only accepts jobs with a timestamp strictly below the
-- cut-off, so presumably nothing gets archived in that case -- confirm
-- that this is the intended meaning of a negative age.
archiveJobs :: ConfigData -- ^ cluster configuration
               -> Int  -- ^ time the job has to be in the past in order
                       -- to be archived
               -> Int -- ^ timeout
               -> [JobId] -- ^ jobs to consider
               -> IO (Int, Int)
archiveJobs cfg age timeout jids = do
  now <- getClockTime
  qDir <- queueDir
  let deadline = addToClockTime (noTimeDiff { tdSec = timeout }) now
      cutoff
        | age < 0   = noTimestamp
        | otherwise = advanceTimestamp (negate age) (fromClockTime now)
      candidates = Config.getMasterCandidates cfg
      -- (old path, new path) of a job file being archived
      renamePair jid = (liveJobFile qDir jid, archivedJobFile qDir jid)
      replicateRenames jobs =
        executeRpcCall candidates (RpcCallJobqueueRename (map renamePair jobs))
          >> return ()
  archiveSomeJobsUntil replicateRenames qDir deadline cutoff 0 [] jids