Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ a2a1a8ca

History | View | Annotate | Download (21.3 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , changeOpCodePriority
35
    , changeJobPriority
36
    , cancelQueuedJob
37
    , Timestamp
38
    , fromClockTime
39
    , noTimestamp
40
    , currentTimestamp
41
    , advanceTimestamp
42
    , setReceivedTimestamp
43
    , opStatusFinalized
44
    , extractOpSummary
45
    , calcJobStatus
46
    , jobStarted
47
    , jobFinalized
48
    , jobArchivable
49
    , calcJobPriority
50
    , jobFileName
51
    , liveJobFile
52
    , archivedJobFile
53
    , determineJobDirectories
54
    , getJobIDs
55
    , sortJobIDs
56
    , loadJobFromDisk
57
    , noSuchJob
58
    , readSerialFromDisk
59
    , allocateJobIds
60
    , allocateJobId
61
    , writeJobToDisk
62
    , replicateManyJobs
63
    , isQueueOpen
64
    , startJobs
65
    , cancelJob
66
    , queueDirPermissions
67
    , archiveJobs
68
    ) where
69

    
70
import Control.Applicative (liftA2, (<|>))
71
import Control.Arrow (first, second)
72
import Control.Concurrent (forkIO)
73
import Control.Concurrent.MVar
74
import Control.Exception
75
import Control.Monad
76
import Control.Monad.IO.Class
77
import Data.Functor ((<$))
78
import Data.List
79
import Data.Maybe
80
import Data.Ord (comparing)
81
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
82
import Prelude hiding (id, log)
83
import System.Directory
84
import System.FilePath
85
import System.IO.Error (isDoesNotExistError)
86
import System.Posix.Files
87
import System.Time
88
import qualified Text.JSON
89
import Text.JSON.Types
90

    
91
import Ganeti.BasicTypes
92
import qualified Ganeti.Config as Config
93
import qualified Ganeti.Constants as C
94
import Ganeti.Errors (ErrorResult)
95
import Ganeti.JSON
96
import Ganeti.Logging
97
import Ganeti.Luxi
98
import Ganeti.Objects (ConfigData, Node)
99
import Ganeti.OpCodes
100
import Ganeti.Path
101
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
102
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
103
import Ganeti.THH
104
import Ganeti.Types
105
import Ganeti.Utils
106
import Ganeti.VCluster (makeVirtualPath)
107

    
108
-- * Data types
109

    
110
-- | The ganeti queue timestamp type. It represents the time as the pair
111
-- of seconds since the epoch and microseconds since the beginning of the
112
-- second.
113
type Timestamp = (Int, Int)
114

    
115
-- | Missing timestamp type.
116
noTimestamp :: Timestamp
117
noTimestamp = (-1, -1)
118

    
119
-- | Obtain a Timestamp from a given clock time
120
fromClockTime :: ClockTime -> Timestamp
121
fromClockTime (TOD ctime pico) =
122
  (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
123

    
124
-- | Get the current time in the job-queue timestamp format.
125
currentTimestamp :: IO Timestamp
126
currentTimestamp = fromClockTime `liftM` getClockTime
127

    
128
-- | From a given timestamp, obtain the timestamp of the
129
-- time that is the given number of seconds later.
130
advanceTimestamp :: Int -> Timestamp -> Timestamp
131
advanceTimestamp = first . (+)
132

    
133
-- | An input opcode.
134
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
135
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
136
                   deriving (Show, Eq)
137

    
138
-- | JSON instance for 'InputOpCode', trying to parse it and if
139
-- failing, keeping the original JSValue.
140
instance Text.JSON.JSON InputOpCode where
141
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
142
  showJSON (InvalidOpCode inv) = inv
143
  readJSON v = case Text.JSON.readJSON v of
144
                 Text.JSON.Error _ -> return $ InvalidOpCode v
145
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
146

    
147
-- | Invalid opcode summary.
148
invalidOp :: String
149
invalidOp = "INVALID_OP"
150

    
151
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
152
-- duplicates some functionality from the 'opSummary' function in
153
-- "Ganeti.OpCodes".
154
extractOpSummary :: InputOpCode -> String
155
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
156
extractOpSummary (InvalidOpCode (JSObject o)) =
157
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
158
    Just s -> drop 3 s -- drop the OP_ prefix
159
    Nothing -> invalidOp
160
extractOpSummary _ = invalidOp
161

    
162
$(buildObject "QueuedOpCode" "qo"
163
  [ simpleField "input"           [t| InputOpCode |]
164
  , simpleField "status"          [t| OpStatus    |]
165
  , simpleField "result"          [t| JSValue     |]
166
  , defaultField [| [] |] $
167
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
168
  , simpleField "priority"        [t| Int         |]
169
  , optionalNullSerField $
170
    simpleField "start_timestamp" [t| Timestamp   |]
171
  , optionalNullSerField $
172
    simpleField "exec_timestamp"  [t| Timestamp   |]
173
  , optionalNullSerField $
174
    simpleField "end_timestamp"   [t| Timestamp   |]
175
  ])
176

    
177
$(buildObject "QueuedJob" "qj"
178
  [ simpleField "id"                 [t| JobId          |]
179
  , simpleField "ops"                [t| [QueuedOpCode] |]
180
  , optionalNullSerField $
181
    simpleField "received_timestamp" [t| Timestamp      |]
182
  , optionalNullSerField $
183
    simpleField "start_timestamp"    [t| Timestamp      |]
184
  , optionalNullSerField $
185
    simpleField "end_timestamp"      [t| Timestamp      |]
186
  ])
187

    
188
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
189
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
190
queuedOpCodeFromMetaOpCode op =
191
  QueuedOpCode { qoInput = ValidOpCode op
192
               , qoStatus = OP_STATUS_QUEUED
193
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
194
                              $ op
195
               , qoLog = []
196
               , qoResult = JSNull
197
               , qoStartTimestamp = Nothing
198
               , qoEndTimestamp = Nothing
199
               , qoExecTimestamp = Nothing
200
               }
201

    
202
-- | From a job-id and a list of op-codes create a job. This is
203
-- the pure part of job creation, as allocating a new job id
204
-- lives in IO.
205
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
206
queuedJobFromOpCodes jobid ops = do
207
  ops' <- mapM (`resolveDependencies` jobid) ops
208
  return QueuedJob { qjId = jobid
209
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
210
                   , qjReceivedTimestamp = Nothing 
211
                   , qjStartTimestamp = Nothing
212
                   , qjEndTimestamp = Nothing
213
                   }
214

    
215
-- | Attach a received timestamp to a Queued Job.
216
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
217
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
218

    
219
-- | Change the priority of a QueuedOpCode, if it is not already
220
-- finalized.
221
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
222
changeOpCodePriority prio op =
223
  if qoStatus op > OP_STATUS_RUNNING
224
     then op
225
     else op { qoPriority = prio }
226

    
227
-- | Set the state of a QueuedOpCode to canceled.
228
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
229
cancelOpCode now op =
230
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }
231

    
232
-- | Change the priority of a job, i.e., change the priority of the
233
-- non-finalized opcodes.
234
changeJobPriority :: Int -> QueuedJob -> QueuedJob
235
changeJobPriority prio job =
236
  job { qjOps = map (changeOpCodePriority prio) $ qjOps job }
237

    
238
-- | Transform a QueuedJob that has not been started into its canceled form.
239
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
240
cancelQueuedJob now job =
241
  let ops' = map (cancelOpCode now) $ qjOps job
242
  in job { qjOps = ops', qjEndTimestamp = Just now}
243

    
244
-- | Job file prefix.
245
jobFilePrefix :: String
246
jobFilePrefix = "job-"
247

    
248
-- | Computes the filename for a given job ID.
249
jobFileName :: JobId -> FilePath
250
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
251

    
252
-- | Parses a job ID from a file name.
253
parseJobFileId :: (Monad m) => FilePath -> m JobId
254
parseJobFileId path =
255
  case stripPrefix jobFilePrefix path of
256
    Nothing -> fail $ "Job file '" ++ path ++
257
                      "' doesn't have the correct prefix"
258
    Just suffix -> makeJobIdS suffix
259

    
260
-- | Computes the full path to a live job.
261
liveJobFile :: FilePath -> JobId -> FilePath
262
liveJobFile rootdir jid = rootdir </> jobFileName jid
263

    
264
-- | Computes the full path to an archives job. BROKEN.
265
archivedJobFile :: FilePath -> JobId -> FilePath
266
archivedJobFile rootdir jid =
267
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
268
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
269

    
270
-- | Map from opcode status to job status.
271
opStatusToJob :: OpStatus -> JobStatus
272
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
273
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
274
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
275
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
276
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
277
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
278
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
279

    
280
-- | Computes a queued job's status.
281
calcJobStatus :: QueuedJob -> JobStatus
282
calcJobStatus QueuedJob { qjOps = ops } =
283
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
284
    where
285
      terminalStatus OP_STATUS_ERROR     = True
286
      terminalStatus OP_STATUS_CANCELING = True
287
      terminalStatus OP_STATUS_CANCELED  = True
288
      terminalStatus _                   = False
289
      softStatus     OP_STATUS_SUCCESS   = True
290
      softStatus     OP_STATUS_QUEUED    = True
291
      softStatus     _                   = False
292
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
293
      extractOpSt [] d False = d
294
      extractOpSt (x:xs) d old_all
295
           | terminalStatus x = opStatusToJob x -- abort recursion
296
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
297
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
298
           where new_all = x == OP_STATUS_SUCCESS && old_all
299

    
300
-- | Determine if a job has started
301
jobStarted :: QueuedJob -> Bool
302
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus
303

    
304
-- | Determine if a job is finalised.
305
jobFinalized :: QueuedJob -> Bool
306
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus
307

    
308
-- | Determine if a job is finalized and its timestamp is before
309
-- a given time.
310
jobArchivable :: Timestamp -> QueuedJob -> Bool
311
jobArchivable ts = liftA2 (&&) jobFinalized 
312
  $ maybe False (< ts)
313
    .  liftA2 (<|>) qjEndTimestamp qjStartTimestamp
314

    
315
-- | Determine whether an opcode status is finalized.
316
opStatusFinalized :: OpStatus -> Bool
317
opStatusFinalized = (> OP_STATUS_RUNNING)
318

    
319
-- | Compute a job's priority.
320
calcJobPriority :: QueuedJob -> Int
321
calcJobPriority QueuedJob { qjOps = ops } =
322
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
323
    where helper [] = C.opPrioDefault
324
          helper ps = minimum ps
325

    
326
-- | Log but ignore an 'IOError'.
327
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
328
ignoreIOError a ignore_noent msg e = do
329
  unless (isDoesNotExistError e && ignore_noent) .
330
    logWarning $ msg ++ ": " ++ show e
331
  return a
332

    
333
-- | Compute the list of existing archive directories. Note that I/O
334
-- exceptions are swallowed and ignored.
335
allArchiveDirs :: FilePath -> IO [FilePath]
336
allArchiveDirs rootdir = do
337
  let adir = rootdir </> jobQueueArchiveSubDir
338
  contents <- getDirectoryContents adir `Control.Exception.catch`
339
               ignoreIOError [] False
340
                 ("Failed to list queue directory " ++ adir)
341
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
342
  filterM (\path ->
343
             liftM isDirectory (getFileStatus (adir </> path))
344
               `Control.Exception.catch`
345
               ignoreIOError False True
346
                 ("Failed to stat archive path " ++ path)) fpaths
347

    
348
-- | Build list of directories containing job files. Note: compared to
349
-- the Python version, this doesn't ignore a potential lost+found
350
-- file.
351
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
352
determineJobDirectories rootdir archived = do
353
  other <- if archived
354
             then allArchiveDirs rootdir
355
             else return []
356
  return $ rootdir:other
357

    
358
-- | Computes the list of all jobs in the given directories.
359
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
360
getJobIDs = runResultT . liftM concat . mapM getDirJobIDs
361

    
362
-- | Sorts the a list of job IDs.
363
sortJobIDs :: [JobId] -> [JobId]
364
sortJobIDs = sortBy (comparing fromJobId)
365

    
366
-- | Computes the list of jobs in a given directory.
367
getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]
368
getDirJobIDs path =
369
  withErrorLogAt WARNING ("Failed to list job directory " ++ path) .
370
    liftM (mapMaybe parseJobFileId) $ liftIO (getDirectoryContents path)
371

    
372
-- | Reads the job data from disk.
373
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
374
readJobDataFromDisk rootdir archived jid = do
375
  let live_path = liveJobFile rootdir jid
376
      archived_path = archivedJobFile rootdir jid
377
      all_paths = if archived
378
                    then [(live_path, False), (archived_path, True)]
379
                    else [(live_path, False)]
380
  foldM (\state (path, isarchived) ->
381
           liftM (\r -> Just (r, isarchived)) (readFile path)
382
             `Control.Exception.catch`
383
             ignoreIOError state True
384
               ("Failed to read job file " ++ path)) Nothing all_paths
385

    
386
-- | Failed to load job error.
387
noSuchJob :: Result (QueuedJob, Bool)
388
noSuchJob = Bad "Can't load job file"
389

    
390
-- | Loads a job from disk.
391
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
392
loadJobFromDisk rootdir archived jid = do
393
  raw <- readJobDataFromDisk rootdir archived jid
394
  -- note: we need some stricness below, otherwise the wrapping in a
395
  -- Result will create too much lazyness, and not close the file
396
  -- descriptors for the individual jobs
397
  return $! case raw of
398
             Nothing -> noSuchJob
399
             Just (str, arch) ->
400
               liftM (\qj -> (qj, arch)) .
401
               fromJResult "Parsing job file" $ Text.JSON.decode str
402

    
403
-- | Write a job to disk.
404
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
405
writeJobToDisk rootdir job = do
406
  let filename = liveJobFile rootdir . qjId $ job
407
      content = Text.JSON.encode . Text.JSON.showJSON $ job
408
  tryAndLogIOError (atomicWriteFile filename content)
409
                   ("Failed to write " ++ filename) Ok
410

    
411
-- | Replicate a job to all master candidates.
412
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
413
replicateJob rootdir mastercandidates job = do
414
  let filename = liveJobFile rootdir . qjId $ job
415
      content = Text.JSON.encode . Text.JSON.showJSON $ job
416
  filename' <- makeVirtualPath filename
417
  callresult <- executeRpcCall mastercandidates
418
                  $ RpcCallJobqueueUpdate filename' content
419
  let result = map (second (() <$)) callresult
420
  logRpcErrors result
421
  return result
422

    
423
-- | Replicate many jobs to all master candidates.
424
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
425
replicateManyJobs rootdir mastercandidates =
426
  mapM_ (replicateJob rootdir mastercandidates)
427

    
428
-- | Read the job serial number from disk.
429
readSerialFromDisk :: IO (Result JobId)
430
readSerialFromDisk = do
431
  filename <- jobQueueSerialFile
432
  tryAndLogIOError (readFile filename) "Failed to read serial file"
433
                   (makeJobIdS . rStripSpace)
434

    
435
-- | Allocate new job ids.
436
-- To avoid races while accessing the serial file, the threads synchronize
437
-- over a lock, as usual provided by an MVar.
438
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
439
allocateJobIds mastercandidates lock n =
440
  if n <= 0
441
    then return . Bad $ "Can only allocate positive number of job ids"
442
    else do
443
      takeMVar lock
444
      rjobid <- readSerialFromDisk
445
      case rjobid of
446
        Bad s -> do
447
          putMVar lock ()
448
          return . Bad $ s
449
        Ok jid -> do
450
          let current = fromJobId jid
451
              serial_content = show (current + n) ++  "\n"
452
          serial <- jobQueueSerialFile
453
          write_result <- try $ atomicWriteFile serial serial_content
454
                          :: IO (Either IOError ())
455
          case write_result of
456
            Left e -> do
457
              putMVar lock ()
458
              let msg = "Failed to write serial file: " ++ show e
459
              logError msg
460
              return . Bad $ msg 
461
            Right () -> do
462
              serial' <- makeVirtualPath serial
463
              _ <- executeRpcCall mastercandidates
464
                     $ RpcCallJobqueueUpdate serial' serial_content
465
              putMVar lock ()
466
              return $ mapM makeJobId [(current+1)..(current+n)]
467

    
468
-- | Allocate one new job id.
469
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
470
allocateJobId mastercandidates lock = do
471
  jids <- allocateJobIds mastercandidates lock 1
472
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
473

    
474
-- | Decide if job queue is open
475
isQueueOpen :: IO Bool
476
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
477

    
478
-- | Start enqueued jobs, currently by handing them over to masterd.
479
startJobs :: [QueuedJob] -> IO ()
480
startJobs jobs = do
481
  socketpath <- defaultMasterSocket
482
  client <- getLuxiClient socketpath
483
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
484
  let failures = map show $ justBad pickupResults
485
  unless (null failures)
486
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
487

    
488
-- | Try to cancel a job that has already been handed over to execution,
489
-- currently by asking masterd to cancel it.
490
cancelJob :: JobId -> IO (ErrorResult JSValue)
491
cancelJob jid = do
492
  socketpath <- defaultMasterSocket
493
  client <- getLuxiClient socketpath
494
  callMethod (CancelJob jid) client
495

    
496
-- | Permissions for the archive directories.
497
queueDirPermissions :: FilePermissions
498
queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser
499
                                      , fpGroup = Just C.daemonsGroup
500
                                      , fpPermissions = 0o0750
501
                                      }
502

    
503
-- | Try, at most until the given endtime, to archive some of the given
504
-- jobs, if they are older than the specified cut-off time; also replicate
505
-- archival of the additional jobs. Return the pair of the number of jobs
506
-- archived, and the number of jobs remaining int he queue, asuming the
507
-- given numbers about the not considered jobs.
508
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
509
                        -> FilePath -- ^ queue root directory
510
                        -> ClockTime -- ^ Endtime
511
                        -> Timestamp -- ^ cut-off time for archiving jobs
512
                        -> Int -- ^ number of jobs alread archived
513
                        -> [JobId] -- ^ Additional jobs to replicate
514
                        -> [JobId] -- ^ List of job-ids still to consider
515
                        -> IO (Int, Int)
516
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
517
  unless (null torepl) . (>> return ())
518
   . forkIO $ replicateFn torepl
519
  return (arch, 0)
520

    
521
archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
522
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
523
      continue = archiveMore arch torepl jids
524
      jidname = show $ fromJobId jid
525
  time <- getClockTime
526
  if time >= endt
527
    then do
528
      _ <- forkIO $ replicateFn torepl
529
      return (arch, length (jid:jids))
530
    else do
531
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
532
      loadResult <- loadJobFromDisk qDir False jid
533
      case loadResult of
534
        Bad _ -> continue
535
        Ok (job, _) -> 
536
          if jobArchivable cutt job
537
            then do
538
              let live = liveJobFile qDir jid
539
                  archive = archivedJobFile qDir jid
540
              renameResult <- safeRenameFile queueDirPermissions
541
                                live archive
542
              case renameResult of                   
543
                Bad s -> do
544
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
545
                                 ++ " failed unexpectedly: " ++ s
546
                  continue
547
                Ok () -> do
548
                  let torepl' = jid:torepl
549
                  if length torepl' >= 10
550
                    then do
551
                      _ <- forkIO $ replicateFn torepl'
552
                      archiveMore (arch + 1) [] jids
553
                    else archiveMore (arch + 1) torepl' jids
554
            else continue
555
                   
556
-- | Archive jobs older than the given time, but do not exceed the timeout for
557
-- carrying out this task.
558
archiveJobs :: ConfigData -- ^ cluster configuration
559
               -> Int  -- ^ time the job has to be in the past in order
560
                       -- to be archived
561
               -> Int -- ^ timeout
562
               -> [JobId] -- ^ jobs to consider
563
               -> IO (Int, Int)
564
archiveJobs cfg age timeout jids = do
565
  now <- getClockTime
566
  qDir <- queueDir
567
  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
568
      cuttime = if age < 0 then noTimestamp
569
                           else advanceTimestamp (- age) (fromClockTime now)
570
      mcs = Config.getMasterCandidates cfg
571
      replicateFn jobs = do
572
        let olds = map (liveJobFile qDir) jobs
573
            news = map (archivedJobFile qDir) jobs
574
        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
575
        return ()
576
  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids