Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 8e527d04

History | View | Annotate | Download (23.2 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , changeOpCodePriority
35
    , changeJobPriority
36
    , cancelQueuedJob
37
    , Timestamp
38
    , fromClockTime
39
    , noTimestamp
40
    , currentTimestamp
41
    , advanceTimestamp
42
    , setReceivedTimestamp
43
    , extendJobReasonTrail
44
    , opStatusFinalized
45
    , extractOpSummary
46
    , calcJobStatus
47
    , jobStarted
48
    , jobFinalized
49
    , jobArchivable
50
    , calcJobPriority
51
    , jobFileName
52
    , liveJobFile
53
    , archivedJobFile
54
    , determineJobDirectories
55
    , getJobIDs
56
    , sortJobIDs
57
    , loadJobFromDisk
58
    , noSuchJob
59
    , readSerialFromDisk
60
    , allocateJobIds
61
    , allocateJobId
62
    , writeJobToDisk
63
    , replicateManyJobs
64
    , isQueueOpen
65
    , startJobs
66
    , cancelJob
67
    , queueDirPermissions
68
    , archiveJobs
69
    ) where
70

    
71
import Control.Applicative (liftA2, (<|>))
72
import Control.Arrow (first, second)
73
import Control.Concurrent (forkIO)
74
import Control.Concurrent.MVar
75
import Control.Exception
76
import Control.Monad
77
import Control.Monad.IO.Class
78
import Data.Functor ((<$))
79
import Data.List
80
import Data.Maybe
81
import Data.Ord (comparing)
82
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
83
import Prelude hiding (id, log)
84
import System.Directory
85
import System.FilePath
86
import System.IO.Error (isDoesNotExistError)
87
import System.Posix.Files
88
import System.Time
89
import qualified Text.JSON
90
import Text.JSON.Types
91

    
92
import Ganeti.BasicTypes
93
import qualified Ganeti.Config as Config
94
import qualified Ganeti.Constants as C
95
import Ganeti.Errors (ErrorResult)
96
import Ganeti.JSON
97
import Ganeti.Logging
98
import Ganeti.Luxi
99
import Ganeti.Objects (ConfigData, Node)
100
import Ganeti.OpCodes
101
import Ganeti.Path
102
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
103
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
104
import Ganeti.THH
105
import Ganeti.Types
106
import Ganeti.Utils
107
import Ganeti.Utils.Atomic
108
import Ganeti.VCluster (makeVirtualPath)
109

    
110
-- * Data types
111

    
112
-- | The ganeti queue timestamp type. It represents the time as the pair
113
-- of seconds since the epoch and microseconds since the beginning of the
114
-- second.
115
type Timestamp = (Int, Int)
116

    
117
-- | Missing timestamp type.
118
noTimestamp :: Timestamp
119
noTimestamp = (-1, -1)
120

    
121
-- | Obtain a Timestamp from a given clock time
122
fromClockTime :: ClockTime -> Timestamp
123
fromClockTime (TOD ctime pico) =
124
  (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
125

    
126
-- | Get the current time in the job-queue timestamp format.
127
currentTimestamp :: IO Timestamp
128
currentTimestamp = fromClockTime `liftM` getClockTime
129

    
130
-- | From a given timestamp, obtain the timestamp of the
131
-- time that is the given number of seconds later.
132
advanceTimestamp :: Int -> Timestamp -> Timestamp
133
advanceTimestamp = first . (+)
134

    
135
-- | An input opcode.
136
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
137
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
138
                   deriving (Show, Eq)
139

    
140
-- | JSON instance for 'InputOpCode', trying to parse it and if
141
-- failing, keeping the original JSValue.
142
instance Text.JSON.JSON InputOpCode where
143
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
144
  showJSON (InvalidOpCode inv) = inv
145
  readJSON v = case Text.JSON.readJSON v of
146
                 Text.JSON.Error _ -> return $ InvalidOpCode v
147
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
148

    
149
-- | Invalid opcode summary.
150
invalidOp :: String
151
invalidOp = "INVALID_OP"
152

    
153
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
154
-- duplicates some functionality from the 'opSummary' function in
155
-- "Ganeti.OpCodes".
156
extractOpSummary :: InputOpCode -> String
157
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
158
extractOpSummary (InvalidOpCode (JSObject o)) =
159
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
160
    Just s -> drop 3 s -- drop the OP_ prefix
161
    Nothing -> invalidOp
162
extractOpSummary _ = invalidOp
163

    
164
$(buildObject "QueuedOpCode" "qo"
165
  [ simpleField "input"           [t| InputOpCode |]
166
  , simpleField "status"          [t| OpStatus    |]
167
  , simpleField "result"          [t| JSValue     |]
168
  , defaultField [| [] |] $
169
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
170
  , simpleField "priority"        [t| Int         |]
171
  , optionalNullSerField $
172
    simpleField "start_timestamp" [t| Timestamp   |]
173
  , optionalNullSerField $
174
    simpleField "exec_timestamp"  [t| Timestamp   |]
175
  , optionalNullSerField $
176
    simpleField "end_timestamp"   [t| Timestamp   |]
177
  ])
178

    
179
$(buildObject "QueuedJob" "qj"
180
  [ simpleField "id"                 [t| JobId          |]
181
  , simpleField "ops"                [t| [QueuedOpCode] |]
182
  , optionalNullSerField $
183
    simpleField "received_timestamp" [t| Timestamp      |]
184
  , optionalNullSerField $
185
    simpleField "start_timestamp"    [t| Timestamp      |]
186
  , optionalNullSerField $
187
    simpleField "end_timestamp"      [t| Timestamp      |]
188
  ])
189

    
190
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
191
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
192
queuedOpCodeFromMetaOpCode op =
193
  QueuedOpCode { qoInput = ValidOpCode op
194
               , qoStatus = OP_STATUS_QUEUED
195
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
196
                              $ op
197
               , qoLog = []
198
               , qoResult = JSNull
199
               , qoStartTimestamp = Nothing
200
               , qoEndTimestamp = Nothing
201
               , qoExecTimestamp = Nothing
202
               }
203

    
204
-- | From a job-id and a list of op-codes create a job. This is
205
-- the pure part of job creation, as allocating a new job id
206
-- lives in IO.
207
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
208
queuedJobFromOpCodes jobid ops = do
209
  ops' <- mapM (`resolveDependencies` jobid) ops
210
  return QueuedJob { qjId = jobid
211
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
212
                   , qjReceivedTimestamp = Nothing
213
                   , qjStartTimestamp = Nothing
214
                   , qjEndTimestamp = Nothing
215
                   }
216

    
217
-- | Attach a received timestamp to a Queued Job.
218
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
219
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
220

    
221
-- | Build a timestamp in the format expected by the reason trail (nanoseconds)
222
-- starting from a JQueue Timestamp.
223
reasonTrailTimestamp :: Timestamp -> Integer
224
reasonTrailTimestamp (sec, micro) =
225
  let sec' = toInteger sec
226
      micro' = toInteger micro
227
  in sec' * 1000000000 + micro' * 1000
228

    
229
-- | Append an element to the reason trail of an input opcode.
230
extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode
231
                             -> InputOpCode
232
extendInputOpCodeReasonTrail _ _ _ op@(InvalidOpCode _) = op
233
extendInputOpCodeReasonTrail jid ts i (ValidOpCode vOp) =
234
  let metaP = metaParams vOp
235
      op = metaOpCode vOp
236
      trail = opReason metaP
237
      reasonSrc = opReasonSrcID op
238
      reasonText = "job=" ++ show (fromJobId jid) ++ ";index=" ++ show i
239
      reason = (reasonSrc, reasonText, reasonTrailTimestamp ts)
240
      trail' = trail ++ [reason]
241
  in ValidOpCode $ vOp { metaParams = metaP { opReason = trail' } }
242

    
243
-- | Append an element to the reason trail of a queued opcode.
244
extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode
245
                        -> QueuedOpCode
246
extendOpCodeReasonTrail jid ts i op =
247
  let inOp = qoInput op
248
  in op { qoInput = extendInputOpCodeReasonTrail jid ts i inOp }
249

    
250
-- | Append an element to the reason trail of all the OpCodes of a queued job.
251
extendJobReasonTrail :: QueuedJob -> QueuedJob
252
extendJobReasonTrail job =
253
  let jobId = qjId job
254
      mTimestamp = qjReceivedTimestamp job
255
      -- This function is going to be called on QueuedJobs that already contain
256
      -- a timestamp. But for safety reasons we cannot assume mTimestamp will
257
      -- be (Just timestamp), so we use the value 0 in the extremely unlikely
258
      -- case this is not true.
259
      timestamp = fromMaybe (0, 0) mTimestamp
260
    in job
261
        { qjOps =
262
            zipWith (extendOpCodeReasonTrail jobId timestamp) [0..] $
263
              qjOps job
264
        }
265

    
266
-- | Change the priority of a QueuedOpCode, if it is not already
267
-- finalized.
268
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
269
changeOpCodePriority prio op =
270
  if qoStatus op > OP_STATUS_RUNNING
271
     then op
272
     else op { qoPriority = prio }
273

    
274
-- | Set the state of a QueuedOpCode to canceled.
275
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
276
cancelOpCode now op =
277
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }
278

    
279
-- | Change the priority of a job, i.e., change the priority of the
280
-- non-finalized opcodes.
281
changeJobPriority :: Int -> QueuedJob -> QueuedJob
282
changeJobPriority prio job =
283
  job { qjOps = map (changeOpCodePriority prio) $ qjOps job }
284

    
285
-- | Transform a QueuedJob that has not been started into its canceled form.
286
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
287
cancelQueuedJob now job =
288
  let ops' = map (cancelOpCode now) $ qjOps job
289
  in job { qjOps = ops', qjEndTimestamp = Just now}
290

    
291
-- | Job file prefix.
292
jobFilePrefix :: String
293
jobFilePrefix = "job-"
294

    
295
-- | Computes the filename for a given job ID.
296
jobFileName :: JobId -> FilePath
297
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
298

    
299
-- | Parses a job ID from a file name.
300
parseJobFileId :: (Monad m) => FilePath -> m JobId
301
parseJobFileId path =
302
  case stripPrefix jobFilePrefix path of
303
    Nothing -> fail $ "Job file '" ++ path ++
304
                      "' doesn't have the correct prefix"
305
    Just suffix -> makeJobIdS suffix
306

    
307
-- | Computes the full path to a live job.
308
liveJobFile :: FilePath -> JobId -> FilePath
309
liveJobFile rootdir jid = rootdir </> jobFileName jid
310

    
311
-- | Computes the full path to an archives job. BROKEN.
312
archivedJobFile :: FilePath -> JobId -> FilePath
313
archivedJobFile rootdir jid =
314
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
315
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
316

    
317
-- | Map from opcode status to job status.
318
opStatusToJob :: OpStatus -> JobStatus
319
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
320
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
321
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
322
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
323
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
324
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
325
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
326

    
327
-- | Computes a queued job's status.
328
calcJobStatus :: QueuedJob -> JobStatus
329
calcJobStatus QueuedJob { qjOps = ops } =
330
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
331
    where
332
      terminalStatus OP_STATUS_ERROR     = True
333
      terminalStatus OP_STATUS_CANCELING = True
334
      terminalStatus OP_STATUS_CANCELED  = True
335
      terminalStatus _                   = False
336
      softStatus     OP_STATUS_SUCCESS   = True
337
      softStatus     OP_STATUS_QUEUED    = True
338
      softStatus     _                   = False
339
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
340
      extractOpSt [] d False = d
341
      extractOpSt (x:xs) d old_all
342
           | terminalStatus x = opStatusToJob x -- abort recursion
343
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
344
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
345
           where new_all = x == OP_STATUS_SUCCESS && old_all
346

    
347
-- | Determine if a job has started
348
jobStarted :: QueuedJob -> Bool
349
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus
350

    
351
-- | Determine if a job is finalised.
352
jobFinalized :: QueuedJob -> Bool
353
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus
354

    
355
-- | Determine if a job is finalized and its timestamp is before
356
-- a given time.
357
jobArchivable :: Timestamp -> QueuedJob -> Bool
358
jobArchivable ts = liftA2 (&&) jobFinalized
359
  $ maybe False (< ts)
360
    .  liftA2 (<|>) qjEndTimestamp qjStartTimestamp
361

    
362
-- | Determine whether an opcode status is finalized.
363
opStatusFinalized :: OpStatus -> Bool
364
opStatusFinalized = (> OP_STATUS_RUNNING)
365

    
366
-- | Compute a job's priority.
367
calcJobPriority :: QueuedJob -> Int
368
calcJobPriority QueuedJob { qjOps = ops } =
369
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
370
    where helper [] = C.opPrioDefault
371
          helper ps = minimum ps
372

    
373
-- | Log but ignore an 'IOError'.
374
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
375
ignoreIOError a ignore_noent msg e = do
376
  unless (isDoesNotExistError e && ignore_noent) .
377
    logWarning $ msg ++ ": " ++ show e
378
  return a
379

    
380
-- | Compute the list of existing archive directories. Note that I/O
381
-- exceptions are swallowed and ignored.
382
allArchiveDirs :: FilePath -> IO [FilePath]
383
allArchiveDirs rootdir = do
384
  let adir = rootdir </> jobQueueArchiveSubDir
385
  contents <- getDirectoryContents adir `Control.Exception.catch`
386
               ignoreIOError [] False
387
                 ("Failed to list queue directory " ++ adir)
388
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
389
  filterM (\path ->
390
             liftM isDirectory (getFileStatus (adir </> path))
391
               `Control.Exception.catch`
392
               ignoreIOError False True
393
                 ("Failed to stat archive path " ++ path)) fpaths
394

    
395
-- | Build list of directories containing job files. Note: compared to
396
-- the Python version, this doesn't ignore a potential lost+found
397
-- file.
398
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
399
determineJobDirectories rootdir archived = do
400
  other <- if archived
401
             then allArchiveDirs rootdir
402
             else return []
403
  return $ rootdir:other
404

    
405
-- | Computes the list of all jobs in the given directories.
406
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
407
getJobIDs = runResultT . liftM concat . mapM getDirJobIDs
408

    
409
-- | Sorts the a list of job IDs.
410
sortJobIDs :: [JobId] -> [JobId]
411
sortJobIDs = sortBy (comparing fromJobId)
412

    
413
-- | Computes the list of jobs in a given directory.
414
getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]
415
getDirJobIDs path =
416
  withErrorLogAt WARNING ("Failed to list job directory " ++ path) .
417
    liftM (mapMaybe parseJobFileId) $ liftIO (getDirectoryContents path)
418

    
419
-- | Reads the job data from disk.
420
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
421
readJobDataFromDisk rootdir archived jid = do
422
  let live_path = liveJobFile rootdir jid
423
      archived_path = archivedJobFile rootdir jid
424
      all_paths = if archived
425
                    then [(live_path, False), (archived_path, True)]
426
                    else [(live_path, False)]
427
  foldM (\state (path, isarchived) ->
428
           liftM (\r -> Just (r, isarchived)) (readFile path)
429
             `Control.Exception.catch`
430
             ignoreIOError state True
431
               ("Failed to read job file " ++ path)) Nothing all_paths
432

    
433
-- | Failed to load job error.
434
noSuchJob :: Result (QueuedJob, Bool)
435
noSuchJob = Bad "Can't load job file"
436

    
437
-- | Loads a job from disk.
438
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
439
loadJobFromDisk rootdir archived jid = do
440
  raw <- readJobDataFromDisk rootdir archived jid
441
  -- note: we need some stricness below, otherwise the wrapping in a
442
  -- Result will create too much lazyness, and not close the file
443
  -- descriptors for the individual jobs
444
  return $! case raw of
445
             Nothing -> noSuchJob
446
             Just (str, arch) ->
447
               liftM (\qj -> (qj, arch)) .
448
               fromJResult "Parsing job file" $ Text.JSON.decode str
449

    
450
-- | Write a job to disk.
451
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
452
writeJobToDisk rootdir job = do
453
  let filename = liveJobFile rootdir . qjId $ job
454
      content = Text.JSON.encode . Text.JSON.showJSON $ job
455
  tryAndLogIOError (atomicWriteFile filename content)
456
                   ("Failed to write " ++ filename) Ok
457

    
458
-- | Replicate a job to all master candidates.
459
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
460
replicateJob rootdir mastercandidates job = do
461
  let filename = liveJobFile rootdir . qjId $ job
462
      content = Text.JSON.encode . Text.JSON.showJSON $ job
463
  filename' <- makeVirtualPath filename
464
  callresult <- executeRpcCall mastercandidates
465
                  $ RpcCallJobqueueUpdate filename' content
466
  let result = map (second (() <$)) callresult
467
  _ <- logRpcErrors result
468
  return result
469

    
470
-- | Replicate many jobs to all master candidates.
471
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
472
replicateManyJobs rootdir mastercandidates =
473
  mapM_ (replicateJob rootdir mastercandidates)
474

    
475
-- | Read the job serial number from disk.
476
readSerialFromDisk :: IO (Result JobId)
477
readSerialFromDisk = do
478
  filename <- jobQueueSerialFile
479
  tryAndLogIOError (readFile filename) "Failed to read serial file"
480
                   (makeJobIdS . rStripSpace)
481

    
482
-- | Allocate new job ids.
483
-- To avoid races while accessing the serial file, the threads synchronize
484
-- over a lock, as usual provided by an MVar.
485
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
486
allocateJobIds mastercandidates lock n =
487
  if n <= 0
488
    then return . Bad $ "Can only allocate positive number of job ids"
489
    else do
490
      takeMVar lock
491
      rjobid <- readSerialFromDisk
492
      case rjobid of
493
        Bad s -> do
494
          putMVar lock ()
495
          return . Bad $ s
496
        Ok jid -> do
497
          let current = fromJobId jid
498
              serial_content = show (current + n) ++  "\n"
499
          serial <- jobQueueSerialFile
500
          write_result <- try $ atomicWriteFile serial serial_content
501
                          :: IO (Either IOError ())
502
          case write_result of
503
            Left e -> do
504
              putMVar lock ()
505
              let msg = "Failed to write serial file: " ++ show e
506
              logError msg
507
              return . Bad $ msg
508
            Right () -> do
509
              serial' <- makeVirtualPath serial
510
              _ <- executeRpcCall mastercandidates
511
                     $ RpcCallJobqueueUpdate serial' serial_content
512
              putMVar lock ()
513
              return $ mapM makeJobId [(current+1)..(current+n)]
514

    
515
-- | Allocate one new job id.
516
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
517
allocateJobId mastercandidates lock = do
518
  jids <- allocateJobIds mastercandidates lock 1
519
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
520

    
521
-- | Decide if job queue is open
522
isQueueOpen :: IO Bool
523
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
524

    
525
-- | Start enqueued jobs, currently by handing them over to masterd.
526
startJobs :: [QueuedJob] -> IO ()
527
startJobs jobs = do
528
  socketpath <- defaultMasterSocket
529
  client <- getLuxiClient socketpath
530
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
531
  let failures = map show $ justBad pickupResults
532
  unless (null failures)
533
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
534

    
535
-- | Try to cancel a job that has already been handed over to execution,
536
-- currently by asking masterd to cancel it.
537
cancelJob :: JobId -> IO (ErrorResult JSValue)
538
cancelJob jid = do
539
  socketpath <- defaultMasterSocket
540
  client <- getLuxiClient socketpath
541
  callMethod (CancelJob jid) client
542

    
543
-- | Permissions for the archive directories.
544
queueDirPermissions :: FilePermissions
545
queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser
546
                                      , fpGroup = Just C.daemonsGroup
547
                                      , fpPermissions = 0o0750
548
                                      }
549

    
550
-- | Try, at most until the given endtime, to archive some of the given
551
-- jobs, if they are older than the specified cut-off time; also replicate
552
-- archival of the additional jobs. Return the pair of the number of jobs
553
-- archived, and the number of jobs remaining int he queue, asuming the
554
-- given numbers about the not considered jobs.
555
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
556
                        -> FilePath -- ^ queue root directory
557
                        -> ClockTime -- ^ Endtime
558
                        -> Timestamp -- ^ cut-off time for archiving jobs
559
                        -> Int -- ^ number of jobs alread archived
560
                        -> [JobId] -- ^ Additional jobs to replicate
561
                        -> [JobId] -- ^ List of job-ids still to consider
562
                        -> IO (Int, Int)
563
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
564
  unless (null torepl) . (>> return ())
565
   . forkIO $ replicateFn torepl
566
  return (arch, 0)
567

    
568
archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
569
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
570
      continue = archiveMore arch torepl jids
571
      jidname = show $ fromJobId jid
572
  time <- getClockTime
573
  if time >= endt
574
    then do
575
      _ <- forkIO $ replicateFn torepl
576
      return (arch, length (jid:jids))
577
    else do
578
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
579
      loadResult <- loadJobFromDisk qDir False jid
580
      case loadResult of
581
        Bad _ -> continue
582
        Ok (job, _) ->
583
          if jobArchivable cutt job
584
            then do
585
              let live = liveJobFile qDir jid
586
                  archive = archivedJobFile qDir jid
587
              renameResult <- safeRenameFile queueDirPermissions
588
                                live archive
589
              case renameResult of
590
                Bad s -> do
591
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
592
                                 ++ " failed unexpectedly: " ++ s
593
                  continue
594
                Ok () -> do
595
                  let torepl' = jid:torepl
596
                  if length torepl' >= 10
597
                    then do
598
                      _ <- forkIO $ replicateFn torepl'
599
                      archiveMore (arch + 1) [] jids
600
                    else archiveMore (arch + 1) torepl' jids
601
            else continue
602

    
603
-- | Archive jobs older than the given time, but do not exceed the timeout for
604
-- carrying out this task.
605
archiveJobs :: ConfigData -- ^ cluster configuration
606
               -> Int  -- ^ time the job has to be in the past in order
607
                       -- to be archived
608
               -> Int -- ^ timeout
609
               -> [JobId] -- ^ jobs to consider
610
               -> IO (Int, Int)
611
archiveJobs cfg age timeout jids = do
612
  now <- getClockTime
613
  qDir <- queueDir
614
  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
615
      cuttime = if age < 0 then noTimestamp
616
                           else advanceTimestamp (- age) (fromClockTime now)
617
      mcs = Config.getMasterCandidates cfg
618
      replicateFn jobs = do
619
        let olds = map (liveJobFile qDir) jobs
620
            news = map (archivedJobFile qDir) jobs
621
        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
622
        return ()
623
  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids