Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ a7ab381a

History | View | Annotate | Download (22.1 kB)

1 aa79e62e Iustin Pop
{-# LANGUAGE TemplateHaskell #-}
2 aa79e62e Iustin Pop
3 aa79e62e Iustin Pop
{-| Implementation of the job queue.
4 aa79e62e Iustin Pop
5 aa79e62e Iustin Pop
-}
6 aa79e62e Iustin Pop
7 aa79e62e Iustin Pop
{-
8 aa79e62e Iustin Pop
9 aa79e62e Iustin Pop
Copyright (C) 2010, 2012 Google Inc.
10 aa79e62e Iustin Pop
11 aa79e62e Iustin Pop
This program is free software; you can redistribute it and/or modify
12 aa79e62e Iustin Pop
it under the terms of the GNU General Public License as published by
13 aa79e62e Iustin Pop
the Free Software Foundation; either version 2 of the License, or
14 aa79e62e Iustin Pop
(at your option) any later version.
15 aa79e62e Iustin Pop
16 aa79e62e Iustin Pop
This program is distributed in the hope that it will be useful, but
17 aa79e62e Iustin Pop
WITHOUT ANY WARRANTY; without even the implied warranty of
18 aa79e62e Iustin Pop
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 aa79e62e Iustin Pop
General Public License for more details.
20 aa79e62e Iustin Pop
21 aa79e62e Iustin Pop
You should have received a copy of the GNU General Public License
22 aa79e62e Iustin Pop
along with this program; if not, write to the Free Software
23 aa79e62e Iustin Pop
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 aa79e62e Iustin Pop
02110-1301, USA.
25 aa79e62e Iustin Pop
26 aa79e62e Iustin Pop
-}
27 aa79e62e Iustin Pop
28 aa79e62e Iustin Pop
module Ganeti.JQueue
29 aa79e62e Iustin Pop
    ( QueuedOpCode(..)
30 aa79e62e Iustin Pop
    , QueuedJob(..)
31 aa79e62e Iustin Pop
    , InputOpCode(..)
32 4b49a72b Klaus Aehlig
    , queuedOpCodeFromMetaOpCode
33 1c1132f4 Klaus Aehlig
    , queuedJobFromOpCodes
34 a7ab381a Klaus Aehlig
    , changeOpCodePriority
35 363dc9d6 Klaus Aehlig
    , cancelQueuedJob
36 aa79e62e Iustin Pop
    , Timestamp
37 ae66f3a9 Klaus Aehlig
    , fromClockTime
38 aa79e62e Iustin Pop
    , noTimestamp
39 c3a70209 Klaus Aehlig
    , currentTimestamp
40 8b5a4b9a Klaus Aehlig
    , advanceTimestamp
41 2af22d70 Klaus Aehlig
    , setReceivedTimestamp
42 aa79e62e Iustin Pop
    , opStatusFinalized
43 aa79e62e Iustin Pop
    , extractOpSummary
44 aa79e62e Iustin Pop
    , calcJobStatus
45 02e40b13 Klaus Aehlig
    , jobStarted
46 847df9e9 Klaus Aehlig
    , jobFinalized
47 370f63be Klaus Aehlig
    , jobArchivable
48 aa79e62e Iustin Pop
    , calcJobPriority
49 aa79e62e Iustin Pop
    , jobFileName
50 aa79e62e Iustin Pop
    , liveJobFile
51 aa79e62e Iustin Pop
    , archivedJobFile
52 aa79e62e Iustin Pop
    , determineJobDirectories
53 aa79e62e Iustin Pop
    , getJobIDs
54 aa79e62e Iustin Pop
    , sortJobIDs
55 aa79e62e Iustin Pop
    , loadJobFromDisk
56 aa79e62e Iustin Pop
    , noSuchJob
57 cef3f99f Klaus Aehlig
    , readSerialFromDisk
58 ae858516 Klaus Aehlig
    , allocateJobIds
59 ae858516 Klaus Aehlig
    , allocateJobId
60 b498ed42 Klaus Aehlig
    , writeJobToDisk
61 9fd653a4 Klaus Aehlig
    , replicateManyJobs
62 1b94c0db Klaus Aehlig
    , isQueueOpen
63 ac0c5c6d Klaus Aehlig
    , startJobs
64 47c3c7b1 Klaus Aehlig
    , cancelJob
65 f23daea8 Klaus Aehlig
    , queueDirPermissions
66 c867cfe1 Klaus Aehlig
    , archiveJobs
67 aa79e62e Iustin Pop
    ) where
68 aa79e62e Iustin Pop
69 370f63be Klaus Aehlig
import Control.Applicative (liftA2, (<|>))
70 8b5a4b9a Klaus Aehlig
import Control.Arrow (first, second)
71 c867cfe1 Klaus Aehlig
import Control.Concurrent (forkIO)
72 ae858516 Klaus Aehlig
import Control.Concurrent.MVar
73 aa79e62e Iustin Pop
import Control.Exception
74 aa79e62e Iustin Pop
import Control.Monad
75 557f5dad Klaus Aehlig
import Data.Functor ((<$))
76 aa79e62e Iustin Pop
import Data.List
77 4b49a72b Klaus Aehlig
import Data.Maybe
78 aa79e62e Iustin Pop
import Data.Ord (comparing)
79 aa79e62e Iustin Pop
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
80 557f5dad Klaus Aehlig
import Prelude hiding (id, log)
81 aa79e62e Iustin Pop
import System.Directory
82 aa79e62e Iustin Pop
import System.FilePath
83 aa79e62e Iustin Pop
import System.IO.Error (isDoesNotExistError)
84 aa79e62e Iustin Pop
import System.Posix.Files
85 c3a70209 Klaus Aehlig
import System.Time
86 aa79e62e Iustin Pop
import qualified Text.JSON
87 aa79e62e Iustin Pop
import Text.JSON.Types
88 aa79e62e Iustin Pop
89 aa79e62e Iustin Pop
import Ganeti.BasicTypes
90 c867cfe1 Klaus Aehlig
import qualified Ganeti.Config as Config
91 aa79e62e Iustin Pop
import qualified Ganeti.Constants as C
92 47c3c7b1 Klaus Aehlig
import Ganeti.Errors (ErrorResult)
93 aa79e62e Iustin Pop
import Ganeti.JSON
94 aa79e62e Iustin Pop
import Ganeti.Logging
95 493d6920 Klaus Aehlig
import Ganeti.Luxi
96 c867cfe1 Klaus Aehlig
import Ganeti.Objects (ConfigData, Node)
97 aa79e62e Iustin Pop
import Ganeti.OpCodes
98 aa79e62e Iustin Pop
import Ganeti.Path
99 b5a96995 Klaus Aehlig
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
100 c867cfe1 Klaus Aehlig
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
101 aa79e62e Iustin Pop
import Ganeti.THH
102 aa79e62e Iustin Pop
import Ganeti.Types
103 cef3f99f Klaus Aehlig
import Ganeti.Utils
104 77676415 Klaus Aehlig
import Ganeti.VCluster (makeVirtualPath)
105 aa79e62e Iustin Pop
106 aa79e62e Iustin Pop
-- * Data types
107 aa79e62e Iustin Pop
108 c3a70209 Klaus Aehlig
-- | The ganeti queue timestamp type. It represents the time as the pair
109 c3a70209 Klaus Aehlig
-- of seconds since the epoch and microseconds since the beginning of the
110 c3a70209 Klaus Aehlig
-- second.
111 aa79e62e Iustin Pop
type Timestamp = (Int, Int)
112 aa79e62e Iustin Pop
113 aa79e62e Iustin Pop
-- | Missing timestamp type.
114 aa79e62e Iustin Pop
noTimestamp :: Timestamp
115 aa79e62e Iustin Pop
noTimestamp = (-1, -1)
116 aa79e62e Iustin Pop
117 ae66f3a9 Klaus Aehlig
-- | Obtain a Timestamp from a given clock time
118 ae66f3a9 Klaus Aehlig
fromClockTime :: ClockTime -> Timestamp
119 ae66f3a9 Klaus Aehlig
fromClockTime (TOD ctime pico) =
120 ae66f3a9 Klaus Aehlig
  (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
121 ae66f3a9 Klaus Aehlig
122 c3a70209 Klaus Aehlig
-- | Get the current time in the job-queue timestamp format.
123 c3a70209 Klaus Aehlig
currentTimestamp :: IO Timestamp
124 ae66f3a9 Klaus Aehlig
currentTimestamp = fromClockTime `liftM` getClockTime
125 c3a70209 Klaus Aehlig
126 8b5a4b9a Klaus Aehlig
-- | From a given timestamp, obtain the timestamp of the
127 8b5a4b9a Klaus Aehlig
-- time that is the given number of seconds later.
128 8b5a4b9a Klaus Aehlig
advanceTimestamp :: Int -> Timestamp -> Timestamp
129 8b5a4b9a Klaus Aehlig
advanceTimestamp = first . (+)
130 8b5a4b9a Klaus Aehlig
131 aa79e62e Iustin Pop
-- | An input opcode.
132 aa79e62e Iustin Pop
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
133 aa79e62e Iustin Pop
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
134 aa79e62e Iustin Pop
                   deriving (Show, Eq)
135 aa79e62e Iustin Pop
136 aa79e62e Iustin Pop
-- | JSON instance for 'InputOpCode', trying to parse it and if
137 aa79e62e Iustin Pop
-- failing, keeping the original JSValue.
138 aa79e62e Iustin Pop
instance Text.JSON.JSON InputOpCode where
139 aa79e62e Iustin Pop
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
140 aa79e62e Iustin Pop
  showJSON (InvalidOpCode inv) = inv
141 aa79e62e Iustin Pop
  readJSON v = case Text.JSON.readJSON v of
142 aa79e62e Iustin Pop
                 Text.JSON.Error _ -> return $ InvalidOpCode v
143 aa79e62e Iustin Pop
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
144 aa79e62e Iustin Pop
145 aa79e62e Iustin Pop
-- | Invalid opcode summary.
146 aa79e62e Iustin Pop
invalidOp :: String
147 aa79e62e Iustin Pop
invalidOp = "INVALID_OP"
148 aa79e62e Iustin Pop
149 aa79e62e Iustin Pop
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
150 aa79e62e Iustin Pop
-- duplicates some functionality from the 'opSummary' function in
151 aa79e62e Iustin Pop
-- "Ganeti.OpCodes".
152 aa79e62e Iustin Pop
extractOpSummary :: InputOpCode -> String
153 aa79e62e Iustin Pop
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
154 aa79e62e Iustin Pop
extractOpSummary (InvalidOpCode (JSObject o)) =
155 aa79e62e Iustin Pop
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
156 aa79e62e Iustin Pop
    Just s -> drop 3 s -- drop the OP_ prefix
157 aa79e62e Iustin Pop
    Nothing -> invalidOp
158 aa79e62e Iustin Pop
extractOpSummary _ = invalidOp
159 aa79e62e Iustin Pop
160 aa79e62e Iustin Pop
$(buildObject "QueuedOpCode" "qo"
161 aa79e62e Iustin Pop
  [ simpleField "input"           [t| InputOpCode |]
162 aa79e62e Iustin Pop
  , simpleField "status"          [t| OpStatus    |]
163 aa79e62e Iustin Pop
  , simpleField "result"          [t| JSValue     |]
164 aa79e62e Iustin Pop
  , defaultField [| [] |] $
165 aa79e62e Iustin Pop
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
166 aa79e62e Iustin Pop
  , simpleField "priority"        [t| Int         |]
167 aa79e62e Iustin Pop
  , optionalNullSerField $
168 aa79e62e Iustin Pop
    simpleField "start_timestamp" [t| Timestamp   |]
169 aa79e62e Iustin Pop
  , optionalNullSerField $
170 aa79e62e Iustin Pop
    simpleField "exec_timestamp"  [t| Timestamp   |]
171 aa79e62e Iustin Pop
  , optionalNullSerField $
172 aa79e62e Iustin Pop
    simpleField "end_timestamp"   [t| Timestamp   |]
173 aa79e62e Iustin Pop
  ])
174 aa79e62e Iustin Pop
175 aa79e62e Iustin Pop
$(buildObject "QueuedJob" "qj"
176 aa79e62e Iustin Pop
  [ simpleField "id"                 [t| JobId          |]
177 aa79e62e Iustin Pop
  , simpleField "ops"                [t| [QueuedOpCode] |]
178 aa79e62e Iustin Pop
  , optionalNullSerField $
179 aa79e62e Iustin Pop
    simpleField "received_timestamp" [t| Timestamp      |]
180 aa79e62e Iustin Pop
  , optionalNullSerField $
181 aa79e62e Iustin Pop
    simpleField "start_timestamp"    [t| Timestamp      |]
182 aa79e62e Iustin Pop
  , optionalNullSerField $
183 aa79e62e Iustin Pop
    simpleField "end_timestamp"      [t| Timestamp      |]
184 aa79e62e Iustin Pop
  ])
185 aa79e62e Iustin Pop
186 4b49a72b Klaus Aehlig
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
187 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
188 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode op =
189 4b49a72b Klaus Aehlig
  QueuedOpCode { qoInput = ValidOpCode op
190 4b49a72b Klaus Aehlig
               , qoStatus = OP_STATUS_QUEUED
191 4b49a72b Klaus Aehlig
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
192 4b49a72b Klaus Aehlig
                              $ op
193 4b49a72b Klaus Aehlig
               , qoLog = []
194 4b49a72b Klaus Aehlig
               , qoResult = JSNull
195 4b49a72b Klaus Aehlig
               , qoStartTimestamp = Nothing
196 4b49a72b Klaus Aehlig
               , qoEndTimestamp = Nothing
197 4b49a72b Klaus Aehlig
               , qoExecTimestamp = Nothing
198 4b49a72b Klaus Aehlig
               }
199 4b49a72b Klaus Aehlig
200 1c1132f4 Klaus Aehlig
-- | From a job-id and a list of op-codes create a job. This is
201 1c1132f4 Klaus Aehlig
-- the pure part of job creation, as allocating a new job id
202 1c1132f4 Klaus Aehlig
-- lives in IO.
203 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
204 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes jobid ops = do
205 1c1132f4 Klaus Aehlig
  ops' <- mapM (`resolveDependencies` jobid) ops
206 1c1132f4 Klaus Aehlig
  return QueuedJob { qjId = jobid
207 1c1132f4 Klaus Aehlig
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
208 1c1132f4 Klaus Aehlig
                   , qjReceivedTimestamp = Nothing 
209 1c1132f4 Klaus Aehlig
                   , qjStartTimestamp = Nothing
210 1c1132f4 Klaus Aehlig
                   , qjEndTimestamp = Nothing
211 1c1132f4 Klaus Aehlig
                   }
212 1c1132f4 Klaus Aehlig
213 2af22d70 Klaus Aehlig
-- | Attach a received timestamp to a Queued Job.
214 2af22d70 Klaus Aehlig
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
215 2af22d70 Klaus Aehlig
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
216 2af22d70 Klaus Aehlig
217 a7ab381a Klaus Aehlig
-- | Change the priority of a QueuedOpCode, if it is not already
218 a7ab381a Klaus Aehlig
-- finalized.
219 a7ab381a Klaus Aehlig
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
220 a7ab381a Klaus Aehlig
changeOpCodePriority prio op =
221 a7ab381a Klaus Aehlig
  if qoStatus op > OP_STATUS_RUNNING
222 a7ab381a Klaus Aehlig
     then op
223 a7ab381a Klaus Aehlig
     else op { qoPriority = prio }
224 a7ab381a Klaus Aehlig
225 363dc9d6 Klaus Aehlig
-- | Set the state of a QueuedOpCode to canceled.
226 363dc9d6 Klaus Aehlig
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
227 363dc9d6 Klaus Aehlig
cancelOpCode now op =
228 363dc9d6 Klaus Aehlig
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }
229 363dc9d6 Klaus Aehlig
230 363dc9d6 Klaus Aehlig
-- | Transform a QueuedJob that has not been started into its canceled form.
231 363dc9d6 Klaus Aehlig
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
232 363dc9d6 Klaus Aehlig
cancelQueuedJob now job =
233 363dc9d6 Klaus Aehlig
  let ops' = map (cancelOpCode now) $ qjOps job
234 363dc9d6 Klaus Aehlig
  in job { qjOps = ops', qjEndTimestamp = Just now}
235 363dc9d6 Klaus Aehlig
236 aa79e62e Iustin Pop
-- | Job file prefix.
237 aa79e62e Iustin Pop
jobFilePrefix :: String
238 aa79e62e Iustin Pop
jobFilePrefix = "job-"
239 aa79e62e Iustin Pop
240 aa79e62e Iustin Pop
-- | Computes the filename for a given job ID.
241 aa79e62e Iustin Pop
jobFileName :: JobId -> FilePath
242 aa79e62e Iustin Pop
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
243 aa79e62e Iustin Pop
244 aa79e62e Iustin Pop
-- | Parses a job ID from a file name.
245 aa79e62e Iustin Pop
parseJobFileId :: (Monad m) => FilePath -> m JobId
246 aa79e62e Iustin Pop
parseJobFileId path =
247 aa79e62e Iustin Pop
  case stripPrefix jobFilePrefix path of
248 aa79e62e Iustin Pop
    Nothing -> fail $ "Job file '" ++ path ++
249 aa79e62e Iustin Pop
                      "' doesn't have the correct prefix"
250 aa79e62e Iustin Pop
    Just suffix -> makeJobIdS suffix
251 aa79e62e Iustin Pop
252 aa79e62e Iustin Pop
-- | Computes the full path to a live job.
253 aa79e62e Iustin Pop
liveJobFile :: FilePath -> JobId -> FilePath
254 aa79e62e Iustin Pop
liveJobFile rootdir jid = rootdir </> jobFileName jid
255 aa79e62e Iustin Pop
256 aa79e62e Iustin Pop
-- | Computes the full path to an archives job. BROKEN.
257 aa79e62e Iustin Pop
archivedJobFile :: FilePath -> JobId -> FilePath
258 aa79e62e Iustin Pop
archivedJobFile rootdir jid =
259 aa79e62e Iustin Pop
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
260 aa79e62e Iustin Pop
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
261 aa79e62e Iustin Pop
262 aa79e62e Iustin Pop
-- | Map from opcode status to job status.
263 aa79e62e Iustin Pop
opStatusToJob :: OpStatus -> JobStatus
264 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
265 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
266 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
267 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
268 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
269 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
270 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
271 aa79e62e Iustin Pop
272 aa79e62e Iustin Pop
-- | Computes a queued job's status.
273 aa79e62e Iustin Pop
calcJobStatus :: QueuedJob -> JobStatus
274 aa79e62e Iustin Pop
calcJobStatus QueuedJob { qjOps = ops } =
275 aa79e62e Iustin Pop
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
276 aa79e62e Iustin Pop
    where
277 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_ERROR     = True
278 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELING = True
279 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELED  = True
280 aa79e62e Iustin Pop
      terminalStatus _                   = False
281 aa79e62e Iustin Pop
      softStatus     OP_STATUS_SUCCESS   = True
282 aa79e62e Iustin Pop
      softStatus     OP_STATUS_QUEUED    = True
283 aa79e62e Iustin Pop
      softStatus     _                   = False
284 aa79e62e Iustin Pop
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
285 aa79e62e Iustin Pop
      extractOpSt [] d False = d
286 aa79e62e Iustin Pop
      extractOpSt (x:xs) d old_all
287 aa79e62e Iustin Pop
           | terminalStatus x = opStatusToJob x -- abort recursion
288 aa79e62e Iustin Pop
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
289 aa79e62e Iustin Pop
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
290 aa79e62e Iustin Pop
           where new_all = x == OP_STATUS_SUCCESS && old_all
291 aa79e62e Iustin Pop
292 02e40b13 Klaus Aehlig
-- | Determine if a job has started
293 02e40b13 Klaus Aehlig
jobStarted :: QueuedJob -> Bool
294 02e40b13 Klaus Aehlig
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus
295 02e40b13 Klaus Aehlig
296 847df9e9 Klaus Aehlig
-- | Determine if a job is finalised.
297 847df9e9 Klaus Aehlig
jobFinalized :: QueuedJob -> Bool
298 847df9e9 Klaus Aehlig
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus
299 847df9e9 Klaus Aehlig
300 370f63be Klaus Aehlig
-- | Determine if a job is finalized and its timestamp is before
301 370f63be Klaus Aehlig
-- a given time.
302 370f63be Klaus Aehlig
jobArchivable :: Timestamp -> QueuedJob -> Bool
303 370f63be Klaus Aehlig
jobArchivable ts = liftA2 (&&) jobFinalized 
304 370f63be Klaus Aehlig
  $ maybe False (< ts)
305 370f63be Klaus Aehlig
    .  liftA2 (<|>) qjEndTimestamp qjStartTimestamp
306 370f63be Klaus Aehlig
307 aa79e62e Iustin Pop
-- | Determine whether an opcode status is finalized.
308 aa79e62e Iustin Pop
opStatusFinalized :: OpStatus -> Bool
309 aa79e62e Iustin Pop
opStatusFinalized = (> OP_STATUS_RUNNING)
310 aa79e62e Iustin Pop
311 aa79e62e Iustin Pop
-- | Compute a job's priority.
312 aa79e62e Iustin Pop
calcJobPriority :: QueuedJob -> Int
313 aa79e62e Iustin Pop
calcJobPriority QueuedJob { qjOps = ops } =
314 aa79e62e Iustin Pop
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
315 aa79e62e Iustin Pop
    where helper [] = C.opPrioDefault
316 aa79e62e Iustin Pop
          helper ps = minimum ps
317 aa79e62e Iustin Pop
318 aa79e62e Iustin Pop
-- | Log but ignore an 'IOError'.
319 aa79e62e Iustin Pop
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
320 aa79e62e Iustin Pop
ignoreIOError a ignore_noent msg e = do
321 aa79e62e Iustin Pop
  unless (isDoesNotExistError e && ignore_noent) .
322 aa79e62e Iustin Pop
    logWarning $ msg ++ ": " ++ show e
323 aa79e62e Iustin Pop
  return a
324 aa79e62e Iustin Pop
325 aa79e62e Iustin Pop
-- | Compute the list of existing archive directories. Note that I/O
326 aa79e62e Iustin Pop
-- exceptions are swallowed and ignored.
327 aa79e62e Iustin Pop
allArchiveDirs :: FilePath -> IO [FilePath]
328 aa79e62e Iustin Pop
allArchiveDirs rootdir = do
329 aa79e62e Iustin Pop
  let adir = rootdir </> jobQueueArchiveSubDir
330 aa79e62e Iustin Pop
  contents <- getDirectoryContents adir `Control.Exception.catch`
331 aa79e62e Iustin Pop
               ignoreIOError [] False
332 aa79e62e Iustin Pop
                 ("Failed to list queue directory " ++ adir)
333 aa79e62e Iustin Pop
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
334 aa79e62e Iustin Pop
  filterM (\path ->
335 aa79e62e Iustin Pop
             liftM isDirectory (getFileStatus (adir </> path))
336 aa79e62e Iustin Pop
               `Control.Exception.catch`
337 aa79e62e Iustin Pop
               ignoreIOError False True
338 aa79e62e Iustin Pop
                 ("Failed to stat archive path " ++ path)) fpaths
339 aa79e62e Iustin Pop
340 aa79e62e Iustin Pop
-- | Build list of directories containing job files. Note: compared to
341 aa79e62e Iustin Pop
-- the Python version, this doesn't ignore a potential lost+found
342 aa79e62e Iustin Pop
-- file.
343 aa79e62e Iustin Pop
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
344 aa79e62e Iustin Pop
determineJobDirectories rootdir archived = do
345 aa79e62e Iustin Pop
  other <- if archived
346 aa79e62e Iustin Pop
             then allArchiveDirs rootdir
347 aa79e62e Iustin Pop
             else return []
348 aa79e62e Iustin Pop
  return $ rootdir:other
349 aa79e62e Iustin Pop
350 3cecd73c Michele Tartara
-- Function equivalent to the \'sequence\' function, that cannot be used because
351 3cecd73c Michele Tartara
-- of library version conflict on Lucid.
352 3cecd73c Michele Tartara
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
353 3cecd73c Michele Tartara
-- will not be required anymore.
354 3cecd73c Michele Tartara
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
355 3cecd73c Michele Tartara
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
356 3cecd73c Michele Tartara
357 3cecd73c Michele Tartara
-- | Folding function for joining multiple [JobIds] into one list.
358 3cecd73c Michele Tartara
seqFolder :: Either IOError [[JobId]]
359 3cecd73c Michele Tartara
          -> Either IOError [JobId]
360 3cecd73c Michele Tartara
          -> Either IOError [[JobId]]
361 3cecd73c Michele Tartara
seqFolder (Left e) _ = Left e
362 3cecd73c Michele Tartara
seqFolder (Right _) (Left e) = Left e
363 3cecd73c Michele Tartara
seqFolder (Right l) (Right el) = Right $ el:l
364 3cecd73c Michele Tartara
365 aa79e62e Iustin Pop
-- | Computes the list of all jobs in the given directories.
366 be0cb2d7 Michele Tartara
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
367 3cecd73c Michele Tartara
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
368 aa79e62e Iustin Pop
369 aa79e62e Iustin Pop
-- | Sorts the a list of job IDs.
370 aa79e62e Iustin Pop
sortJobIDs :: [JobId] -> [JobId]
371 aa79e62e Iustin Pop
sortJobIDs = sortBy (comparing fromJobId)
372 aa79e62e Iustin Pop
373 aa79e62e Iustin Pop
-- | Computes the list of jobs in a given directory.
374 be0cb2d7 Michele Tartara
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
375 aa79e62e Iustin Pop
getDirJobIDs path = do
376 be0cb2d7 Michele Tartara
  either_contents <-
377 be0cb2d7 Michele Tartara
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
378 be0cb2d7 Michele Tartara
  case either_contents of
379 be0cb2d7 Michele Tartara
    Left e -> do
380 be0cb2d7 Michele Tartara
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
381 be0cb2d7 Michele Tartara
      return $ Left e
382 be0cb2d7 Michele Tartara
    Right contents -> do
383 be0cb2d7 Michele Tartara
      let jids = foldl (\ids file ->
384 be0cb2d7 Michele Tartara
                         case parseJobFileId file of
385 be0cb2d7 Michele Tartara
                           Nothing -> ids
386 be0cb2d7 Michele Tartara
                           Just new_id -> new_id:ids) [] contents
387 be0cb2d7 Michele Tartara
      return . Right $ reverse jids
388 aa79e62e Iustin Pop
389 aa79e62e Iustin Pop
-- | Reads the job data from disk.
390 aa79e62e Iustin Pop
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
391 aa79e62e Iustin Pop
readJobDataFromDisk rootdir archived jid = do
392 aa79e62e Iustin Pop
  let live_path = liveJobFile rootdir jid
393 aa79e62e Iustin Pop
      archived_path = archivedJobFile rootdir jid
394 aa79e62e Iustin Pop
      all_paths = if archived
395 aa79e62e Iustin Pop
                    then [(live_path, False), (archived_path, True)]
396 aa79e62e Iustin Pop
                    else [(live_path, False)]
397 aa79e62e Iustin Pop
  foldM (\state (path, isarchived) ->
398 aa79e62e Iustin Pop
           liftM (\r -> Just (r, isarchived)) (readFile path)
399 aa79e62e Iustin Pop
             `Control.Exception.catch`
400 aa79e62e Iustin Pop
             ignoreIOError state True
401 aa79e62e Iustin Pop
               ("Failed to read job file " ++ path)) Nothing all_paths
402 aa79e62e Iustin Pop
403 aa79e62e Iustin Pop
-- | Failed to load job error.
404 aa79e62e Iustin Pop
noSuchJob :: Result (QueuedJob, Bool)
405 aa79e62e Iustin Pop
noSuchJob = Bad "Can't load job file"
406 aa79e62e Iustin Pop
407 aa79e62e Iustin Pop
-- | Loads a job from disk.
408 aa79e62e Iustin Pop
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
409 aa79e62e Iustin Pop
loadJobFromDisk rootdir archived jid = do
410 aa79e62e Iustin Pop
  raw <- readJobDataFromDisk rootdir archived jid
411 aa79e62e Iustin Pop
  -- note: we need some stricness below, otherwise the wrapping in a
412 aa79e62e Iustin Pop
  -- Result will create too much lazyness, and not close the file
413 aa79e62e Iustin Pop
  -- descriptors for the individual jobs
414 aa79e62e Iustin Pop
  return $! case raw of
415 aa79e62e Iustin Pop
             Nothing -> noSuchJob
416 aa79e62e Iustin Pop
             Just (str, arch) ->
417 aa79e62e Iustin Pop
               liftM (\qj -> (qj, arch)) .
418 aa79e62e Iustin Pop
               fromJResult "Parsing job file" $ Text.JSON.decode str
419 cef3f99f Klaus Aehlig
420 b498ed42 Klaus Aehlig
-- | Write a job to disk.
421 b498ed42 Klaus Aehlig
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
422 b498ed42 Klaus Aehlig
writeJobToDisk rootdir job = do
423 b498ed42 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
424 b498ed42 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
425 b498ed42 Klaus Aehlig
  tryAndLogIOError (atomicWriteFile filename content)
426 b498ed42 Klaus Aehlig
                   ("Failed to write " ++ filename) Ok
427 b498ed42 Klaus Aehlig
428 b5a96995 Klaus Aehlig
-- | Replicate a job to all master candidates.
429 b5a96995 Klaus Aehlig
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
430 b5a96995 Klaus Aehlig
replicateJob rootdir mastercandidates job = do
431 b5a96995 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
432 b5a96995 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
433 77676415 Klaus Aehlig
  filename' <- makeVirtualPath filename
434 557f5dad Klaus Aehlig
  callresult <- executeRpcCall mastercandidates
435 77676415 Klaus Aehlig
                  $ RpcCallJobqueueUpdate filename' content
436 557f5dad Klaus Aehlig
  let result = map (second (() <$)) callresult
437 b5a96995 Klaus Aehlig
  logRpcErrors result
438 b5a96995 Klaus Aehlig
  return result
439 b5a96995 Klaus Aehlig
440 9fd653a4 Klaus Aehlig
-- | Replicate many jobs to all master candidates.
441 9fd653a4 Klaus Aehlig
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
442 9fd653a4 Klaus Aehlig
replicateManyJobs rootdir mastercandidates =
443 9fd653a4 Klaus Aehlig
  mapM_ (replicateJob rootdir mastercandidates)
444 9fd653a4 Klaus Aehlig
445 cef3f99f Klaus Aehlig
-- | Read the job serial number from disk.
446 cef3f99f Klaus Aehlig
readSerialFromDisk :: IO (Result JobId)
447 cef3f99f Klaus Aehlig
readSerialFromDisk = do
448 cef3f99f Klaus Aehlig
  filename <- jobQueueSerialFile
449 cef3f99f Klaus Aehlig
  tryAndLogIOError (readFile filename) "Failed to read serial file"
450 cef3f99f Klaus Aehlig
                   (makeJobIdS . rStripSpace)
451 ae858516 Klaus Aehlig
452 ae858516 Klaus Aehlig
-- | Allocate new job ids.
453 ae858516 Klaus Aehlig
-- To avoid races while accessing the serial file, the threads synchronize
454 ae858516 Klaus Aehlig
-- over a lock, as usual provided by an MVar.
455 ae858516 Klaus Aehlig
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
456 ae858516 Klaus Aehlig
allocateJobIds mastercandidates lock n =
457 ae858516 Klaus Aehlig
  if n <= 0
458 ae858516 Klaus Aehlig
    then return . Bad $ "Can only allocate positive number of job ids"
459 ae858516 Klaus Aehlig
    else do
460 ae858516 Klaus Aehlig
      takeMVar lock
461 ae858516 Klaus Aehlig
      rjobid <- readSerialFromDisk
462 ae858516 Klaus Aehlig
      case rjobid of
463 ae858516 Klaus Aehlig
        Bad s -> do
464 ae858516 Klaus Aehlig
          putMVar lock ()
465 ae858516 Klaus Aehlig
          return . Bad $ s
466 ae858516 Klaus Aehlig
        Ok jid -> do
467 ae858516 Klaus Aehlig
          let current = fromJobId jid
468 ae858516 Klaus Aehlig
              serial_content = show (current + n) ++  "\n"
469 ae858516 Klaus Aehlig
          serial <- jobQueueSerialFile
470 ae858516 Klaus Aehlig
          write_result <- try $ atomicWriteFile serial serial_content
471 ae858516 Klaus Aehlig
                          :: IO (Either IOError ())
472 ae858516 Klaus Aehlig
          case write_result of
473 ae858516 Klaus Aehlig
            Left e -> do
474 f7819050 Klaus Aehlig
              putMVar lock ()
475 ae858516 Klaus Aehlig
              let msg = "Failed to write serial file: " ++ show e
476 ae858516 Klaus Aehlig
              logError msg
477 ae858516 Klaus Aehlig
              return . Bad $ msg 
478 ae858516 Klaus Aehlig
            Right () -> do
479 77676415 Klaus Aehlig
              serial' <- makeVirtualPath serial
480 ae858516 Klaus Aehlig
              _ <- executeRpcCall mastercandidates
481 77676415 Klaus Aehlig
                     $ RpcCallJobqueueUpdate serial' serial_content
482 f7819050 Klaus Aehlig
              putMVar lock ()
483 ae858516 Klaus Aehlig
              return $ mapM makeJobId [(current+1)..(current+n)]
484 ae858516 Klaus Aehlig
485 ae858516 Klaus Aehlig
-- | Allocate one new job id.
486 ae858516 Klaus Aehlig
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
487 ae858516 Klaus Aehlig
allocateJobId mastercandidates lock = do
488 ae858516 Klaus Aehlig
  jids <- allocateJobIds mastercandidates lock 1
489 ae858516 Klaus Aehlig
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
490 1b94c0db Klaus Aehlig
491 1b94c0db Klaus Aehlig
-- | Decide if job queue is open
492 1b94c0db Klaus Aehlig
isQueueOpen :: IO Bool
493 1b94c0db Klaus Aehlig
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
494 493d6920 Klaus Aehlig
495 ac0c5c6d Klaus Aehlig
-- | Start enqueued jobs, currently by handing them over to masterd.
496 ac0c5c6d Klaus Aehlig
startJobs :: [QueuedJob] -> IO ()
497 ac0c5c6d Klaus Aehlig
startJobs jobs = do
498 493d6920 Klaus Aehlig
  socketpath <- defaultMasterSocket
499 d605e261 Petr Pudlak
  client <- getLuxiClient socketpath
500 493d6920 Klaus Aehlig
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
501 493d6920 Klaus Aehlig
  let failures = map show $ justBad pickupResults
502 493d6920 Klaus Aehlig
  unless (null failures)
503 493d6920 Klaus Aehlig
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
504 47c3c7b1 Klaus Aehlig
505 47c3c7b1 Klaus Aehlig
-- | Try to cancel a job that has already been handed over to execution,
506 47c3c7b1 Klaus Aehlig
-- currently by asking masterd to cancel it.
507 47c3c7b1 Klaus Aehlig
cancelJob :: JobId -> IO (ErrorResult JSValue)
508 47c3c7b1 Klaus Aehlig
cancelJob jid = do
509 47c3c7b1 Klaus Aehlig
  socketpath <- defaultMasterSocket
510 47c3c7b1 Klaus Aehlig
  client <- getLuxiClient socketpath
511 47c3c7b1 Klaus Aehlig
  callMethod (CancelJob jid) client
512 c867cfe1 Klaus Aehlig
513 f23daea8 Klaus Aehlig
-- | Permissions for the archive directories.
514 f23daea8 Klaus Aehlig
queueDirPermissions :: FilePermissions
515 f23daea8 Klaus Aehlig
queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser
516 f23daea8 Klaus Aehlig
                                      , fpGroup = Just C.daemonsGroup
517 f23daea8 Klaus Aehlig
                                      , fpPermissions = 0o0750
518 f23daea8 Klaus Aehlig
                                      }
519 f23daea8 Klaus Aehlig
520 c867cfe1 Klaus Aehlig
-- | Try, at most until the given endtime, to archive some of the given
521 c867cfe1 Klaus Aehlig
-- jobs, if they are older than the specified cut-off time; also replicate
522 c867cfe1 Klaus Aehlig
-- archival of the additional jobs. Return the pair of the number of jobs
523 c867cfe1 Klaus Aehlig
-- archived, and the number of jobs remaining int he queue, asuming the
524 c867cfe1 Klaus Aehlig
-- given numbers about the not considered jobs.
525 c867cfe1 Klaus Aehlig
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
526 c867cfe1 Klaus Aehlig
                        -> FilePath -- ^ queue root directory
527 c867cfe1 Klaus Aehlig
                        -> ClockTime -- ^ Endtime
528 c867cfe1 Klaus Aehlig
                        -> Timestamp -- ^ cut-off time for archiving jobs
529 c867cfe1 Klaus Aehlig
                        -> Int -- ^ number of jobs alread archived
530 c867cfe1 Klaus Aehlig
                        -> [JobId] -- ^ Additional jobs to replicate
531 c867cfe1 Klaus Aehlig
                        -> [JobId] -- ^ List of job-ids still to consider
532 c867cfe1 Klaus Aehlig
                        -> IO (Int, Int)
533 c867cfe1 Klaus Aehlig
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
534 c867cfe1 Klaus Aehlig
  unless (null torepl) . (>> return ())
535 c867cfe1 Klaus Aehlig
   . forkIO $ replicateFn torepl
536 c867cfe1 Klaus Aehlig
  return (arch, 0)
537 c867cfe1 Klaus Aehlig
538 c867cfe1 Klaus Aehlig
archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
539 c867cfe1 Klaus Aehlig
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
540 c867cfe1 Klaus Aehlig
      continue = archiveMore arch torepl jids
541 c867cfe1 Klaus Aehlig
      jidname = show $ fromJobId jid
542 c867cfe1 Klaus Aehlig
  time <- getClockTime
543 c867cfe1 Klaus Aehlig
  if time >= endt
544 c867cfe1 Klaus Aehlig
    then do
545 c867cfe1 Klaus Aehlig
      _ <- forkIO $ replicateFn torepl
546 c867cfe1 Klaus Aehlig
      return (arch, length (jid:jids))
547 c867cfe1 Klaus Aehlig
    else do
548 c867cfe1 Klaus Aehlig
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
549 c867cfe1 Klaus Aehlig
      loadResult <- loadJobFromDisk qDir False jid
550 c867cfe1 Klaus Aehlig
      case loadResult of
551 c867cfe1 Klaus Aehlig
        Bad _ -> continue
552 c867cfe1 Klaus Aehlig
        Ok (job, _) -> 
553 c867cfe1 Klaus Aehlig
          if jobArchivable cutt job
554 c867cfe1 Klaus Aehlig
            then do
555 c867cfe1 Klaus Aehlig
              let live = liveJobFile qDir jid
556 c867cfe1 Klaus Aehlig
                  archive = archivedJobFile qDir jid
557 0c09ecc2 Klaus Aehlig
              renameResult <- safeRenameFile queueDirPermissions
558 0c09ecc2 Klaus Aehlig
                                live archive
559 c867cfe1 Klaus Aehlig
              case renameResult of                   
560 c867cfe1 Klaus Aehlig
                Bad s -> do
561 c867cfe1 Klaus Aehlig
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
562 c867cfe1 Klaus Aehlig
                                 ++ " failed unexpectedly: " ++ s
563 c867cfe1 Klaus Aehlig
                  continue
564 c867cfe1 Klaus Aehlig
                Ok () -> do
565 c867cfe1 Klaus Aehlig
                  let torepl' = jid:torepl
566 c867cfe1 Klaus Aehlig
                  if length torepl' >= 10
567 c867cfe1 Klaus Aehlig
                    then do
568 c867cfe1 Klaus Aehlig
                      _ <- forkIO $ replicateFn torepl'
569 c867cfe1 Klaus Aehlig
                      archiveMore (arch + 1) [] jids
570 c867cfe1 Klaus Aehlig
                    else archiveMore (arch + 1) torepl' jids
571 c867cfe1 Klaus Aehlig
            else continue
572 c867cfe1 Klaus Aehlig
                   
573 c867cfe1 Klaus Aehlig
-- | Archive jobs older than the given time, but do not exceed the timeout for
574 c867cfe1 Klaus Aehlig
-- carrying out this task.
575 c867cfe1 Klaus Aehlig
archiveJobs :: ConfigData -- ^ cluster configuration
576 c867cfe1 Klaus Aehlig
               -> Int  -- ^ time the job has to be in the past in order
577 c867cfe1 Klaus Aehlig
                       -- to be archived
578 c867cfe1 Klaus Aehlig
               -> Int -- ^ timeout
579 c867cfe1 Klaus Aehlig
               -> [JobId] -- ^ jobs to consider
580 c867cfe1 Klaus Aehlig
               -> IO (Int, Int)
581 c867cfe1 Klaus Aehlig
archiveJobs cfg age timeout jids = do
582 c867cfe1 Klaus Aehlig
  now <- getClockTime
583 c867cfe1 Klaus Aehlig
  qDir <- queueDir
584 c867cfe1 Klaus Aehlig
  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
585 c867cfe1 Klaus Aehlig
      cuttime = if age < 0 then noTimestamp
586 c867cfe1 Klaus Aehlig
                           else advanceTimestamp (- age) (fromClockTime now)
587 c867cfe1 Klaus Aehlig
      mcs = Config.getMasterCandidates cfg
588 c867cfe1 Klaus Aehlig
      replicateFn jobs = do
589 c867cfe1 Klaus Aehlig
        let olds = map (liveJobFile qDir) jobs
590 c867cfe1 Klaus Aehlig
            news = map (archivedJobFile qDir) jobs
591 c867cfe1 Klaus Aehlig
        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
592 c867cfe1 Klaus Aehlig
        return ()
593 c867cfe1 Klaus Aehlig
  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids