Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 2af22d70

History | View | Annotate | Download (16.1 kB)

1 aa79e62e Iustin Pop
{-# LANGUAGE TemplateHaskell #-}
2 aa79e62e Iustin Pop
3 aa79e62e Iustin Pop
{-| Implementation of the job queue.
4 aa79e62e Iustin Pop
5 aa79e62e Iustin Pop
-}
6 aa79e62e Iustin Pop
7 aa79e62e Iustin Pop
{-
8 aa79e62e Iustin Pop
9 aa79e62e Iustin Pop
Copyright (C) 2010, 2012 Google Inc.
10 aa79e62e Iustin Pop
11 aa79e62e Iustin Pop
This program is free software; you can redistribute it and/or modify
12 aa79e62e Iustin Pop
it under the terms of the GNU General Public License as published by
13 aa79e62e Iustin Pop
the Free Software Foundation; either version 2 of the License, or
14 aa79e62e Iustin Pop
(at your option) any later version.
15 aa79e62e Iustin Pop
16 aa79e62e Iustin Pop
This program is distributed in the hope that it will be useful, but
17 aa79e62e Iustin Pop
WITHOUT ANY WARRANTY; without even the implied warranty of
18 aa79e62e Iustin Pop
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 aa79e62e Iustin Pop
General Public License for more details.
20 aa79e62e Iustin Pop
21 aa79e62e Iustin Pop
You should have received a copy of the GNU General Public License
22 aa79e62e Iustin Pop
along with this program; if not, write to the Free Software
23 aa79e62e Iustin Pop
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 aa79e62e Iustin Pop
02110-1301, USA.
25 aa79e62e Iustin Pop
26 aa79e62e Iustin Pop
-}
27 aa79e62e Iustin Pop
28 aa79e62e Iustin Pop
module Ganeti.JQueue
29 aa79e62e Iustin Pop
    ( QueuedOpCode(..)
30 aa79e62e Iustin Pop
    , QueuedJob(..)
31 aa79e62e Iustin Pop
    , InputOpCode(..)
32 4b49a72b Klaus Aehlig
    , queuedOpCodeFromMetaOpCode
33 1c1132f4 Klaus Aehlig
    , queuedJobFromOpCodes
34 aa79e62e Iustin Pop
    , Timestamp
35 aa79e62e Iustin Pop
    , noTimestamp
36 c3a70209 Klaus Aehlig
    , currentTimestamp
37 2af22d70 Klaus Aehlig
    , setReceivedTimestamp
38 aa79e62e Iustin Pop
    , opStatusFinalized
39 aa79e62e Iustin Pop
    , extractOpSummary
40 aa79e62e Iustin Pop
    , calcJobStatus
41 aa79e62e Iustin Pop
    , calcJobPriority
42 aa79e62e Iustin Pop
    , jobFileName
43 aa79e62e Iustin Pop
    , liveJobFile
44 aa79e62e Iustin Pop
    , archivedJobFile
45 aa79e62e Iustin Pop
    , determineJobDirectories
46 aa79e62e Iustin Pop
    , getJobIDs
47 aa79e62e Iustin Pop
    , sortJobIDs
48 aa79e62e Iustin Pop
    , loadJobFromDisk
49 aa79e62e Iustin Pop
    , noSuchJob
50 cef3f99f Klaus Aehlig
    , readSerialFromDisk
51 ae858516 Klaus Aehlig
    , allocateJobIds
52 ae858516 Klaus Aehlig
    , allocateJobId
53 b498ed42 Klaus Aehlig
    , writeJobToDisk
54 9fd653a4 Klaus Aehlig
    , replicateManyJobs
55 1b94c0db Klaus Aehlig
    , isQueueOpen
56 493d6920 Klaus Aehlig
    , enqueueJobs
57 aa79e62e Iustin Pop
    ) where
58 aa79e62e Iustin Pop
59 ae858516 Klaus Aehlig
import Control.Concurrent.MVar
60 aa79e62e Iustin Pop
import Control.Exception
61 aa79e62e Iustin Pop
import Control.Monad
62 aa79e62e Iustin Pop
import Data.List
63 4b49a72b Klaus Aehlig
import Data.Maybe
64 aa79e62e Iustin Pop
import Data.Ord (comparing)
65 aa79e62e Iustin Pop
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
66 aa79e62e Iustin Pop
import Prelude hiding (log, id)
67 aa79e62e Iustin Pop
import System.Directory
68 aa79e62e Iustin Pop
import System.FilePath
69 aa79e62e Iustin Pop
import System.IO.Error (isDoesNotExistError)
70 aa79e62e Iustin Pop
import System.Posix.Files
71 c3a70209 Klaus Aehlig
import System.Time
72 aa79e62e Iustin Pop
import qualified Text.JSON
73 aa79e62e Iustin Pop
import Text.JSON.Types
74 aa79e62e Iustin Pop
75 aa79e62e Iustin Pop
import Ganeti.BasicTypes
76 aa79e62e Iustin Pop
import qualified Ganeti.Constants as C
77 aa79e62e Iustin Pop
import Ganeti.JSON
78 aa79e62e Iustin Pop
import Ganeti.Logging
79 493d6920 Klaus Aehlig
import Ganeti.Luxi
80 ae858516 Klaus Aehlig
import Ganeti.Objects (Node)
81 aa79e62e Iustin Pop
import Ganeti.OpCodes
82 aa79e62e Iustin Pop
import Ganeti.Path
83 b5a96995 Klaus Aehlig
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
84 b5a96995 Klaus Aehlig
                   RpcCallJobqueueUpdate(..))
85 aa79e62e Iustin Pop
import Ganeti.THH
86 aa79e62e Iustin Pop
import Ganeti.Types
87 cef3f99f Klaus Aehlig
import Ganeti.Utils
88 aa79e62e Iustin Pop
89 aa79e62e Iustin Pop
-- * Data types
90 aa79e62e Iustin Pop
91 c3a70209 Klaus Aehlig
-- | The ganeti queue timestamp type. It represents the time as the pair
92 c3a70209 Klaus Aehlig
-- of seconds since the epoch and microseconds since the beginning of the
93 c3a70209 Klaus Aehlig
-- second.
94 aa79e62e Iustin Pop
type Timestamp = (Int, Int)
95 aa79e62e Iustin Pop
96 aa79e62e Iustin Pop
-- | Missing timestamp type.
97 aa79e62e Iustin Pop
noTimestamp :: Timestamp
98 aa79e62e Iustin Pop
noTimestamp = (-1, -1)
99 aa79e62e Iustin Pop
100 c3a70209 Klaus Aehlig
-- | Get the current time in the job-queue timestamp format.
101 c3a70209 Klaus Aehlig
currentTimestamp :: IO Timestamp
102 c3a70209 Klaus Aehlig
currentTimestamp = do
103 c3a70209 Klaus Aehlig
  TOD ctime pico <- getClockTime
104 c3a70209 Klaus Aehlig
  return (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
105 c3a70209 Klaus Aehlig
106 aa79e62e Iustin Pop
-- | An input opcode.
107 aa79e62e Iustin Pop
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
108 aa79e62e Iustin Pop
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
109 aa79e62e Iustin Pop
                   deriving (Show, Eq)
110 aa79e62e Iustin Pop
111 aa79e62e Iustin Pop
-- | JSON instance for 'InputOpCode', trying to parse it and if
112 aa79e62e Iustin Pop
-- failing, keeping the original JSValue.
113 aa79e62e Iustin Pop
instance Text.JSON.JSON InputOpCode where
114 aa79e62e Iustin Pop
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
115 aa79e62e Iustin Pop
  showJSON (InvalidOpCode inv) = inv
116 aa79e62e Iustin Pop
  readJSON v = case Text.JSON.readJSON v of
117 aa79e62e Iustin Pop
                 Text.JSON.Error _ -> return $ InvalidOpCode v
118 aa79e62e Iustin Pop
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
119 aa79e62e Iustin Pop
120 aa79e62e Iustin Pop
-- | Invalid opcode summary.
121 aa79e62e Iustin Pop
invalidOp :: String
122 aa79e62e Iustin Pop
invalidOp = "INVALID_OP"
123 aa79e62e Iustin Pop
124 aa79e62e Iustin Pop
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
125 aa79e62e Iustin Pop
-- duplicates some functionality from the 'opSummary' function in
126 aa79e62e Iustin Pop
-- "Ganeti.OpCodes".
127 aa79e62e Iustin Pop
extractOpSummary :: InputOpCode -> String
128 aa79e62e Iustin Pop
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
129 aa79e62e Iustin Pop
extractOpSummary (InvalidOpCode (JSObject o)) =
130 aa79e62e Iustin Pop
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
131 aa79e62e Iustin Pop
    Just s -> drop 3 s -- drop the OP_ prefix
132 aa79e62e Iustin Pop
    Nothing -> invalidOp
133 aa79e62e Iustin Pop
extractOpSummary _ = invalidOp
134 aa79e62e Iustin Pop
135 aa79e62e Iustin Pop
$(buildObject "QueuedOpCode" "qo"
136 aa79e62e Iustin Pop
  [ simpleField "input"           [t| InputOpCode |]
137 aa79e62e Iustin Pop
  , simpleField "status"          [t| OpStatus    |]
138 aa79e62e Iustin Pop
  , simpleField "result"          [t| JSValue     |]
139 aa79e62e Iustin Pop
  , defaultField [| [] |] $
140 aa79e62e Iustin Pop
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
141 aa79e62e Iustin Pop
  , simpleField "priority"        [t| Int         |]
142 aa79e62e Iustin Pop
  , optionalNullSerField $
143 aa79e62e Iustin Pop
    simpleField "start_timestamp" [t| Timestamp   |]
144 aa79e62e Iustin Pop
  , optionalNullSerField $
145 aa79e62e Iustin Pop
    simpleField "exec_timestamp"  [t| Timestamp   |]
146 aa79e62e Iustin Pop
  , optionalNullSerField $
147 aa79e62e Iustin Pop
    simpleField "end_timestamp"   [t| Timestamp   |]
148 aa79e62e Iustin Pop
  ])
149 aa79e62e Iustin Pop
150 aa79e62e Iustin Pop
$(buildObject "QueuedJob" "qj"
151 aa79e62e Iustin Pop
  [ simpleField "id"                 [t| JobId          |]
152 aa79e62e Iustin Pop
  , simpleField "ops"                [t| [QueuedOpCode] |]
153 aa79e62e Iustin Pop
  , optionalNullSerField $
154 aa79e62e Iustin Pop
    simpleField "received_timestamp" [t| Timestamp      |]
155 aa79e62e Iustin Pop
  , optionalNullSerField $
156 aa79e62e Iustin Pop
    simpleField "start_timestamp"    [t| Timestamp      |]
157 aa79e62e Iustin Pop
  , optionalNullSerField $
158 aa79e62e Iustin Pop
    simpleField "end_timestamp"      [t| Timestamp      |]
159 aa79e62e Iustin Pop
  ])
160 aa79e62e Iustin Pop
161 4b49a72b Klaus Aehlig
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
162 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
163 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode op =
164 4b49a72b Klaus Aehlig
  QueuedOpCode { qoInput = ValidOpCode op
165 4b49a72b Klaus Aehlig
               , qoStatus = OP_STATUS_QUEUED
166 4b49a72b Klaus Aehlig
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
167 4b49a72b Klaus Aehlig
                              $ op
168 4b49a72b Klaus Aehlig
               , qoLog = []
169 4b49a72b Klaus Aehlig
               , qoResult = JSNull
170 4b49a72b Klaus Aehlig
               , qoStartTimestamp = Nothing
171 4b49a72b Klaus Aehlig
               , qoEndTimestamp = Nothing
172 4b49a72b Klaus Aehlig
               , qoExecTimestamp = Nothing
173 4b49a72b Klaus Aehlig
               }
174 4b49a72b Klaus Aehlig
175 1c1132f4 Klaus Aehlig
-- | From a job-id and a list of op-codes create a job. This is
176 1c1132f4 Klaus Aehlig
-- the pure part of job creation, as allocating a new job id
177 1c1132f4 Klaus Aehlig
-- lives in IO.
178 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
179 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes jobid ops = do
180 1c1132f4 Klaus Aehlig
  ops' <- mapM (`resolveDependencies` jobid) ops
181 1c1132f4 Klaus Aehlig
  return QueuedJob { qjId = jobid
182 1c1132f4 Klaus Aehlig
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
183 1c1132f4 Klaus Aehlig
                   , qjReceivedTimestamp = Nothing 
184 1c1132f4 Klaus Aehlig
                   , qjStartTimestamp = Nothing
185 1c1132f4 Klaus Aehlig
                   , qjEndTimestamp = Nothing
186 1c1132f4 Klaus Aehlig
                   }
187 1c1132f4 Klaus Aehlig
188 2af22d70 Klaus Aehlig
-- | Attach a received timestamp to a Queued Job.
189 2af22d70 Klaus Aehlig
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
190 2af22d70 Klaus Aehlig
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
191 2af22d70 Klaus Aehlig
192 aa79e62e Iustin Pop
-- | Job file prefix.
193 aa79e62e Iustin Pop
jobFilePrefix :: String
194 aa79e62e Iustin Pop
jobFilePrefix = "job-"
195 aa79e62e Iustin Pop
196 aa79e62e Iustin Pop
-- | Computes the filename for a given job ID.
197 aa79e62e Iustin Pop
jobFileName :: JobId -> FilePath
198 aa79e62e Iustin Pop
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
199 aa79e62e Iustin Pop
200 aa79e62e Iustin Pop
-- | Parses a job ID from a file name.
201 aa79e62e Iustin Pop
parseJobFileId :: (Monad m) => FilePath -> m JobId
202 aa79e62e Iustin Pop
parseJobFileId path =
203 aa79e62e Iustin Pop
  case stripPrefix jobFilePrefix path of
204 aa79e62e Iustin Pop
    Nothing -> fail $ "Job file '" ++ path ++
205 aa79e62e Iustin Pop
                      "' doesn't have the correct prefix"
206 aa79e62e Iustin Pop
    Just suffix -> makeJobIdS suffix
207 aa79e62e Iustin Pop
208 aa79e62e Iustin Pop
-- | Computes the full path to a live job.
209 aa79e62e Iustin Pop
liveJobFile :: FilePath -> JobId -> FilePath
210 aa79e62e Iustin Pop
liveJobFile rootdir jid = rootdir </> jobFileName jid
211 aa79e62e Iustin Pop
212 aa79e62e Iustin Pop
-- | Computes the full path to an archives job. BROKEN.
213 aa79e62e Iustin Pop
archivedJobFile :: FilePath -> JobId -> FilePath
214 aa79e62e Iustin Pop
archivedJobFile rootdir jid =
215 aa79e62e Iustin Pop
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
216 aa79e62e Iustin Pop
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
217 aa79e62e Iustin Pop
218 aa79e62e Iustin Pop
-- | Map from opcode status to job status.
219 aa79e62e Iustin Pop
opStatusToJob :: OpStatus -> JobStatus
220 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
221 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
222 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
223 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
224 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
225 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
226 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
227 aa79e62e Iustin Pop
228 aa79e62e Iustin Pop
-- | Computes a queued job's status.
229 aa79e62e Iustin Pop
calcJobStatus :: QueuedJob -> JobStatus
230 aa79e62e Iustin Pop
calcJobStatus QueuedJob { qjOps = ops } =
231 aa79e62e Iustin Pop
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
232 aa79e62e Iustin Pop
    where
233 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_ERROR     = True
234 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELING = True
235 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELED  = True
236 aa79e62e Iustin Pop
      terminalStatus _                   = False
237 aa79e62e Iustin Pop
      softStatus     OP_STATUS_SUCCESS   = True
238 aa79e62e Iustin Pop
      softStatus     OP_STATUS_QUEUED    = True
239 aa79e62e Iustin Pop
      softStatus     _                   = False
240 aa79e62e Iustin Pop
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
241 aa79e62e Iustin Pop
      extractOpSt [] d False = d
242 aa79e62e Iustin Pop
      extractOpSt (x:xs) d old_all
243 aa79e62e Iustin Pop
           | terminalStatus x = opStatusToJob x -- abort recursion
244 aa79e62e Iustin Pop
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
245 aa79e62e Iustin Pop
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
246 aa79e62e Iustin Pop
           where new_all = x == OP_STATUS_SUCCESS && old_all
247 aa79e62e Iustin Pop
248 aa79e62e Iustin Pop
-- | Determine whether an opcode status is finalized.
249 aa79e62e Iustin Pop
opStatusFinalized :: OpStatus -> Bool
250 aa79e62e Iustin Pop
opStatusFinalized = (> OP_STATUS_RUNNING)
251 aa79e62e Iustin Pop
252 aa79e62e Iustin Pop
-- | Compute a job's priority.
253 aa79e62e Iustin Pop
calcJobPriority :: QueuedJob -> Int
254 aa79e62e Iustin Pop
calcJobPriority QueuedJob { qjOps = ops } =
255 aa79e62e Iustin Pop
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
256 aa79e62e Iustin Pop
    where helper [] = C.opPrioDefault
257 aa79e62e Iustin Pop
          helper ps = minimum ps
258 aa79e62e Iustin Pop
259 aa79e62e Iustin Pop
-- | Log but ignore an 'IOError'.
260 aa79e62e Iustin Pop
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
261 aa79e62e Iustin Pop
ignoreIOError a ignore_noent msg e = do
262 aa79e62e Iustin Pop
  unless (isDoesNotExistError e && ignore_noent) .
263 aa79e62e Iustin Pop
    logWarning $ msg ++ ": " ++ show e
264 aa79e62e Iustin Pop
  return a
265 aa79e62e Iustin Pop
266 aa79e62e Iustin Pop
-- | Compute the list of existing archive directories. Note that I/O
267 aa79e62e Iustin Pop
-- exceptions are swallowed and ignored.
268 aa79e62e Iustin Pop
allArchiveDirs :: FilePath -> IO [FilePath]
269 aa79e62e Iustin Pop
allArchiveDirs rootdir = do
270 aa79e62e Iustin Pop
  let adir = rootdir </> jobQueueArchiveSubDir
271 aa79e62e Iustin Pop
  contents <- getDirectoryContents adir `Control.Exception.catch`
272 aa79e62e Iustin Pop
               ignoreIOError [] False
273 aa79e62e Iustin Pop
                 ("Failed to list queue directory " ++ adir)
274 aa79e62e Iustin Pop
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
275 aa79e62e Iustin Pop
  filterM (\path ->
276 aa79e62e Iustin Pop
             liftM isDirectory (getFileStatus (adir </> path))
277 aa79e62e Iustin Pop
               `Control.Exception.catch`
278 aa79e62e Iustin Pop
               ignoreIOError False True
279 aa79e62e Iustin Pop
                 ("Failed to stat archive path " ++ path)) fpaths
280 aa79e62e Iustin Pop
281 aa79e62e Iustin Pop
-- | Build list of directories containing job files. Note: compared to
282 aa79e62e Iustin Pop
-- the Python version, this doesn't ignore a potential lost+found
283 aa79e62e Iustin Pop
-- file.
284 aa79e62e Iustin Pop
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
285 aa79e62e Iustin Pop
determineJobDirectories rootdir archived = do
286 aa79e62e Iustin Pop
  other <- if archived
287 aa79e62e Iustin Pop
             then allArchiveDirs rootdir
288 aa79e62e Iustin Pop
             else return []
289 aa79e62e Iustin Pop
  return $ rootdir:other
290 aa79e62e Iustin Pop
291 3cecd73c Michele Tartara
-- Function equivalent to the \'sequence\' function, that cannot be used because
292 3cecd73c Michele Tartara
-- of library version conflict on Lucid.
293 3cecd73c Michele Tartara
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
294 3cecd73c Michele Tartara
-- will not be required anymore.
295 3cecd73c Michele Tartara
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
296 3cecd73c Michele Tartara
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
297 3cecd73c Michele Tartara
298 3cecd73c Michele Tartara
-- | Folding function for joining multiple [JobIds] into one list.
299 3cecd73c Michele Tartara
seqFolder :: Either IOError [[JobId]]
300 3cecd73c Michele Tartara
          -> Either IOError [JobId]
301 3cecd73c Michele Tartara
          -> Either IOError [[JobId]]
302 3cecd73c Michele Tartara
seqFolder (Left e) _ = Left e
303 3cecd73c Michele Tartara
seqFolder (Right _) (Left e) = Left e
304 3cecd73c Michele Tartara
seqFolder (Right l) (Right el) = Right $ el:l
305 3cecd73c Michele Tartara
306 aa79e62e Iustin Pop
-- | Computes the list of all jobs in the given directories.
307 be0cb2d7 Michele Tartara
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
308 3cecd73c Michele Tartara
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
309 aa79e62e Iustin Pop
310 aa79e62e Iustin Pop
-- | Sorts the a list of job IDs.
311 aa79e62e Iustin Pop
sortJobIDs :: [JobId] -> [JobId]
312 aa79e62e Iustin Pop
sortJobIDs = sortBy (comparing fromJobId)
313 aa79e62e Iustin Pop
314 aa79e62e Iustin Pop
-- | Computes the list of jobs in a given directory.
315 be0cb2d7 Michele Tartara
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
316 aa79e62e Iustin Pop
getDirJobIDs path = do
317 be0cb2d7 Michele Tartara
  either_contents <-
318 be0cb2d7 Michele Tartara
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
319 be0cb2d7 Michele Tartara
  case either_contents of
320 be0cb2d7 Michele Tartara
    Left e -> do
321 be0cb2d7 Michele Tartara
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
322 be0cb2d7 Michele Tartara
      return $ Left e
323 be0cb2d7 Michele Tartara
    Right contents -> do
324 be0cb2d7 Michele Tartara
      let jids = foldl (\ids file ->
325 be0cb2d7 Michele Tartara
                         case parseJobFileId file of
326 be0cb2d7 Michele Tartara
                           Nothing -> ids
327 be0cb2d7 Michele Tartara
                           Just new_id -> new_id:ids) [] contents
328 be0cb2d7 Michele Tartara
      return . Right $ reverse jids
329 aa79e62e Iustin Pop
330 aa79e62e Iustin Pop
-- | Reads the job data from disk.
331 aa79e62e Iustin Pop
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
332 aa79e62e Iustin Pop
readJobDataFromDisk rootdir archived jid = do
333 aa79e62e Iustin Pop
  let live_path = liveJobFile rootdir jid
334 aa79e62e Iustin Pop
      archived_path = archivedJobFile rootdir jid
335 aa79e62e Iustin Pop
      all_paths = if archived
336 aa79e62e Iustin Pop
                    then [(live_path, False), (archived_path, True)]
337 aa79e62e Iustin Pop
                    else [(live_path, False)]
338 aa79e62e Iustin Pop
  foldM (\state (path, isarchived) ->
339 aa79e62e Iustin Pop
           liftM (\r -> Just (r, isarchived)) (readFile path)
340 aa79e62e Iustin Pop
             `Control.Exception.catch`
341 aa79e62e Iustin Pop
             ignoreIOError state True
342 aa79e62e Iustin Pop
               ("Failed to read job file " ++ path)) Nothing all_paths
343 aa79e62e Iustin Pop
344 aa79e62e Iustin Pop
-- | Failed to load job error.
345 aa79e62e Iustin Pop
noSuchJob :: Result (QueuedJob, Bool)
346 aa79e62e Iustin Pop
noSuchJob = Bad "Can't load job file"
347 aa79e62e Iustin Pop
348 aa79e62e Iustin Pop
-- | Loads a job from disk.
349 aa79e62e Iustin Pop
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
350 aa79e62e Iustin Pop
loadJobFromDisk rootdir archived jid = do
351 aa79e62e Iustin Pop
  raw <- readJobDataFromDisk rootdir archived jid
352 aa79e62e Iustin Pop
  -- note: we need some stricness below, otherwise the wrapping in a
353 aa79e62e Iustin Pop
  -- Result will create too much lazyness, and not close the file
354 aa79e62e Iustin Pop
  -- descriptors for the individual jobs
355 aa79e62e Iustin Pop
  return $! case raw of
356 aa79e62e Iustin Pop
             Nothing -> noSuchJob
357 aa79e62e Iustin Pop
             Just (str, arch) ->
358 aa79e62e Iustin Pop
               liftM (\qj -> (qj, arch)) .
359 aa79e62e Iustin Pop
               fromJResult "Parsing job file" $ Text.JSON.decode str
360 cef3f99f Klaus Aehlig
361 b498ed42 Klaus Aehlig
-- | Write a job to disk.
362 b498ed42 Klaus Aehlig
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
363 b498ed42 Klaus Aehlig
writeJobToDisk rootdir job = do
364 b498ed42 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
365 b498ed42 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
366 b498ed42 Klaus Aehlig
  tryAndLogIOError (atomicWriteFile filename content)
367 b498ed42 Klaus Aehlig
                   ("Failed to write " ++ filename) Ok
368 b498ed42 Klaus Aehlig
369 b5a96995 Klaus Aehlig
-- | Replicate a job to all master candidates.
370 b5a96995 Klaus Aehlig
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
371 b5a96995 Klaus Aehlig
replicateJob rootdir mastercandidates job = do
372 b5a96995 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
373 b5a96995 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
374 b5a96995 Klaus Aehlig
  result <- executeRpcCall mastercandidates
375 b5a96995 Klaus Aehlig
              $ RpcCallJobqueueUpdate filename content
376 b5a96995 Klaus Aehlig
  logRpcErrors result
377 b5a96995 Klaus Aehlig
  return result
378 b5a96995 Klaus Aehlig
379 9fd653a4 Klaus Aehlig
-- | Replicate many jobs to all master candidates.
380 9fd653a4 Klaus Aehlig
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
381 9fd653a4 Klaus Aehlig
replicateManyJobs rootdir mastercandidates =
382 9fd653a4 Klaus Aehlig
  mapM_ (replicateJob rootdir mastercandidates)
383 9fd653a4 Klaus Aehlig
384 cef3f99f Klaus Aehlig
-- | Read the job serial number from disk.
385 cef3f99f Klaus Aehlig
readSerialFromDisk :: IO (Result JobId)
386 cef3f99f Klaus Aehlig
readSerialFromDisk = do
387 cef3f99f Klaus Aehlig
  filename <- jobQueueSerialFile
388 cef3f99f Klaus Aehlig
  tryAndLogIOError (readFile filename) "Failed to read serial file"
389 cef3f99f Klaus Aehlig
                   (makeJobIdS . rStripSpace)
390 ae858516 Klaus Aehlig
391 ae858516 Klaus Aehlig
-- | Allocate new job ids.
392 ae858516 Klaus Aehlig
-- To avoid races while accessing the serial file, the threads synchronize
393 ae858516 Klaus Aehlig
-- over a lock, as usual provided by an MVar.
394 ae858516 Klaus Aehlig
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
395 ae858516 Klaus Aehlig
allocateJobIds mastercandidates lock n =
396 ae858516 Klaus Aehlig
  if n <= 0
397 ae858516 Klaus Aehlig
    then return . Bad $ "Can only allocate positive number of job ids"
398 ae858516 Klaus Aehlig
    else do
399 ae858516 Klaus Aehlig
      takeMVar lock
400 ae858516 Klaus Aehlig
      rjobid <- readSerialFromDisk
401 ae858516 Klaus Aehlig
      case rjobid of
402 ae858516 Klaus Aehlig
        Bad s -> do
403 ae858516 Klaus Aehlig
          putMVar lock ()
404 ae858516 Klaus Aehlig
          return . Bad $ s
405 ae858516 Klaus Aehlig
        Ok jid -> do
406 ae858516 Klaus Aehlig
          let current = fromJobId jid
407 ae858516 Klaus Aehlig
              serial_content = show (current + n) ++  "\n"
408 ae858516 Klaus Aehlig
          serial <- jobQueueSerialFile
409 ae858516 Klaus Aehlig
          write_result <- try $ atomicWriteFile serial serial_content
410 ae858516 Klaus Aehlig
                          :: IO (Either IOError ())
411 ae858516 Klaus Aehlig
          case write_result of
412 ae858516 Klaus Aehlig
            Left e -> do
413 f7819050 Klaus Aehlig
              putMVar lock ()
414 ae858516 Klaus Aehlig
              let msg = "Failed to write serial file: " ++ show e
415 ae858516 Klaus Aehlig
              logError msg
416 ae858516 Klaus Aehlig
              return . Bad $ msg 
417 ae858516 Klaus Aehlig
            Right () -> do
418 ae858516 Klaus Aehlig
              _ <- executeRpcCall mastercandidates
419 ae858516 Klaus Aehlig
                     $ RpcCallJobqueueUpdate serial serial_content
420 f7819050 Klaus Aehlig
              putMVar lock ()
421 ae858516 Klaus Aehlig
              return $ mapM makeJobId [(current+1)..(current+n)]
422 ae858516 Klaus Aehlig
423 ae858516 Klaus Aehlig
-- | Allocate one new job id.
424 ae858516 Klaus Aehlig
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
425 ae858516 Klaus Aehlig
allocateJobId mastercandidates lock = do
426 ae858516 Klaus Aehlig
  jids <- allocateJobIds mastercandidates lock 1
427 ae858516 Klaus Aehlig
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
428 1b94c0db Klaus Aehlig
429 1b94c0db Klaus Aehlig
-- | Decide if job queue is open
430 1b94c0db Klaus Aehlig
isQueueOpen :: IO Bool
431 1b94c0db Klaus Aehlig
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
432 493d6920 Klaus Aehlig
433 493d6920 Klaus Aehlig
-- | Enqueue jobs. This will guarantee that jobs get executed eventually.
434 493d6920 Klaus Aehlig
-- Curenntly, the implementation is to unconditionally hand over the job
435 493d6920 Klaus Aehlig
-- to masterd.
436 493d6920 Klaus Aehlig
enqueueJobs :: [QueuedJob] -> IO ()
437 493d6920 Klaus Aehlig
enqueueJobs jobs = do
438 493d6920 Klaus Aehlig
  socketpath <- defaultMasterSocket
439 493d6920 Klaus Aehlig
  client <- getClient socketpath
440 493d6920 Klaus Aehlig
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
441 493d6920 Klaus Aehlig
  let failures = map show $ justBad pickupResults
442 493d6920 Klaus Aehlig
  unless (null failures)
443 493d6920 Klaus Aehlig
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures