{-# LANGUAGE TemplateHaskell #-}

{-| Implementation of the job queue.

-}

{-

Copyright (C) 2010, 2012 Google Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.

-}

module Ganeti.JQueue
    ( QueuedOpCode(..)
    , QueuedJob(..)
    , InputOpCode(..)
    , queuedOpCodeFromMetaOpCode
    , queuedJobFromOpCodes
    , cancelQueuedJob
    , Timestamp
    , fromClockTime
    , noTimestamp
    , currentTimestamp
    , advanceTimestamp
    , setReceivedTimestamp
    , opStatusFinalized
    , extractOpSummary
    , calcJobStatus
    , jobStarted
    , jobFinalized
    , jobArchivable
    , calcJobPriority
    , jobFileName
    , liveJobFile
    , archivedJobFile
    , determineJobDirectories
    , getJobIDs
    , sortJobIDs
    , loadJobFromDisk
    , noSuchJob
    , readSerialFromDisk
    , allocateJobIds
    , allocateJobId
    , writeJobToDisk
    , replicateManyJobs
    , isQueueOpen
    , startJobs
    , cancelJob
    , queueDirPermissions
    , archiveJobs
    ) where

import Control.Applicative (liftA2, (<|>))
import Control.Arrow (first, second)
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Data.Functor ((<$))
import Data.List
import Data.Maybe
import Data.Ord (comparing)
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
import Prelude hiding (id, log)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import System.Time
import qualified Text.JSON
import Text.JSON.Types

import Ganeti.BasicTypes
import qualified Ganeti.Config as Config
import qualified Ganeti.Constants as C
import Ganeti.Errors (ErrorResult)
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (ConfigData, Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils
import Ganeti.VCluster (makeVirtualPath)

-- * Data types

-- | The ganeti queue timestamp type. It represents the time as the pair
-- of seconds since the epoch and microseconds since the beginning of the
-- second.
type Timestamp = (Int, Int)

-- | Missing timestamp type.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)

-- | Obtain a Timestamp from a given clock time
fromClockTime :: ClockTime -> Timestamp
fromClockTime (TOD ctime pico) =
  (fromIntegral ctime, fromIntegral $ pico `div` 1000000)

-- | Get the current time in the job-queue timestamp format.
currentTimestamp :: IO Timestamp
currentTimestamp = fromClockTime `liftM` getClockTime

-- | From a given timestamp, obtain the timestamp of the
-- time that is the given number of seconds later.
advanceTimestamp :: Int -> Timestamp -> Timestamp
advanceTimestamp = first . (+)
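
-- A small usage sketch, not part of the original interface: the helpers
-- above compose naturally, e.g. to obtain a timestamp a given number of
-- seconds in the future of the current time ('exampleCutoff' is a
-- hypothetical name, shown only for illustration).
exampleCutoff :: Int -> IO Timestamp
exampleCutoff secs = advanceTimestamp secs `liftM` currentTimestamp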

-- | An input opcode.
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
                   deriving (Show, Eq)

-- | JSON instance for 'InputOpCode', trying to parse it and if
-- failing, keeping the original JSValue.
instance Text.JSON.JSON InputOpCode where
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
  showJSON (InvalidOpCode inv) = inv
  readJSON v = case Text.JSON.readJSON v of
                 Text.JSON.Error _ -> return $ InvalidOpCode v
                 Text.JSON.Ok mo -> return $ ValidOpCode mo

-- | Invalid opcode summary.
invalidOp :: String
invalidOp = "INVALID_OP"

-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
    Just s -> drop 3 s -- drop the OP_ prefix
    Nothing -> invalidOp
extractOpSummary _ = invalidOp
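
-- Usage sketch (illustrative, the values shown are not from the original
-- source): for a valid opcode the summary comes from 'opSummary', while
-- for an invalid one the @OP_ID@ field is used when present (with the
-- @OP_@ prefix dropped), and the generic marker otherwise:
--
-- > extractOpSummary (InvalidOpCode JSNull) == "INVALID_OP"
-- > extractOpSummary (InvalidOpCode . JSObject $ toJSObject
-- >     [("OP_ID", JSString $ toJSString "OP_INSTANCE_CREATE")])
-- >   == "INSTANCE_CREATE"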

$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"           [t| InputOpCode |]
  , simpleField "status"          [t| OpStatus    |]
  , simpleField "result"          [t| JSValue     |]
  , defaultField [| [] |] $
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority"        [t| Int         |]
  , optionalNullSerField $
    simpleField "start_timestamp" [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "exec_timestamp"  [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "end_timestamp"   [t| Timestamp   |]
  ])

$(buildObject "QueuedJob" "qj"
  [ simpleField "id"                 [t| JobId          |]
  , simpleField "ops"                [t| [QueuedOpCode] |]
  , optionalNullSerField $
    simpleField "received_timestamp" [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "start_timestamp"    [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "end_timestamp"      [t| Timestamp      |]
  ])
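
-- Serialization note (illustrative; an assumption about the generated
-- instances rather than part of the original source): 'buildObject' also
-- derives JSON (de)serialization for these records, so a queued job is
-- stored on disk roughly in the shape
--
-- > {"id": 42, "ops": [...], "received_timestamp": [1349026611, 500000],
-- >  "start_timestamp": null, "end_timestamp": null}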

-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
                              $ op
               , qoLog = []
               , qoResult = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp = Nothing
               , qoExecTimestamp = Nothing
               }

-- | From a job-id and a list of op-codes create a job. This is
-- the pure part of job creation, as allocating a new job id
-- lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  ops' <- mapM (`resolveDependencies` jobid) ops
  return QueuedJob { qjId = jobid
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   }

-- | Attach a received timestamp to a Queued Job.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
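
-- Usage sketch (illustrative; 'jid' stands for a freshly allocated
-- 'JobId' and 'metaOps' for any list of 'MetaOpCode's): job creation is
-- pure apart from the id allocation, so a new job is typically
-- assembled as
--
-- > job <- queuedJobFromOpCodes jid metaOps
-- > now <- currentTimestamp
-- > let job' = setReceivedTimestamp now job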

-- | Set the state of a QueuedOpCode to canceled.
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op =
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }

-- | Transform a QueuedJob that has not been started into its canceled form.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
  let ops' = map (cancelOpCode now) $ qjOps job
  in job { qjOps = ops', qjEndTimestamp = Just now}

-- | Job file prefix.
jobFilePrefix :: String
jobFilePrefix = "job-"

-- | Computes the filename for a given job ID.
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)

-- | Parses a job ID from a file name.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  case stripPrefix jobFilePrefix path of
    Nothing -> fail $ "Job file '" ++ path ++
                      "' doesn't have the correct prefix"
    Just suffix -> makeJobIdS suffix

-- | Computes the full path to a live job.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid

-- | Computes the full path to an archived job.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
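
-- Layout sketch (illustrative, assuming 'jobQueueArchiveSubDir' is
-- "archive" and 'C.jstoreJobsPerArchiveDirectory' is 10000): for job
-- 4235 and a queue root of "/var/lib/ganeti/queue" this yields
--
-- > liveJobFile     "/var/lib/ganeti/queue" jid == "/var/lib/ganeti/queue/job-4235"
-- > archivedJobFile "/var/lib/ganeti/queue" jid == "/var/lib/ganeti/queue/archive/0/job-4235"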

-- | Map from opcode status to job status.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR

-- | Computes a queued job's status.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      softStatus     _                   = False
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all
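
-- Behaviour sketch (illustrative, written schematically in terms of the
-- opcode statuses only): the fold aborts at the first terminal status
-- (error, canceling or canceled); otherwise the last waiting/running
-- opcode determines the job status, and the job counts as successful
-- only if every opcode succeeded:
--
-- > [SUCCESS, RUNNING, QUEUED] -> JOB_STATUS_RUNNING
-- > [SUCCESS, ERROR,   QUEUED] -> JOB_STATUS_ERROR
-- > [SUCCESS, SUCCESS]         -> JOB_STATUS_SUCCESS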

-- | Determine if a job has started
jobStarted :: QueuedJob -> Bool
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus

-- | Determine if a job is finalised.
jobFinalized :: QueuedJob -> Bool
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus

-- | Determine if a job is finalized and its timestamp is before
-- a given time.
jobArchivable :: Timestamp -> QueuedJob -> Bool
jobArchivable ts = liftA2 (&&) jobFinalized
  $ maybe False (< ts)
    .  liftA2 (<|>) qjEndTimestamp qjStartTimestamp

-- | Determine whether an opcode status is finalized.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)

-- | Compute a job's priority.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
    where helper [] = C.opPrioDefault
          helper ps = minimum ps
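
-- Behaviour sketch (illustrative): only opcodes that are not yet
-- finalized contribute, and the numerically smallest (i.e. most urgent)
-- pending priority wins; with nothing pending the default priority is
-- used:
--
-- > pending opcode priorities [10, -20, 0] -> job priority -20
-- > no pending opcodes                     -> C.opPrioDefault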

-- | Log but ignore an 'IOError'.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  unless (isDoesNotExistError e && ignore_noent) .
    logWarning $ msg ++ ": " ++ show e
  return a

-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             liftM isDirectory (getFileStatus (adir </> path))
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths

-- | Build list of directories containing job files. Note: compared to
-- the Python version, this doesn't ignore a potential lost+found
-- file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
  other <- if archived
             then allArchiveDirs rootdir
             else return []
  return $ rootdir:other

-- Function equivalent to the \'sequence\' function, which cannot be used
-- because of a library version conflict on Lucid.
-- FIXME: delete this and just use \'sequence\' instead once Lucid
-- compatibility is no longer required.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer l = fmap reverse $ foldl seqFolder (Right []) l

-- | Folding function for joining multiple [JobIds] into one list.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder (Left e) _ = Left e
seqFolder (Right _) (Left e) = Left e
seqFolder (Right l) (Right el) = Right $ el:l

-- | Computes the list of all jobs in the given directories.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)

-- | Sorts a list of job IDs.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)
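
-- Usage sketch (illustrative; 'qDir' stands for the queue directory as
-- returned by 'queueDir'): enumerating and ordering all known jobs,
-- including archived ones:
--
-- > dirs <- determineJobDirectories qDir True
-- > rids <- getJobIDs dirs
-- > case rids of
-- >   Left err   -> logWarning $ "Cannot list jobs: " ++ show err
-- >   Right jids -> mapM_ (logDebug . show . fromJobId) (sortJobIDs jids)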

-- | Computes the list of jobs in a given directory.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  either_contents <-
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case either_contents of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
    Right contents -> do
      let jids = foldl (\ids file ->
                         case parseJobFileId file of
                           Nothing -> ids
                           Just new_id -> new_id:ids) [] contents
      return . Right $ reverse jids

-- | Reads the job data from disk.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
  let live_path = liveJobFile rootdir jid
      archived_path = archivedJobFile rootdir jid
      all_paths = if archived
                    then [(live_path, False), (archived_path, True)]
                    else [(live_path, False)]
  foldM (\state (path, isarchived) ->
           liftM (\r -> Just (r, isarchived)) (readFile path)
             `Control.Exception.catch`
             ignoreIOError state True
               ("Failed to read job file " ++ path)) Nothing all_paths

-- | Failed to load job error.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"

-- | Loads a job from disk.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str
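
-- Usage sketch (illustrative; 'qDir' and 'jid' as above): loading a job
-- and inspecting it; the Bool in the result says whether the job was
-- found in the archive rather than among the live jobs:
--
-- > r <- loadJobFromDisk qDir True jid
-- > case r of
-- >   Bad msg       -> logWarning msg
-- >   Ok (job, arc) -> logDebug $ show (calcJobStatus job)
-- >                             ++ if arc then " (archived)" else ""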
410

    
411
-- | Write a job to disk.
412
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
413
writeJobToDisk rootdir job = do
414
  let filename = liveJobFile rootdir . qjId $ job
415
      content = Text.JSON.encode . Text.JSON.showJSON $ job
416
  tryAndLogIOError (atomicWriteFile filename content)
417
                   ("Failed to write " ++ filename) Ok
418

    
419
-- | Replicate a job to all master candidates.
420
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
421
replicateJob rootdir mastercandidates job = do
422
  let filename = liveJobFile rootdir . qjId $ job
423
      content = Text.JSON.encode . Text.JSON.showJSON $ job
424
  filename' <- makeVirtualPath filename
425
  callresult <- executeRpcCall mastercandidates
426
                  $ RpcCallJobqueueUpdate filename' content
427
  let result = map (second (() <$)) callresult
428
  logRpcErrors result
429
  return result
430

    
431
-- | Replicate many jobs to all master candidates.
432
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
433
replicateManyJobs rootdir mastercandidates =
434
  mapM_ (replicateJob rootdir mastercandidates)
435

    
436
-- | Read the job serial number from disk.
437
readSerialFromDisk :: IO (Result JobId)
438
readSerialFromDisk = do
439
  filename <- jobQueueSerialFile
440
  tryAndLogIOError (readFile filename) "Failed to read serial file"
441
                   (makeJobIdS . rStripSpace)
442

    
443
-- | Allocate new job ids.
444
-- To avoid races while accessing the serial file, the threads synchronize
445
-- over a lock, as usual provided by an MVar.
446
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
447
allocateJobIds mastercandidates lock n =
448
  if n <= 0
449
    then return . Bad $ "Can only allocate positive number of job ids"
450
    else do
451
      takeMVar lock
452
      rjobid <- readSerialFromDisk
453
      case rjobid of
454
        Bad s -> do
455
          putMVar lock ()
456
          return . Bad $ s
457
        Ok jid -> do
458
          let current = fromJobId jid
459
              serial_content = show (current + n) ++  "\n"
460
          serial <- jobQueueSerialFile
461
          write_result <- try $ atomicWriteFile serial serial_content
462
                          :: IO (Either IOError ())
463
          case write_result of
464
            Left e -> do
465
              putMVar lock ()
466
              let msg = "Failed to write serial file: " ++ show e
467
              logError msg
468
              return . Bad $ msg 
469
            Right () -> do
470
              serial' <- makeVirtualPath serial
471
              _ <- executeRpcCall mastercandidates
472
                     $ RpcCallJobqueueUpdate serial' serial_content
473
              putMVar lock ()
474
              return $ mapM makeJobId [(current+1)..(current+n)]
475

    
476
-- | Allocate one new job id.
477
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
478
allocateJobId mastercandidates lock = do
479
  jids <- allocateJobIds mastercandidates lock 1
480
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
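
-- Illustrative sketch only, not part of the original interface: a
-- plausible submission sequence that combines the allocation above with
-- the write, replication and hand-over helpers defined elsewhere in this
-- module ('exampleSubmit' and its arguments are hypothetical; the result
-- of the local write is ignored here for brevity).
exampleSubmit :: FilePath -> [Node] -> MVar () -> [MetaOpCode]
              -> IO (Result JobId)
exampleSubmit qDir mcs lock metaOps = do
  rjid <- allocateJobId mcs lock
  case rjid of
    Bad msg -> return $ Bad msg
    Ok jid -> do
      now <- currentTimestamp
      job <- setReceivedTimestamp now `liftM` queuedJobFromOpCodes jid metaOps
      _ <- writeJobToDisk qDir job     -- write the job file locally first
      replicateManyJobs qDir mcs [job] -- then push it to the master candidates
      startJobs [job]                  -- finally hand it over to masterd
      return $ Ok jid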
481

    
482
-- | Decide if job queue is open
483
isQueueOpen :: IO Bool
484
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
485

    
486
-- | Start enqueued jobs, currently by handing them over to masterd.
487
startJobs :: [QueuedJob] -> IO ()
488
startJobs jobs = do
489
  socketpath <- defaultMasterSocket
490
  client <- getLuxiClient socketpath
491
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
492
  let failures = map show $ justBad pickupResults
493
  unless (null failures)
494
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
495

    
496
-- | Try to cancel a job that has already been handed over to execution,
497
-- currently by asking masterd to cancel it.
498
cancelJob :: JobId -> IO (ErrorResult JSValue)
499
cancelJob jid = do
500
  socketpath <- defaultMasterSocket
501
  client <- getLuxiClient socketpath
502
  callMethod (CancelJob jid) client
503

    
504
-- | Permissions for the archive directories.
505
queueDirPermissions :: FilePermissions
506
queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser
507
                                      , fpGroup = Just C.daemonsGroup
508
                                      , fpPermissions = 0o0750
509
                                      }

-- | Try, at most until the given endtime, to archive some of the given
-- jobs, if they are older than the specified cut-off time; also replicate
-- archival of the additional jobs. Return the pair of the number of jobs
-- archived, and the number of jobs remaining in the queue, assuming the
-- given counts for the jobs that were not considered.
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
                        -> FilePath -- ^ queue root directory
                        -> ClockTime -- ^ Endtime
                        -> Timestamp -- ^ cut-off time for archiving jobs
                        -> Int -- ^ number of jobs already archived
                        -> [JobId] -- ^ Additional jobs to replicate
                        -> [JobId] -- ^ List of job-ids still to consider
                        -> IO (Int, Int)
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
  unless (null torepl) . (>> return ())
   . forkIO $ replicateFn torepl
  return (arch, 0)

archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
      continue = archiveMore arch torepl jids
      jidname = show $ fromJobId jid
  time <- getClockTime
  if time >= endt
    then do
      _ <- forkIO $ replicateFn torepl
      return (arch, length (jid:jids))
    else do
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
      loadResult <- loadJobFromDisk qDir False jid
      case loadResult of
        Bad _ -> continue
        Ok (job, _) ->
          if jobArchivable cutt job
            then do
              let live = liveJobFile qDir jid
                  archive = archivedJobFile qDir jid
              renameResult <- safeRenameFile queueDirPermissions
                                live archive
              case renameResult of
                Bad s -> do
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
                                 ++ " failed unexpectedly: " ++ s
                  continue
                Ok () -> do
                  let torepl' = jid:torepl
                  if length torepl' >= 10
                    then do
                      _ <- forkIO $ replicateFn torepl'
                      archiveMore (arch + 1) [] jids
                    else archiveMore (arch + 1) torepl' jids
            else continue

-- | Archive jobs older than the given time, but do not exceed the timeout for
-- carrying out this task.
archiveJobs :: ConfigData -- ^ cluster configuration
               -> Int  -- ^ time the job has to be in the past in order
                       -- to be archived
               -> Int -- ^ timeout
               -> [JobId] -- ^ jobs to consider
               -> IO (Int, Int)
archiveJobs cfg age timeout jids = do
  now <- getClockTime
  qDir <- queueDir
  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
      cuttime = if age < 0 then noTimestamp
                           else advanceTimestamp (- age) (fromClockTime now)
      mcs = Config.getMasterCandidates cfg
      replicateFn jobs = do
        let olds = map (liveJobFile qDir) jobs
            news = map (archivedJobFile qDir) jobs
        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
        return ()
  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids
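
-- Illustrative sketch only ('exampleArchiveOldJobs' is a hypothetical
-- helper, not part of the original interface): archive every job older
-- than one day, spending at most 30 seconds on the task, and return the
-- (archived, remaining) counts.
exampleArchiveOldJobs :: ConfigData -> IO (Int, Int)
exampleArchiveOldJobs cfg = do
  qDir <- queueDir
  rjids <- determineJobDirectories qDir False >>= getJobIDs
  case rjids of
    Left err -> do
      logWarning $ "Cannot list the job queue: " ++ show err
      return (0, 0)
    Right jids -> archiveJobs cfg 86400 30 (sortJobIDs jids)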