Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ e181c8cd

History | View | Annotate | Download (25.4 kB)

1 aa79e62e Iustin Pop
{-| Implementation of the job queue.
2 aa79e62e Iustin Pop
3 aa79e62e Iustin Pop
-}
4 aa79e62e Iustin Pop
5 aa79e62e Iustin Pop
{-
6 aa79e62e Iustin Pop
7 aa79e62e Iustin Pop
Copyright (C) 2010, 2012 Google Inc.
8 aa79e62e Iustin Pop
9 aa79e62e Iustin Pop
This program is free software; you can redistribute it and/or modify
10 aa79e62e Iustin Pop
it under the terms of the GNU General Public License as published by
11 aa79e62e Iustin Pop
the Free Software Foundation; either version 2 of the License, or
12 aa79e62e Iustin Pop
(at your option) any later version.
13 aa79e62e Iustin Pop
14 aa79e62e Iustin Pop
This program is distributed in the hope that it will be useful, but
15 aa79e62e Iustin Pop
WITHOUT ANY WARRANTY; without even the implied warranty of
16 aa79e62e Iustin Pop
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 aa79e62e Iustin Pop
General Public License for more details.
18 aa79e62e Iustin Pop
19 aa79e62e Iustin Pop
You should have received a copy of the GNU General Public License
20 aa79e62e Iustin Pop
along with this program; if not, write to the Free Software
21 aa79e62e Iustin Pop
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 aa79e62e Iustin Pop
02110-1301, USA.
23 aa79e62e Iustin Pop
24 aa79e62e Iustin Pop
-}
25 aa79e62e Iustin Pop
26 aa79e62e Iustin Pop
module Ganeti.JQueue
27 a35a4f52 Klaus Aehlig
    ( queuedOpCodeFromMetaOpCode
28 1c1132f4 Klaus Aehlig
    , queuedJobFromOpCodes
29 a7ab381a Klaus Aehlig
    , changeOpCodePriority
30 a6b33b72 Klaus Aehlig
    , changeJobPriority
31 363dc9d6 Klaus Aehlig
    , cancelQueuedJob
32 4b887066 Petr Pudlak
    , failQueuedJob
33 ae66f3a9 Klaus Aehlig
    , fromClockTime
34 aa79e62e Iustin Pop
    , noTimestamp
35 c3a70209 Klaus Aehlig
    , currentTimestamp
36 8b5a4b9a Klaus Aehlig
    , advanceTimestamp
37 17aa37ff Klaus Aehlig
    , reasonTrailTimestamp
38 2af22d70 Klaus Aehlig
    , setReceivedTimestamp
39 65a3ff88 Michele Tartara
    , extendJobReasonTrail
40 6a2e71d9 Klaus Aehlig
    , getJobDependencies
41 aa79e62e Iustin Pop
    , opStatusFinalized
42 aa79e62e Iustin Pop
    , extractOpSummary
43 aa79e62e Iustin Pop
    , calcJobStatus
44 02e40b13 Klaus Aehlig
    , jobStarted
45 847df9e9 Klaus Aehlig
    , jobFinalized
46 370f63be Klaus Aehlig
    , jobArchivable
47 aa79e62e Iustin Pop
    , calcJobPriority
48 aa79e62e Iustin Pop
    , jobFileName
49 aa79e62e Iustin Pop
    , liveJobFile
50 aa79e62e Iustin Pop
    , archivedJobFile
51 aa79e62e Iustin Pop
    , determineJobDirectories
52 aa79e62e Iustin Pop
    , getJobIDs
53 aa79e62e Iustin Pop
    , sortJobIDs
54 aa79e62e Iustin Pop
    , loadJobFromDisk
55 aa79e62e Iustin Pop
    , noSuchJob
56 cef3f99f Klaus Aehlig
    , readSerialFromDisk
57 ae858516 Klaus Aehlig
    , allocateJobIds
58 ae858516 Klaus Aehlig
    , allocateJobId
59 b498ed42 Klaus Aehlig
    , writeJobToDisk
60 9fd653a4 Klaus Aehlig
    , replicateManyJobs
61 7b0a9096 Petr Pudlak
    , writeAndReplicateJob
62 1b94c0db Klaus Aehlig
    , isQueueOpen
63 ac0c5c6d Klaus Aehlig
    , startJobs
64 47c3c7b1 Klaus Aehlig
    , cancelJob
65 f23daea8 Klaus Aehlig
    , queueDirPermissions
66 c867cfe1 Klaus Aehlig
    , archiveJobs
67 a35a4f52 Klaus Aehlig
    -- re-export
68 a35a4f52 Klaus Aehlig
    , Timestamp
69 a35a4f52 Klaus Aehlig
    , InputOpCode(..)
70 a35a4f52 Klaus Aehlig
    , QueuedOpCode(..)
71 a35a4f52 Klaus Aehlig
    , QueuedJob(..)
72 aa79e62e Iustin Pop
    ) where
73 aa79e62e Iustin Pop
74 370f63be Klaus Aehlig
import Control.Applicative (liftA2, (<|>))
75 8b5a4b9a Klaus Aehlig
import Control.Arrow (first, second)
76 ec98ea2b Petr Pudlak
import Control.Concurrent (forkIO, threadDelay)
77 ae858516 Klaus Aehlig
import Control.Concurrent.MVar
78 aa79e62e Iustin Pop
import Control.Exception
79 58d29849 Klaus Aehlig
import Control.Lens (over)
80 aa79e62e Iustin Pop
import Control.Monad
81 ea7032da Petr Pudlak
import Control.Monad.IO.Class
82 ec98ea2b Petr Pudlak
import Control.Monad.Trans (lift)
83 ec98ea2b Petr Pudlak
import Control.Monad.Trans.Maybe
84 ec98ea2b Petr Pudlak
import Data.Functor ((<$), (<$>))
85 aa79e62e Iustin Pop
import Data.List
86 4b49a72b Klaus Aehlig
import Data.Maybe
87 aa79e62e Iustin Pop
import Data.Ord (comparing)
88 aa79e62e Iustin Pop
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
89 557f5dad Klaus Aehlig
import Prelude hiding (id, log)
90 aa79e62e Iustin Pop
import System.Directory
91 aa79e62e Iustin Pop
import System.FilePath
92 aa79e62e Iustin Pop
import System.IO.Error (isDoesNotExistError)
93 aa79e62e Iustin Pop
import System.Posix.Files
94 ec98ea2b Petr Pudlak
import System.Posix.Signals (sigTERM, signalProcess)
95 c3a70209 Klaus Aehlig
import System.Time
96 aa79e62e Iustin Pop
import qualified Text.JSON
97 aa79e62e Iustin Pop
import Text.JSON.Types
98 aa79e62e Iustin Pop
99 aa79e62e Iustin Pop
import Ganeti.BasicTypes
100 c867cfe1 Klaus Aehlig
import qualified Ganeti.Config as Config
101 aa79e62e Iustin Pop
import qualified Ganeti.Constants as C
102 ec98ea2b Petr Pudlak
import Ganeti.Errors (ErrorResult, ResultG)
103 58d29849 Klaus Aehlig
import Ganeti.JQueue.Lens (qoInputL, validOpCodeL)
104 a35a4f52 Klaus Aehlig
import Ganeti.JQueue.Objects
105 aa79e62e Iustin Pop
import Ganeti.JSON
106 aa79e62e Iustin Pop
import Ganeti.Logging
107 493d6920 Klaus Aehlig
import Ganeti.Luxi
108 c867cfe1 Klaus Aehlig
import Ganeti.Objects (ConfigData, Node)
109 aa79e62e Iustin Pop
import Ganeti.OpCodes
110 58d29849 Klaus Aehlig
import Ganeti.OpCodes.Lens (metaParamsL, opReasonL)
111 aa79e62e Iustin Pop
import Ganeti.Path
112 efb4c025 Petr Pudlak
import Ganeti.Query.Exec as Exec
113 b5a96995 Klaus Aehlig
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
114 c867cfe1 Klaus Aehlig
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
115 aa79e62e Iustin Pop
import Ganeti.Types
116 cef3f99f Klaus Aehlig
import Ganeti.Utils
117 5f6515b6 Petr Pudlak
import Ganeti.Utils.Atomic
118 ec98ea2b Petr Pudlak
import Ganeti.Utils.Livelock (Livelock, isDead)
119 77676415 Klaus Aehlig
import Ganeti.VCluster (makeVirtualPath)
120 aa79e62e Iustin Pop
121 aa79e62e Iustin Pop
-- * Data types
122 aa79e62e Iustin Pop
123 aa79e62e Iustin Pop
-- | Placeholder 'Timestamp' standing for \"no timestamp available\":
-- both components are set to @-1@.
noTimestamp :: Timestamp
noTimestamp = (negate 1, negate 1)
126 aa79e62e Iustin Pop
127 ae66f3a9 Klaus Aehlig
-- | Convert a 'ClockTime' into a job-queue 'Timestamp'. The first
-- component carries the 'TOD' seconds unchanged; the second is the
-- 'TOD' sub-second field integer-divided by 10^6.
fromClockTime :: ClockTime -> Timestamp
fromClockTime (TOD secs picos) = (fromIntegral secs, fromIntegral micros)
  where micros = picos `div` 1000000
131 ae66f3a9 Klaus Aehlig
132 c3a70209 Klaus Aehlig
-- | Read the system clock and return it as a job-queue 'Timestamp'
-- (see 'fromClockTime').
currentTimestamp :: IO Timestamp
currentTimestamp = fmap fromClockTime getClockTime
135 c3a70209 Klaus Aehlig
136 8b5a4b9a Klaus Aehlig
-- | Shift a timestamp forward by the given number of seconds. Only the
-- first (seconds) component is changed; the second is passed through.
advanceTimestamp :: Int -> Timestamp -> Timestamp
advanceTimestamp secs = first (secs +)
140 8b5a4b9a Klaus Aehlig
141 aa79e62e Iustin Pop
142 6a2e71d9 Klaus Aehlig
-- | Extract the 'MetaOpCode' carried by an 'InputOpCode'. The result is
-- a singleton list for a 'ValidOpCode' and the empty list otherwise, so
-- it can be used directly in list-monad pipelines.
toMetaOpCode :: InputOpCode -> [MetaOpCode]
toMetaOpCode inOp = case inOp of
  ValidOpCode mopc -> [mopc]
  _                -> []
146 6a2e71d9 Klaus Aehlig
147 aa79e62e Iustin Pop
-- | Summary string reported for opcodes that cannot be interpreted
-- (see 'extractOpSummary').
invalidOp :: String
invalidOp = "INVALID_OP"
150 aa79e62e Iustin Pop
151 aa79e62e Iustin Pop
-- | Compute the summary string of an 'InputOpCode'.
--
-- * For a valid opcode this is 'opSummary' of the wrapped opcode.
--
-- * For an invalid opcode that is a JSON object, the @OP_ID@ field is
--   read (defaulting to @\"OP_\" ++ 'invalidOp'@) and its first three
--   characters, the expected @OP_@ prefix, are dropped.
--
-- * In every other case 'invalidOp' is returned.
--
-- This duplicates some functionality of 'opSummary' in "Ganeti.OpCodes".
extractOpSummary :: InputOpCode -> String
extractOpSummary inOp = case inOp of
  ValidOpCode metaop -> opSummary (metaOpCode metaop)
  InvalidOpCode (JSObject obj) ->
    -- 'drop 3' strips the OP_ prefix
    maybe invalidOp (drop 3) $
      fromObjWithDefault (fromJSObject obj) "OP_ID" ("OP_" ++ invalidOp)
  _ -> invalidOp
161 aa79e62e Iustin Pop
162 4b49a72b Klaus Aehlig
-- | Wrap a 'MetaOpCode' into a fresh 'QueuedOpCode': status queued,
-- empty log, null result, no timestamps, and a priority derived from
-- the opcode's own submit priority.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoPriority = rawPriority
               , qoLog = []
               , qoResult = JSNull
               , qoStartTimestamp = Nothing
               , qoEndTimestamp = Nothing
               , qoExecTimestamp = Nothing
               }
  where rawPriority = opSubmitPriorityToRaw (opPriority (metaParams op))
175 4b49a72b Klaus Aehlig
176 1c1132f4 Klaus Aehlig
-- | Build a 'QueuedJob' from a job id and its opcodes. Each opcode is
-- first passed through 'resolveDependencies' (with the job id), which
-- may fail in the monad @m@; the resulting job has no timestamps, no
-- livelock and no process id. This is the pure part of job creation;
-- allocating the job id itself lives in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops =
  liftM mkJob $ mapM (\op -> resolveDependencies op jobid) ops
  where
    mkJob resolved =
      QueuedJob { qjId = jobid
                , qjOps = map queuedOpCodeFromMetaOpCode resolved
                , qjReceivedTimestamp = Nothing
                , qjStartTimestamp = Nothing
                , qjEndTimestamp = Nothing
                , qjLivelock = Nothing
                , qjProcessId = Nothing
                }
190 1c1132f4 Klaus Aehlig
191 2af22d70 Klaus Aehlig
-- | Record the time at which a job was received, overwriting any
-- previous value of 'qjReceivedTimestamp'.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = received }
  where received = Just ts
194 2af22d70 Klaus Aehlig
195 65a3ff88 Michele Tartara
-- | Convert a job-queue 'Timestamp' (seconds, microseconds) into the
-- single nanosecond count used by reason-trail entries.
reasonTrailTimestamp :: Timestamp -> Integer
reasonTrailTimestamp (sec, micro) =
  toInteger sec * nanosPerSecond + toInteger micro * nanosPerMicro
  where nanosPerSecond = 1000000000
        nanosPerMicro = 1000
202 65a3ff88 Michele Tartara
203 65a3ff88 Michele Tartara
-- | Append one entry to the reason trail of an input opcode. Invalid
-- opcodes are returned untouched. For a valid opcode the new entry
-- consists of the opcode's reason source id, a text of the form
-- @job=\<id\>;index=\<i\>@, and the given timestamp converted with
-- 'reasonTrailTimestamp'.
extendInputOpCodeReasonTrail :: JobId -> Timestamp -> Int -> InputOpCode
                             -> InputOpCode
extendInputOpCodeReasonTrail jid ts i inOp = case inOp of
  InvalidOpCode _ -> inOp
  ValidOpCode vOp ->
    let params = metaParams vOp
        entry = ( opReasonSrcID (metaOpCode vOp)
                , "job=" ++ show (fromJobId jid) ++ ";index=" ++ show i
                , reasonTrailTimestamp ts
                )
        extended = params { opReason = opReason params ++ [entry] }
    in ValidOpCode $ vOp { metaParams = extended }
216 65a3ff88 Michele Tartara
217 65a3ff88 Michele Tartara
-- | Append one entry to the reason trail of a queued opcode by applying
-- 'extendInputOpCodeReasonTrail' to its 'qoInput' field.
extendOpCodeReasonTrail :: JobId -> Timestamp -> Int -> QueuedOpCode
                        -> QueuedOpCode
extendOpCodeReasonTrail jid ts i op =
  op { qoInput = extendInputOpCodeReasonTrail jid ts i (qoInput op) }
223 65a3ff88 Michele Tartara
224 65a3ff88 Michele Tartara
-- | Append a reason-trail entry to every opcode of a queued job. Each
-- opcode is tagged with the job id, its zero-based position in the job
-- and the job's received timestamp.
extendJobReasonTrail :: QueuedJob -> QueuedJob
extendJobReasonTrail job = job { qjOps = zipWith annotate [0..] (qjOps job) }
  where
    -- Callers are expected to pass jobs that already carry a received
    -- timestamp; to stay total we fall back to (0, 0) if it is missing.
    received = fromMaybe (0, 0) (qjReceivedTimestamp job)
    annotate = extendOpCodeReasonTrail (qjId job) received
239 65a3ff88 Michele Tartara
240 6a2e71d9 Klaus Aehlig
-- | Collect the ids of all jobs a queued job depends on: for every
-- valid opcode, every entry of its 'opDepends' list (if present) is
-- mapped through 'getJobIdFromDependency'. Invalid opcodes contribute
-- nothing.
getJobDependencies :: QueuedJob -> [JobId]
getJobDependencies job =
  [ jid
  | op   <- qjOps job
  , mopc <- toMetaOpCode (qoInput op)
  , dep  <- fromMaybe [] (opDepends (metaParams mopc))
  , jid  <- getJobIdFromDependency dep
  ]
247 6a2e71d9 Klaus Aehlig
248 a7ab381a Klaus Aehlig
-- | Set the priority of a queued opcode, unless its status already
-- compares greater than 'OP_STATUS_RUNNING' (i.e. it is finalized), in
-- which case the opcode is returned unchanged.
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
changeOpCodePriority prio op
  | qoStatus op > OP_STATUS_RUNNING = op
  | otherwise                       = op { qoPriority = prio }
255 a7ab381a Klaus Aehlig
256 363dc9d6 Klaus Aehlig
-- | Mark a queued opcode as canceled and stamp it with the given end
-- time.
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op = op { qoStatus = OP_STATUS_CANCELED
                         , qoEndTimestamp = endedAt
                         }
  where endedAt = Just now
260 363dc9d6 Klaus Aehlig
261 a6b33b72 Klaus Aehlig
-- | Change the priority of a job by applying 'changeOpCodePriority' to
-- each of its opcodes; finalized opcodes are therefore left alone.
changeJobPriority :: Int -> QueuedJob -> QueuedJob
changeJobPriority prio job = job { qjOps = reprioritized }
  where reprioritized = map (changeOpCodePriority prio) (qjOps job)
266 a6b33b72 Klaus Aehlig
267 363dc9d6 Klaus Aehlig
-- | Turn a not-yet-started job into its canceled form: every opcode is
-- canceled via 'cancelOpCode' and the job's end timestamp is set.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
  job { qjOps = canceledOps, qjEndTimestamp = Just now }
  where canceledOps = map (cancelOpCode now) (qjOps job)
272 4b887066 Petr Pudlak
273 4b887066 Petr Pudlak
-- | Mark a queued opcode as failed: its status becomes
-- 'OP_STATUS_ERROR', its end timestamp is set, and the given reason is
-- appended to the reason trail reachable through the opcode's input
-- (via the 'validOpCodeL' optic).
failOpCode :: ReasonElem -> Timestamp -> QueuedOpCode -> QueuedOpCode
failOpCode reason now op =
  over (qoInputL . validOpCodeL . metaParamsL . opReasonL) (++ [reason])
       failedOp
  where failedOp = op { qoStatus = OP_STATUS_ERROR
                      , qoEndTimestamp = Just now
                      }
278 4b887066 Petr Pudlak
279 4b887066 Petr Pudlak
-- | Turn a job into its failed form: every opcode is failed via
-- 'failOpCode' with the given reason, and the job's end timestamp is
-- set.
failQueuedJob :: ReasonElem -> Timestamp -> QueuedJob -> QueuedJob
failQueuedJob reason now job =
  job { qjOps = failedOps, qjEndTimestamp = Just now }
  where failedOps = map (failOpCode reason now) (qjOps job)
284 363dc9d6 Klaus Aehlig
285 aa79e62e Iustin Pop
-- | Prefix shared by all job file names; the job id is appended to it
-- (see 'jobFileName').
jobFilePrefix :: String
jobFilePrefix = "job-"
288 aa79e62e Iustin Pop
289 aa79e62e Iustin Pop
-- | File name (without directory) under which a job is stored:
-- 'jobFilePrefix' followed by the numeric job id.
jobFileName :: JobId -> FilePath
jobFileName = (jobFilePrefix ++) . show . fromJobId
292 aa79e62e Iustin Pop
293 aa79e62e Iustin Pop
-- | Recover the job id from a job file name. Fails in the monad @m@
-- when the name lacks 'jobFilePrefix'; otherwise the remainder is
-- handed to 'makeJobIdS'.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  maybe badPrefix makeJobIdS (stripPrefix jobFilePrefix path)
  where badPrefix = fail $ "Job file '" ++ path ++
                           "' doesn't have the correct prefix"
300 aa79e62e Iustin Pop
301 aa79e62e Iustin Pop
-- | Full path of a live (non-archived) job file inside the given queue
-- root directory.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir = (rootdir </>) . jobFileName
304 aa79e62e Iustin Pop
305 aa79e62e Iustin Pop
-- | Full path of an archived job file. Archived jobs live below
-- 'jobQueueArchiveSubDir' in a sub-directory named after the job id
-- integer-divided by 'C.jstoreJobsPerArchiveDirectory'.
--
-- NOTE(review): the original comment marks this function as BROKEN
-- without saying why; confirm before relying on it.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  rootdir </> jobQueueArchiveSubDir </> bucket </> jobFileName jid
  where bucket = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
310 aa79e62e Iustin Pop
311 aa79e62e Iustin Pop
-- | Translate an opcode status into the job status of the same name.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob st = case st of
  OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
  OP_STATUS_WAITING   -> JOB_STATUS_WAITING
  OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
  OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
  OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
  OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
  OP_STATUS_ERROR     -> JOB_STATUS_ERROR
320 aa79e62e Iustin Pop
321 aa79e62e Iustin Pop
-- | Derive a job's status from the statuses of its opcodes, scanning
-- them in order:
--
-- * the first error, canceling or canceled opcode decides the result
--   immediately;
--
-- * success and queued opcodes leave the running candidate untouched;
--
-- * any other status becomes the new candidate.
--
-- At the end of the list the job is successful if every opcode was
-- successful (this includes the empty list); otherwise the last
-- candidate is returned, starting from 'JOB_STATUS_QUEUED'.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus job = go (map qoStatus (qjOps job)) JOB_STATUS_QUEUED True
  where
    isTerminal = (`elem` [ OP_STATUS_ERROR
                         , OP_STATUS_CANCELING
                         , OP_STATUS_CANCELED ])
    isSoft = (`elem` [OP_STATUS_SUCCESS, OP_STATUS_QUEUED])
    go [] candidate allOk = if allOk then JOB_STATUS_SUCCESS else candidate
    go (st:rest) candidate allOk
      | isTerminal st = opStatusToJob st                  -- abort the scan
      | isSoft st     = go rest candidate allOk'          -- keep candidate
      | otherwise     = go rest (opStatusToJob st) allOk'
      where allOk' = st == OP_STATUS_SUCCESS && allOk
340 aa79e62e Iustin Pop
341 02e40b13 Klaus Aehlig
-- | A job has started once its computed status compares greater than
-- 'JOB_STATUS_QUEUED'.
jobStarted :: QueuedJob -> Bool
jobStarted job = calcJobStatus job > JOB_STATUS_QUEUED
344 02e40b13 Klaus Aehlig
345 847df9e9 Klaus Aehlig
-- | A job is finalized once its computed status compares greater than
-- 'JOB_STATUS_RUNNING'.
jobFinalized :: QueuedJob -> Bool
jobFinalized job = calcJobStatus job > JOB_STATUS_RUNNING
348 847df9e9 Klaus Aehlig
349 370f63be Klaus Aehlig
-- | A job may be archived when it is finalized and its end timestamp
-- (or, if that is missing, its start timestamp) lies strictly before
-- the given time. Jobs with neither timestamp are never archivable.
jobArchivable :: Timestamp -> QueuedJob -> Bool
jobArchivable ts job = jobFinalized job && maybe False (< ts) stamp
  where stamp = qjEndTimestamp job <|> qjStartTimestamp job
355 370f63be Klaus Aehlig
356 aa79e62e Iustin Pop
-- | An opcode status is finalized when it compares greater than
-- 'OP_STATUS_RUNNING' in the ordering of 'OpStatus'.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized st = st > OP_STATUS_RUNNING
359 aa79e62e Iustin Pop
360 aa79e62e Iustin Pop
-- | Priority of a job: the minimum priority value among its
-- non-finalized opcodes, or 'C.opPrioDefault' when there are none.
calcJobPriority :: QueuedJob -> Int
calcJobPriority job =
  case [ qoPriority op
       | op <- qjOps job
       , not (opStatusFinalized (qoStatus op)) ] of
    []      -> C.opPrioDefault
    pending -> minimum pending
366 aa79e62e Iustin Pop
367 aa79e62e Iustin Pop
-- | Exception handler that swallows an 'IOError' and yields a fallback
-- value. The error is logged as a warning (prefixed with the given
-- message), except that does-not-exist errors stay silent when the
-- boolean flag is set.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  let silenced = ignore_noent && isDoesNotExistError e
  unless silenced $ logWarning (msg ++ ": " ++ show e)
  return a
373 aa79e62e Iustin Pop
374 aa79e62e Iustin Pop
-- | Compute the list of existing archive directories below the given
-- queue root. Entries whose name starts with a dot are skipped, and
-- only entries that are directories are returned (as full paths).
--
-- I/O exceptions are swallowed: a failure to list the archive directory
-- is logged and yields the empty list; a failure to stat an entry drops
-- that entry (logged unless the entry simply does not exist).
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM (\path ->
             -- 'path' already includes 'adir'; prefixing it again would
             -- produce a wrong path whenever 'adir' is relative.
             liftM isDirectory (getFileStatus path)
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths
388 aa79e62e Iustin Pop
389 aa79e62e Iustin Pop
-- | Build the list of directories that may contain job files: always
-- the queue root, followed by all archive directories when the flag is
-- set. Note: compared to the Python version, this doesn't ignore a
-- potential lost+found file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = liftM (rootdir :) archiveDirs
  where archiveDirs
          | archived  = allArchiveDirs rootdir
          | otherwise = return []
398 aa79e62e Iustin Pop
399 aa79e62e Iustin Pop
-- | Collect the job ids found in all the given directories, in
-- directory order, by running 'getDirJobIDs' on each one inside
-- 'ResultT'.
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
getJobIDs paths = runResultT $ concat `liftM` mapM getDirJobIDs paths
402 aa79e62e Iustin Pop
403 aa79e62e Iustin Pop
-- | Sort a list of job ids by their numeric value, ascending.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy (\a b -> compare (fromJobId a) (fromJobId b))
406 aa79e62e Iustin Pop
407 aa79e62e Iustin Pop
-- | List the job ids present in one directory. Directory entries that
-- do not parse as job file names are silently skipped; a failure of the
-- whole listing is logged at warning level through 'withErrorLogAt'.
getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]
getDirJobIDs path = withErrorLogAt WARNING errMsg listing
  where
    errMsg = "Failed to list job directory " ++ path
    listing = liftM (mapMaybe parseJobFileId)
                    (liftIO (getDirectoryContents path))
412 aa79e62e Iustin Pop
413 aa79e62e Iustin Pop
-- | Read the raw contents of a job file. The live path is always tried;
-- the archived path is tried as well when the flag is set. Every
-- candidate is attempted in order and the last successful read wins,
-- paired with a flag telling whether it came from the archive. Read
-- failures keep the previous result; they are logged unless the file
-- simply does not exist.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = foldM attempt Nothing candidates
  where
    live = (liveJobFile rootdir jid, False)
    arch = (archivedJobFile rootdir jid, True)
    candidates = if archived then [live, arch] else [live]
    attempt previous (path, isarchived) =
      liftM (\r -> Just (r, isarchived)) (readFile path)
        `Control.Exception.catch`
        ignoreIOError previous True ("Failed to read job file " ++ path)
426 aa79e62e Iustin Pop
427 aa79e62e Iustin Pop
-- | Error result returned when no file could be read for a job.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
430 aa79e62e Iustin Pop
431 aa79e62e Iustin Pop
-- | Load and parse a job from disk. Returns 'noSuchJob' when no file
-- could be read; otherwise the decoded job paired with a flag telling
-- whether it was read from the archive.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness here, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! maybe noSuchJob parse raw
  where
    parse (str, arch) =
      liftM (\qj -> (qj, arch)) .
      fromJResult "Parsing job file" $ Text.JSON.decode str
443 cef3f99f Klaus Aehlig
444 b498ed42 Klaus Aehlig
-- | Serialize a job to JSON and write it to its live job file with
-- 'atomicWriteFile'. I/O errors are handled by 'tryAndLogIOError' and
-- reported in the returned 'Result'.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job =
  tryAndLogIOError (atomicWriteFile target payload)
                   ("Failed to write " ++ target) Ok
  where
    target = liveJobFile rootdir (qjId job)
    payload = Text.JSON.encode (Text.JSON.showJSON job)
451 b498ed42 Klaus Aehlig
452 b5a96995 Klaus Aehlig
-- | Send a job file to the given nodes (the master candidates) with a
-- job-queue update RPC. The per-node results, with their payload
-- replaced by @()@, are passed to 'logRpcErrors' and then returned.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  remotePath <- makeVirtualPath (liveJobFile rootdir (qjId job))
  let payload = Text.JSON.encode (Text.JSON.showJSON job)
  rpcResults <- executeRpcCall mastercandidates
                  (RpcCallJobqueueUpdate remotePath payload)
  let outcome = map (second (() <$)) rpcResults
  _ <- logRpcErrors outcome
  return outcome
463 b5a96995 Klaus Aehlig
464 9fd653a4 Klaus Aehlig
-- | Replicate each of the given jobs, in order, with 'replicateJob';
-- the per-node results are discarded.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates jobs =
  forM_ jobs $ replicateJob rootdir mastercandidates
468 9fd653a4 Klaus Aehlig
469 7b0a9096 Petr Pudlak
-- | Write a job to its local file and, if that succeeds, replicate it
-- to the master candidates taken from the configuration. A failed
-- local write aborts the computation in 'ResultT'; the per-node
-- replication results are returned.
writeAndReplicateJob :: (Error e)
                     => ConfigData -> FilePath -> QueuedJob
                     -> ResultT e IO [(Node, ERpcError ())]
writeAndReplicateJob cfg rootdir job =
  mkResultT (writeJobToDisk rootdir job)
    >> liftIO (replicateJob rootdir candidates job)
  where candidates = Config.getMasterCandidates cfg
476 7b0a9096 Petr Pudlak
477 cef3f99f Klaus Aehlig
-- | Read the job serial file and parse its contents, with trailing
-- whitespace stripped, as a job id. I/O errors are handled by
-- 'tryAndLogIOError'.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk =
  jobQueueSerialFile >>= \filename ->
    tryAndLogIOError (readFile filename) "Failed to read serial file"
                     (makeJobIdS . rStripSpace)
483 ae858516 Klaus Aehlig
484 ae858516 Klaus Aehlig
-- | Allocate @n@ new consecutive job ids.
--
-- The current serial is read from the serial file, the file is
-- rewritten with the serial advanced by @n@, and the new contents are
-- sent to the given master candidates; the ids @serial+1 .. serial+n@
-- are returned. A non-positive @n@, an unreadable serial file or a
-- failed local write yields 'Bad'. The result of the replication RPC is
-- discarded.
--
-- To avoid races while accessing the serial file, the threads
-- synchronize over a lock provided by an MVar. The lock is held through
-- 'withMVar', so it is released even if an exception escapes from the
-- critical section (path lookup, RPC, ...); a leaked lock would block
-- every later allocation forever.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate positive number of job ids"
    else withMVar lock $ \_ -> do
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++  "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              serial' <- makeVirtualPath serial
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial' serial_content
              return $ mapM makeJobId [(current+1)..(current+n)]
516 ae858516 Klaus Aehlig
517 ae858516 Klaus Aehlig
-- | Allocate one new job id.
-- Requests a single id from 'allocateJobIds' and reduces the resulting
-- list to its element by means of 'monadicThe'.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock =
  fmap (>>= monadicThe "Failed to allocate precisely one Job ID")
       (allocateJobIds mastercandidates lock 1)
522 1b94c0db Klaus Aehlig
523 1b94c0db Klaus Aehlig
-- | Decide if job queue is open.
-- The queue counts as open exactly if the drain file does not exist.
isQueueOpen :: IO Bool
isQueueOpen = do
  drainFile <- jobQueueDrainFile
  drained <- doesFileExist drainFile
  return $ not drained
526 493d6920 Klaus Aehlig
527 efb4c025 Petr Pudlak
-- | Start enqueued jobs by executing the Python code.
-- Every job is forked separately; the result list contains, per job,
-- either the error of the attempt or the job with its livelock file set.
startJobs :: ConfigData
          -> Livelock -- ^ Luxi's livelock path
          -> [QueuedJob] -- ^ the list of jobs to start
          -> IO [ErrorResult QueuedJob]
startJobs cfg luxiLivelock jobs = do
  qdir <- queueDir
  let -- callback handed to the fork: store the livelock file in the job
      -- and write and replicate the updated job
      recordLivelock job llfile =
        void $ writeAndReplicateJob cfg qdir (job { qjLivelock = Just llfile })
      launch job = do
        (llfile, _) <- Exec.forkJobProcess (qjId job) luxiLivelock
                                           (recordLivelock job)
        return (job { qjLivelock = Just llfile })
  mapM (runResultT . launch) jobs
541 47c3c7b1 Klaus Aehlig
542 ec98ea2b Petr Pudlak
-- | Waits for a job to finalize its execution.
-- The live job file is observed by 'watchFileBy', which is handed the
-- timeout, a predicate telling whether the loaded job is finalized, and
-- the action re-loading the job from disk. The first component of the
-- result is 'True' only if the job status is canceled; the second one is
-- a message describing the outcome. If the job cannot be read, the
-- computation fails.
waitForJob :: JobId -> Int -> ResultG (Bool, String)
waitForJob jid tmout = do
  qDir <- liftIO queueDir
  let jobfile = liveJobFile qDir jid
      load = liftM fst <$> loadJobFromDisk qDir False jid
      -- a job that failed to load is never considered finalized
      finalizedR = genericResult (const False) jobFinalized
  jobR <- liftIO $ watchFileBy jobfile tmout finalizedR load
  case calcJobStatus <$> jobR of
    Ok s | s == JOB_STATUS_CANCELED ->
             return (True, "Job successfully cancelled")
         | finalizedR jobR ->
            return (False, "Job exited before it could have been canceled,\
                           \ status " ++ show s)
         | otherwise ->
             return (False, "Job could not have been canceled, status "
                            ++ show s)
    Bad e -> failError $ "Can't read job status: " ++ e
560 ec98ea2b Petr Pudlak
561 47c3c7b1 Klaus Aehlig
-- | Try to cancel a job that has already been handed over to execution,
-- by terminating the process.
-- The result pair consists of a success flag and an explanatory message.
cancelJob :: Livelock -- ^ Luxi's livelock path
          -> JobId -- ^ the job to cancel
          -> IO (ErrorResult (Bool, String))
cancelJob luxiLivelock jid = runResultT $ do
  -- a job that is just being started has no process id recorded yet and
  -- hence cannot be terminated; in that case the attempt is repeated
  outcome <- runMaybeT . msum . flip map [0..5 :: Int] $ \attempt -> do
    -- every attempt but the first is preceded by a growing pause
    unless (attempt == 0) $
      liftIO (threadDelay (100000 * 2 ^ attempt))

    qDir <- liftIO queueDir
    (job, _) <- lift . mkResultT $ loadJobFromDisk qDir True jid
    let jName = "Job " ++ show (fromJobId (qjId job))
    -- make sure the job is alive, so that no other process gets killed
    -- by accident; a livelock equal to Luxi's own one is not checked
    dead <- case qjLivelock job of
              Just ll | ll /= luxiLivelock -> liftIO (isDead ll)
              _ -> return False
    if dead
      then return (True, jName ++ " has been already dead")
      else case qjProcessId job of
        Just pid -> do
          liftIO $ signalProcess sigTERM pid
          lift $ waitForJob jid C.luxiCancelJobTimeout
        Nothing -> do
          logDebug $ jName ++ " in its startup phase, retrying"
          mzero
  return $ fromMaybe (False, "Timeout: job still in its startup phase")
                     outcome
591 c867cfe1 Klaus Aehlig
592 f23daea8 Klaus Aehlig
-- | Permissions for the archive directories: owner and group are taken
-- from the respective constants, the mode is octal 750.
queueDirPermissions :: FilePermissions
queueDirPermissions =
  FilePermissions
    { fpOwner = Just C.masterdUser
    , fpGroup = Just C.daemonsGroup
    , fpPermissions = 0o750
    }
598 f23daea8 Klaus Aehlig
599 c867cfe1 Klaus Aehlig
-- | Try, at most until the given endtime, to archive some of the given
-- jobs, if they are older than the specified cut-off time; also replicate
-- archival of the additional jobs. Return the pair of the number of jobs
-- archived, and the number of jobs remaining in the queue, assuming the
-- given numbers about the not considered jobs.
--
-- Replication is carried out in forked threads: whenever ten archived jobs
-- have accumulated, and once more for the pending rest when the function
-- stops (no jobs left, or end time reached). The replication function is
-- never invoked on an empty list.
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
                        -> FilePath -- ^ queue root directory
                        -> ClockTime -- ^ Endtime
                        -> Timestamp -- ^ cut-off time for archiving jobs
                        -> Int -- ^ number of jobs already archived
                        -> [JobId] -- ^ Additional jobs to replicate
                        -> [JobId] -- ^ List of job-ids still to consider
                        -> IO (Int, Int)
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
  unless (null torepl) . void . forkIO $ replicateFn torepl
  return (arch, 0)

archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
      -- skip the current job, keeping counters and pending list unchanged
      continue = archiveMore arch torepl jids
      jidname = show $ fromJobId jid
  time <- getClockTime
  if time >= endt
    then do
      -- out of time: flush what is pending (if anything) and report the
      -- current job together with the untouched ones as remaining
      unless (null torepl) . void . forkIO $ replicateFn torepl
      return (arch, length (jid:jids))
    else do
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
      loadResult <- loadJobFromDisk qDir False jid
      case loadResult of
        -- jobs that cannot be loaded are skipped
        Bad _ -> continue
        Ok (job, _) ->
          if jobArchivable cutt job
            then do
              let live = liveJobFile qDir jid
                  archive = archivedJobFile qDir jid
              renameResult <- safeRenameFile queueDirPermissions
                                live archive
              case renameResult of
                Bad s -> do
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
                                 ++ " failed unexpectedly: " ++ s
                  continue
                Ok () -> do
                  let torepl' = jid:torepl
                  -- replicate in batches of ten archived jobs
                  if length torepl' >= 10
                    then do
                      _ <- forkIO $ replicateFn torepl'
                      archiveMore (arch + 1) [] jids
                    else archiveMore (arch + 1) torepl' jids
            else continue
650 c867cfe1 Klaus Aehlig
            else continue
651 65a3ff88 Michele Tartara
652 c867cfe1 Klaus Aehlig
-- | Archive jobs older than the given time, but do not exceed the timeout for
-- carrying out this task. With a negative age, 'noTimestamp' is used as the
-- cut-off. Archival is replicated to the master candidates of the given
-- configuration by a rename RPC. The result is the one of
-- 'archiveSomeJobsUntil'.
archiveJobs :: ConfigData -- ^ cluster configuration
               -> Int  -- ^ time the job has to be in the past in order
                       -- to be archived
               -> Int -- ^ timeout
               -> [JobId] -- ^ jobs to consider
               -> IO (Int, Int)
archiveJobs cfg age timeout jids = do
  now <- getClockTime
  qDir <- queueDir
  let deadline = addToClockTime (noTimeDiff { tdSec = timeout }) now
      cutoff
        | age < 0 = noTimestamp
        | otherwise = advanceTimestamp (negate age) (fromClockTime now)
      candidates = Config.getMasterCandidates cfg
      -- pair each live job file with its archived name and ask the
      -- master candidates to rename accordingly; the answer is ignored
      replicateRenames jobs = do
        let renames = [ (liveJobFile qDir j, archivedJobFile qDir j)
                      | j <- jobs ]
        _ <- executeRpcCall candidates $ RpcCallJobqueueRename renames
        return ()
  archiveSomeJobsUntil replicateRenames qDir deadline cutoff 0 [] jids