Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 65a3ff88

History | View | Annotate | Download (24.2 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , changeOpCodePriority
35
    , changeJobPriority
36
    , cancelQueuedJob
37
    , Timestamp
38
    , fromClockTime
39
    , noTimestamp
40
    , currentTimestamp
41
    , advanceTimestamp
42
    , setReceivedTimestamp
43
    , extendJobReasonTrail
44
    , opStatusFinalized
45
    , extractOpSummary
46
    , calcJobStatus
47
    , jobStarted
48
    , jobFinalized
49
    , jobArchivable
50
    , calcJobPriority
51
    , jobFileName
52
    , liveJobFile
53
    , archivedJobFile
54
    , determineJobDirectories
55
    , getJobIDs
56
    , sortJobIDs
57
    , loadJobFromDisk
58
    , noSuchJob
59
    , readSerialFromDisk
60
    , allocateJobIds
61
    , allocateJobId
62
    , writeJobToDisk
63
    , replicateManyJobs
64
    , isQueueOpen
65
    , startJobs
66
    , cancelJob
67
    , queueDirPermissions
68
    , archiveJobs
69
    ) where
70

    
71
import Control.Applicative (liftA2, (<|>))
72
import Control.Arrow (first, second)
73
import Control.Concurrent (forkIO)
74
import Control.Concurrent.MVar
75
import Control.Exception
76
import Control.Monad
77
import Data.Functor ((<$))
78
import Data.List
79
import Data.Maybe
80
import Data.Ord (comparing)
81
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
82
import Prelude hiding (id, log)
83
import System.Directory
84
import System.FilePath
85
import System.IO.Error (isDoesNotExistError)
86
import System.Posix.Files
87
import System.Time
88
import qualified Text.JSON
89
import Text.JSON.Types
90

    
91
import Ganeti.BasicTypes
92
import qualified Ganeti.Config as Config
93
import qualified Ganeti.Constants as C
94
import Ganeti.Errors (ErrorResult)
95
import Ganeti.JSON
96
import Ganeti.Logging
97
import Ganeti.Luxi
98
import Ganeti.Objects (ConfigData, Node)
99
import Ganeti.OpCodes
100
import Ganeti.Path
101
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
102
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
103
import Ganeti.THH
104
import Ganeti.Types
105
import Ganeti.Utils
106
import Ganeti.VCluster (makeVirtualPath)
107

    
108
-- * Data types
109

    
110
-- | The ganeti queue timestamp type. It represents the time as the pair
-- of seconds since the epoch and microseconds since the beginning of the
-- second.
type Timestamp = (Int, Int)
114

    
115
-- | Missing timestamp type: sentinel value @(-1, -1)@ used where no
-- valid timestamp is available.
noTimestamp :: Timestamp
noTimestamp = (-1, -1)
118

    
119
-- | Obtain a 'Timestamp' from a given clock time, converting the
-- picosecond component to microseconds.
fromClockTime :: ClockTime -> Timestamp
fromClockTime (TOD ctime pico) = (fromIntegral ctime, micros)
  where micros = fromIntegral (pico `div` 1000000)
123

    
124
-- | Get the current time in the job-queue timestamp format.
currentTimestamp :: IO Timestamp
currentTimestamp = fmap fromClockTime getClockTime
127

    
128
-- | From a given timestamp, obtain the timestamp of the
-- time that is the given number of seconds later.
advanceTimestamp :: Int -> Timestamp -> Timestamp
advanceTimestamp secs (s, us) = (s + secs, us)
132

    
133
-- | An input opcode: either a successfully parsed 'MetaOpCode', or the
-- raw JSON value kept verbatim when parsing failed.
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
                   deriving (Show, Eq)
137

    
138
-- | JSON instance for 'InputOpCode': a valid opcode serialises via its
-- own instance, an invalid one round-trips its stored 'JSValue'.
-- Parsing never fails; unparseable values become 'InvalidOpCode'.
instance Text.JSON.JSON InputOpCode where
  showJSON (ValidOpCode mo)    = Text.JSON.showJSON mo
  showJSON (InvalidOpCode inv) = inv
  readJSON v = return $ case Text.JSON.readJSON v of
                          Text.JSON.Error _ -> InvalidOpCode v
                          Text.JSON.Ok mo   -> ValidOpCode mo
146

    
147
-- | Invalid opcode summary, used as a fallback when no OP_ID can be
-- extracted from an invalid opcode.
invalidOp :: String
invalidOp = "INVALID_OP"
150

    
151
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
extractOpSummary (InvalidOpCode (JSObject o)) =
  -- strip the "OP_" prefix from whatever OP_ID the raw object carries
  maybe invalidOp (drop 3) $
    fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp)
extractOpSummary _ = invalidOp
161

    
162
-- | Serialisable representation of an opcode in the queue: the input
-- opcode plus its execution state (status, result, log, priority) and
-- the start\/exec\/end timestamps, which serialise as null when absent.
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"           [t| InputOpCode |]
  , simpleField "status"          [t| OpStatus    |]
  , simpleField "result"          [t| JSValue     |]
  , defaultField [| [] |] $
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority"        [t| Int         |]
  , optionalNullSerField $
    simpleField "start_timestamp" [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "exec_timestamp"  [t| Timestamp   |]
  , optionalNullSerField $
    simpleField "end_timestamp"   [t| Timestamp   |]
  ])
176

    
177
-- | Serialisable representation of a job in the queue: its id, the
-- list of opcodes, and the received\/start\/end timestamps (serialised
-- as null when absent).
$(buildObject "QueuedJob" "qj"
  [ simpleField "id"                 [t| JobId          |]
  , simpleField "ops"                [t| [QueuedOpCode] |]
  , optionalNullSerField $
    simpleField "received_timestamp" [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "start_timestamp"    [t| Timestamp      |]
  , optionalNullSerField $
    simpleField "end_timestamp"      [t| Timestamp      |]
  ])
187

    
188
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode:
-- the opcode starts queued, with an empty log, no result yet and its
-- submit priority taken from the opcode's own parameters.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput          = ValidOpCode op
               , qoStatus         = OP_STATUS_QUEUED
               , qoPriority       = prio
               , qoLog            = []
               , qoResult         = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp   = Nothing
               , qoExecTimestamp  = Nothing
               }
  where prio = opSubmitPriorityToRaw . opPriority . metaParams $ op
201

    
202
-- | From a job-id and a list of op-codes create a job. This is
-- the pure part of job creation, as allocating a new job id
-- lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops = do
  -- resolve any relative inter-job dependencies against our own job id
  resolved <- mapM (`resolveDependencies` jobid) ops
  return QueuedJob
    { qjId                = jobid
    , qjOps               = map queuedOpCodeFromMetaOpCode resolved
    , qjReceivedTimestamp = Nothing
    , qjStartTimestamp    = Nothing
    , qjEndTimestamp      = Nothing
    }
214

    
215
-- | Attach a received timestamp to a Queued Job, overwriting any
-- previously set value.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
218

    
219
-- | Build a timestamp in the format expected by the reason trail
-- (nanoseconds) starting from a JQueue Timestamp (seconds and
-- microseconds).
reasonTrailTimestamp :: Timestamp -> Integer
reasonTrailTimestamp (sec, micro) =
  toInteger sec * 1000000000 + toInteger micro * 1000
226

    
227
-- | Append an element to the reason trail of an input opcode.  Invalid
-- opcodes carry no metadata and are returned unchanged.
extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode
                             -> InputOpCode
extendInputOpCodeReasonTrail _ _ _ op@(InvalidOpCode _) = op
extendInputOpCodeReasonTrail jid ts i (ValidOpCode vOp) =
  let metaP = metaParams vOp
      op = metaOpCode vOp
      trail = opReason metaP
      reasonSrc = opReasonSrcID op
      -- the reason text records which job and opcode index appended it
      reasonText = "job=" ++ show (fromJobId jid) ++ ";index=" ++ show i
      reason = (reasonSrc, reasonText, reasonTrailTimestamp ts)
      -- the new entry goes at the end, preserving chronological order
      trail' = trail ++ [reason]
  in ValidOpCode $ vOp { metaParams = metaP { opReason = trail' } }
240

    
241
-- | Append an element to the reason trail of a queued opcode, by
-- extending the trail of its wrapped input opcode.
extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode
                        -> QueuedOpCode
extendOpCodeReasonTrail jid ts i op =
  op { qoInput = extendInputOpCodeReasonTrail jid ts i (qoInput op) }
247

    
248
-- | Append an element to the reason trail of all the OpCodes of a queued job,
-- tagging each entry with the opcode's index within the job.
extendJobReasonTrail :: QueuedJob -> QueuedJob
extendJobReasonTrail job =
  job { qjOps = zipWith (extendOpCodeReasonTrail jobId timestamp) [0..]
                  (qjOps job)
      }
  where
    jobId = qjId job
    -- This function is going to be called on QueuedJobs that already
    -- contain a timestamp.  But for safety reasons we cannot assume the
    -- received timestamp is present, so we fall back to (0, 0) in the
    -- extremely unlikely case it is missing.
    timestamp = fromMaybe (0, 0) (qjReceivedTimestamp job)
263

    
264
-- | Change the priority of a QueuedOpCode, if it is not already
-- finalized; finalized opcodes are returned unchanged.
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
changeOpCodePriority prio op
  -- reuse the canonical finalization predicate instead of repeating
  -- the (> OP_STATUS_RUNNING) comparison inline
  | opStatusFinalized (qoStatus op) = op
  | otherwise                       = op { qoPriority = prio }
271

    
272
-- | Set the state of a QueuedOpCode to canceled, recording the given
-- time as its end timestamp.
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op =
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }
276

    
277
-- | Change the priority of a job, i.e., change the priority of the
-- non-finalized opcodes.
changeJobPriority :: Int -> QueuedJob -> QueuedJob
changeJobPriority prio job =
  let ops' = map (changeOpCodePriority prio) (qjOps job)
  in job { qjOps = ops' }
282

    
283
-- | Transform a QueuedJob that has not been started into its canceled
-- form: all opcodes are marked canceled and the job's end timestamp set.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
  job { qjOps = canceled, qjEndTimestamp = Just now }
  where canceled = map (cancelOpCode now) (qjOps job)
288

    
289
-- | Job file prefix: every job file name is this prefix followed by
-- the numeric job id.
jobFilePrefix :: String
jobFilePrefix = "job-"
292

    
293
-- | Computes the filename for a given job ID.
jobFileName :: JobId -> FilePath
jobFileName = (jobFilePrefix ++) . show . fromJobId
296

    
297
-- | Parses a job ID from a file name, failing (in the given monad)
-- when the name does not carry the job-file prefix.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  maybe badPrefix makeJobIdS (stripPrefix jobFilePrefix path)
  where badPrefix = fail $ "Job file '" ++ path ++
                           "' doesn't have the correct prefix"
304

    
305
-- | Computes the full path to a live (not yet archived) job.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir jid = rootdir </> jobFileName jid
308

    
309
-- | Computes the full path to an archived job: archived jobs live in
-- the archive subdirectory, bucketed into numbered subdirectories of
-- 'C.jstoreJobsPerArchiveDirectory' jobs each.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
314

    
315
-- | Map from opcode status to job status (total over 'OpStatus').
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob st = case st of
  OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
  OP_STATUS_WAITING   -> JOB_STATUS_WAITING
  OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
  OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
  OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
  OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
  OP_STATUS_ERROR     -> JOB_STATUS_ERROR
324

    
325
-- | Computes a queued job's status from the statuses of its opcodes:
-- a terminal opcode status (error\/canceling\/canceled) determines the
-- job status immediately; otherwise the job takes the status of the
-- first non-soft opcode, and a job whose opcodes are all successful
-- (or empty) is successful itself.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus QueuedJob { qjOps = ops } =
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
    where
      -- statuses that decide the job status on their own
      terminalStatus OP_STATUS_ERROR     = True
      terminalStatus OP_STATUS_CANCELING = True
      terminalStatus OP_STATUS_CANCELED  = True
      terminalStatus _                   = False
      -- statuses that leave the accumulated job status unchanged
      softStatus     OP_STATUS_SUCCESS   = True
      softStatus     OP_STATUS_QUEUED    = True
      softStatus     _                   = False
      -- extractOpSt statuses currentDefault allSuccessfulSoFar
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
      extractOpSt [] d False = d
      extractOpSt (x:xs) d old_all
           | terminalStatus x = opStatusToJob x -- abort recursion
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
           where new_all = x == OP_STATUS_SUCCESS && old_all
344

    
345
-- | Determine if a job has started, i.e. its computed status has
-- progressed past queued.
jobStarted :: QueuedJob -> Bool
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus
348

    
349
-- | Determine if a job is finalised, i.e. its computed status has
-- progressed past running.
jobFinalized :: QueuedJob -> Bool
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus
352

    
353
-- | Determine if a job is finalized and its timestamp is before
-- a given time; the end timestamp is preferred, falling back to the
-- start timestamp when the former is missing.
jobArchivable :: Timestamp -> QueuedJob -> Bool
jobArchivable ts job =
  jobFinalized job
    && maybe False (< ts) (qjEndTimestamp job <|> qjStartTimestamp job)
359

    
360
-- | Determine whether an opcode status is finalized (i.e. past
-- running: success, canceling, canceled or error).
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized = (> OP_STATUS_RUNNING)
363

    
364
-- | Compute a job's priority: the minimum priority among its
-- non-finalized opcodes, or the default priority when none remain.
calcJobPriority :: QueuedJob -> Int
calcJobPriority QueuedJob { qjOps = ops } =
  case [ qoPriority op | op <- ops, not . opStatusFinalized $ qoStatus op ] of
    [] -> C.opPrioDefault
    ps -> minimum ps
370

    
371
-- | Log but ignore an 'IOError', returning the given fallback value.
-- When @ignore_noent@ is set, does-not-exist errors are not even
-- logged.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  when (not (isDoesNotExistError e && ignore_noent)) $
    logWarning (msg ++ ": " ++ show e)
  return a
377

    
378
-- | Compute the list of existing archive directories. Note that I/O
-- exceptions are swallowed and ignored: a missing or unreadable
-- archive directory yields an empty list, and unstattable entries are
-- silently skipped.
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  -- skip hidden entries, which also drops "." and ".."
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             -- 'path' already has 'adir' prepended; the previous code
             -- stat'ed (adir </> path), which only worked because </>
             -- discards its first operand for absolute second operands
             liftM isDirectory (getFileStatus path)
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths
392

    
393
-- | Build list of directories containing job files. Note: compared to
-- the Python version, this doesn't ignore a potential lost+found
-- file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived =
  liftM (rootdir :) $ if archived
                        then allArchiveDirs rootdir
                        else return []
402

    
403
-- | Sequence a list of 'Either' results, stopping at the first
-- (leftmost) error.  This is simply 'sequence'; the previous
-- hand-rolled fold existed only to work around a library version
-- conflict on Lucid (per its own FIXME) and behaved identically.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer = sequence
409

    
410
-- | Folding function for joining multiple [JobIds] into one list:
-- prepends each new chunk to the accumulator, propagating the first
-- error encountered (accumulator errors take precedence).
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder acc el = liftA2 (flip (:)) acc el
417

    
418
-- | Computes the list of all jobs in the given directories, flattening
-- the per-directory results; the first listing error wins.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = do
  results <- mapM getDirJobIDs paths
  return . fmap concat $ sequencer results
421

    
422
-- | Sorts a list of job IDs numerically, by the underlying integer id.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (comparing fromJobId)
425

    
426
-- | Computes the list of jobs in a given directory, skipping entries
-- that do not look like job files.  A failure to list the directory is
-- logged and returned as a 'Left' value.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  either_contents <-
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case either_contents of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return $ Left e
    Right contents ->
      -- 'parseJobFileId' at type Maybe yields Nothing for non-job
      -- files; mapMaybe replaces the original manual fold-and-reverse
      return . Right $ mapMaybe parseJobFileId contents
441

    
442
-- | Reads the job data from disk, trying the live file and, when
-- @archived@ is set, the archived file too.  Returns the file contents
-- paired with a flag telling whether the data came from the archive,
-- or 'Nothing' when no file could be read.  Read failures are logged
-- (except missing files) and otherwise ignored.  Note that the fold
-- keeps the last successful read, so when both files are readable the
-- archived copy wins.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = do
  let live_path = liveJobFile rootdir jid
      archived_path = archivedJobFile rootdir jid
      all_paths = if archived
                    then [(live_path, False), (archived_path, True)]
                    else [(live_path, False)]
  foldM (\state (path, isarchived) ->
           liftM (\r -> Just (r, isarchived)) (readFile path)
             `Control.Exception.catch`
             ignoreIOError state True
               ("Failed to read job file " ++ path)) Nothing all_paths
455

    
456
-- | Failed to load job error, returned when neither the live nor the
-- archived job file could be read.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
459

    
460
-- | Loads a job from disk, returning the parsed job together with a
-- flag telling whether it was read from the archive.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness below, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! case raw of
             Nothing -> noSuchJob
             Just (str, arch) ->
               liftM (\qj -> (qj, arch)) .
               fromJResult "Parsing job file" $ Text.JSON.decode str
472

    
473
-- | Write a job to its live job file on disk, atomically, logging and
-- returning any I/O failure as a 'Bad' result.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job =
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok
  where filename = liveJobFile rootdir (qjId job)
        content = Text.JSON.encode (Text.JSON.showJSON job)
480

    
481
-- | Replicate a job to all master candidates via RPC, returning the
-- per-node results.  RPC errors are logged but not fatal.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  let filename = liveJobFile rootdir . qjId $ job
      content = Text.JSON.encode . Text.JSON.showJSON $ job
  -- translate to the virtual-cluster view of the path before the RPC
  filename' <- makeVirtualPath filename
  callresult <- executeRpcCall mastercandidates
                  $ RpcCallJobqueueUpdate filename' content
  -- keep only success/failure per node, discarding the call payload
  let result = map (second (() <$)) callresult
  logRpcErrors result
  return result
492

    
493
-- | Replicate many jobs to all master candidates, one after the other,
-- discarding the per-job results.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates jobs =
  mapM_ (replicateJob rootdir mastercandidates) jobs
497

    
498
-- | Read the job serial number from disk, stripping trailing
-- whitespace before parsing it into a 'JobId'.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk = do
  filename <- jobQueueSerialFile
  tryAndLogIOError (readFile filename) "Failed to read serial file"
                   (makeJobIdS . rStripSpace)
504

    
505
-- | Allocate new job ids.
-- To avoid races while accessing the serial file, the threads synchronize
-- over a lock, as usual provided by an MVar.
-- NOTE(review): the lock is released by an explicit putMVar on every
-- exit path, but an exception between takeMVar and putMVar would leave
-- it held -- consider bracket\/modifyMVar; confirm this is intended.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate positive number of job ids"
    else do
      takeMVar lock
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> do
          putMVar lock ()
          return . Bad $ s
        Ok jid -> do
          -- bump the on-disk serial by n; the allocated ids are
          -- current+1 .. current+n
          let current = fromJobId jid
              serial_content = show (current + n) ++  "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              putMVar lock ()
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              -- replicate the new serial to the master candidates; the
              -- RPC result is deliberately ignored
              serial' <- makeVirtualPath serial
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial' serial_content
              putMVar lock ()
              return $ mapM makeJobId [(current+1)..(current+n)]
537

    
538
-- | Allocate one new job id, failing if the bulk allocation did not
-- return exactly one id.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock = do
  jids <- allocateJobIds mastercandidates lock 1
  return $ jids >>= monadicThe "Failed to allocate precisely one Job ID"
543

    
544
-- | Decide if the job queue is open, i.e. the drain file does not
-- exist.
isQueueOpen :: IO Bool
isQueueOpen = do
  drainFile <- jobQueueDrainFile
  fmap not (doesFileExist drainFile)
547

    
548
-- | Start enqueued jobs, currently by handing them over to masterd
-- over its Luxi socket.  Failed notifications are collected and logged
-- as a single warning; they do not abort the remaining jobs.
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
  socketpath <- defaultMasterSocket
  client <- getLuxiClient socketpath
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
  let failures = map show $ justBad pickupResults
  unless (null failures)
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
557

    
558
-- | Try to cancel a job that has already been handed over to execution,
-- currently by asking masterd to cancel it.
cancelJob :: JobId -> IO (ErrorResult JSValue)
cancelJob jid = do
  client <- getLuxiClient =<< defaultMasterSocket
  callMethod (CancelJob jid) client
565

    
566
-- | Permissions for the queue directories: owned by the masterd user,
-- group-readable for the daemons group (mode 0750).
queueDirPermissions :: FilePermissions
queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser
                                      , fpGroup = Just C.daemonsGroup
                                      , fpPermissions = 0o0750
                                      }
572

    
573
-- | Try, at most until the given endtime, to archive some of the given
574
-- jobs, if they are older than the specified cut-off time; also replicate
575
-- archival of the additional jobs. Return the pair of the number of jobs
576
-- archived, and the number of jobs remaining int he queue, asuming the
577
-- given numbers about the not considered jobs.
578
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
579
                        -> FilePath -- ^ queue root directory
580
                        -> ClockTime -- ^ Endtime
581
                        -> Timestamp -- ^ cut-off time for archiving jobs
582
                        -> Int -- ^ number of jobs alread archived
583
                        -> [JobId] -- ^ Additional jobs to replicate
584
                        -> [JobId] -- ^ List of job-ids still to consider
585
                        -> IO (Int, Int)
586
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
587
  unless (null torepl) . (>> return ())
588
   . forkIO $ replicateFn torepl
589
  return (arch, 0)
590

    
591
archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
592
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
593
      continue = archiveMore arch torepl jids
594
      jidname = show $ fromJobId jid
595
  time <- getClockTime
596
  if time >= endt
597
    then do
598
      _ <- forkIO $ replicateFn torepl
599
      return (arch, length (jid:jids))
600
    else do
601
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
602
      loadResult <- loadJobFromDisk qDir False jid
603
      case loadResult of
604
        Bad _ -> continue
605
        Ok (job, _) ->
606
          if jobArchivable cutt job
607
            then do
608
              let live = liveJobFile qDir jid
609
                  archive = archivedJobFile qDir jid
610
              renameResult <- safeRenameFile queueDirPermissions
611
                                live archive
612
              case renameResult of
613
                Bad s -> do
614
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
615
                                 ++ " failed unexpectedly: " ++ s
616
                  continue
617
                Ok () -> do
618
                  let torepl' = jid:torepl
619
                  if length torepl' >= 10
620
                    then do
621
                      _ <- forkIO $ replicateFn torepl'
622
                      archiveMore (arch + 1) [] jids
623
                    else archiveMore (arch + 1) torepl' jids
624
            else continue
625

    
626
-- | Archive jobs older than the given time, but do not exceed the timeout for
-- carrying out this task.  Returns the number of jobs archived and the
-- number of considered jobs left in the queue.
archiveJobs :: ConfigData -- ^ cluster configuration
               -> Int  -- ^ time the job has to be in the past in order
                       -- to be archived
               -> Int -- ^ timeout
               -> [JobId] -- ^ jobs to consider
               -> IO (Int, Int)
archiveJobs cfg age timeout jids = do
  now <- getClockTime
  qDir <- queueDir
  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
      -- a negative age disables the cut-off (noTimestamp compares
      -- before any real timestamp)
      cuttime = if age < 0 then noTimestamp
                           else advanceTimestamp (- age) (fromClockTime now)
      mcs = Config.getMasterCandidates cfg
      -- replication renames the live files to their archived location
      -- on all master candidates; the RPC result is ignored
      replicateFn jobs = do
        let olds = map (liveJobFile qDir) jobs
            news = map (archivedJobFile qDir) jobs
        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
        return ()
  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids