Statistics
| Branch: | Tag: | Revision:

root / src / Ganeti / JQueue.hs @ ae858516

History | View | Annotate | Download (13.8 kB)

1
{-# LANGUAGE TemplateHaskell #-}
2

    
3
{-| Implementation of the job queue.
4

    
5
-}
6

    
7
{-
8

    
9
Copyright (C) 2010, 2012 Google Inc.
10

    
11
This program is free software; you can redistribute it and/or modify
12
it under the terms of the GNU General Public License as published by
13
the Free Software Foundation; either version 2 of the License, or
14
(at your option) any later version.
15

    
16
This program is distributed in the hope that it will be useful, but
17
WITHOUT ANY WARRANTY; without even the implied warranty of
18
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
General Public License for more details.
20

    
21
You should have received a copy of the GNU General Public License
22
along with this program; if not, write to the Free Software
23
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24
02110-1301, USA.
25

    
26
-}
27

    
28
module Ganeti.JQueue
29
    ( QueuedOpCode(..)
30
    , QueuedJob(..)
31
    , InputOpCode(..)
32
    , queuedOpCodeFromMetaOpCode
33
    , queuedJobFromOpCodes
34
    , Timestamp
35
    , noTimestamp
36
    , opStatusFinalized
37
    , extractOpSummary
38
    , calcJobStatus
39
    , calcJobPriority
40
    , jobFileName
41
    , liveJobFile
42
    , archivedJobFile
43
    , determineJobDirectories
44
    , getJobIDs
45
    , sortJobIDs
46
    , loadJobFromDisk
47
    , noSuchJob
48
    , readSerialFromDisk
49
    , allocateJobIds
50
    , allocateJobId
51
    ) where
52

    
53
import Control.Concurrent.MVar
54
import Control.Exception
55
import Control.Monad
56
import Data.List
57
import Data.Maybe
58
import Data.Ord (comparing)
59
-- workaround what seems to be a bug in ghc 7.4's TH shadowing code
60
import Prelude hiding (log, id)
61
import System.Directory
62
import System.FilePath
63
import System.IO.Error (isDoesNotExistError)
64
import System.Posix.Files
65
import qualified Text.JSON
66
import Text.JSON.Types
67

    
68
import Ganeti.BasicTypes
69
import qualified Ganeti.Constants as C
70
import Ganeti.JSON
71
import Ganeti.Logging
72
import Ganeti.Objects (Node)
73
import Ganeti.OpCodes
74
import Ganeti.Path
75
import Ganeti.Rpc (executeRpcCall, RpcCallJobqueueUpdate(..))
76
import Ganeti.THH
77
import Ganeti.Types
78
import Ganeti.Utils
79

    
80
-- * Data types
81

    
82
-- | The ganeti queue timestamp type
83
type Timestamp = (Int, Int)
84

    
85
-- | Missing timestamp type.
86
noTimestamp :: Timestamp
87
noTimestamp = (-1, -1)
88

    
89
-- | An input opcode.
90
data InputOpCode = ValidOpCode MetaOpCode -- ^ OpCode was parsed successfully
91
                 | InvalidOpCode JSValue  -- ^ Invalid opcode
92
                   deriving (Show, Eq)
93

    
94
-- | JSON instance for 'InputOpCode', trying to parse it and if
95
-- failing, keeping the original JSValue.
96
instance Text.JSON.JSON InputOpCode where
97
  showJSON (ValidOpCode mo) = Text.JSON.showJSON mo
98
  showJSON (InvalidOpCode inv) = inv
99
  readJSON v = case Text.JSON.readJSON v of
100
                 Text.JSON.Error _ -> return $ InvalidOpCode v
101
                 Text.JSON.Ok mo -> return $ ValidOpCode mo
102

    
103
-- | Invalid opcode summary.
104
invalidOp :: String
105
invalidOp = "INVALID_OP"
106

    
107
-- | Tries to extract the opcode summary from an 'InputOpCode'. This
108
-- duplicates some functionality from the 'opSummary' function in
109
-- "Ganeti.OpCodes".
110
extractOpSummary :: InputOpCode -> String
111
extractOpSummary (ValidOpCode metaop) = opSummary $ metaOpCode metaop
112
extractOpSummary (InvalidOpCode (JSObject o)) =
113
  case fromObjWithDefault (fromJSObject o) "OP_ID" ("OP_" ++ invalidOp) of
114
    Just s -> drop 3 s -- drop the OP_ prefix
115
    Nothing -> invalidOp
116
extractOpSummary _ = invalidOp
117

    
118
$(buildObject "QueuedOpCode" "qo"
119
  [ simpleField "input"           [t| InputOpCode |]
120
  , simpleField "status"          [t| OpStatus    |]
121
  , simpleField "result"          [t| JSValue     |]
122
  , defaultField [| [] |] $
123
    simpleField "log"             [t| [(Int, Timestamp, ELogType, JSValue)] |]
124
  , simpleField "priority"        [t| Int         |]
125
  , optionalNullSerField $
126
    simpleField "start_timestamp" [t| Timestamp   |]
127
  , optionalNullSerField $
128
    simpleField "exec_timestamp"  [t| Timestamp   |]
129
  , optionalNullSerField $
130
    simpleField "end_timestamp"   [t| Timestamp   |]
131
  ])
132

    
133
$(buildObject "QueuedJob" "qj"
134
  [ simpleField "id"                 [t| JobId          |]
135
  , simpleField "ops"                [t| [QueuedOpCode] |]
136
  , optionalNullSerField $
137
    simpleField "received_timestamp" [t| Timestamp      |]
138
  , optionalNullSerField $
139
    simpleField "start_timestamp"    [t| Timestamp      |]
140
  , optionalNullSerField $
141
    simpleField "end_timestamp"      [t| Timestamp      |]
142
  ])
143

    
144
-- | Convenience function to obtain a QueuedOpCode from a MetaOpCode
145
queuedOpCodeFromMetaOpCode :: MetaOpCode -> QueuedOpCode
146
queuedOpCodeFromMetaOpCode op =
147
  QueuedOpCode { qoInput = ValidOpCode op
148
               , qoStatus = OP_STATUS_QUEUED
149
               , qoPriority = opSubmitPriorityToRaw . opPriority . metaParams
150
                              $ op
151
               , qoLog = []
152
               , qoResult = JSNull
153
               , qoStartTimestamp = Nothing
154
               , qoEndTimestamp = Nothing
155
               , qoExecTimestamp = Nothing
156
               }
157

    
158
-- | From a job-id and a list of op-codes create a job. This is
159
-- the pure part of job creation, as allocating a new job id
160
-- lives in IO.
161
queuedJobFromOpCodes :: (Monad m) => JobId -> [MetaOpCode] -> m QueuedJob
162
queuedJobFromOpCodes jobid ops = do
163
  ops' <- mapM (`resolveDependencies` jobid) ops
164
  return QueuedJob { qjId = jobid
165
                   , qjOps = map queuedOpCodeFromMetaOpCode ops'
166
                   , qjReceivedTimestamp = Nothing 
167
                   , qjStartTimestamp = Nothing
168
                   , qjEndTimestamp = Nothing
169
                   }
170

    
171
-- | Job file prefix.
172
jobFilePrefix :: String
173
jobFilePrefix = "job-"
174

    
175
-- | Computes the filename for a given job ID.
176
jobFileName :: JobId -> FilePath
177
jobFileName jid = jobFilePrefix ++ show (fromJobId jid)
178

    
179
-- | Parses a job ID from a file name.
180
parseJobFileId :: (Monad m) => FilePath -> m JobId
181
parseJobFileId path =
182
  case stripPrefix jobFilePrefix path of
183
    Nothing -> fail $ "Job file '" ++ path ++
184
                      "' doesn't have the correct prefix"
185
    Just suffix -> makeJobIdS suffix
186

    
187
-- | Computes the full path to a live job.
188
liveJobFile :: FilePath -> JobId -> FilePath
189
liveJobFile rootdir jid = rootdir </> jobFileName jid
190

    
191
-- | Computes the full path to an archives job. BROKEN.
192
archivedJobFile :: FilePath -> JobId -> FilePath
193
archivedJobFile rootdir jid =
194
  let subdir = show (fromJobId jid `div` C.jstoreJobsPerArchiveDirectory)
195
  in rootdir </> jobQueueArchiveSubDir </> subdir </> jobFileName jid
196

    
197
-- | Map from opcode status to job status.
198
opStatusToJob :: OpStatus -> JobStatus
199
opStatusToJob OP_STATUS_QUEUED    = JOB_STATUS_QUEUED
200
opStatusToJob OP_STATUS_WAITING   = JOB_STATUS_WAITING
201
opStatusToJob OP_STATUS_SUCCESS   = JOB_STATUS_SUCCESS
202
opStatusToJob OP_STATUS_RUNNING   = JOB_STATUS_RUNNING
203
opStatusToJob OP_STATUS_CANCELING = JOB_STATUS_CANCELING
204
opStatusToJob OP_STATUS_CANCELED  = JOB_STATUS_CANCELED
205
opStatusToJob OP_STATUS_ERROR     = JOB_STATUS_ERROR
206

    
207
-- | Computes a queued job's status.
208
calcJobStatus :: QueuedJob -> JobStatus
209
calcJobStatus QueuedJob { qjOps = ops } =
210
  extractOpSt (map qoStatus ops) JOB_STATUS_QUEUED True
211
    where
212
      terminalStatus OP_STATUS_ERROR     = True
213
      terminalStatus OP_STATUS_CANCELING = True
214
      terminalStatus OP_STATUS_CANCELED  = True
215
      terminalStatus _                   = False
216
      softStatus     OP_STATUS_SUCCESS   = True
217
      softStatus     OP_STATUS_QUEUED    = True
218
      softStatus     _                   = False
219
      extractOpSt [] _ True = JOB_STATUS_SUCCESS
220
      extractOpSt [] d False = d
221
      extractOpSt (x:xs) d old_all
222
           | terminalStatus x = opStatusToJob x -- abort recursion
223
           | softStatus x     = extractOpSt xs d new_all -- continue unchanged
224
           | otherwise        = extractOpSt xs (opStatusToJob x) new_all
225
           where new_all = x == OP_STATUS_SUCCESS && old_all
226

    
227
-- | Determine whether an opcode status is finalized.
228
opStatusFinalized :: OpStatus -> Bool
229
opStatusFinalized = (> OP_STATUS_RUNNING)
230

    
231
-- | Compute a job's priority.
232
calcJobPriority :: QueuedJob -> Int
233
calcJobPriority QueuedJob { qjOps = ops } =
234
  helper . map qoPriority $ filter (not . opStatusFinalized . qoStatus) ops
235
    where helper [] = C.opPrioDefault
236
          helper ps = minimum ps
237

    
238
-- | Log but ignore an 'IOError'.
239
ignoreIOError :: a -> Bool -> String -> IOError -> IO a
240
ignoreIOError a ignore_noent msg e = do
241
  unless (isDoesNotExistError e && ignore_noent) .
242
    logWarning $ msg ++ ": " ++ show e
243
  return a
244

    
245
-- | Compute the list of existing archive directories. Note that I/O
246
-- exceptions are swallowed and ignored.
247
allArchiveDirs :: FilePath -> IO [FilePath]
248
allArchiveDirs rootdir = do
249
  let adir = rootdir </> jobQueueArchiveSubDir
250
  contents <- getDirectoryContents adir `Control.Exception.catch`
251
               ignoreIOError [] False
252
                 ("Failed to list queue directory " ++ adir)
253
  let fpaths = map (adir </>) $ filter (not . ("." `isPrefixOf`)) contents
254
  filterM (\path ->
255
             liftM isDirectory (getFileStatus (adir </> path))
256
               `Control.Exception.catch`
257
               ignoreIOError False True
258
                 ("Failed to stat archive path " ++ path)) fpaths
259

    
260
-- | Build list of directories containing job files. Note: compared to
261
-- the Python version, this doesn't ignore a potential lost+found
262
-- file.
263
determineJobDirectories :: FilePath -> Bool -> IO [FilePath]
264
determineJobDirectories rootdir archived = do
265
  other <- if archived
266
             then allArchiveDirs rootdir
267
             else return []
268
  return $ rootdir:other
269

    
270
-- Function equivalent to the \'sequence\' function, that cannot be used because
271
-- of library version conflict on Lucid.
272
-- FIXME: delete this and just use \'sequence\' instead when Lucid compatibility
273
-- will not be required anymore.
274
sequencer :: [Either IOError [JobId]] -> Either IOError [[JobId]]
275
sequencer l = fmap reverse $ foldl seqFolder (Right []) l
276

    
277
-- | Folding function for joining multiple [JobIds] into one list.
278
seqFolder :: Either IOError [[JobId]]
279
          -> Either IOError [JobId]
280
          -> Either IOError [[JobId]]
281
seqFolder (Left e) _ = Left e
282
seqFolder (Right _) (Left e) = Left e
283
seqFolder (Right l) (Right el) = Right $ el:l
284

    
285
-- | Computes the list of all jobs in the given directories.
286
getJobIDs :: [FilePath] -> IO (Either IOError [JobId])
287
getJobIDs paths = liftM (fmap concat . sequencer) (mapM getDirJobIDs paths)
288

    
289
-- | Sorts the a list of job IDs.
290
sortJobIDs :: [JobId] -> [JobId]
291
sortJobIDs = sortBy (comparing fromJobId)
292

    
293
-- | Computes the list of jobs in a given directory.
294
getDirJobIDs :: FilePath -> IO (Either IOError [JobId])
295
getDirJobIDs path = do
296
  either_contents <-
297
    try (getDirectoryContents path) :: IO (Either IOError [FilePath])
298
  case either_contents of
299
    Left e -> do
300
      logWarning $ "Failed to list job directory " ++ path ++ ": " ++ show e
301
      return $ Left e
302
    Right contents -> do
303
      let jids = foldl (\ids file ->
304
                         case parseJobFileId file of
305
                           Nothing -> ids
306
                           Just new_id -> new_id:ids) [] contents
307
      return . Right $ reverse jids
308

    
309
-- | Reads the job data from disk.
310
readJobDataFromDisk :: FilePath -> Bool -> JobId -> IO (Maybe (String, Bool))
311
readJobDataFromDisk rootdir archived jid = do
312
  let live_path = liveJobFile rootdir jid
313
      archived_path = archivedJobFile rootdir jid
314
      all_paths = if archived
315
                    then [(live_path, False), (archived_path, True)]
316
                    else [(live_path, False)]
317
  foldM (\state (path, isarchived) ->
318
           liftM (\r -> Just (r, isarchived)) (readFile path)
319
             `Control.Exception.catch`
320
             ignoreIOError state True
321
               ("Failed to read job file " ++ path)) Nothing all_paths
322

    
323
-- | Failed to load job error.
324
noSuchJob :: Result (QueuedJob, Bool)
325
noSuchJob = Bad "Can't load job file"
326

    
327
-- | Loads a job from disk.
328
loadJobFromDisk :: FilePath -> Bool -> JobId -> IO (Result (QueuedJob, Bool))
329
loadJobFromDisk rootdir archived jid = do
330
  raw <- readJobDataFromDisk rootdir archived jid
331
  -- note: we need some stricness below, otherwise the wrapping in a
332
  -- Result will create too much lazyness, and not close the file
333
  -- descriptors for the individual jobs
334
  return $! case raw of
335
             Nothing -> noSuchJob
336
             Just (str, arch) ->
337
               liftM (\qj -> (qj, arch)) .
338
               fromJResult "Parsing job file" $ Text.JSON.decode str
339

    
340
-- | Read the job serial number from disk.
341
readSerialFromDisk :: IO (Result JobId)
342
readSerialFromDisk = do
343
  filename <- jobQueueSerialFile
344
  tryAndLogIOError (readFile filename) "Failed to read serial file"
345
                   (makeJobIdS . rStripSpace)
346

    
347
-- | Allocate new job ids.
348
-- To avoid races while accessing the serial file, the threads synchronize
349
-- over a lock, as usual provided by an MVar.
350
allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId])
351
allocateJobIds mastercandidates lock n =
352
  if n <= 0
353
    then return . Bad $ "Can only allocate positive number of job ids"
354
    else do
355
      takeMVar lock
356
      rjobid <- readSerialFromDisk
357
      case rjobid of
358
        Bad s -> do
359
          putMVar lock ()
360
          return . Bad $ s
361
        Ok jid -> do
362
          let current = fromJobId jid
363
              serial_content = show (current + n) ++  "\n"
364
          serial <- jobQueueSerialFile
365
          write_result <- try $ atomicWriteFile serial serial_content
366
                          :: IO (Either IOError ())
367
          putMVar lock ()
368
          case write_result of
369
            Left e -> do
370
              let msg = "Failed to write serial file: " ++ show e
371
              logError msg
372
              return . Bad $ msg 
373
            Right () -> do
374
              _ <- executeRpcCall mastercandidates
375
                     $ RpcCallJobqueueUpdate serial serial_content
376
              return $ mapM makeJobId [(current+1)..(current+n)]
377

    
378
-- | Allocate one new job id.
379
allocateJobId :: [Node] -> MVar () -> IO (Result JobId)
380
allocateJobId mastercandidates lock = do
381
  jids <- allocateJobIds mastercandidates lock 1
382
  return (jids >>= monadicThe "Failed to allocate precisely one Job ID")