Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 9fd653a4

History | View | Annotate | Download (15 kB)

1 aa79e62e Iustin Pop
{-# LANGUAGE TemplateHaskell #-}
2 aa79e62e Iustin Pop
3 aa79e62e Iustin Pop
{-| Implementation of the job queue.
4 aa79e62e Iustin Pop
5 aa79e62e Iustin Pop
-}
6 aa79e62e Iustin Pop
7 aa79e62e Iustin Pop
{-
8 aa79e62e Iustin Pop
9 aa79e62e Iustin Pop
Copyright (C) 2010, 2012 Google Inc.
10 aa79e62e Iustin Pop
11 aa79e62e Iustin Pop
This program is free software; you can redistribute it and/or modify
12 aa79e62e Iustin Pop
it under the terms of the GNU General Public License as published by
13 aa79e62e Iustin Pop
the Free Software Foundation; either version 2 of the License, or
14 aa79e62e Iustin Pop
(at your option) any later version.
15 aa79e62e Iustin Pop
16 aa79e62e Iustin Pop
This program is distributed in the hope that it will be useful, but
17 aa79e62e Iustin Pop
WITHOUT ANY WARRANTY; without even the implied warranty of
18 aa79e62e Iustin Pop
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 aa79e62e Iustin Pop
General Public License for more details.
20 aa79e62e Iustin Pop
21 aa79e62e Iustin Pop
You should have received a copy of the GNU General Public License
22 aa79e62e Iustin Pop
along with this program; if not, write to the Free Software
23 aa79e62e Iustin Pop
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 aa79e62e Iustin Pop
02110-1301, USA.
25 aa79e62e Iustin Pop
26 aa79e62e Iustin Pop
-}
27 aa79e62e Iustin Pop
28 aa79e62e Iustin Pop
module Ganeti.JQueue
29 aa79e62e Iustin Pop
    ( QueuedOpCode(..)
30 aa79e62e Iustin Pop
    , QueuedJob(..)
31 aa79e62e Iustin Pop
    , InputOpCode(..)
32 4b49a72b Klaus Aehlig
    , queuedOpCodeFromMetaOpCode
33 1c1132f4 Klaus Aehlig
    , queuedJobFromOpCodes
34 aa79e62e Iustin Pop
    , Timestamp
35 aa79e62e Iustin Pop
    , noTimestamp
36 aa79e62e Iustin Pop
    , opStatusFinalized
37 aa79e62e Iustin Pop
    , extractOpSummary
38 aa79e62e Iustin Pop
    , calcJobStatus
39 aa79e62e Iustin Pop
    , calcJobPriority
40 aa79e62e Iustin Pop
    , jobFileName
41 aa79e62e Iustin Pop
    , liveJobFile
42 aa79e62e Iustin Pop
    , archivedJobFile
43 aa79e62e Iustin Pop
    , determineJobDirectories
44 aa79e62e Iustin Pop
    , getJobIDs
45 aa79e62e Iustin Pop
    , sortJobIDs
46 aa79e62e Iustin Pop
    , loadJobFromDisk
47 aa79e62e Iustin Pop
    , noSuchJob
48 cef3f99f Klaus Aehlig
    , readSerialFromDisk
49 ae858516 Klaus Aehlig
    , allocateJobIds
50 ae858516 Klaus Aehlig
    , allocateJobId
51 b498ed42 Klaus Aehlig
    , writeJobToDisk
52 9fd653a4 Klaus Aehlig
    , replicateManyJobs
53 1b94c0db Klaus Aehlig
    , isQueueOpen
54 aa79e62e Iustin Pop
    ) where
55 aa79e62e Iustin Pop
56 ae858516 Klaus Aehlig
import Control.Concurrent.MVar
57 aa79e62e Iustin Pop
import Control.Exception
58 aa79e62e Iustin Pop
import Control.Monad
59 aa79e62e Iustin Pop
import Data.List
60 4b49a72b Klaus Aehlig
import Data.Maybe
61 aa79e62e Iustin Pop
import Data.Ord (comparing)
62 aa79e62e Iustin Pop
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
63 aa79e62e Iustin Pop
import Prelude hiding (log, id)
64 aa79e62e Iustin Pop
import System.Directory
65 aa79e62e Iustin Pop
import System.FilePath
66 aa79e62e Iustin Pop
import System.IO.Error (isDoesNotExistError)
67 aa79e62e Iustin Pop
import System.Posix.Files
68 aa79e62e Iustin Pop
import qualified Text.JSON
69 aa79e62e Iustin Pop
import Text.JSON.Types
70 aa79e62e Iustin Pop
71 aa79e62e Iustin Pop
import Ganeti.BasicTypes
72 aa79e62e Iustin Pop
import qualified Ganeti.Constants as C
73 aa79e62e Iustin Pop
import Ganeti.JSON
74 aa79e62e Iustin Pop
import Ganeti.Logging
75 ae858516 Klaus Aehlig
import Ganeti.Objects (Node)
76 aa79e62e Iustin Pop
import Ganeti.OpCodes
77 aa79e62e Iustin Pop
import Ganeti.Path
78 b5a96995 Klaus Aehlig
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
79 b5a96995 Klaus Aehlig
                   RpcCallJobqueueUpdate(..))
80 aa79e62e Iustin Pop
import Ganeti.THH
81 aa79e62e Iustin Pop
import Ganeti.Types
82 cef3f99f Klaus Aehlig
import Ganeti.Utils
83 aa79e62e Iustin Pop
84 aa79e62e Iustin Pop
-- * Data types
85 aa79e62e Iustin Pop
86 aa79e62e Iustin Pop
-- | Timestamp as stored in the job queue, represented as a pair of
-- 'Int's.
-- NOTE(review): presumably (seconds, microseconds) -- confirm against
-- the code producing these values.
type Timestamp = (Int, Int)
88 aa79e62e Iustin Pop
89 aa79e62e Iustin Pop
-- | Sentinel 'Timestamp' standing in for a missing value; both
-- components are @-1@.
noTimestamp :: Timestamp
noTimestamp = (missing, missing)
  where missing = -1
92 aa79e62e Iustin Pop
93 aa79e62e Iustin Pop
-- | An opcode as found in the input: either one that could be parsed
-- into a 'MetaOpCode', or the raw JSON value of one that could not.
data InputOpCode
  = ValidOpCode MetaOpCode  -- ^ OpCode was parsed successfully
  | InvalidOpCode JSValue   -- ^ Invalid opcode, kept as raw JSON
  deriving (Show, Eq)
97 aa79e62e Iustin Pop
98 aa79e62e Iustin Pop
-- | JSON instance for 'InputOpCode'. Reading never fails: a value that
-- cannot be parsed as an opcode is kept verbatim as 'InvalidOpCode',
-- and serialising an 'InvalidOpCode' gives back that original value.
instance Text.JSON.JSON InputOpCode where
  showJSON input =
    case input of
      ValidOpCode mo -> Text.JSON.showJSON mo
      InvalidOpCode inv -> inv
  readJSON raw =
    case Text.JSON.readJSON raw of
      Text.JSON.Ok parsed -> Text.JSON.Ok (ValidOpCode parsed)
      Text.JSON.Error _ -> Text.JSON.Ok (InvalidOpCode raw)
106 aa79e62e Iustin Pop
107 aa79e62e Iustin Pop
-- | Summary string reported for opcodes that could not be parsed.
invalidOp :: String
invalidOp = "INVALID_OP"
110 aa79e62e Iustin Pop
111 aa79e62e Iustin Pop
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
-- duplicates some functionality from the 'opSummary' function in
-- "Ganeti.OpCodes".
--
-- For an invalid opcode that is a JSON object, the @OP_ID@ field (or
-- @OP_@ followed by 'invalidOp' when the field is absent) is used,
-- minus its first three characters; anything else gives 'invalidOp'.
extractOpSummary :: InputOpCode -> String
extractOpSummary input =
  case input of
    ValidOpCode metaop -> opSummary (metaOpCode metaop)
    InvalidOpCode (JSObject obj) ->
      -- drop 3 removes the OP_ prefix
      maybe invalidOp (drop 3) $
        fromObjWithDefault (fromJSObject obj) "OP_ID" ("OP_" ++ invalidOp)
    _ -> invalidOp
121 aa79e62e Iustin Pop
122 aa79e62e Iustin Pop
-- Generates the QueuedOpCode record (field prefix "qo") describing a
-- single opcode inside a queued job. The log defaults to an empty list
-- and the three timestamps are optional.
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input" [t| InputOpCode |]
  , simpleField "status" [t| OpStatus |]
  , simpleField "result" [t| JSValue |]
  , defaultField [| [] |]
      (simpleField "log" [t| [(Int, Timestamp, ELogType, JSValue)] |])
  , simpleField "priority" [t| Int |]
  , optionalNullSerField (simpleField "start_timestamp" [t| Timestamp |])
  , optionalNullSerField (simpleField "exec_timestamp" [t| Timestamp |])
  , optionalNullSerField (simpleField "end_timestamp" [t| Timestamp |])
  ])
136 aa79e62e Iustin Pop
137 aa79e62e Iustin Pop
-- Generates the QueuedJob record (field prefix "qj"): a job id, its
-- list of queued opcodes and three optional timestamps.
$(buildObject "QueuedJob" "qj"
  [ simpleField "id" [t| JobId |]
  , simpleField "ops" [t| [QueuedOpCode] |]
  , optionalNullSerField (simpleField "received_timestamp" [t| Timestamp |])
  , optionalNullSerField (simpleField "start_timestamp" [t| Timestamp |])
  , optionalNullSerField (simpleField "end_timestamp" [t| Timestamp |])
  ])
147 aa79e62e Iustin Pop
148 4b49a72b Klaus Aehlig
-- | Wraps a 'MetaOpCode' into a freshly queued 'QueuedOpCode': status
-- queued, empty log, null result, no timestamps, and the priority
-- taken from the opcode's own meta parameters.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode { qoInput = ValidOpCode op
               , qoStatus = OP_STATUS_QUEUED
               , qoResult = JSNull
               , qoLog = []
               , qoPriority = prio
               , qoStartTimestamp = Nothing
               , qoExecTimestamp = Nothing
               , qoEndTimestamp = Nothing
               }
  where prio = opSubmitPriorityToRaw (opPriority (metaParams op))
161 4b49a72b Klaus Aehlig
162 1c1132f4 Klaus Aehlig
-- | Builds a job from a job id and a list of opcodes. This is the pure
-- part of job creation; allocating the job id itself lives in IO.
-- Every opcode is first passed through 'resolveDependencies' together
-- with the job id, so the whole construction fails in @m@ if any of
-- those calls does. All job timestamps start out unset.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops =
  liftM mkJob $ mapM (\op -> resolveDependencies op jobid) ops
  where
    mkJob resolved =
      QueuedJob { qjId = jobid
                , qjOps = map queuedOpCodeFromMetaOpCode resolved
                , qjReceivedTimestamp = Nothing
                , qjStartTimestamp = Nothing
                , qjEndTimestamp = Nothing
                }
174 1c1132f4 Klaus Aehlig
175 aa79e62e Iustin Pop
-- | Prefix shared by the names of all job files.
jobFilePrefix :: String
jobFilePrefix = "job-"
178 aa79e62e Iustin Pop
179 aa79e62e Iustin Pop
-- | File name (without directory) of a job: 'jobFilePrefix' followed
-- by the numeric job id.
jobFileName :: JobId -> FilePath
jobFileName = (jobFilePrefix ++) . show . fromJobId
182 aa79e62e Iustin Pop
183 aa79e62e Iustin Pop
-- | Parses a job ID from a file name. Fails in @m@ if the name does
-- not start with 'jobFilePrefix'; otherwise the rest of the name is
-- handed to 'makeJobIdS'.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  maybe (fail badPrefix) makeJobIdS (stripPrefix jobFilePrefix path)
  where
    badPrefix = "Job file '" ++ path ++ "' doesn't have the correct prefix"
190 aa79e62e Iustin Pop
191 aa79e62e Iustin Pop
-- | Full path of a live (non-archived) job file below the given queue
-- root directory.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir = (rootdir </>) . jobFileName
194 aa79e62e Iustin Pop
195 aa79e62e Iustin Pop
-- | Computes the full path to an archived job: the archive
-- sub-directory of the queue root, then a bucket directory named after
-- the job id divided by 'C.jstoreJobsPerArchiveDirectory', then the
-- job's file name.
-- NOTE(review): the original author marked this function as BROKEN
-- without stating why -- confirm before relying on it.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  rootdir </> jobQueueArchiveSubDir </> bucket </> jobFileName jid
  where bucket = show $ fromJobId jid `div` C.jstoreJobsPerArchiveDirectory
200 aa79e62e Iustin Pop
201 aa79e62e Iustin Pop
-- | Maps each opcode status to the job status of the same name.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob st =
  case st of
    OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
    OP_STATUS_WAITING   -> JOB_STATUS_WAITING
    OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
    OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
    OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
    OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
    OP_STATUS_ERROR     -> JOB_STATUS_ERROR
210 aa79e62e Iustin Pop
211 aa79e62e Iustin Pop
-- | Computes a queued job's status from the statuses of its opcodes,
-- scanned in order:
--
-- * the first error, canceling or canceled opcode decides the result
--   immediately;
--
-- * success and queued opcodes leave the current candidate untouched;
--
-- * any other opcode status becomes the new candidate.
--
-- If the scan reaches the end, the job is successful when every opcode
-- was successful (this includes a job without opcodes); otherwise the
-- last candidate is returned, which is queued if none was ever set.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus job = go (map qoStatus (qjOps job)) JOB_STATUS_QUEUED True
  where
    terminal = [OP_STATUS_ERROR, OP_STATUS_CANCELING, OP_STATUS_CANCELED]
    soft = [OP_STATUS_SUCCESS, OP_STATUS_QUEUED]
    go [] candidate allOk
      | allOk     = JOB_STATUS_SUCCESS
      | otherwise = candidate
    go (st:rest) candidate allOk
      | st `elem` terminal = opStatusToJob st -- abort the scan
      | st `elem` soft     = go rest candidate stillOk
      | otherwise          = go rest (opStatusToJob st) stillOk
      where stillOk = allOk && st == OP_STATUS_SUCCESS
230 aa79e62e Iustin Pop
231 aa79e62e Iustin Pop
-- | Determine whether an opcode status is finalized, i.e. whether it
-- compares strictly greater than 'OP_STATUS_RUNNING' in the 'Ord'
-- instance of 'OpStatus'.
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized st = st > OP_STATUS_RUNNING
234 aa79e62e Iustin Pop
235 aa79e62e Iustin Pop
-- | Compute a job's priority: the numerically smallest priority among
-- its opcodes that are not yet finalized, or 'C.opPrioDefault' when no
-- such opcode is left.
calcJobPriority :: QueuedJob -> Int
calcJobPriority job =
  case pending of
    [] -> C.opPrioDefault
    prios -> minimum prios
  where
    pending = [ qoPriority op
              | op <- qjOps job
              , not (opStatusFinalized (qoStatus op))
              ]
241 aa79e62e Iustin Pop
242 aa79e62e Iustin Pop
-- | Exception handler that logs an 'IOError' as a warning and then
-- yields the given fallback value instead of propagating the error.
-- When @ignore_noent@ is set, \"does not exist\" errors are swallowed
-- without even being logged.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e = do
  let silenced = ignore_noent && isDoesNotExistError e
  unless silenced $
    logWarning (msg ++ ": " ++ show e)
  return a
248 aa79e62e Iustin Pop
249 aa79e62e Iustin Pop
-- | Compute the list of existing archive directories, i.e. the
-- non-hidden entries of the archive sub-directory of the queue root
-- that are themselves directories. The returned paths include the
-- archive directory as prefix.
--
-- Note that I/O exceptions are swallowed and ignored: a failure to
-- list the archive directory is logged and gives an empty result, and
-- an entry that cannot be stat'ed is dropped (silently if it has
-- vanished in the meantime).
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  -- The entries of fpaths already carry the adir prefix; prefixing them
  -- once more would stat a wrong path whenever adir is relative.
  filterM (\path ->
             liftM isDirectory (getFileStatus path)
               `Control.Exception.catch`
               ignoreIOError False True
                 ("Failed to stat archive path " ++ path)) fpaths
263 aa79e62e Iustin Pop
264 aa79e62e Iustin Pop
-- | Build list of directories containing job files: always the queue
-- root itself, followed by all archive directories if @archived@ is
-- set. Note: compared to the Python version, this doesn't ignore a
-- potential lost+found file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived =
  liftM (rootdir :) archiveDirs
  where
    archiveDirs
      | archived  = allArchiveDirs rootdir
      | otherwise = return []
273 aa79e62e Iustin Pop
274 3cecd73c Michele Tartara
-- Stand-in for the \'sequence\' function, which cannot be used because
-- of a library version conflict on Lucid: collects all 'Right' payloads
-- in their original order, or gives back the first 'Left' encountered.
-- FIXME: delete this and just use \'sequence\' instead once Lucid
-- compatibility is not required anymore.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer = fmap reverse . foldl seqFolder (Right [])
280 3cecd73c Michele Tartara
281 3cecd73c Michele Tartara
-- | Folding step used by 'sequencer': prepends a successfully read list
-- of job ids to the accumulator. An error already present in the
-- accumulator is kept as is; otherwise an error in the new element
-- replaces the accumulator.
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder acc item =
  case acc of
    Left e -> Left e
    Right done ->
      case item of
        Left e -> Left e
        Right ids -> Right (ids : done)
288 3cecd73c Michele Tartara
289 aa79e62e Iustin Pop
-- | Computes the list of all jobs in the given directories, in
-- directory order. If any directory cannot be listed, the error of the
-- first such directory is returned instead.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = do
  perDir <- mapM getDirJobIDs paths
  return . fmap concat $ sequencer perDir
292 aa79e62e Iustin Pop
293 aa79e62e Iustin Pop
-- | Sorts a list of job IDs in ascending order of their numeric value.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs ids = sortBy (comparing fromJobId) ids
296 aa79e62e Iustin Pop
297 aa79e62e Iustin Pop
-- | Computes the list of jobs in a given directory. Entries whose name
-- does not parse as a job file name are skipped; the remaining ids are
-- returned in directory listing order. A failure to list the directory
-- is logged as a warning and returned as 'Left'.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  listing <- try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case listing of
    Right names -> return . Right $ mapMaybe parseJobFileId names
    Left err -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show err
      return $ Left err
312 aa79e62e Iustin Pop
313 aa79e62e Iustin Pop
-- | Reads the job data from disk.
--
-- The live job file is tried first and, only if @archived@ is set, the
-- archived one afterwards. The search stops at the first file that can
-- be opened, so an archived copy is never consulted (and no further
-- lazy file handle is opened) once the live file has been found.
--
-- Returns the file contents together with a flag telling whether they
-- came from the archive, or 'Nothing' if no candidate could be read.
-- Missing files are skipped silently; other I/O errors are logged as
-- warnings and skipped as well.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid =
  foldM tryRead Nothing all_paths
  where
    live_path = liveJobFile rootdir jid
    archived_path = archivedJobFile rootdir jid
    all_paths = if archived
                  then [(live_path, False), (archived_path, True)]
                  else [(live_path, False)]
    -- once a file has been read, leave the remaining candidates alone
    tryRead found@(Just _) _ = return found
    tryRead Nothing (path, isarchived) =
      liftM (\r -> Just (r, isarchived)) (readFile path)
        `Control.Exception.catch`
        ignoreIOError Nothing True
          ("Failed to read job file " ++ path)
326 aa79e62e Iustin Pop
327 aa79e62e Iustin Pop
-- | The error result returned when no file could be read for a job.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
330 aa79e62e Iustin Pop
331 aa79e62e Iustin Pop
-- | Loads a job from disk and parses it. On success the job is paired
-- with a flag telling whether it was read from the archive; the result
-- is 'noSuchJob' if no job file could be read, and a parse failure is
-- reported as an error prefixed with \"Parsing job file\".
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness here, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! maybe noSuchJob decodeJob raw
  where
    decodeJob (str, arch) =
      liftM (\qj -> (qj, arch))
        (fromJResult "Parsing job file" (Text.JSON.decode str))
343 cef3f99f Klaus Aehlig
344 b498ed42 Klaus Aehlig
-- | Serialises a job to JSON and writes it to its live job file below
-- the given queue root, via 'atomicWriteFile'. I/O errors are handled
-- by 'tryAndLogIOError' using the message \"Failed to write \" followed
-- by the file name.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job =
  tryAndLogIOError (atomicWriteFile path payload)
                   ("Failed to write " ++ path) Ok
  where
    path = liveJobFile rootdir (qjId job)
    payload = Text.JSON.encode (Text.JSON.showJSON job)
351 b498ed42 Klaus Aehlig
352 b5a96995 Klaus Aehlig
-- | Replicate a job to all master candidates: sends the JSON-encoded
-- job, under its live job file name, to the given nodes through a job
-- queue update RPC. RPC errors are logged, and the per-node results
-- are returned to the caller.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  result <- executeRpcCall mastercandidates
              (RpcCallJobqueueUpdate path payload)
  logRpcErrors result
  return result
  where
    path = liveJobFile rootdir (qjId job)
    payload = Text.JSON.encode (Text.JSON.showJSON job)
361 b5a96995 Klaus Aehlig
362 9fd653a4 Klaus Aehlig
-- | Replicate many jobs to all master candidates, one job after the
-- other. The per-node RPC results are discarded.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates jobs =
  forM_ jobs $ replicateJob rootdir mastercandidates
366 9fd653a4 Klaus Aehlig
367 cef3f99f Klaus Aehlig
-- | Read the job serial number from disk: the contents of the job
-- queue serial file, stripped of trailing whitespace and parsed as a
-- job id. I/O errors are handled by 'tryAndLogIOError' using the
-- message \"Failed to read serial file\".
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk =
  jobQueueSerialFile >>= \filename ->
    tryAndLogIOError (readFile filename) "Failed to read serial file"
                     (makeJobIdS . rStripSpace)
373 ae858516 Klaus Aehlig
374 ae858516 Klaus Aehlig
-- | Allocate new job ids.
--
-- Reads the current serial number from disk, writes back the serial
-- increased by @n@, pushes the new serial file to the given master
-- candidates, and returns the @n@ ids following the old serial.
-- Requesting a non-positive number of ids, failing to read the serial,
-- or failing to write it back yields a 'Bad' result; the outcome of
-- the RPC to the master candidates is not inspected.
--
-- To avoid races while accessing the serial file, the threads
-- synchronize over a lock, as usual provided by an MVar. The lock is
-- held via 'withMVar', so it is released again even if one of the
-- actions in the critical section throws an exception.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n =
  if n <= 0
    then return . Bad $ "Can only allocate positive number of job ids"
    else withMVar lock $ \_ -> do
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++ "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              -- the per-node results of the replication are ignored
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial serial_content
              return $ mapM makeJobId [(current+1)..(current+n)]
405 ae858516 Klaus Aehlig
406 ae858516 Klaus Aehlig
-- | Allocate one new job id, by requesting a batch of size one from
-- 'allocateJobIds' and extracting its single element with 'monadicThe'.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock =
  liftM (>>= monadicThe "Failed to allocate precisely one Job ID")
        (allocateJobIds mastercandidates lock 1)
411 1b94c0db Klaus Aehlig
412 1b94c0db Klaus Aehlig
-- | Decide if the job queue is open, which is the case exactly when
-- the job queue drain file does not exist.
isQueueOpen :: IO Bool
isQueueOpen = do
  drainFile <- jobQueueDrainFile
  drained <- doesFileExist drainFile
  return (not drained)