Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 65a3ff88

History | View | Annotate | Download (24.2 kB)

1 aa79e62e Iustin Pop
{-# LANGUAGE TemplateHaskell #-}
2 aa79e62e Iustin Pop
3 aa79e62e Iustin Pop
{-| Implementation of the job queue.
4 aa79e62e Iustin Pop
5 aa79e62e Iustin Pop
-}
6 aa79e62e Iustin Pop
7 aa79e62e Iustin Pop
{-
8 aa79e62e Iustin Pop
9 aa79e62e Iustin Pop
Copyright (C) 2010, 2012 Google Inc.
10 aa79e62e Iustin Pop
11 aa79e62e Iustin Pop
This program is free software; you can redistribute it and/or modify
12 aa79e62e Iustin Pop
it under the terms of the GNU General Public License as published by
13 aa79e62e Iustin Pop
the Free Software Foundation; either version 2 of the License, or
14 aa79e62e Iustin Pop
(at your option) any later version.
15 aa79e62e Iustin Pop
16 aa79e62e Iustin Pop
This program is distributed in the hope that it will be useful, but
17 aa79e62e Iustin Pop
WITHOUT ANY WARRANTY; without even the implied warranty of
18 aa79e62e Iustin Pop
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 aa79e62e Iustin Pop
General Public License for more details.
20 aa79e62e Iustin Pop
21 aa79e62e Iustin Pop
You should have received a copy of the GNU General Public License
22 aa79e62e Iustin Pop
along with this program; if not, write to the Free Software
23 aa79e62e Iustin Pop
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 aa79e62e Iustin Pop
02110-1301, USA.
25 aa79e62e Iustin Pop
26 aa79e62e Iustin Pop
-}
27 aa79e62e Iustin Pop
28 aa79e62e Iustin Pop
module Ganeti.JQueue
29 aa79e62e Iustin Pop
    ( QueuedOpCode(..)
30 aa79e62e Iustin Pop
    , QueuedJob(..)
31 aa79e62e Iustin Pop
    , InputOpCode(..)
32 4b49a72b Klaus Aehlig
    , queuedOpCodeFromMetaOpCode
33 1c1132f4 Klaus Aehlig
    , queuedJobFromOpCodes
34 a7ab381a Klaus Aehlig
    , changeOpCodePriority
35 a6b33b72 Klaus Aehlig
    , changeJobPriority
36 363dc9d6 Klaus Aehlig
    , cancelQueuedJob
37 aa79e62e Iustin Pop
    , Timestamp
38 ae66f3a9 Klaus Aehlig
    , fromClockTime
39 aa79e62e Iustin Pop
    , noTimestamp
40 c3a70209 Klaus Aehlig
    , currentTimestamp
41 8b5a4b9a Klaus Aehlig
    , advanceTimestamp
42 2af22d70 Klaus Aehlig
    , setReceivedTimestamp
43 65a3ff88 Michele Tartara
    , extendJobReasonTrail
44 aa79e62e Iustin Pop
    , opStatusFinalized
45 aa79e62e Iustin Pop
    , extractOpSummary
46 aa79e62e Iustin Pop
    , calcJobStatus
47 02e40b13 Klaus Aehlig
    , jobStarted
48 847df9e9 Klaus Aehlig
    , jobFinalized
49 370f63be Klaus Aehlig
    , jobArchivable
50 aa79e62e Iustin Pop
    , calcJobPriority
51 aa79e62e Iustin Pop
    , jobFileName
52 aa79e62e Iustin Pop
    , liveJobFile
53 aa79e62e Iustin Pop
    , archivedJobFile
54 aa79e62e Iustin Pop
    , determineJobDirectories
55 aa79e62e Iustin Pop
    , getJobIDs
56 aa79e62e Iustin Pop
    , sortJobIDs
57 aa79e62e Iustin Pop
    , loadJobFromDisk
58 aa79e62e Iustin Pop
    , noSuchJob
59 cef3f99f Klaus Aehlig
    , readSerialFromDisk
60 ae858516 Klaus Aehlig
    , allocateJobIds
61 ae858516 Klaus Aehlig
    , allocateJobId
62 b498ed42 Klaus Aehlig
    , writeJobToDisk
63 9fd653a4 Klaus Aehlig
    , replicateManyJobs
64 1b94c0db Klaus Aehlig
    , isQueueOpen
65 ac0c5c6d Klaus Aehlig
    , startJobs
66 47c3c7b1 Klaus Aehlig
    , cancelJob
67 f23daea8 Klaus Aehlig
    , queueDirPermissions
68 c867cfe1 Klaus Aehlig
    , archiveJobs
69 aa79e62e Iustin Pop
    ) where
70 aa79e62e Iustin Pop
71 370f63be Klaus Aehlig
import Control.Applicative (liftA2, (<|>))
72 8b5a4b9a Klaus Aehlig
import Control.Arrow (first, second)
73 c867cfe1 Klaus Aehlig
import Control.Concurrent (forkIO)
74 ae858516 Klaus Aehlig
import Control.Concurrent.MVar
75 aa79e62e Iustin Pop
import Control.Exception
76 aa79e62e Iustin Pop
import Control.Monad
77 557f5dad Klaus Aehlig
import Data.Functor ((<$))
78 aa79e62e Iustin Pop
import Data.List
79 4b49a72b Klaus Aehlig
import Data.Maybe
80 aa79e62e Iustin Pop
import Data.Ord (comparing)
81 aa79e62e Iustin Pop
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
82 557f5dad Klaus Aehlig
import Prelude hiding (id, log)
83 aa79e62e Iustin Pop
import System.Directory
84 aa79e62e Iustin Pop
import System.FilePath
85 aa79e62e Iustin Pop
import System.IO.Error (isDoesNotExistError)
86 aa79e62e Iustin Pop
import System.Posix.Files
87 c3a70209 Klaus Aehlig
import System.Time
88 aa79e62e Iustin Pop
import qualified Text.JSON
89 aa79e62e Iustin Pop
import Text.JSON.Types
90 aa79e62e Iustin Pop
91 aa79e62e Iustin Pop
import Ganeti.BasicTypes
92 c867cfe1 Klaus Aehlig
import qualified Ganeti.Config as Config
93 aa79e62e Iustin Pop
import qualified Ganeti.Constants as C
94 47c3c7b1 Klaus Aehlig
import Ganeti.Errors (ErrorResult)
95 aa79e62e Iustin Pop
import Ganeti.JSON
96 aa79e62e Iustin Pop
import Ganeti.Logging
97 493d6920 Klaus Aehlig
import Ganeti.Luxi
98 c867cfe1 Klaus Aehlig
import Ganeti.Objects (ConfigData, Node)
99 aa79e62e Iustin Pop
import Ganeti.OpCodes
100 aa79e62e Iustin Pop
import Ganeti.Path
101 b5a96995 Klaus Aehlig
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
102 c867cfe1 Klaus Aehlig
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
103 aa79e62e Iustin Pop
import Ganeti.THH
104 aa79e62e Iustin Pop
import Ganeti.Types
105 cef3f99f Klaus Aehlig
import Ganeti.Utils
106 77676415 Klaus Aehlig
import Ganeti.VCluster (makeVirtualPath)
107 aa79e62e Iustin Pop
108 aa79e62e Iustin Pop
-- * Data types
109 aa79e62e Iustin Pop
110 c3a70209 Klaus Aehlig
-- | The ganeti queue timestamp type. It represents the time as the pair
111 c3a70209 Klaus Aehlig
-- of seconds since the epoch and microseconds since the beginning of the
112 c3a70209 Klaus Aehlig
-- second.
113 aa79e62e Iustin Pop
type Timestamp = (Int, Int)
114 aa79e62e Iustin Pop
115 aa79e62e Iustin Pop
-- | Missing timestamp type.
116 aa79e62e Iustin Pop
noTimestamp :: Timestamp
117 aa79e62e Iustin Pop
noTimestamp = (-1, -1)
118 aa79e62e Iustin Pop
119 ae66f3a9 Klaus Aehlig
-- | Obtain a Timestamp from a given clock time
120 ae66f3a9 Klaus Aehlig
fromClockTime :: ClockTime -> Timestamp
121 ae66f3a9 Klaus Aehlig
fromClockTime (TOD ctime pico) =
122 ae66f3a9 Klaus Aehlig
  (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
123 ae66f3a9 Klaus Aehlig
124 c3a70209 Klaus Aehlig
-- | Get the current time in the job-queue timestamp format.
125 c3a70209 Klaus Aehlig
currentTimestamp :: IO Timestamp
126 ae66f3a9 Klaus Aehlig
currentTimestamp = fromClockTime `liftM` getClockTime
127 c3a70209 Klaus Aehlig
128 8b5a4b9a Klaus Aehlig
-- | From a given timestamp, obtain the timestamp of the
129 8b5a4b9a Klaus Aehlig
-- time that is the given number of seconds later.
130 8b5a4b9a Klaus Aehlig
advanceTimestamp :: Int -> Timestamp -> Timestamp
131 8b5a4b9a Klaus Aehlig
advanceTimestamp = first . (+)
132 8b5a4b9a Klaus Aehlig
133 aa79e62e Iustin Pop
-- | An input opcode.
134 aa79e62e Iustin Pop
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
135 aa79e62e Iustin Pop
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
136 aa79e62e Iustin Pop
                   deriving (Show, Eq)
137 aa79e62e Iustin Pop
138 aa79e62e Iustin Pop
-- | JSON instance for 'InputOpCode', trying to parse it and if
139 aa79e62e Iustin Pop
-- failing, keeping the original JSValue.
140 aa79e62e Iustin Pop
instance Text.JSON.JSON InputOpCode where
141 aa79e62e Iustin Pop
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
142 aa79e62e Iustin Pop
  showJSON (InvalidOpCode inv) = inv
143 aa79e62e Iustin Pop
  readJSON v = case Text.JSON.readJSON v of
144 aa79e62e Iustin Pop
                 Text.JSON.Error _ -> return $ InvalidOpCode v
145 aa79e62e Iustin Pop
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
146 aa79e62e Iustin Pop
147 aa79e62e Iustin Pop
-- | Invalid opcode summary.
148 aa79e62e Iustin Pop
invalidOp :: String
149 aa79e62e Iustin Pop
invalidOp = "INVALID_OP"
150 aa79e62e Iustin Pop
151 aa79e62e Iustin Pop
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
152 aa79e62e Iustin Pop
-- duplicates some functionality from the 'opSummary' function in
153 aa79e62e Iustin Pop
-- "Ganeti.OpCodes".
154 aa79e62e Iustin Pop
extractOpSummary :: InputOpCode -> String
155 aa79e62e Iustin Pop
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
156 aa79e62e Iustin Pop
extractOpSummary (InvalidOpCode (JSObject o)) =
157 aa79e62e Iustin Pop
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
158 aa79e62e Iustin Pop
    Just s -> drop 3 s -- drop the OP_ prefix
159 aa79e62e Iustin Pop
    Nothing -> invalidOp
160 aa79e62e Iustin Pop
extractOpSummary _ = invalidOp
161 aa79e62e Iustin Pop
162 aa79e62e Iustin Pop
$(buildObject "QueuedOpCode" "qo"
163 aa79e62e Iustin Pop
  [ simpleField "input"           [t| InputOpCode |]
164 aa79e62e Iustin Pop
  , simpleField "status"          [t| OpStatus    |]
165 aa79e62e Iustin Pop
  , simpleField "result"          [t| JSValue     |]
166 aa79e62e Iustin Pop
  , defaultField [| [] |] $
167 aa79e62e Iustin Pop
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
168 aa79e62e Iustin Pop
  , simpleField "priority"        [t| Int         |]
169 aa79e62e Iustin Pop
  , optionalNullSerField $
170 aa79e62e Iustin Pop
    simpleField "start_timestamp" [t| Timestamp   |]
171 aa79e62e Iustin Pop
  , optionalNullSerField $
172 aa79e62e Iustin Pop
    simpleField "exec_timestamp"  [t| Timestamp   |]
173 aa79e62e Iustin Pop
  , optionalNullSerField $
174 aa79e62e Iustin Pop
    simpleField "end_timestamp"   [t| Timestamp   |]
175 aa79e62e Iustin Pop
  ])
176 aa79e62e Iustin Pop
177 aa79e62e Iustin Pop
$(buildObject "QueuedJob" "qj"
178 aa79e62e Iustin Pop
  [ simpleField "id"                 [t| JobId          |]
179 aa79e62e Iustin Pop
  , simpleField "ops"                [t| [QueuedOpCode] |]
180 aa79e62e Iustin Pop
  , optionalNullSerField $
181 aa79e62e Iustin Pop
    simpleField "received_timestamp" [t| Timestamp      |]
182 aa79e62e Iustin Pop
  , optionalNullSerField $
183 aa79e62e Iustin Pop
    simpleField "start_timestamp"    [t| Timestamp      |]
184 aa79e62e Iustin Pop
  , optionalNullSerField $
185 aa79e62e Iustin Pop
    simpleField "end_timestamp"      [t| Timestamp      |]
186 aa79e62e Iustin Pop
  ])
187 aa79e62e Iustin Pop
188 4b49a72b Klaus Aehlig
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
189 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
190 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode op =
191 4b49a72b Klaus Aehlig
  QueuedOpCode { qoInput = ValidOpCode op
192 4b49a72b Klaus Aehlig
               , qoStatus = OP_STATUS_QUEUED
193 4b49a72b Klaus Aehlig
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
194 4b49a72b Klaus Aehlig
                              $ op
195 4b49a72b Klaus Aehlig
               , qoLog = []
196 4b49a72b Klaus Aehlig
               , qoResult = JSNull
197 4b49a72b Klaus Aehlig
               , qoStartTimestamp = Nothing
198 4b49a72b Klaus Aehlig
               , qoEndTimestamp = Nothing
199 4b49a72b Klaus Aehlig
               , qoExecTimestamp = Nothing
200 4b49a72b Klaus Aehlig
               }
201 4b49a72b Klaus Aehlig
202 1c1132f4 Klaus Aehlig
-- | From a job-id and a list of op-codes create a job. This is
203 1c1132f4 Klaus Aehlig
-- the pure part of job creation, as allocating a new job id
204 1c1132f4 Klaus Aehlig
-- lives in IO.
205 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
206 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes jobid ops = do
207 1c1132f4 Klaus Aehlig
  ops' <- mapM (`resolveDependencies` jobid) ops
208 1c1132f4 Klaus Aehlig
  return QueuedJob { qjId = jobid
209 1c1132f4 Klaus Aehlig
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
210 65a3ff88 Michele Tartara
                   , qjReceivedTimestamp = Nothing
211 1c1132f4 Klaus Aehlig
                   , qjStartTimestamp = Nothing
212 1c1132f4 Klaus Aehlig
                   , qjEndTimestamp = Nothing
213 1c1132f4 Klaus Aehlig
                   }
214 1c1132f4 Klaus Aehlig
215 2af22d70 Klaus Aehlig
-- | Attach a received timestamp to a Queued Job.
216 2af22d70 Klaus Aehlig
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
217 2af22d70 Klaus Aehlig
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
218 2af22d70 Klaus Aehlig
219 65a3ff88 Michele Tartara
-- | Build a timestamp in the format expected by the reason trail (nanoseconds)
220 65a3ff88 Michele Tartara
-- starting from a JQueue Timestamp.
221 65a3ff88 Michele Tartara
reasonTrailTimestamp :: Timestamp -> Integer
222 65a3ff88 Michele Tartara
reasonTrailTimestamp (sec, micro) =
223 65a3ff88 Michele Tartara
  let sec' = toInteger sec
224 65a3ff88 Michele Tartara
      micro' = toInteger micro
225 65a3ff88 Michele Tartara
  in sec' * 1000000000 + micro' * 1000
226 65a3ff88 Michele Tartara
227 65a3ff88 Michele Tartara
-- | Append an element to the reason trail of an input opcode.
228 65a3ff88 Michele Tartara
extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode
229 65a3ff88 Michele Tartara
                             -> InputOpCode
230 65a3ff88 Michele Tartara
extendInputOpCodeReasonTrail _ _ _ op@(InvalidOpCode _) = op
231 65a3ff88 Michele Tartara
extendInputOpCodeReasonTrail jid ts i (ValidOpCode vOp) =
232 65a3ff88 Michele Tartara
  let metaP = metaParams vOp
233 65a3ff88 Michele Tartara
      op = metaOpCode vOp
234 65a3ff88 Michele Tartara
      trail = opReason metaP
235 65a3ff88 Michele Tartara
      reasonSrc = opReasonSrcID op
236 65a3ff88 Michele Tartara
      reasonText = "job=" ++ show (fromJobId jid) ++ ";index=" ++ show i
237 65a3ff88 Michele Tartara
      reason = (reasonSrc, reasonText, reasonTrailTimestamp ts)
238 65a3ff88 Michele Tartara
      trail' = trail ++ [reason]
239 65a3ff88 Michele Tartara
  in ValidOpCode $ vOp { metaParams = metaP { opReason = trail' } }
240 65a3ff88 Michele Tartara
241 65a3ff88 Michele Tartara
-- | Append an element to the reason trail of a queued opcode.
242 65a3ff88 Michele Tartara
extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode
243 65a3ff88 Michele Tartara
                        -> QueuedOpCode
244 65a3ff88 Michele Tartara
extendOpCodeReasonTrail jid ts i op =
245 65a3ff88 Michele Tartara
  let inOp = qoInput op
246 65a3ff88 Michele Tartara
  in op { qoInput = extendInputOpCodeReasonTrail jid ts i inOp }
247 65a3ff88 Michele Tartara
248 65a3ff88 Michele Tartara
-- | Append an element to the reason trail of all the OpCodes of a queued job.
249 65a3ff88 Michele Tartara
extendJobReasonTrail :: QueuedJob -> QueuedJob
250 65a3ff88 Michele Tartara
extendJobReasonTrail job =
251 65a3ff88 Michele Tartara
  let jobId = qjId job
252 65a3ff88 Michele Tartara
      mTimestamp = qjReceivedTimestamp job
253 65a3ff88 Michele Tartara
      -- This function is going to be called on QueuedJobs that already contain
254 65a3ff88 Michele Tartara
      -- a timestamp. But for safety reasons we cannot assume mTimestamp will
255 65a3ff88 Michele Tartara
      -- be (Just timestamp), so we use the value 0 in the extremely unlikely
256 65a3ff88 Michele Tartara
      -- case this is not true.
257 65a3ff88 Michele Tartara
      timestamp = fromMaybe (0, 0) mTimestamp
258 65a3ff88 Michele Tartara
    in job
259 65a3ff88 Michele Tartara
        { qjOps =
260 65a3ff88 Michele Tartara
            zipWith (extendOpCodeReasonTrail jobId timestamp) [0..] $
261 65a3ff88 Michele Tartara
              qjOps job
262 65a3ff88 Michele Tartara
        }
263 65a3ff88 Michele Tartara
264 a7ab381a Klaus Aehlig
-- | Change the priority of a QueuedOpCode, if it is not already
265 a7ab381a Klaus Aehlig
-- finalized.
266 a7ab381a Klaus Aehlig
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
267 a7ab381a Klaus Aehlig
changeOpCodePriority prio op =
268 a7ab381a Klaus Aehlig
  if qoStatus op > OP_STATUS_RUNNING
269 a7ab381a Klaus Aehlig
     then op
270 a7ab381a Klaus Aehlig
     else op { qoPriority = prio }
271 a7ab381a Klaus Aehlig
272 363dc9d6 Klaus Aehlig
-- | Set the state of a QueuedOpCode to canceled.
273 363dc9d6 Klaus Aehlig
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
274 363dc9d6 Klaus Aehlig
cancelOpCode now op =
275 363dc9d6 Klaus Aehlig
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }
276 363dc9d6 Klaus Aehlig
277 a6b33b72 Klaus Aehlig
-- | Change the priority of a job, i.e., change the priority of the
278 a6b33b72 Klaus Aehlig
-- non-finalized opcodes.
279 a6b33b72 Klaus Aehlig
changeJobPriority :: Int -> QueuedJob -> QueuedJob
280 a6b33b72 Klaus Aehlig
changeJobPriority prio job =
281 a6b33b72 Klaus Aehlig
  job { qjOps = map (changeOpCodePriority prio) $ qjOps job }
282 a6b33b72 Klaus Aehlig
283 363dc9d6 Klaus Aehlig
-- | Transform a QueuedJob that has not been started into its canceled form.
284 363dc9d6 Klaus Aehlig
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
285 363dc9d6 Klaus Aehlig
cancelQueuedJob now job =
286 363dc9d6 Klaus Aehlig
  let ops' = map (cancelOpCode now) $ qjOps job
287 363dc9d6 Klaus Aehlig
  in job { qjOps = ops', qjEndTimestamp = Just now}
288 363dc9d6 Klaus Aehlig
289 aa79e62e Iustin Pop
-- | Job file prefix.
290 aa79e62e Iustin Pop
jobFilePrefix :: String
291 aa79e62e Iustin Pop
jobFilePrefix = "job-"
292 aa79e62e Iustin Pop
293 aa79e62e Iustin Pop
-- | Computes the filename for a given job ID.
294 aa79e62e Iustin Pop
jobFileName :: JobId -> FilePath
295 aa79e62e Iustin Pop
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
296 aa79e62e Iustin Pop
297 aa79e62e Iustin Pop
-- | Parses a job ID from a file name.
298 aa79e62e Iustin Pop
parseJobFileId :: (Monad m) => FilePath -> m JobId
299 aa79e62e Iustin Pop
parseJobFileId path =
300 aa79e62e Iustin Pop
  case stripPrefix jobFilePrefix path of
301 aa79e62e Iustin Pop
    Nothing -> fail $ "Job file '" ++ path ++
302 aa79e62e Iustin Pop
                      "' doesn't have the correct prefix"
303 aa79e62e Iustin Pop
    Just suffix -> makeJobIdS suffix
304 aa79e62e Iustin Pop
305 aa79e62e Iustin Pop
-- | Computes the full path to a live job.
306 aa79e62e Iustin Pop
liveJobFile :: FilePath -> JobId -> FilePath
307 aa79e62e Iustin Pop
liveJobFile rootdir jid = rootdir </> jobFileName jid
308 aa79e62e Iustin Pop
309 aa79e62e Iustin Pop
-- | Computes the full path to an archives job. BROKEN.
310 aa79e62e Iustin Pop
archivedJobFile :: FilePath -> JobId -> FilePath
311 aa79e62e Iustin Pop
archivedJobFile rootdir jid =
312 aa79e62e Iustin Pop
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
313 aa79e62e Iustin Pop
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
314 aa79e62e Iustin Pop
315 aa79e62e Iustin Pop
-- | Map from opcode status to job status.
316 aa79e62e Iustin Pop
opStatusToJob :: OpStatus -> JobStatus
317 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
318 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
319 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
320 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
321 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
322 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
323 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
324 aa79e62e Iustin Pop
325 aa79e62e Iustin Pop
-- | Computes a queued job's status.
326 aa79e62e Iustin Pop
calcJobStatus :: QueuedJob -> JobStatus
327 aa79e62e Iustin Pop
calcJobStatus QueuedJob { qjOps = ops } =
328 aa79e62e Iustin Pop
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
329 aa79e62e Iustin Pop
    where
330 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_ERROR     = True
331 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELING = True
332 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELED  = True
333 aa79e62e Iustin Pop
      terminalStatus _                   = False
334 aa79e62e Iustin Pop
      softStatus     OP_STATUS_SUCCESS   = True
335 aa79e62e Iustin Pop
      softStatus     OP_STATUS_QUEUED    = True
336 aa79e62e Iustin Pop
      softStatus     _                   = False
337 aa79e62e Iustin Pop
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
338 aa79e62e Iustin Pop
      extractOpSt [] d False = d
339 aa79e62e Iustin Pop
      extractOpSt (x:xs) d old_all
340 aa79e62e Iustin Pop
           | terminalStatus x = opStatusToJob x -- abort recursion
341 aa79e62e Iustin Pop
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
342 aa79e62e Iustin Pop
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
343 aa79e62e Iustin Pop
           where new_all = x == OP_STATUS_SUCCESS && old_all
344 aa79e62e Iustin Pop
345 02e40b13 Klaus Aehlig
-- | Determine if a job has started
346 02e40b13 Klaus Aehlig
jobStarted :: QueuedJob -> Bool
347 02e40b13 Klaus Aehlig
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus
348 02e40b13 Klaus Aehlig
349 847df9e9 Klaus Aehlig
-- | Determine if a job is finalised.
350 847df9e9 Klaus Aehlig
jobFinalized :: QueuedJob -> Bool
351 847df9e9 Klaus Aehlig
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus
352 847df9e9 Klaus Aehlig
353 370f63be Klaus Aehlig
-- | Determine if a job is finalized and its timestamp is before
354 370f63be Klaus Aehlig
-- a given time.
355 370f63be Klaus Aehlig
jobArchivable :: Timestamp -> QueuedJob -> Bool
356 65a3ff88 Michele Tartara
jobArchivable ts = liftA2 (&&) jobFinalized
357 370f63be Klaus Aehlig
  $ maybe False (< ts)
358 370f63be Klaus Aehlig
    .  liftA2 (<|>) qjEndTimestamp qjStartTimestamp
359 370f63be Klaus Aehlig
360 aa79e62e Iustin Pop
-- | Determine whether an opcode status is finalized.
361 aa79e62e Iustin Pop
opStatusFinalized :: OpStatus -> Bool
362 aa79e62e Iustin Pop
opStatusFinalized = (> OP_STATUS_RUNNING)
363 aa79e62e Iustin Pop
364 aa79e62e Iustin Pop
-- | Compute a job's priority.
365 aa79e62e Iustin Pop
calcJobPriority :: QueuedJob -> Int
366 aa79e62e Iustin Pop
calcJobPriority QueuedJob { qjOps = ops } =
367 aa79e62e Iustin Pop
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
368 aa79e62e Iustin Pop
    where helper [] = C.opPrioDefault
369 aa79e62e Iustin Pop
          helper ps = minimum ps
370 aa79e62e Iustin Pop
371 aa79e62e Iustin Pop
-- | Log but ignore an 'IOError'.
372 aa79e62e Iustin Pop
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
373 aa79e62e Iustin Pop
ignoreIOError a ignore_noent msg e = do
374 aa79e62e Iustin Pop
  unless (isDoesNotExistError e && ignore_noent) .
375 aa79e62e Iustin Pop
    logWarning $ msg ++ ": " ++ show e
376 aa79e62e Iustin Pop
  return a
377 aa79e62e Iustin Pop
378 aa79e62e Iustin Pop
-- | Compute the list of existing archive directories. Note that I/O
379 aa79e62e Iustin Pop
-- exceptions are swallowed and ignored.
380 aa79e62e Iustin Pop
allArchiveDirs :: FilePath -> IO [FilePath]
381 aa79e62e Iustin Pop
allArchiveDirs rootdir = do
382 aa79e62e Iustin Pop
  let adir = rootdir </> jobQueueArchiveSubDir
383 aa79e62e Iustin Pop
  contents <- getDirectoryContents adir `Control.Exception.catch`
384 aa79e62e Iustin Pop
               ignoreIOError [] False
385 aa79e62e Iustin Pop
                 ("Failed to list queue directory " ++ adir)
386 aa79e62e Iustin Pop
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
387 aa79e62e Iustin Pop
  filterM (\path ->
388 aa79e62e Iustin Pop
             liftM isDirectory (getFileStatus (adir </> path))
389 aa79e62e Iustin Pop
               `Control.Exception.catch`
390 aa79e62e Iustin Pop
               ignoreIOError False True
391 aa79e62e Iustin Pop
                 ("Failed to stat archive path " ++ path)) fpaths
392 aa79e62e Iustin Pop
393 aa79e62e Iustin Pop
-- | Build list of directories containing job files. Note: compared to
394 aa79e62e Iustin Pop
-- the Python version, this doesn't ignore a potential lost+found
395 aa79e62e Iustin Pop
-- file.
396 aa79e62e Iustin Pop
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
397 aa79e62e Iustin Pop
determineJobDirectories rootdir archived = do
398 aa79e62e Iustin Pop
  other <- if archived
399 aa79e62e Iustin Pop
             then allArchiveDirs rootdir
400 aa79e62e Iustin Pop
             else return []
401 aa79e62e Iustin Pop
  return $ rootdir:other
402 aa79e62e Iustin Pop
403 3cecd73c Michele Tartara
-- Function equivalent to the \'sequence\' function, that cannot be used because
404 3cecd73c Michele Tartara
-- of library version conflict on Lucid.
405 3cecd73c Michele Tartara
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
406 3cecd73c Michele Tartara
-- will not be required anymore.
407 3cecd73c Michele Tartara
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
408 3cecd73c Michele Tartara
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
409 3cecd73c Michele Tartara
410 3cecd73c Michele Tartara
-- | Folding function for joining multiple [JobIds] into one list.
411 3cecd73c Michele Tartara
seqFolder :: Either IOError [[JobId]]
412 3cecd73c Michele Tartara
          -> Either IOError [JobId]
413 3cecd73c Michele Tartara
          -> Either IOError [[JobId]]
414 3cecd73c Michele Tartara
seqFolder (Left e) _ = Left e
415 3cecd73c Michele Tartara
seqFolder (Right _) (Left e) = Left e
416 3cecd73c Michele Tartara
seqFolder (Right l) (Right el) = Right $ el:l
417 3cecd73c Michele Tartara
418 aa79e62e Iustin Pop
-- | Computes the list of all jobs in the given directories.
419 be0cb2d7 Michele Tartara
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
420 3cecd73c Michele Tartara
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
421 aa79e62e Iustin Pop
422 aa79e62e Iustin Pop
-- | Sorts the a list of job IDs.
423 aa79e62e Iustin Pop
sortJobIDs :: [JobId] -> [JobId]
424 aa79e62e Iustin Pop
sortJobIDs = sortBy (comparing fromJobId)
425 aa79e62e Iustin Pop
426 aa79e62e Iustin Pop
-- | Computes the list of jobs in a given directory.
427 be0cb2d7 Michele Tartara
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
428 aa79e62e Iustin Pop
getDirJobIDs path = do
429 be0cb2d7 Michele Tartara
  either_contents <-
430 be0cb2d7 Michele Tartara
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
431 be0cb2d7 Michele Tartara
  case either_contents of
432 be0cb2d7 Michele Tartara
    Left e -> do
433 be0cb2d7 Michele Tartara
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
434 be0cb2d7 Michele Tartara
      return $ Left e
435 be0cb2d7 Michele Tartara
    Right contents -> do
436 be0cb2d7 Michele Tartara
      let jids = foldl (\ids file ->
437 be0cb2d7 Michele Tartara
                         case parseJobFileId file of
438 be0cb2d7 Michele Tartara
                           Nothing -> ids
439 be0cb2d7 Michele Tartara
                           Just new_id -> new_id:ids) [] contents
440 be0cb2d7 Michele Tartara
      return . Right $ reverse jids
441 aa79e62e Iustin Pop
442 aa79e62e Iustin Pop
-- | Reads the job data from disk.
443 aa79e62e Iustin Pop
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
444 aa79e62e Iustin Pop
readJobDataFromDisk rootdir archived jid = do
445 aa79e62e Iustin Pop
  let live_path = liveJobFile rootdir jid
446 aa79e62e Iustin Pop
      archived_path = archivedJobFile rootdir jid
447 aa79e62e Iustin Pop
      all_paths = if archived
448 aa79e62e Iustin Pop
                    then [(live_path, False), (archived_path, True)]
449 aa79e62e Iustin Pop
                    else [(live_path, False)]
450 aa79e62e Iustin Pop
  foldM (\state (path, isarchived) ->
451 aa79e62e Iustin Pop
           liftM (\r -> Just (r, isarchived)) (readFile path)
452 aa79e62e Iustin Pop
             `Control.Exception.catch`
453 aa79e62e Iustin Pop
             ignoreIOError state True
454 aa79e62e Iustin Pop
               ("Failed to read job file " ++ path)) Nothing all_paths
455 aa79e62e Iustin Pop
456 aa79e62e Iustin Pop
-- | Failed to load job error.
457 aa79e62e Iustin Pop
noSuchJob :: Result (QueuedJob, Bool)
458 aa79e62e Iustin Pop
noSuchJob = Bad "Can't load job file"
459 aa79e62e Iustin Pop
460 aa79e62e Iustin Pop
-- | Loads a job from disk.
461 aa79e62e Iustin Pop
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
462 aa79e62e Iustin Pop
loadJobFromDisk rootdir archived jid = do
463 aa79e62e Iustin Pop
  raw <- readJobDataFromDisk rootdir archived jid
464 aa79e62e Iustin Pop
  -- note: we need some stricness below, otherwise the wrapping in a
465 aa79e62e Iustin Pop
  -- Result will create too much lazyness, and not close the file
466 aa79e62e Iustin Pop
  -- descriptors for the individual jobs
467 aa79e62e Iustin Pop
  return $! case raw of
468 aa79e62e Iustin Pop
             Nothing -> noSuchJob
469 aa79e62e Iustin Pop
             Just (str, arch) ->
470 aa79e62e Iustin Pop
               liftM (\qj -> (qj, arch)) .
471 aa79e62e Iustin Pop
               fromJResult "Parsing job file" $ Text.JSON.decode str
472 cef3f99f Klaus Aehlig
473 b498ed42 Klaus Aehlig
-- | Write a job to disk.
474 b498ed42 Klaus Aehlig
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
475 b498ed42 Klaus Aehlig
writeJobToDisk rootdir job = do
476 b498ed42 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
477 b498ed42 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
478 b498ed42 Klaus Aehlig
  tryAndLogIOError (atomicWriteFile filename content)
479 b498ed42 Klaus Aehlig
                   ("Failed to write " ++ filename) Ok
480 b498ed42 Klaus Aehlig
481 b5a96995 Klaus Aehlig
-- | Replicate a job to all master candidates.
482 b5a96995 Klaus Aehlig
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
483 b5a96995 Klaus Aehlig
replicateJob rootdir mastercandidates job = do
484 b5a96995 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
485 b5a96995 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
486 77676415 Klaus Aehlig
  filename' <- makeVirtualPath filename
487 557f5dad Klaus Aehlig
  callresult <- executeRpcCall mastercandidates
488 77676415 Klaus Aehlig
                  $ RpcCallJobqueueUpdate filename' content
489 557f5dad Klaus Aehlig
  let result = map (second (() <$)) callresult
490 b5a96995 Klaus Aehlig
  logRpcErrors result
491 b5a96995 Klaus Aehlig
  return result
492 b5a96995 Klaus Aehlig
493 9fd653a4 Klaus Aehlig
-- | Replicate many jobs to all master candidates.
494 9fd653a4 Klaus Aehlig
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
495 9fd653a4 Klaus Aehlig
replicateManyJobs rootdir mastercandidates =
496 9fd653a4 Klaus Aehlig
  mapM_ (replicateJob rootdir mastercandidates)
497 9fd653a4 Klaus Aehlig
498 cef3f99f Klaus Aehlig
-- | Read the job serial number from disk.
499 cef3f99f Klaus Aehlig
readSerialFromDisk :: IO (Result JobId)
500 cef3f99f Klaus Aehlig
readSerialFromDisk = do
501 cef3f99f Klaus Aehlig
  filename <- jobQueueSerialFile
502 cef3f99f Klaus Aehlig
  tryAndLogIOError (readFile filename) "Failed to read serial file"
503 cef3f99f Klaus Aehlig
                   (makeJobIdS . rStripSpace)
504 ae858516 Klaus Aehlig
505 ae858516 Klaus Aehlig
-- | Allocate new job ids.
506 ae858516 Klaus Aehlig
-- To avoid races while accessing the serial file, the threads synchronize
507 ae858516 Klaus Aehlig
-- over a lock, as usual provided by an MVar.
508 ae858516 Klaus Aehlig
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
509 ae858516 Klaus Aehlig
allocateJobIds mastercandidates lock n =
510 ae858516 Klaus Aehlig
  if n <= 0
511 ae858516 Klaus Aehlig
    then return . Bad $ "Can only allocate positive number of job ids"
512 ae858516 Klaus Aehlig
    else do
513 ae858516 Klaus Aehlig
      takeMVar lock
514 ae858516 Klaus Aehlig
      rjobid <- readSerialFromDisk
515 ae858516 Klaus Aehlig
      case rjobid of
516 ae858516 Klaus Aehlig
        Bad s -> do
517 ae858516 Klaus Aehlig
          putMVar lock ()
518 ae858516 Klaus Aehlig
          return . Bad $ s
519 ae858516 Klaus Aehlig
        Ok jid -> do
520 ae858516 Klaus Aehlig
          let current = fromJobId jid
521 ae858516 Klaus Aehlig
              serial_content = show (current + n) ++  "\n"
522 ae858516 Klaus Aehlig
          serial <- jobQueueSerialFile
523 ae858516 Klaus Aehlig
          write_result <- try $ atomicWriteFile serial serial_content
524 ae858516 Klaus Aehlig
                          :: IO (Either IOError ())
525 ae858516 Klaus Aehlig
          case write_result of
526 ae858516 Klaus Aehlig
            Left e -> do
527 f7819050 Klaus Aehlig
              putMVar lock ()
528 ae858516 Klaus Aehlig
              let msg = "Failed to write serial file: " ++ show e
529 ae858516 Klaus Aehlig
              logError msg
530 65a3ff88 Michele Tartara
              return . Bad $ msg
531 ae858516 Klaus Aehlig
            Right () -> do
532 77676415 Klaus Aehlig
              serial' <- makeVirtualPath serial
533 ae858516 Klaus Aehlig
              _ <- executeRpcCall mastercandidates
534 77676415 Klaus Aehlig
                     $ RpcCallJobqueueUpdate serial' serial_content
535 f7819050 Klaus Aehlig
              putMVar lock ()
536 ae858516 Klaus Aehlig
              return $ mapM makeJobId [(current+1)..(current+n)]
537 ae858516 Klaus Aehlig
538 ae858516 Klaus Aehlig
-- | Allocate one new job id.
539 ae858516 Klaus Aehlig
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
540 ae858516 Klaus Aehlig
allocateJobId mastercandidates lock = do
541 ae858516 Klaus Aehlig
  jids <- allocateJobIds mastercandidates lock 1
542 ae858516 Klaus Aehlig
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
543 1b94c0db Klaus Aehlig
544 1b94c0db Klaus Aehlig
-- | Decide if job queue is open
545 1b94c0db Klaus Aehlig
isQueueOpen :: IO Bool
546 1b94c0db Klaus Aehlig
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
547 493d6920 Klaus Aehlig
548 ac0c5c6d Klaus Aehlig
-- | Start enqueued jobs, currently by handing them over to masterd.
549 ac0c5c6d Klaus Aehlig
startJobs :: [QueuedJob] -> IO ()
550 ac0c5c6d Klaus Aehlig
startJobs jobs = do
551 493d6920 Klaus Aehlig
  socketpath <- defaultMasterSocket
552 d605e261 Petr Pudlak
  client <- getLuxiClient socketpath
553 493d6920 Klaus Aehlig
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
554 493d6920 Klaus Aehlig
  let failures = map show $ justBad pickupResults
555 493d6920 Klaus Aehlig
  unless (null failures)
556 493d6920 Klaus Aehlig
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
557 47c3c7b1 Klaus Aehlig
558 47c3c7b1 Klaus Aehlig
-- | Try to cancel a job that has already been handed over to execution,
559 47c3c7b1 Klaus Aehlig
-- currently by asking masterd to cancel it.
560 47c3c7b1 Klaus Aehlig
cancelJob :: JobId -> IO (ErrorResult JSValue)
561 47c3c7b1 Klaus Aehlig
cancelJob jid = do
562 47c3c7b1 Klaus Aehlig
  socketpath <- defaultMasterSocket
563 47c3c7b1 Klaus Aehlig
  client <- getLuxiClient socketpath
564 47c3c7b1 Klaus Aehlig
  callMethod (CancelJob jid) client
565 c867cfe1 Klaus Aehlig
566 f23daea8 Klaus Aehlig
-- | Permissions for the archive directories.
567 f23daea8 Klaus Aehlig
queueDirPermissions :: FilePermissions
568 f23daea8 Klaus Aehlig
queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser
569 f23daea8 Klaus Aehlig
                                      , fpGroup = Just C.daemonsGroup
570 f23daea8 Klaus Aehlig
                                      , fpPermissions = 0o0750
571 f23daea8 Klaus Aehlig
                                      }
572 f23daea8 Klaus Aehlig
573 c867cfe1 Klaus Aehlig
-- | Try, at most until the given endtime, to archive some of the given
574 c867cfe1 Klaus Aehlig
-- jobs, if they are older than the specified cut-off time; also replicate
575 c867cfe1 Klaus Aehlig
-- archival of the additional jobs. Return the pair of the number of jobs
576 c867cfe1 Klaus Aehlig
-- archived, and the number of jobs remaining int he queue, asuming the
577 c867cfe1 Klaus Aehlig
-- given numbers about the not considered jobs.
578 c867cfe1 Klaus Aehlig
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
579 c867cfe1 Klaus Aehlig
                        -> FilePath -- ^ queue root directory
580 c867cfe1 Klaus Aehlig
                        -> ClockTime -- ^ Endtime
581 c867cfe1 Klaus Aehlig
                        -> Timestamp -- ^ cut-off time for archiving jobs
582 c867cfe1 Klaus Aehlig
                        -> Int -- ^ number of jobs alread archived
583 c867cfe1 Klaus Aehlig
                        -> [JobId] -- ^ Additional jobs to replicate
584 c867cfe1 Klaus Aehlig
                        -> [JobId] -- ^ List of job-ids still to consider
585 c867cfe1 Klaus Aehlig
                        -> IO (Int, Int)
586 c867cfe1 Klaus Aehlig
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
587 c867cfe1 Klaus Aehlig
  unless (null torepl) . (>> return ())
588 c867cfe1 Klaus Aehlig
   . forkIO $ replicateFn torepl
589 c867cfe1 Klaus Aehlig
  return (arch, 0)
590 c867cfe1 Klaus Aehlig
591 c867cfe1 Klaus Aehlig
archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
592 c867cfe1 Klaus Aehlig
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
593 c867cfe1 Klaus Aehlig
      continue = archiveMore arch torepl jids
594 c867cfe1 Klaus Aehlig
      jidname = show $ fromJobId jid
595 c867cfe1 Klaus Aehlig
  time <- getClockTime
596 c867cfe1 Klaus Aehlig
  if time >= endt
597 c867cfe1 Klaus Aehlig
    then do
598 c867cfe1 Klaus Aehlig
      _ <- forkIO $ replicateFn torepl
599 c867cfe1 Klaus Aehlig
      return (arch, length (jid:jids))
600 c867cfe1 Klaus Aehlig
    else do
601 c867cfe1 Klaus Aehlig
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
602 c867cfe1 Klaus Aehlig
      loadResult <- loadJobFromDisk qDir False jid
603 c867cfe1 Klaus Aehlig
      case loadResult of
604 c867cfe1 Klaus Aehlig
        Bad _ -> continue
605 65a3ff88 Michele Tartara
        Ok (job, _) ->
606 c867cfe1 Klaus Aehlig
          if jobArchivable cutt job
607 c867cfe1 Klaus Aehlig
            then do
608 c867cfe1 Klaus Aehlig
              let live = liveJobFile qDir jid
609 c867cfe1 Klaus Aehlig
                  archive = archivedJobFile qDir jid
610 0c09ecc2 Klaus Aehlig
              renameResult <- safeRenameFile queueDirPermissions
611 0c09ecc2 Klaus Aehlig
                                live archive
612 65a3ff88 Michele Tartara
              case renameResult of
613 c867cfe1 Klaus Aehlig
                Bad s -> do
614 c867cfe1 Klaus Aehlig
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
615 c867cfe1 Klaus Aehlig
                                 ++ " failed unexpectedly: " ++ s
616 c867cfe1 Klaus Aehlig
                  continue
617 c867cfe1 Klaus Aehlig
                Ok () -> do
618 c867cfe1 Klaus Aehlig
                  let torepl' = jid:torepl
619 c867cfe1 Klaus Aehlig
                  if length torepl' >= 10
620 c867cfe1 Klaus Aehlig
                    then do
621 c867cfe1 Klaus Aehlig
                      _ <- forkIO $ replicateFn torepl'
622 c867cfe1 Klaus Aehlig
                      archiveMore (arch + 1) [] jids
623 c867cfe1 Klaus Aehlig
                    else archiveMore (arch + 1) torepl' jids
624 c867cfe1 Klaus Aehlig
            else continue
625 65a3ff88 Michele Tartara
626 c867cfe1 Klaus Aehlig
-- | Archive jobs older than the given time, but do not exceed the timeout for
627 c867cfe1 Klaus Aehlig
-- carrying out this task.
628 c867cfe1 Klaus Aehlig
archiveJobs :: ConfigData -- ^ cluster configuration
629 c867cfe1 Klaus Aehlig
               -> Int  -- ^ time the job has to be in the past in order
630 c867cfe1 Klaus Aehlig
                       -- to be archived
631 c867cfe1 Klaus Aehlig
               -> Int -- ^ timeout
632 c867cfe1 Klaus Aehlig
               -> [JobId] -- ^ jobs to consider
633 c867cfe1 Klaus Aehlig
               -> IO (Int, Int)
634 c867cfe1 Klaus Aehlig
archiveJobs cfg age timeout jids = do
635 c867cfe1 Klaus Aehlig
  now <- getClockTime
636 c867cfe1 Klaus Aehlig
  qDir <- queueDir
637 c867cfe1 Klaus Aehlig
  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
638 c867cfe1 Klaus Aehlig
      cuttime = if age < 0 then noTimestamp
639 c867cfe1 Klaus Aehlig
                           else advanceTimestamp (- age) (fromClockTime now)
640 c867cfe1 Klaus Aehlig
      mcs = Config.getMasterCandidates cfg
641 c867cfe1 Klaus Aehlig
      replicateFn jobs = do
642 c867cfe1 Klaus Aehlig
        let olds = map (liveJobFile qDir) jobs
643 c867cfe1 Klaus Aehlig
            news = map (archivedJobFile qDir) jobs
644 c867cfe1 Klaus Aehlig
        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
645 c867cfe1 Klaus Aehlig
        return ()
646 c867cfe1 Klaus Aehlig
  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids