Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 557f5dad

History | View | Annotate | Download (16.4 kB)

1 aa79e62e Iustin Pop
{-# LANGUAGE TemplateHaskell #-}
2 aa79e62e Iustin Pop
3 aa79e62e Iustin Pop
{-| Implementation of the job queue.
4 aa79e62e Iustin Pop
5 aa79e62e Iustin Pop
-}
6 aa79e62e Iustin Pop
7 aa79e62e Iustin Pop
{-
8 aa79e62e Iustin Pop
9 aa79e62e Iustin Pop
Copyright (C) 2010, 2012 Google Inc.
10 aa79e62e Iustin Pop
11 aa79e62e Iustin Pop
This program is free software; you can redistribute it and/or modify
12 aa79e62e Iustin Pop
it under the terms of the GNU General Public License as published by
13 aa79e62e Iustin Pop
the Free Software Foundation; either version 2 of the License, or
14 aa79e62e Iustin Pop
(at your option) any later version.
15 aa79e62e Iustin Pop
16 aa79e62e Iustin Pop
This program is distributed in the hope that it will be useful, but
17 aa79e62e Iustin Pop
WITHOUT ANY WARRANTY; without even the implied warranty of
18 aa79e62e Iustin Pop
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 aa79e62e Iustin Pop
General Public License for more details.
20 aa79e62e Iustin Pop
21 aa79e62e Iustin Pop
You should have received a copy of the GNU General Public License
22 aa79e62e Iustin Pop
along with this program; if not, write to the Free Software
23 aa79e62e Iustin Pop
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 aa79e62e Iustin Pop
02110-1301, USA.
25 aa79e62e Iustin Pop
26 aa79e62e Iustin Pop
-}
27 aa79e62e Iustin Pop
28 aa79e62e Iustin Pop
module Ganeti.JQueue
29 aa79e62e Iustin Pop
    ( QueuedOpCode(..)
30 aa79e62e Iustin Pop
    , QueuedJob(..)
31 aa79e62e Iustin Pop
    , InputOpCode(..)
32 4b49a72b Klaus Aehlig
    , queuedOpCodeFromMetaOpCode
33 1c1132f4 Klaus Aehlig
    , queuedJobFromOpCodes
34 aa79e62e Iustin Pop
    , Timestamp
35 aa79e62e Iustin Pop
    , noTimestamp
36 c3a70209 Klaus Aehlig
    , currentTimestamp
37 2af22d70 Klaus Aehlig
    , setReceivedTimestamp
38 aa79e62e Iustin Pop
    , opStatusFinalized
39 aa79e62e Iustin Pop
    , extractOpSummary
40 aa79e62e Iustin Pop
    , calcJobStatus
41 02e40b13 Klaus Aehlig
    , jobStarted
42 847df9e9 Klaus Aehlig
    , jobFinalized
43 aa79e62e Iustin Pop
    , calcJobPriority
44 aa79e62e Iustin Pop
    , jobFileName
45 aa79e62e Iustin Pop
    , liveJobFile
46 aa79e62e Iustin Pop
    , archivedJobFile
47 aa79e62e Iustin Pop
    , determineJobDirectories
48 aa79e62e Iustin Pop
    , getJobIDs
49 aa79e62e Iustin Pop
    , sortJobIDs
50 aa79e62e Iustin Pop
    , loadJobFromDisk
51 aa79e62e Iustin Pop
    , noSuchJob
52 cef3f99f Klaus Aehlig
    , readSerialFromDisk
53 ae858516 Klaus Aehlig
    , allocateJobIds
54 ae858516 Klaus Aehlig
    , allocateJobId
55 b498ed42 Klaus Aehlig
    , writeJobToDisk
56 9fd653a4 Klaus Aehlig
    , replicateManyJobs
57 1b94c0db Klaus Aehlig
    , isQueueOpen
58 ac0c5c6d Klaus Aehlig
    , startJobs
59 aa79e62e Iustin Pop
    ) where
60 aa79e62e Iustin Pop
61 557f5dad Klaus Aehlig
import Control.Arrow (second)
62 ae858516 Klaus Aehlig
import Control.Concurrent.MVar
63 aa79e62e Iustin Pop
import Control.Exception
64 aa79e62e Iustin Pop
import Control.Monad
65 557f5dad Klaus Aehlig
import Data.Functor ((<$))
66 aa79e62e Iustin Pop
import Data.List
67 4b49a72b Klaus Aehlig
import Data.Maybe
68 aa79e62e Iustin Pop
import Data.Ord (comparing)
69 aa79e62e Iustin Pop
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
70 557f5dad Klaus Aehlig
import Prelude hiding (id, log)
71 aa79e62e Iustin Pop
import System.Directory
72 aa79e62e Iustin Pop
import System.FilePath
73 aa79e62e Iustin Pop
import System.IO.Error (isDoesNotExistError)
74 aa79e62e Iustin Pop
import System.Posix.Files
75 c3a70209 Klaus Aehlig
import System.Time
76 aa79e62e Iustin Pop
import qualified Text.JSON
77 aa79e62e Iustin Pop
import Text.JSON.Types
78 aa79e62e Iustin Pop
79 aa79e62e Iustin Pop
import Ganeti.BasicTypes
80 aa79e62e Iustin Pop
import qualified Ganeti.Constants as C
81 aa79e62e Iustin Pop
import Ganeti.JSON
82 aa79e62e Iustin Pop
import Ganeti.Logging
83 493d6920 Klaus Aehlig
import Ganeti.Luxi
84 ae858516 Klaus Aehlig
import Ganeti.Objects (Node)
85 aa79e62e Iustin Pop
import Ganeti.OpCodes
86 aa79e62e Iustin Pop
import Ganeti.Path
87 b5a96995 Klaus Aehlig
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
88 b5a96995 Klaus Aehlig
                   RpcCallJobqueueUpdate(..))
89 aa79e62e Iustin Pop
import Ganeti.THH
90 aa79e62e Iustin Pop
import Ganeti.Types
91 cef3f99f Klaus Aehlig
import Ganeti.Utils
92 aa79e62e Iustin Pop
93 aa79e62e Iustin Pop
-- * Data types
94 aa79e62e Iustin Pop
95 c3a70209 Klaus Aehlig
-- | The ganeti queue timestamp type. It represents the time as the pair
96 c3a70209 Klaus Aehlig
-- of seconds since the epoch and microseconds since the beginning of the
97 c3a70209 Klaus Aehlig
-- second.
98 aa79e62e Iustin Pop
type Timestamp = (Int, Int)
99 aa79e62e Iustin Pop
100 aa79e62e Iustin Pop
-- | Missing timestamp type.
101 aa79e62e Iustin Pop
noTimestamp :: Timestamp
102 aa79e62e Iustin Pop
noTimestamp = (-1, -1)
103 aa79e62e Iustin Pop
104 c3a70209 Klaus Aehlig
-- | Get the current time in the job-queue timestamp format.
105 c3a70209 Klaus Aehlig
currentTimestamp :: IO Timestamp
106 c3a70209 Klaus Aehlig
currentTimestamp = do
107 c3a70209 Klaus Aehlig
  TOD ctime pico <- getClockTime
108 c3a70209 Klaus Aehlig
  return (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
109 c3a70209 Klaus Aehlig
110 aa79e62e Iustin Pop
-- | An input opcode.
111 aa79e62e Iustin Pop
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
112 aa79e62e Iustin Pop
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
113 aa79e62e Iustin Pop
                   deriving (Show, Eq)
114 aa79e62e Iustin Pop
115 aa79e62e Iustin Pop
-- | JSON instance for 'InputOpCode', trying to parse it and if
116 aa79e62e Iustin Pop
-- failing, keeping the original JSValue.
117 aa79e62e Iustin Pop
instance Text.JSON.JSON InputOpCode where
118 aa79e62e Iustin Pop
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
119 aa79e62e Iustin Pop
  showJSON (InvalidOpCode inv) = inv
120 aa79e62e Iustin Pop
  readJSON v = case Text.JSON.readJSON v of
121 aa79e62e Iustin Pop
                 Text.JSON.Error _ -> return $ InvalidOpCode v
122 aa79e62e Iustin Pop
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
123 aa79e62e Iustin Pop
124 aa79e62e Iustin Pop
-- | Invalid opcode summary.
125 aa79e62e Iustin Pop
invalidOp :: String
126 aa79e62e Iustin Pop
invalidOp = "INVALID_OP"
127 aa79e62e Iustin Pop
128 aa79e62e Iustin Pop
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
129 aa79e62e Iustin Pop
-- duplicates some functionality from the 'opSummary' function in
130 aa79e62e Iustin Pop
-- "Ganeti.OpCodes".
131 aa79e62e Iustin Pop
extractOpSummary :: InputOpCode -> String
132 aa79e62e Iustin Pop
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
133 aa79e62e Iustin Pop
extractOpSummary (InvalidOpCode (JSObject o)) =
134 aa79e62e Iustin Pop
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
135 aa79e62e Iustin Pop
    Just s -> drop 3 s -- drop the OP_ prefix
136 aa79e62e Iustin Pop
    Nothing -> invalidOp
137 aa79e62e Iustin Pop
extractOpSummary _ = invalidOp
138 aa79e62e Iustin Pop
139 aa79e62e Iustin Pop
$(buildObject "QueuedOpCode" "qo"
140 aa79e62e Iustin Pop
  [ simpleField "input"           [t| InputOpCode |]
141 aa79e62e Iustin Pop
  , simpleField "status"          [t| OpStatus    |]
142 aa79e62e Iustin Pop
  , simpleField "result"          [t| JSValue     |]
143 aa79e62e Iustin Pop
  , defaultField [| [] |] $
144 aa79e62e Iustin Pop
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
145 aa79e62e Iustin Pop
  , simpleField "priority"        [t| Int         |]
146 aa79e62e Iustin Pop
  , optionalNullSerField $
147 aa79e62e Iustin Pop
    simpleField "start_timestamp" [t| Timestamp   |]
148 aa79e62e Iustin Pop
  , optionalNullSerField $
149 aa79e62e Iustin Pop
    simpleField "exec_timestamp"  [t| Timestamp   |]
150 aa79e62e Iustin Pop
  , optionalNullSerField $
151 aa79e62e Iustin Pop
    simpleField "end_timestamp"   [t| Timestamp   |]
152 aa79e62e Iustin Pop
  ])
153 aa79e62e Iustin Pop
154 aa79e62e Iustin Pop
$(buildObject "QueuedJob" "qj"
155 aa79e62e Iustin Pop
  [ simpleField "id"                 [t| JobId          |]
156 aa79e62e Iustin Pop
  , simpleField "ops"                [t| [QueuedOpCode] |]
157 aa79e62e Iustin Pop
  , optionalNullSerField $
158 aa79e62e Iustin Pop
    simpleField "received_timestamp" [t| Timestamp      |]
159 aa79e62e Iustin Pop
  , optionalNullSerField $
160 aa79e62e Iustin Pop
    simpleField "start_timestamp"    [t| Timestamp      |]
161 aa79e62e Iustin Pop
  , optionalNullSerField $
162 aa79e62e Iustin Pop
    simpleField "end_timestamp"      [t| Timestamp      |]
163 aa79e62e Iustin Pop
  ])
164 aa79e62e Iustin Pop
165 4b49a72b Klaus Aehlig
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
166 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
167 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode op =
168 4b49a72b Klaus Aehlig
  QueuedOpCode { qoInput = ValidOpCode op
169 4b49a72b Klaus Aehlig
               , qoStatus = OP_STATUS_QUEUED
170 4b49a72b Klaus Aehlig
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
171 4b49a72b Klaus Aehlig
                              $ op
172 4b49a72b Klaus Aehlig
               , qoLog = []
173 4b49a72b Klaus Aehlig
               , qoResult = JSNull
174 4b49a72b Klaus Aehlig
               , qoStartTimestamp = Nothing
175 4b49a72b Klaus Aehlig
               , qoEndTimestamp = Nothing
176 4b49a72b Klaus Aehlig
               , qoExecTimestamp = Nothing
177 4b49a72b Klaus Aehlig
               }
178 4b49a72b Klaus Aehlig
179 1c1132f4 Klaus Aehlig
-- | From a job-id and a list of op-codes create a job. This is
180 1c1132f4 Klaus Aehlig
-- the pure part of job creation, as allocating a new job id
181 1c1132f4 Klaus Aehlig
-- lives in IO.
182 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
183 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes jobid ops = do
184 1c1132f4 Klaus Aehlig
  ops' <- mapM (`resolveDependencies` jobid) ops
185 1c1132f4 Klaus Aehlig
  return QueuedJob { qjId = jobid
186 1c1132f4 Klaus Aehlig
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
187 1c1132f4 Klaus Aehlig
                   , qjReceivedTimestamp = Nothing 
188 1c1132f4 Klaus Aehlig
                   , qjStartTimestamp = Nothing
189 1c1132f4 Klaus Aehlig
                   , qjEndTimestamp = Nothing
190 1c1132f4 Klaus Aehlig
                   }
191 1c1132f4 Klaus Aehlig
192 2af22d70 Klaus Aehlig
-- | Attach a received timestamp to a Queued Job.
193 2af22d70 Klaus Aehlig
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
194 2af22d70 Klaus Aehlig
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
195 2af22d70 Klaus Aehlig
196 aa79e62e Iustin Pop
-- | Job file prefix.
197 aa79e62e Iustin Pop
jobFilePrefix :: String
198 aa79e62e Iustin Pop
jobFilePrefix = "job-"
199 aa79e62e Iustin Pop
200 aa79e62e Iustin Pop
-- | Computes the filename for a given job ID.
201 aa79e62e Iustin Pop
jobFileName :: JobId -> FilePath
202 aa79e62e Iustin Pop
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
203 aa79e62e Iustin Pop
204 aa79e62e Iustin Pop
-- | Parses a job ID from a file name.
205 aa79e62e Iustin Pop
parseJobFileId :: (Monad m) => FilePath -> m JobId
206 aa79e62e Iustin Pop
parseJobFileId path =
207 aa79e62e Iustin Pop
  case stripPrefix jobFilePrefix path of
208 aa79e62e Iustin Pop
    Nothing -> fail $ "Job file '" ++ path ++
209 aa79e62e Iustin Pop
                      "' doesn't have the correct prefix"
210 aa79e62e Iustin Pop
    Just suffix -> makeJobIdS suffix
211 aa79e62e Iustin Pop
212 aa79e62e Iustin Pop
-- | Computes the full path to a live job.
213 aa79e62e Iustin Pop
liveJobFile :: FilePath -> JobId -> FilePath
214 aa79e62e Iustin Pop
liveJobFile rootdir jid = rootdir </> jobFileName jid
215 aa79e62e Iustin Pop
216 aa79e62e Iustin Pop
-- | Computes the full path to an archives job. BROKEN.
217 aa79e62e Iustin Pop
archivedJobFile :: FilePath -> JobId -> FilePath
218 aa79e62e Iustin Pop
archivedJobFile rootdir jid =
219 aa79e62e Iustin Pop
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
220 aa79e62e Iustin Pop
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
221 aa79e62e Iustin Pop
222 aa79e62e Iustin Pop
-- | Map from opcode status to job status.
223 aa79e62e Iustin Pop
opStatusToJob :: OpStatus -> JobStatus
224 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
225 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
226 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
227 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
228 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
229 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
230 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
231 aa79e62e Iustin Pop
232 aa79e62e Iustin Pop
-- | Computes a queued job's status.
233 aa79e62e Iustin Pop
calcJobStatus :: QueuedJob -> JobStatus
234 aa79e62e Iustin Pop
calcJobStatus QueuedJob { qjOps = ops } =
235 aa79e62e Iustin Pop
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
236 aa79e62e Iustin Pop
    where
237 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_ERROR     = True
238 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELING = True
239 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELED  = True
240 aa79e62e Iustin Pop
      terminalStatus _                   = False
241 aa79e62e Iustin Pop
      softStatus     OP_STATUS_SUCCESS   = True
242 aa79e62e Iustin Pop
      softStatus     OP_STATUS_QUEUED    = True
243 aa79e62e Iustin Pop
      softStatus     _                   = False
244 aa79e62e Iustin Pop
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
245 aa79e62e Iustin Pop
      extractOpSt [] d False = d
246 aa79e62e Iustin Pop
      extractOpSt (x:xs) d old_all
247 aa79e62e Iustin Pop
           | terminalStatus x = opStatusToJob x -- abort recursion
248 aa79e62e Iustin Pop
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
249 aa79e62e Iustin Pop
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
250 aa79e62e Iustin Pop
           where new_all = x == OP_STATUS_SUCCESS && old_all
251 aa79e62e Iustin Pop
252 02e40b13 Klaus Aehlig
-- | Determine if a job has started
253 02e40b13 Klaus Aehlig
jobStarted :: QueuedJob -> Bool
254 02e40b13 Klaus Aehlig
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus
255 02e40b13 Klaus Aehlig
256 847df9e9 Klaus Aehlig
-- | Determine if a job is finalised.
257 847df9e9 Klaus Aehlig
jobFinalized :: QueuedJob -> Bool
258 847df9e9 Klaus Aehlig
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus
259 847df9e9 Klaus Aehlig
260 aa79e62e Iustin Pop
-- | Determine whether an opcode status is finalized.
261 aa79e62e Iustin Pop
opStatusFinalized :: OpStatus -> Bool
262 aa79e62e Iustin Pop
opStatusFinalized = (> OP_STATUS_RUNNING)
263 aa79e62e Iustin Pop
264 aa79e62e Iustin Pop
-- | Compute a job's priority.
265 aa79e62e Iustin Pop
calcJobPriority :: QueuedJob -> Int
266 aa79e62e Iustin Pop
calcJobPriority QueuedJob { qjOps = ops } =
267 aa79e62e Iustin Pop
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
268 aa79e62e Iustin Pop
    where helper [] = C.opPrioDefault
269 aa79e62e Iustin Pop
          helper ps = minimum ps
270 aa79e62e Iustin Pop
271 aa79e62e Iustin Pop
-- | Log but ignore an 'IOError'.
272 aa79e62e Iustin Pop
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
273 aa79e62e Iustin Pop
ignoreIOError a ignore_noent msg e = do
274 aa79e62e Iustin Pop
  unless (isDoesNotExistError e && ignore_noent) .
275 aa79e62e Iustin Pop
    logWarning $ msg ++ ": " ++ show e
276 aa79e62e Iustin Pop
  return a
277 aa79e62e Iustin Pop
278 aa79e62e Iustin Pop
-- | Compute the list of existing archive directories. Note that I/O
279 aa79e62e Iustin Pop
-- exceptions are swallowed and ignored.
280 aa79e62e Iustin Pop
allArchiveDirs :: FilePath -> IO [FilePath]
281 aa79e62e Iustin Pop
allArchiveDirs rootdir = do
282 aa79e62e Iustin Pop
  let adir = rootdir </> jobQueueArchiveSubDir
283 aa79e62e Iustin Pop
  contents <- getDirectoryContents adir `Control.Exception.catch`
284 aa79e62e Iustin Pop
               ignoreIOError [] False
285 aa79e62e Iustin Pop
                 ("Failed to list queue directory " ++ adir)
286 aa79e62e Iustin Pop
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
287 aa79e62e Iustin Pop
  filterM (\path ->
288 aa79e62e Iustin Pop
             liftM isDirectory (getFileStatus (adir </> path))
289 aa79e62e Iustin Pop
               `Control.Exception.catch`
290 aa79e62e Iustin Pop
               ignoreIOError False True
291 aa79e62e Iustin Pop
                 ("Failed to stat archive path " ++ path)) fpaths
292 aa79e62e Iustin Pop
293 aa79e62e Iustin Pop
-- | Build list of directories containing job files. Note: compared to
294 aa79e62e Iustin Pop
-- the Python version, this doesn't ignore a potential lost+found
295 aa79e62e Iustin Pop
-- file.
296 aa79e62e Iustin Pop
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
297 aa79e62e Iustin Pop
determineJobDirectories rootdir archived = do
298 aa79e62e Iustin Pop
  other <- if archived
299 aa79e62e Iustin Pop
             then allArchiveDirs rootdir
300 aa79e62e Iustin Pop
             else return []
301 aa79e62e Iustin Pop
  return $ rootdir:other
302 aa79e62e Iustin Pop
303 3cecd73c Michele Tartara
-- Function equivalent to the \'sequence\' function, that cannot be used because
304 3cecd73c Michele Tartara
-- of library version conflict on Lucid.
305 3cecd73c Michele Tartara
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
306 3cecd73c Michele Tartara
-- will not be required anymore.
307 3cecd73c Michele Tartara
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
308 3cecd73c Michele Tartara
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
309 3cecd73c Michele Tartara
310 3cecd73c Michele Tartara
-- | Folding function for joining multiple [JobIds] into one list.
311 3cecd73c Michele Tartara
seqFolder :: Either IOError [[JobId]]
312 3cecd73c Michele Tartara
          -> Either IOError [JobId]
313 3cecd73c Michele Tartara
          -> Either IOError [[JobId]]
314 3cecd73c Michele Tartara
seqFolder (Left e) _ = Left e
315 3cecd73c Michele Tartara
seqFolder (Right _) (Left e) = Left e
316 3cecd73c Michele Tartara
seqFolder (Right l) (Right el) = Right $ el:l
317 3cecd73c Michele Tartara
318 aa79e62e Iustin Pop
-- | Computes the list of all jobs in the given directories.
319 be0cb2d7 Michele Tartara
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
320 3cecd73c Michele Tartara
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
321 aa79e62e Iustin Pop
322 aa79e62e Iustin Pop
-- | Sorts the a list of job IDs.
323 aa79e62e Iustin Pop
sortJobIDs :: [JobId] -> [JobId]
324 aa79e62e Iustin Pop
sortJobIDs = sortBy (comparing fromJobId)
325 aa79e62e Iustin Pop
326 aa79e62e Iustin Pop
-- | Computes the list of jobs in a given directory.
327 be0cb2d7 Michele Tartara
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
328 aa79e62e Iustin Pop
getDirJobIDs path = do
329 be0cb2d7 Michele Tartara
  either_contents <-
330 be0cb2d7 Michele Tartara
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
331 be0cb2d7 Michele Tartara
  case either_contents of
332 be0cb2d7 Michele Tartara
    Left e -> do
333 be0cb2d7 Michele Tartara
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
334 be0cb2d7 Michele Tartara
      return $ Left e
335 be0cb2d7 Michele Tartara
    Right contents -> do
336 be0cb2d7 Michele Tartara
      let jids = foldl (\ids file ->
337 be0cb2d7 Michele Tartara
                         case parseJobFileId file of
338 be0cb2d7 Michele Tartara
                           Nothing -> ids
339 be0cb2d7 Michele Tartara
                           Just new_id -> new_id:ids) [] contents
340 be0cb2d7 Michele Tartara
      return . Right $ reverse jids
341 aa79e62e Iustin Pop
342 aa79e62e Iustin Pop
-- | Reads the job data from disk.
343 aa79e62e Iustin Pop
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
344 aa79e62e Iustin Pop
readJobDataFromDisk rootdir archived jid = do
345 aa79e62e Iustin Pop
  let live_path = liveJobFile rootdir jid
346 aa79e62e Iustin Pop
      archived_path = archivedJobFile rootdir jid
347 aa79e62e Iustin Pop
      all_paths = if archived
348 aa79e62e Iustin Pop
                    then [(live_path, False), (archived_path, True)]
349 aa79e62e Iustin Pop
                    else [(live_path, False)]
350 aa79e62e Iustin Pop
  foldM (\state (path, isarchived) ->
351 aa79e62e Iustin Pop
           liftM (\r -> Just (r, isarchived)) (readFile path)
352 aa79e62e Iustin Pop
             `Control.Exception.catch`
353 aa79e62e Iustin Pop
             ignoreIOError state True
354 aa79e62e Iustin Pop
               ("Failed to read job file " ++ path)) Nothing all_paths
355 aa79e62e Iustin Pop
356 aa79e62e Iustin Pop
-- | Failed to load job error.
357 aa79e62e Iustin Pop
noSuchJob :: Result (QueuedJob, Bool)
358 aa79e62e Iustin Pop
noSuchJob = Bad "Can't load job file"
359 aa79e62e Iustin Pop
360 aa79e62e Iustin Pop
-- | Loads a job from disk.
361 aa79e62e Iustin Pop
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
362 aa79e62e Iustin Pop
loadJobFromDisk rootdir archived jid = do
363 aa79e62e Iustin Pop
  raw <- readJobDataFromDisk rootdir archived jid
364 aa79e62e Iustin Pop
  -- note: we need some stricness below, otherwise the wrapping in a
365 aa79e62e Iustin Pop
  -- Result will create too much lazyness, and not close the file
366 aa79e62e Iustin Pop
  -- descriptors for the individual jobs
367 aa79e62e Iustin Pop
  return $! case raw of
368 aa79e62e Iustin Pop
             Nothing -> noSuchJob
369 aa79e62e Iustin Pop
             Just (str, arch) ->
370 aa79e62e Iustin Pop
               liftM (\qj -> (qj, arch)) .
371 aa79e62e Iustin Pop
               fromJResult "Parsing job file" $ Text.JSON.decode str
372 cef3f99f Klaus Aehlig
373 b498ed42 Klaus Aehlig
-- | Write a job to disk.
374 b498ed42 Klaus Aehlig
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
375 b498ed42 Klaus Aehlig
writeJobToDisk rootdir job = do
376 b498ed42 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
377 b498ed42 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
378 b498ed42 Klaus Aehlig
  tryAndLogIOError (atomicWriteFile filename content)
379 b498ed42 Klaus Aehlig
                   ("Failed to write " ++ filename) Ok
380 b498ed42 Klaus Aehlig
381 b5a96995 Klaus Aehlig
-- | Replicate a job to all master candidates.
382 b5a96995 Klaus Aehlig
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
383 b5a96995 Klaus Aehlig
replicateJob rootdir mastercandidates job = do
384 b5a96995 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
385 b5a96995 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
386 557f5dad Klaus Aehlig
  callresult <- executeRpcCall mastercandidates
387 557f5dad Klaus Aehlig
                  $ RpcCallJobqueueUpdate filename content
388 557f5dad Klaus Aehlig
  let result = map (second (() <$)) callresult
389 b5a96995 Klaus Aehlig
  logRpcErrors result
390 b5a96995 Klaus Aehlig
  return result
391 b5a96995 Klaus Aehlig
392 9fd653a4 Klaus Aehlig
-- | Replicate many jobs to all master candidates.
393 9fd653a4 Klaus Aehlig
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
394 9fd653a4 Klaus Aehlig
replicateManyJobs rootdir mastercandidates =
395 9fd653a4 Klaus Aehlig
  mapM_ (replicateJob rootdir mastercandidates)
396 9fd653a4 Klaus Aehlig
397 cef3f99f Klaus Aehlig
-- | Read the job serial number from disk.
398 cef3f99f Klaus Aehlig
readSerialFromDisk :: IO (Result JobId)
399 cef3f99f Klaus Aehlig
readSerialFromDisk = do
400 cef3f99f Klaus Aehlig
  filename <- jobQueueSerialFile
401 cef3f99f Klaus Aehlig
  tryAndLogIOError (readFile filename) "Failed to read serial file"
402 cef3f99f Klaus Aehlig
                   (makeJobIdS . rStripSpace)
403 ae858516 Klaus Aehlig
404 ae858516 Klaus Aehlig
-- | Allocate new job ids.
405 ae858516 Klaus Aehlig
-- To avoid races while accessing the serial file, the threads synchronize
406 ae858516 Klaus Aehlig
-- over a lock, as usual provided by an MVar.
407 ae858516 Klaus Aehlig
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
408 ae858516 Klaus Aehlig
allocateJobIds mastercandidates lock n =
409 ae858516 Klaus Aehlig
  if n <= 0
410 ae858516 Klaus Aehlig
    then return . Bad $ "Can only allocate positive number of job ids"
411 ae858516 Klaus Aehlig
    else do
412 ae858516 Klaus Aehlig
      takeMVar lock
413 ae858516 Klaus Aehlig
      rjobid <- readSerialFromDisk
414 ae858516 Klaus Aehlig
      case rjobid of
415 ae858516 Klaus Aehlig
        Bad s -> do
416 ae858516 Klaus Aehlig
          putMVar lock ()
417 ae858516 Klaus Aehlig
          return . Bad $ s
418 ae858516 Klaus Aehlig
        Ok jid -> do
419 ae858516 Klaus Aehlig
          let current = fromJobId jid
420 ae858516 Klaus Aehlig
              serial_content = show (current + n) ++  "\n"
421 ae858516 Klaus Aehlig
          serial <- jobQueueSerialFile
422 ae858516 Klaus Aehlig
          write_result <- try $ atomicWriteFile serial serial_content
423 ae858516 Klaus Aehlig
                          :: IO (Either IOError ())
424 ae858516 Klaus Aehlig
          case write_result of
425 ae858516 Klaus Aehlig
            Left e -> do
426 f7819050 Klaus Aehlig
              putMVar lock ()
427 ae858516 Klaus Aehlig
              let msg = "Failed to write serial file: " ++ show e
428 ae858516 Klaus Aehlig
              logError msg
429 ae858516 Klaus Aehlig
              return . Bad $ msg 
430 ae858516 Klaus Aehlig
            Right () -> do
431 ae858516 Klaus Aehlig
              _ <- executeRpcCall mastercandidates
432 ae858516 Klaus Aehlig
                     $ RpcCallJobqueueUpdate serial serial_content
433 f7819050 Klaus Aehlig
              putMVar lock ()
434 ae858516 Klaus Aehlig
              return $ mapM makeJobId [(current+1)..(current+n)]
435 ae858516 Klaus Aehlig
436 ae858516 Klaus Aehlig
-- | Allocate one new job id.
437 ae858516 Klaus Aehlig
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
438 ae858516 Klaus Aehlig
allocateJobId mastercandidates lock = do
439 ae858516 Klaus Aehlig
  jids <- allocateJobIds mastercandidates lock 1
440 ae858516 Klaus Aehlig
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
441 1b94c0db Klaus Aehlig
442 1b94c0db Klaus Aehlig
-- | Decide if job queue is open
443 1b94c0db Klaus Aehlig
isQueueOpen :: IO Bool
444 1b94c0db Klaus Aehlig
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
445 493d6920 Klaus Aehlig
446 ac0c5c6d Klaus Aehlig
-- | Start enqueued jobs, currently by handing them over to masterd.
447 ac0c5c6d Klaus Aehlig
startJobs :: [QueuedJob] -> IO ()
448 ac0c5c6d Klaus Aehlig
startJobs jobs = do
449 493d6920 Klaus Aehlig
  socketpath <- defaultMasterSocket
450 d605e261 Petr Pudlak
  client <- getLuxiClient socketpath
451 493d6920 Klaus Aehlig
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
452 493d6920 Klaus Aehlig
  let failures = map show $ justBad pickupResults
453 493d6920 Klaus Aehlig
  unless (null failures)
454 493d6920 Klaus Aehlig
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures