Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ b9202225

History | View | Annotate | Download (17.2 kB)

1 aa79e62e Iustin Pop
{-# LANGUAGE TemplateHaskell #-}
2 aa79e62e Iustin Pop
3 aa79e62e Iustin Pop
{-| Implementation of the job queue.
4 aa79e62e Iustin Pop
5 aa79e62e Iustin Pop
-}
6 aa79e62e Iustin Pop
7 aa79e62e Iustin Pop
{-
8 aa79e62e Iustin Pop
9 aa79e62e Iustin Pop
Copyright (C) 2010, 2012 Google Inc.
10 aa79e62e Iustin Pop
11 aa79e62e Iustin Pop
This program is free software; you can redistribute it and/or modify
12 aa79e62e Iustin Pop
it under the terms of the GNU General Public License as published by
13 aa79e62e Iustin Pop
the Free Software Foundation; either version 2 of the License, or
14 aa79e62e Iustin Pop
(at your option) any later version.
15 aa79e62e Iustin Pop
16 aa79e62e Iustin Pop
This program is distributed in the hope that it will be useful, but
17 aa79e62e Iustin Pop
WITHOUT ANY WARRANTY; without even the implied warranty of
18 aa79e62e Iustin Pop
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 aa79e62e Iustin Pop
General Public License for more details.
20 aa79e62e Iustin Pop
21 aa79e62e Iustin Pop
You should have received a copy of the GNU General Public License
22 aa79e62e Iustin Pop
along with this program; if not, write to the Free Software
23 aa79e62e Iustin Pop
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 aa79e62e Iustin Pop
02110-1301, USA.
25 aa79e62e Iustin Pop
26 aa79e62e Iustin Pop
-}
27 aa79e62e Iustin Pop
28 aa79e62e Iustin Pop
module Ganeti.JQueue
29 aa79e62e Iustin Pop
    ( QueuedOpCode(..)
30 aa79e62e Iustin Pop
    , QueuedJob(..)
31 aa79e62e Iustin Pop
    , InputOpCode(..)
32 4b49a72b Klaus Aehlig
    , queuedOpCodeFromMetaOpCode
33 1c1132f4 Klaus Aehlig
    , queuedJobFromOpCodes
34 363dc9d6 Klaus Aehlig
    , cancelQueuedJob
35 aa79e62e Iustin Pop
    , Timestamp
36 aa79e62e Iustin Pop
    , noTimestamp
37 c3a70209 Klaus Aehlig
    , currentTimestamp
38 2af22d70 Klaus Aehlig
    , setReceivedTimestamp
39 aa79e62e Iustin Pop
    , opStatusFinalized
40 aa79e62e Iustin Pop
    , extractOpSummary
41 aa79e62e Iustin Pop
    , calcJobStatus
42 02e40b13 Klaus Aehlig
    , jobStarted
43 847df9e9 Klaus Aehlig
    , jobFinalized
44 aa79e62e Iustin Pop
    , calcJobPriority
45 aa79e62e Iustin Pop
    , jobFileName
46 aa79e62e Iustin Pop
    , liveJobFile
47 aa79e62e Iustin Pop
    , archivedJobFile
48 aa79e62e Iustin Pop
    , determineJobDirectories
49 aa79e62e Iustin Pop
    , getJobIDs
50 aa79e62e Iustin Pop
    , sortJobIDs
51 aa79e62e Iustin Pop
    , loadJobFromDisk
52 aa79e62e Iustin Pop
    , noSuchJob
53 cef3f99f Klaus Aehlig
    , readSerialFromDisk
54 ae858516 Klaus Aehlig
    , allocateJobIds
55 ae858516 Klaus Aehlig
    , allocateJobId
56 b498ed42 Klaus Aehlig
    , writeJobToDisk
57 9fd653a4 Klaus Aehlig
    , replicateManyJobs
58 1b94c0db Klaus Aehlig
    , isQueueOpen
59 ac0c5c6d Klaus Aehlig
    , startJobs
60 47c3c7b1 Klaus Aehlig
    , cancelJob
61 aa79e62e Iustin Pop
    ) where
62 aa79e62e Iustin Pop
63 557f5dad Klaus Aehlig
import Control.Arrow (second)
64 ae858516 Klaus Aehlig
import Control.Concurrent.MVar
65 aa79e62e Iustin Pop
import Control.Exception
66 aa79e62e Iustin Pop
import Control.Monad
67 557f5dad Klaus Aehlig
import Data.Functor ((<$))
68 aa79e62e Iustin Pop
import Data.List
69 4b49a72b Klaus Aehlig
import Data.Maybe
70 aa79e62e Iustin Pop
import Data.Ord (comparing)
71 aa79e62e Iustin Pop
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
72 557f5dad Klaus Aehlig
import Prelude hiding (id, log)
73 aa79e62e Iustin Pop
import System.Directory
74 aa79e62e Iustin Pop
import System.FilePath
75 aa79e62e Iustin Pop
import System.IO.Error (isDoesNotExistError)
76 aa79e62e Iustin Pop
import System.Posix.Files
77 c3a70209 Klaus Aehlig
import System.Time
78 aa79e62e Iustin Pop
import qualified Text.JSON
79 aa79e62e Iustin Pop
import Text.JSON.Types
80 aa79e62e Iustin Pop
81 aa79e62e Iustin Pop
import Ganeti.BasicTypes
82 aa79e62e Iustin Pop
import qualified Ganeti.Constants as C
83 47c3c7b1 Klaus Aehlig
import Ganeti.Errors (ErrorResult)
84 aa79e62e Iustin Pop
import Ganeti.JSON
85 aa79e62e Iustin Pop
import Ganeti.Logging
86 493d6920 Klaus Aehlig
import Ganeti.Luxi
87 ae858516 Klaus Aehlig
import Ganeti.Objects (Node)
88 aa79e62e Iustin Pop
import Ganeti.OpCodes
89 aa79e62e Iustin Pop
import Ganeti.Path
90 b5a96995 Klaus Aehlig
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
91 b5a96995 Klaus Aehlig
                   RpcCallJobqueueUpdate(..))
92 aa79e62e Iustin Pop
import Ganeti.THH
93 aa79e62e Iustin Pop
import Ganeti.Types
94 cef3f99f Klaus Aehlig
import Ganeti.Utils
95 aa79e62e Iustin Pop
96 aa79e62e Iustin Pop
-- * Data types
97 aa79e62e Iustin Pop
98 c3a70209 Klaus Aehlig
-- | The ganeti queue timestamp type. It represents the time as the pair
99 c3a70209 Klaus Aehlig
-- of seconds since the epoch and microseconds since the beginning of the
100 c3a70209 Klaus Aehlig
-- second.
101 aa79e62e Iustin Pop
type Timestamp = (Int, Int)
102 aa79e62e Iustin Pop
103 aa79e62e Iustin Pop
-- | Missing timestamp type.
104 aa79e62e Iustin Pop
noTimestamp :: Timestamp
105 aa79e62e Iustin Pop
noTimestamp = (-1, -1)
106 aa79e62e Iustin Pop
107 c3a70209 Klaus Aehlig
-- | Get the current time in the job-queue timestamp format.
108 c3a70209 Klaus Aehlig
currentTimestamp :: IO Timestamp
109 c3a70209 Klaus Aehlig
currentTimestamp = do
110 c3a70209 Klaus Aehlig
  TOD ctime pico <- getClockTime
111 c3a70209 Klaus Aehlig
  return (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
112 c3a70209 Klaus Aehlig
113 aa79e62e Iustin Pop
-- | An input opcode.
114 aa79e62e Iustin Pop
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
115 aa79e62e Iustin Pop
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
116 aa79e62e Iustin Pop
                   deriving (Show, Eq)
117 aa79e62e Iustin Pop
118 aa79e62e Iustin Pop
-- | JSON instance for 'InputOpCode', trying to parse it and if
119 aa79e62e Iustin Pop
-- failing, keeping the original JSValue.
120 aa79e62e Iustin Pop
instance Text.JSON.JSON InputOpCode where
121 aa79e62e Iustin Pop
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
122 aa79e62e Iustin Pop
  showJSON (InvalidOpCode inv) = inv
123 aa79e62e Iustin Pop
  readJSON v = case Text.JSON.readJSON v of
124 aa79e62e Iustin Pop
                 Text.JSON.Error _ -> return $ InvalidOpCode v
125 aa79e62e Iustin Pop
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
126 aa79e62e Iustin Pop
127 aa79e62e Iustin Pop
-- | Invalid opcode summary.
128 aa79e62e Iustin Pop
invalidOp :: String
129 aa79e62e Iustin Pop
invalidOp = "INVALID_OP"
130 aa79e62e Iustin Pop
131 aa79e62e Iustin Pop
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
132 aa79e62e Iustin Pop
-- duplicates some functionality from the 'opSummary' function in
133 aa79e62e Iustin Pop
-- "Ganeti.OpCodes".
134 aa79e62e Iustin Pop
extractOpSummary :: InputOpCode -> String
135 aa79e62e Iustin Pop
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
136 aa79e62e Iustin Pop
extractOpSummary (InvalidOpCode (JSObject o)) =
137 aa79e62e Iustin Pop
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
138 aa79e62e Iustin Pop
    Just s -> drop 3 s -- drop the OP_ prefix
139 aa79e62e Iustin Pop
    Nothing -> invalidOp
140 aa79e62e Iustin Pop
extractOpSummary _ = invalidOp
141 aa79e62e Iustin Pop
142 aa79e62e Iustin Pop
$(buildObject "QueuedOpCode" "qo"
143 aa79e62e Iustin Pop
  [ simpleField "input"           [t| InputOpCode |]
144 aa79e62e Iustin Pop
  , simpleField "status"          [t| OpStatus    |]
145 aa79e62e Iustin Pop
  , simpleField "result"          [t| JSValue     |]
146 aa79e62e Iustin Pop
  , defaultField [| [] |] $
147 aa79e62e Iustin Pop
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
148 aa79e62e Iustin Pop
  , simpleField "priority"        [t| Int         |]
149 aa79e62e Iustin Pop
  , optionalNullSerField $
150 aa79e62e Iustin Pop
    simpleField "start_timestamp" [t| Timestamp   |]
151 aa79e62e Iustin Pop
  , optionalNullSerField $
152 aa79e62e Iustin Pop
    simpleField "exec_timestamp"  [t| Timestamp   |]
153 aa79e62e Iustin Pop
  , optionalNullSerField $
154 aa79e62e Iustin Pop
    simpleField "end_timestamp"   [t| Timestamp   |]
155 aa79e62e Iustin Pop
  ])
156 aa79e62e Iustin Pop
157 aa79e62e Iustin Pop
$(buildObject "QueuedJob" "qj"
158 aa79e62e Iustin Pop
  [ simpleField "id"                 [t| JobId          |]
159 aa79e62e Iustin Pop
  , simpleField "ops"                [t| [QueuedOpCode] |]
160 aa79e62e Iustin Pop
  , optionalNullSerField $
161 aa79e62e Iustin Pop
    simpleField "received_timestamp" [t| Timestamp      |]
162 aa79e62e Iustin Pop
  , optionalNullSerField $
163 aa79e62e Iustin Pop
    simpleField "start_timestamp"    [t| Timestamp      |]
164 aa79e62e Iustin Pop
  , optionalNullSerField $
165 aa79e62e Iustin Pop
    simpleField "end_timestamp"      [t| Timestamp      |]
166 aa79e62e Iustin Pop
  ])
167 aa79e62e Iustin Pop
168 4b49a72b Klaus Aehlig
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
169 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
170 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode op =
171 4b49a72b Klaus Aehlig
  QueuedOpCode { qoInput = ValidOpCode op
172 4b49a72b Klaus Aehlig
               , qoStatus = OP_STATUS_QUEUED
173 4b49a72b Klaus Aehlig
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
174 4b49a72b Klaus Aehlig
                              $ op
175 4b49a72b Klaus Aehlig
               , qoLog = []
176 4b49a72b Klaus Aehlig
               , qoResult = JSNull
177 4b49a72b Klaus Aehlig
               , qoStartTimestamp = Nothing
178 4b49a72b Klaus Aehlig
               , qoEndTimestamp = Nothing
179 4b49a72b Klaus Aehlig
               , qoExecTimestamp = Nothing
180 4b49a72b Klaus Aehlig
               }
181 4b49a72b Klaus Aehlig
182 1c1132f4 Klaus Aehlig
-- | From a job-id and a list of op-codes create a job. This is
183 1c1132f4 Klaus Aehlig
-- the pure part of job creation, as allocating a new job id
184 1c1132f4 Klaus Aehlig
-- lives in IO.
185 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
186 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes jobid ops = do
187 1c1132f4 Klaus Aehlig
  ops' <- mapM (`resolveDependencies` jobid) ops
188 1c1132f4 Klaus Aehlig
  return QueuedJob { qjId = jobid
189 1c1132f4 Klaus Aehlig
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
190 1c1132f4 Klaus Aehlig
                   , qjReceivedTimestamp = Nothing 
191 1c1132f4 Klaus Aehlig
                   , qjStartTimestamp = Nothing
192 1c1132f4 Klaus Aehlig
                   , qjEndTimestamp = Nothing
193 1c1132f4 Klaus Aehlig
                   }
194 1c1132f4 Klaus Aehlig
195 2af22d70 Klaus Aehlig
-- | Attach a received timestamp to a Queued Job.
196 2af22d70 Klaus Aehlig
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
197 2af22d70 Klaus Aehlig
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
198 2af22d70 Klaus Aehlig
199 363dc9d6 Klaus Aehlig
-- | Set the state of a QueuedOpCode to canceled.
200 363dc9d6 Klaus Aehlig
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
201 363dc9d6 Klaus Aehlig
cancelOpCode now op =
202 363dc9d6 Klaus Aehlig
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }
203 363dc9d6 Klaus Aehlig
204 363dc9d6 Klaus Aehlig
-- | Transform a QueuedJob that has not been started into its canceled form.
205 363dc9d6 Klaus Aehlig
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
206 363dc9d6 Klaus Aehlig
cancelQueuedJob now job =
207 363dc9d6 Klaus Aehlig
  let ops' = map (cancelOpCode now) $ qjOps job
208 363dc9d6 Klaus Aehlig
  in job { qjOps = ops', qjEndTimestamp = Just now}
209 363dc9d6 Klaus Aehlig
210 aa79e62e Iustin Pop
-- | Job file prefix.
211 aa79e62e Iustin Pop
jobFilePrefix :: String
212 aa79e62e Iustin Pop
jobFilePrefix = "job-"
213 aa79e62e Iustin Pop
214 aa79e62e Iustin Pop
-- | Computes the filename for a given job ID.
215 aa79e62e Iustin Pop
jobFileName :: JobId -> FilePath
216 aa79e62e Iustin Pop
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
217 aa79e62e Iustin Pop
218 aa79e62e Iustin Pop
-- | Parses a job ID from a file name.
219 aa79e62e Iustin Pop
parseJobFileId :: (Monad m) => FilePath -> m JobId
220 aa79e62e Iustin Pop
parseJobFileId path =
221 aa79e62e Iustin Pop
  case stripPrefix jobFilePrefix path of
222 aa79e62e Iustin Pop
    Nothing -> fail $ "Job file '" ++ path ++
223 aa79e62e Iustin Pop
                      "' doesn't have the correct prefix"
224 aa79e62e Iustin Pop
    Just suffix -> makeJobIdS suffix
225 aa79e62e Iustin Pop
226 aa79e62e Iustin Pop
-- | Computes the full path to a live job.
227 aa79e62e Iustin Pop
liveJobFile :: FilePath -> JobId -> FilePath
228 aa79e62e Iustin Pop
liveJobFile rootdir jid = rootdir </> jobFileName jid
229 aa79e62e Iustin Pop
230 aa79e62e Iustin Pop
-- | Computes the full path to an archives job. BROKEN.
231 aa79e62e Iustin Pop
archivedJobFile :: FilePath -> JobId -> FilePath
232 aa79e62e Iustin Pop
archivedJobFile rootdir jid =
233 aa79e62e Iustin Pop
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
234 aa79e62e Iustin Pop
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
235 aa79e62e Iustin Pop
236 aa79e62e Iustin Pop
-- | Map from opcode status to job status.
237 aa79e62e Iustin Pop
opStatusToJob :: OpStatus -> JobStatus
238 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
239 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
240 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
241 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
242 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
243 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
244 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
245 aa79e62e Iustin Pop
246 aa79e62e Iustin Pop
-- | Computes a queued job's status.
247 aa79e62e Iustin Pop
calcJobStatus :: QueuedJob -> JobStatus
248 aa79e62e Iustin Pop
calcJobStatus QueuedJob { qjOps = ops } =
249 aa79e62e Iustin Pop
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
250 aa79e62e Iustin Pop
    where
251 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_ERROR     = True
252 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELING = True
253 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELED  = True
254 aa79e62e Iustin Pop
      terminalStatus _                   = False
255 aa79e62e Iustin Pop
      softStatus     OP_STATUS_SUCCESS   = True
256 aa79e62e Iustin Pop
      softStatus     OP_STATUS_QUEUED    = True
257 aa79e62e Iustin Pop
      softStatus     _                   = False
258 aa79e62e Iustin Pop
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
259 aa79e62e Iustin Pop
      extractOpSt [] d False = d
260 aa79e62e Iustin Pop
      extractOpSt (x:xs) d old_all
261 aa79e62e Iustin Pop
           | terminalStatus x = opStatusToJob x -- abort recursion
262 aa79e62e Iustin Pop
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
263 aa79e62e Iustin Pop
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
264 aa79e62e Iustin Pop
           where new_all = x == OP_STATUS_SUCCESS && old_all
265 aa79e62e Iustin Pop
266 02e40b13 Klaus Aehlig
-- | Determine if a job has started
267 02e40b13 Klaus Aehlig
jobStarted :: QueuedJob -> Bool
268 02e40b13 Klaus Aehlig
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus
269 02e40b13 Klaus Aehlig
270 847df9e9 Klaus Aehlig
-- | Determine if a job is finalised.
271 847df9e9 Klaus Aehlig
jobFinalized :: QueuedJob -> Bool
272 847df9e9 Klaus Aehlig
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus
273 847df9e9 Klaus Aehlig
274 aa79e62e Iustin Pop
-- | Determine whether an opcode status is finalized.
275 aa79e62e Iustin Pop
opStatusFinalized :: OpStatus -> Bool
276 aa79e62e Iustin Pop
opStatusFinalized = (> OP_STATUS_RUNNING)
277 aa79e62e Iustin Pop
278 aa79e62e Iustin Pop
-- | Compute a job's priority.
279 aa79e62e Iustin Pop
calcJobPriority :: QueuedJob -> Int
280 aa79e62e Iustin Pop
calcJobPriority QueuedJob { qjOps = ops } =
281 aa79e62e Iustin Pop
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
282 aa79e62e Iustin Pop
    where helper [] = C.opPrioDefault
283 aa79e62e Iustin Pop
          helper ps = minimum ps
284 aa79e62e Iustin Pop
285 aa79e62e Iustin Pop
-- | Log but ignore an 'IOError'.
286 aa79e62e Iustin Pop
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
287 aa79e62e Iustin Pop
ignoreIOError a ignore_noent msg e = do
288 aa79e62e Iustin Pop
  unless (isDoesNotExistError e && ignore_noent) .
289 aa79e62e Iustin Pop
    logWarning $ msg ++ ": " ++ show e
290 aa79e62e Iustin Pop
  return a
291 aa79e62e Iustin Pop
292 aa79e62e Iustin Pop
-- | Compute the list of existing archive directories. Note that I/O
293 aa79e62e Iustin Pop
-- exceptions are swallowed and ignored.
294 aa79e62e Iustin Pop
allArchiveDirs :: FilePath -> IO [FilePath]
295 aa79e62e Iustin Pop
allArchiveDirs rootdir = do
296 aa79e62e Iustin Pop
  let adir = rootdir </> jobQueueArchiveSubDir
297 aa79e62e Iustin Pop
  contents <- getDirectoryContents adir `Control.Exception.catch`
298 aa79e62e Iustin Pop
               ignoreIOError [] False
299 aa79e62e Iustin Pop
                 ("Failed to list queue directory " ++ adir)
300 aa79e62e Iustin Pop
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
301 aa79e62e Iustin Pop
  filterM (\path ->
302 aa79e62e Iustin Pop
             liftM isDirectory (getFileStatus (adir </> path))
303 aa79e62e Iustin Pop
               `Control.Exception.catch`
304 aa79e62e Iustin Pop
               ignoreIOError False True
305 aa79e62e Iustin Pop
                 ("Failed to stat archive path " ++ path)) fpaths
306 aa79e62e Iustin Pop
307 aa79e62e Iustin Pop
-- | Build list of directories containing job files. Note: compared to
308 aa79e62e Iustin Pop
-- the Python version, this doesn't ignore a potential lost+found
309 aa79e62e Iustin Pop
-- file.
310 aa79e62e Iustin Pop
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
311 aa79e62e Iustin Pop
determineJobDirectories rootdir archived = do
312 aa79e62e Iustin Pop
  other <- if archived
313 aa79e62e Iustin Pop
             then allArchiveDirs rootdir
314 aa79e62e Iustin Pop
             else return []
315 aa79e62e Iustin Pop
  return $ rootdir:other
316 aa79e62e Iustin Pop
317 3cecd73c Michele Tartara
-- Function equivalent to the \'sequence\' function, that cannot be used because
318 3cecd73c Michele Tartara
-- of library version conflict on Lucid.
319 3cecd73c Michele Tartara
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
320 3cecd73c Michele Tartara
-- will not be required anymore.
321 3cecd73c Michele Tartara
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
322 3cecd73c Michele Tartara
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
323 3cecd73c Michele Tartara
324 3cecd73c Michele Tartara
-- | Folding function for joining multiple [JobIds] into one list.
325 3cecd73c Michele Tartara
seqFolder :: Either IOError [[JobId]]
326 3cecd73c Michele Tartara
          -> Either IOError [JobId]
327 3cecd73c Michele Tartara
          -> Either IOError [[JobId]]
328 3cecd73c Michele Tartara
seqFolder (Left e) _ = Left e
329 3cecd73c Michele Tartara
seqFolder (Right _) (Left e) = Left e
330 3cecd73c Michele Tartara
seqFolder (Right l) (Right el) = Right $ el:l
331 3cecd73c Michele Tartara
332 aa79e62e Iustin Pop
-- | Computes the list of all jobs in the given directories.
333 be0cb2d7 Michele Tartara
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
334 3cecd73c Michele Tartara
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
335 aa79e62e Iustin Pop
336 aa79e62e Iustin Pop
-- | Sorts the a list of job IDs.
337 aa79e62e Iustin Pop
sortJobIDs :: [JobId] -> [JobId]
338 aa79e62e Iustin Pop
sortJobIDs = sortBy (comparing fromJobId)
339 aa79e62e Iustin Pop
340 aa79e62e Iustin Pop
-- | Computes the list of jobs in a given directory.
341 be0cb2d7 Michele Tartara
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
342 aa79e62e Iustin Pop
getDirJobIDs path = do
343 be0cb2d7 Michele Tartara
  either_contents <-
344 be0cb2d7 Michele Tartara
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
345 be0cb2d7 Michele Tartara
  case either_contents of
346 be0cb2d7 Michele Tartara
    Left e -> do
347 be0cb2d7 Michele Tartara
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
348 be0cb2d7 Michele Tartara
      return $ Left e
349 be0cb2d7 Michele Tartara
    Right contents -> do
350 be0cb2d7 Michele Tartara
      let jids = foldl (\ids file ->
351 be0cb2d7 Michele Tartara
                         case parseJobFileId file of
352 be0cb2d7 Michele Tartara
                           Nothing -> ids
353 be0cb2d7 Michele Tartara
                           Just new_id -> new_id:ids) [] contents
354 be0cb2d7 Michele Tartara
      return . Right $ reverse jids
355 aa79e62e Iustin Pop
356 aa79e62e Iustin Pop
-- | Reads the job data from disk.
357 aa79e62e Iustin Pop
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
358 aa79e62e Iustin Pop
readJobDataFromDisk rootdir archived jid = do
359 aa79e62e Iustin Pop
  let live_path = liveJobFile rootdir jid
360 aa79e62e Iustin Pop
      archived_path = archivedJobFile rootdir jid
361 aa79e62e Iustin Pop
      all_paths = if archived
362 aa79e62e Iustin Pop
                    then [(live_path, False), (archived_path, True)]
363 aa79e62e Iustin Pop
                    else [(live_path, False)]
364 aa79e62e Iustin Pop
  foldM (\state (path, isarchived) ->
365 aa79e62e Iustin Pop
           liftM (\r -> Just (r, isarchived)) (readFile path)
366 aa79e62e Iustin Pop
             `Control.Exception.catch`
367 aa79e62e Iustin Pop
             ignoreIOError state True
368 aa79e62e Iustin Pop
               ("Failed to read job file " ++ path)) Nothing all_paths
369 aa79e62e Iustin Pop
370 aa79e62e Iustin Pop
-- | Failed to load job error.
371 aa79e62e Iustin Pop
noSuchJob :: Result (QueuedJob, Bool)
372 aa79e62e Iustin Pop
noSuchJob = Bad "Can't load job file"
373 aa79e62e Iustin Pop
374 aa79e62e Iustin Pop
-- | Loads a job from disk.
375 aa79e62e Iustin Pop
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
376 aa79e62e Iustin Pop
loadJobFromDisk rootdir archived jid = do
377 aa79e62e Iustin Pop
  raw <- readJobDataFromDisk rootdir archived jid
378 aa79e62e Iustin Pop
  -- note: we need some stricness below, otherwise the wrapping in a
379 aa79e62e Iustin Pop
  -- Result will create too much lazyness, and not close the file
380 aa79e62e Iustin Pop
  -- descriptors for the individual jobs
381 aa79e62e Iustin Pop
  return $! case raw of
382 aa79e62e Iustin Pop
             Nothing -> noSuchJob
383 aa79e62e Iustin Pop
             Just (str, arch) ->
384 aa79e62e Iustin Pop
               liftM (\qj -> (qj, arch)) .
385 aa79e62e Iustin Pop
               fromJResult "Parsing job file" $ Text.JSON.decode str
386 cef3f99f Klaus Aehlig
387 b498ed42 Klaus Aehlig
-- | Write a job to disk.
388 b498ed42 Klaus Aehlig
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
389 b498ed42 Klaus Aehlig
writeJobToDisk rootdir job = do
390 b498ed42 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
391 b498ed42 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
392 b498ed42 Klaus Aehlig
  tryAndLogIOError (atomicWriteFile filename content)
393 b498ed42 Klaus Aehlig
                   ("Failed to write " ++ filename) Ok
394 b498ed42 Klaus Aehlig
395 b5a96995 Klaus Aehlig
-- | Replicate a job to all master candidates.
396 b5a96995 Klaus Aehlig
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
397 b5a96995 Klaus Aehlig
replicateJob rootdir mastercandidates job = do
398 b5a96995 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
399 b5a96995 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
400 557f5dad Klaus Aehlig
  callresult <- executeRpcCall mastercandidates
401 557f5dad Klaus Aehlig
                  $ RpcCallJobqueueUpdate filename content
402 557f5dad Klaus Aehlig
  let result = map (second (() <$)) callresult
403 b5a96995 Klaus Aehlig
  logRpcErrors result
404 b5a96995 Klaus Aehlig
  return result
405 b5a96995 Klaus Aehlig
406 9fd653a4 Klaus Aehlig
-- | Replicate many jobs to all master candidates.
407 9fd653a4 Klaus Aehlig
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
408 9fd653a4 Klaus Aehlig
replicateManyJobs rootdir mastercandidates =
409 9fd653a4 Klaus Aehlig
  mapM_ (replicateJob rootdir mastercandidates)
410 9fd653a4 Klaus Aehlig
411 cef3f99f Klaus Aehlig
-- | Read the job serial number from disk.
412 cef3f99f Klaus Aehlig
readSerialFromDisk :: IO (Result JobId)
413 cef3f99f Klaus Aehlig
readSerialFromDisk = do
414 cef3f99f Klaus Aehlig
  filename <- jobQueueSerialFile
415 cef3f99f Klaus Aehlig
  tryAndLogIOError (readFile filename) "Failed to read serial file"
416 cef3f99f Klaus Aehlig
                   (makeJobIdS . rStripSpace)
417 ae858516 Klaus Aehlig
418 ae858516 Klaus Aehlig
-- | Allocate new job ids.
419 ae858516 Klaus Aehlig
-- To avoid races while accessing the serial file, the threads synchronize
420 ae858516 Klaus Aehlig
-- over a lock, as usual provided by an MVar.
421 ae858516 Klaus Aehlig
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
422 ae858516 Klaus Aehlig
allocateJobIds mastercandidates lock n =
423 ae858516 Klaus Aehlig
  if n <= 0
424 ae858516 Klaus Aehlig
    then return . Bad $ "Can only allocate positive number of job ids"
425 ae858516 Klaus Aehlig
    else do
426 ae858516 Klaus Aehlig
      takeMVar lock
427 ae858516 Klaus Aehlig
      rjobid <- readSerialFromDisk
428 ae858516 Klaus Aehlig
      case rjobid of
429 ae858516 Klaus Aehlig
        Bad s -> do
430 ae858516 Klaus Aehlig
          putMVar lock ()
431 ae858516 Klaus Aehlig
          return . Bad $ s
432 ae858516 Klaus Aehlig
        Ok jid -> do
433 ae858516 Klaus Aehlig
          let current = fromJobId jid
434 ae858516 Klaus Aehlig
              serial_content = show (current + n) ++  "\n"
435 ae858516 Klaus Aehlig
          serial <- jobQueueSerialFile
436 ae858516 Klaus Aehlig
          write_result <- try $ atomicWriteFile serial serial_content
437 ae858516 Klaus Aehlig
                          :: IO (Either IOError ())
438 ae858516 Klaus Aehlig
          case write_result of
439 ae858516 Klaus Aehlig
            Left e -> do
440 f7819050 Klaus Aehlig
              putMVar lock ()
441 ae858516 Klaus Aehlig
              let msg = "Failed to write serial file: " ++ show e
442 ae858516 Klaus Aehlig
              logError msg
443 ae858516 Klaus Aehlig
              return . Bad $ msg 
444 ae858516 Klaus Aehlig
            Right () -> do
445 ae858516 Klaus Aehlig
              _ <- executeRpcCall mastercandidates
446 ae858516 Klaus Aehlig
                     $ RpcCallJobqueueUpdate serial serial_content
447 f7819050 Klaus Aehlig
              putMVar lock ()
448 ae858516 Klaus Aehlig
              return $ mapM makeJobId [(current+1)..(current+n)]
449 ae858516 Klaus Aehlig
450 ae858516 Klaus Aehlig
-- | Allocate one new job id.
451 ae858516 Klaus Aehlig
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
452 ae858516 Klaus Aehlig
allocateJobId mastercandidates lock = do
453 ae858516 Klaus Aehlig
  jids <- allocateJobIds mastercandidates lock 1
454 ae858516 Klaus Aehlig
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
455 1b94c0db Klaus Aehlig
456 1b94c0db Klaus Aehlig
-- | Decide if job queue is open
457 1b94c0db Klaus Aehlig
isQueueOpen :: IO Bool
458 1b94c0db Klaus Aehlig
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
459 493d6920 Klaus Aehlig
460 ac0c5c6d Klaus Aehlig
-- | Start enqueued jobs, currently by handing them over to masterd.
461 ac0c5c6d Klaus Aehlig
startJobs :: [QueuedJob] -> IO ()
462 ac0c5c6d Klaus Aehlig
startJobs jobs = do
463 493d6920 Klaus Aehlig
  socketpath <- defaultMasterSocket
464 d605e261 Petr Pudlak
  client <- getLuxiClient socketpath
465 493d6920 Klaus Aehlig
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
466 493d6920 Klaus Aehlig
  let failures = map show $ justBad pickupResults
467 493d6920 Klaus Aehlig
  unless (null failures)
468 493d6920 Klaus Aehlig
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
469 47c3c7b1 Klaus Aehlig
470 47c3c7b1 Klaus Aehlig
-- | Try to cancel a job that has already been handed over to execution,
471 47c3c7b1 Klaus Aehlig
-- currently by asking masterd to cancel it.
472 47c3c7b1 Klaus Aehlig
cancelJob :: JobId -> IO (ErrorResult JSValue)
473 47c3c7b1 Klaus Aehlig
cancelJob jid = do
474 47c3c7b1 Klaus Aehlig
  socketpath <- defaultMasterSocket
475 47c3c7b1 Klaus Aehlig
  client <- getLuxiClient socketpath
476 47c3c7b1 Klaus Aehlig
  callMethod (CancelJob jid) client