Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 8b5a4b9a

History | View | Annotate | Download (18 kB)

1 aa79e62e Iustin Pop
{-# LANGUAGE TemplateHaskell #-}
2 aa79e62e Iustin Pop
3 aa79e62e Iustin Pop
{-| Implementation of the job queue.
4 aa79e62e Iustin Pop
5 aa79e62e Iustin Pop
-}
6 aa79e62e Iustin Pop
7 aa79e62e Iustin Pop
{-
8 aa79e62e Iustin Pop
9 aa79e62e Iustin Pop
Copyright (C) 2010, 2012 Google Inc.
10 aa79e62e Iustin Pop
11 aa79e62e Iustin Pop
This program is free software; you can redistribute it and/or modify
12 aa79e62e Iustin Pop
it under the terms of the GNU General Public License as published by
13 aa79e62e Iustin Pop
the Free Software Foundation; either version 2 of the License, or
14 aa79e62e Iustin Pop
(at your option) any later version.
15 aa79e62e Iustin Pop
16 aa79e62e Iustin Pop
This program is distributed in the hope that it will be useful, but
17 aa79e62e Iustin Pop
WITHOUT ANY WARRANTY; without even the implied warranty of
18 aa79e62e Iustin Pop
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 aa79e62e Iustin Pop
General Public License for more details.
20 aa79e62e Iustin Pop
21 aa79e62e Iustin Pop
You should have received a copy of the GNU General Public License
22 aa79e62e Iustin Pop
along with this program; if not, write to the Free Software
23 aa79e62e Iustin Pop
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 aa79e62e Iustin Pop
02110-1301, USA.
25 aa79e62e Iustin Pop
26 aa79e62e Iustin Pop
-}
27 aa79e62e Iustin Pop
28 aa79e62e Iustin Pop
module Ganeti.JQueue
29 aa79e62e Iustin Pop
    ( QueuedOpCode(..)
30 aa79e62e Iustin Pop
    , QueuedJob(..)
31 aa79e62e Iustin Pop
    , InputOpCode(..)
32 4b49a72b Klaus Aehlig
    , queuedOpCodeFromMetaOpCode
33 1c1132f4 Klaus Aehlig
    , queuedJobFromOpCodes
34 363dc9d6 Klaus Aehlig
    , cancelQueuedJob
35 aa79e62e Iustin Pop
    , Timestamp
36 ae66f3a9 Klaus Aehlig
    , fromClockTime
37 aa79e62e Iustin Pop
    , noTimestamp
38 c3a70209 Klaus Aehlig
    , currentTimestamp
39 8b5a4b9a Klaus Aehlig
    , advanceTimestamp
40 2af22d70 Klaus Aehlig
    , setReceivedTimestamp
41 aa79e62e Iustin Pop
    , opStatusFinalized
42 aa79e62e Iustin Pop
    , extractOpSummary
43 aa79e62e Iustin Pop
    , calcJobStatus
44 02e40b13 Klaus Aehlig
    , jobStarted
45 847df9e9 Klaus Aehlig
    , jobFinalized
46 370f63be Klaus Aehlig
    , jobArchivable
47 aa79e62e Iustin Pop
    , calcJobPriority
48 aa79e62e Iustin Pop
    , jobFileName
49 aa79e62e Iustin Pop
    , liveJobFile
50 aa79e62e Iustin Pop
    , archivedJobFile
51 aa79e62e Iustin Pop
    , determineJobDirectories
52 aa79e62e Iustin Pop
    , getJobIDs
53 aa79e62e Iustin Pop
    , sortJobIDs
54 aa79e62e Iustin Pop
    , loadJobFromDisk
55 aa79e62e Iustin Pop
    , noSuchJob
56 cef3f99f Klaus Aehlig
    , readSerialFromDisk
57 ae858516 Klaus Aehlig
    , allocateJobIds
58 ae858516 Klaus Aehlig
    , allocateJobId
59 b498ed42 Klaus Aehlig
    , writeJobToDisk
60 9fd653a4 Klaus Aehlig
    , replicateManyJobs
61 1b94c0db Klaus Aehlig
    , isQueueOpen
62 ac0c5c6d Klaus Aehlig
    , startJobs
63 47c3c7b1 Klaus Aehlig
    , cancelJob
64 aa79e62e Iustin Pop
    ) where
65 aa79e62e Iustin Pop
66 370f63be Klaus Aehlig
import Control.Applicative (liftA2, (<|>))
67 8b5a4b9a Klaus Aehlig
import Control.Arrow (first, second)
68 ae858516 Klaus Aehlig
import Control.Concurrent.MVar
69 aa79e62e Iustin Pop
import Control.Exception
70 aa79e62e Iustin Pop
import Control.Monad
71 557f5dad Klaus Aehlig
import Data.Functor ((<$))
72 aa79e62e Iustin Pop
import Data.List
73 4b49a72b Klaus Aehlig
import Data.Maybe
74 aa79e62e Iustin Pop
import Data.Ord (comparing)
75 aa79e62e Iustin Pop
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
76 557f5dad Klaus Aehlig
import Prelude hiding (id, log)
77 aa79e62e Iustin Pop
import System.Directory
78 aa79e62e Iustin Pop
import System.FilePath
79 aa79e62e Iustin Pop
import System.IO.Error (isDoesNotExistError)
80 aa79e62e Iustin Pop
import System.Posix.Files
81 c3a70209 Klaus Aehlig
import System.Time
82 aa79e62e Iustin Pop
import qualified Text.JSON
83 aa79e62e Iustin Pop
import Text.JSON.Types
84 aa79e62e Iustin Pop
85 aa79e62e Iustin Pop
import Ganeti.BasicTypes
86 aa79e62e Iustin Pop
import qualified Ganeti.Constants as C
87 47c3c7b1 Klaus Aehlig
import Ganeti.Errors (ErrorResult)
88 aa79e62e Iustin Pop
import Ganeti.JSON
89 aa79e62e Iustin Pop
import Ganeti.Logging
90 493d6920 Klaus Aehlig
import Ganeti.Luxi
91 ae858516 Klaus Aehlig
import Ganeti.Objects (Node)
92 aa79e62e Iustin Pop
import Ganeti.OpCodes
93 aa79e62e Iustin Pop
import Ganeti.Path
94 b5a96995 Klaus Aehlig
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
95 b5a96995 Klaus Aehlig
                   RpcCallJobqueueUpdate(..))
96 aa79e62e Iustin Pop
import Ganeti.THH
97 aa79e62e Iustin Pop
import Ganeti.Types
98 cef3f99f Klaus Aehlig
import Ganeti.Utils
99 77676415 Klaus Aehlig
import Ganeti.VCluster (makeVirtualPath)
100 aa79e62e Iustin Pop
101 aa79e62e Iustin Pop
-- * Data types
102 aa79e62e Iustin Pop
103 c3a70209 Klaus Aehlig
-- | Timestamp as used by the ganeti job queue: a pair made of the
-- whole seconds since the epoch and the microseconds elapsed since
-- the start of that second.
type Timestamp = (Int, Int)
107 aa79e62e Iustin Pop
108 aa79e62e Iustin Pop
-- | Missing timestamp type.
109 aa79e62e Iustin Pop
noTimestamp :: Timestamp
110 aa79e62e Iustin Pop
noTimestamp = (-1, -1)
111 aa79e62e Iustin Pop
112 ae66f3a9 Klaus Aehlig
-- | Convert a 'ClockTime' into a queue 'Timestamp'. The seconds are
-- taken over unchanged; the picosecond part is scaled down to
-- microseconds (integer division by 10^6).
fromClockTime :: ClockTime -> Timestamp
fromClockTime (TOD secs picos) = (fromIntegral secs, fromIntegral micros)
  where micros = picos `div` 1000000
116 ae66f3a9 Klaus Aehlig
117 c3a70209 Klaus Aehlig
-- | Read the system clock and return it in the job-queue timestamp
-- format.
currentTimestamp :: IO Timestamp
currentTimestamp = do
  now <- getClockTime
  return (fromClockTime now)
120 c3a70209 Klaus Aehlig
121 8b5a4b9a Klaus Aehlig
-- | Shift a timestamp forward by the given number of seconds. Only the
-- seconds component is touched; the microseconds are carried over
-- unchanged.
advanceTimestamp :: Int -> Timestamp -> Timestamp
advanceTimestamp delta ts = (delta + fst ts, snd ts)
125 8b5a4b9a Klaus Aehlig
126 aa79e62e Iustin Pop
-- | An opcode as found in a job: either successfully parsed, or kept
-- as the raw JSON value that could not be parsed.
data InputOpCode
  = ValidOpCode MetaOpCode  -- ^ OpCode was parsed successfully
  | InvalidOpCode JSValue   -- ^ Invalid opcode
  deriving (Show, Eq)
130 aa79e62e Iustin Pop
131 aa79e62e Iustin Pop
-- | JSON instance for 'InputOpCode'. Reading never fails: the value is
-- first parsed as an opcode and, if that does not work, the original
-- 'JSValue' is retained as an 'InvalidOpCode'. Showing an invalid
-- opcode gives back exactly that retained value.
instance Text.JSON.JSON InputOpCode where
  showJSON input =
    case input of
      ValidOpCode mo    -> Text.JSON.showJSON mo
      InvalidOpCode raw -> raw
  readJSON v = wrap (Text.JSON.readJSON v)
    where wrap (Text.JSON.Error _) = return (InvalidOpCode v)
          wrap (Text.JSON.Ok mo)   = return (ValidOpCode mo)
139 aa79e62e Iustin Pop
140 aa79e62e Iustin Pop
-- | Summary string reported for opcodes that cannot be interpreted.
invalidOp :: String
invalidOp = "INVALID_OP"
143 aa79e62e Iustin Pop
144 aa79e62e Iustin Pop
-- | Compute the summary of an 'InputOpCode'. Valid opcodes are passed
-- to 'opSummary' (from "Ganeti.OpCodes", whose functionality is partly
-- duplicated here). For an invalid opcode that is a JSON object, the
-- @OP_ID@ entry is looked up and its first three characters (the
-- @OP_@ prefix) are dropped; everything else yields 'invalidOp'.
extractOpSummary :: InputOpCode -> String
extractOpSummary input =
  case input of
    ValidOpCode metaop -> opSummary (metaOpCode metaop)
    InvalidOpCode (JSObject obj) ->
      -- the default carries the OP_ prefix too, so that dropping three
      -- characters leaves exactly 'invalidOp'
      maybe invalidOp (drop 3) $
        fromObjWithDefault (fromJSObject obj) "OP_ID" ("OP_" ++ invalidOp)
    _ -> invalidOp
154 aa79e62e Iustin Pop
155 aa79e62e Iustin Pop
-- Template Haskell declaration of the QueuedOpCode record. The "qo"
-- argument matches the accessor prefix used throughout this module
-- (qoInput, qoStatus, qoPriority, ...). The three timestamp fields are
-- the ones this module initialises to Nothing; "log" defaults to the
-- empty list.
$(buildObject "QueuedOpCode" "qo"
  [ simpleField "input"    [t| InputOpCode |]
  , simpleField "status"   [t| OpStatus    |]
  , simpleField "result"   [t| JSValue     |]
  , defaultField [| [] |] $
      simpleField "log"    [t| [(Int, Timestamp, ELogType, JSValue)] |]
  , simpleField "priority" [t| Int         |]
  , optionalNullSerField $ simpleField "start_timestamp" [t| Timestamp |]
  , optionalNullSerField $ simpleField "exec_timestamp"  [t| Timestamp |]
  , optionalNullSerField $ simpleField "end_timestamp"   [t| Timestamp |]
  ])
169 aa79e62e Iustin Pop
170 aa79e62e Iustin Pop
-- Template Haskell declaration of the QueuedJob record. The "qj"
-- argument matches the accessor prefix used throughout this module
-- (qjId, qjOps, qjReceivedTimestamp, ...). The three timestamp fields
-- are the ones this module initialises to Nothing.
$(buildObject "QueuedJob" "qj"
  [ simpleField "id"  [t| JobId          |]
  , simpleField "ops" [t| [QueuedOpCode] |]
  , optionalNullSerField $ simpleField "received_timestamp" [t| Timestamp |]
  , optionalNullSerField $ simpleField "start_timestamp"    [t| Timestamp |]
  , optionalNullSerField $ simpleField "end_timestamp"      [t| Timestamp |]
  ])
180 aa79e62e Iustin Pop
181 4b49a72b Klaus Aehlig
-- | Wrap a 'MetaOpCode' into a freshly queued 'QueuedOpCode': status
-- queued, empty log, null result, no timestamps, and the priority
-- taken from the opcode's own submit priority.
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
queuedOpCodeFromMetaOpCode op =
  QueuedOpCode
    { qoInput          = ValidOpCode op
    , qoStatus         = OP_STATUS_QUEUED
    , qoResult         = JSNull
    , qoLog            = []
    , qoPriority       = priority
    , qoStartTimestamp = Nothing
    , qoExecTimestamp  = Nothing
    , qoEndTimestamp   = Nothing
    }
  where priority = opSubmitPriorityToRaw (opPriority (metaParams op))
194 4b49a72b Klaus Aehlig
195 1c1132f4 Klaus Aehlig
-- | Build a job from a job id and a list of opcodes. Every opcode is
-- first run through 'resolveDependencies' with the new job id (which
-- is why the result lives in a monad); the job itself starts out with
-- no timestamps. This is the pure half of job creation; allocating
-- the job id is done in IO.
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
queuedJobFromOpCodes jobid ops =
  liftM mkJob $ mapM (\op -> resolveDependencies op jobid) ops
  where
    mkJob resolved =
      QueuedJob
        { qjId                = jobid
        , qjOps               = map queuedOpCodeFromMetaOpCode resolved
        , qjReceivedTimestamp = Nothing
        , qjStartTimestamp    = Nothing
        , qjEndTimestamp      = Nothing
        }
207 1c1132f4 Klaus Aehlig
208 2af22d70 Klaus Aehlig
-- | Record the given time as the moment the job was received,
-- replacing any previously stored received timestamp.
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
setReceivedTimestamp ts job = job { qjReceivedTimestamp = received }
  where received = Just ts
211 2af22d70 Klaus Aehlig
212 363dc9d6 Klaus Aehlig
-- | Mark a single opcode as canceled, stamping it with the given end
-- time. All other fields are left as they are.
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
cancelOpCode now op =
  op { qoEndTimestamp = Just now
     , qoStatus       = OP_STATUS_CANCELED
     }
216 363dc9d6 Klaus Aehlig
217 363dc9d6 Klaus Aehlig
-- | Turn a job that has not been started into its canceled form:
-- every opcode is canceled via 'cancelOpCode' and the job's end
-- timestamp is set to the same time.
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
cancelQueuedJob now job =
  job { qjEndTimestamp = Just now
      , qjOps          = map (cancelOpCode now) (qjOps job)
      }
222 363dc9d6 Klaus Aehlig
223 aa79e62e Iustin Pop
-- | Prefix shared by the names of all job files; see 'jobFileName'
-- and 'parseJobFileId'.
jobFilePrefix :: String
jobFilePrefix = "job-"
226 aa79e62e Iustin Pop
227 aa79e62e Iustin Pop
-- | File name (without directory) of the job with the given ID: the
-- job file prefix followed by the numeric id.
jobFileName :: JobId -> FilePath
jobFileName = (jobFilePrefix ++) . show . fromJobId
230 aa79e62e Iustin Pop
231 aa79e62e Iustin Pop
-- | Recover the job ID from a job file name. Fails in the monad if the
-- name does not start with 'jobFilePrefix'; otherwise the remainder of
-- the name is handed to 'makeJobIdS'.
parseJobFileId :: (Monad m) => FilePath -> m JobId
parseJobFileId path =
  maybe badPrefix makeJobIdS (stripPrefix jobFilePrefix path)
  where badPrefix = fail $ "Job file '" ++ path ++
                           "' doesn't have the correct prefix"
238 aa79e62e Iustin Pop
239 aa79e62e Iustin Pop
-- | Full path of a live (non-archived) job file: the job's file name
-- directly below the queue root directory.
liveJobFile :: FilePath -> JobId -> FilePath
liveJobFile rootdir = (rootdir </>) . jobFileName
242 aa79e62e Iustin Pop
243 aa79e62e Iustin Pop
-- | Full path of an archived job file. Archived jobs are grouped in
-- numbered subdirectories of the archive directory; the subdirectory
-- is the job id divided by 'C.jstoreJobsPerArchiveDirectory'.
--
-- NOTE: the original author marked this function as BROKEN without
-- stating the reason; treat the result with care until that is
-- clarified.
archivedJobFile :: FilePath -> JobId -> FilePath
archivedJobFile rootdir jid =
  rootdir </> jobQueueArchiveSubDir </> bucket </> jobFileName jid
  where bucket = show $ fromJobId jid `div` C.jstoreJobsPerArchiveDirectory
248 aa79e62e Iustin Pop
249 aa79e62e Iustin Pop
-- | Translate an opcode status into the job status of the same name.
opStatusToJob :: OpStatus -> JobStatus
opStatusToJob st =
  case st of
    OP_STATUS_QUEUED    -> JOB_STATUS_QUEUED
    OP_STATUS_WAITING   -> JOB_STATUS_WAITING
    OP_STATUS_RUNNING   -> JOB_STATUS_RUNNING
    OP_STATUS_CANCELING -> JOB_STATUS_CANCELING
    OP_STATUS_CANCELED  -> JOB_STATUS_CANCELED
    OP_STATUS_ERROR     -> JOB_STATUS_ERROR
    OP_STATUS_SUCCESS   -> JOB_STATUS_SUCCESS
258 aa79e62e Iustin Pop
259 aa79e62e Iustin Pop
-- | Derive the status of a job from the statuses of its opcodes, which
-- are scanned in order:
--
-- * the first error, canceling or canceled opcode decides the job
--   status immediately;
--
-- * success and queued opcodes leave the status collected so far
--   untouched;
--
-- * any other opcode status (waiting, running) replaces the status
--   collected so far.
--
-- If the scan reaches the end and every opcode was a success (which
-- includes the case of no opcodes at all) the job is a success;
-- otherwise the collected status is used, starting from queued.
calcJobStatus :: QueuedJob -> JobStatus
calcJobStatus job = go (map qoStatus (qjOps job)) JOB_STATUS_QUEUED True
  where
    isTerminal =
      (`elem` [OP_STATUS_ERROR, OP_STATUS_CANCELING, OP_STATUS_CANCELED])
    isSoft = (`elem` [OP_STATUS_SUCCESS, OP_STATUS_QUEUED])
    -- arguments: remaining statuses, status collected so far, and
    -- whether all opcodes seen so far were successful
    go [] fallback allOk = if allOk then JOB_STATUS_SUCCESS else fallback
    go (st:rest) fallback allOk
      | isTerminal st = opStatusToJob st -- abort the scan
      | isSoft st     = go rest fallback allOk'
      | otherwise     = go rest (opStatusToJob st) allOk'
      where allOk' = st == OP_STATUS_SUCCESS && allOk
278 aa79e62e Iustin Pop
279 02e40b13 Klaus Aehlig
-- | Tell whether a job has started, i.e. whether its computed status
-- compares greater than queued (using the 'Ord' instance of
-- 'JobStatus', which is defined outside this module).
jobStarted :: QueuedJob -> Bool
jobStarted job = calcJobStatus job > JOB_STATUS_QUEUED
282 02e40b13 Klaus Aehlig
283 847df9e9 Klaus Aehlig
-- | Tell whether a job is finalized, i.e. whether its computed status
-- compares greater than running (using the 'Ord' instance of
-- 'JobStatus', which is defined outside this module).
jobFinalized :: QueuedJob -> Bool
jobFinalized job = calcJobStatus job > JOB_STATUS_RUNNING
286 847df9e9 Klaus Aehlig
287 370f63be Klaus Aehlig
-- | Tell whether a job may be archived with respect to a cut-off time:
-- the job must be finalized and its timestamp must lie strictly before
-- the cut-off. The end timestamp is used when present, the start
-- timestamp otherwise; a job with neither is never archivable.
jobArchivable :: Timestamp -> QueuedJob -> Bool
jobArchivable ts job = jobFinalized job && maybe False (< ts) lastActivity
  where lastActivity = qjEndTimestamp job <|> qjStartTimestamp job
293 370f63be Klaus Aehlig
294 aa79e62e Iustin Pop
-- | Tell whether an opcode status is a finalized one, i.e. compares
-- greater than running (using the 'Ord' instance of 'OpStatus', which
-- is defined outside this module).
opStatusFinalized :: OpStatus -> Bool
opStatusFinalized st = st > OP_STATUS_RUNNING
297 aa79e62e Iustin Pop
298 aa79e62e Iustin Pop
-- | Compute the priority of a job: the numerically smallest priority
-- among its opcodes that are not yet finalized, or
-- 'C.opPrioDefault' when no such opcode is left.
calcJobPriority :: QueuedJob -> Int
calcJobPriority job =
  case pending of
    [] -> C.opPrioDefault
    _  -> minimum pending
  where
    pending = [ qoPriority op
              | op <- qjOps job
              , not (opStatusFinalized (qoStatus op))
              ]
304 aa79e62e Iustin Pop
305 aa79e62e Iustin Pop
-- | Exception handler that swallows an 'IOError' and yields a fallback
-- value instead. The error is logged as a warning (message, colon,
-- shown error) unless it is a does-not-exist error and the caller
-- asked for those to be ignored silently.
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
ignoreIOError a ignore_noent msg e =
  unless silent (logWarning (msg ++ ": " ++ show e)) >> return a
  where silent = ignore_noent && isDoesNotExistError e
311 aa79e62e Iustin Pop
312 aa79e62e Iustin Pop
-- | Compute the list of existing archive directories below the given
-- queue root. Entries whose name starts with a dot are skipped, and
-- only entries that are directories are returned, each as a path
-- including the archive directory.
--
-- I/O exceptions are swallowed: a failure to list the archive
-- directory is logged and yields no directories; an entry that cannot
-- be stat'ed is dropped (silently if it merely does not exist, with a
-- warning otherwise).
allArchiveDirs :: FilePath -> IO [FilePath]
allArchiveDirs rootdir = do
  let adir = rootdir </> jobQueueArchiveSubDir
  contents <- getDirectoryContents adir `Control.Exception.catch`
               ignoreIOError [] False
                 ("Failed to list queue directory " ++ adir)
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
  filterM isArchiveDir fpaths
  where
    -- The candidate paths already include the archive directory, so
    -- they are stat'ed as they are. Prepending the directory a second
    -- time only works by accident for absolute paths and makes every
    -- lookup fail for a relative queue root.
    isArchiveDir path =
      liftM isDirectory (getFileStatus path)
        `Control.Exception.catch`
        ignoreIOError False True
          ("Failed to stat archive path " ++ path)
326 aa79e62e Iustin Pop
327 aa79e62e Iustin Pop
-- | Build the list of directories that may contain job files: always
-- the queue root itself, followed by all archive directories when
-- archived jobs are requested. Note: compared to the Python version,
-- this doesn't ignore a potential lost+found file.
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
determineJobDirectories rootdir archived = liftM (rootdir :) extraDirs
  where extraDirs | archived  = allArchiveDirs rootdir
                  | otherwise = return []
336 aa79e62e Iustin Pop
337 3cecd73c Michele Tartara
-- Stand-in for \'sequence\' specialised to per-directory job id lists:
-- yields all lists in their original order, or the first error found.
-- \'sequence\' itself cannot be used because of a library version
-- conflict on Lucid.
-- FIXME: delete this and just use \'sequence\' instead once Lucid
-- compatibility is no longer required.
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
sequencer = fmap reverse . foldl seqFolder (Right [])
343 3cecd73c Michele Tartara
344 3cecd73c Michele Tartara
-- | Folding step used by 'sequencer'. An error already present in the
-- accumulator wins; otherwise an error in the new element is
-- propagated; otherwise the new list is put in front of the
-- accumulated ones (so the accumulator is in reverse order).
seqFolder :: Either IOError [[JobId]]
          -> Either IOError [JobId]
          -> Either IOError [[JobId]]
seqFolder acc next =
  case (acc, next) of
    (Left e, _)             -> Left e
    (_, Left e)             -> Left e
    (Right done, Right ids) -> Right (ids : done)
351 3cecd73c Michele Tartara
352 aa79e62e Iustin Pop
-- | Collect the job IDs found in all the given directories, in
-- directory order. If listing any directory failed, the first such
-- error is returned instead.
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
getJobIDs paths = do
  perDir <- mapM getDirJobIDs paths
  return . fmap concat $ sequencer perDir
355 aa79e62e Iustin Pop
356 aa79e62e Iustin Pop
-- | Sort a list of job IDs by their numeric value, ascending.
sortJobIDs :: [JobId] -> [JobId]
sortJobIDs = sortBy byNumber
  where byNumber a b = compare (fromJobId a) (fromJobId b)
359 aa79e62e Iustin Pop
360 aa79e62e Iustin Pop
-- | List the job IDs present in one directory. Directory entries whose
-- name does not parse as a job file name are skipped; the remaining
-- ones are returned in directory listing order. A failure to list the
-- directory is logged as a warning and returned as 'Left'.
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
getDirJobIDs path = do
  listing <- try (getDirectoryContents path) :: IO (Either IOError [FilePath])
  case listing of
    Left e -> do
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
      return (Left e)
    Right contents ->
      -- parseJobFileId runs in Maybe here, so unparsable names simply
      -- drop out
      return . Right $ mapMaybe parseJobFileId contents
375 aa79e62e Iustin Pop
376 aa79e62e Iustin Pop
-- | Read the raw contents of a job file. The live job file is always
-- tried; the archived one is tried as well when archived jobs are
-- requested. The result pairs the file contents with a flag telling
-- whether they came from the archive, or is 'Nothing' if no file could
-- be read.
--
-- All candidate paths are attempted in order and a later successful
-- read replaces an earlier one. Read errors are swallowed (logged,
-- except for non-existing files) and keep whatever was found so far.
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
readJobDataFromDisk rootdir archived jid = foldM attempt Nothing candidates
  where
    candidates = (liveJobFile rootdir jid, False) :
                 [ (archivedJobFile rootdir jid, True) | archived ]
    attempt found (path, isarchived) =
      liftM (\contents -> Just (contents, isarchived)) (readFile path)
        `Control.Exception.catch`
        ignoreIOError found True
          ("Failed to read job file " ++ path)
389 aa79e62e Iustin Pop
390 aa79e62e Iustin Pop
-- | The error result reported when a job file cannot be loaded.
noSuchJob :: Result (QueuedJob, Bool)
noSuchJob = Bad "Can't load job file"
393 aa79e62e Iustin Pop
394 aa79e62e Iustin Pop
-- | Load a job from disk and parse it. On success the job is paired
-- with a flag telling whether it was read from the archive; if no job
-- file could be read the result is 'noSuchJob', and a file that does
-- not parse yields a parse error.
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
loadJobFromDisk rootdir archived jid = do
  raw <- readJobDataFromDisk rootdir archived jid
  -- note: we need some strictness here, otherwise the wrapping in a
  -- Result will create too much laziness, and not close the file
  -- descriptors for the individual jobs
  return $! maybe noSuchJob parse raw
  where
    parse (str, arch) =
      liftM (\qj -> (qj, arch)) .
      fromJResult "Parsing job file" $ Text.JSON.decode str
406 cef3f99f Klaus Aehlig
407 b498ed42 Klaus Aehlig
-- | Serialise a job to JSON and write it to its live job file below
-- the given queue root, using 'atomicWriteFile'. I/O errors are
-- handled by 'tryAndLogIOError' with the message
-- @Failed to write \<file\>@.
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
writeJobToDisk rootdir job =
  tryAndLogIOError (atomicWriteFile filename content)
                   ("Failed to write " ++ filename) Ok
  where
    filename = liveJobFile rootdir (qjId job)
    content  = Text.JSON.encode (Text.JSON.showJSON job)
414 b498ed42 Klaus Aehlig
415 b5a96995 Klaus Aehlig
-- | Replicate a job to all master candidates by sending them the
-- serialised job together with the virtual path of its live job file
-- in a job-queue update RPC. The per-node outcomes, with any payload
-- replaced by unit, are passed to 'logRpcErrors' and returned.
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
replicateJob rootdir mastercandidates job = do
  virtualName <- makeVirtualPath (liveJobFile rootdir (qjId job))
  callresult <- executeRpcCall mastercandidates
                  (RpcCallJobqueueUpdate virtualName serialised)
  -- only success or failure matters, so drop the RPC payload
  let outcomes = map (second (() <$)) callresult
  logRpcErrors outcomes
  return outcomes
  where serialised = Text.JSON.encode (Text.JSON.showJSON job)
426 b5a96995 Klaus Aehlig
427 9fd653a4 Klaus Aehlig
-- | Replicate each of the given jobs, one after the other, to all
-- master candidates. The individual RPC results are discarded.
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
replicateManyJobs rootdir mastercandidates jobs =
  forM_ jobs (replicateJob rootdir mastercandidates)
431 9fd653a4 Klaus Aehlig
432 cef3f99f Klaus Aehlig
-- | Read the job serial number from the serial file. The file contents
-- have trailing whitespace stripped and are then converted with
-- 'makeJobIdS'; I/O errors are handled by 'tryAndLogIOError'.
readSerialFromDisk :: IO (Result JobId)
readSerialFromDisk =
  jobQueueSerialFile >>= \filename ->
    tryAndLogIOError (readFile filename) "Failed to read serial file"
                     (makeJobIdS . rStripSpace)
438 ae858516 Klaus Aehlig
439 ae858516 Klaus Aehlig
-- | Allocate the given number of new job ids.
--
-- The current serial is read from the serial file, the file is
-- rewritten with the serial advanced by @n@, the new contents are sent
-- to the master candidates, and the @n@ ids following the old serial
-- are returned. A non-positive @n@, an unreadable serial file or a
-- failed write yield 'Bad'.
--
-- To avoid races while accessing the serial file, the threads
-- synchronize over a lock, as usual provided by an MVar. The lock is
-- held through 'withMVar', so it is released even if one of the I/O
-- actions in the critical section throws; otherwise a single
-- exception would block every later allocation forever.
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
allocateJobIds mastercandidates lock n
  | n <= 0 = return . Bad $ "Can only allocate positive number of job ids"
  | otherwise = withMVar lock $ \() -> do
      rjobid <- readSerialFromDisk
      case rjobid of
        Bad s -> return . Bad $ s
        Ok jid -> do
          let current = fromJobId jid
              serial_content = show (current + n) ++ "\n"
          serial <- jobQueueSerialFile
          write_result <- try $ atomicWriteFile serial serial_content
                          :: IO (Either IOError ())
          case write_result of
            Left e -> do
              let msg = "Failed to write serial file: " ++ show e
              logError msg
              return . Bad $ msg
            Right () -> do
              serial' <- makeVirtualPath serial
              -- the outcome of the replication is deliberately not
              -- inspected
              _ <- executeRpcCall mastercandidates
                     $ RpcCallJobqueueUpdate serial' serial_content
              return $ mapM makeJobId [(current + 1)..(current + n)]
471 ae858516 Klaus Aehlig
472 ae858516 Klaus Aehlig
-- | Allocate a single new job id, by requesting one id from
-- 'allocateJobIds' and extracting it from the returned list with
-- 'monadicThe'.
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
allocateJobId mastercandidates lock =
  liftM (>>= monadicThe "Failed to allocate precisely one Job ID")
        (allocateJobIds mastercandidates lock 1)
477 1b94c0db Klaus Aehlig
478 1b94c0db Klaus Aehlig
-- | Decide if the job queue is open: it is, exactly when the queue
-- drain file does not exist.
isQueueOpen :: IO Bool
isQueueOpen = do
  drainFile <- jobQueueDrainFile
  drained <- doesFileExist drainFile
  return (not drained)
481 493d6920 Klaus Aehlig
482 ac0c5c6d Klaus Aehlig
-- | Start enqueued jobs, currently by handing them over to masterd:
-- a pickup request is sent for every job over a Luxi connection to
-- the default master socket. Requests that fail are collected and
-- reported in a single warning; they do not make this function fail.
--
-- The connection is opened and closed with 'bracket', so it is
-- released once all requests are done, also if one of them throws.
-- NOTE(review): 'closeClient' is expected to be exported by
-- "Ganeti.Luxi", which this module imports unqualified - confirm.
startJobs :: [QueuedJob] -> IO ()
startJobs jobs = do
  socketpath <- defaultMasterSocket
  pickupResults <- bracket (getLuxiClient socketpath) closeClient $ \client ->
    mapM (flip callMethod client . PickupJob . qjId) jobs
  let failures = map show $ justBad pickupResults
  unless (null failures)
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
491 47c3c7b1 Klaus Aehlig
492 47c3c7b1 Klaus Aehlig
-- | Try to cancel a job that has already been handed over to
-- execution, currently by asking masterd to cancel it over a Luxi
-- connection to the default master socket. The answer to the request
-- is returned as is.
--
-- The connection is opened and closed with 'bracket', so it is
-- released after the call, also if the call throws.
-- NOTE(review): 'closeClient' is expected to be exported by
-- "Ganeti.Luxi", which this module imports unqualified - confirm.
cancelJob :: JobId -> IO (ErrorResult JSValue)
cancelJob jid = do
  socketpath <- defaultMasterSocket
  bracket (getLuxiClient socketpath) closeClient (callMethod (CancelJob jid))