{-# LANGUAGE TemplateHaskell #-}

{-| Implementation of the job queue.

-}

{-

Copyright (C) 2010, 2012 Google Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.

-}

module Ganeti.JQueue
    ( QueuedOpCode(..)
    , QueuedJob(..)
    , InputOpCode(..)
    , queuedOpCodeFromMetaOpCode
    , queuedJobFromOpCodes
    , changeOpCodePriority
    , changeJobPriority
    , cancelQueuedJob
    , Timestamp
    , fromClockTime
    , noTimestamp
    , currentTimestamp
    , advanceTimestamp
    , setReceivedTimestamp
    , opStatusFinalized
    , extractOpSummary
    , calcJobStatus
    , jobStarted
    , jobFinalized
    , jobArchivable
    , calcJobPriority
    , jobFileName
    , liveJobFile
    , archivedJobFile
    , determineJobDirectories
    , getJobIDs
    , sortJobIDs
    , loadJobFromDisk
    , noSuchJob
    , readSerialFromDisk
    , allocateJobIds
    , allocateJobId
    , writeJobToDisk
    , replicateManyJobs
    , isQueueOpen
    , startJobs
    , cancelJob
    , queueDirPermissions
    , archiveJobs
    ) where

import Control.Applicative (liftA2, (<|>))
import Control.Arrow (first, second)
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.Functor ((<$))
import Data.List
import Data.Maybe
import Data.Ord (comparing)
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
import Prelude hiding (id, log)
import System.Directory
import System.FilePath
import System.IO.Error (isDoesNotExistError)
import System.Posix.Files
import System.Time
import qualified Text.JSON
import Text.JSON.Types

import Ganeti.BasicTypes
import qualified Ganeti.Config as Config
import qualified Ganeti.Constants as C
import Ganeti.Errors (ErrorResult)
import Ganeti.JSON
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (ConfigData, Node)
import Ganeti.OpCodes
import Ganeti.Path
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils
import Ganeti.Utils.Atomic
import Ganeti.VCluster (makeVirtualPath)

-- * Data types

-- | The ganeti queue timestamp type. It represents the time as the pair
-- of seconds since the epoch and microseconds since the beginning of the
-- second.
type Timestamp = (Int, Int)

-- | Missing timestamp type.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)

-- | Obtain a Timestamp from a given clock time
fromClockTime :: ClockTime -> Timestamp
fromClockTime (TOD ctime pico) =
  (fromIntegral ctime, fromIntegral $ pico `div` 1000000)

-- | Get the current time in the job-queue timestamp format.
currentTimestamp :: IO Timestamp
currentTimestamp = fromClockTime `liftM` getClockTime

-- | From a given timestamp, obtain the timestamp of the
-- time that is the given number of seconds later.
advanceTimestamp :: Int -> Timestamp -> Timestamp
advanceTimestamp = first . (+)
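
-- For illustration, the conversion and arithmetic above behave as follows
-- (GHCi-style sketch; the numeric values are made-up examples):
--
-- > fromClockTime (TOD 1349354400 250000000000) == (1349354400, 250000)
-- > advanceTimestamp 10 (1349354400, 250000)    == (1349354410, 250000)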

-- | An input opcode.
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
                   deriving (Show, Eq)

-- | JSON instance for 'InputOpCode', trying to parse it and if
-- failing, keeping the original JSValue.
instance Text.JSON.JSON InputOpCode where
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
  showJSON (InvalidOpCode inv) = inv
  readJSON v = case Text.JSON.readJSON v of
                 Text.JSON.Error _ -> return $ InvalidOpCode v
                 Text.JSON.Ok mo -> return $ ValidOpCode mo

-- | Invalid opcode summary.
invalidOp :: String
invalidOp = "INVALID_OP"

-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
    Just s -> drop 3 s -- drop the OP_ prefix
    Nothing -> invalidOp
extractOpSummary _ = invalidOp
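
-- For illustration (sketch): an unparsable opcode that still carries an
-- "OP_ID" field yields that id without the "OP_" prefix, anything else
-- falls back to the generic marker:
--
-- > extractOpSummary (InvalidOpCode (JSObject (toJSObject
-- >     [("OP_ID", JSString (toJSString "OP_INSTANCE_CREATE"))])))
-- >   == "INSTANCE_CREATE"
-- > extractOpSummary (InvalidOpCode JSNull) == "INVALID_OP"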

$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"           [t| InputOpCode |]
  , simpleField "status"          [t| OpStatus    |]
  , simpleField "result"          [t| JSValue     |]
  , defaultField [| [] |] $
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority"        [t| Int         |]
  , optionalNullSerField $
    simpleField "start_timestamp" [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "exec_timestamp"  [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "end_timestamp"   [t| Timestamp   |]
  ])

$(buildObject "QueuedJob" "qj"
  [ simpleField "id"                 [t| JobId          |]
  , simpleField "ops"                [t| [QueuedOpCode] |]
  , optionalNullSerField $
    simpleField "received_timestamp" [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "start_timestamp"    [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "end_timestamp"      [t| Timestamp      |]
  ])
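
-- The two splices above generate ordinary records together with their JSON
-- (de)serialisation; roughly (sketch, with the field prefixes "qo" and "qj"
-- matching the accessors used throughout this module):
--
-- > data QueuedOpCode = QueuedOpCode { qoInput :: InputOpCode
-- >                                  , qoStatus :: OpStatus
-- >                                  , ... }
-- > data QueuedJob = QueuedJob { qjId :: JobId
-- >                            , qjOps :: [QueuedOpCode]
-- >                            , ... }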

-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
                              $ op
               , qoLog = []
               , qoResult = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp = Nothing
               , qoExecTimestamp = Nothing
               }

-- | From a job-id and a list of op-codes create a job. This is
-- the pure part of job creation, as allocating a new job id
-- lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  ops' <- mapM (`resolveDependencies` jobid) ops
  return QueuedJob { qjId = jobid
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
                   , qjReceivedTimestamp = Nothing
                   , qjStartTimestamp = Nothing
                   , qjEndTimestamp = Nothing
                   }

-- | Attach a received timestamp to a Queued Job.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
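
-- For illustration (sketch; 'jid' and 'metaops' are assumed to be supplied
-- by the caller), a freshly submitted job is typically built as:
--
-- > do now <- currentTimestamp
-- >    job <- queuedJobFromOpCodes jid metaops
-- >    return $ setReceivedTimestamp now job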

-- | Change the priority of a QueuedOpCode, if it is not already
-- finalized.
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
changeOpCodePriority prio op =
  if qoStatus op > OP_STATUS_RUNNING
     then op
     else op { qoPriority = prio }

-- | Set the state of a QueuedOpCode to canceled.
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op =
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }

-- | Change the priority of a job, i.e., change the priority of the
-- non-finalized opcodes.
changeJobPriority :: Int -> QueuedJob -> QueuedJob
changeJobPriority prio job =
  job { qjOps = map (changeOpCodePriority prio) $ qjOps job }

-- | Transform a QueuedJob that has not been started into its canceled form.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
  let ops' = map (cancelOpCode now) $ qjOps job
  in job { qjOps = ops', qjEndTimestamp = Just now}
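
-- For illustration (sketch; 'job' is assumed to be a not-yet-started job with
-- at least one opcode): cancelling stamps every opcode and the job itself
-- with the same end time, so afterwards
--
-- > calcJobStatus (cancelQueuedJob now job) == JOB_STATUS_CANCELED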

-- | Job file prefix.
jobFilePrefix :: String
jobFilePrefix = "job-"

-- | Computes the filename for a given job ID.
jobFileName :: JobId -> FilePath
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)

-- | Parses a job ID from a file name.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  case stripPrefix jobFilePrefix path of
    Nothing -> fail $ "Job file '" ++ path ++
                      "' doesn't have the correct prefix"
    Just suffix -> makeJobIdS suffix

-- | Computes the full path to a live job.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid

-- | Computes the full path to an archived job.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
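
-- For illustration (sketch; the queue root is an arbitrary example path and
-- 'jid12345' stands for a JobId with numeric value 12345):
--
-- > jobFileName jid12345 == "job-12345"
-- > liveJobFile "/var/lib/ganeti/queue" jid12345
-- >   == "/var/lib/ganeti/queue/job-12345"
-- > archivedJobFile "/var/lib/ganeti/queue" jid12345
-- >   == "/var/lib/ganeti/queue" </> jobQueueArchiveSubDir
-- >        </> show (12345 `div` C.jstoreJobsPerArchiveDirectory)
-- >        </> "job-12345"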

-- | Map from opcode status to job status.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR

-- | Computes a queued job's status.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      softStatus     _                   = False
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all
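
-- For illustration, the fold above maps per-opcode status lists to a job
-- status roughly as follows (sketch):
--
-- > [OP_STATUS_SUCCESS, OP_STATUS_SUCCESS]                   -> JOB_STATUS_SUCCESS
-- > [OP_STATUS_SUCCESS, OP_STATUS_RUNNING, OP_STATUS_QUEUED] -> JOB_STATUS_RUNNING
-- > [OP_STATUS_SUCCESS, OP_STATUS_ERROR, OP_STATUS_QUEUED]   -> JOB_STATUS_ERROR
-- > [OP_STATUS_QUEUED, OP_STATUS_QUEUED]                     -> JOB_STATUS_QUEUED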

-- | Determine if a job has started.
jobStarted :: QueuedJob -> Bool
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus

-- | Determine if a job is finalized.
jobFinalized :: QueuedJob -> Bool
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus

-- | Determine if a job is finalized and its timestamp is before
-- a given time.
jobArchivable :: Timestamp -> QueuedJob -> Bool
jobArchivable ts = liftA2 (&&) jobFinalized
  $ maybe False (< ts)
    .  liftA2 (<|>) qjEndTimestamp qjStartTimestamp

-- | Determine whether an opcode status is finalized.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)

-- | Compute a job's priority.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
    where helper [] = C.opPrioDefault
          helper ps = minimum ps
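
-- For illustration (sketch; 'job' is an existing QueuedJob and 'op' a
-- non-finalized QueuedOpCode, e.g. one built by 'queuedOpCodeFromMetaOpCode'):
--
-- > calcJobPriority job { qjOps = [ op { qoPriority = 40 }
-- >                               , op { qoPriority = 10 } ] } == 10
-- > calcJobPriority job { qjOps = [] } == C.opPrioDefault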

-- | Log but ignore an 'IOError'.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  unless (isDoesNotExistError e && ignore_noent) .
    logWarning $ msg ++ ": " ++ show e
  return a

-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             liftM isDirectory (getFileStatus (adir </> path))
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths

-- | Build list of directories containing job files. Note: compared to
-- the Python version, this doesn't ignore a potential lost+found
-- file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = do
  other <- if archived
             then allArchiveDirs rootdir
             else return []
  return $ rootdir:other

-- | Computes the list of all jobs in the given directories.
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
getJobIDs = runResultT . liftM concat . mapM getDirJobIDs

-- | Sorts a list of job IDs.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)

-- | Computes the list of jobs in a given directory.
getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]
getDirJobIDs path =
  withErrorLogAt WARNING ("Failed to list job directory " ++ path) .
    liftM (mapMaybe parseJobFileId) $ liftIO (getDirectoryContents path)
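
-- For illustration (sketch): listing every known job id, archived ones
-- included, combines the helpers above:
--
-- > do qDir <- queueDir
-- >    dirs <- determineJobDirectories qDir True
-- >    rids <- getJobIDs dirs
-- >    case rids of
-- >      Ok ids -> return $ sortJobIDs ids
-- >      Bad e  -> logWarning ("Job listing failed: " ++ show e) >> return []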

-- | Reads the job data from disk.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
  let live_path = liveJobFile rootdir jid
      archived_path = archivedJobFile rootdir jid
      all_paths = if archived
                    then [(live_path, False), (archived_path, True)]
                    else [(live_path, False)]
  foldM (\state (path, isarchived) ->
           liftM (\r -> Just (r, isarchived)) (readFile path)
             `Control.Exception.catch`
             ignoreIOError state True
               ("Failed to read job file " ++ path)) Nothing all_paths

-- | Failed to load job error.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"

-- | Loads a job from disk.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str

-- | Write a job to disk.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok
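
-- For illustration (sketch; 'job' is an existing QueuedJob): the write/load
-- pair round-trips through the live job file under the queue root:
--
-- > do qDir <- queueDir
-- >    _    <- writeJobToDisk qDir job
-- >    loadJobFromDisk qDir False (qjId job)  -- Ok (job', False) on success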

-- | Replicate a job to all master candidates.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  filename' <- makeVirtualPath filename
  callresult <- executeRpcCall mastercandidates
                  $ RpcCallJobqueueUpdate filename' content
  let result = map (second (() <$)) callresult
  logRpcErrors result
  return result

-- | Replicate many jobs to all master candidates.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates =
  mapM_ (replicateJob rootdir mastercandidates)

-- | Read the job serial number from disk.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
  filename <- jobQueueSerialFile
  tryAndLogIOError (readFile filename) "Failed to read serial file"
                   (makeJobIdS . rStripSpace)

-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, as usual provided by an MVar.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate positive number of job ids"
    else do
      takeMVar lock
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> do
          putMVar lock ()
          return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++  "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              putMVar lock ()
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              serial' <- makeVirtualPath serial
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial' serial_content
              putMVar lock ()
              return $ mapM makeJobId [(current+1)..(current+n)]

-- | Allocate one new job id.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock = do
  jids <- allocateJobIds mastercandidates lock 1
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
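
-- For illustration (sketch; the lock is assumed to be created once at daemon
-- startup with 'newMVar ()' and shared between all allocating threads, and
-- 'cfg' to be the cluster's ConfigData):
--
-- > do lock <- newMVar ()
-- >    rids <- allocateJobIds (Config.getMasterCandidates cfg) lock 3
-- >    -- rids is either Ok with the three ids following the on-disk serial,
-- >    -- or Bad with a description of the failure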

-- | Decide whether the job queue is open.
isQueueOpen :: IO Bool
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)

-- | Start enqueued jobs, currently by handing them over to masterd.
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
  socketpath <- defaultMasterSocket
  client <- getLuxiClient socketpath
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
  let failures = map show $ justBad pickupResults
  unless (null failures)
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures

-- | Try to cancel a job that has already been handed over to execution,
-- currently by asking masterd to cancel it.
cancelJob :: JobId -> IO (ErrorResult JSValue)
cancelJob jid = do
  socketpath <- defaultMasterSocket
  client <- getLuxiClient socketpath
  callMethod (CancelJob jid) client

-- | Permissions for the archive directories.
queueDirPermissions :: FilePermissions
queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser
                                      , fpGroup = Just C.daemonsGroup
                                      , fpPermissions = 0o0750
                                      }
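
-- For illustration (sketch; 'cfg', 'lock' and 'metaops' are assumed to be
-- provided by the caller): a complete submission through this module checks
-- the drain flag, allocates an id, persists, replicates and notifies masterd:
--
-- > do open <- isQueueOpen
-- >    when open $ do
-- >      qDir <- queueDir
-- >      let mcs = Config.getMasterCandidates cfg
-- >      Ok jid <- allocateJobId mcs lock
-- >      now  <- currentTimestamp
-- >      job0 <- queuedJobFromOpCodes jid metaops
-- >      let job = setReceivedTimestamp now job0
-- >      _ <- writeJobToDisk qDir job
-- >      _ <- replicateJob qDir mcs job
-- >      startJobs [job]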

-- | Try, at most until the given endtime, to archive some of the given
-- jobs, if they are older than the specified cut-off time; also replicate
-- archival of the additional jobs. Return the pair of the number of jobs
-- archived, and the number of jobs remaining in the queue, assuming the
-- given numbers about the jobs not considered.
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
                        -> FilePath -- ^ queue root directory
                        -> ClockTime -- ^ Endtime
                        -> Timestamp -- ^ cut-off time for archiving jobs
                        -> Int -- ^ number of jobs already archived
                        -> [JobId] -- ^ Additional jobs to replicate
                        -> [JobId] -- ^ List of job-ids still to consider
                        -> IO (Int, Int)
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
  unless (null torepl) . (>> return ())
   . forkIO $ replicateFn torepl
  return (arch, 0)

archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
      continue = archiveMore arch torepl jids
      jidname = show $ fromJobId jid
  time <- getClockTime
  if time >= endt
    then do
      _ <- forkIO $ replicateFn torepl
      return (arch, length (jid:jids))
    else do
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
      loadResult <- loadJobFromDisk qDir False jid
      case loadResult of
        Bad _ -> continue
        Ok (job, _) ->
          if jobArchivable cutt job
            then do
              let live = liveJobFile qDir jid
                  archive = archivedJobFile qDir jid
              renameResult <- safeRenameFile queueDirPermissions
                                live archive
              case renameResult of
                Bad s -> do
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
                                 ++ " failed unexpectedly: " ++ s
                  continue
                Ok () -> do
                  let torepl' = jid:torepl
                  if length torepl' >= 10
                    then do
                      _ <- forkIO $ replicateFn torepl'
                      archiveMore (arch + 1) [] jids
                    else archiveMore (arch + 1) torepl' jids
            else continue

-- | Archive jobs older than the given time, but do not exceed the timeout for
-- carrying out this task.
archiveJobs :: ConfigData -- ^ cluster configuration
               -> Int  -- ^ time the job has to be in the past in order
                       -- to be archived
               -> Int -- ^ timeout
               -> [JobId] -- ^ jobs to consider
               -> IO (Int, Int)
archiveJobs cfg age timeout jids = do
  now <- getClockTime
  qDir <- queueDir
  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
      cuttime = if age < 0 then noTimestamp
                           else advanceTimestamp (- age) (fromClockTime now)
      mcs = Config.getMasterCandidates cfg
      replicateFn jobs = do
        let olds = map (liveJobFile qDir) jobs
            news = map (archivedJobFile qDir) jobs
        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
        return ()
  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids
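
-- For illustration (sketch; the 14-day age and the 30-second timeout are
-- arbitrary example values):
--
-- > do (archived, left) <- archiveJobs cfg (14 * 24 * 3600) 30 jids
-- >    logDebug $ show archived ++ " jobs archived, " ++ show left
-- >               ++ " jobs left in the queue"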