Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ 13d26b66

History | View | Annotate | Download (21.4 kB)

1 aa79e62e Iustin Pop
{-# LANGUAGE TemplateHaskell #-}
2 aa79e62e Iustin Pop
3 aa79e62e Iustin Pop
{-| Implementation of the job queue.
4 aa79e62e Iustin Pop
5 aa79e62e Iustin Pop
-}
6 aa79e62e Iustin Pop
7 aa79e62e Iustin Pop
{-
8 aa79e62e Iustin Pop
9 aa79e62e Iustin Pop
Copyright (C) 2010, 2012 Google Inc.
10 aa79e62e Iustin Pop
11 aa79e62e Iustin Pop
This program is free software; you can redistribute it and/or modify
12 aa79e62e Iustin Pop
it under the terms of the GNU General Public License as published by
13 aa79e62e Iustin Pop
the Free Software Foundation; either version 2 of the License, or
14 aa79e62e Iustin Pop
(at your option) any later version.
15 aa79e62e Iustin Pop
16 aa79e62e Iustin Pop
This program is distributed in the hope that it will be useful, but
17 aa79e62e Iustin Pop
WITHOUT ANY WARRANTY; without even the implied warranty of
18 aa79e62e Iustin Pop
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 aa79e62e Iustin Pop
General Public License for more details.
20 aa79e62e Iustin Pop
21 aa79e62e Iustin Pop
You should have received a copy of the GNU General Public License
22 aa79e62e Iustin Pop
along with this program; if not, write to the Free Software
23 aa79e62e Iustin Pop
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 aa79e62e Iustin Pop
02110-1301, USA.
25 aa79e62e Iustin Pop
26 aa79e62e Iustin Pop
-}
27 aa79e62e Iustin Pop
28 aa79e62e Iustin Pop
module Ganeti.JQueue
29 aa79e62e Iustin Pop
    ( QueuedOpCode(..)
30 aa79e62e Iustin Pop
    , QueuedJob(..)
31 aa79e62e Iustin Pop
    , InputOpCode(..)
32 4b49a72b Klaus Aehlig
    , queuedOpCodeFromMetaOpCode
33 1c1132f4 Klaus Aehlig
    , queuedJobFromOpCodes
34 a7ab381a Klaus Aehlig
    , changeOpCodePriority
35 a6b33b72 Klaus Aehlig
    , changeJobPriority
36 363dc9d6 Klaus Aehlig
    , cancelQueuedJob
37 aa79e62e Iustin Pop
    , Timestamp
38 ae66f3a9 Klaus Aehlig
    , fromClockTime
39 aa79e62e Iustin Pop
    , noTimestamp
40 c3a70209 Klaus Aehlig
    , currentTimestamp
41 8b5a4b9a Klaus Aehlig
    , advanceTimestamp
42 2af22d70 Klaus Aehlig
    , setReceivedTimestamp
43 aa79e62e Iustin Pop
    , opStatusFinalized
44 aa79e62e Iustin Pop
    , extractOpSummary
45 aa79e62e Iustin Pop
    , calcJobStatus
46 02e40b13 Klaus Aehlig
    , jobStarted
47 847df9e9 Klaus Aehlig
    , jobFinalized
48 370f63be Klaus Aehlig
    , jobArchivable
49 aa79e62e Iustin Pop
    , calcJobPriority
50 aa79e62e Iustin Pop
    , jobFileName
51 aa79e62e Iustin Pop
    , liveJobFile
52 aa79e62e Iustin Pop
    , archivedJobFile
53 aa79e62e Iustin Pop
    , determineJobDirectories
54 aa79e62e Iustin Pop
    , getJobIDs
55 aa79e62e Iustin Pop
    , sortJobIDs
56 aa79e62e Iustin Pop
    , loadJobFromDisk
57 aa79e62e Iustin Pop
    , noSuchJob
58 cef3f99f Klaus Aehlig
    , readSerialFromDisk
59 ae858516 Klaus Aehlig
    , allocateJobIds
60 ae858516 Klaus Aehlig
    , allocateJobId
61 b498ed42 Klaus Aehlig
    , writeJobToDisk
62 9fd653a4 Klaus Aehlig
    , replicateManyJobs
63 1b94c0db Klaus Aehlig
    , isQueueOpen
64 ac0c5c6d Klaus Aehlig
    , startJobs
65 47c3c7b1 Klaus Aehlig
    , cancelJob
66 f23daea8 Klaus Aehlig
    , queueDirPermissions
67 c867cfe1 Klaus Aehlig
    , archiveJobs
68 aa79e62e Iustin Pop
    ) where
69 aa79e62e Iustin Pop
70 370f63be Klaus Aehlig
import Control.Applicative (liftA2, (<|>))
71 8b5a4b9a Klaus Aehlig
import Control.Arrow (first, second)
72 c867cfe1 Klaus Aehlig
import Control.Concurrent (forkIO)
73 ae858516 Klaus Aehlig
import Control.Concurrent.MVar
74 aa79e62e Iustin Pop
import Control.Exception
75 aa79e62e Iustin Pop
import Control.Monad
76 ea7032da Petr Pudlak
import Control.Monad.IO.Class
77 557f5dad Klaus Aehlig
import Data.Functor ((<$))
78 aa79e62e Iustin Pop
import Data.List
79 4b49a72b Klaus Aehlig
import Data.Maybe
80 aa79e62e Iustin Pop
import Data.Ord (comparing)
81 aa79e62e Iustin Pop
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
82 557f5dad Klaus Aehlig
import Prelude hiding (id, log)
83 aa79e62e Iustin Pop
import System.Directory
84 aa79e62e Iustin Pop
import System.FilePath
85 aa79e62e Iustin Pop
import System.IO.Error (isDoesNotExistError)
86 aa79e62e Iustin Pop
import System.Posix.Files
87 c3a70209 Klaus Aehlig
import System.Time
88 aa79e62e Iustin Pop
import qualified Text.JSON
89 aa79e62e Iustin Pop
import Text.JSON.Types
90 aa79e62e Iustin Pop
91 aa79e62e Iustin Pop
import Ganeti.BasicTypes
92 c867cfe1 Klaus Aehlig
import qualified Ganeti.Config as Config
93 aa79e62e Iustin Pop
import qualified Ganeti.Constants as C
94 47c3c7b1 Klaus Aehlig
import Ganeti.Errors (ErrorResult)
95 aa79e62e Iustin Pop
import Ganeti.JSON
96 aa79e62e Iustin Pop
import Ganeti.Logging
97 493d6920 Klaus Aehlig
import Ganeti.Luxi
98 c867cfe1 Klaus Aehlig
import Ganeti.Objects (ConfigData, Node)
99 aa79e62e Iustin Pop
import Ganeti.OpCodes
100 aa79e62e Iustin Pop
import Ganeti.Path
101 b5a96995 Klaus Aehlig
import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
102 c867cfe1 Klaus Aehlig
                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
103 aa79e62e Iustin Pop
import Ganeti.THH
104 aa79e62e Iustin Pop
import Ganeti.Types
105 cef3f99f Klaus Aehlig
import Ganeti.Utils
106 5f6515b6 Petr Pudlak
import Ganeti.Utils.Atomic
107 77676415 Klaus Aehlig
import Ganeti.VCluster (makeVirtualPath)
108 aa79e62e Iustin Pop
109 aa79e62e Iustin Pop
-- * Data types
110 aa79e62e Iustin Pop
111 c3a70209 Klaus Aehlig
-- | The ganeti queue timestamp type. It represents the time as the pair
112 c3a70209 Klaus Aehlig
-- of seconds since the epoch and microseconds since the beginning of the
113 c3a70209 Klaus Aehlig
-- second.
114 aa79e62e Iustin Pop
type Timestamp = (Int, Int)
115 aa79e62e Iustin Pop
116 aa79e62e Iustin Pop
-- | Missing timestamp type.
117 aa79e62e Iustin Pop
noTimestamp :: Timestamp
118 aa79e62e Iustin Pop
noTimestamp = (-1, -1)
119 aa79e62e Iustin Pop
120 ae66f3a9 Klaus Aehlig
-- | Obtain a Timestamp from a given clock time
121 ae66f3a9 Klaus Aehlig
fromClockTime :: ClockTime -> Timestamp
122 ae66f3a9 Klaus Aehlig
fromClockTime (TOD ctime pico) =
123 ae66f3a9 Klaus Aehlig
  (fromIntegral ctime, fromIntegral $ pico `div` 1000000)
124 ae66f3a9 Klaus Aehlig
125 c3a70209 Klaus Aehlig
-- | Get the current time in the job-queue timestamp format.
126 c3a70209 Klaus Aehlig
currentTimestamp :: IO Timestamp
127 ae66f3a9 Klaus Aehlig
currentTimestamp = fromClockTime `liftM` getClockTime
128 c3a70209 Klaus Aehlig
129 8b5a4b9a Klaus Aehlig
-- | From a given timestamp, obtain the timestamp of the
130 8b5a4b9a Klaus Aehlig
-- time that is the given number of seconds later.
131 8b5a4b9a Klaus Aehlig
advanceTimestamp :: Int -> Timestamp -> Timestamp
132 8b5a4b9a Klaus Aehlig
advanceTimestamp = first . (+)
133 8b5a4b9a Klaus Aehlig
134 aa79e62e Iustin Pop
-- | An input opcode.
135 aa79e62e Iustin Pop
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
136 aa79e62e Iustin Pop
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
137 aa79e62e Iustin Pop
                   deriving (Show, Eq)
138 aa79e62e Iustin Pop
139 aa79e62e Iustin Pop
-- | JSON instance for 'InputOpCode', trying to parse it and if
140 aa79e62e Iustin Pop
-- failing, keeping the original JSValue.
141 aa79e62e Iustin Pop
instance Text.JSON.JSON InputOpCode where
142 aa79e62e Iustin Pop
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
143 aa79e62e Iustin Pop
  showJSON (InvalidOpCode inv) = inv
144 aa79e62e Iustin Pop
  readJSON v = case Text.JSON.readJSON v of
145 aa79e62e Iustin Pop
                 Text.JSON.Error _ -> return $ InvalidOpCode v
146 aa79e62e Iustin Pop
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
147 aa79e62e Iustin Pop
148 aa79e62e Iustin Pop
-- | Invalid opcode summary.
149 aa79e62e Iustin Pop
invalidOp :: String
150 aa79e62e Iustin Pop
invalidOp = "INVALID_OP"
151 aa79e62e Iustin Pop
152 aa79e62e Iustin Pop
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
153 aa79e62e Iustin Pop
-- duplicates some functionality from the 'opSummary' function in
154 aa79e62e Iustin Pop
-- "Ganeti.OpCodes".
155 aa79e62e Iustin Pop
extractOpSummary :: InputOpCode -> String
156 aa79e62e Iustin Pop
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
157 aa79e62e Iustin Pop
extractOpSummary (InvalidOpCode (JSObject o)) =
158 aa79e62e Iustin Pop
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
159 aa79e62e Iustin Pop
    Just s -> drop 3 s -- drop the OP_ prefix
160 aa79e62e Iustin Pop
    Nothing -> invalidOp
161 aa79e62e Iustin Pop
extractOpSummary _ = invalidOp
162 aa79e62e Iustin Pop
163 aa79e62e Iustin Pop
$(buildObject "QueuedOpCode" "qo"
164 aa79e62e Iustin Pop
  [ simpleField "input"           [t| InputOpCode |]
165 aa79e62e Iustin Pop
  , simpleField "status"          [t| OpStatus    |]
166 aa79e62e Iustin Pop
  , simpleField "result"          [t| JSValue     |]
167 aa79e62e Iustin Pop
  , defaultField [| [] |] $
168 aa79e62e Iustin Pop
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
169 aa79e62e Iustin Pop
  , simpleField "priority"        [t| Int         |]
170 aa79e62e Iustin Pop
  , optionalNullSerField $
171 aa79e62e Iustin Pop
    simpleField "start_timestamp" [t| Timestamp   |]
172 aa79e62e Iustin Pop
  , optionalNullSerField $
173 aa79e62e Iustin Pop
    simpleField "exec_timestamp"  [t| Timestamp   |]
174 aa79e62e Iustin Pop
  , optionalNullSerField $
175 aa79e62e Iustin Pop
    simpleField "end_timestamp"   [t| Timestamp   |]
176 aa79e62e Iustin Pop
  ])
177 aa79e62e Iustin Pop
178 aa79e62e Iustin Pop
$(buildObject "QueuedJob" "qj"
179 aa79e62e Iustin Pop
  [ simpleField "id"                 [t| JobId          |]
180 aa79e62e Iustin Pop
  , simpleField "ops"                [t| [QueuedOpCode] |]
181 aa79e62e Iustin Pop
  , optionalNullSerField $
182 aa79e62e Iustin Pop
    simpleField "received_timestamp" [t| Timestamp      |]
183 aa79e62e Iustin Pop
  , optionalNullSerField $
184 aa79e62e Iustin Pop
    simpleField "start_timestamp"    [t| Timestamp      |]
185 aa79e62e Iustin Pop
  , optionalNullSerField $
186 aa79e62e Iustin Pop
    simpleField "end_timestamp"      [t| Timestamp      |]
187 aa79e62e Iustin Pop
  ])
188 aa79e62e Iustin Pop
189 4b49a72b Klaus Aehlig
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
190 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
191 4b49a72b Klaus Aehlig
queuedOpCodeFromMetaOpCode op =
192 4b49a72b Klaus Aehlig
  QueuedOpCode { qoInput = ValidOpCode op
193 4b49a72b Klaus Aehlig
               , qoStatus = OP_STATUS_QUEUED
194 4b49a72b Klaus Aehlig
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
195 4b49a72b Klaus Aehlig
                              $ op
196 4b49a72b Klaus Aehlig
               , qoLog = []
197 4b49a72b Klaus Aehlig
               , qoResult = JSNull
198 4b49a72b Klaus Aehlig
               , qoStartTimestamp = Nothing
199 4b49a72b Klaus Aehlig
               , qoEndTimestamp = Nothing
200 4b49a72b Klaus Aehlig
               , qoExecTimestamp = Nothing
201 4b49a72b Klaus Aehlig
               }
202 4b49a72b Klaus Aehlig
203 1c1132f4 Klaus Aehlig
-- | From a job-id and a list of op-codes create a job. This is
204 1c1132f4 Klaus Aehlig
-- the pure part of job creation, as allocating a new job id
205 1c1132f4 Klaus Aehlig
-- lives in IO.
206 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
207 1c1132f4 Klaus Aehlig
queuedJobFromOpCodes jobid ops = do
208 1c1132f4 Klaus Aehlig
  ops' <- mapM (`resolveDependencies` jobid) ops
209 1c1132f4 Klaus Aehlig
  return QueuedJob { qjId = jobid
210 1c1132f4 Klaus Aehlig
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
211 1c1132f4 Klaus Aehlig
                   , qjReceivedTimestamp = Nothing 
212 1c1132f4 Klaus Aehlig
                   , qjStartTimestamp = Nothing
213 1c1132f4 Klaus Aehlig
                   , qjEndTimestamp = Nothing
214 1c1132f4 Klaus Aehlig
                   }
215 1c1132f4 Klaus Aehlig
216 2af22d70 Klaus Aehlig
-- | Attach a received timestamp to a Queued Job.
217 2af22d70 Klaus Aehlig
setReceivedTimestamp :: Timestamp -> QueuedJob -> QueuedJob
218 2af22d70 Klaus Aehlig
setReceivedTimestamp ts job = job { qjReceivedTimestamp = Just ts }
219 2af22d70 Klaus Aehlig
220 a7ab381a Klaus Aehlig
-- | Change the priority of a QueuedOpCode, if it is not already
221 a7ab381a Klaus Aehlig
-- finalized.
222 a7ab381a Klaus Aehlig
changeOpCodePriority :: Int -> QueuedOpCode -> QueuedOpCode
223 a7ab381a Klaus Aehlig
changeOpCodePriority prio op =
224 a7ab381a Klaus Aehlig
  if qoStatus op > OP_STATUS_RUNNING
225 a7ab381a Klaus Aehlig
     then op
226 a7ab381a Klaus Aehlig
     else op { qoPriority = prio }
227 a7ab381a Klaus Aehlig
228 363dc9d6 Klaus Aehlig
-- | Set the state of a QueuedOpCode to canceled.
229 363dc9d6 Klaus Aehlig
cancelOpCode :: Timestamp -> QueuedOpCode -> QueuedOpCode
230 363dc9d6 Klaus Aehlig
cancelOpCode now op =
231 363dc9d6 Klaus Aehlig
  op { qoStatus = OP_STATUS_CANCELED, qoEndTimestamp = Just now }
232 363dc9d6 Klaus Aehlig
233 a6b33b72 Klaus Aehlig
-- | Change the priority of a job, i.e., change the priority of the
234 a6b33b72 Klaus Aehlig
-- non-finalized opcodes.
235 a6b33b72 Klaus Aehlig
changeJobPriority :: Int -> QueuedJob -> QueuedJob
236 a6b33b72 Klaus Aehlig
changeJobPriority prio job =
237 a6b33b72 Klaus Aehlig
  job { qjOps = map (changeOpCodePriority prio) $ qjOps job }
238 a6b33b72 Klaus Aehlig
239 363dc9d6 Klaus Aehlig
-- | Transform a QueuedJob that has not been started into its canceled form.
240 363dc9d6 Klaus Aehlig
cancelQueuedJob :: Timestamp -> QueuedJob -> QueuedJob
241 363dc9d6 Klaus Aehlig
cancelQueuedJob now job =
242 363dc9d6 Klaus Aehlig
  let ops' = map (cancelOpCode now) $ qjOps job
243 363dc9d6 Klaus Aehlig
  in job { qjOps = ops', qjEndTimestamp = Just now}
244 363dc9d6 Klaus Aehlig
245 aa79e62e Iustin Pop
-- | Job file prefix.
246 aa79e62e Iustin Pop
jobFilePrefix :: String
247 aa79e62e Iustin Pop
jobFilePrefix = "job-"
248 aa79e62e Iustin Pop
249 aa79e62e Iustin Pop
-- | Computes the filename for a given job ID.
250 aa79e62e Iustin Pop
jobFileName :: JobId -> FilePath
251 aa79e62e Iustin Pop
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
252 aa79e62e Iustin Pop
253 aa79e62e Iustin Pop
-- | Parses a job ID from a file name.
254 aa79e62e Iustin Pop
parseJobFileId :: (Monad m) => FilePath -> m JobId
255 aa79e62e Iustin Pop
parseJobFileId path =
256 aa79e62e Iustin Pop
  case stripPrefix jobFilePrefix path of
257 aa79e62e Iustin Pop
    Nothing -> fail $ "Job file '" ++ path ++
258 aa79e62e Iustin Pop
                      "' doesn't have the correct prefix"
259 aa79e62e Iustin Pop
    Just suffix -> makeJobIdS suffix
260 aa79e62e Iustin Pop
261 aa79e62e Iustin Pop
-- | Computes the full path to a live job.
262 aa79e62e Iustin Pop
liveJobFile :: FilePath -> JobId -> FilePath
263 aa79e62e Iustin Pop
liveJobFile rootdir jid = rootdir </> jobFileName jid
264 aa79e62e Iustin Pop
265 aa79e62e Iustin Pop
-- | Computes the full path to an archives job. BROKEN.
266 aa79e62e Iustin Pop
archivedJobFile :: FilePath -> JobId -> FilePath
267 aa79e62e Iustin Pop
archivedJobFile rootdir jid =
268 aa79e62e Iustin Pop
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
269 aa79e62e Iustin Pop
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
270 aa79e62e Iustin Pop
271 aa79e62e Iustin Pop
-- | Map from opcode status to job status.
272 aa79e62e Iustin Pop
opStatusToJob :: OpStatus -> JobStatus
273 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
274 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
275 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
276 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
277 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
278 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
279 aa79e62e Iustin Pop
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
280 aa79e62e Iustin Pop
281 aa79e62e Iustin Pop
-- | Computes a queued job's status.
282 aa79e62e Iustin Pop
calcJobStatus :: QueuedJob -> JobStatus
283 aa79e62e Iustin Pop
calcJobStatus QueuedJob { qjOps = ops } =
284 aa79e62e Iustin Pop
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
285 aa79e62e Iustin Pop
    where
286 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_ERROR     = True
287 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELING = True
288 aa79e62e Iustin Pop
      terminalStatus OP_STATUS_CANCELED  = True
289 aa79e62e Iustin Pop
      terminalStatus _                   = False
290 aa79e62e Iustin Pop
      softStatus     OP_STATUS_SUCCESS   = True
291 aa79e62e Iustin Pop
      softStatus     OP_STATUS_QUEUED    = True
292 aa79e62e Iustin Pop
      softStatus     _                   = False
293 aa79e62e Iustin Pop
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
294 aa79e62e Iustin Pop
      extractOpSt [] d False = d
295 aa79e62e Iustin Pop
      extractOpSt (x:xs) d old_all
296 aa79e62e Iustin Pop
           | terminalStatus x = opStatusToJob x -- abort recursion
297 aa79e62e Iustin Pop
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
298 aa79e62e Iustin Pop
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
299 aa79e62e Iustin Pop
           where new_all = x == OP_STATUS_SUCCESS && old_all
300 aa79e62e Iustin Pop
301 02e40b13 Klaus Aehlig
-- | Determine if a job has started
302 02e40b13 Klaus Aehlig
jobStarted :: QueuedJob -> Bool
303 02e40b13 Klaus Aehlig
jobStarted = (> JOB_STATUS_QUEUED) . calcJobStatus
304 02e40b13 Klaus Aehlig
305 847df9e9 Klaus Aehlig
-- | Determine if a job is finalised.
306 847df9e9 Klaus Aehlig
jobFinalized :: QueuedJob -> Bool
307 847df9e9 Klaus Aehlig
jobFinalized = (> JOB_STATUS_RUNNING) . calcJobStatus
308 847df9e9 Klaus Aehlig
309 370f63be Klaus Aehlig
-- | Determine if a job is finalized and its timestamp is before
310 370f63be Klaus Aehlig
-- a given time.
311 370f63be Klaus Aehlig
jobArchivable :: Timestamp -> QueuedJob -> Bool
312 370f63be Klaus Aehlig
jobArchivable ts = liftA2 (&&) jobFinalized 
313 370f63be Klaus Aehlig
  $ maybe False (< ts)
314 370f63be Klaus Aehlig
    .  liftA2 (<|>) qjEndTimestamp qjStartTimestamp
315 370f63be Klaus Aehlig
316 aa79e62e Iustin Pop
-- | Determine whether an opcode status is finalized.
317 aa79e62e Iustin Pop
opStatusFinalized :: OpStatus -> Bool
318 aa79e62e Iustin Pop
opStatusFinalized = (> OP_STATUS_RUNNING)
319 aa79e62e Iustin Pop
320 aa79e62e Iustin Pop
-- | Compute a job's priority.
321 aa79e62e Iustin Pop
calcJobPriority :: QueuedJob -> Int
322 aa79e62e Iustin Pop
calcJobPriority QueuedJob { qjOps = ops } =
323 aa79e62e Iustin Pop
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
324 aa79e62e Iustin Pop
    where helper [] = C.opPrioDefault
325 aa79e62e Iustin Pop
          helper ps = minimum ps
326 aa79e62e Iustin Pop
327 aa79e62e Iustin Pop
-- | Log but ignore an 'IOError'.
328 aa79e62e Iustin Pop
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
329 aa79e62e Iustin Pop
ignoreIOError a ignore_noent msg e = do
330 aa79e62e Iustin Pop
  unless (isDoesNotExistError e && ignore_noent) .
331 aa79e62e Iustin Pop
    logWarning $ msg ++ ": " ++ show e
332 aa79e62e Iustin Pop
  return a
333 aa79e62e Iustin Pop
334 aa79e62e Iustin Pop
-- | Compute the list of existing archive directories. Note that I/O
335 aa79e62e Iustin Pop
-- exceptions are swallowed and ignored.
336 aa79e62e Iustin Pop
allArchiveDirs :: FilePath -> IO [FilePath]
337 aa79e62e Iustin Pop
allArchiveDirs rootdir = do
338 aa79e62e Iustin Pop
  let adir = rootdir </> jobQueueArchiveSubDir
339 aa79e62e Iustin Pop
  contents <- getDirectoryContents adir `Control.Exception.catch`
340 aa79e62e Iustin Pop
               ignoreIOError [] False
341 aa79e62e Iustin Pop
                 ("Failed to list queue directory " ++ adir)
342 aa79e62e Iustin Pop
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
343 aa79e62e Iustin Pop
  filterM (\path ->
344 aa79e62e Iustin Pop
             liftM isDirectory (getFileStatus (adir </> path))
345 aa79e62e Iustin Pop
               `Control.Exception.catch`
346 aa79e62e Iustin Pop
               ignoreIOError False True
347 aa79e62e Iustin Pop
                 ("Failed to stat archive path " ++ path)) fpaths
348 aa79e62e Iustin Pop
349 aa79e62e Iustin Pop
-- | Build list of directories containing job files. Note: compared to
350 aa79e62e Iustin Pop
-- the Python version, this doesn't ignore a potential lost+found
351 aa79e62e Iustin Pop
-- file.
352 aa79e62e Iustin Pop
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
353 aa79e62e Iustin Pop
determineJobDirectories rootdir archived = do
354 aa79e62e Iustin Pop
  other <- if archived
355 aa79e62e Iustin Pop
             then allArchiveDirs rootdir
356 aa79e62e Iustin Pop
             else return []
357 aa79e62e Iustin Pop
  return $ rootdir:other
358 aa79e62e Iustin Pop
359 aa79e62e Iustin Pop
-- | Computes the list of all jobs in the given directories.
360 ea7032da Petr Pudlak
getJobIDs :: [FilePath] -> IO (GenericResult IOError [JobId])
361 ea7032da Petr Pudlak
getJobIDs = runResultT . liftM concat . mapM getDirJobIDs
362 aa79e62e Iustin Pop
363 aa79e62e Iustin Pop
-- | Sorts the a list of job IDs.
364 aa79e62e Iustin Pop
sortJobIDs :: [JobId] -> [JobId]
365 aa79e62e Iustin Pop
sortJobIDs = sortBy (comparing fromJobId)
366 aa79e62e Iustin Pop
367 aa79e62e Iustin Pop
-- | Computes the list of jobs in a given directory.
368 ea7032da Petr Pudlak
getDirJobIDs :: FilePath -> ResultT IOError IO [JobId]
369 ea7032da Petr Pudlak
getDirJobIDs path =
370 ea7032da Petr Pudlak
  withErrorLogAt WARNING ("Failed to list job directory " ++ path) .
371 ea7032da Petr Pudlak
    liftM (mapMaybe parseJobFileId) $ liftIO (getDirectoryContents path)
372 aa79e62e Iustin Pop
373 aa79e62e Iustin Pop
-- | Reads the job data from disk.
374 aa79e62e Iustin Pop
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
375 aa79e62e Iustin Pop
readJobDataFromDisk rootdir archived jid = do
376 aa79e62e Iustin Pop
  let live_path = liveJobFile rootdir jid
377 aa79e62e Iustin Pop
      archived_path = archivedJobFile rootdir jid
378 aa79e62e Iustin Pop
      all_paths = if archived
379 aa79e62e Iustin Pop
                    then [(live_path, False), (archived_path, True)]
380 aa79e62e Iustin Pop
                    else [(live_path, False)]
381 aa79e62e Iustin Pop
  foldM (\state (path, isarchived) ->
382 aa79e62e Iustin Pop
           liftM (\r -> Just (r, isarchived)) (readFile path)
383 aa79e62e Iustin Pop
             `Control.Exception.catch`
384 aa79e62e Iustin Pop
             ignoreIOError state True
385 aa79e62e Iustin Pop
               ("Failed to read job file " ++ path)) Nothing all_paths
386 aa79e62e Iustin Pop
387 aa79e62e Iustin Pop
-- | Failed to load job error.
388 aa79e62e Iustin Pop
noSuchJob :: Result (QueuedJob, Bool)
389 aa79e62e Iustin Pop
noSuchJob = Bad "Can't load job file"
390 aa79e62e Iustin Pop
391 aa79e62e Iustin Pop
-- | Loads a job from disk.
392 aa79e62e Iustin Pop
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
393 aa79e62e Iustin Pop
loadJobFromDisk rootdir archived jid = do
394 aa79e62e Iustin Pop
  raw <- readJobDataFromDisk rootdir archived jid
395 aa79e62e Iustin Pop
  -- note: we need some stricness below, otherwise the wrapping in a
396 aa79e62e Iustin Pop
  -- Result will create too much lazyness, and not close the file
397 aa79e62e Iustin Pop
  -- descriptors for the individual jobs
398 aa79e62e Iustin Pop
  return $! case raw of
399 aa79e62e Iustin Pop
             Nothing -> noSuchJob
400 aa79e62e Iustin Pop
             Just (str, arch) ->
401 aa79e62e Iustin Pop
               liftM (\qj -> (qj, arch)) .
402 aa79e62e Iustin Pop
               fromJResult "Parsing job file" $ Text.JSON.decode str
403 cef3f99f Klaus Aehlig
404 b498ed42 Klaus Aehlig
-- | Write a job to disk.
405 b498ed42 Klaus Aehlig
writeJobToDisk :: FilePath -> QueuedJob -> IO (Result ())
406 b498ed42 Klaus Aehlig
writeJobToDisk rootdir job = do
407 b498ed42 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
408 b498ed42 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
409 b498ed42 Klaus Aehlig
  tryAndLogIOError (atomicWriteFile filename content)
410 b498ed42 Klaus Aehlig
                   ("Failed to write " ++ filename) Ok
411 b498ed42 Klaus Aehlig
412 b5a96995 Klaus Aehlig
-- | Replicate a job to all master candidates.
413 b5a96995 Klaus Aehlig
replicateJob :: FilePath -> [Node] -> QueuedJob -> IO [(Node, ERpcError ())]
414 b5a96995 Klaus Aehlig
replicateJob rootdir mastercandidates job = do
415 b5a96995 Klaus Aehlig
  let filename = liveJobFile rootdir . qjId $ job
416 b5a96995 Klaus Aehlig
      content = Text.JSON.encode . Text.JSON.showJSON $ job
417 77676415 Klaus Aehlig
  filename' <- makeVirtualPath filename
418 557f5dad Klaus Aehlig
  callresult <- executeRpcCall mastercandidates
419 77676415 Klaus Aehlig
                  $ RpcCallJobqueueUpdate filename' content
420 557f5dad Klaus Aehlig
  let result = map (second (() <$)) callresult
421 b5a96995 Klaus Aehlig
  logRpcErrors result
422 b5a96995 Klaus Aehlig
  return result
423 b5a96995 Klaus Aehlig
424 9fd653a4 Klaus Aehlig
-- | Replicate many jobs to all master candidates.
425 9fd653a4 Klaus Aehlig
replicateManyJobs :: FilePath -> [Node] -> [QueuedJob] -> IO ()
426 9fd653a4 Klaus Aehlig
replicateManyJobs rootdir mastercandidates =
427 9fd653a4 Klaus Aehlig
  mapM_ (replicateJob rootdir mastercandidates)
428 9fd653a4 Klaus Aehlig
429 cef3f99f Klaus Aehlig
-- | Read the job serial number from disk.
430 cef3f99f Klaus Aehlig
readSerialFromDisk :: IO (Result JobId)
431 cef3f99f Klaus Aehlig
readSerialFromDisk = do
432 cef3f99f Klaus Aehlig
  filename <- jobQueueSerialFile
433 cef3f99f Klaus Aehlig
  tryAndLogIOError (readFile filename) "Failed to read serial file"
434 cef3f99f Klaus Aehlig
                   (makeJobIdS . rStripSpace)
435 ae858516 Klaus Aehlig
436 ae858516 Klaus Aehlig
-- | Allocate new job ids.
437 ae858516 Klaus Aehlig
-- To avoid races while accessing the serial file, the threads synchronize
438 ae858516 Klaus Aehlig
-- over a lock, as usual provided by an MVar.
439 ae858516 Klaus Aehlig
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
440 ae858516 Klaus Aehlig
allocateJobIds mastercandidates lock n =
441 ae858516 Klaus Aehlig
  if n <= 0
442 ae858516 Klaus Aehlig
    then return . Bad $ "Can only allocate positive number of job ids"
443 ae858516 Klaus Aehlig
    else do
444 ae858516 Klaus Aehlig
      takeMVar lock
445 ae858516 Klaus Aehlig
      rjobid <- readSerialFromDisk
446 ae858516 Klaus Aehlig
      case rjobid of
447 ae858516 Klaus Aehlig
        Bad s -> do
448 ae858516 Klaus Aehlig
          putMVar lock ()
449 ae858516 Klaus Aehlig
          return . Bad $ s
450 ae858516 Klaus Aehlig
        Ok jid -> do
451 ae858516 Klaus Aehlig
          let current = fromJobId jid
452 ae858516 Klaus Aehlig
              serial_content = show (current + n) ++  "\n"
453 ae858516 Klaus Aehlig
          serial <- jobQueueSerialFile
454 ae858516 Klaus Aehlig
          write_result <- try $ atomicWriteFile serial serial_content
455 ae858516 Klaus Aehlig
                          :: IO (Either IOError ())
456 ae858516 Klaus Aehlig
          case write_result of
457 ae858516 Klaus Aehlig
            Left e -> do
458 f7819050 Klaus Aehlig
              putMVar lock ()
459 ae858516 Klaus Aehlig
              let msg = "Failed to write serial file: " ++ show e
460 ae858516 Klaus Aehlig
              logError msg
461 ae858516 Klaus Aehlig
              return . Bad $ msg 
462 ae858516 Klaus Aehlig
            Right () -> do
463 77676415 Klaus Aehlig
              serial' <- makeVirtualPath serial
464 ae858516 Klaus Aehlig
              _ <- executeRpcCall mastercandidates
465 77676415 Klaus Aehlig
                     $ RpcCallJobqueueUpdate serial' serial_content
466 f7819050 Klaus Aehlig
              putMVar lock ()
467 ae858516 Klaus Aehlig
              return $ mapM makeJobId [(current+1)..(current+n)]
468 ae858516 Klaus Aehlig
469 ae858516 Klaus Aehlig
-- | Allocate one new job id.
470 ae858516 Klaus Aehlig
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
471 ae858516 Klaus Aehlig
allocateJobId mastercandidates lock = do
472 ae858516 Klaus Aehlig
  jids <- allocateJobIds mastercandidates lock 1
473 ae858516 Klaus Aehlig
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")
474 1b94c0db Klaus Aehlig
475 1b94c0db Klaus Aehlig
-- | Decide if job queue is open
476 1b94c0db Klaus Aehlig
isQueueOpen :: IO Bool
477 1b94c0db Klaus Aehlig
isQueueOpen = liftM not (jobQueueDrainFile >>= doesFileExist)
478 493d6920 Klaus Aehlig
479 ac0c5c6d Klaus Aehlig
-- | Start enqueued jobs, currently by handing them over to masterd.
480 ac0c5c6d Klaus Aehlig
startJobs :: [QueuedJob] -> IO ()
481 ac0c5c6d Klaus Aehlig
startJobs jobs = do
482 493d6920 Klaus Aehlig
  socketpath <- defaultMasterSocket
483 d605e261 Petr Pudlak
  client <- getLuxiClient socketpath
484 493d6920 Klaus Aehlig
  pickupResults <- mapM (flip callMethod client . PickupJob . qjId) jobs
485 493d6920 Klaus Aehlig
  let failures = map show $ justBad pickupResults
486 493d6920 Klaus Aehlig
  unless (null failures)
487 493d6920 Klaus Aehlig
   . logWarning . (++) "Failed to notify masterd: " . commaJoin $ failures
488 47c3c7b1 Klaus Aehlig
489 47c3c7b1 Klaus Aehlig
-- | Try to cancel a job that has already been handed over to execution,
490 47c3c7b1 Klaus Aehlig
-- currently by asking masterd to cancel it.
491 47c3c7b1 Klaus Aehlig
cancelJob :: JobId -> IO (ErrorResult JSValue)
492 47c3c7b1 Klaus Aehlig
cancelJob jid = do
493 47c3c7b1 Klaus Aehlig
  socketpath <- defaultMasterSocket
494 47c3c7b1 Klaus Aehlig
  client <- getLuxiClient socketpath
495 47c3c7b1 Klaus Aehlig
  callMethod (CancelJob jid) client
496 c867cfe1 Klaus Aehlig
497 f23daea8 Klaus Aehlig
-- | Permissions for the archive directories.
498 f23daea8 Klaus Aehlig
queueDirPermissions :: FilePermissions
499 f23daea8 Klaus Aehlig
queueDirPermissions = FilePermissions { fpOwner = Just C.masterdUser
500 f23daea8 Klaus Aehlig
                                      , fpGroup = Just C.daemonsGroup
501 f23daea8 Klaus Aehlig
                                      , fpPermissions = 0o0750
502 f23daea8 Klaus Aehlig
                                      }
503 f23daea8 Klaus Aehlig
504 c867cfe1 Klaus Aehlig
-- | Try, at most until the given endtime, to archive some of the given
505 c867cfe1 Klaus Aehlig
-- jobs, if they are older than the specified cut-off time; also replicate
506 c867cfe1 Klaus Aehlig
-- archival of the additional jobs. Return the pair of the number of jobs
507 c867cfe1 Klaus Aehlig
-- archived, and the number of jobs remaining int he queue, asuming the
508 c867cfe1 Klaus Aehlig
-- given numbers about the not considered jobs.
509 c867cfe1 Klaus Aehlig
archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
510 c867cfe1 Klaus Aehlig
                        -> FilePath -- ^ queue root directory
511 c867cfe1 Klaus Aehlig
                        -> ClockTime -- ^ Endtime
512 c867cfe1 Klaus Aehlig
                        -> Timestamp -- ^ cut-off time for archiving jobs
513 c867cfe1 Klaus Aehlig
                        -> Int -- ^ number of jobs alread archived
514 c867cfe1 Klaus Aehlig
                        -> [JobId] -- ^ Additional jobs to replicate
515 c867cfe1 Klaus Aehlig
                        -> [JobId] -- ^ List of job-ids still to consider
516 c867cfe1 Klaus Aehlig
                        -> IO (Int, Int)
517 c867cfe1 Klaus Aehlig
archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
518 c867cfe1 Klaus Aehlig
  unless (null torepl) . (>> return ())
519 c867cfe1 Klaus Aehlig
   . forkIO $ replicateFn torepl
520 c867cfe1 Klaus Aehlig
  return (arch, 0)
521 c867cfe1 Klaus Aehlig
522 c867cfe1 Klaus Aehlig
archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
523 c867cfe1 Klaus Aehlig
  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
524 c867cfe1 Klaus Aehlig
      continue = archiveMore arch torepl jids
525 c867cfe1 Klaus Aehlig
      jidname = show $ fromJobId jid
526 c867cfe1 Klaus Aehlig
  time <- getClockTime
527 c867cfe1 Klaus Aehlig
  if time >= endt
528 c867cfe1 Klaus Aehlig
    then do
529 c867cfe1 Klaus Aehlig
      _ <- forkIO $ replicateFn torepl
530 c867cfe1 Klaus Aehlig
      return (arch, length (jid:jids))
531 c867cfe1 Klaus Aehlig
    else do
532 c867cfe1 Klaus Aehlig
      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
533 c867cfe1 Klaus Aehlig
      loadResult <- loadJobFromDisk qDir False jid
534 c867cfe1 Klaus Aehlig
      case loadResult of
535 c867cfe1 Klaus Aehlig
        Bad _ -> continue
536 c867cfe1 Klaus Aehlig
        Ok (job, _) -> 
537 c867cfe1 Klaus Aehlig
          if jobArchivable cutt job
538 c867cfe1 Klaus Aehlig
            then do
539 c867cfe1 Klaus Aehlig
              let live = liveJobFile qDir jid
540 c867cfe1 Klaus Aehlig
                  archive = archivedJobFile qDir jid
541 0c09ecc2 Klaus Aehlig
              renameResult <- safeRenameFile queueDirPermissions
542 0c09ecc2 Klaus Aehlig
                                live archive
543 c867cfe1 Klaus Aehlig
              case renameResult of                   
544 c867cfe1 Klaus Aehlig
                Bad s -> do
545 c867cfe1 Klaus Aehlig
                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
546 c867cfe1 Klaus Aehlig
                                 ++ " failed unexpectedly: " ++ s
547 c867cfe1 Klaus Aehlig
                  continue
548 c867cfe1 Klaus Aehlig
                Ok () -> do
549 c867cfe1 Klaus Aehlig
                  let torepl' = jid:torepl
550 c867cfe1 Klaus Aehlig
                  if length torepl' >= 10
551 c867cfe1 Klaus Aehlig
                    then do
552 c867cfe1 Klaus Aehlig
                      _ <- forkIO $ replicateFn torepl'
553 c867cfe1 Klaus Aehlig
                      archiveMore (arch + 1) [] jids
554 c867cfe1 Klaus Aehlig
                    else archiveMore (arch + 1) torepl' jids
555 c867cfe1 Klaus Aehlig
            else continue
556 c867cfe1 Klaus Aehlig
                   
557 c867cfe1 Klaus Aehlig
-- | Archive jobs older than the given time, but do not exceed the timeout for
558 c867cfe1 Klaus Aehlig
-- carrying out this task.
559 c867cfe1 Klaus Aehlig
archiveJobs :: ConfigData -- ^ cluster configuration
560 c867cfe1 Klaus Aehlig
               -> Int  -- ^ time the job has to be in the past in order
561 c867cfe1 Klaus Aehlig
                       -- to be archived
562 c867cfe1 Klaus Aehlig
               -> Int -- ^ timeout
563 c867cfe1 Klaus Aehlig
               -> [JobId] -- ^ jobs to consider
564 c867cfe1 Klaus Aehlig
               -> IO (Int, Int)
565 c867cfe1 Klaus Aehlig
archiveJobs cfg age timeout jids = do
566 c867cfe1 Klaus Aehlig
  now <- getClockTime
567 c867cfe1 Klaus Aehlig
  qDir <- queueDir
568 c867cfe1 Klaus Aehlig
  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
569 c867cfe1 Klaus Aehlig
      cuttime = if age < 0 then noTimestamp
570 c867cfe1 Klaus Aehlig
                           else advanceTimestamp (- age) (fromClockTime now)
571 c867cfe1 Klaus Aehlig
      mcs = Config.getMasterCandidates cfg
572 c867cfe1 Klaus Aehlig
      replicateFn jobs = do
573 c867cfe1 Klaus Aehlig
        let olds = map (liveJobFile qDir) jobs
574 c867cfe1 Klaus Aehlig
            news = map (archivedJobFile qDir) jobs
575 c867cfe1 Klaus Aehlig
        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
576 c867cfe1 Klaus Aehlig
        return ()
577 c867cfe1 Klaus Aehlig
  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids